diff --git a/internal/db/models/metric_stat_dao.go b/internal/db/models/metric_stat_dao.go index 6ddd24fd..54ed1b8c 100644 --- a/internal/db/models/metric_stat_dao.go +++ b/internal/db/models/metric_stat_dao.go @@ -169,6 +169,40 @@ func (this *MetricStatDAO) FindItemStatsWithNodeIdAndLastTime(tx *dbs.Tx, nodeId return } +// FindItemStatsWithServerIdAndLastTime 取得节点最近一次计时前 N 个数据 +// 适合每条数据中包含不同的Key的场景 +func (this *MetricStatDAO) FindItemStatsWithServerIdAndLastTime(tx *dbs.Tx, serverId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { + // 最近一次时间 + statOne, err := this.Query(tx). + Attr("itemId", itemId). + Attr("version", version). + DescPk(). + Find() + if err != nil { + return nil, err + } + if statOne == nil { + return nil, nil + } + var lastStat = statOne.(*MetricStat) + var lastTime = lastStat.Time + + _, err = this.Query(tx). + Attr("serverId", serverId). + Attr("itemId", itemId). + Attr("version", version). + Attr("time", lastTime). + // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 + // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE + Result("MIN(time) AS time", "SUM(value) AS value", "keys"). + Desc("value"). + Group("keys"). + Limit(size). + Slice(&result). + FindAll() + return +} + // FindLatestItemStatsWithClusterId 取得集群最近 N 个时间的数据 // 适合同个Key在不同时间段的变化场景 func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { @@ -212,3 +246,25 @@ func (this *MetricStatDAO) FindLatestItemStatsWithNodeId(tx *dbs.Tx, nodeId int6 lists.Reverse(result) return } + +// FindLatestItemStatsWithServerId 取得服务最近 N 个时间的数据 +// 适合同个Key在不同时间段的变化场景 +func (this *MetricStatDAO) FindLatestItemStatsWithServerId(tx *dbs.Tx, serverId int64, itemId int64, version int32, size int64) (result []*MetricStat, err error) { + _, err = this.Query(tx). + Attr("serverId", serverId). + Attr("itemId", itemId). + Attr("version", version). + // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 + // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE + Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`"). + Desc("time"). + Group("time"). + Limit(size). + Slice(&result). + FindAll() + if err != nil { + return nil, err + } + lists.Reverse(result) + return +} diff --git a/internal/db/models/metric_sum_stat_dao.go b/internal/db/models/metric_sum_stat_dao.go index 59670f08..0524bdec 100644 --- a/internal/db/models/metric_sum_stat_dao.go +++ b/internal/db/models/metric_sum_stat_dao.go @@ -46,8 +46,8 @@ func (this *MetricSumStatDAO) UpdateSum(tx *dbs.Tx, clusterId int64, nodeId int6 }) } -// FindServerSum 查找某个服务的统计数据 -func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, nodeId int64, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { +// FindNodeServerSum 查找某个服务在某个节点上的统计数据 +func (this *MetricSumStatDAO) FindNodeServerSum(tx *dbs.Tx, nodeId int64, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { one, err := this.Query(tx). Attr("nodeId", nodeId). Attr("serverId", serverId). @@ -64,6 +64,24 @@ func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, nodeId int64, serverId i return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil } +// FindServerSum 查找某个服务的统计数据 +func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { + one, err := this.Query(tx). + Attr("serverId", serverId). + Attr("time", time). + Attr("itemId", itemId). + Attr("version", version). + Result("SUM(count) AS `count`, SUM(total) AS total"). + Find() + if err != nil { + return 0, 0, err + } + if one == nil { + return + } + return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil +} + // FindClusterSum 查找集群上的统计数据 func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { one, err := this.Query(tx). diff --git a/internal/db/models/server_daily_stat_dao.go b/internal/db/models/server_daily_stat_dao.go index f9a6605b..9760a213 100644 --- a/internal/db/models/server_daily_stat_dao.go +++ b/internal/db/models/server_daily_stat_dao.go @@ -1,6 +1,7 @@ package models import ( + "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" @@ -237,6 +238,72 @@ func (this *ServerDailyStatDAO) SumDailyStat(tx *dbs.Tx, serverId int64, day str return } +// FindDailyStats 按天统计 +func (this *ServerDailyStatDAO) FindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerDailyStat, err error) { + ones, err := this.Query(tx). + Result("SUM(bytes) AS bytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "day"). + Attr("serverId", serverId). + Between("day", dayFrom, dayTo). + Group("day"). + FindAll() + if err != nil { + return nil, err + } + + dayMap := map[string]*ServerDailyStat{} // day => Stat + for _, one := range ones { + stat := one.(*ServerDailyStat) + dayMap[stat.Day] = stat + } + days, err := utils.RangeDays(dayFrom, dayTo) + if err != nil { + return nil, err + } + for _, day := range days { + stat, ok := dayMap[day] + if ok { + result = append(result, stat) + } else { + result = append(result, &ServerDailyStat{Day: day}) + } + } + + return +} + +// FindHourlyStats 按小时统计 +func (this *ServerDailyStatDAO) FindHourlyStats(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string) (result []*ServerDailyStat, err error) { + ones, err := this.Query(tx). + Result("SUM(bytes) AS bytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "hour"). + Attr("serverId", serverId). + Between("hour", hourFrom, hourTo). + Group("hour"). + FindAll() + if err != nil { + return nil, err + } + + hourMap := map[string]*ServerDailyStat{} // hour => Stat + for _, one := range ones { + stat := one.(*ServerDailyStat) + hourMap[stat.Hour] = stat + } + hours, err := utils.RangeHours(hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, hour := range hours { + stat, ok := hourMap[hour] + if ok { + result = append(result, stat) + } else { + result = append(result, &ServerDailyStat{Hour: hour}) + } + } + + return +} + // Clean 清理历史数据 func (this *ServerDailyStatDAO) Clean(tx *dbs.Tx, days int) error { var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days)) diff --git a/internal/db/models/stats/server_domain_hourly_stat_dao.go b/internal/db/models/stats/server_domain_hourly_stat_dao.go index 40e1ec9c..3fc68e4a 100644 --- a/internal/db/models/stats/server_domain_hourly_stat_dao.go +++ b/internal/db/models/stats/server_domain_hourly_stat_dao.go @@ -107,6 +107,20 @@ func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithNodeId(tx *dbs.Tx, return } +// FindTopDomainStatsWithServerId 取得某个服务的一定时间内的域名排行数据 +func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithServerId(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string) (result []*ServerDomainHourlyStat, err error) { + // TODO 节点如果已经被删除,则忽略 + _, err = this.Query(tx). + Attr("serverId", serverId). + Between("hour", hourFrom, hourTo). + Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests"). + Group("domain"). + Desc("countRequests"). + Slice(&result). + FindAll() + return +} + // Clean 清理历史数据 func (this *ServerDomainHourlyStatDAO) Clean(tx *dbs.Tx, days int) error { var hour = timeutil.Format("Ymd00", time.Now().AddDate(0, 0, -days)) diff --git a/internal/rpc/services/service_metric_stat.go b/internal/rpc/services/service_metric_stat.go index 4024fd5c..73a7cee0 100644 --- a/internal/rpc/services/service_metric_stat.go +++ b/internal/rpc/services/service_metric_stat.go @@ -97,7 +97,7 @@ func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.List } // 查找sum值 - count, total, err := models.SharedMetricSumStatDAO.FindServerSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version)) + count, total, err := models.SharedMetricSumStatDAO.FindNodeServerSum(tx, int64(stat.NodeId), int64(stat.ServerId), stat.Time, int64(stat.ItemId), types.Int32(stat.Version)) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_server_stat_board.go b/internal/rpc/services/service_server_stat_board.go index 10639333..cef6d494 100644 --- a/internal/rpc/services/service_server_stat_board.go +++ b/internal/rpc/services/service_server_stat_board.go @@ -283,7 +283,6 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex return nil, err } for _, stat := range itemStats { - // 当前时间总和 // 当前时间总和 count, total, err := models.SharedMetricSumStatDAO.FindClusterSum(tx, req.NodeClusterId, stat.Time, itemId, types.Int32(item.Version)) if err != nil { @@ -614,7 +613,6 @@ func (this *ServerStatBoardService) ComposeServerStatNodeBoard(ctx context.Conte return nil, err } for _, stat := range itemStats { - // 当前时间总和 // 当前时间总和 count, total, err := models.SharedMetricSumStatDAO.FindNodeSum(tx, req.NodeId, stat.Time, itemId, types.Int32(item.Version)) if err != nil { @@ -648,3 +646,184 @@ func (this *ServerStatBoardService) ComposeServerStatNodeBoard(ctx context.Conte return result, nil } + +// ComposeServerStatBoard 组合服务看板数据 +func (this *ServerStatBoardService) ComposeServerStatBoard(ctx context.Context, req *pb.ComposeServerStatBoardRequest) (*pb.ComposeServerStatBoardResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var result = &pb.ComposeServerStatBoardResponse{} + var tx = this.NullTx() + + // 按日流量统计 + dayFrom := timeutil.Format("Ymd", time.Now().AddDate(0, 0, -14)) + dailyTrafficStats, err := models.SharedServerDailyStatDAO.FindDailyStats(tx, req.ServerId, dayFrom, timeutil.Format("Ymd")) + if err != nil { + return nil, err + } + for _, stat := range dailyTrafficStats { + result.DailyTrafficStats = append(result.DailyTrafficStats, &pb.ComposeServerStatBoardResponse_DailyTrafficStat{ + Day: stat.Day, + Bytes: int64(stat.Bytes), + CachedBytes: int64(stat.CachedBytes), + CountRequests: int64(stat.CountRequests), + CountCachedRequests: int64(stat.CountCachedRequests), + }) + } + + // 小时流量统计 + hourFrom := timeutil.Format("YmdH", time.Now().Add(-23*time.Hour)) + hourTo := timeutil.Format("YmdH") + hourlyTrafficStats, err := models.SharedServerDailyStatDAO.FindHourlyStats(tx, req.ServerId, hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, stat := range hourlyTrafficStats { + result.HourlyTrafficStats = append(result.HourlyTrafficStats, &pb.ComposeServerStatBoardResponse_HourlyTrafficStat{ + Hour: stat.Hour, + Bytes: int64(stat.Bytes), + CachedBytes: int64(stat.CachedBytes), + CountRequests: int64(stat.CountRequests), + CountCachedRequests: int64(stat.CountCachedRequests), + }) + } + + // 域名排行 + topDomainStats, err := stats.SharedServerDomainHourlyStatDAO.FindTopDomainStatsWithServerId(tx, req.ServerId, hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, stat := range topDomainStats { + result.TopDomainStats = append(result.TopDomainStats, &pb.ComposeServerStatBoardResponse_DomainStat{ + ServerId: int64(stat.ServerId), + Domain: stat.Domain, + CountRequests: int64(stat.CountRequests), + Bytes: int64(stat.Bytes), + }) + } + + // 指标 + clusterId, err := models.SharedServerDAO.FindServerClusterId(tx, req.ServerId) + if err != nil { + return nil, err + } + clusterMetricItems, err := models.SharedNodeClusterMetricItemDAO.FindAllClusterItems(tx, clusterId, serverconfigs.MetricItemCategoryHTTP) + if err != nil { + return nil, err + } + var pbMetricCharts = []*pb.ComposeServerStatBoardResponse_MetricData{} + for _, clusterMetricItem := range clusterMetricItems { + if clusterMetricItem.IsOn != 1 { + continue + } + var itemId = int64(clusterMetricItem.ItemId) + charts, err := models.SharedMetricChartDAO.FindAllEnabledCharts(tx, itemId) + if err != nil { + return nil, err + } + + item, err := models.SharedMetricItemDAO.FindEnabledMetricItem(tx, itemId) + if err != nil { + return nil, err + } + if item == nil || item.IsOn == 0 { + continue + } + + for _, chart := range charts { + if chart.IsOn == 0 { + continue + } + + var pbChart = &pb.MetricChart{ + Id: int64(chart.Id), + Name: chart.Name, + Type: chart.Type, + WidthDiv: chart.WidthDiv, + ParamsJSON: nil, + IsOn: chart.IsOn == 1, + MaxItems: types.Int32(chart.MaxItems), + MetricItem: &pb.MetricItem{ + Id: itemId, + PeriodUnit: item.PeriodUnit, + Period: types.Int32(item.Period), + Name: item.Name, + Value: item.Value, + Category: item.Category, + Keys: item.DecodeKeys(), + Code: item.Code, + IsOn: item.IsOn == 1, + }, + } + var pbStats = []*pb.MetricStat{} + switch chart.Type { + case serverconfigs.MetricChartTypeTimeLine: + itemStats, err := models.SharedMetricStatDAO.FindLatestItemStatsWithServerId(tx, req.ServerId, itemId, types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + + for _, stat := range itemStats { + // 当前时间总和 + count, total, err := models.SharedMetricSumStatDAO.FindServerSum(tx, req.ServerId, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + default: + itemStats, err := models.SharedMetricStatDAO.FindItemStatsWithServerIdAndLastTime(tx, req.ServerId, itemId, types.Int32(item.Version), 10) + if err != nil { + return nil, err + } + for _, stat := range itemStats { + // 当前时间总和 + count, total, err := models.SharedMetricSumStatDAO.FindServerSum(tx, req.ServerId, stat.Time, itemId, types.Int32(item.Version)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: 0, + ItemId: 0, + Keys: stat.DecodeKeys(), + Value: types.Float32(stat.Value), + Time: stat.Time, + Version: 0, + NodeCluster: nil, + Node: nil, + Server: nil, + SumCount: count, + SumTotal: total, + }) + } + } + pbMetricCharts = append(pbMetricCharts, &pb.ComposeServerStatBoardResponse_MetricData{ + MetricChart: pbChart, + MetricStats: pbStats, + }) + } + } + result.MetricCharts = pbMetricCharts + + return result, nil +}