Files
EdgeAPI/internal/db/models/server_bandwidth_stat_dao.go

622 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/regexputils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"math"
"regexp"
"strings"
"sync"
"time"
)
// ServerBandwidthStatDAO is the DAO type for server bandwidth statistics.
// Its methods address per-server partial tables (see partialTable).
type ServerBandwidthStatDAO dbs.DAO
const (
ServerBandwidthStatTablePartials = 20 // number of partial tables the data is split across
)
// init registers a dbs.OnReadyDone callback that starts the periodic cleanup
// of expired bandwidth stats.
func init() {
	dbs.OnReadyDone(func() {
		// Cleanup task: the interval is chosen once, via rands.Int(24, 48) hours.
		var interval = time.Duration(rands.Int(24, 48)) * time.Hour
		var ticker = time.NewTicker(interval)

		goman.New(func() {
			for range ticker.C {
				if err := SharedServerBandwidthStatDAO.Clean(nil); err != nil {
					remotelogs.Error("SharedServerBandwidthStatDAO", "clean expired data failed: "+err.Error())
				}
			}
		})
	})
}
// NewServerBandwidthStatDAO builds a DAO whose base table is
// "edgeServerBandwidthStats", whose model is ServerBandwidthStat and whose
// primary key is "id", and registers it through dbs.NewDAO.
func NewServerBandwidthStatDAO() *ServerBandwidthStatDAO {
	var dao = &ServerBandwidthStatDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeServerBandwidthStats",
			Model:  new(ServerBandwidthStat),
			PkName: "id",
		},
	}
	return dbs.NewDAO(dao).(*ServerBandwidthStatDAO)
}
// SharedServerBandwidthStatDAO is the shared package-level DAO instance.
// It is nil until the dbs.OnReady callback registered below has run.
var SharedServerBandwidthStatDAO *ServerBandwidthStatDAO
// init registers a dbs.OnReady callback that creates the shared DAO instance.
func init() {
dbs.OnReady(func() {
SharedServerBandwidthStatDAO = NewServerBandwidthStatDAO()
})
}
// UpdateServerBandwidth writes a bandwidth sample for a server.
// Region is not used to distinguish rows for now.
//
// The first map passed to InsertOrUpdateQuickly holds the values for a new
// row; the second holds the SQL expressions that accumulate bytes/totalBytes
// and recompute avgBytes (presumably applied when the row already exists).
// An error is returned when serverId is not positive.
func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, userId int64, serverId int64, day string, timeAt string, bytes int64, totalBytes int64) error {
	if serverId <= 0 {
		return errors.New("invalid server id '" + types.String(serverId) + "'")
	}

	// values for a new row
	var insertingValues = maps.Map{
		"userId":     userId,
		"serverId":   serverId,
		"day":        day,
		"timeAt":     timeAt,
		"bytes":      bytes,
		"totalBytes": totalBytes,
		"avgBytes":   totalBytes / 300,
	}

	// accumulating expressions
	var updatingValues = maps.Map{
		"bytes": dbs.SQL("bytes+:bytes"),
		// When the SQL statement is generated, avgBytes is automatically
		// placed before totalBytes, so the evaluation order is not a concern.
		"avgBytes":   dbs.SQL("(totalBytes+:totalBytes)/300"),
		"totalBytes": dbs.SQL("totalBytes+:totalBytes"),
	}

	return this.Query(tx).
		Table(this.partialTable(serverId)).
		Param("bytes", bytes).
		Param("totalBytes", totalBytes).
		InsertOrUpdateQuickly(insertingValues, updatingValues)
}
// FindMinutelyPeekBandwidthBytes returns the bandwidth bytes stored for one minute.
//
//   - day:    YYYYMMDD
//   - minute: HHII
//   - useAvg: read the avgBytes column instead of bytes
func (this *ServerBandwidthStatDAO) FindMinutelyPeekBandwidthBytes(tx *dbs.Tx, serverId int64, day string, minute string, useAvg bool) (int64, error) {
	var table = this.partialTable(serverId)
	var field = this.bytesField(useAvg)

	return this.Query(tx).
		Table(table).
		Attr("serverId", serverId).
		Result(field).
		Attr("day", day).
		Attr("timeAt", minute).
		FindInt64Col(0)
}
// FindHourlyBandwidthStats returns the peak bandwidth per hour for the last
// `hours` hours (24 when hours <= 0).
//
// Hours without any stored row are still present in the result, with zero
// Bytes/Bits. When useAvg is true the maximum of avgBytes is used instead of
// the maximum of bytes.
func (this *ServerBandwidthStatDAO) FindHourlyBandwidthStats(tx *dbs.Tx, serverId int64, hours int32, useAvg bool) (result []*pb.FindHourlyServerBandwidthStatsResponse_Stat, err error) {
	if hours <= 0 {
		hours = 24
	}

	var fromTimestamp = time.Now().Unix() - int64(hours)*3600

	rows, _, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Result(this.maxBytesField(useAvg), "CONCAT(day, '.', SUBSTRING(timeAt, 1, 2)) AS fullTime").
		Gte("CONCAT(day, '.', SUBSTRING(timeAt, 1, 2))", timeutil.FormatTime("Ymd.H", fromTimestamp)).
		Group("fullTime").
		FindOnes()
	if err != nil {
		return nil, err
	}

	// index the found rows by "YYYYMMDDHH"
	var statMap = map[string]*pb.FindHourlyServerBandwidthStatsResponse_Stat{}
	for _, row := range rows {
		// fullTime has the form "YYYYMMDD.HH"
		var pieces = strings.Split(row.GetString("fullTime"), ".")
		var day, hour = pieces[0], pieces[1]
		var peekBytes = row.GetInt64("bytes")

		statMap[day+hour] = &pb.FindHourlyServerBandwidthStatsResponse_Stat{
			Bytes: peekBytes,
			Bits:  peekBytes * 8,
			Day:   day,
			Hour:  types.Int32(hour),
		}
	}

	fullHours, err := utils.RangeHours(timeutil.FormatTime("YmdH", fromTimestamp), timeutil.Format("YmdH"))
	if err != nil {
		return nil, err
	}

	// walk every hour of the range so that missing hours are zero-filled
	for _, fullHour := range fullHours {
		if stat, found := statMap[fullHour]; found {
			result = append(result, stat)
			continue
		}

		result = append(result, &pb.FindHourlyServerBandwidthStatsResponse_Stat{
			Bytes: 0,
			Bits:  0,
			Day:   fullHour[:8],
			Hour:  types.Int32(fullHour[8:]),
		})
	}

	return result, nil
}
// FindDailyPeekBandwidthBytes returns the peak bandwidth bytes of one day.
//
//   - day:    YYYYMMDD
//   - useAvg: take MAX(avgBytes) instead of MAX(bytes)
func (this *ServerBandwidthStatDAO) FindDailyPeekBandwidthBytes(tx *dbs.Tx, serverId int64, day string, useAvg bool) (int64, error) {
	var table = this.partialTable(serverId)
	var maxField = this.maxBytesField(useAvg)

	return this.Query(tx).
		Table(table).
		Attr("serverId", serverId).
		Attr("day", day).
		Result(maxField).
		FindInt64Col(0)
}
// FindDailyBandwidthStats returns the peak bandwidth per day for the last
// `days` days (14 when days <= 0).
//
// Days without any stored row are still present in the result, with zero
// Bytes/Bits. When useAvg is true the maximum of avgBytes is used instead of
// the maximum of bytes.
func (this *ServerBandwidthStatDAO) FindDailyBandwidthStats(tx *dbs.Tx, serverId int64, days int32, useAvg bool) (result []*pb.FindDailyServerBandwidthStatsResponse_Stat, err error) {
	if days <= 0 {
		days = 14
	}

	var fromTimestamp = time.Now().Unix() - int64(days)*86400

	rows, _, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Result(this.maxBytesField(useAvg), "day").
		Attr("serverId", serverId).
		Gte("day", timeutil.FormatTime("Ymd", fromTimestamp)).
		Group("day").
		FindOnes()
	if err != nil {
		return nil, err
	}

	// index the found rows by day
	var statMap = map[string]*pb.FindDailyServerBandwidthStatsResponse_Stat{}
	for _, row := range rows {
		var day = row.GetString("day")
		var peekBytes = row.GetInt64("bytes")

		statMap[day] = &pb.FindDailyServerBandwidthStatsResponse_Stat{
			Bytes: peekBytes,
			Bits:  peekBytes * 8,
			Day:   day,
		}
	}

	allDays, err := utils.RangeDays(timeutil.FormatTime("Ymd", fromTimestamp), timeutil.Format("Ymd"))
	if err != nil {
		return nil, err
	}

	// walk every day of the range so that missing days are zero-filled
	for _, day := range allDays {
		if stat, found := statMap[day]; found {
			result = append(result, stat)
			continue
		}

		result = append(result, &pb.FindDailyServerBandwidthStatsResponse_Stat{
			Bytes: 0,
			Bits:  0,
			Day:   day,
		})
	}

	return result, nil
}
// FindBandwidthStatsBetweenDays returns the bandwidth values of every time
// slot between two days (inclusive).
//
//   - dayFrom: YYYYMMDD
//   - dayTo:   YYYYMMDD (swapped with dayFrom when given in reverse order)
//   - useAvg:  read avgBytes instead of bytes
//
// It returns (nil, nil) when serverId is not positive and an error when either
// day does not match regexputils.YYYYMMDD. Slots come from
// utils.Range24HourTimes(5); slots with no stored row are returned with zero
// values, and slots at or after the current time are left out.
func (this *ServerBandwidthStatDAO) FindBandwidthStatsBetweenDays(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string, useAvg bool) (result []*pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat, err error) {
	if serverId <= 0 {
		return nil, nil
	}

	if !regexputils.YYYYMMDD.MatchString(dayFrom) {
		return nil, errors.New("invalid dayFrom '" + dayFrom + "'")
	}
	if !regexputils.YYYYMMDD.MatchString(dayTo) {
		return nil, errors.New("invalid dayTo '" + dayTo + "'")
	}

	if dayFrom > dayTo {
		dayFrom, dayTo = dayTo, dayFrom
	}

	rows, _, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Result(this.bytesField(useAvg), "day", "timeAt").
		Attr("serverId", serverId).
		Between("day", dayFrom, dayTo).
		FindOnes()
	if err != nil {
		return nil, err
	}

	// index the found rows by "day@timeAt"
	var statMap = map[string]*pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat{}
	for _, row := range rows {
		var day = row.GetString("day")
		var slotBytes = row.GetInt64("bytes")
		var timeAt = row.GetString("timeAt")

		statMap[day+"@"+timeAt] = &pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat{
			Bytes:  slotBytes,
			Bits:   slotBytes * 8,
			Day:    day,
			TimeAt: timeAt,
		}
	}

	allDays, err := utils.RangeDays(dayFrom, dayTo)
	if err != nil {
		return nil, err
	}

	dayTimes, err := utils.Range24HourTimes(5)
	if err != nil {
		return nil, err
	}

	// stop at the current time
	var now = timeutil.Format("Ymd@Hi")

	for _, day := range allDays {
		for _, timeAt := range dayTimes {
			var key = day + "@" + timeAt
			if key >= now {
				// leaves the rest of this day; later days stop on their first slot
				break
			}

			if stat, found := statMap[key]; found {
				result = append(result, stat)
				continue
			}

			result = append(result, &pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat{
				Day:    day,
				TimeAt: timeAt,
			})
		}
	}

	return result, nil
}
// FindMonthlyPeekBandwidthBytes returns the peak bandwidth bytes of one month.
//
//   - month:  YYYYMM; rows whose day lies between month+"01" and month+"31" are considered
//   - useAvg: take MAX(avgBytes) instead of MAX(bytes)
func (this *ServerBandwidthStatDAO) FindMonthlyPeekBandwidthBytes(tx *dbs.Tx, serverId int64, month string, useAvg bool) (int64, error) {
	var firstDay = month + "01"
	var lastDay = month + "31"

	return this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Between("day", firstDay, lastDay).
		Result(this.maxBytesField(useAvg)).
		FindInt64Col(0)
}
// FindServerStats finds the bandwidth stats of a server within a time range of one day.
//
// Parameters:
//   - day:      YYYYMMDD
//   - timeFrom: HHII
//   - timeTo:   HHII
//   - useAvg:   when true, each returned stat has Bytes replaced by AvgBytes
func (this *ServerBandwidthStatDAO) FindServerStats(tx *dbs.Tx, serverId int64, day string, timeFrom string, timeTo string, useAvg bool) (result []*ServerBandwidthStat, err error) {
	var query = this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Attr("day", day).
		Between("timeAt", timeFrom, timeTo).
		Slice(&result)
	_, err = query.FindAll()

	// switch to average bandwidth when requested
	this.fixServerStats(result, useAvg)

	return result, err
}
// FindAllServerStatsWithDay finds all bandwidth stats of a server for one day,
// ordered by primary key ascending.
//
//   - day:    YYYYMMDD
//   - useAvg: when true, each returned stat has Bytes replaced by AvgBytes
func (this *ServerBandwidthStatDAO) FindAllServerStatsWithDay(tx *dbs.Tx, serverId int64, day string, useAvg bool) (result []*ServerBandwidthStat, err error) {
	var query = this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Attr("day", day).
		AscPk().
		Slice(&result)
	_, err = query.FindAll()

	// switch to average bandwidth when requested
	this.fixServerStats(result, useAvg)

	return result, err
}
// FindAllServerStatsWithMonth finds all bandwidth stats of a server for one
// month, ordered by primary key ascending.
//
//   - month:  YYYYMM; rows whose day lies between month+"01" and month+"31" are returned
//   - useAvg: when true, each returned stat has Bytes replaced by AvgBytes
func (this *ServerBandwidthStatDAO) FindAllServerStatsWithMonth(tx *dbs.Tx, serverId int64, month string, useAvg bool) (result []*ServerBandwidthStat, err error) {
	var firstDay = month + "01"
	var lastDay = month + "31"

	var query = this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Between("day", firstDay, lastDay).
		AscPk().
		Slice(&result)
	_, err = query.FindAll()

	// switch to average bandwidth when requested
	this.fixServerStats(result, useAvg)

	return result, err
}
// FindMonthlyPercentile returns the bandwidth value at the given percentile
// within one month.
//
//   - month:      YYYYMM; rows whose day lies between month+"01" and month+"31" are considered
//   - percentile: 1-100; values <= 0 fall back to 95, values >= 100 return the largest value
//   - useAvg:     use avgBytes instead of bytes, both for the value and for the ordering
//
// Rows are ordered descending and the row at offset
// ceil(total * (100-percentile) / 100) is returned. The offset is clamped to
// the last row so that a low percentile on a small data set yields the
// smallest value instead of no row at all. Returns 0 when the month has no
// rows.
func (this *ServerBandwidthStatDAO) FindMonthlyPercentile(tx *dbs.Tx, serverId int64, month string, percentile int, useAvg bool) (result int64, err error) {
	if percentile <= 0 {
		percentile = 95
	}

	var dayFrom = month + "01"
	var dayTo = month + "31"

	// 100% or above: the largest value is the answer, no counting needed
	if percentile >= 100 {
		return this.Query(tx).
			Table(this.partialTable(serverId)).
			Attr("serverId", serverId).
			Result(this.bytesField(useAvg)).
			Between("day", dayFrom, dayTo).
			Desc(this.bytesOrderField(useAvg)).
			Limit(1).
			FindInt64Col(0)
	}

	// total number of rows
	total, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Between("day", dayFrom, dayTo).
		Count()
	if err != nil {
		return 0, err
	}
	if total == 0 {
		return 0, nil
	}

	// Position of the requested percentile in the descending order, kept
	// inside [0, total-1] so that the query below always hits a row.
	var offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
	if maxOffset := int64(total) - 1; offset > maxOffset {
		offset = maxOffset
	}

	// fetch the value at the nth position
	return this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Result(this.bytesField(useAvg)).
		Between("day", dayFrom, dayTo).
		Desc(this.bytesOrderField(useAvg)).
		Offset(offset).
		Limit(1).
		FindInt64Col(0)
}
// FindPercentileBetweenDays returns the stat at the given percentile within a
// range of days (inclusive).
//
//   - dayFrom, dayTo: compared as strings and swapped when given in reverse order
//   - percentile:     1-100; values <= 0 fall back to 95, values >= 100 return the largest stat
//   - useAvg:         order by avgBytes and copy AvgBytes into Bytes of the returned stat
//
// Rows are ordered descending and the row at offset
// ceil(total * (100-percentile) / 100) is returned. The offset is clamped to
// the last row so that a low percentile on a small data set yields the
// smallest stat instead of nil. Returns (nil, nil) when the range has no rows.
func (this *ServerBandwidthStatDAO) FindPercentileBetweenDays(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string, percentile int32, useAvg bool) (result *ServerBandwidthStat, err error) {
	if dayFrom > dayTo {
		dayFrom, dayTo = dayTo, dayFrom
	}

	if percentile <= 0 {
		percentile = 95
	}

	// 100% or above: the largest stat is the answer, no counting needed
	if percentile >= 100 {
		one, err := this.Query(tx).
			Table(this.partialTable(serverId)).
			Attr("serverId", serverId).
			Between("day", dayFrom, dayTo).
			Desc(this.bytesOrderField(useAvg)).
			Find()
		if err != nil || one == nil {
			return nil, err
		}
		return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
	}

	// total number of rows
	total, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Between("day", dayFrom, dayTo).
		Count()
	if err != nil {
		return nil, err
	}
	if total == 0 {
		return nil, nil
	}

	// Position of the requested percentile in the descending order, kept
	// inside [0, total-1] so that the query below always hits a row.
	var offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
	if maxOffset := int64(total) - 1; offset > maxOffset {
		offset = maxOffset
	}

	// fetch the stat at the nth position
	one, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Between("day", dayFrom, dayTo).
		Desc(this.bytesOrderField(useAvg)).
		Offset(offset).
		Find()
	if err != nil || one == nil {
		return nil, err
	}

	return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
}
// serverBandwidthStatFullTimeReg matches a minute timestamp made of exactly
// 12 digits (YYYYMMDDHHII). It is compiled once at package level instead of on
// every FindPercentileBetweenTimes call.
var serverBandwidthStatFullTimeReg = regexp.MustCompile(`^\d{12}$`)

// FindPercentileBetweenTimes returns the stat at the given percentile within a
// time range (inclusive).
//
//   - timeFrom:   start time, format YYYYMMDDHHII
//   - timeTo:     end time, format YYYYMMDDHHII (swapped with timeFrom when given in reverse order)
//   - percentile: 1-100; values <= 0 fall back to 95, values >= 100 return the largest stat
//   - useAvg:     order by avgBytes and copy AvgBytes into Bytes of the returned stat
//
// An error is returned when timeFrom or timeTo is not a 12-digit string.
// Rows are ordered descending and the row at offset
// ceil(total * (100-percentile) / 100) is returned. The offset is clamped to
// the last row so that a low percentile on a small data set yields the
// smallest stat instead of nil. Returns (nil, nil) when the range has no rows.
func (this *ServerBandwidthStatDAO) FindPercentileBetweenTimes(tx *dbs.Tx, serverId int64, timeFrom string, timeTo string, percentile int32, useAvg bool) (result *ServerBandwidthStat, err error) {
	if !serverBandwidthStatFullTimeReg.MatchString(timeFrom) {
		return nil, errors.New("invalid timeFrom '" + timeFrom + "'")
	}
	if !serverBandwidthStatFullTimeReg.MatchString(timeTo) {
		return nil, errors.New("invalid timeTo '" + timeTo + "'")
	}

	if timeFrom > timeTo {
		timeFrom, timeTo = timeTo, timeFrom
	}

	if percentile <= 0 {
		percentile = 95
	}

	// 100% or above: the largest stat is the answer, no counting needed
	if percentile >= 100 {
		one, err := this.Query(tx).
			Table(this.partialTable(serverId)).
			Attr("serverId", serverId).
			Between("CONCAT(day, timeAt)", timeFrom, timeTo).
			Desc(this.bytesOrderField(useAvg)).
			Find()
		if err != nil || one == nil {
			return nil, err
		}
		return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
	}

	// total number of rows
	total, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Between("CONCAT(day, timeAt)", timeFrom, timeTo).
		Count()
	if err != nil {
		return nil, err
	}
	if total == 0 {
		return nil, nil
	}

	// Position of the requested percentile in the descending order, kept
	// inside [0, total-1] so that the query below always hits a row.
	var offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
	if maxOffset := int64(total) - 1; offset > maxOffset {
		offset = maxOffset
	}

	// fetch the stat at the nth position
	one, err := this.Query(tx).
		Table(this.partialTable(serverId)).
		Attr("serverId", serverId).
		Between("CONCAT(day, timeAt)", timeFrom, timeTo).
		Desc(this.bytesOrderField(useAvg)).
		Offset(offset).
		Find()
	if err != nil || one == nil {
		return nil, err
	}

	return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
}
// Clean removes expired data: rows whose day is earlier than 100 days ago are
// deleted from every partial table.
func (this *ServerBandwidthStatDAO) Clean(tx *dbs.Tx) error {
	// keep roughly 3 months of data
	var expiresAt = time.Now().AddDate(0, 0, -100)
	var minDay = timeutil.Format("Ymd", expiresAt)

	return this.runBatch(func(table string, _ *sync.Mutex) error {
		_, err := this.Query(tx).
			Table(table).
			Lt("day", minDay).
			Delete()
		return err
	})
}
// runBatch runs f once per partial table, each call in its own goroutine, and
// waits for all of them to finish.
//
// f receives the partial table name and a mutex shared by all invocations,
// which f may use to protect state it shares across tables. The first error
// recorded by any invocation is returned; nil when every invocation succeeds.
func (this *ServerBandwidthStatDAO) runBatch(f func(table string, locker *sync.Mutex) error) error {
	var locker = &sync.Mutex{}

	// resultErr is written from several goroutines, so the write must be
	// synchronized. It gets its own mutex rather than reusing locker, which
	// belongs to f.
	var errLocker sync.Mutex
	var resultErr error

	var wg = sync.WaitGroup{}
	wg.Add(ServerBandwidthStatTablePartials)

	for i := 0; i < ServerBandwidthStatTablePartials; i++ {
		var table = this.partialTable(int64(i))
		go func(table string) {
			defer wg.Done()

			err := f(table, locker)
			if err == nil {
				return
			}

			errLocker.Lock()
			if resultErr == nil {
				resultErr = err
			}
			errLocker.Unlock()
		}(table)
	}

	wg.Wait()

	return resultErr
}
// partialTable returns the name of the partial table for a server: the base
// table name, an underscore, and serverId modulo ServerBandwidthStatTablePartials.
func (this *ServerBandwidthStatDAO) partialTable(serverId int64) string {
	var index = serverId % int64(ServerBandwidthStatTablePartials)
	return this.Table + "_" + types.String(index)
}
// bytesField returns the result expression for the bytes value: the bytes
// column itself, or avgBytes aliased as bytes when useAvg is true.
func (this *ServerBandwidthStatDAO) bytesField(useAvg bool) string {
	var field = "bytes"
	if useAvg {
		field = "avgBytes AS bytes"
	}
	return field
}
// maxBytesField returns the result expression for the maximum bytes value,
// aliased as bytes: MAX(bytes), or MAX(avgBytes) when useAvg is true.
func (this *ServerBandwidthStatDAO) maxBytesField(useAvg bool) string {
	var field = "MAX(bytes) AS bytes"
	if useAvg {
		field = "MAX(avgBytes) AS bytes"
	}
	return field
}
// bytesOrderField returns the column to order by: bytes, or avgBytes when
// useAvg is true.
func (this *ServerBandwidthStatDAO) bytesOrderField(useAvg bool) string {
	var field = "bytes"
	if useAvg {
		field = "avgBytes"
	}
	return field
}
// fixServerStat copies AvgBytes into Bytes when useAvg is true and returns the
// same stat. A nil stat is returned unchanged.
func (this *ServerBandwidthStatDAO) fixServerStat(stat *ServerBandwidthStat, useAvg bool) *ServerBandwidthStat {
	if stat != nil && useAvg {
		stat.Bytes = stat.AvgBytes
	}
	return stat
}
// fixServerStats copies AvgBytes into Bytes for every stat in the slice when
// useAvg is true; otherwise the stats are left untouched.
func (this *ServerBandwidthStatDAO) fixServerStats(stats []*ServerBandwidthStat, useAvg bool) {
	if !useAvg {
		return
	}
	for i := range stats {
		stats[i].Bytes = stats[i].AvgBytes
	}
}