diff --git a/internal/db/models/metric_stat_dao.go b/internal/db/models/metric_stat_dao.go index 81982f88..6ddd24fd 100644 --- a/internal/db/models/metric_stat_dao.go +++ b/internal/db/models/metric_stat_dao.go @@ -101,7 +101,7 @@ func (this *MetricStatDAO) ListItemStats(tx *dbs.Tx, itemId int64, version int32 return } -// FindItemStatsWithClusterIdAndLastTime 取得最近一次计时前 N 个数据 +// FindItemStatsWithClusterIdAndLastTime 取得集群最近一次计时前 N 个数据 // 适合每条数据中包含不同的Key的场景 func (this *MetricStatDAO) FindItemStatsWithClusterIdAndLastTime(tx *dbs.Tx, clusterId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { // 最近一次时间 @@ -135,6 +135,40 @@ func (this *MetricStatDAO) FindItemStatsWithClusterIdAndLastTime(tx *dbs.Tx, clu return } +// FindItemStatsWithNodeIdAndLastTime 取得节点最近一次计时前 N 个数据 +// 适合每条数据中包含不同的Key的场景 +func (this *MetricStatDAO) FindItemStatsWithNodeIdAndLastTime(tx *dbs.Tx, nodeId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { + // 最近一次时间 + statOne, err := this.Query(tx). + Attr("itemId", itemId). + Attr("version", version). + DescPk(). + Find() + if err != nil { + return nil, err + } + if statOne == nil { + return nil, nil + } + var lastStat = statOne.(*MetricStat) + var lastTime = lastStat.Time + + _, err = this.Query(tx). + Attr("nodeId", nodeId). + Attr("itemId", itemId). + Attr("version", version). + Attr("time", lastTime). + // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 + // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE + Result("MIN(time) AS time", "SUM(value) AS value", "keys"). + Desc("value"). + Group("keys"). + Limit(size). + Slice(&result). + FindAll() + return +} + // FindLatestItemStatsWithClusterId 取得集群最近 N 个时间的数据 // 适合同个Key在不同时间段的变化场景 func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { @@ -156,3 +190,25 @@ func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterI lists.Reverse(result) return } + +// FindLatestItemStatsWithNodeId 取得节点最近 N 个时间的数据 +// 适合同个Key在不同时间段的变化场景 +func (this *MetricStatDAO) FindLatestItemStatsWithNodeId(tx *dbs.Tx, nodeId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { + _, err = this.Query(tx). + Attr("nodeId", nodeId). + Attr("itemId", itemId). + Attr("version", version). + // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 + // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE + Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`"). + Desc("time"). + Group("time"). + Limit(size). + Slice(&result). + FindAll() + if err != nil { + return nil, err + } + lists.Reverse(result) + return +} diff --git a/internal/db/models/metric_sum_stat_dao.go b/internal/db/models/metric_sum_stat_dao.go index 7945f692..59670f08 100644 --- a/internal/db/models/metric_sum_stat_dao.go +++ b/internal/db/models/metric_sum_stat_dao.go @@ -46,8 +46,8 @@ func (this *MetricSumStatDAO) UpdateSum(tx *dbs.Tx, clusterId int64, nodeId int6 }) } -// FindNodeSum 查找节点上的统计数据 -func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { +// FindServerSum 查找某个服务的统计数据 +func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, nodeId int64, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { one, err := this.Query(tx). Attr("nodeId", nodeId). Attr("serverId", serverId). @@ -81,3 +81,21 @@ func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time s } return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil } + +// FindNodeSum 查找节点上的统计数据 +func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { + one, err := this.Query(tx). + Attr("nodeId", nodeId). + Attr("time", time). + Attr("itemId", itemId). + Attr("version", version). + Result("SUM(count) AS `count`, SUM(total) AS total"). + Find() + if err != nil { + return 0, 0, err + } + if one == nil { + return + } + return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil +} diff --git a/internal/db/models/node_value_dao.go b/internal/db/models/node_value_dao.go index 572cb447..2dcd11b5 100644 --- a/internal/db/models/node_value_dao.go +++ b/internal/db/models/node_value_dao.go @@ -92,7 +92,7 @@ func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item } // ListValuesWithClusterId 列出集群最近的的平均数据 -func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, clusterId int64, role string, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) { +func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, role string, clusterId int64, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) { query := this.Query(tx). Attr("role", role). Attr("clusterId", clusterId). @@ -144,3 +144,20 @@ func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item } return query.FindFloat64Col(0) } + +// FindLatestNodeValue 获取最近一条数据 +func (this *NodeValueDAO) FindLatestNodeValue(tx *dbs.Tx, role string, nodeId int64, item string) (*NodeValue, error) { + one, err := this.Query(tx). + Attr("role", role). + Attr("nodeId", nodeId). + Attr("item", item). + DescPk(). + Find() + if err != nil { + return nil, err + } + if one == nil { + return nil, nil + } + return one.(*NodeValue), nil +} diff --git a/internal/db/models/node_value_model_ext.go b/internal/db/models/node_value_model_ext.go index 2640e7f9..7060146f 100644 --- a/internal/db/models/node_value_model_ext.go +++ b/internal/db/models/node_value_model_ext.go @@ -1 +1,19 @@ package models + +import ( + "encoding/json" + "github.com/iwind/TeaGo/maps" +) + +func (this *NodeValue) DecodeMapValue() maps.Map { + if len(this.Value) == 0 { + return maps.Map{} + } + var m = maps.Map{} + err := json.Unmarshal([]byte(this.Value), &m) + if err != nil { + // 忽略错误 + return m + } + return m +} diff --git a/internal/db/models/stats/node_traffic_daily_stat_dao.go b/internal/db/models/stats/node_traffic_daily_stat_dao.go index f0598a19..248e2fae 100644 --- a/internal/db/models/stats/node_traffic_daily_stat_dao.go +++ b/internal/db/models/stats/node_traffic_daily_stat_dao.go @@ -3,6 +3,7 @@ package stats import ( "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + "github.com/TeaOSLab/EdgeAPI/internal/utils" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -78,6 +79,37 @@ func (this *NodeTrafficDailyStatDAO) IncreaseDailyStat(tx *dbs.Tx, clusterId int return nil } + +// FindDailyStats 获取日期之间统计 +func (this *NodeTrafficDailyStatDAO) FindDailyStats(tx *dbs.Tx, role string, nodeId int64, dayFrom string, dayTo string) (result []*NodeTrafficDailyStat, err error) { + ones, err := this.Query(tx). + Attr("nodeId", nodeId). + Attr("role", role). + Between("day", dayFrom, dayTo). + FindAll() + if err != nil { + return nil, err + } + dayMap := map[string]*NodeTrafficDailyStat{} // day => Stat + for _, one := range ones { + stat := one.(*NodeTrafficDailyStat) + dayMap[stat.Day] = stat + } + days, err := utils.RangeDays(dayFrom, dayTo) + if err != nil { + return nil, err + } + for _, day := range days { + stat, ok := dayMap[day] + if ok { + result = append(result, stat) + } else { + result = append(result, &NodeTrafficDailyStat{Day: day}) + } + } + return result, nil +} + // Clean 清理历史数据 func (this *NodeTrafficDailyStatDAO) Clean(tx *dbs.Tx, days int) error { var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days)) diff --git a/internal/db/models/stats/node_traffic_hourly_stat_dao.go b/internal/db/models/stats/node_traffic_hourly_stat_dao.go index 5f0624be..10736211 100644 --- a/internal/db/models/stats/node_traffic_hourly_stat_dao.go +++ b/internal/db/models/stats/node_traffic_hourly_stat_dao.go @@ -110,10 +110,11 @@ func (this *NodeTrafficHourlyStatDAO) FindHourlyStatsWithClusterId(tx *dbs.Tx, c return result, nil } -// FindTopNodeStatsWithClusterId 取得一定时间内的节点排行数据 -func (this *NodeTrafficHourlyStatDAO) FindTopNodeStatsWithClusterId(tx *dbs.Tx, clusterId int64, hourFrom string, hourTo string) (result []*NodeTrafficHourlyStat, err error) { +// FindTopNodeStatsWithClusterId 取得集群一定时间内的节点排行数据 +func (this *NodeTrafficHourlyStatDAO) FindTopNodeStatsWithClusterId(tx *dbs.Tx, role string, clusterId int64, hourFrom string, hourTo string) (result []*NodeTrafficHourlyStat, err error) { // TODO 节点如果已经被删除,则忽略 _, err = this.Query(tx). + Attr("role", role). Attr("clusterId", clusterId). Between("hour", hourFrom, hourTo). Result("nodeId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests"). @@ -124,6 +125,38 @@ func (this *NodeTrafficHourlyStatDAO) FindTopNodeStatsWithClusterId(tx *dbs.Tx, return } +// FindHourlyStatsWithNodeId 获取节点小时之间统计 +func (this *NodeTrafficHourlyStatDAO) FindHourlyStatsWithNodeId(tx *dbs.Tx, role string, nodeId int64, hourFrom string, hourTo string) (result []*NodeTrafficHourlyStat, err error) { + ones, err := this.Query(tx). + Attr("role", role). + Attr("nodeId", nodeId). + Between("hour", hourFrom, hourTo). + Result("hour, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests"). + Group("hour"). + FindAll() + if err != nil { + return nil, err + } + hourMap := map[string]*NodeTrafficHourlyStat{} // hour => Stat + for _, one := range ones { + stat := one.(*NodeTrafficHourlyStat) + hourMap[stat.Hour] = stat + } + hours, err := utils.RangeHours(hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, hour := range hours { + stat, ok := hourMap[hour] + if ok { + result = append(result, stat) + } else { + result = append(result, &NodeTrafficHourlyStat{Hour: hour}) + } + } + return result, nil +} + // Clean 清理历史数据 func (this *NodeTrafficHourlyStatDAO) Clean(tx *dbs.Tx, days int) error { var hour = timeutil.Format("Ymd00", time.Now().AddDate(0, 0, -days)) diff --git a/internal/db/models/stats/server_domain_hourly_stat_dao.go b/internal/db/models/stats/server_domain_hourly_stat_dao.go index c42e5dba..40e1ec9c 100644 --- a/internal/db/models/stats/server_domain_hourly_stat_dao.go +++ b/internal/db/models/stats/server_domain_hourly_stat_dao.go @@ -79,7 +79,7 @@ func (this *ServerDomainHourlyStatDAO) IncreaseHourlyStat(tx *dbs.Tx, clusterId return nil } -// FindTopDomainStatsWithClusterId 取得一定时间内的节点排行数据 +// FindTopDomainStatsWithClusterId 取得集群上的一定时间内的域名排行数据 func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithClusterId(tx *dbs.Tx, clusterId int64, hourFrom string, hourTo string) (result []*ServerDomainHourlyStat, err error) { // TODO 节点如果已经被删除,则忽略 _, err = this.Query(tx). @@ -93,6 +93,20 @@ func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithClusterId(tx *dbs.T return } +// FindTopDomainStatsWithNodeId 取得节点上的一定时间内的域名排行数据 +func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithNodeId(tx *dbs.Tx, nodeId int64, hourFrom string, hourTo string) (result []*ServerDomainHourlyStat, err error) { + // TODO 节点如果已经被删除,则忽略 + _, err = this.Query(tx). + Attr("nodeId", nodeId). + Between("hour", hourFrom, hourTo). + Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests"). + Group("domain"). + Desc("countRequests"). + Slice(&result). + FindAll() + return +} + // Clean 清理历史数据 func (this *ServerDomainHourlyStatDAO) Clean(tx *dbs.Tx, days int) error { var hour = timeutil.Format("Ymd00", time.Now().AddDate(0, 0, -days)) diff --git a/internal/rpc/services/service_metric_stat.go b/internal/rpc/services/service_metric_stat.go index 7ff4510b..4024fd5c 100644 --- a/internal/rpc/services/service_metric_stat.go +++ b/internal/rpc/services/service_metric_stat.go @@ -97,7 +97,7 @@ func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.List } // 查找sum值 - count, total, err := models.SharedMetricSumStatDAO.FindNodeSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version)) + count, total, err := models.SharedMetricSumStatDAO.FindServerSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version)) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_server_stat_board.go b/internal/rpc/services/service_server_stat_board.go index 7adc737b..10639333 100644 --- a/internal/rpc/services/service_server_stat_board.go +++ b/internal/rpc/services/service_server_stat_board.go @@ -7,6 +7,7 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" + "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" @@ -116,7 +117,7 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex } // 节点排行 - topNodeStats, err := stats.SharedNodeTrafficHourlyStatDAO.FindTopNodeStatsWithClusterId(tx, req.NodeClusterId, hourFrom, hourTo) + topNodeStats, err := stats.SharedNodeTrafficHourlyStatDAO.FindTopNodeStatsWithClusterId(tx, "node", req.NodeClusterId, hourFrom, hourTo) if err != nil { return nil, err } @@ -151,7 +152,7 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex } // CPU、内存、负载 - cpuValues, err := models.SharedNodeValueDAO.ListValuesWithClusterId(tx, req.NodeClusterId, "node", nodeconfigs.NodeValueItemCPU, "usage", nodeconfigs.NodeValueRangeMinute) + cpuValues, err := models.SharedNodeValueDAO.ListValuesWithClusterId(tx, "node", req.NodeClusterId, nodeconfigs.NodeValueItemCPU, "usage", nodeconfigs.NodeValueRangeMinute) if err != nil { return nil, err } @@ -166,7 +167,7 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex }) } - memoryValues, err := models.SharedNodeValueDAO.ListValuesWithClusterId(tx, req.NodeClusterId, "node", nodeconfigs.NodeValueItemMemory, "usage", nodeconfigs.NodeValueRangeMinute) + memoryValues, err := models.SharedNodeValueDAO.ListValuesWithClusterId(tx, "node", req.NodeClusterId, nodeconfigs.NodeValueItemMemory, "usage", nodeconfigs.NodeValueRangeMinute) if err != nil { return nil, err } @@ -181,7 +182,7 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex }) } - loadValues, err := models.SharedNodeValueDAO.ListValuesWithClusterId(tx, req.NodeClusterId, "node", nodeconfigs.NodeValueItemLoad, "load5m", nodeconfigs.NodeValueRangeMinute) + loadValues, err := models.SharedNodeValueDAO.ListValuesWithClusterId(tx, "node", req.NodeClusterId, nodeconfigs.NodeValueItemLoad, "load5m", nodeconfigs.NodeValueRangeMinute) if err != nil { return nil, err } @@ -316,3 +317,334 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex return result, nil } + +// ComposeServerStatNodeBoard 组合节点看板数据 +func (this *ServerStatBoardService) ComposeServerStatNodeBoard(ctx context.Context, req *pb.ComposeServerStatNodeBoardRequest) (*pb.ComposeServerStatNodeBoardResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var result = &pb.ComposeServerStatNodeBoardResponse{} + + // 在线状态 + var tx = this.NullTx() + node, err := models.SharedNodeDAO.FindEnabledNode(tx, req.NodeId) + if err != nil { + return nil, err + } + if node == nil { + return nil, errors.New("node not found") + } + + status, err := node.DecodeStatus() + if err != nil { + return nil, err + } + if status != nil && time.Now().Unix()-status.UpdatedAt < 60 { + result.IsActive = true + result.CacheDiskSize = status.CacheTotalDiskSize + result.CacheMemorySize = status.CacheTotalMemorySize + + } + + // 流量 + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemTrafficIn) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + result.TrafficInBytes = value.DecodeMapValue().GetInt64("total") + } + } + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemTrafficOut) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + result.TrafficOutBytes = value.DecodeMapValue().GetInt64("total") + } + } + + // 连接数 + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemConnections) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + result.CountConnections = value.DecodeMapValue().GetInt64("total") + } + } + + // 请求量 + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemRequests) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + result.CountRequests = value.DecodeMapValue().GetInt64("total") + } + } + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemAttackRequests) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + result.CountAttackRequests = value.DecodeMapValue().GetInt64("total") + } + } + + // CPU + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemCPU) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + result.CpuUsage = value.DecodeMapValue().GetFloat32("usage") + } + } + + // 内存 + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemMemory) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + m := value.DecodeMapValue() + result.MemoryUsage = m.GetFloat32("usage") + result.MemoryTotalSize = m.GetInt64("total") + } + } + + // 负载 + { + value, err := models.SharedNodeValueDAO.FindLatestNodeValue(tx, "node", int64(node.Id), nodeconfigs.NodeValueItemLoad) + if err != nil { + return nil, err + } + if value != nil && time.Now().Unix()-int64(value.CreatedAt) < 120 { + result.Load = value.DecodeMapValue().GetFloat32("load1m") + } + } + + // 按日流量统计 + dayFrom := timeutil.Format("Ymd", time.Now().AddDate(0, 0, -14)) + dailyTrafficStats, err := stats.SharedNodeTrafficDailyStatDAO.FindDailyStats(tx, "node", req.NodeId, dayFrom, timeutil.Format("Ymd")) + if err != nil { + return nil, err + } + for _, stat := range dailyTrafficStats { + result.DailyTrafficStats = append(result.DailyTrafficStats, &pb.ComposeServerStatNodeBoardResponse_DailyTrafficStat{ + Day: stat.Day, + Bytes: int64(stat.Bytes), + CachedBytes: int64(stat.CachedBytes), + CountRequests: int64(stat.CountRequests), + CountCachedRequests: int64(stat.CountCachedRequests), + }) + } + + // 小时流量统计 + hourFrom := timeutil.Format("YmdH", time.Now().Add(-23*time.Hour)) + hourTo := timeutil.Format("YmdH") + hourlyTrafficStats, err := stats.SharedNodeTrafficHourlyStatDAO.FindHourlyStatsWithNodeId(tx, "node", req.NodeId, hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, stat := range hourlyTrafficStats { + result.HourlyTrafficStats = append(result.HourlyTrafficStats, &pb.ComposeServerStatNodeBoardResponse_HourlyTrafficStat{ + Hour: stat.Hour, + Bytes: int64(stat.Bytes), + CachedBytes: int64(stat.CachedBytes), + CountRequests: int64(stat.CountRequests), + CountCachedRequests: int64(stat.CountCachedRequests), + }) + } + + // 域名排行 + topDomainStats, err := stats.SharedServerDomainHourlyStatDAO.FindTopDomainStatsWithNodeId(tx, req.NodeId, hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, stat := range topDomainStats { + result.TopDomainStats = append(result.TopDomainStats, &pb.ComposeServerStatNodeBoardResponse_DomainStat{ + ServerId: int64(stat.ServerId), + Domain: stat.Domain, + CountRequests: int64(stat.CountRequests), + Bytes: int64(stat.Bytes), + }) + } + + // CPU、内存、负载 + cpuValues, err := models.SharedNodeValueDAO.ListValues(tx, "node", req.NodeId, nodeconfigs.NodeValueItemCPU, nodeconfigs.NodeValueRangeMinute) + if err != nil { + return nil, err + } + for _, v := range cpuValues { + valueJSON, err := json.Marshal(types.Float32(v.DecodeMapValue().GetFloat32("usage"))) + if err != nil { + return nil, err + } + result.CpuNodeValues = append(result.CpuNodeValues, &pb.NodeValue{ + ValueJSON: valueJSON, + CreatedAt: int64(v.CreatedAt), + }) + } + + memoryValues, err := models.SharedNodeValueDAO.ListValues(tx, "node", req.NodeId, nodeconfigs.NodeValueItemMemory, nodeconfigs.NodeValueRangeMinute) + if err != nil { + return nil, err + } + for _, v := range memoryValues { + valueJSON, err := json.Marshal(types.Float32(v.DecodeMapValue().GetFloat32("usage"))) + if err != nil { + return nil, err + } + result.MemoryNodeValues = append(result.MemoryNodeValues, &pb.NodeValue{ + ValueJSON: valueJSON, + CreatedAt: int64(v.CreatedAt), + }) + } + + loadValues, err := models.SharedNodeValueDAO.ListValues(tx, "node", req.NodeId, nodeconfigs.NodeValueItemLoad, nodeconfigs.NodeValueRangeMinute) + if err != nil { + return nil, err + } + for _, v := range loadValues { + valueJSON, err := json.Marshal(types.Float32(v.DecodeMapValue().GetFloat32("load5m"))) + if err != nil { + return nil, err + } + result.LoadNodeValues = append(result.LoadNodeValues, &pb.NodeValue{ + ValueJSON: valueJSON, + CreatedAt: int64(v.CreatedAt), + }) + } + + // 指标 + var clusterId = int64(node.ClusterId) + clusterMetricItems, err := models.SharedNodeClusterMetricItemDAO.FindAllClusterItems(tx, clusterId, serverconfigs.MetricItemCategoryHTTP) + if err != nil { + return nil, err + } + var pbMetricCharts = []*pb.ComposeServerStatNodeBoardResponse_MetricData{} + for _, clusterMetricItem := range clusterMetricItems { + if clusterMetricItem.IsOn != 1 { + continue + } + var itemId = int64(clusterMetricItem.ItemId) + charts, err := models.SharedMetricChartDAO.FindAllEnabledCharts(tx, itemId) + if err != nil { + return nil, err + } + + item, err := models.SharedMetricItemDAO.FindEnabledMetricItem(tx, itemId) + if err != nil { + return nil, err + } + if item == nil || item.IsOn == 0 { + continue + } + + for _, chart := range charts { + if chart.IsOn == 0 { + continue + } + + var pbChart = &pb.MetricChart{ + Id: int64(chart.Id), + Name: chart.Name, + Type: chart.Type, + WidthDiv: chart.WidthDiv, + ParamsJSON: nil, + IsOn: chart.IsOn == 1, + MaxItems: types.Int32(chart.MaxItems), + MetricItem: &pb.MetricItem{ + Id: itemId, + PeriodUnit: item.PeriodUnit, + Period: types.Int32(item.Period), + Name: item.Name, + Value: item.Value, + Category: item.Category, + Keys: item.DecodeKeys(), + Code: item.Code, + IsOn: item.IsOn == 1, + }, + } + var pbStats = []*pb.MetricStat{} + switch chart.Type { + case serverconfigs.MetricChartTypeTimeLine: + itemStats, err := models.SharedMetricStatDAO.FindLatestItemStatsWithNodeId(tx, req.NodeId, itemId, types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + + for _, stat := range itemStats { + // 当前时间总和 + count, total, err := models.SharedMetricSumStatDAO.FindNodeSum(tx, req.NodeId, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + default: + itemStats, err := models.SharedMetricStatDAO.FindItemStatsWithNodeIdAndLastTime(tx, req.NodeId, itemId, types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + for _, stat := range itemStats { + // 当前时间总和 + // 当前时间总和 + count, total, err := models.SharedMetricSumStatDAO.FindNodeSum(tx, req.NodeId, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + } + pbMetricCharts = append(pbMetricCharts, &pb.ComposeServerStatNodeBoardResponse_MetricData{ + MetricChart: pbChart, + MetricStats: pbStats, + }) + } + } + result.MetricCharts = pbMetricCharts + + return result, nil +}