mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 07:50:25 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			224 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			224 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
						|
 | 
						|
package services
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
 | 
						|
	"github.com/TeaOSLab/EdgeAPI/internal/goman"
 | 
						|
	"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
 | 
						|
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
						|
	"github.com/iwind/TeaGo/dbs"
 | 
						|
	"github.com/iwind/TeaGo/types"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
// 队列相关数据
 | 
						|
var metricStatsMap = map[string]*pb.UploadMetricStatsRequest{} // key (clusterId@nodeId@serverId@itemId) => UploadMetricStatsRequest
 | 
						|
var metricStatKeysQueue = make(chan string, 100_000)
 | 
						|
var metricStatsLocker = &sync.Mutex{}
 | 
						|
 | 
						|
func init() {
 | 
						|
	dbs.OnReadyDone(func() {
 | 
						|
		goman.New(func() {
 | 
						|
			// 将队列导入数据库
 | 
						|
			var countKeys = 0
 | 
						|
			var useTx = true
 | 
						|
 | 
						|
			for key := range metricStatKeysQueue {
 | 
						|
				err := func(key string) error {
 | 
						|
					metricStatsLocker.Lock()
 | 
						|
					req, ok := metricStatsMap[key]
 | 
						|
					if !ok {
 | 
						|
						metricStatsLocker.Unlock()
 | 
						|
						return nil
 | 
						|
					}
 | 
						|
					delete(metricStatsMap, key)
 | 
						|
					metricStatsLocker.Unlock()
 | 
						|
 | 
						|
					var pieces = strings.Split(key, "@")
 | 
						|
					var clusterId = types.Int64(pieces[0])
 | 
						|
					var nodeId = types.Int64(pieces[1])
 | 
						|
					var serverId = types.Int64(pieces[2])
 | 
						|
					var itemId = types.Int64(pieces[3])
 | 
						|
 | 
						|
					// 删除旧的数据
 | 
						|
					var tx *dbs.Tx
 | 
						|
					var err error
 | 
						|
					if useTx {
 | 
						|
						var before = time.Now()
 | 
						|
 | 
						|
						tx, err = models.SharedMetricStatDAO.Instance.Begin()
 | 
						|
						if err != nil {
 | 
						|
							return err
 | 
						|
						}
 | 
						|
 | 
						|
						defer func() {
 | 
						|
							// 失败时不需要rollback
 | 
						|
							if tx != nil {
 | 
						|
								commitErr := tx.Commit()
 | 
						|
								if commitErr != nil {
 | 
						|
									remotelogs.Error("METRIC_STAT", "commit metric stats failed: "+commitErr.Error())
 | 
						|
								}
 | 
						|
							}
 | 
						|
 | 
						|
							// 如果运行时间过长,则不使用事务
 | 
						|
							if time.Since(before) > 1*time.Second {
 | 
						|
								useTx = false
 | 
						|
							}
 | 
						|
						}()
 | 
						|
					}
 | 
						|
 | 
						|
					if len(req.MetricStats) > 0 {
 | 
						|
						err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time, req.KeepKeys)
 | 
						|
						if err != nil {
 | 
						|
							return err
 | 
						|
						}
 | 
						|
 | 
						|
						for _, stat := range req.MetricStats {
 | 
						|
							err = models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version)
 | 
						|
							if err != nil {
 | 
						|
								return err
 | 
						|
							}
 | 
						|
						}
 | 
						|
					}
 | 
						|
 | 
						|
					// 保存总和
 | 
						|
					err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total)
 | 
						|
					if err != nil {
 | 
						|
						return err
 | 
						|
					}
 | 
						|
 | 
						|
					return nil
 | 
						|
				}(key)
 | 
						|
				if err != nil {
 | 
						|
					if !models.CheckSQLErrCode(err, 1213 /** transaction deadlock **/) {
 | 
						|
						remotelogs.Error("METRIC_STAT", "upload metric stats failed: "+err.Error())
 | 
						|
					}
 | 
						|
				}
 | 
						|
 | 
						|
				// 人为限速
 | 
						|
				countKeys++
 | 
						|
				if countKeys >= 100 {
 | 
						|
					countKeys = 0
 | 
						|
					time.Sleep(1 * time.Second)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		})
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// MetricStatService 指标统计数据相关服务
 | 
						|
type MetricStatService struct {
 | 
						|
	BaseService
 | 
						|
}
 | 
						|
 | 
						|
// UploadMetricStats 上传统计数据
 | 
						|
func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.UploadMetricStatsRequest) (*pb.RPCSuccess, error) {
 | 
						|
	nodeId, err := this.ValidateNode(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var tx = this.NullTx()
 | 
						|
	clusterId, err := models.SharedServerDAO.FindServerClusterId(tx, req.ServerId)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var key = types.String(clusterId) + "@" + types.String(nodeId) + "@" + types.String(req.ServerId) + "@" + types.String(req.ItemId)
 | 
						|
	metricStatsLocker.Lock()
 | 
						|
	metricStatsMap[key] = req
 | 
						|
 | 
						|
	select {
 | 
						|
	case metricStatKeysQueue <- key:
 | 
						|
	default:
 | 
						|
		// 如果满了就删除
 | 
						|
		delete(metricStatsMap, key)
 | 
						|
	}
 | 
						|
 | 
						|
	metricStatsLocker.Unlock()
 | 
						|
 | 
						|
	return this.Success()
 | 
						|
}
 | 
						|
 | 
						|
// CountMetricStats 计算指标数据数量
 | 
						|
func (this *MetricStatService) CountMetricStats(ctx context.Context, req *pb.CountMetricStatsRequest) (*pb.RPCCountResponse, error) {
 | 
						|
	_, err := this.ValidateAdmin(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var tx = this.NullTx()
 | 
						|
	version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	count, err := models.SharedMetricStatDAO.CountItemStats(tx, req.MetricItemId, version)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return this.SuccessCount(count)
 | 
						|
}
 | 
						|
 | 
						|
// ListMetricStats 读取单页指标数据
 | 
						|
func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.ListMetricStatsRequest) (*pb.ListMetricStatsResponse, error) {
 | 
						|
	_, err := this.ValidateAdmin(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var tx = this.NullTx()
 | 
						|
	version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	stats, err := models.SharedMetricStatDAO.ListItemStats(tx, req.MetricItemId, version, req.Offset, req.Size)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var pbStats []*pb.MetricStat
 | 
						|
	for _, stat := range stats {
 | 
						|
		// cluster
 | 
						|
		clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(stat.ClusterId))
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		// node
 | 
						|
		nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(stat.NodeId))
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		// server
 | 
						|
		serverName, err := models.SharedServerDAO.FindEnabledServerName(tx, int64(stat.ServerId))
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		// 查找sum值
 | 
						|
		count, total, err := models.SharedMetricSumStatDAO.FindNodeServerSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version))
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		pbStats = append(pbStats, &pb.MetricStat{
 | 
						|
			Id:          int64(stat.Id),
 | 
						|
			Hash:        stat.Hash,
 | 
						|
			ServerId:    int64(stat.ServerId),
 | 
						|
			ItemId:      int64(stat.ItemId),
 | 
						|
			Keys:        stat.DecodeKeys(),
 | 
						|
			Value:       float32(stat.Value),
 | 
						|
			Time:        stat.Time,
 | 
						|
			Version:     types.Int32(stat.Version),
 | 
						|
			NodeCluster: &pb.NodeCluster{Id: int64(stat.ClusterId), Name: clusterName},
 | 
						|
			Node:        &pb.Node{Id: int64(stat.NodeId), Name: nodeName},
 | 
						|
			Server:      &pb.Server{Id: int64(stat.ServerId), Name: serverName},
 | 
						|
			SumCount:    count,
 | 
						|
			SumTotal:    total,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return &pb.ListMetricStatsResponse{MetricStats: pbStats}, nil
 | 
						|
}
 |