diff --git a/internal/rpc/services/service_metric_stat.go b/internal/rpc/services/service_metric_stat.go index aa187d4b..d2e1be27 100644 --- a/internal/rpc/services/service_metric_stat.go +++ b/internal/rpc/services/service_metric_stat.go @@ -5,10 +5,81 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/goman" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/types" + "strings" + "sync" + "time" ) +// 队列相关数据 +var metricStatsMap = map[string]*pb.UploadMetricStatsRequest{} // key (clusterId@nodeId@serverId@itemId) => UploadMetricStatsRequest +var metricStatKeysQueue = make(chan string, 100_000) +var metricStatsLocker = &sync.Mutex{} + +func init() { + dbs.OnReadyDone(func() { + goman.New(func() { + // 将队列导入数据库 + var countKeys = 0 + for key := range metricStatKeysQueue { + err := func(key string) error { + var tx *dbs.Tx + + metricStatsLocker.Lock() + req, ok := metricStatsMap[key] + if !ok { + metricStatsLocker.Unlock() + return nil + } + delete(metricStatsMap, key) + metricStatsLocker.Unlock() + + var pieces = strings.Split(key, "@") + var clusterId = types.Int64(pieces[0]) + var nodeId = types.Int64(pieces[1]) + var serverId = types.Int64(pieces[2]) + var itemId = types.Int64(pieces[3]) + + // 删除旧的数据 + err := models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time) + if err != nil { + return err + } + + for _, stat := range req.MetricStats { + err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version) + if err != nil { + return err + } + } + + // 保存总和 + err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total) + if err != nil { + return err + } + + return nil + }(key) + if err != nil { + remotelogs.Error("METRIC_STAT", "upload metric stats failed: "+err.Error()) + } + + // 人为限速 + countKeys++ + if countKeys >= 100 { + countKeys = 0 + time.Sleep(1 * time.Second) + } + } + }) + }) +} + // MetricStatService 指标统计数据相关服务 type MetricStatService struct { BaseService @@ -27,24 +98,18 @@ func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.Up return nil, err } - // 删除旧的数据 - err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, req.ServerId, req.ItemId, req.Time) - if err != nil { - return nil, err + var key = types.String(clusterId) + "@" + types.String(nodeId) + "@" + types.String(req.ServerId) + "@" + types.String(req.ItemId) + metricStatsLocker.Lock() + metricStatsMap[key] = req + + select { + case metricStatKeysQueue <- key: + default: + // 如果满了就删除 + delete(metricStatsMap, key) } - for _, stat := range req.MetricStats { - err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version) - if err != nil { - return nil, err - } - } - - // 保存总和 - err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total) - if err != nil { - return nil, err - } + metricStatsLocker.Unlock() return this.Success() }