Files
EdgeAPI/internal/rpc/services/service_metric_stat.go
GoEdgeLab 5a17ae9d79 v1.4.1
2024-07-27 14:15:25 +08:00

225 lines
6.1 KiB
Go

// Copyright 2021 GoEdge CDN goedge.cdn@gmail.com. All rights reserved.
package services
import (
"context"
"strings"
"sync"
"time"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/types"
)
// Queue state shared by UploadMetricStats (producer) and the worker
// registered in init (consumer).
var (
	// metricStatsMap holds the pending upload for each key.
	// key (clusterId@nodeId@serverId@itemId) => UploadMetricStatsRequest
	metricStatsMap = make(map[string]*pb.UploadMetricStatsRequest)

	// metricStatKeysQueue carries the keys whose uploads are waiting to be
	// written; it is buffered to 100_000 entries.
	metricStatKeysQueue = make(chan string, 100_000)

	// metricStatsLocker is held around every access to metricStatsMap.
	metricStatsLocker = new(sync.Mutex)
)
// init registers, through dbs.OnReadyDone and goman.New, a worker that drains
// metricStatKeysQueue and writes the pending upload of every key to the
// database.
func init() {
	dbs.OnReadyDone(func() {
		goman.New(func() {
			var (
				handled       = 0    // keys handled since the last pause
				transactional = true // cleared once a transactional write takes longer than one second
			)

			// flush stores the pending upload for a single queue key.
			// It returns nil when no upload is pending for that key.
			flush := func(queueKey string) error {
				// Take the pending request out of the map while holding the lock.
				metricStatsLocker.Lock()
				pending, found := metricStatsMap[queueKey]
				if found {
					delete(metricStatsMap, queueKey)
				}
				metricStatsLocker.Unlock()
				if !found {
					return nil
				}

				// The key layout is clusterId@nodeId@serverId@itemId.
				var parts = strings.Split(queueKey, "@")
				var (
					clusterId = types.Int64(parts[0])
					nodeId    = types.Int64(parts[1])
					serverId  = types.Int64(parts[2])
					itemId    = types.Int64(parts[3])
				)

				// tx stays nil when transactions have been switched off.
				var tx *dbs.Tx
				if transactional {
					var startedAt = time.Now()
					var beginErr error
					tx, beginErr = models.SharedMetricStatDAO.Instance.Begin()
					if beginErr != nil {
						return beginErr
					}
					defer func() {
						// Commit even when a step failed: no rollback is needed on failure.
						if tx != nil {
							if commitErr := tx.Commit(); commitErr != nil {
								remotelogs.Error("METRIC_STAT", "commit metric stats failed: "+commitErr.Error())
							}
						}

						// Stop using transactions once one has run for too long.
						if time.Since(startedAt) > 1*time.Second {
							transactional = false
						}
					}()
				}

				if len(pending.MetricStats) > 0 {
					// Remove the old data before inserting the uploaded stats.
					deleteErr := models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, pending.Time, pending.KeepKeys)
					if deleteErr != nil {
						return deleteErr
					}

					for _, metricStat := range pending.MetricStats {
						createErr := models.SharedMetricStatDAO.CreateStat(tx, metricStat.Hash, clusterId, nodeId, pending.ServerId, pending.ItemId, metricStat.Keys, float64(metricStat.Value), pending.Time, pending.Version)
						if createErr != nil {
							return createErr
						}
					}
				}

				// Save the sum.
				return models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, pending.ServerId, pending.Time, pending.ItemId, pending.Version, pending.Count, pending.Total)
			}

			// Move the queue into the database.
			for queueKey := range metricStatKeysQueue {
				// Errors with SQL code 1213 (transaction deadlock) are not logged.
				if err := flush(queueKey); err != nil && !models.CheckSQLErrCode(err, 1213 /** transaction deadlock **/) {
					remotelogs.Error("METRIC_STAT", "upload metric stats failed: "+err.Error())
				}

				// Artificial rate limit: pause for one second after every 100 keys.
				handled++
				if handled < 100 {
					continue
				}
				handled = 0
				time.Sleep(1 * time.Second)
			}
		})
	})
}
// MetricStatService provides the RPC methods for metric statistics data.
// It embeds BaseService.
type MetricStatService struct {
BaseService
}
// UploadMetricStats accepts metric stats uploaded by a node and queues them
// for the background worker that writes them to the database.
//
// The caller's node id comes from ValidateNode and the cluster id from
// FindServerClusterId; an error from either is returned as is. The request
// is stored in metricStatsMap under the key clusterId@nodeId@serverId@itemId.
//
// When an upload for the same key is still pending, only the stored request
// is replaced: the key is already queued, so it is not queued again and the
// pending entry is never dropped because of a full queue. When the key is new
// and the queue is full, the upload is discarded. In every non-error case the
// call reports success.
func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.UploadMetricStatsRequest) (*pb.RPCSuccess, error) {
	nodeId, err := this.ValidateNode(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	clusterId, err := models.SharedServerDAO.FindServerClusterId(tx, req.ServerId)
	if err != nil {
		return nil, err
	}

	var key = types.String(clusterId) + "@" + types.String(nodeId) + "@" + types.String(req.ServerId) + "@" + types.String(req.ItemId)

	metricStatsLocker.Lock()
	// An existing map entry means the key is already waiting in the queue
	// (or has just been taken by the worker, which has not yet acquired the lock).
	_, alreadyPending := metricStatsMap[key]
	metricStatsMap[key] = req
	if !alreadyPending {
		select {
		case metricStatKeysQueue <- key:
		default:
			// The queue is full: drop this upload.
			delete(metricStatsMap, key)
		}
	}
	metricStatsLocker.Unlock()

	return this.Success()
}
// CountMetricStats counts the stats of a metric item.
//
// After ValidateAdmin succeeds, it looks up the item's version with
// FindItemVersion and returns the result of CountItemStats for that item and
// version. Any error from these steps is returned unchanged.
func (this *MetricStatService) CountMetricStats(ctx context.Context, req *pb.CountMetricStatsRequest) (*pb.RPCCountResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	var itemId = req.MetricItemId

	itemVersion, err := models.SharedMetricItemDAO.FindItemVersion(tx, itemId)
	if err != nil {
		return nil, err
	}

	total, err := models.SharedMetricStatDAO.CountItemStats(tx, itemId, itemVersion)
	if err != nil {
		return nil, err
	}

	return this.SuccessCount(total)
}
// ListMetricStats reads one page of stats for a metric item.
//
// After ValidateAdmin succeeds, it looks up the item's version with
// FindItemVersion and lists the stats for that item and version using
// req.Offset and req.Size. For every stat it also looks up the cluster name,
// node name, server name and the node/server sum (count and total), and
// returns them together in the response. The first error encountered is
// returned unchanged.
func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.ListMetricStatsRequest) (*pb.ListMetricStatsResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	itemVersion, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId)
	if err != nil {
		return nil, err
	}

	rows, err := models.SharedMetricStatDAO.ListItemStats(tx, req.MetricItemId, itemVersion, req.Offset, req.Size)
	if err != nil {
		return nil, err
	}

	var result []*pb.MetricStat
	for _, row := range rows {
		// Convert the row's identifiers once and reuse them below.
		var (
			clusterId = int64(row.ClusterId)
			nodeId    = int64(row.NodeId)
			serverId  = int64(row.ServerId)
			itemId    = int64(row.ItemId)
			version   = types.Int32(row.Version)
		)

		// Cluster name.
		clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, clusterId)
		if err != nil {
			return nil, err
		}

		// Node name.
		nodeName, err := models.SharedNodeDAO.FindNodeName(tx, nodeId)
		if err != nil {
			return nil, err
		}

		// Server name.
		serverName, err := models.SharedServerDAO.FindEnabledServerName(tx, serverId)
		if err != nil {
			return nil, err
		}

		// Sum values (count and total) for this node/server at the stat's time.
		sumCount, sumTotal, err := models.SharedMetricSumStatDAO.FindNodeServerSum(tx, nodeId, serverId, row.Time, itemId, version)
		if err != nil {
			return nil, err
		}

		var pbStat = &pb.MetricStat{
			Id:          int64(row.Id),
			Hash:        row.Hash,
			ServerId:    serverId,
			ItemId:      itemId,
			Keys:        row.DecodeKeys(),
			Value:       float32(row.Value),
			Time:        row.Time,
			Version:     version,
			NodeCluster: &pb.NodeCluster{Id: clusterId, Name: clusterName},
			Node:        &pb.Node{Id: nodeId, Name: nodeName},
			Server:      &pb.Server{Id: serverId, Name: serverName},
			SumCount:    sumCount,
			SumTotal:    sumTotal,
		}
		result = append(result, pbStat)
	}

	return &pb.ListMetricStatsResponse{MetricStats: result}, nil
}