mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-03 15:00:27 +08:00
244 lines
7.2 KiB
Go
244 lines
7.2 KiB
Go
package models
|
||
|
||
import (
|
||
"errors"
|
||
"math"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/utils/regexputils"
|
||
_ "github.com/go-sql-driver/mysql"
|
||
"github.com/iwind/TeaGo/Tea"
|
||
"github.com/iwind/TeaGo/dbs"
|
||
"github.com/iwind/TeaGo/maps"
|
||
"github.com/iwind/TeaGo/rands"
|
||
"github.com/iwind/TeaGo/types"
|
||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||
)
|
||
|
||
type UserPlanBandwidthStatDAO dbs.DAO
|
||
|
||
const (
|
||
UserPlanBandwidthStatTablePartitions = 20 // 分表数量
|
||
)
|
||
|
||
func init() {
|
||
dbs.OnReadyDone(func() {
|
||
// 清理数据任务
|
||
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
||
goman.New(func() {
|
||
for range ticker.C {
|
||
err := SharedUserPlanBandwidthStatDAO.CleanDefaultDays(nil, 100)
|
||
if err != nil {
|
||
remotelogs.Error("SharedUserPlanBandwidthStatDAO", "clean expired data failed: "+err.Error())
|
||
}
|
||
}
|
||
})
|
||
})
|
||
}
|
||
|
||
func NewUserPlanBandwidthStatDAO() *UserPlanBandwidthStatDAO {
|
||
return dbs.NewDAO(&UserPlanBandwidthStatDAO{
|
||
DAOObject: dbs.DAOObject{
|
||
DB: Tea.Env,
|
||
Table: "edgeUserPlanBandwidthStats",
|
||
Model: new(UserPlanBandwidthStat),
|
||
PkName: "id",
|
||
},
|
||
}).(*UserPlanBandwidthStatDAO)
|
||
}
|
||
|
||
var SharedUserPlanBandwidthStatDAO *UserPlanBandwidthStatDAO
|
||
|
||
func init() {
|
||
dbs.OnReady(func() {
|
||
SharedUserPlanBandwidthStatDAO = NewUserPlanBandwidthStatDAO()
|
||
})
|
||
}
|
||
|
||
// UpdateUserPlanBandwidth 写入数据
|
||
// 暂时不使用region区分
|
||
func (this *UserPlanBandwidthStatDAO) UpdateUserPlanBandwidth(tx *dbs.Tx, userId int64, userPlanId int64, regionId int64, day string, timeAt string, bandwidthBytes int64, totalBytes int64, cachedBytes int64, attackBytes int64, countRequests int64, countCachedRequests int64, countAttackRequests int64, countWebsocketConnections int64) error {
|
||
if userId <= 0 || userPlanId <= 0 {
|
||
return nil
|
||
}
|
||
|
||
return this.Query(tx).
|
||
Table(this.partialTable(userPlanId)).
|
||
Param("bytes", bandwidthBytes).
|
||
Param("totalBytes", totalBytes).
|
||
Param("cachedBytes", cachedBytes).
|
||
Param("attackBytes", attackBytes).
|
||
Param("countRequests", countRequests).
|
||
Param("countCachedRequests", countCachedRequests).
|
||
Param("countAttackRequests", countAttackRequests).
|
||
Param("countWebsocketConnections", countWebsocketConnections).
|
||
InsertOrUpdateQuickly(maps.Map{
|
||
"userId": userId,
|
||
"userPlanId": userPlanId,
|
||
"regionId": regionId,
|
||
"day": day,
|
||
"timeAt": timeAt,
|
||
"bytes": bandwidthBytes,
|
||
"totalBytes": totalBytes,
|
||
"avgBytes": totalBytes / 300,
|
||
"cachedBytes": cachedBytes,
|
||
"attackBytes": attackBytes,
|
||
"countRequests": countRequests,
|
||
"countCachedRequests": countCachedRequests,
|
||
"countAttackRequests": countAttackRequests,
|
||
"countWebsocketConnections": countWebsocketConnections,
|
||
}, maps.Map{
|
||
"bytes": dbs.SQL("bytes+:bytes"),
|
||
"avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前,所以这里不用担心先后顺序的问题
|
||
"totalBytes": dbs.SQL("totalBytes+:totalBytes"),
|
||
"cachedBytes": dbs.SQL("cachedBytes+:cachedBytes"),
|
||
"attackBytes": dbs.SQL("attackBytes+:attackBytes"),
|
||
"countRequests": dbs.SQL("countRequests+:countRequests"),
|
||
"countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"),
|
||
"countAttackRequests": dbs.SQL("countAttackRequests+:countAttackRequests"),
|
||
"countWebsocketConnections": dbs.SQL("countWebsocketConnections+:countWebsocketConnections"),
|
||
})
|
||
}
|
||
|
||
// FindMonthlyPercentile 获取某月内百分位
|
||
func (this *UserPlanBandwidthStatDAO) FindMonthlyPercentile(tx *dbs.Tx, userPlanId int64, month string, percentile int, useAvg bool) (result int64, err error) {
|
||
if percentile <= 0 {
|
||
percentile = 95
|
||
}
|
||
|
||
// 如果是100%以上,则快速返回
|
||
if percentile >= 100 {
|
||
result, err = this.Query(tx).
|
||
Table(this.partialTable(userPlanId)).
|
||
Attr("userPlanId", userPlanId).
|
||
Result(this.sumBytesField(useAvg)).
|
||
Between("day", month+"01", month+"31").
|
||
Group("day").
|
||
Group("timeAt").
|
||
Desc("bytes").
|
||
Limit(1).
|
||
FindInt64Col(0)
|
||
return
|
||
}
|
||
|
||
// 总数量
|
||
total, err := this.Query(tx).
|
||
Table(this.partialTable(userPlanId)).
|
||
Attr("userPlanId", userPlanId).
|
||
Between("day", month+"01", month+"31").
|
||
CountAttr("DISTINCT day, timeAt")
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
if total == 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
var offset int64
|
||
|
||
if total > 1 {
|
||
offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
|
||
}
|
||
|
||
// 查询 nth 位置
|
||
result, err = this.Query(tx).
|
||
Table(this.partialTable(userPlanId)).
|
||
Attr("userPlanId", userPlanId).
|
||
Result(this.sumBytesField(useAvg)).
|
||
Between("day", month+"01", month+"31").
|
||
Group("day").
|
||
Group("timeAt").
|
||
Desc("bytes").
|
||
Offset(offset).
|
||
Limit(1).
|
||
FindInt64Col(0)
|
||
|
||
return
|
||
}
|
||
|
||
// SumMonthlyBytes 读取单月总流量
|
||
func (this *UserPlanBandwidthStatDAO) SumMonthlyBytes(tx *dbs.Tx, userPlanId int64, month string) (int64, error) {
|
||
if !regexputils.YYYYMM.MatchString(month) {
|
||
return 0, errors.New("invalid ")
|
||
}
|
||
|
||
return this.Query(tx).
|
||
Table(this.partialTable(userPlanId)).
|
||
Attr("userPlanId", userPlanId).
|
||
Between("day", month+"01", month+"31").
|
||
SumInt64("totalBytes", 0)
|
||
}
|
||
|
||
// CleanDefaultDays 清理过期数据
|
||
func (this *UserPlanBandwidthStatDAO) CleanDefaultDays(tx *dbs.Tx, defaultDays int) error {
|
||
databaseConfig, err := SharedSysSettingDAO.ReadDatabaseConfig(tx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if databaseConfig != nil && databaseConfig.UserPlanBandwidthStat.Clean.Days > 0 {
|
||
defaultDays = databaseConfig.UserPlanBandwidthStat.Clean.Days
|
||
}
|
||
if defaultDays <= 0 {
|
||
defaultDays = 100
|
||
}
|
||
|
||
return this.CleanDays(tx, defaultDays)
|
||
}
|
||
|
||
// CleanDays 清理过期数据
|
||
func (this *UserPlanBandwidthStatDAO) CleanDays(tx *dbs.Tx, days int) error {
|
||
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days)) // 保留大约3个月的数据
|
||
return this.runBatch(func(table string, locker *sync.Mutex) error {
|
||
_, err := this.Query(tx).
|
||
Table(table).
|
||
Lt("day", day).
|
||
Delete()
|
||
return err
|
||
})
|
||
}
|
||
|
||
// 获取字节字段
|
||
func (this *UserPlanBandwidthStatDAO) bytesField(useAvg bool) string {
|
||
if useAvg {
|
||
return "avgBytes AS bytes"
|
||
}
|
||
return "bytes"
|
||
}
|
||
|
||
func (this *UserPlanBandwidthStatDAO) sumBytesField(useAvg bool) string {
|
||
if useAvg {
|
||
return "SUM(avgBytes) AS bytes"
|
||
}
|
||
return "SUM(bytes) AS bytes"
|
||
}
|
||
|
||
// 批量执行
|
||
func (this *UserPlanBandwidthStatDAO) runBatch(f func(table string, locker *sync.Mutex) error) error {
|
||
var locker = &sync.Mutex{}
|
||
var wg = sync.WaitGroup{}
|
||
wg.Add(UserPlanBandwidthStatTablePartitions)
|
||
var resultErr error
|
||
for i := 0; i < UserPlanBandwidthStatTablePartitions; i++ {
|
||
var table = this.partialTable(int64(i))
|
||
go func(table string) {
|
||
defer wg.Done()
|
||
|
||
err := f(table, locker)
|
||
if err != nil {
|
||
resultErr = err
|
||
}
|
||
}(table)
|
||
}
|
||
wg.Wait()
|
||
return resultErr
|
||
}
|
||
|
||
// 获取分区表
|
||
func (this *UserPlanBandwidthStatDAO) partialTable(userPlanId int64) string {
|
||
return this.Table + "_" + types.String(userPlanId%int64(UserPlanBandwidthStatTablePartitions))
|
||
}
|