指标数据写入改成队列执行

This commit is contained in:
GoEdgeLab
2022-06-22 21:51:44 +08:00
parent ad5782f71b
commit c8fc1d5ab9

View File

@@ -5,10 +5,81 @@ package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/types"
"strings"
"sync"
"time"
)
// 队列相关数据
var metricStatsMap = map[string]*pb.UploadMetricStatsRequest{} // key (clusterId@nodeId@serverId@itemId) => UploadMetricStatsRequest
var metricStatKeysQueue = make(chan string, 100_000)
var metricStatsLocker = &sync.Mutex{}
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
// 将队列导入数据库
var countKeys = 0
for key := range metricStatKeysQueue {
err := func(key string) error {
var tx *dbs.Tx
metricStatsLocker.Lock()
req, ok := metricStatsMap[key]
if !ok {
metricStatsLocker.Unlock()
return nil
}
delete(metricStatsMap, key)
metricStatsLocker.Unlock()
var pieces = strings.Split(key, "@")
var clusterId = types.Int64(pieces[0])
var nodeId = types.Int64(pieces[1])
var serverId = types.Int64(pieces[2])
var itemId = types.Int64(pieces[3])
// 删除旧的数据
err := models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time)
if err != nil {
return err
}
for _, stat := range req.MetricStats {
err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version)
if err != nil {
return err
}
}
// 保存总和
err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total)
if err != nil {
return err
}
return nil
}(key)
if err != nil {
remotelogs.Error("METRIC_STAT", "upload metric stats failed: "+err.Error())
}
// 人为限速
countKeys++
if countKeys >= 100 {
countKeys = 0
time.Sleep(1 * time.Second)
}
}
})
})
}
// MetricStatService 指标统计数据相关服务
type MetricStatService struct {
BaseService
@@ -27,24 +98,18 @@ func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.Up
return nil, err
}
// 删除旧的数据
err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, req.ServerId, req.ItemId, req.Time)
if err != nil {
return nil, err
var key = types.String(clusterId) + "@" + types.String(nodeId) + "@" + types.String(req.ServerId) + "@" + types.String(req.ItemId)
metricStatsLocker.Lock()
metricStatsMap[key] = req
select {
case metricStatKeysQueue <- key:
default:
// 如果满了就删除
delete(metricStatsMap, key)
}
for _, stat := range req.MetricStats {
err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version)
if err != nil {
return nil, err
}
}
// 保存总和
err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total)
if err != nil {
return nil, err
}
metricStatsLocker.Unlock()
return this.Success()
}