diff --git a/internal/db/models/server_bandwidth_stat_dao.go b/internal/db/models/server_bandwidth_stat_dao.go index 1bd86746..eda99998 100644 --- a/internal/db/models/server_bandwidth_stat_dao.go +++ b/internal/db/models/server_bandwidth_stat_dao.go @@ -1,6 +1,7 @@ package models import ( + "fmt" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" @@ -63,7 +64,7 @@ func init() { // UpdateServerBandwidth 写入数据 // 暂时不使用region区分 -func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, userId int64, serverId int64, day string, timeAt string, bytes int64, totalBytes int64) error { +func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, userId int64, serverId int64, regionId int64, day string, timeAt string, bytes int64, totalBytes int64, cachedBytes int64, attackBytes int64, countRequests int64, countCachedRequests int64, countAttackRequests int64) error { if serverId <= 0 { return errors.New("invalid server id '" + types.String(serverId) + "'") } @@ -72,18 +73,34 @@ func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, userId int Table(this.partialTable(serverId)). Param("bytes", bytes). Param("totalBytes", totalBytes). + Param("cachedBytes", cachedBytes). + Param("attackBytes", attackBytes). + Param("countRequests", countRequests). + Param("countCachedRequests", countCachedRequests). + Param("countAttackRequests", countAttackRequests). InsertOrUpdateQuickly(maps.Map{ - "userId": userId, - "serverId": serverId, - "day": day, - "timeAt": timeAt, - "bytes": bytes, - "totalBytes": totalBytes, - "avgBytes": totalBytes / 300, + "userId": userId, + "serverId": serverId, + "regionId": regionId, + "day": day, + "timeAt": timeAt, + "bytes": bytes, + "totalBytes": totalBytes, + "avgBytes": totalBytes / 300, + "cachedBytes": cachedBytes, + "attackBytes": attackBytes, + "countRequests": countRequests, + "countCachedRequests": countCachedRequests, + "countAttackRequests": countAttackRequests, }, maps.Map{ - "bytes": dbs.SQL("bytes+:bytes"), - "avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前,所以这里不用担心先后顺序的问题 - "totalBytes": dbs.SQL("totalBytes+:totalBytes"), + "bytes": dbs.SQL("bytes+:bytes"), + "avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前,所以这里不用担心先后顺序的问题 + "totalBytes": dbs.SQL("totalBytes+:totalBytes"), + "cachedBytes": dbs.SQL("cachedBytes+:cachedBytes"), + "attackBytes": dbs.SQL("attackBytes+:attackBytes"), + "countRequests": dbs.SQL("countRequests+:countRequests"), + "countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"), + "countAttackRequests": dbs.SQL("countAttackRequests+:countAttackRequests"), }) } @@ -540,6 +557,190 @@ func (this *ServerBandwidthStatDAO) FindPercentileBetweenTimes(tx *dbs.Tx, serve return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil } +// FindDailyStats 按天统计 +func (this *ServerBandwidthStatDAO) FindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerBandwidthStat, err error) { + // 兼容以往版本 + if !regexputils.YYYYMMDD.MatchString(dayFrom) || !regexputils.YYYYMMDD.MatchString(dayTo) { + return nil, nil + } + hasFullData, err := this.HasFullData(tx, serverId, dayFrom[:6]) + if err != nil { + return nil, err + } + if !hasFullData { + ones, err := SharedServerDailyStatDAO.compatFindDailyStats(tx, serverId, dayFrom, dayTo) + if err != nil { + return nil, err + } + for _, one := range ones { + result = append(result, one.AsServerBandwidthStat()) + } + + return result, nil + } + + ones, err := this.Query(tx). + Table(this.partialTable(serverId)). + Result("SUM(totalBytes) AS totalBytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests", "SUM(attackBytes) AS attackBytes", "day"). + Attr("serverId", serverId). + Between("day", dayFrom, dayTo). + Group("day"). + FindAll() + if err != nil { + return nil, err + } + + var dayMap = map[string]*ServerBandwidthStat{} // day => Stat + for _, one := range ones { + var stat = one.(*ServerBandwidthStat) + dayMap[stat.Day] = stat + } + days, err := utils.RangeDays(dayFrom, dayTo) + if err != nil { + return nil, err + } + for _, day := range days { + stat, ok := dayMap[day] + if ok { + result = append(result, stat) + } else { + result = append(result, &ServerBandwidthStat{Day: day}) + } + } + + return +} + +// FindHourlyStats 按小时统计 +func (this *ServerBandwidthStatDAO) FindHourlyStats(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string) (result []*ServerBandwidthStat, err error) { + // 兼容以往版本 + if !regexputils.YYYYMMDDHH.MatchString(hourFrom) || !regexputils.YYYYMMDDHH.MatchString(hourTo) { + return nil, nil + } + hasFullData, err := this.HasFullData(tx, serverId, hourFrom[:6]) + if err != nil { + return nil, err + } + if !hasFullData { + ones, err := SharedServerDailyStatDAO.compatFindHourlyStats(tx, serverId, hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, one := range ones { + result = append(result, one.AsServerBandwidthStat()) + } + + return result, nil + } + + var query = this.Query(tx). + Table(this.partialTable(serverId)). + Result("MIN(day) AS day", "MIN(timeAt) AS timeAt", "SUM(totalBytes) AS totalBytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests", "SUM(attackBytes) AS attackBytes", "CONCAT(day, SUBSTR(timeAt, 1, 2)) AS hour"). + Attr("serverId", serverId) + + if hourFrom[:8] == hourTo[:8] { // 同一天 + query.Attr("day", hourFrom[:8]) + query.Between("timeAt", hourFrom[8:]+"00", hourTo[8:]+"59") + } else { + query.Between("CONCAT(day, SUBSTR(timeAt, 1, 2))", hourFrom, hourTo) + } + + ones, err := query. + Group("hour"). + FindAll() + if err != nil { + return nil, err + } + + var hourMap = map[string]*ServerBandwidthStat{} // hour => Stat + for _, one := range ones { + var stat = one.(*ServerBandwidthStat) + var hour = stat.Day + stat.TimeAt[:2] + hourMap[hour] = stat + } + hours, err := utils.RangeHours(hourFrom, hourTo) + if err != nil { + return nil, err + } + for _, hour := range hours { + stat, ok := hourMap[hour] + if ok { + result = append(result, stat) + } else { + result = append(result, &ServerBandwidthStat{ + Day: hour[:8], + TimeAt: hour[8:] + "00", + }) + } + } + + return +} + +// SumDailyStat 获取某天内的流量 +// dayFrom 格式为YYYYMMDD +// dayTo 格式为YYYYMMDD +func (this *ServerBandwidthStatDAO) SumDailyStat(tx *dbs.Tx, serverId int64, regionId int64, dayFrom string, dayTo string) (stat *pb.ServerDailyStat, err error) { + if !regexputils.YYYYMMDD.MatchString(dayFrom) { + return nil, errors.New("invalid dayFrom '" + dayFrom + "'") + } + if !regexputils.YYYYMMDD.MatchString(dayTo) { + return nil, errors.New("invalid dayTo '" + dayTo + "'") + } + + // 兼容以往版本 + hasFullData, err := this.HasFullData(tx, serverId, dayFrom[:6]) + if err != nil { + return nil, err + } + if !hasFullData { + return SharedServerDailyStatDAO.compatSumDailyStat(tx, 0, serverId, regionId, dayFrom, dayTo) + } + + stat = &pb.ServerDailyStat{} + + if serverId <= 0 { + return + } + + if dayFrom > dayTo { + dayFrom, dayTo = dayTo, dayFrom + } + + var query = this.Query(tx). + Table(this.partialTable(serverId)). + Result("SUM(totalBytes) AS totalBytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes") + + query.Attr("serverId", serverId) + + if regionId > 0 { + query.Attr("regionId", regionId) + } + + if dayFrom == dayTo { + query.Attr("day", dayFrom) + } else { + query.Between("day", dayFrom, dayTo) + } + + one, _, err := query.FindOne() + if err != nil { + return nil, err + } + + if one == nil { + return + } + + stat.Bytes = one.GetInt64("totalBytes") + stat.CachedBytes = one.GetInt64("cachedBytes") + stat.CountRequests = one.GetInt64("countRequests") + stat.CountCachedRequests = one.GetInt64("countCachedRequests") + stat.CountAttackRequests = one.GetInt64("countAttackRequests") + stat.AttackBytes = one.GetInt64("attackBytes") + return +} + // Clean 清理过期数据 func (this *ServerBandwidthStatDAO) Clean(tx *dbs.Tx) error { var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -100)) // 保留大约3个月的数据 @@ -619,3 +820,55 @@ func (this *ServerBandwidthStatDAO) fixServerStats(stats []*ServerBandwidthStat, } } } + +// HasFullData 检查一个月是否完整数据 +// 是为了兼容以前数据,以前的表中没有缓存流量、请求数等字段 +func (this *ServerBandwidthStatDAO) HasFullData(tx *dbs.Tx, serverId int64, month string) (bool, error) { + var monthKey = month + "@" + types.String(serverId) + + if !regexputils.YYYYMM.MatchString(month) { + return false, errors.New("invalid month '" + month + "'") + } + + // 仅供调试 + if Tea.IsTesting() { + return true, nil + } + + fullDataLocker.Lock() + hasData, ok := fullDataMap[monthKey] + fullDataLocker.Unlock() + if ok { + return hasData, nil + } + + var year = types.Int(month[:4]) + var monthInt = types.Int(month[4:]) + + if year < 2000 || monthInt > 12 || monthInt < 1 { + return false, nil + } + + var lastMonth = monthInt - 1 + if lastMonth == 0 { + lastMonth = 12 + year-- + } + + var lastMonthString = fmt.Sprintf("%d%02d", year, lastMonth) + one, err := this.Query(tx). + Table(this.partialTable(serverId)). + Between("day", lastMonthString+"01", lastMonthString+"31"). + DescPk(). + Find() + if err != nil { + return false, err + } + + var b = one != nil && one.(*ServerBandwidthStat).CountRequests > 0 + fullDataLocker.Lock() + fullDataMap[monthKey] = b + fullDataLocker.Unlock() + + return b, nil +} diff --git a/internal/db/models/server_bandwidth_stat_dao_test.go b/internal/db/models/server_bandwidth_stat_dao_test.go index 7a9657d7..e3a37649 100644 --- a/internal/db/models/server_bandwidth_stat_dao_test.go +++ b/internal/db/models/server_bandwidth_stat_dao_test.go @@ -16,7 +16,7 @@ import ( func TestServerBandwidthStatDAO_UpdateServerBandwidth(t *testing.T) { var dao = models.NewServerBandwidthStatDAO() var tx *dbs.Tx - err := dao.UpdateServerBandwidth(tx, 1, 1, timeutil.Format("Ymd"), timeutil.FormatTime("Hi", time.Now().Unix()/300*300), 1024, 300) + err := dao.UpdateServerBandwidth(tx, 1, 1, timeutil.Format("Ymd"), timeutil.FormatTime("Hi", time.Now().Unix()/300*300), 1024, 300, 0, 0, 0, 0, 0) if err != nil { t.Fatal(err) } @@ -28,9 +28,12 @@ func TestSeverBandwidthStatDAO_InsertManyStats(t *testing.T) { var tx *dbs.Tx var count = 1 // 测试时将此值设为一个比较大的数字 for i := 0; i < count; i++ { + if i%10000 == 0 { + t.Log(i) + } var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -rands.Int(0, 200))) var minute = fmt.Sprintf("%02d%02d", rands.Int(0, 23), rands.Int(0, 59)) - err := dao.UpdateServerBandwidth(tx, 1, 1, day, minute, 1024, 300) + err := dao.UpdateServerBandwidth(tx, 1, int64(rands.Int(1, 10000)), day, minute, 1024, 300, 0, 0, 0, 0, 0) if err != nil { t.Fatal(err) } diff --git a/internal/db/models/server_bandwidth_stat_model.go b/internal/db/models/server_bandwidth_stat_model.go index e5c54235..c82a4225 100644 --- a/internal/db/models/server_bandwidth_stat_model.go +++ b/internal/db/models/server_bandwidth_stat_model.go @@ -2,27 +2,37 @@ package models // ServerBandwidthStat 服务峰值带宽统计 type ServerBandwidthStat struct { - Id uint64 `field:"id"` // ID - UserId uint64 `field:"userId"` // 用户ID - ServerId uint64 `field:"serverId"` // 服务ID - RegionId uint32 `field:"regionId"` // 区域ID - Day string `field:"day"` // 日期YYYYMMDD - TimeAt string `field:"timeAt"` // 时间点HHMM - Bytes uint64 `field:"bytes"` // 带宽字节 - AvgBytes uint64 `field:"avgBytes"` // 平均流量 - TotalBytes uint64 `field:"totalBytes"` // 总流量 + Id uint64 `field:"id"` // ID + UserId uint64 `field:"userId"` // 用户ID + ServerId uint64 `field:"serverId"` // 服务ID + RegionId uint32 `field:"regionId"` // 区域ID + Day string `field:"day"` // 日期YYYYMMDD + TimeAt string `field:"timeAt"` // 时间点HHMM + Bytes uint64 `field:"bytes"` // 带宽字节 + AvgBytes uint64 `field:"avgBytes"` // 平均流量 + CachedBytes uint64 `field:"cachedBytes"` // 缓存的流量 + AttackBytes uint64 `field:"attackBytes"` // 攻击流量 + CountRequests uint64 `field:"countRequests"` // 请求数 + CountCachedRequests uint64 `field:"countCachedRequests"` // 缓存的请求数 + CountAttackRequests uint64 `field:"countAttackRequests"` // 攻击请求数 + TotalBytes uint64 `field:"totalBytes"` // 总流量 } type ServerBandwidthStatOperator struct { - Id any // ID - UserId any // 用户ID - ServerId any // 服务ID - RegionId any // 区域ID - Day any // 日期YYYYMMDD - TimeAt any // 时间点HHMM - Bytes any // 带宽字节 - AvgBytes any // 平均流量 - TotalBytes any // 总流量 + Id any // ID + UserId any // 用户ID + ServerId any // 服务ID + RegionId any // 区域ID + Day any // 日期YYYYMMDD + TimeAt any // 时间点HHMM + Bytes any // 带宽字节 + AvgBytes any // 平均流量 + CachedBytes any // 缓存的流量 + AttackBytes any // 攻击流量 + CountRequests any // 请求数 + CountCachedRequests any // 缓存的请求数 + CountAttackRequests any // 攻击请求数 + TotalBytes any // 总流量 } func NewServerBandwidthStatOperator() *ServerBandwidthStatOperator { diff --git a/internal/db/models/server_daily_stat_dao.go b/internal/db/models/server_daily_stat_dao.go index c4fa20e8..b173283d 100644 --- a/internal/db/models/server_daily_stat_dao.go +++ b/internal/db/models/server_daily_stat_dao.go @@ -195,7 +195,7 @@ func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, reg // SumUserDaily 获取某天流量总和 // day 格式为YYYYMMDD -func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId int64, day string) (stat *ServerDailyStat, err error) { +func (this *ServerDailyStatDAO) compatSumUserDaily(tx *dbs.Tx, userId int64, regionId int64, day string) (stat *ServerDailyStat, err error) { var query = this.Query(tx) if regionId > 0 { query.Attr("regionId", regionId) @@ -234,7 +234,7 @@ func (this *ServerDailyStatDAO) SumUserTrafficBytesBetweenDays(tx *dbs.Tx, userI // SumUserMonthly 获取某月流量总和 // month 格式为YYYYMM -func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, month string) (int64, error) { +func (this *ServerDailyStatDAO) compatSumUserMonthly(tx *dbs.Tx, userId int64, month string) (int64, error) { return this.Query(tx). Between("day", month+"01", month+"31"). Attr("userId", userId). @@ -323,10 +323,10 @@ func (this *ServerDailyStatDAO) SumHourlyStat(tx *dbs.Tx, serverId int64, hour s return } -// SumDailyStat 获取某天内的流量 +// compatSumDailyStat 获取某天内的流量 // dayFrom 格式为YYYYMMDD // dayTo 格式为YYYYMMDD -func (this *ServerDailyStatDAO) SumDailyStat(tx *dbs.Tx, userId int64, serverId int64, regionId int64, dayFrom string, dayTo string) (stat *pb.ServerDailyStat, err error) { +func (this *ServerDailyStatDAO) compatSumDailyStat(tx *dbs.Tx, userId int64, serverId int64, regionId int64, dayFrom string, dayTo string) (stat *pb.ServerDailyStat, err error) { stat = &pb.ServerDailyStat{} if userId <= 0 && serverId <= 0 { @@ -463,7 +463,7 @@ func (this *ServerDailyStatDAO) SumMonthlyBytes(tx *dbs.Tx, serverId int64, mont } // FindDailyStats 按天统计 -func (this *ServerDailyStatDAO) FindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerDailyStat, err error) { +func (this *ServerDailyStatDAO) compatFindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerDailyStat, err error) { ones, err := this.Query(tx). Result("SUM(bytes) AS bytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests", "SUM(attackBytes) AS attackBytes", "day"). Attr("serverId", serverId). @@ -640,7 +640,7 @@ func (this *ServerDailyStatDAO) FindMonthlyStatsWithPlan(tx *dbs.Tx, month strin } // FindHourlyStats 按小时统计 -func (this *ServerDailyStatDAO) FindHourlyStats(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string) (result []*ServerDailyStat, err error) { +func (this *ServerDailyStatDAO) compatFindHourlyStats(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string) (result []*ServerDailyStat, err error) { ones, err := this.Query(tx). Result("SUM(bytes) AS bytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests", "SUM(attackBytes) AS attackBytes", "hour"). Attr("serverId", serverId). diff --git a/internal/db/models/server_daily_stat_dao_test.go b/internal/db/models/server_daily_stat_dao_test.go index b9368131..01c73393 100644 --- a/internal/db/models/server_daily_stat_dao_test.go +++ b/internal/db/models/server_daily_stat_dao_test.go @@ -1,10 +1,13 @@ -package models +package models_test import ( + "fmt" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/rands" timeutil "github.com/iwind/TeaGo/utils/time" "testing" "time" @@ -12,7 +15,7 @@ import ( func TestServerDailyStatDAO_SaveStats(t *testing.T) { var tx *dbs.Tx - stats := []*pb.ServerDailyStat{ + var stats = []*pb.ServerDailyStat{ { ServerId: 1, NodeRegionId: 2, @@ -20,7 +23,7 @@ func TestServerDailyStatDAO_SaveStats(t *testing.T) { CreatedAt: 1607671488, }, } - err := NewServerDailyStatDAO().SaveStats(tx, stats) + err := models.NewServerDailyStatDAO().SaveStats(tx, stats) if err != nil { t.Fatal(err) } @@ -29,7 +32,7 @@ func TestServerDailyStatDAO_SaveStats(t *testing.T) { func TestServerDailyStatDAO_SaveStats2(t *testing.T) { var tx *dbs.Tx - stats := []*pb.ServerDailyStat{ + var stats = []*pb.ServerDailyStat{ { ServerId: 1, NodeRegionId: 3, @@ -37,7 +40,7 @@ func TestServerDailyStatDAO_SaveStats2(t *testing.T) { CreatedAt: 1607671488, }, } - err := NewServerDailyStatDAO().SaveStats(tx, stats) + err := models.NewServerDailyStatDAO().SaveStats(tx, stats) if err != nil { t.Fatal(err) } @@ -47,7 +50,7 @@ func TestServerDailyStatDAO_SaveStats2(t *testing.T) { func TestServerDailyStatDAO_SumUserMonthly(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - bytes, err := NewServerDailyStatDAO().SumUserMonthly(tx, 1, timeutil.Format("Ym")) + bytes, err := models.NewUserBandwidthStatDAO().SumUserMonthly(tx, 1, timeutil.Format("Ym")) if err != nil { t.Fatal(err) } @@ -58,7 +61,7 @@ func TestServerDailyStatDAO_SumHourlyRequests(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - stat, err := NewServerDailyStatDAO().SumHourlyStat(tx, 23, timeutil.Format("YmdH")) + stat, err := models.NewServerDailyStatDAO().SumHourlyStat(tx, 23, timeutil.Format("YmdH")) if err != nil { t.Fatal(err) } @@ -69,7 +72,7 @@ func TestServerDailyStatDAO_SumMinutelyRequests(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - stat, err := NewServerDailyStatDAO().SumMinutelyStat(tx, 23, timeutil.Format("Ymd")+"1435") + stat, err := models.NewServerDailyStatDAO().SumMinutelyStat(tx, 23, timeutil.Format("Ymd")+"1435") if err != nil { t.Fatal(err) } @@ -78,7 +81,7 @@ func TestServerDailyStatDAO_SumMinutelyRequests(t *testing.T) { func TestServerDailyStatDAO_FindDistinctPlanServerIdsBetweenDay(t *testing.T) { var tx *dbs.Tx - serverIds, err := NewServerDailyStatDAO().FindDistinctServerIds(tx, timeutil.Format("Ym01"), timeutil.Format("Ymd")) + serverIds, err := models.NewServerDailyStatDAO().FindDistinctServerIds(tx, timeutil.Format("Ym01"), timeutil.Format("Ymd")) if err != nil { t.Fatal(err) } @@ -87,7 +90,7 @@ func TestServerDailyStatDAO_FindDistinctPlanServerIdsBetweenDay(t *testing.T) { func TestServerDailyStatDAO_FindStatsBetweenDays(t *testing.T) { var tx *dbs.Tx - stats, err := NewServerDailyStatDAO().FindStatsBetweenDays(tx, 1, 0, 0, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), timeutil.Format("Ymd")) + stats, err := models.NewServerDailyStatDAO().FindStatsBetweenDays(tx, 1, 0, 0, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), timeutil.Format("Ymd")) if err != nil { t.Fatal(err) } @@ -96,8 +99,41 @@ func TestServerDailyStatDAO_FindStatsBetweenDays(t *testing.T) { } } +func TestSeverDailyStatDAO_InsertMany(t *testing.T) { + dbs.NotifyReady() + + var tx *dbs.Tx + var dao = models.NewServerDailyStatDAO() + var count = 1 // 实际测试时可以将此值调的很大 + for i := 0; i < count; i++ { + if i%10000 == 0 { + t.Log(i) + } + err := dao.SaveStats(tx, []*pb.ServerDailyStat{{ + ServerId: 23, + NodeRegionId: int64(rands.Int(0, 999999)), + Bytes: 1024, + CachedBytes: 1024, + CountRequests: 1024, + CountCachedRequests: 1024, + CreatedAt: time.Now().Unix(), + CountAttackRequests: 1024, + AttackBytes: 1024, + CheckTrafficLimiting: false, + PlanId: 0, + Day: "202303" + fmt.Sprintf("%02d", rands.Int(1, 31)), + Hour: "2023032101", + TimeFrom: fmt.Sprintf("%06d", rands.Int(0, 999999)), + TimeTo: "211459", + }}) + if err != nil { + t.Fatal(err) + } + } +} + func TestServerDailyStatDAO_FindStatsWithDay(t *testing.T) { - var dao = NewServerDailyStatDAO() + var dao = models.NewServerDailyStatDAO() var tx *dbs.Tx stats, err := dao.FindStatsWithDay(tx, 23, timeutil.Format("Ymd"), "000000", "235900") if err != nil { diff --git a/internal/db/models/server_daily_stat_model_ext.go b/internal/db/models/server_daily_stat_model_ext.go index 2640e7f9..7ae50c50 100644 --- a/internal/db/models/server_daily_stat_model_ext.go +++ b/internal/db/models/server_daily_stat_model_ext.go @@ -1 +1,38 @@ package models + +func (this *ServerDailyStat) AsUserBandwidthStat() *UserBandwidthStat { + return &UserBandwidthStat{ + Id: 0, + UserId: uint64(this.UserId), + RegionId: this.RegionId, + Day: this.Day, + TimeAt: this.TimeFrom[:4], + Bytes: this.Bytes / 300, + TotalBytes: this.Bytes, + AvgBytes: this.Bytes / 300, + CachedBytes: this.CachedBytes, + AttackBytes: this.AttackBytes, + CountRequests: this.CountRequests, + CountCachedRequests: this.CountCachedRequests, + CountAttackRequests: this.CountAttackRequests, + } +} + +func (this *ServerDailyStat) AsServerBandwidthStat() *ServerBandwidthStat { + return &ServerBandwidthStat{ + Id: 0, + UserId: uint64(this.UserId), + ServerId: uint64(this.ServerId), + RegionId: this.RegionId, + Day: this.Day, + TimeAt: this.TimeFrom[:4], + Bytes: this.Bytes / 300, + TotalBytes: this.Bytes, + AvgBytes: this.Bytes / 300, + CachedBytes: this.CachedBytes, + AttackBytes: this.AttackBytes, + CountRequests: this.CountRequests, + CountCachedRequests: this.CountCachedRequests, + CountAttackRequests: this.CountAttackRequests, + } +} diff --git a/internal/db/models/user_bandwidth_stat_dao.go b/internal/db/models/user_bandwidth_stat_dao.go index 4effa8d6..a1d2587a 100644 --- a/internal/db/models/user_bandwidth_stat_dao.go +++ b/internal/db/models/user_bandwidth_stat_dao.go @@ -1,6 +1,7 @@ package models import ( + "fmt" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" @@ -26,6 +27,9 @@ const ( UserBandwidthStatTablePartials = 20 ) +var fullDataMap = map[string]bool{} // month => bool +var fullDataLocker = &sync.Mutex{} + func init() { dbs.OnReadyDone(func() { // 清理数据任务 @@ -61,7 +65,7 @@ func init() { } // UpdateUserBandwidth 写入数据 -func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, regionId int64, day string, timeAt string, bytes int64, totalBytes int64) error { +func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, regionId int64, day string, timeAt string, bytes int64, totalBytes int64, cachedBytes int64, attackBytes int64, countRequests int64, countCachedRequests int64, countAttackRequests int64) error { if userId <= 0 { // 如果用户ID不大于0,则说明服务不属于任何用户,此时不需要处理 return nil @@ -71,18 +75,33 @@ func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, Table(this.partialTable(userId)). Param("bytes", bytes). Param("totalBytes", totalBytes). + Param("cachedBytes", cachedBytes). + Param("attackBytes", attackBytes). + Param("countRequests", countRequests). + Param("countCachedRequests", countCachedRequests). + Param("countAttackRequests", countAttackRequests). InsertOrUpdateQuickly(maps.Map{ - "userId": userId, - "regionId": regionId, - "day": day, - "timeAt": timeAt, - "bytes": bytes, - "totalBytes": totalBytes, - "avgBytes": totalBytes / 300, + "userId": userId, + "regionId": regionId, + "day": day, + "timeAt": timeAt, + "bytes": bytes, + "totalBytes": totalBytes, + "avgBytes": totalBytes / 300, + "cachedBytes": cachedBytes, + "attackBytes": attackBytes, + "countRequests": countRequests, + "countCachedRequests": countCachedRequests, + "countAttackRequests": countAttackRequests, }, maps.Map{ - "bytes": dbs.SQL("bytes+:bytes"), - "avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前,所以这里不用担心先后顺序的问题 - "totalBytes": dbs.SQL("totalBytes+:totalBytes"), + "bytes": dbs.SQL("bytes+:bytes"), + "avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前,所以这里不用担心先后顺序的问题 + "totalBytes": dbs.SQL("totalBytes+:totalBytes"), + "cachedBytes": dbs.SQL("cachedBytes+:cachedBytes"), + "attackBytes": dbs.SQL("attackBytes+:attackBytes"), + "countRequests": dbs.SQL("countRequests+:countRequests"), + "countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"), + "countAttackRequests": dbs.SQL("countAttackRequests+:countAttackRequests"), }) } @@ -320,6 +339,127 @@ func (this *UserBandwidthStatDAO) FindDistinctUserIds(tx *dbs.Tx, dayFrom string return } +// SumUserMonthly 获取某月流量总和 +// month 格式为YYYYMM +func (this *UserBandwidthStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, month string) (int64, error) { + // 兼容以往版本 + hasFullData, err := this.HasFullData(tx, userId, month) + if err != nil { + return 0, err + } + if !hasFullData { + return SharedServerDailyStatDAO.compatSumUserMonthly(tx, userId, month) + } + + return this.Query(tx). + Table(this.partialTable(userId)). + Between("day", month+"01", month+"31"). + Attr("userId", userId). + SumInt64("totalBytes", 0) +} + +// SumUserDaily 获取某天流量总和 +// day 格式为YYYYMMDD +func (this *UserBandwidthStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId int64, day string) (stat *UserBandwidthStat, err error) { + if !regexputils.YYYYMMDD.MatchString(day) { + return nil, nil + } + + // 兼容以往版本 + hasFullData, err := this.HasFullData(tx, userId, day[:6]) + if err != nil { + return nil, err + } + if !hasFullData { + serverStat, err := SharedServerDailyStatDAO.compatSumUserDaily(tx, userId, regionId, day) + if err != nil || serverStat == nil { + return nil, err + } + + return serverStat.AsUserBandwidthStat(), nil + } + + var query = this.Query(tx) + if regionId > 0 { + query.Attr("regionId", regionId) + } + + one, err := query. + Table(this.partialTable(userId)). + Attr("day", day). + Attr("userId", userId). + Result("SUM(totalBytes) AS totalBytes", "SUM(cachedBytes) AS cachedBytes", "SUM(attackBytes) AS attackBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests"). + Find() + if err != nil || one == nil { + return nil, err + } + return one.(*UserBandwidthStat), nil +} + +// SumDailyStat 获取某天内的流量 +// dayFrom 格式为YYYYMMDD +// dayTo 格式为YYYYMMDD +func (this *UserBandwidthStatDAO) SumDailyStat(tx *dbs.Tx, userId int64, regionId int64, dayFrom string, dayTo string) (stat *pb.ServerDailyStat, err error) { + if !regexputils.YYYYMMDD.MatchString(dayFrom) { + return nil, errors.New("invalid dayFrom '" + dayFrom + "'") + } + if !regexputils.YYYYMMDD.MatchString(dayTo) { + return nil, errors.New("invalid dayTo '" + dayTo + "'") + } + + // 兼容以往版本 + hasFullData, err := this.HasFullData(tx, userId, dayFrom[:6]) + if err != nil { + return nil, err + } + if !hasFullData { + return SharedServerDailyStatDAO.compatSumDailyStat(tx, userId, 0, regionId, dayFrom, dayTo) + } + + stat = &pb.ServerDailyStat{} + + if userId <= 0 { + return + } + + if dayFrom > dayTo { + dayFrom, dayTo = dayTo, dayFrom + } + + var query = this.Query(tx). + Table(this.partialTable(userId)). + Result("SUM(totalBytes) AS totalBytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes") + + query.Attr("userId", userId) + + if regionId > 0 { + query.Attr("regionId", regionId) + } + + if dayFrom == dayTo { + query.Attr("day", dayFrom) + } else { + query.Between("day", dayFrom, dayTo) + } + + one, _, err := query.FindOne() + if err != nil { + return nil, err + } + + if one == nil { + return + } + + stat.Bytes = one.GetInt64("totalBytes") + stat.CachedBytes = one.GetInt64("cachedBytes") + stat.CountRequests = one.GetInt64("countRequests") + stat.CountCachedRequests = one.GetInt64("countCachedRequests") + stat.CountAttackRequests = one.GetInt64("countAttackRequests") + stat.AttackBytes = one.GetInt64("attackBytes") + return +} + // Clean 清理过期数据 func (this *UserBandwidthStatDAO) Clean(tx *dbs.Tx) error { var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -100)) // 保留大约3个月的数据 @@ -375,3 +515,53 @@ func (this *UserBandwidthStatDAO) fixUserStat(stat *UserBandwidthStat, useAvg bo } return stat } + +// HasFullData 检查一个月是否完整数据 +// 是为了兼容以前数据,以前的表中没有缓存流量、请求数等字段 +func (this *UserBandwidthStatDAO) HasFullData(tx *dbs.Tx, userId int64, month string) (bool, error) { + if !regexputils.YYYYMM.MatchString(month) { + return false, errors.New("invalid month '" + month + "'") + } + + // 仅供调试 + if Tea.IsTesting() { + return true, nil + } + + fullDataLocker.Lock() + hasData, ok := fullDataMap[month] + fullDataLocker.Unlock() + if ok { + return hasData, nil + } + + var year = types.Int(month[:4]) + var monthInt = types.Int(month[4:]) + + if year < 2000 || monthInt > 12 || monthInt < 1 { + return false, nil + } + + var lastMonth = monthInt - 1 + if lastMonth == 0 { + lastMonth = 12 + year-- + } + + var lastMonthString = fmt.Sprintf("%d%02d", year, lastMonth) + one, err := this.Query(tx). + Table(this.partialTable(userId)). + Between("day", lastMonthString+"01", lastMonthString+"31"). + DescPk(). + Find() + if err != nil { + return false, err + } + + var b = one != nil && one.(*UserBandwidthStat).CountRequests > 0 + fullDataLocker.Lock() + fullDataMap[month] = b + fullDataLocker.Unlock() + + return b, nil +} diff --git a/internal/db/models/user_bandwidth_stat_dao_test.go b/internal/db/models/user_bandwidth_stat_dao_test.go index 73087244..46c64024 100644 --- a/internal/db/models/user_bandwidth_stat_dao_test.go +++ b/internal/db/models/user_bandwidth_stat_dao_test.go @@ -50,7 +50,7 @@ func TestUserBandwidthStatDAO_FindUserPeekBandwidthInDay(t *testing.T) { func TestUserBandwidthStatDAO_UpdateServerBandwidth(t *testing.T) { var dao = models.NewUserBandwidthStatDAO() var tx *dbs.Tx - err := dao.UpdateUserBandwidth(tx, 1, 0, timeutil.Format("Ymd"), timeutil.FormatTime("Hi", time.Now().Unix()/300*300), 1024, 300) + err := dao.UpdateUserBandwidth(tx, 1, 0, timeutil.Format("Ymd"), timeutil.FormatTime("Hi", time.Now().Unix()/300*300), 1024, 300, 0, 0, 0, 0, 0) if err != nil { t.Fatal(err) } @@ -93,3 +93,13 @@ func TestUserBandwidthStatDAO_FindPercentileBetweenDays(t *testing.T) { } logs.PrintAsJSON(stat, t) } + +func TestUserBandwidthStatDAO_HasFullData(t *testing.T) { + var tx *dbs.Tx + var dao = models.NewUserBandwidthStatDAO() + + var month = "202304" + for i := 0; i < 3; i++ { + t.Log(dao.HasFullData(tx, 1, month)) + } +} diff --git a/internal/db/models/user_bandwidth_stat_model.go b/internal/db/models/user_bandwidth_stat_model.go index a4cfe406..cb90eed1 100644 --- a/internal/db/models/user_bandwidth_stat_model.go +++ b/internal/db/models/user_bandwidth_stat_model.go @@ -2,25 +2,35 @@ package models // UserBandwidthStat 用户月带宽峰值 type UserBandwidthStat struct { - Id uint64 `field:"id"` // ID - UserId uint64 `field:"userId"` // 用户ID - RegionId uint32 `field:"regionId"` // 区域ID - Day string `field:"day"` // 日期YYYYMMDD - TimeAt string `field:"timeAt"` // 时间点HHII - Bytes uint64 `field:"bytes"` // 带宽 - TotalBytes uint64 `field:"totalBytes"` // 总流量 - AvgBytes uint64 `field:"avgBytes"` // 平均流量 + Id uint64 `field:"id"` // ID + UserId uint64 `field:"userId"` // 用户ID + RegionId uint32 `field:"regionId"` // 区域ID + Day string `field:"day"` // 日期YYYYMMDD + TimeAt string `field:"timeAt"` // 时间点HHII + Bytes uint64 `field:"bytes"` // 带宽 + TotalBytes uint64 `field:"totalBytes"` // 总流量 + AvgBytes uint64 `field:"avgBytes"` // 平均流量 + CachedBytes uint64 `field:"cachedBytes"` // 缓存的流量 + AttackBytes uint64 `field:"attackBytes"` // 攻击流量 + CountRequests uint64 `field:"countRequests"` // 请求数 + CountCachedRequests uint64 `field:"countCachedRequests"` // 缓存的请求数 + CountAttackRequests uint64 `field:"countAttackRequests"` // 攻击请求数 } type UserBandwidthStatOperator struct { - Id any // ID - UserId any // 用户ID - RegionId any // 区域ID - Day any // 日期YYYYMMDD - TimeAt any // 时间点HHII - Bytes any // 带宽 - TotalBytes any // 总流量 - AvgBytes any // 平均流量 + Id any // ID + UserId any // 用户ID + RegionId any // 区域ID + Day any // 日期YYYYMMDD + TimeAt any // 时间点HHII + Bytes any // 带宽 + TotalBytes any // 总流量 + AvgBytes any // 平均流量 + CachedBytes any // 缓存的流量 + AttackBytes any // 攻击流量 + CountRequests any // 请求数 + CountCachedRequests any // 缓存的请求数 + CountAttackRequests any // 攻击请求数 } func NewUserBandwidthStatOperator() *UserBandwidthStatOperator { diff --git a/internal/rpc/services/service_server_bandwidth_stat.go b/internal/rpc/services/service_server_bandwidth_stat.go index f5dbb0e2..bb943e8c 100644 --- a/internal/rpc/services/service_server_bandwidth_stat.go +++ b/internal/rpc/services/service_server_bandwidth_stat.go @@ -65,7 +65,7 @@ func init() { for _, stat := range m { // 更新服务的带宽峰值 if stat.ServerId > 0 { - err := models.SharedServerBandwidthStatDAO.UpdateServerBandwidth(tx, stat.UserId, stat.ServerId, stat.Day, stat.TimeAt, stat.Bytes, stat.TotalBytes) + err := models.SharedServerBandwidthStatDAO.UpdateServerBandwidth(tx, stat.UserId, stat.ServerId, stat.NodeRegionId, stat.Day, stat.TimeAt, stat.Bytes, stat.TotalBytes, stat.CachedBytes, stat.AttackBytes, stat.CountRequests, stat.CountCachedRequests, stat.CountAttackRequests) if err != nil { remotelogs.Error("ServerBandwidthStatService", "dump bandwidth stats failed: "+err.Error()) } @@ -78,7 +78,7 @@ func init() { // 更新用户的带宽峰值 if stat.UserId > 0 { - err = models.SharedUserBandwidthStatDAO.UpdateUserBandwidth(tx, stat.UserId, stat.NodeRegionId, stat.Day, stat.TimeAt, stat.Bytes, stat.TotalBytes) + err = models.SharedUserBandwidthStatDAO.UpdateUserBandwidth(tx, stat.UserId, stat.NodeRegionId, stat.Day, stat.TimeAt, stat.Bytes, stat.TotalBytes, stat.CachedBytes, stat.AttackBytes, stat.CountRequests, stat.CountCachedRequests, stat.CountAttackRequests) if err != nil { remotelogs.Error("SharedUserBandwidthStatDAO", "dump bandwidth stats failed: "+err.Error()) } @@ -127,16 +127,26 @@ func (this *ServerBandwidthStatService) UploadServerBandwidthStats(ctx context.C if ok { oldStat.Bytes += stat.Bytes oldStat.TotalBytes += stat.TotalBytes + oldStat.CachedBytes += stat.CachedBytes + oldStat.AttackBytes += stat.AttackBytes + oldStat.CountRequests += stat.CountRequests + oldStat.CountCachedRequests += stat.CountCachedRequests + oldStat.CountAttackRequests += stat.CountAttackRequests } else { serverBandwidthStatsMap[key] = &pb.ServerBandwidthStat{ - Id: 0, - NodeRegionId: stat.NodeRegionId, - UserId: stat.UserId, - ServerId: stat.ServerId, - Day: stat.Day, - TimeAt: stat.TimeAt, - Bytes: stat.Bytes, - TotalBytes: stat.TotalBytes, + Id: 0, + NodeRegionId: stat.NodeRegionId, + UserId: stat.UserId, + ServerId: stat.ServerId, + Day: stat.Day, + TimeAt: stat.TimeAt, + Bytes: stat.Bytes, + TotalBytes: stat.TotalBytes, + CachedBytes: stat.CachedBytes, + AttackBytes: stat.AttackBytes, + CountRequests: stat.CountRequests, + CountCachedRequests: stat.CountCachedRequests, + CountAttackRequests: stat.CountAttackRequests, } } serverBandwidthStatsLocker.Unlock() diff --git a/internal/rpc/services/service_server_daily_stat.go b/internal/rpc/services/service_server_daily_stat.go index 0a0a231f..51609664 100644 --- a/internal/rpc/services/service_server_daily_stat.go +++ b/internal/rpc/services/service_server_daily_stat.go @@ -239,7 +239,7 @@ func (this *ServerDailyStatService) FindLatestServerDailyStats(ctx context.Conte if req.Days > 0 { for i := int32(0); i < req.Days; i++ { dayString := timeutil.Format("Ymd", time.Now().AddDate(0, 0, -int(i))) - stat, err := models.SharedServerDailyStatDAO.SumDailyStat(tx, 0, req.ServerId, req.NodeRegionId, dayString, dayString) + stat, err := models.SharedServerBandwidthStatDAO.SumDailyStat(tx, req.ServerId, req.NodeRegionId, dayString, dayString) if err != nil { return nil, err } @@ -396,7 +396,12 @@ func (this *ServerDailyStatService) SumServerDailyStats(ctx context.Context, req req.DayTo = req.DayFrom } - stat, err := models.SharedServerDailyStatDAO.SumDailyStat(tx, req.UserId, req.ServerId, req.NodeRegionId, req.DayFrom, req.DayTo) + var stat *pb.ServerDailyStat + if req.ServerId > 0 { + stat, err = models.SharedServerBandwidthStatDAO.SumDailyStat(tx, req.ServerId, req.NodeRegionId, req.DayFrom, req.DayTo) + } else { + stat, err = models.SharedUserBandwidthStatDAO.SumDailyStat(tx, req.UserId, req.NodeRegionId, req.DayFrom, req.DayTo) + } if err != nil { return nil, err } diff --git a/internal/rpc/services/service_server_stat_board.go b/internal/rpc/services/service_server_stat_board.go index 87aca776..3bb6fa29 100644 --- a/internal/rpc/services/service_server_stat_board.go +++ b/internal/rpc/services/service_server_stat_board.go @@ -587,14 +587,14 @@ func (this *ServerStatBoardService) ComposeServerStatBoard(ctx context.Context, // 按日流量统计 var dayFrom = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -14)) - dailyTrafficStats, err := models.SharedServerDailyStatDAO.FindDailyStats(tx, req.ServerId, dayFrom, timeutil.Format("Ymd")) + dailyTrafficStats, err := models.SharedServerBandwidthStatDAO.FindDailyStats(tx, req.ServerId, dayFrom, timeutil.Format("Ymd")) if err != nil { return nil, err } for _, stat := range dailyTrafficStats { result.DailyTrafficStats = append(result.DailyTrafficStats, &pb.ComposeServerStatBoardResponse_DailyTrafficStat{ Day: stat.Day, - Bytes: int64(stat.Bytes), + Bytes: int64(stat.TotalBytes), CachedBytes: int64(stat.CachedBytes), CountRequests: int64(stat.CountRequests), CountCachedRequests: int64(stat.CountCachedRequests), @@ -606,14 +606,14 @@ func (this *ServerStatBoardService) ComposeServerStatBoard(ctx context.Context, // 小时流量统计 var hourFrom = timeutil.Format("YmdH", time.Now().Add(-23*time.Hour)) var hourTo = timeutil.Format("YmdH") - hourlyTrafficStats, err := models.SharedServerDailyStatDAO.FindHourlyStats(tx, req.ServerId, hourFrom, hourTo) + hourlyTrafficStats, err := models.SharedServerBandwidthStatDAO.FindHourlyStats(tx, req.ServerId, hourFrom, hourTo) if err != nil { return nil, err } for _, stat := range hourlyTrafficStats { result.HourlyTrafficStats = append(result.HourlyTrafficStats, &pb.ComposeServerStatBoardResponse_HourlyTrafficStat{ - Hour: stat.Hour, - Bytes: int64(stat.Bytes), + Hour: stat.Day + stat.TimeAt[:2], + Bytes: int64(stat.TotalBytes), CachedBytes: int64(stat.CachedBytes), CountRequests: int64(stat.CountRequests), CountCachedRequests: int64(stat.CountCachedRequests), diff --git a/internal/rpc/services/users/service_user.go b/internal/rpc/services/users/service_user.go index cace4927..ff205362 100644 --- a/internal/rpc/services/users/service_user.go +++ b/internal/rpc/services/users/service_user.go @@ -421,7 +421,7 @@ func (this *UserService) ComposeUserDashboard(ctx context.Context, req *pb.Compo var currentDay = timeutil.Format("Ymd") // 本月总流量 - monthlyTrafficBytes, err := models.SharedServerDailyStatDAO.SumUserMonthly(tx, req.UserId, currentMonth) + monthlyTrafficBytes, err := models.SharedUserBandwidthStatDAO.SumUserMonthly(tx, req.UserId, currentMonth) if err != nil { return nil, err } @@ -451,12 +451,12 @@ func (this *UserService) ComposeUserDashboard(ctx context.Context, req *pb.Compo } // 今日总流量 - dailyTrafficStat, err := models.SharedServerDailyStatDAO.SumUserDaily(tx, req.UserId, 0, currentDay) + dailyTrafficStat, err := models.SharedUserBandwidthStatDAO.SumUserDaily(tx, req.UserId, 0, currentDay) if err != nil { return nil, err } if dailyTrafficStat == nil { - dailyTrafficStat = &models.ServerDailyStat{} + dailyTrafficStat = &models.UserBandwidthStat{} } // 近 30 日流量带宽趋势 @@ -475,12 +475,12 @@ func (this *UserService) ComposeUserDashboard(ctx context.Context, req *pb.Compo } // 流量 - trafficStat, err := models.SharedServerDailyStatDAO.SumUserDaily(tx, req.UserId, 0, day) + trafficStat, err := models.SharedUserBandwidthStatDAO.SumUserDaily(tx, req.UserId, 0, day) if err != nil { return nil, err } if trafficStat == nil { - trafficStat = &models.ServerDailyStat{} + trafficStat = &models.UserBandwidthStat{} } // 峰值带宽 @@ -495,7 +495,7 @@ func (this *UserService) ComposeUserDashboard(ctx context.Context, req *pb.Compo dailyTrafficStats = append(dailyTrafficStats, &pb.ComposeUserDashboardResponse_DailyTrafficStat{ Day: day, - Bytes: int64(trafficStat.Bytes), + Bytes: int64(trafficStat.TotalBytes), CachedBytes: int64(trafficStat.CachedBytes), AttackBytes: int64(trafficStat.AttackBytes), CountRequests: int64(trafficStat.CountRequests), @@ -508,7 +508,7 @@ func (this *UserService) ComposeUserDashboard(ctx context.Context, req *pb.Compo CountServers: countServers, MonthlyTrafficBytes: monthlyTrafficBytes, MonthlyPeekBandwidthBytes: monthlyPeekBandwidthBytes, - DailyTrafficBytes: int64(dailyTrafficStat.Bytes), + DailyTrafficBytes: int64(dailyTrafficStat.TotalBytes), DailyPeekBandwidthBytes: dailyPeekBandwidthBytes, DailyTrafficStats: dailyTrafficStats, DailyPeekBandwidthStats: dailyPeekBandwidthStats, @@ -728,8 +728,8 @@ func (this *UserService) ComposeUserGlobalBoard(ctx context.Context, req *pb.Com } // 流量排行 - hourFrom := timeutil.Format("YmdH", time.Now().Add(-23*time.Hour)) - hourTo := timeutil.Format("YmdH") + var hourFrom = timeutil.Format("YmdH", time.Now().Add(-23*time.Hour)) + var hourTo = timeutil.Format("YmdH") topUserStats, err := models.SharedServerDailyStatDAO.FindTopUserStats(tx, hourFrom, hourTo) if err != nil { return nil, err diff --git a/internal/utils/regexputils/expr.go b/internal/utils/regexputils/expr.go index 94bb2163..5a22a5a2 100644 --- a/internal/utils/regexputils/expr.go +++ b/internal/utils/regexputils/expr.go @@ -5,6 +5,7 @@ package regexputils import "regexp" var ( - YYYYMMDD = regexp.MustCompile(`^\d{8}$`) - YYYYMM = regexp.MustCompile(`^\d{6}$`) + YYYYMMDDHH = regexp.MustCompile(`^\d{10}$`) + YYYYMMDD = regexp.MustCompile(`^\d{8}$`) + YYYYMM = regexp.MustCompile(`^\d{6}$`) )