Files
EdgeAPI/internal/db/models/server_daily_stat_dao.go

394 lines
12 KiB
Go
Raw Normal View History

package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-07-07 19:55:37 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2021-07-05 11:37:22 +08:00
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
2021-07-19 21:01:26 +08:00
"github.com/iwind/TeaGo/rands"
timeutil "github.com/iwind/TeaGo/utils/time"
2021-06-08 11:18:27 +08:00
"regexp"
2021-07-05 11:37:22 +08:00
"time"
)
// ServerDailyStatDAO is the data-access object for server traffic statistics.
// It is declared on top of dbs.DAO; NewServerDailyStatDAO binds it to the
// "edgeServerDailyStats" table and the ServerDailyStat model.
type ServerDailyStatDAO dbs.DAO
2021-07-05 11:37:22 +08:00
func init() {
dbs.OnReadyDone(func() {
// 清理数据任务
2021-07-19 21:01:26 +08:00
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
2021-07-05 11:37:22 +08:00
go func() {
for range ticker.C {
err := SharedServerDailyStatDAO.Clean(nil, 60) // 只保留60天
if err != nil {
logs.Println("ServerDailyStatDAO", "clean expired data failed: "+err.Error())
}
}
}()
})
}
// NewServerDailyStatDAO creates a DAO bound to the "edgeServerDailyStats"
// table, using ServerDailyStat as the row model and "id" as the primary key.
func NewServerDailyStatDAO() *ServerDailyStatDAO {
	dao := &ServerDailyStatDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeServerDailyStats",
			Model:  new(ServerDailyStat),
			PkName: "id",
		},
	}
	return dbs.NewDAO(dao).(*ServerDailyStatDAO)
}
// SharedServerDailyStatDAO is the package-wide DAO instance. It stays nil
// until the dbs.OnReady callback registered below has run.
var SharedServerDailyStatDAO *ServerDailyStatDAO

// init defers creation of the shared DAO until dbs signals that it is ready.
func init() {
	dbs.OnReady(func() {
		SharedServerDailyStatDAO = NewServerDailyStatDAO()
	})
}
2021-06-08 11:18:27 +08:00
// SaveStats 提交数据
func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailyStat) error {
2021-07-11 18:05:57 +08:00
var serverUserMap = map[int64]int64{} // serverId => userId
var cacheMap = maps.Map{}
for _, stat := range stats {
day := timeutil.FormatTime("Ymd", stat.CreatedAt)
2021-07-05 11:37:22 +08:00
hour := timeutil.FormatTime("YmdH", stat.CreatedAt)
timeFrom := timeutil.FormatTime("His", stat.CreatedAt)
2021-06-08 11:18:27 +08:00
timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60-1) // 5分钟
// 所属用户
2021-07-11 18:05:57 +08:00
serverUserId, ok := serverUserMap[stat.ServerId]
if !ok {
userId, err := SharedServerDAO.FindServerUserId(tx, stat.ServerId)
if err != nil {
return err
}
serverUserId = userId
}
_, _, err := this.Query(tx).
Param("bytes", stat.Bytes).
2021-06-08 11:18:27 +08:00
Param("cachedBytes", stat.CachedBytes).
Param("countRequests", stat.CountRequests).
Param("countCachedRequests", stat.CountCachedRequests).
2021-07-13 11:04:45 +08:00
Param("countAttackRequests", stat.CountAttackRequests).
Param("attackBytes", stat.AttackBytes).
InsertOrUpdate(maps.Map{
2021-07-11 18:05:57 +08:00
"userId": serverUserId,
2021-06-08 11:18:27 +08:00
"serverId": stat.ServerId,
"regionId": stat.RegionId,
2021-07-13 11:04:45 +08:00
"bytes": stat.Bytes,
"cachedBytes": stat.CachedBytes,
"countRequests": stat.CountRequests,
"countCachedRequests": stat.CountCachedRequests,
"countAttackRequests": stat.CountAttackRequests,
"attackBytes": stat.AttackBytes,
2021-06-08 11:18:27 +08:00
"day": day,
2021-07-05 11:37:22 +08:00
"hour": hour,
2021-06-08 11:18:27 +08:00
"timeFrom": timeFrom,
"timeTo": timeTo,
}, maps.Map{
2021-06-08 11:18:27 +08:00
"bytes": dbs.SQL("bytes+:bytes"),
"cachedBytes": dbs.SQL("cachedBytes+:cachedBytes"),
"countRequests": dbs.SQL("countRequests+:countRequests"),
"countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"),
2021-07-13 11:04:45 +08:00
"countAttackRequests": dbs.SQL("countAttackRequests+:countAttackRequests"),
"attackBytes": dbs.SQL("attackBytes+:attackBytes"),
})
if err != nil {
return err
}
2021-11-09 17:36:54 +08:00
// 更新流量限制状态
trafficLimit, err := SharedServerDAO.FindServerTrafficLimitConfig(tx, stat.ServerId, cacheMap)
if err != nil {
return err
}
2021-11-09 17:36:54 +08:00
if trafficLimit != nil && trafficLimit.IsOn && !trafficLimit.IsEmpty() {
err = SharedServerDAO.UpdateServerTrafficLimitStatus(tx, trafficLimit, stat.ServerId, false)
if err != nil {
return err
}
}
}
return nil
}
2021-06-08 11:18:27 +08:00
// SumUserMonthly 根据用户计算某月合计
// month 格式为YYYYMM
func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) {
query := this.Query(tx)
2020-12-15 16:53:31 +08:00
if regionId > 0 {
query.Attr("regionId", regionId)
}
return query.Between("day", month+"01", month+"32").
2021-07-11 18:05:57 +08:00
Attr("userId", userId).
2020-12-15 16:53:31 +08:00
SumInt64("bytes", 0)
}
2021-06-08 11:18:27 +08:00
// SumUserMonthlyPeek 获取某月带宽峰值
2020-12-15 16:53:31 +08:00
// month 格式为YYYYMM
func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) {
query := this.Query(tx)
2020-12-15 16:53:31 +08:00
if regionId > 0 {
query.Attr("regionId", regionId)
}
max, err := query.Between("day", month+"01", month+"32").
2021-07-11 18:05:57 +08:00
Attr("userId", userId).
2020-12-15 16:53:31 +08:00
Max("bytes", 0)
if err != nil {
return 0, err
}
return int64(max), nil
}
2021-06-08 11:18:27 +08:00
// SumUserDaily 获取某天流量总和
2020-12-15 16:53:31 +08:00
// day 格式为YYYYMMDD
func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId int64, day string) (int64, error) {
query := this.Query(tx)
2020-12-15 16:53:31 +08:00
if regionId > 0 {
query.Attr("regionId", regionId)
}
return query.
Attr("day", day).
2021-07-11 18:05:57 +08:00
Attr("userId", userId).
SumInt64("bytes", 0)
}
2020-12-15 16:53:31 +08:00
2021-06-08 11:18:27 +08:00
// SumUserDailyPeek 获取某天带宽峰值
2020-12-15 16:53:31 +08:00
// day 格式为YYYYMMDD
func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regionId int64, day string) (int64, error) {
query := this.Query(tx)
2020-12-15 16:53:31 +08:00
if regionId > 0 {
query.Attr("regionId", regionId)
}
max, err := query.
Attr("day", day).
2021-07-11 18:05:57 +08:00
Attr("userId", userId).
2020-12-15 16:53:31 +08:00
Max("bytes", 0)
if err != nil {
return 0, err
}
return int64(max), nil
}
2021-06-08 11:18:27 +08:00
2021-06-08 15:10:08 +08:00
// SumMinutelyStat 获取某个分钟内的流量
// minute 格式为YYYYMMDDHHMM并且已经格式化成每5分钟一个值
func (this *ServerDailyStatDAO) SumMinutelyStat(tx *dbs.Tx, serverId int64, minute string) (stat *pb.ServerDailyStat, err error) {
stat = &pb.ServerDailyStat{}
if !regexp.MustCompile(`^\d{12}$`).MatchString(minute) {
return
}
one, _, err := this.Query(tx).
2021-07-13 11:04:45 +08:00
Result("SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes").
2021-06-08 15:10:08 +08:00
Attr("serverId", serverId).
Attr("day", minute[:8]).
Attr("timeFrom", minute[8:]+"00").
FindOne()
if err != nil {
return nil, err
}
if one == nil {
return
}
stat.Bytes = one.GetInt64("bytes")
stat.CachedBytes = one.GetInt64("cachedBytes")
stat.CountRequests = one.GetInt64("countRequests")
stat.CountCachedRequests = one.GetInt64("countCachedRequests")
2021-07-13 11:04:45 +08:00
stat.CountAttackRequests = one.GetInt64("countAttackRequests")
stat.AttackBytes = one.GetInt64("attackBytes")
2021-06-08 15:10:08 +08:00
return
}
// SumHourlyStat 获取某个小时内的流量
2021-06-08 11:18:27 +08:00
// hour 格式为YYYYMMDDHH
func (this *ServerDailyStatDAO) SumHourlyStat(tx *dbs.Tx, serverId int64, hour string) (stat *pb.ServerDailyStat, err error) {
stat = &pb.ServerDailyStat{}
if !regexp.MustCompile(`^\d{10}$`).MatchString(hour) {
return
}
one, _, err := this.Query(tx).
2021-07-13 11:04:45 +08:00
Result("SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes").
2021-06-08 11:18:27 +08:00
Attr("serverId", serverId).
Attr("day", hour[:8]).
Gte("timeFrom", hour[8:]+"0000").
Lte("timeTo", hour[8:]+"5959").
FindOne()
if err != nil {
return nil, err
}
if one == nil {
return
}
stat.Bytes = one.GetInt64("bytes")
stat.CachedBytes = one.GetInt64("cachedBytes")
stat.CountRequests = one.GetInt64("countRequests")
stat.CountCachedRequests = one.GetInt64("countCachedRequests")
2021-07-13 11:04:45 +08:00
stat.CountAttackRequests = one.GetInt64("countAttackRequests")
stat.AttackBytes = one.GetInt64("attackBytes")
2021-06-08 11:18:27 +08:00
return
}
2021-06-08 15:10:08 +08:00
// SumDailyStat 获取某天内的流量
// day 格式为YYYYMMDD
func (this *ServerDailyStatDAO) SumDailyStat(tx *dbs.Tx, serverId int64, day string) (stat *pb.ServerDailyStat, err error) {
stat = &pb.ServerDailyStat{}
if !regexp.MustCompile(`^\d{8}$`).MatchString(day) {
return nil, errors.New("invalid day '" + day + "'")
2021-06-08 15:10:08 +08:00
}
one, _, err := this.Query(tx).
2021-07-13 11:04:45 +08:00
Result("SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes").
2021-06-08 15:10:08 +08:00
Attr("serverId", serverId).
Attr("day", day).
FindOne()
if err != nil {
return nil, err
}
if one == nil {
return
}
stat.Bytes = one.GetInt64("bytes")
stat.CachedBytes = one.GetInt64("cachedBytes")
stat.CountRequests = one.GetInt64("countRequests")
stat.CountCachedRequests = one.GetInt64("countCachedRequests")
stat.CountAttackRequests = one.GetInt64("countAttackRequests")
stat.AttackBytes = one.GetInt64("attackBytes")
return
}
// SumMonthlyStat 获取某月内的流量
// month 格式为YYYYMM
func (this *ServerDailyStatDAO) SumMonthlyStat(tx *dbs.Tx, serverId int64, month string) (stat *pb.ServerDailyStat, err error) {
stat = &pb.ServerDailyStat{}
if !regexp.MustCompile(`^\d{6}$`).MatchString(month) {
return
}
one, _, err := this.Query(tx).
Result("SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes").
Attr("serverId", serverId).
Between("day", month+"01", month+"31").
FindOne()
if err != nil {
return nil, err
}
2021-06-08 15:10:08 +08:00
if one == nil {
return
}
stat.Bytes = one.GetInt64("bytes")
stat.CachedBytes = one.GetInt64("cachedBytes")
stat.CountRequests = one.GetInt64("countRequests")
stat.CountCachedRequests = one.GetInt64("countCachedRequests")
2021-07-13 11:04:45 +08:00
stat.CountAttackRequests = one.GetInt64("countAttackRequests")
stat.AttackBytes = one.GetInt64("attackBytes")
2021-06-08 15:10:08 +08:00
return
}
2021-07-05 11:37:22 +08:00
2021-07-07 19:55:37 +08:00
// FindDailyStats 按天统计
func (this *ServerDailyStatDAO) FindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerDailyStat, err error) {
ones, err := this.Query(tx).
2021-07-13 11:04:45 +08:00
Result("SUM(bytes) AS bytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests", "SUM(attackBytes) AS attackBytes", "day").
2021-07-07 19:55:37 +08:00
Attr("serverId", serverId).
Between("day", dayFrom, dayTo).
Group("day").
FindAll()
if err != nil {
return nil, err
}
dayMap := map[string]*ServerDailyStat{} // day => Stat
for _, one := range ones {
stat := one.(*ServerDailyStat)
dayMap[stat.Day] = stat
}
days, err := utils.RangeDays(dayFrom, dayTo)
if err != nil {
return nil, err
}
for _, day := range days {
stat, ok := dayMap[day]
if ok {
result = append(result, stat)
} else {
result = append(result, &ServerDailyStat{Day: day})
}
}
return
}
// FindHourlyStats 按小时统计
func (this *ServerDailyStatDAO) FindHourlyStats(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string) (result []*ServerDailyStat, err error) {
ones, err := this.Query(tx).
2021-07-13 11:04:45 +08:00
Result("SUM(bytes) AS bytes", "SUM(cachedBytes) AS cachedBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests", "SUM(attackBytes) AS attackBytes", "hour").
2021-07-07 19:55:37 +08:00
Attr("serverId", serverId).
Between("hour", hourFrom, hourTo).
Group("hour").
FindAll()
if err != nil {
return nil, err
}
hourMap := map[string]*ServerDailyStat{} // hour => Stat
for _, one := range ones {
stat := one.(*ServerDailyStat)
hourMap[stat.Hour] = stat
}
hours, err := utils.RangeHours(hourFrom, hourTo)
if err != nil {
return nil, err
}
for _, hour := range hours {
stat, ok := hourMap[hour]
if ok {
result = append(result, stat)
} else {
result = append(result, &ServerDailyStat{Hour: hour})
}
}
return
}
2021-07-11 18:05:57 +08:00
// FindTopUserStats 流量排行
func (this *ServerDailyStatDAO) FindTopUserStats(tx *dbs.Tx, hourFrom string, hourTo string) (result []*ServerDailyStat, err error) {
_, err = this.Query(tx).
2021-07-13 11:04:45 +08:00
Result("userId", "SUM(bytes) AS bytes", "SUM(countRequests) AS countRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes").
2021-07-11 18:05:57 +08:00
Between("hour", hourFrom, hourTo).
Where("userId>0").
Group("userId").
Slice(&result).
FindAll()
return
}
2021-07-05 11:37:22 +08:00
// Clean deletes historical rows whose day is earlier than today minus the
// given number of days. The row dated exactly at the cutoff day is kept.
func (this *ServerDailyStatDAO) Clean(tx *dbs.Tx, days int) error {
	cutoff := time.Now().AddDate(0, 0, -days)
	cutoffDay := timeutil.Format("Ymd", cutoff)

	_, err := this.Query(tx).Lt("day", cutoffDay).Delete()
	return err
}