diff --git a/internal/db/models/metric_chart_dao.go b/internal/db/models/metric_chart_dao.go index 9acd0d52..241a71d4 100644 --- a/internal/db/models/metric_chart_dao.go +++ b/internal/db/models/metric_chart_dao.go @@ -141,3 +141,14 @@ func (this *MetricChartDAO) ListEnabledCharts(tx *dbs.Tx, itemId int64, offset i FindAll() return } + +// FindAllEnabledCharts 查找所有图表 +func (this *MetricChartDAO) FindAllEnabledCharts(tx *dbs.Tx, itemId int64) (result []*MetricChart, err error) { + _, err = this.Query(tx). + Attr("itemId", itemId). + State(MetricChartStateEnabled). + DescPk(). + Slice(&result). + FindAll() + return +} diff --git a/internal/db/models/metric_stat_dao.go b/internal/db/models/metric_stat_dao.go index b9f07fc2..81982f88 100644 --- a/internal/db/models/metric_stat_dao.go +++ b/internal/db/models/metric_stat_dao.go @@ -5,6 +5,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/maps" "strconv" ) @@ -99,3 +100,59 @@ func (this *MetricStatDAO) ListItemStats(tx *dbs.Tx, itemId int64, version int32 FindAll() return } + +// FindItemStatsWithClusterIdAndLastTime 取得最近一次计时前 N 个数据 +// 适合每条数据中包含不同的Key的场景 +func (this *MetricStatDAO) FindItemStatsWithClusterIdAndLastTime(tx *dbs.Tx, clusterId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { + // 最近一次时间 + statOne, err := this.Query(tx). + Attr("itemId", itemId). + Attr("version", version). + DescPk(). + Find() + if err != nil { + return nil, err + } + if statOne == nil { + return nil, nil + } + var lastStat = statOne.(*MetricStat) + var lastTime = lastStat.Time + + _, err = this.Query(tx). + Attr("clusterId", clusterId). + Attr("itemId", itemId). + Attr("version", version). + Attr("time", lastTime). + // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 + // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE + Result("MIN(time) AS time", "SUM(value) AS value", "keys"). + Desc("value"). + Group("keys"). + Limit(size). + Slice(&result). + FindAll() + return +} + +// FindLatestItemStatsWithClusterId 取得集群最近 N 个时间的数据 +// 适合同个Key在不同时间段的变化场景 +func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { + _, err = this.Query(tx). + Attr("clusterId", clusterId). + Attr("itemId", itemId). + Attr("version", version). + // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 + // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE + Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`"). + Desc("time"). + Group("time"). + Limit(size). + Slice(&result). + FindAll() + if err != nil { + return nil, err + } + lists.Reverse(result) + return +} diff --git a/internal/db/models/metric_sum_stat_dao.go b/internal/db/models/metric_sum_stat_dao.go index c57669e7..7945f692 100644 --- a/internal/db/models/metric_sum_stat_dao.go +++ b/internal/db/models/metric_sum_stat_dao.go @@ -29,24 +29,27 @@ func init() { } // UpdateSum 更新统计数据 -func (this *MetricSumStatDAO) UpdateSum(tx *dbs.Tx, serverId int64, time string, itemId int64, version int32, count int64, total float32) error { +func (this *MetricSumStatDAO) UpdateSum(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64, time string, itemId int64, version int32, count int64, total float32) error { return this.Query(tx). InsertOrUpdateQuickly(maps.Map{ - "serverId": serverId, - "itemId": itemId, - "version": version, - "time": time, - "count": count, - "total": total, + "clusterId": clusterId, + "nodeId": nodeId, + "serverId": serverId, + "itemId": itemId, + "version": version, + "time": time, + "count": count, + "total": total, }, maps.Map{ "count": count, "total": total, }) } -// FindSum 查找统计数据 -func (this *MetricSumStatDAO) FindSum(tx *dbs.Tx, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { +// FindNodeSum 查找节点上的统计数据 +func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { one, err := this.Query(tx). + Attr("nodeId", nodeId). Attr("serverId", serverId). Attr("time", time). Attr("itemId", itemId). @@ -60,3 +63,21 @@ func (this *MetricSumStatDAO) FindSum(tx *dbs.Tx, serverId int64, time string, i } return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil } + +// FindClusterSum 查找集群上的统计数据 +func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { + one, err := this.Query(tx). + Attr("clusterId", clusterId). + Attr("time", time). + Attr("itemId", itemId). + Attr("version", version). + Result("SUM(count) AS `count`, SUM(total) AS total"). + Find() + if err != nil { + return 0, 0, err + } + if one == nil { + return + } + return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil +} diff --git a/internal/db/models/metric_sum_stat_model.go b/internal/db/models/metric_sum_stat_model.go index e657591e..0c834e0b 100644 --- a/internal/db/models/metric_sum_stat_model.go +++ b/internal/db/models/metric_sum_stat_model.go @@ -2,23 +2,27 @@ package models // MetricSumStat 指标统计总和数据 type MetricSumStat struct { - Id uint64 `field:"id"` // ID - ServerId uint32 `field:"serverId"` // 服务ID - ItemId uint64 `field:"itemId"` // 指标 - Count uint64 `field:"count"` // 数量 - Total float64 `field:"total"` // 总和 - Time string `field:"time"` // 分钟值YYYYMMDDHHII - Version uint32 `field:"version"` // 版本号 + Id uint64 `field:"id"` // ID + ClusterId uint32 `field:"clusterId"` // 集群ID + NodeId uint32 `field:"nodeId"` // 节点ID + ServerId uint32 `field:"serverId"` // 服务ID + ItemId uint64 `field:"itemId"` // 指标 + Count uint64 `field:"count"` // 数量 + Total float64 `field:"total"` // 总和 + Time string `field:"time"` // 分钟值YYYYMMDDHHII + Version uint32 `field:"version"` // 版本号 } type MetricSumStatOperator struct { - Id interface{} // ID - ServerId interface{} // 服务ID - ItemId interface{} // 指标 - Count interface{} // 数量 - Total interface{} // 总和 - Time interface{} // 分钟值YYYYMMDDHHII - Version interface{} // 版本号 + Id interface{} // ID + ClusterId interface{} // 集群ID + NodeId interface{} // 节点ID + ServerId interface{} // 服务ID + ItemId interface{} // 指标 + Count interface{} // 数量 + Total interface{} // 总和 + Time interface{} // 分钟值YYYYMMDDHHII + Version interface{} // 版本号 } func NewMetricSumStatOperator() *MetricSumStatOperator { diff --git a/internal/rpc/services/service_metric_stat.go b/internal/rpc/services/service_metric_stat.go index becb74bd..7ff4510b 100644 --- a/internal/rpc/services/service_metric_stat.go +++ b/internal/rpc/services/service_metric_stat.go @@ -35,7 +35,7 @@ func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.Up } // 保存总和 - err = models.SharedMetricSumStatDAO.UpdateSum(tx, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total) + err = models.SharedMetricSumStatDAO.UpdateSum(tx, clusterId, nodeId, req.ServerId, req.Time, req.ItemId, req.Version, req.Count, req.Total) if err != nil { return nil, err } @@ -97,7 +97,7 @@ func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.List } // 查找sum值 - count, total, err := models.SharedMetricSumStatDAO.FindSum(tx, int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version)) + count, total, err := models.SharedMetricSumStatDAO.FindNodeSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version)) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_server_stat_board.go b/internal/rpc/services/service_server_stat_board.go index a904724d..7adc737b 100644 --- a/internal/rpc/services/service_server_stat_board.go +++ b/internal/rpc/services/service_server_stat_board.go @@ -10,6 +10,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" "time" @@ -195,5 +196,123 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex }) } + // 指标 + clusterMetricItems, err := models.SharedNodeClusterMetricItemDAO.FindAllClusterItems(tx, req.NodeClusterId, serverconfigs.MetricItemCategoryHTTP) + if err != nil { + return nil, err + } + var pbMetricCharts = []*pb.ComposeServerStatNodeClusterBoardResponse_MetricData{} + for _, clusterMetricItem := range clusterMetricItems { + if clusterMetricItem.IsOn != 1 { + continue + } + var itemId = int64(clusterMetricItem.ItemId) + charts, err := models.SharedMetricChartDAO.FindAllEnabledCharts(tx, itemId) + if err != nil { + return nil, err + } + + item, err := models.SharedMetricItemDAO.FindEnabledMetricItem(tx, itemId) + if err != nil { + return nil, err + } + if item == nil || item.IsOn == 0 { + continue + } + + for _, chart := range charts { + if chart.IsOn == 0 { + continue + } + + var pbChart = &pb.MetricChart{ + Id: int64(chart.Id), + Name: chart.Name, + Type: chart.Type, + WidthDiv: chart.WidthDiv, + ParamsJSON: nil, + IsOn: chart.IsOn == 1, + MaxItems: types.Int32(chart.MaxItems), + MetricItem: &pb.MetricItem{ + Id: itemId, + PeriodUnit: item.PeriodUnit, + Period: types.Int32(item.Period), + Name: item.Name, + Value: item.Value, + Category: item.Category, + Keys: item.DecodeKeys(), + Code: item.Code, + IsOn: item.IsOn == 1, + }, + } + var pbStats = []*pb.MetricStat{} + switch chart.Type { + case serverconfigs.MetricChartTypeTimeLine: + itemStats, err := models.SharedMetricStatDAO.FindLatestItemStatsWithClusterId(tx, req.NodeClusterId, itemId, types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + + for _, stat := range itemStats { + // 当前时间总和 + count, total, err := models.SharedMetricSumStatDAO.FindClusterSum(tx, req.NodeClusterId, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + default: + itemStats, err := models.SharedMetricStatDAO.FindItemStatsWithClusterIdAndLastTime(tx, req.NodeClusterId, itemId, types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + for _, stat := range itemStats { + // 当前时间总和 + // 当前时间总和 + count, total, err := models.SharedMetricSumStatDAO.FindClusterSum(tx, req.NodeClusterId, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + } + pbMetricCharts = append(pbMetricCharts, &pb.ComposeServerStatNodeClusterBoardResponse_MetricData{ + MetricChart: pbChart, + MetricStats: pbStats, + }) + } + } + result.MetricCharts = pbMetricCharts + return result, nil }