mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-03 15:00:27 +08:00
224 lines
6.1 KiB
Go
224 lines
6.1 KiB
Go
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
|
|
|
package services
|
|
|
|
import (
|
|
"context"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
|
"github.com/iwind/TeaGo/dbs"
|
|
"github.com/iwind/TeaGo/types"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// 队列相关数据
|
|
var metricStatsMap = map[string]*pb.UploadMetricStatsRequest{} // key (clusterId@nodeId@serverId@itemId) => UploadMetricStatsRequest
|
|
var metricStatKeysQueue = make(chan string, 100_000)
|
|
var metricStatsLocker = &sync.Mutex{}
|
|
|
|
func init() {
|
|
dbs.OnReadyDone(func() {
|
|
goman.New(func() {
|
|
// 将队列导入数据库
|
|
var countKeys = 0
|
|
var useTx = true
|
|
|
|
for key := range metricStatKeysQueue {
|
|
err := func(key string) error {
|
|
metricStatsLocker.Lock()
|
|
req, ok := metricStatsMap[key]
|
|
if !ok {
|
|
metricStatsLocker.Unlock()
|
|
return nil
|
|
}
|
|
delete(metricStatsMap, key)
|
|
metricStatsLocker.Unlock()
|
|
|
|
var pieces = strings.Split(key, "@")
|
|
var clusterId = types.Int64(pieces[0])
|
|
var nodeId = types.Int64(pieces[1])
|
|
var serverId = types.Int64(pieces[2])
|
|
var itemId = types.Int64(pieces[3])
|
|
|
|
// 删除旧的数据
|
|
var tx *dbs.Tx
|
|
var err error
|
|
if useTx {
|
|
var before = time.Now()
|
|
|
|
tx, err = models.SharedMetricStatDAO.Instance.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
// 失败时不需要rollback
|
|
if tx != nil {
|
|
commitErr := tx.Commit()
|
|
if commitErr != nil {
|
|
remotelogs.Error("METRIC_STAT", "commit metric stats failed: "+commitErr.Error())
|
|
}
|
|
}
|
|
|
|
// 如果运行时间过长,则不使用事务
|
|
if time.Since(before) > 1*time.Second {
|
|
useTx = false
|
|
}
|
|
}()
|
|
}
|
|
|
|
if len(req.MetricStats) > 0 {
|
|
err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time, req.KeepKeys)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, stat := range req.MetricStats {
|
|
err = models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// 保存总和
|
|
err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}(key)
|
|
if err != nil {
|
|
if !models.CheckSQLErrCode(err, 1213 /** transaction deadlock **/) {
|
|
remotelogs.Error("METRIC_STAT", "upload metric stats failed: "+err.Error())
|
|
}
|
|
}
|
|
|
|
// 人为限速
|
|
countKeys++
|
|
if countKeys >= 100 {
|
|
countKeys = 0
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
// MetricStatService 指标统计数据相关服务
|
|
type MetricStatService struct {
|
|
BaseService
|
|
}
|
|
|
|
// UploadMetricStats 上传统计数据
|
|
func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.UploadMetricStatsRequest) (*pb.RPCSuccess, error) {
|
|
nodeId, err := this.ValidateNode(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var tx = this.NullTx()
|
|
clusterId, err := models.SharedServerDAO.FindServerClusterId(tx, req.ServerId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var key = types.String(clusterId) + "@" + types.String(nodeId) + "@" + types.String(req.ServerId) + "@" + types.String(req.ItemId)
|
|
metricStatsLocker.Lock()
|
|
metricStatsMap[key] = req
|
|
|
|
select {
|
|
case metricStatKeysQueue <- key:
|
|
default:
|
|
// 如果满了就删除
|
|
delete(metricStatsMap, key)
|
|
}
|
|
|
|
metricStatsLocker.Unlock()
|
|
|
|
return this.Success()
|
|
}
|
|
|
|
// CountMetricStats 计算指标数据数量
|
|
func (this *MetricStatService) CountMetricStats(ctx context.Context, req *pb.CountMetricStatsRequest) (*pb.RPCCountResponse, error) {
|
|
_, err := this.ValidateAdmin(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var tx = this.NullTx()
|
|
version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
count, err := models.SharedMetricStatDAO.CountItemStats(tx, req.MetricItemId, version)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return this.SuccessCount(count)
|
|
}
|
|
|
|
// ListMetricStats 读取单页指标数据
|
|
func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.ListMetricStatsRequest) (*pb.ListMetricStatsResponse, error) {
|
|
_, err := this.ValidateAdmin(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var tx = this.NullTx()
|
|
version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
stats, err := models.SharedMetricStatDAO.ListItemStats(tx, req.MetricItemId, version, req.Offset, req.Size)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var pbStats []*pb.MetricStat
|
|
for _, stat := range stats {
|
|
// cluster
|
|
clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(stat.ClusterId))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// node
|
|
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(stat.NodeId))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// server
|
|
serverName, err := models.SharedServerDAO.FindEnabledServerName(tx, int64(stat.ServerId))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 查找sum值
|
|
count, total, err := models.SharedMetricSumStatDAO.FindNodeServerSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pbStats = append(pbStats, &pb.MetricStat{
|
|
Id: int64(stat.Id),
|
|
Hash: stat.Hash,
|
|
ServerId: int64(stat.ServerId),
|
|
ItemId: int64(stat.ItemId),
|
|
Keys: stat.DecodeKeys(),
|
|
Value: float32(stat.Value),
|
|
Time: stat.Time,
|
|
Version: types.Int32(stat.Version),
|
|
NodeCluster: &pb.NodeCluster{Id: int64(stat.ClusterId), Name: clusterName},
|
|
Node: &pb.Node{Id: int64(stat.NodeId), Name: nodeName},
|
|
Server: &pb.Server{Id: int64(stat.ServerId), Name: serverName},
|
|
SumCount: count,
|
|
SumTotal: total,
|
|
})
|
|
}
|
|
return &pb.ListMetricStatsResponse{MetricStats: pbStats}, nil
|
|
}
|