实现峰值带宽和平均带宽两种带宽算法

This commit is contained in:
刘祥超
2023-02-27 10:47:25 +08:00
parent e89e11c421
commit e13abfc09e
12 changed files with 404 additions and 181 deletions

View File

@@ -61,7 +61,7 @@ func init() {
}
// UpdateUserBandwidth 写入数据
func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, regionId int64, day string, timeAt string, bytes int64) error {
func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, regionId int64, day string, timeAt string, bytes int64, totalBytes int64) error {
if userId <= 0 {
// 如果用户ID不大于0则说明服务不属于任何用户此时不需要处理
return nil
@@ -70,23 +70,28 @@ func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64,
return this.Query(tx).
Table(this.partialTable(userId)).
Param("bytes", bytes).
Param("totalBytes", totalBytes).
InsertOrUpdateQuickly(maps.Map{
"userId": userId,
"regionId": regionId,
"day": day,
"timeAt": timeAt,
"bytes": bytes,
"userId": userId,
"regionId": regionId,
"day": day,
"timeAt": timeAt,
"bytes": bytes,
"totalBytes": totalBytes,
"avgBytes": totalBytes / 300,
}, maps.Map{
"bytes": dbs.SQL("bytes+:bytes"),
"bytes": dbs.SQL("bytes+:bytes"),
"avgBytes": dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前所以这里不用担心先后顺序的问题
"totalBytes": dbs.SQL("totalBytes+:totalBytes"),
})
}
// FindUserPeekBandwidthInMonth 读取某月带宽峰值
// month YYYYMM
func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInMonth(tx *dbs.Tx, userId int64, month string) (*UserBandwidthStat, error) {
func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInMonth(tx *dbs.Tx, userId int64, month string, useAvg bool) (*UserBandwidthStat, error) {
one, err := this.Query(tx).
Table(this.partialTable(userId)).
Result("MIN(id) AS id", "MIN(userId) AS userId", "day", "timeAt", "SUM(bytes) AS bytes").
Result("MIN(id) AS id", "MIN(userId) AS userId", "day", "timeAt", this.sumBytesField(useAvg)).
Attr("userId", userId).
Between("day", month+"01", month+"31").
Desc("bytes").
@@ -101,7 +106,7 @@ func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInMonth(tx *dbs.Tx, userI
// FindPercentileBetweenDays 获取日期段内内百分位
// regionId 如果为 -1 表示没有区域的带宽;如果为 0 表示所有区域的带宽
func (this *UserBandwidthStatDAO) FindPercentileBetweenDays(tx *dbs.Tx, userId int64, regionId int64, dayFrom string, dayTo string, percentile int32) (result *UserBandwidthStat, err error) {
func (this *UserBandwidthStatDAO) FindPercentileBetweenDays(tx *dbs.Tx, userId int64, regionId int64, dayFrom string, dayTo string, percentile int32, useAvg bool) (result *UserBandwidthStat, err error) {
if dayFrom > dayTo {
dayFrom, dayTo = dayTo, dayFrom
}
@@ -120,7 +125,7 @@ func (this *UserBandwidthStatDAO) FindPercentileBetweenDays(tx *dbs.Tx, userId i
query.Attr("regionId", 0)
}
one, err := query.
Result("MIN(id) AS id", "MIN(userId) AS userId", "day", "timeAt", "SUM(bytes) AS bytes").
Result("MIN(id) AS id", "MIN(userId) AS userId", "day", "timeAt", this.sumBytesField(useAvg)).
Attr("userId", userId).
Between("day", dayFrom, dayTo).
Desc("bytes").
@@ -168,7 +173,7 @@ func (this *UserBandwidthStatDAO) FindPercentileBetweenDays(tx *dbs.Tx, userId i
query.Attr("regionId", 0)
}
one, err := query.
Result("MIN(id) AS id", "MIN(userId) AS userId", "day", "timeAt", "SUM(bytes) AS bytes").
Result("MIN(id) AS id", "MIN(userId) AS userId", "day", "timeAt", this.sumBytesField(useAvg)).
Attr("userId", userId).
Between("day", dayFrom, dayTo).
Desc("bytes").
@@ -185,10 +190,10 @@ func (this *UserBandwidthStatDAO) FindPercentileBetweenDays(tx *dbs.Tx, userId i
// FindUserPeekBandwidthInDay 读取某日带宽峰值
// day YYYYMMDD
func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInDay(tx *dbs.Tx, userId int64, day string) (*UserBandwidthStat, error) {
func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInDay(tx *dbs.Tx, userId int64, day string, useAvg bool) (*UserBandwidthStat, error) {
one, err := this.Query(tx).
Table(this.partialTable(userId)).
Result("MIN(id) AS id", "MIN(userId) AS userId", "MIN(day) AS day", "timeAt", "SUM(bytes) AS bytes").
Result("MIN(id) AS id", "MIN(userId) AS userId", "MIN(day) AS day", "timeAt", this.sumBytesField(useAvg)).
Attr("userId", userId).
Attr("day", day).
Desc("bytes").
@@ -203,7 +208,7 @@ func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInDay(tx *dbs.Tx, userId
// FindUserBandwidthStatsBetweenDays 查找日期段内的带宽峰值
// dayFrom YYYYMMDD
// dayTo YYYYMMDD
func (this *UserBandwidthStatDAO) FindUserBandwidthStatsBetweenDays(tx *dbs.Tx, userId int64, regionId int64, dayFrom string, dayTo string) (result []*pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat, err error) {
func (this *UserBandwidthStatDAO) FindUserBandwidthStatsBetweenDays(tx *dbs.Tx, userId int64, regionId int64, dayFrom string, dayTo string, useAvg bool) (result []*pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat, err error) {
if userId <= 0 {
return nil, nil
}
@@ -225,7 +230,7 @@ func (this *UserBandwidthStatDAO) FindUserBandwidthStatsBetweenDays(tx *dbs.Tx,
query.Attr("regionId", regionId)
}
ones, _, err := query.
Result("SUM(bytes) AS bytes", "day", "timeAt").
Result(this.sumBytesField(useAvg), "day", "timeAt").
Attr("userId", userId).
Between("day", dayFrom, dayTo).
Group("day").
@@ -352,3 +357,21 @@ func (this *UserBandwidthStatDAO) runBatch(f func(table string, locker *sync.Mut
func (this *UserBandwidthStatDAO) partialTable(userId int64) string {
return this.Table + "_" + types.String(userId%int64(UserBandwidthStatTablePartials))
}
// 获取总数字段
func (this *UserBandwidthStatDAO) sumBytesField(useAvg bool) string {
if useAvg {
return "SUM(avgBytes) AS bytes"
}
return "SUM(bytes) AS bytes"
}
func (this *UserBandwidthStatDAO) fixUserStat(stat *UserBandwidthStat, useAvg bool) *UserBandwidthStat {
if stat == nil {
return nil
}
if useAvg {
stat.Bytes = stat.AvgBytes
}
return stat
}