合并部分流量查询和带宽查询

This commit is contained in:
GoEdgeLab
2023-03-22 17:54:44 +08:00
parent daf17358bd
commit c4a8d0794c
14 changed files with 669 additions and 104 deletions

View File

@@ -1,6 +1,7 @@
package models
import (
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
@@ -26,6 +27,9 @@ const (
UserBandwidthStatTablePartials = 20
)
var fullDataMap = map[string]bool{} // month => bool
var fullDataLocker = &sync.Mutex{}
func init() {
dbs.OnReadyDone(func() {
// 清理数据任务
@@ -61,7 +65,7 @@ func init() {
}
// UpdateUserBandwidth 写入数据
func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, regionId int64, day string, timeAt string, bytes int64, totalBytes int64) error {
func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, regionId int64, day string, timeAt string, bytes int64, totalBytes int64, cachedBytes int64, attackBytes int64, countRequests int64, countCachedRequests int64, countAttackRequests int64) error {
if userId <= 0 {
// 如果用户ID不大于0则说明服务不属于任何用户此时不需要处理
return nil
@@ -71,18 +75,33 @@ func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64,
Table(this.partialTable(userId)).
Param("bytes", bytes).
Param("totalBytes", totalBytes).
Param("cachedBytes", cachedBytes).
Param("attackBytes", attackBytes).
Param("countRequests", countRequests).
Param("countCachedRequests", countCachedRequests).
Param("countAttackRequests", countAttackRequests).
InsertOrUpdateQuickly(maps.Map{
"userId": userId,
"regionId": regionId,
"day": day,
"timeAt": timeAt,
"bytes": bytes,
"totalBytes": totalBytes,
"avgBytes": totalBytes / 300,
"userId": userId,
"regionId": regionId,
"day": day,
"timeAt": timeAt,
"bytes": bytes,
"totalBytes": totalBytes,
"avgBytes": totalBytes / 300,
"cachedBytes": cachedBytes,
"attackBytes": attackBytes,
"countRequests": countRequests,
"countCachedRequests": countCachedRequests,
"countAttackRequests": countAttackRequests,
}, maps.Map{
"bytes": dbs.SQL("bytes+:bytes"),
"avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前所以这里不用担心先后顺序的问题
"totalBytes": dbs.SQL("totalBytes+:totalBytes"),
"bytes": dbs.SQL("bytes+:bytes"),
"avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前所以这里不用担心先后顺序的问题
"totalBytes": dbs.SQL("totalBytes+:totalBytes"),
"cachedBytes": dbs.SQL("cachedBytes+:cachedBytes"),
"attackBytes": dbs.SQL("attackBytes+:attackBytes"),
"countRequests": dbs.SQL("countRequests+:countRequests"),
"countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"),
"countAttackRequests": dbs.SQL("countAttackRequests+:countAttackRequests"),
})
}
@@ -320,6 +339,127 @@ func (this *UserBandwidthStatDAO) FindDistinctUserIds(tx *dbs.Tx, dayFrom string
return
}
// SumUserMonthly 获取某月流量总和
// month 格式为YYYYMM
func (this *UserBandwidthStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, month string) (int64, error) {
// 兼容以往版本
hasFullData, err := this.HasFullData(tx, userId, month)
if err != nil {
return 0, err
}
if !hasFullData {
return SharedServerDailyStatDAO.compatSumUserMonthly(tx, userId, month)
}
return this.Query(tx).
Table(this.partialTable(userId)).
Between("day", month+"01", month+"31").
Attr("userId", userId).
SumInt64("totalBytes", 0)
}
// SumUserDaily 获取某天流量总和
// day 格式为YYYYMMDD
func (this *UserBandwidthStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId int64, day string) (stat *UserBandwidthStat, err error) {
if !regexputils.YYYYMMDD.MatchString(day) {
return nil, nil
}
// 兼容以往版本
hasFullData, err := this.HasFullData(tx, userId, day[:6])
if err != nil {
return nil, err
}
if !hasFullData {
serverStat, err := SharedServerDailyStatDAO.compatSumUserDaily(tx, userId, regionId, day)
if err != nil || serverStat == nil {
return nil, err
}
return serverStat.AsUserBandwidthStat(), nil
}
var query = this.Query(tx)
if regionId > 0 {
query.Attr("regionId", regionId)
}
one, err := query.
Table(this.partialTable(userId)).
Attr("day", day).
Attr("userId", userId).
Result("SUM(totalBytes) AS totalBytes", "SUM(cachedBytes) AS cachedBytes", "SUM(attackBytes) AS attackBytes", "SUM(countRequests) AS countRequests", "SUM(countCachedRequests) AS countCachedRequests", "SUM(countAttackRequests) AS countAttackRequests").
Find()
if err != nil || one == nil {
return nil, err
}
return one.(*UserBandwidthStat), nil
}
// SumDailyStat 获取某天内的流量
// dayFrom 格式为YYYYMMDD
// dayTo 格式为YYYYMMDD
func (this *UserBandwidthStatDAO) SumDailyStat(tx *dbs.Tx, userId int64, regionId int64, dayFrom string, dayTo string) (stat *pb.ServerDailyStat, err error) {
if !regexputils.YYYYMMDD.MatchString(dayFrom) {
return nil, errors.New("invalid dayFrom '" + dayFrom + "'")
}
if !regexputils.YYYYMMDD.MatchString(dayTo) {
return nil, errors.New("invalid dayTo '" + dayTo + "'")
}
// 兼容以往版本
hasFullData, err := this.HasFullData(tx, userId, dayFrom[:6])
if err != nil {
return nil, err
}
if !hasFullData {
return SharedServerDailyStatDAO.compatSumDailyStat(tx, userId, 0, regionId, dayFrom, dayTo)
}
stat = &pb.ServerDailyStat{}
if userId <= 0 {
return
}
if dayFrom > dayTo {
dayFrom, dayTo = dayTo, dayFrom
}
var query = this.Query(tx).
Table(this.partialTable(userId)).
Result("SUM(totalBytes) AS totalBytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes")
query.Attr("userId", userId)
if regionId > 0 {
query.Attr("regionId", regionId)
}
if dayFrom == dayTo {
query.Attr("day", dayFrom)
} else {
query.Between("day", dayFrom, dayTo)
}
one, _, err := query.FindOne()
if err != nil {
return nil, err
}
if one == nil {
return
}
stat.Bytes = one.GetInt64("totalBytes")
stat.CachedBytes = one.GetInt64("cachedBytes")
stat.CountRequests = one.GetInt64("countRequests")
stat.CountCachedRequests = one.GetInt64("countCachedRequests")
stat.CountAttackRequests = one.GetInt64("countAttackRequests")
stat.AttackBytes = one.GetInt64("attackBytes")
return
}
// Clean 清理过期数据
func (this *UserBandwidthStatDAO) Clean(tx *dbs.Tx) error {
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -100)) // 保留大约3个月的数据
@@ -375,3 +515,53 @@ func (this *UserBandwidthStatDAO) fixUserStat(stat *UserBandwidthStat, useAvg bo
}
return stat
}
// HasFullData 检查一个月是否完整数据
// 是为了兼容以前数据,以前的表中没有缓存流量、请求数等字段
func (this *UserBandwidthStatDAO) HasFullData(tx *dbs.Tx, userId int64, month string) (bool, error) {
if !regexputils.YYYYMM.MatchString(month) {
return false, errors.New("invalid month '" + month + "'")
}
// 仅供调试
if Tea.IsTesting() {
return true, nil
}
fullDataLocker.Lock()
hasData, ok := fullDataMap[month]
fullDataLocker.Unlock()
if ok {
return hasData, nil
}
var year = types.Int(month[:4])
var monthInt = types.Int(month[4:])
if year < 2000 || monthInt > 12 || monthInt < 1 {
return false, nil
}
var lastMonth = monthInt - 1
if lastMonth == 0 {
lastMonth = 12
year--
}
var lastMonthString = fmt.Sprintf("%d%02d", year, lastMonth)
one, err := this.Query(tx).
Table(this.partialTable(userId)).
Between("day", lastMonthString+"01", lastMonthString+"31").
DescPk().
Find()
if err != nil {
return false, err
}
var b = one != nil && one.(*UserBandwidthStat).CountRequests > 0
fullDataLocker.Lock()
fullDataMap[month] = b
fullDataLocker.Unlock()
return b, nil
}