Files
EdgeAPI/internal/rpc/services/service_metric_stat.go

194 lines
5.4 KiB
Go
Raw Normal View History

2021-06-30 19:59:49 +08:00
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
2022-06-22 21:51:44 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2021-06-30 19:59:49 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2022-06-22 21:51:44 +08:00
"github.com/iwind/TeaGo/dbs"
2021-06-30 19:59:49 +08:00
"github.com/iwind/TeaGo/types"
2022-06-22 21:51:44 +08:00
"strings"
"sync"
"time"
2021-06-30 19:59:49 +08:00
)
2022-06-22 21:51:44 +08:00
// 队列相关数据
var metricStatsMap = map[string]*pb.UploadMetricStatsRequest{} // key (clusterId@nodeId@serverId@itemId) => UploadMetricStatsRequest
var metricStatKeysQueue = make(chan string, 100_000)
var metricStatsLocker = &sync.Mutex{}
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
// 将队列导入数据库
var countKeys = 0
for key := range metricStatKeysQueue {
err := func(key string) error {
var tx *dbs.Tx
metricStatsLocker.Lock()
req, ok := metricStatsMap[key]
if !ok {
metricStatsLocker.Unlock()
return nil
}
delete(metricStatsMap, key)
metricStatsLocker.Unlock()
var pieces = strings.Split(key, "@")
var clusterId = types.Int64(pieces[0])
var nodeId = types.Int64(pieces[1])
var serverId = types.Int64(pieces[2])
var itemId = types.Int64(pieces[3])
// 删除旧的数据
err := models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time)
if err != nil {
return err
}
for _, stat := range req.MetricStats {
err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version)
if err != nil {
return err
}
}
// 保存总和
err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total)
if err != nil {
return err
}
return nil
}(key)
if err != nil {
remotelogs.Error("METRIC_STAT", "upload metric stats failed: "+err.Error())
}
// 人为限速
countKeys++
if countKeys >= 100 {
countKeys = 0
time.Sleep(1 * time.Second)
}
}
})
})
}
2021-06-30 19:59:49 +08:00
// MetricStatService 指标统计数据相关服务
type MetricStatService struct {
BaseService
}
// UploadMetricStats 上传统计数据
func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.UploadMetricStatsRequest) (*pb.RPCSuccess, error) {
nodeId, err := this.ValidateNode(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
2021-08-01 11:13:46 +08:00
clusterId, err := models.SharedServerDAO.FindServerClusterId(tx, req.ServerId)
2021-06-30 19:59:49 +08:00
if err != nil {
return nil, err
}
2022-06-22 21:51:44 +08:00
var key = types.String(clusterId) + "@" + types.String(nodeId) + "@" + types.String(req.ServerId) + "@" + types.String(req.ItemId)
metricStatsLocker.Lock()
metricStatsMap[key] = req
2022-06-22 21:51:44 +08:00
select {
case metricStatKeysQueue <- key:
default:
// 如果满了就删除
delete(metricStatsMap, key)
2021-06-30 19:59:49 +08:00
}
2022-06-22 21:51:44 +08:00
metricStatsLocker.Unlock()
2021-07-01 10:39:42 +08:00
2021-06-30 19:59:49 +08:00
return this.Success()
}
// CountMetricStats 计算指标数据数量
func (this *MetricStatService) CountMetricStats(ctx context.Context, req *pb.CountMetricStatsRequest) (*pb.RPCCountResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId)
if err != nil {
return nil, err
}
count, err := models.SharedMetricStatDAO.CountItemStats(tx, req.MetricItemId, version)
if err != nil {
return nil, err
}
return this.SuccessCount(count)
}
// ListMetricStats 读取单页指标数据
func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.ListMetricStatsRequest) (*pb.ListMetricStatsResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId)
if err != nil {
return nil, err
}
stats, err := models.SharedMetricStatDAO.ListItemStats(tx, req.MetricItemId, version, req.Offset, req.Size)
if err != nil {
return nil, err
}
var pbStats []*pb.MetricStat
for _, stat := range stats {
// cluster
clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(stat.ClusterId))
if err != nil {
return nil, err
}
// node
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(stat.NodeId))
if err != nil {
return nil, err
}
// server
serverName, err := models.SharedServerDAO.FindEnabledServerName(tx, int64(stat.ServerId))
if err != nil {
return nil, err
}
2021-07-01 10:39:42 +08:00
// 查找sum值
2021-07-07 19:55:37 +08:00
count, total, err := models.SharedMetricSumStatDAO.FindNodeServerSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version))
2021-07-01 10:39:42 +08:00
if err != nil {
return nil, err
}
2021-06-30 19:59:49 +08:00
pbStats = append(pbStats, &pb.MetricStat{
Id: int64(stat.Id),
Hash: stat.Hash,
ServerId: int64(stat.ServerId),
ItemId: int64(stat.ItemId),
Keys: stat.DecodeKeys(),
Value: float32(stat.Value),
Time: stat.Time,
Version: types.Int32(stat.Version),
NodeCluster: &pb.NodeCluster{Id: int64(stat.ClusterId), Name: clusterName},
Node: &pb.Node{Id: int64(stat.NodeId), Name: nodeName},
Server: &pb.Server{Id: int64(stat.ServerId), Name: serverName},
2021-07-01 10:39:42 +08:00
SumCount: count,
SumTotal: total,
2021-06-30 19:59:49 +08:00
})
}
return &pb.ListMetricStatsResponse{MetricStats: pbStats}, nil
}