diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index d6acbb9f..dad9221f 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -456,9 +456,26 @@ func (this *NodeDAO) FindEnabledNodeClusterIds(tx *dbs.Tx, nodeId int64) (result return } +// FindEnabledNodeIdsWithClusterId 查找某个集群下的所有节点IDs +func (this *NodeDAO) FindEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId int64) ([]int64, error) { + ones, err := this.Query(tx). + Attr("clusterId", clusterId). + State(NodeClusterStateEnabled). + ResultPk(). + FindAll() + if err != nil { + return nil, err + } + var result = []int64{} + for _, one := range ones { + result = append(result, int64(one.(*Node).Id)) + } + return result, nil +} + // FindAllNodeIdsMatch 匹配节点并返回节点ID func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) { - query := this.Query(tx) + var query = this.Query(tx) query.State(NodeStateEnabled) if clusterId > 0 { if includeSecondaryNodes { diff --git a/internal/rpc/services/service_admin.go b/internal/rpc/services/service_admin.go index 2088cb83..1f999891 100644 --- a/internal/rpc/services/service_admin.go +++ b/internal/rpc/services/service_admin.go @@ -9,14 +9,12 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models/regions" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/rpc/tasks" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" - "github.com/iwind/TeaGo/dbs" - "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" "time" ) @@ -743,7 +741,11 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com // 域名排行 this.BeginTag(ctx, "SharedServerDomainHourlyStatDAO.FindTopDomainStats") - topDomainStats, err := stats.SharedServerDomainHourlyStatDAO.FindTopDomainStats(tx, hourFrom, hourTo, 10) + var topDomainStats []*stats.ServerDomainHourlyStat + topDomainStatsCache, ok := tasks.SharedCacheTaskManager.GetGlobalTopDomains() + if ok { + topDomainStats = topDomainStatsCache.([]*stats.ServerDomainHourlyStat) + } this.EndTag(ctx, "SharedServerDomainHourlyStatDAO.FindTopDomainStats") if err != nil { return nil, err @@ -816,7 +818,11 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com // 指标数据 this.BeginTag(ctx, "findMetricDataCharts") - pbCharts, err := this.findMetricDataCharts(tx) + var pbCharts []*pb.MetricDataChart + pbChartsCache, ok := tasks.SharedCacheTaskManager.Get(tasks.CacheKeyFindAllMetricDataCharts) + if ok { + pbCharts = pbChartsCache.([]*pb.MetricDataChart) + } this.EndTag(ctx, "findMetricDataCharts") if err != nil { return nil, err @@ -839,111 +845,3 @@ func (this *AdminService) UpdateAdminTheme(ctx context.Context, req *pb.UpdateAd } return this.Success() } - -// 查找集群、节点和服务的指标数据 -func (this *AdminService) findMetricDataCharts(tx *dbs.Tx) (result []*pb.MetricDataChart, err error) { - // 集群指标 - items, err := models.SharedMetricItemDAO.FindAllPublicItems(tx, serverconfigs.MetricItemCategoryHTTP, nil) - if err != nil { - return nil, err - } - var pbMetricCharts = []*pb.MetricDataChart{} - for _, item := range items { - var itemId = int64(item.Id) - charts, err := models.SharedMetricChartDAO.FindAllEnabledCharts(tx, itemId) - if err != nil { - return nil, err - } - - for _, chart := range charts { - if chart.IsOn == 0 { - continue - } - - var pbChart = &pb.MetricChart{ - Id: int64(chart.Id), - Name: chart.Name, - Type: chart.Type, - WidthDiv: chart.WidthDiv, - ParamsJSON: nil, - IsOn: chart.IsOn == 1, - MaxItems: types.Int32(chart.MaxItems), - MetricItem: &pb.MetricItem{ - Id: itemId, - PeriodUnit: item.PeriodUnit, - Period: types.Int32(item.Period), - Name: item.Name, - Value: item.Value, - Category: item.Category, - Keys: item.DecodeKeys(), - Code: item.Code, - IsOn: item.IsOn == 1, - }, - } - var pbStats = []*pb.MetricStat{} - switch chart.Type { - case serverconfigs.MetricChartTypeTimeLine: - itemStats, err := models.SharedMetricStatDAO.FindLatestItemStats(tx, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - if err != nil { - return nil, err - } - - for _, stat := range itemStats { - // 当前时间总和 - count, total, err := models.SharedMetricSumStatDAO.FindSumAtTime(tx, stat.Time, itemId, types.Int32(item.Version)) - if err != nil { - return nil, err - } - - pbStats = append(pbStats, &pb.MetricStat{ - Id: int64(stat.Id), - Hash: stat.Hash, - ServerId: 0, - ItemId: 0, - Keys: stat.DecodeKeys(), - Value: types.Float32(stat.Value), - Time: stat.Time, - Version: 0, - NodeCluster: nil, - Node: nil, - Server: nil, - SumCount: count, - SumTotal: total, - }) - } - default: - itemStats, err := models.SharedMetricStatDAO.FindItemStatsAtLastTime(tx, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - if err != nil { - return nil, err - } - for _, stat := range itemStats { - count, total, err := models.SharedMetricSumStatDAO.FindSumAtTime(tx, stat.Time, itemId, types.Int32(item.Version)) - if err != nil { - return nil, err - } - - pbStats = append(pbStats, &pb.MetricStat{ - Id: int64(stat.Id), - Hash: stat.Hash, - ServerId: 0, - ItemId: 0, - Keys: stat.DecodeKeys(), - Value: types.Float32(stat.Value), - Time: stat.Time, - Version: 0, - NodeCluster: nil, - Node: nil, - Server: nil, - SumCount: count, - SumTotal: total, - }) - } - } - pbMetricCharts = append(pbMetricCharts, &pb.MetricDataChart{ - MetricChart: pbChart, - MetricStats: pbStats, - }) - } - } - return pbMetricCharts, nil -} diff --git a/internal/rpc/services/service_server_stat_board.go b/internal/rpc/services/service_server_stat_board.go index 7fd0c8d8..033e3a97 100644 --- a/internal/rpc/services/service_server_stat_board.go +++ b/internal/rpc/services/service_server_stat_board.go @@ -9,12 +9,11 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models/regions" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/rpc/tasks" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/dbs" - "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" "time" @@ -208,11 +207,12 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex }) } - charts, err := this.findNodeClusterMetricDataCharts(tx, req.NodeClusterId, 0, 0, serverconfigs.MetricItemCategoryHTTP) - if err != nil { - return nil, err + var pbCharts []*pb.MetricDataChart + charts, ok := tasks.SharedCacheTaskManager.GetCluster(tasks.CacheKeyFindNodeClusterMetricDataCharts, req.NodeClusterId, serverconfigs.MetricItemCategoryHTTP) + if ok { + pbCharts = charts.([]*pb.MetricDataChart) } - result.MetricDataCharts = charts + result.MetricDataCharts = pbCharts return result, nil } @@ -443,11 +443,12 @@ func (this *ServerStatBoardService) ComposeServerStatNodeBoard(ctx context.Conte // 指标 var clusterId = int64(node.ClusterId) - charts, err := this.findNodeClusterMetricDataCharts(tx, clusterId, req.NodeId, 0, serverconfigs.MetricItemCategoryHTTP) - if err != nil { - return nil, err + var pbCharts []*pb.MetricDataChart + charts, ok := tasks.SharedCacheTaskManager.GetNode(tasks.CacheKeyFindNodeMetricDataCharts, clusterId, req.NodeId, serverconfigs.MetricItemCategoryHTTP) + if ok { + pbCharts = charts.([]*pb.MetricDataChart) } - result.MetricDataCharts = charts + result.MetricDataCharts = pbCharts return result, nil } @@ -560,182 +561,13 @@ func (this *ServerStatBoardService) ComposeServerStatBoard(ctx context.Context, case serverconfigs.ServerTypeUDPProxy: metricCategory = serverconfigs.MetricItemCategoryUDP } - charts, err := this.findNodeClusterMetricDataCharts(tx, clusterId, 0, req.ServerId, metricCategory) - if err != nil { - return nil, err + + var pbCharts []*pb.MetricDataChart + charts, ok := tasks.SharedCacheTaskManager.GetServer(tasks.CacheKeyFindServerMetricDataCharts, clusterId, req.ServerId, metricCategory) + if ok { + pbCharts = charts.([]*pb.MetricDataChart) } - result.MetricDataCharts = charts + result.MetricDataCharts = pbCharts return result, nil } - -// 查找集群、节点和服务的指标数据 -func (this *ServerStatBoardService) findNodeClusterMetricDataCharts(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64, category string) (result []*pb.MetricDataChart, err error) { - // 集群指标 - clusterMetricItems, err := models.SharedNodeClusterMetricItemDAO.FindAllClusterItems(tx, clusterId, category) - if err != nil { - return nil, err - } - var pbMetricCharts = []*pb.MetricDataChart{} - var metricItemIds = []int64{} - var items = []*models.MetricItem{} - for _, clusterMetricItem := range clusterMetricItems { - if clusterMetricItem.IsOn != 1 { - continue - } - var itemId = int64(clusterMetricItem.ItemId) - item, err := models.SharedMetricItemDAO.FindEnabledMetricItem(tx, itemId) - if err != nil { - return nil, err - } - if item == nil || item.IsOn == 0 { - continue - } - items = append(items, item) - metricItemIds = append(metricItemIds, itemId) - } - - publicMetricItems, err := models.SharedMetricItemDAO.FindAllPublicItems(tx, category, nil) - if err != nil { - return nil, err - } - for _, item := range publicMetricItems { - if item.IsOn != 1 { - continue - } - if lists.ContainsInt64(metricItemIds, int64(item.Id)) { - continue - } - items = append(items, item) - } - - for _, item := range items { - var itemId = int64(item.Id) - charts, err := models.SharedMetricChartDAO.FindAllEnabledCharts(tx, itemId) - if err != nil { - return nil, err - } - - for _, chart := range charts { - if chart.IsOn == 0 { - continue - } - - var pbChart = &pb.MetricChart{ - Id: int64(chart.Id), - Name: chart.Name, - Type: chart.Type, - WidthDiv: chart.WidthDiv, - ParamsJSON: nil, - IsOn: chart.IsOn == 1, - MaxItems: types.Int32(chart.MaxItems), - MetricItem: &pb.MetricItem{ - Id: itemId, - PeriodUnit: item.PeriodUnit, - Period: types.Int32(item.Period), - Name: item.Name, - Value: item.Value, - Category: item.Category, - Keys: item.DecodeKeys(), - Code: item.Code, - IsOn: item.IsOn == 1, - }, - } - var pbStats = []*pb.MetricStat{} - switch chart.Type { - case serverconfigs.MetricChartTypeTimeLine: - var itemStats []*models.MetricStat - if serverId > 0 { - itemStats, err = models.SharedMetricStatDAO.FindLatestItemStatsWithServerId(tx, serverId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - } else if nodeId > 0 { - itemStats, err = models.SharedMetricStatDAO.FindLatestItemStatsWithNodeId(tx, nodeId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - } else { - itemStats, err = models.SharedMetricStatDAO.FindLatestItemStatsWithClusterId(tx, clusterId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - } - if err != nil { - return nil, err - } - - for _, stat := range itemStats { - // 当前时间总和 - var count int64 - var total float32 - if serverId > 0 { - count, total, err = models.SharedMetricSumStatDAO.FindServerSum(tx, serverId, stat.Time, itemId, types.Int32(item.Version)) - } else if nodeId > 0 { - count, total, err = models.SharedMetricSumStatDAO.FindNodeSum(tx, nodeId, stat.Time, itemId, types.Int32(item.Version)) - } else { - count, total, err = models.SharedMetricSumStatDAO.FindClusterSum(tx, clusterId, stat.Time, itemId, types.Int32(item.Version)) - } - if err != nil { - return nil, err - } - - pbStats = append(pbStats, &pb.MetricStat{ - Id: int64(stat.Id), - Hash: stat.Hash, - ServerId: 0, - ItemId: 0, - Keys: stat.DecodeKeys(), - Value: types.Float32(stat.Value), - Time: stat.Time, - Version: 0, - NodeCluster: nil, - Node: nil, - Server: nil, - SumCount: count, - SumTotal: total, - }) - } - default: - var itemStats []*models.MetricStat - if serverId > 0 { - itemStats, err = models.SharedMetricStatDAO.FindItemStatsWithServerIdAndLastTime(tx, serverId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - } else if nodeId > 0 { - itemStats, err = models.SharedMetricStatDAO.FindItemStatsWithNodeIdAndLastTime(tx, nodeId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - } else { - itemStats, err = models.SharedMetricStatDAO.FindItemStatsWithClusterIdAndLastTime(tx, clusterId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) - } - if err != nil { - return nil, err - } - for _, stat := range itemStats { - // 当前时间总和 - var count int64 - var total float32 - if serverId > 0 { - count, total, err = models.SharedMetricSumStatDAO.FindServerSum(tx, serverId, stat.Time, itemId, types.Int32(item.Version)) - } else if nodeId > 0 { - count, total, err = models.SharedMetricSumStatDAO.FindNodeSum(tx, nodeId, stat.Time, itemId, types.Int32(item.Version)) - } else { - count, total, err = models.SharedMetricSumStatDAO.FindClusterSum(tx, clusterId, stat.Time, itemId, types.Int32(item.Version)) - } - if err != nil { - return nil, err - } - - pbStats = append(pbStats, &pb.MetricStat{ - Id: int64(stat.Id), - Hash: stat.Hash, - ServerId: 0, - ItemId: 0, - Keys: stat.DecodeKeys(), - Value: types.Float32(stat.Value), - Time: stat.Time, - Version: 0, - NodeCluster: nil, - Node: nil, - Server: nil, - SumCount: count, - SumTotal: total, - }) - } - } - pbMetricCharts = append(pbMetricCharts, &pb.MetricDataChart{ - MetricChart: pbChart, - MetricStats: pbStats, - }) - } - } - return pbMetricCharts, nil -} diff --git a/internal/rpc/tasks/cache_task_manager.go b/internal/rpc/tasks/cache_task_manager.go new file mode 100644 index 00000000..647580ee --- /dev/null +++ b/internal/rpc/tasks/cache_task_manager.go @@ -0,0 +1,494 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package tasks + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" + "github.com/TeaOSLab/EdgeAPI/internal/goman" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" + "sync" + "time" +) + +var SharedCacheTaskManager = NewCacheTaskManager() + +const ( + // global keys + + CacheKeyFindAllMetricDataCharts = "findAllMetricDataCharts" + CacheKeyFindGlobalTopDomains = "globalFindTopDomains" + + // cluster keys + + CacheKeyFindNodeClusterMetricDataCharts = "findNodeClusterMetricDataCharts" + + // node keys + + CacheKeyFindNodeMetricDataCharts = "findNodeMetricDataCharts" + + // server keys + + CacheKeyFindServerMetricDataCharts = "findServerMetricDataCharts" +) + +func init() { + dbs.OnReadyDone(func() { + goman.New(func() { + _ = SharedCacheTaskManager.Loop() + SharedCacheTaskManager.Start() + }) + }) +} + +type CacheTaskManager struct { + cacheMap map[string]interface{} + locker sync.Mutex + + nodeTasksTooSlow bool // 记录节点任务是否太慢,如果太慢就不再后台执行 + + ticker *time.Ticker +} + +func NewCacheTaskManager() *CacheTaskManager { + return &CacheTaskManager{ + cacheMap: map[string]interface{}{}, + ticker: time.NewTicker(5 * time.Minute), + } +} + +func (this *CacheTaskManager) Start() { + for range this.ticker.C { + err := this.Loop() + if err != nil { + remotelogs.Error("CACHE_TASK_MANAGER", err.Error()) + } + } +} + +func (this *CacheTaskManager) Loop() error { + var tx *dbs.Tx + + // Admin看板指标数据 + { + value, err := this.findAllMetricDataCharts(tx) + if err != nil { + return err + } + this.locker.Lock() + this.cacheMap[CacheKeyFindAllMetricDataCharts] = value + this.locker.Unlock() + } + + { + var hourFrom = timeutil.Format("YmdH", time.Now().Add(-23*time.Hour)) + var hourTo = timeutil.Format("YmdH") + + value, err := stats.SharedServerDomainHourlyStatDAO.FindTopDomainStats(tx, hourFrom, hourTo, 10) + if err != nil { + return err + } + + var composedKey = CacheKeyFindGlobalTopDomains + + this.locker.Lock() + this.cacheMap[composedKey] = value + this.locker.Unlock() + } + + // 集群数据 + clusterIds, err := models.SharedNodeClusterDAO.FindAllEnabledNodeClusterIds(tx) + if err != nil { + return err + } + for _, clusterId := range clusterIds { + // metric charts + { + var category = serverconfigs.MetricItemCategoryHTTP + var composedKey = CacheKeyFindNodeClusterMetricDataCharts + "@" + types.String(clusterId) + "@" + category + + value, err := this.findNodeClusterMetricDataCharts(tx, clusterId, 0, 0, category) + if err != nil { + return err + } + this.locker.Lock() + this.cacheMap[composedKey] = value + this.locker.Unlock() + } + + // nodes + var before = time.Now() + if !this.nodeTasksTooSlow { + nodeIds, err := models.SharedNodeDAO.FindEnabledNodeIdsWithClusterId(tx, clusterId) + if err != nil { + return err + } + for _, nodeId := range nodeIds { + // metric + { + var category = serverconfigs.MetricItemCategoryHTTP + var composedKey = CacheKeyFindNodeMetricDataCharts + "@" + types.String(clusterId) + "@" + types.String(nodeId) + "@" + category + + value, err := this.findNodeClusterMetricDataCharts(tx, clusterId, nodeId, 0, category) + if err != nil { + return err + } + this.locker.Lock() + this.cacheMap[composedKey] = value + this.locker.Unlock() + } + } + } + var cost = time.Since(before).Seconds() + if cost > 600 { + this.nodeTasksTooSlow = true + } + } + + return nil +} + +func (this *CacheTaskManager) Get(key string) (value interface{}, ok bool) { + this.locker.Lock() + defer this.locker.Unlock() + + if key == CacheKeyFindAllMetricDataCharts { + value, ok = this.cacheMap[key] + return + } + + return +} + +func (this *CacheTaskManager) GetCluster(key string, clusterId int64, category string) (value interface{}, ok bool) { + this.locker.Lock() + defer this.locker.Unlock() + + var composedKey = key + "@" + types.String(clusterId) + "@" + category + value, ok = this.cacheMap[composedKey] + return +} + +func (this *CacheTaskManager) GetNode(key string, clusterId int64, nodeId int64, category string) (value interface{}, ok bool) { + this.locker.Lock() + var composedKey = key + "@" + types.String(clusterId) + "@" + types.String(nodeId) + "@" + category + value, ok = this.cacheMap[composedKey] + this.locker.Unlock() + + if ok { + result, err := this.findNodeClusterMetricDataCharts(nil, clusterId, nodeId, 0, category) + if err == nil { + value = result + } + } + + return +} + +func (this *CacheTaskManager) GetServer(key string, clusterId int64, serverId int64, category string) (value interface{}, ok bool) { + switch key { + case CacheKeyFindServerMetricDataCharts: + var tx *dbs.Tx + v, err := this.findNodeClusterMetricDataCharts(tx, clusterId, 0, serverId, category) + if err != nil { + return nil, false + } + return v, true + } + + return +} + +func (this *CacheTaskManager) GetGlobalTopDomains() (value interface{}, ok bool) { + this.locker.Lock() + defer this.locker.Unlock() + + var composedKey = CacheKeyFindGlobalTopDomains + value, ok = this.cacheMap[composedKey] + return +} + +// 所有集群的指标统计 +func (this *CacheTaskManager) findAllMetricDataCharts(tx *dbs.Tx) (result []*pb.MetricDataChart, err error) { + // 集群指标 + items, err := models.SharedMetricItemDAO.FindAllPublicItems(tx, serverconfigs.MetricItemCategoryHTTP, nil) + if err != nil { + return nil, err + } + var pbMetricCharts = []*pb.MetricDataChart{} + for _, item := range items { + var itemId = int64(item.Id) + charts, err := models.SharedMetricChartDAO.FindAllEnabledCharts(tx, itemId) + if err != nil { + return nil, err + } + + for _, chart := range charts { + if chart.IsOn == 0 { + continue + } + + var pbChart = &pb.MetricChart{ + Id: int64(chart.Id), + Name: chart.Name, + Type: chart.Type, + WidthDiv: chart.WidthDiv, + ParamsJSON: nil, + IsOn: chart.IsOn == 1, + MaxItems: types.Int32(chart.MaxItems), + MetricItem: &pb.MetricItem{ + Id: itemId, + PeriodUnit: item.PeriodUnit, + Period: types.Int32(item.Period), + Name: item.Name, + Value: item.Value, + Category: item.Category, + Keys: item.DecodeKeys(), + Code: item.Code, + IsOn: item.IsOn == 1, + }, + } + var pbStats = []*pb.MetricStat{} + switch chart.Type { + case serverconfigs.MetricChartTypeTimeLine: + itemStats, err := models.SharedMetricStatDAO.FindLatestItemStats(tx, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + + for _, stat := range itemStats { + // 当前时间总和 + count, total, err := models.SharedMetricSumStatDAO.FindSumAtTime(tx, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + default: + itemStats, err := models.SharedMetricStatDAO.FindItemStatsAtLastTime(tx, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + for _, stat := range itemStats { + count, total, err := models.SharedMetricSumStatDAO.FindSumAtTime(tx, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + } + pbMetricCharts = append(pbMetricCharts, &pb.MetricDataChart{ + MetricChart: pbChart, + MetricStats: pbStats, + }) + } + } + return pbMetricCharts, nil +} + +// 某个集群、节点或者服务的指标统计 +func (this *CacheTaskManager) findNodeClusterMetricDataCharts(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64, category string) (result []*pb.MetricDataChart, err error) { + // 集群指标 + clusterMetricItems, err := models.SharedNodeClusterMetricItemDAO.FindAllClusterItems(tx, clusterId, category) + if err != nil { + return nil, err + } + var pbMetricCharts = []*pb.MetricDataChart{} + var metricItemIds = []int64{} + var items = []*models.MetricItem{} + for _, clusterMetricItem := range clusterMetricItems { + if clusterMetricItem.IsOn != 1 { + continue + } + var itemId = int64(clusterMetricItem.ItemId) + item, err := models.SharedMetricItemDAO.FindEnabledMetricItem(tx, itemId) + if err != nil { + return nil, err + } + if item == nil || item.IsOn == 0 { + continue + } + items = append(items, item) + metricItemIds = append(metricItemIds, itemId) + } + + publicMetricItems, err := models.SharedMetricItemDAO.FindAllPublicItems(tx, category, nil) + if err != nil { + return nil, err + } + for _, item := range publicMetricItems { + if item.IsOn != 1 { + continue + } + if lists.ContainsInt64(metricItemIds, int64(item.Id)) { + continue + } + items = append(items, item) + } + + for _, item := range items { + var itemId = int64(item.Id) + charts, err := models.SharedMetricChartDAO.FindAllEnabledCharts(tx, itemId) + if err != nil { + return nil, err + } + + for _, chart := range charts { + if chart.IsOn == 0 { + continue + } + + var pbChart = &pb.MetricChart{ + Id: int64(chart.Id), + Name: chart.Name, + Type: chart.Type, + WidthDiv: chart.WidthDiv, + ParamsJSON: nil, + IsOn: chart.IsOn == 1, + MaxItems: types.Int32(chart.MaxItems), + MetricItem: &pb.MetricItem{ + Id: itemId, + PeriodUnit: item.PeriodUnit, + Period: types.Int32(item.Period), + Name: item.Name, + Value: item.Value, + Category: item.Category, + Keys: item.DecodeKeys(), + Code: item.Code, + IsOn: item.IsOn == 1, + }, + } + var pbStats = []*pb.MetricStat{} + switch chart.Type { + case serverconfigs.MetricChartTypeTimeLine: + var itemStats []*models.MetricStat + if serverId > 0 { + itemStats, err = models.SharedMetricStatDAO.FindLatestItemStatsWithServerId(tx, serverId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + } else if nodeId > 0 { + itemStats, err = models.SharedMetricStatDAO.FindLatestItemStatsWithNodeId(tx, nodeId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + } else { + itemStats, err = models.SharedMetricStatDAO.FindLatestItemStatsWithClusterId(tx, clusterId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + } + if err != nil { + return nil, err + } + + for _, stat := range itemStats { + // 当前时间总和 + var count int64 + var total float32 + if serverId > 0 { + count, total, err = models.SharedMetricSumStatDAO.FindServerSum(tx, serverId, stat.Time, itemId, types.Int32(item.Version)) + } else if nodeId > 0 { + count, total, err = models.SharedMetricSumStatDAO.FindNodeSum(tx, nodeId, stat.Time, itemId, types.Int32(item.Version)) + } else { + count, total, err = models.SharedMetricSumStatDAO.FindClusterSum(tx, clusterId, stat.Time, itemId, types.Int32(item.Version)) + } + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + default: + var itemStats []*models.MetricStat + if serverId > 0 { + itemStats, err = models.SharedMetricStatDAO.FindItemStatsWithServerIdAndLastTime(tx, serverId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + } else if nodeId > 0 { + itemStats, err = models.SharedMetricStatDAO.FindItemStatsWithNodeIdAndLastTime(tx, nodeId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + } else { + itemStats, err = models.SharedMetricStatDAO.FindItemStatsWithClusterIdAndLastTime(tx, clusterId, itemId, chart.IgnoreEmptyKeys == 1, chart.DecodeIgnoredKeys(), types.Int32(item.Version), 10) + } + if err != nil { + return nil, err + } + for _, stat := range itemStats { + // 当前时间总和 + var count int64 + var total float32 + if serverId > 0 { + count, total, err = models.SharedMetricSumStatDAO.FindServerSum(tx, serverId, stat.Time, itemId, types.Int32(item.Version)) + } else if nodeId > 0 { + count, total, err = models.SharedMetricSumStatDAO.FindNodeSum(tx, nodeId, stat.Time, itemId, types.Int32(item.Version)) + } else { + count, total, err = models.SharedMetricSumStatDAO.FindClusterSum(tx, clusterId, stat.Time, itemId, types.Int32(item.Version)) + } + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + } + pbMetricCharts = append(pbMetricCharts, &pb.MetricDataChart{ + MetricChart: pbChart, + MetricStats: pbStats, + }) + } + } + return pbMetricCharts, nil +}