mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 16:00:24 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			622 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			622 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package models
 | 
						||
 | 
						||
import (
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/goman"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/utils"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/utils/regexputils"
 | 
						||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
						||
	_ "github.com/go-sql-driver/mysql"
 | 
						||
	"github.com/iwind/TeaGo/Tea"
 | 
						||
	"github.com/iwind/TeaGo/dbs"
 | 
						||
	"github.com/iwind/TeaGo/maps"
 | 
						||
	"github.com/iwind/TeaGo/rands"
 | 
						||
	"github.com/iwind/TeaGo/types"
 | 
						||
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
						||
	"math"
 | 
						||
	"regexp"
 | 
						||
	"strings"
 | 
						||
	"sync"
 | 
						||
	"time"
 | 
						||
)
 | 
						||
 | 
						||
type ServerBandwidthStatDAO dbs.DAO
 | 
						||
 | 
						||
const (
 | 
						||
	ServerBandwidthStatTablePartials = 20 // 分表数量
 | 
						||
)
 | 
						||
 | 
						||
func init() {
 | 
						||
	dbs.OnReadyDone(func() {
 | 
						||
		// 清理数据任务
 | 
						||
		var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
 | 
						||
		goman.New(func() {
 | 
						||
			for range ticker.C {
 | 
						||
				err := SharedServerBandwidthStatDAO.Clean(nil)
 | 
						||
				if err != nil {
 | 
						||
					remotelogs.Error("SharedServerBandwidthStatDAO", "clean expired data failed: "+err.Error())
 | 
						||
				}
 | 
						||
			}
 | 
						||
		})
 | 
						||
	})
 | 
						||
}
 | 
						||
 | 
						||
func NewServerBandwidthStatDAO() *ServerBandwidthStatDAO {
 | 
						||
	return dbs.NewDAO(&ServerBandwidthStatDAO{
 | 
						||
		DAOObject: dbs.DAOObject{
 | 
						||
			DB:     Tea.Env,
 | 
						||
			Table:  "edgeServerBandwidthStats",
 | 
						||
			Model:  new(ServerBandwidthStat),
 | 
						||
			PkName: "id",
 | 
						||
		},
 | 
						||
	}).(*ServerBandwidthStatDAO)
 | 
						||
}
 | 
						||
 | 
						||
var SharedServerBandwidthStatDAO *ServerBandwidthStatDAO
 | 
						||
 | 
						||
func init() {
 | 
						||
	dbs.OnReady(func() {
 | 
						||
		SharedServerBandwidthStatDAO = NewServerBandwidthStatDAO()
 | 
						||
	})
 | 
						||
}
 | 
						||
 | 
						||
// UpdateServerBandwidth 写入数据
 | 
						||
// 暂时不使用region区分
 | 
						||
func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, userId int64, serverId int64, day string, timeAt string, bytes int64, totalBytes int64) error {
 | 
						||
	if serverId <= 0 {
 | 
						||
		return errors.New("invalid server id '" + types.String(serverId) + "'")
 | 
						||
	}
 | 
						||
 | 
						||
	return this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Param("bytes", bytes).
 | 
						||
		Param("totalBytes", totalBytes).
 | 
						||
		InsertOrUpdateQuickly(maps.Map{
 | 
						||
			"userId":     userId,
 | 
						||
			"serverId":   serverId,
 | 
						||
			"day":        day,
 | 
						||
			"timeAt":     timeAt,
 | 
						||
			"bytes":      bytes,
 | 
						||
			"totalBytes": totalBytes,
 | 
						||
			"avgBytes":   totalBytes / 300,
 | 
						||
		}, maps.Map{
 | 
						||
			"bytes":      dbs.SQL("bytes+:bytes"),
 | 
						||
			"avgBytes":   dbs.SQL("(totalBytes+:totalBytes)/300"), // 因为生成SQL语句时会自动将avgBytes排在totalBytes之前,所以这里不用担心先后顺序的问题
 | 
						||
			"totalBytes": dbs.SQL("totalBytes+:totalBytes"),
 | 
						||
		})
 | 
						||
}
 | 
						||
 | 
						||
// FindMinutelyPeekBandwidthBytes 获取某分钟的带宽峰值
 | 
						||
// day YYYYMMDD
 | 
						||
// minute HHII
 | 
						||
func (this *ServerBandwidthStatDAO) FindMinutelyPeekBandwidthBytes(tx *dbs.Tx, serverId int64, day string, minute string, useAvg bool) (int64, error) {
 | 
						||
	return this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Result(this.bytesField(useAvg)).
 | 
						||
		Attr("day", day).
 | 
						||
		Attr("timeAt", minute).
 | 
						||
		FindInt64Col(0)
 | 
						||
}
 | 
						||
 | 
						||
// FindHourlyBandwidthStats 按小时获取带宽峰值
 | 
						||
func (this *ServerBandwidthStatDAO) FindHourlyBandwidthStats(tx *dbs.Tx, serverId int64, hours int32, useAvg bool) (result []*pb.FindHourlyServerBandwidthStatsResponse_Stat, err error) {
 | 
						||
	if hours <= 0 {
 | 
						||
		hours = 24
 | 
						||
	}
 | 
						||
 | 
						||
	var timestamp = time.Now().Unix() - int64(hours)*3600
 | 
						||
 | 
						||
	ones, _, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Result(this.maxBytesField(useAvg), "CONCAT(day, '.', SUBSTRING(timeAt, 1, 2)) AS fullTime").
 | 
						||
		Gte("CONCAT(day, '.', SUBSTRING(timeAt, 1, 2))", timeutil.FormatTime("Ymd.H", timestamp)).
 | 
						||
		Group("fullTime").
 | 
						||
		FindOnes()
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	var m = map[string]*pb.FindHourlyServerBandwidthStatsResponse_Stat{}
 | 
						||
	for _, one := range ones {
 | 
						||
		var fullTime = one.GetString("fullTime")
 | 
						||
		var timePieces = strings.Split(fullTime, ".")
 | 
						||
		var day = timePieces[0]
 | 
						||
		var hour = timePieces[1]
 | 
						||
		var bytes = one.GetInt64("bytes")
 | 
						||
 | 
						||
		m[day+hour] = &pb.FindHourlyServerBandwidthStatsResponse_Stat{
 | 
						||
			Bytes: bytes,
 | 
						||
			Bits:  bytes * 8,
 | 
						||
			Day:   day,
 | 
						||
			Hour:  types.Int32(hour),
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	fullHours, err := utils.RangeHours(timeutil.FormatTime("YmdH", timestamp), timeutil.Format("YmdH"))
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	for _, fullHour := range fullHours {
 | 
						||
		stat, ok := m[fullHour]
 | 
						||
		if ok {
 | 
						||
			result = append(result, stat)
 | 
						||
		} else {
 | 
						||
			result = append(result, &pb.FindHourlyServerBandwidthStatsResponse_Stat{
 | 
						||
				Bytes: 0,
 | 
						||
				Bits:  0,
 | 
						||
				Day:   fullHour[:8],
 | 
						||
				Hour:  types.Int32(fullHour[8:]),
 | 
						||
			})
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	return result, nil
 | 
						||
}
 | 
						||
 | 
						||
// FindDailyPeekBandwidthBytes 获取某天的带宽峰值
 | 
						||
// day YYYYMMDD
 | 
						||
func (this *ServerBandwidthStatDAO) FindDailyPeekBandwidthBytes(tx *dbs.Tx, serverId int64, day string, useAvg bool) (int64, error) {
 | 
						||
	return this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Attr("day", day).
 | 
						||
		Result(this.maxBytesField(useAvg)).
 | 
						||
		FindInt64Col(0)
 | 
						||
}
 | 
						||
 | 
						||
// FindDailyBandwidthStats 按天获取带宽峰值
 | 
						||
func (this *ServerBandwidthStatDAO) FindDailyBandwidthStats(tx *dbs.Tx, serverId int64, days int32, useAvg bool) (result []*pb.FindDailyServerBandwidthStatsResponse_Stat, err error) {
 | 
						||
	if days <= 0 {
 | 
						||
		days = 14
 | 
						||
	}
 | 
						||
 | 
						||
	var timestamp = time.Now().Unix() - int64(days)*86400
 | 
						||
	ones, _, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Result(this.maxBytesField(useAvg), "day").
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Gte("day", timeutil.FormatTime("Ymd", timestamp)).
 | 
						||
		Group("day").
 | 
						||
		FindOnes()
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	var m = map[string]*pb.FindDailyServerBandwidthStatsResponse_Stat{}
 | 
						||
	for _, one := range ones {
 | 
						||
		var day = one.GetString("day")
 | 
						||
		var bytes = one.GetInt64("bytes")
 | 
						||
 | 
						||
		m[day] = &pb.FindDailyServerBandwidthStatsResponse_Stat{
 | 
						||
			Bytes: bytes,
 | 
						||
			Bits:  bytes * 8,
 | 
						||
			Day:   day,
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	allDays, err := utils.RangeDays(timeutil.FormatTime("Ymd", timestamp), timeutil.Format("Ymd"))
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	for _, day := range allDays {
 | 
						||
		stat, ok := m[day]
 | 
						||
		if ok {
 | 
						||
			result = append(result, stat)
 | 
						||
		} else {
 | 
						||
			result = append(result, &pb.FindDailyServerBandwidthStatsResponse_Stat{
 | 
						||
				Bytes: 0,
 | 
						||
				Bits:  0,
 | 
						||
				Day:   day,
 | 
						||
			})
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	return result, nil
 | 
						||
}
 | 
						||
 | 
						||
// FindBandwidthStatsBetweenDays 查找日期段内的带宽峰值
 | 
						||
// dayFrom YYYYMMDD
 | 
						||
// dayTo YYYYMMDD
 | 
						||
func (this *ServerBandwidthStatDAO) FindBandwidthStatsBetweenDays(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string, useAvg bool) (result []*pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat, err error) {
 | 
						||
	if serverId <= 0 {
 | 
						||
		return nil, nil
 | 
						||
	}
 | 
						||
 | 
						||
	if !regexputils.YYYYMMDD.MatchString(dayFrom) {
 | 
						||
		return nil, errors.New("invalid dayFrom '" + dayFrom + "'")
 | 
						||
	}
 | 
						||
	if !regexputils.YYYYMMDD.MatchString(dayTo) {
 | 
						||
		return nil, errors.New("invalid dayTo '" + dayTo + "'")
 | 
						||
	}
 | 
						||
 | 
						||
	if dayFrom > dayTo {
 | 
						||
		dayFrom, dayTo = dayTo, dayFrom
 | 
						||
	}
 | 
						||
 | 
						||
	ones, _, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Result(this.bytesField(useAvg), "day", "timeAt").
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("day", dayFrom, dayTo).
 | 
						||
		FindOnes()
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	var m = map[string]*pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat{}
 | 
						||
	for _, one := range ones {
 | 
						||
		var day = one.GetString("day")
 | 
						||
		var bytes = one.GetInt64("bytes")
 | 
						||
		var timeAt = one.GetString("timeAt")
 | 
						||
		var key = day + "@" + timeAt
 | 
						||
 | 
						||
		m[key] = &pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat{
 | 
						||
			Bytes:  bytes,
 | 
						||
			Bits:   bytes * 8,
 | 
						||
			Day:    day,
 | 
						||
			TimeAt: timeAt,
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	allDays, err := utils.RangeDays(dayFrom, dayTo)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	dayTimes, err := utils.Range24HourTimes(5)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	// 截止到当前时间
 | 
						||
	var currentTime = timeutil.Format("Ymd@Hi")
 | 
						||
 | 
						||
	for _, day := range allDays {
 | 
						||
		for _, timeAt := range dayTimes {
 | 
						||
			var key = day + "@" + timeAt
 | 
						||
			if key >= currentTime {
 | 
						||
				break
 | 
						||
			}
 | 
						||
 | 
						||
			stat, ok := m[key]
 | 
						||
			if ok {
 | 
						||
				result = append(result, stat)
 | 
						||
			} else {
 | 
						||
				result = append(result, &pb.FindDailyServerBandwidthStatsBetweenDaysResponse_Stat{
 | 
						||
					Day:    day,
 | 
						||
					TimeAt: timeAt,
 | 
						||
				})
 | 
						||
			}
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	return result, nil
 | 
						||
}
 | 
						||
 | 
						||
// FindMonthlyPeekBandwidthBytes 获取某月的带宽峰值
 | 
						||
// month YYYYMM
 | 
						||
func (this *ServerBandwidthStatDAO) FindMonthlyPeekBandwidthBytes(tx *dbs.Tx, serverId int64, month string, useAvg bool) (int64, error) {
 | 
						||
	return this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("day", month+"01", month+"31").
 | 
						||
		Result(this.maxBytesField(useAvg)).
 | 
						||
		FindInt64Col(0)
 | 
						||
}
 | 
						||
 | 
						||
// FindServerStats 查找某个时间段的带宽统计
 | 
						||
// 参数:
 | 
						||
//   - day YYYYMMDD
 | 
						||
//   - timeAt HHII
 | 
						||
func (this *ServerBandwidthStatDAO) FindServerStats(tx *dbs.Tx, serverId int64, day string, timeFrom string, timeTo string, useAvg bool) (result []*ServerBandwidthStat, err error) {
 | 
						||
	_, err = this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Attr("day", day).
 | 
						||
		Between("timeAt", timeFrom, timeTo).
 | 
						||
		Slice(&result).
 | 
						||
		FindAll()
 | 
						||
 | 
						||
	// 使用平均带宽
 | 
						||
	this.fixServerStats(result, useAvg)
 | 
						||
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// FindAllServerStatsWithDay 查找某个服务的当天的所有带宽峰值
 | 
						||
// day YYYYMMDD
 | 
						||
func (this *ServerBandwidthStatDAO) FindAllServerStatsWithDay(tx *dbs.Tx, serverId int64, day string, useAvg bool) (result []*ServerBandwidthStat, err error) {
 | 
						||
	_, err = this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Attr("day", day).
 | 
						||
		AscPk().
 | 
						||
		Slice(&result).
 | 
						||
		FindAll()
 | 
						||
 | 
						||
	// 使用平均带宽
 | 
						||
	this.fixServerStats(result, useAvg)
 | 
						||
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// FindAllServerStatsWithMonth 查找某个服务的当月的所有带宽峰值
 | 
						||
// month YYYYMM
 | 
						||
func (this *ServerBandwidthStatDAO) FindAllServerStatsWithMonth(tx *dbs.Tx, serverId int64, month string, useAvg bool) (result []*ServerBandwidthStat, err error) {
 | 
						||
	_, err = this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("day", month+"01", month+"31").
 | 
						||
		AscPk().
 | 
						||
		Slice(&result).
 | 
						||
		FindAll()
 | 
						||
 | 
						||
	// 使用平均带宽
 | 
						||
	this.fixServerStats(result, useAvg)
 | 
						||
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// FindMonthlyPercentile 获取某月内百分位
 | 
						||
func (this *ServerBandwidthStatDAO) FindMonthlyPercentile(tx *dbs.Tx, serverId int64, month string, percentile int, useAvg bool) (result int64, err error) {
 | 
						||
	if percentile <= 0 {
 | 
						||
		percentile = 95
 | 
						||
	}
 | 
						||
 | 
						||
	// 如果是100%以上,则快速返回
 | 
						||
	if percentile >= 100 {
 | 
						||
		result, err = this.Query(tx).
 | 
						||
			Table(this.partialTable(serverId)).
 | 
						||
			Attr("serverId", serverId).
 | 
						||
			Result(this.bytesField(useAvg)).
 | 
						||
			Between("day", month+"01", month+"31").
 | 
						||
			Desc("bytes").
 | 
						||
			Limit(1).
 | 
						||
			FindInt64Col(0)
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	// 总数量
 | 
						||
	total, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("day", month+"01", month+"31").
 | 
						||
		Count()
 | 
						||
	if err != nil {
 | 
						||
		return 0, err
 | 
						||
	}
 | 
						||
	if total == 0 {
 | 
						||
		return 0, nil
 | 
						||
	}
 | 
						||
 | 
						||
	var offset int64
 | 
						||
 | 
						||
	if total > 1 {
 | 
						||
		offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
 | 
						||
	}
 | 
						||
 | 
						||
	// 查询 nth 位置
 | 
						||
	result, err = this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Result(this.bytesField(useAvg)).
 | 
						||
		Between("day", month+"01", month+"31").
 | 
						||
		Desc("bytes").
 | 
						||
		Offset(offset).
 | 
						||
		Limit(1).
 | 
						||
		FindInt64Col(0)
 | 
						||
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// FindPercentileBetweenDays 获取日期段内内百分位
 | 
						||
func (this *ServerBandwidthStatDAO) FindPercentileBetweenDays(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string, percentile int32, useAvg bool) (result *ServerBandwidthStat, err error) {
 | 
						||
	if dayFrom > dayTo {
 | 
						||
		dayFrom, dayTo = dayTo, dayFrom
 | 
						||
	}
 | 
						||
 | 
						||
	if percentile <= 0 {
 | 
						||
		percentile = 95
 | 
						||
	}
 | 
						||
 | 
						||
	// 如果是100%以上,则快速返回
 | 
						||
	if percentile >= 100 {
 | 
						||
		one, err := this.Query(tx).
 | 
						||
			Table(this.partialTable(serverId)).
 | 
						||
			Attr("serverId", serverId).
 | 
						||
			Between("day", dayFrom, dayTo).
 | 
						||
			Desc(this.bytesOrderField(useAvg)).
 | 
						||
			Find()
 | 
						||
		if err != nil || one == nil {
 | 
						||
			return nil, err
 | 
						||
		}
 | 
						||
 | 
						||
		return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
 | 
						||
	}
 | 
						||
 | 
						||
	// 总数量
 | 
						||
	total, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("day", dayFrom, dayTo).
 | 
						||
		Count()
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	if total == 0 {
 | 
						||
		return nil, nil
 | 
						||
	}
 | 
						||
 | 
						||
	var offset int64
 | 
						||
 | 
						||
	if total > 1 {
 | 
						||
		offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
 | 
						||
	}
 | 
						||
 | 
						||
	// 查询 nth 位置
 | 
						||
	one, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("day", dayFrom, dayTo).
 | 
						||
		Desc(this.bytesOrderField(useAvg)).
 | 
						||
		Offset(offset).
 | 
						||
		Find()
 | 
						||
	if err != nil || one == nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
 | 
						||
}
 | 
						||
 | 
						||
// FindPercentileBetweenTimes 获取时间段内内百分位
 | 
						||
// timeFrom 开始时间,格式 YYYYMMDDHHII
 | 
						||
// timeTo 结束时间,格式 YYYYMMDDHHII
 | 
						||
func (this *ServerBandwidthStatDAO) FindPercentileBetweenTimes(tx *dbs.Tx, serverId int64, timeFrom string, timeTo string, percentile int32, useAvg bool) (result *ServerBandwidthStat, err error) {
 | 
						||
	var reg = regexp.MustCompile(`^\d{12}$`)
 | 
						||
	if !reg.MatchString(timeFrom) {
 | 
						||
		return nil, errors.New("invalid timeFrom '" + timeFrom + "'")
 | 
						||
	}
 | 
						||
	if !reg.MatchString(timeTo) {
 | 
						||
		return nil, errors.New("invalid timeTo '" + timeTo + "'")
 | 
						||
	}
 | 
						||
 | 
						||
	if timeFrom > timeTo {
 | 
						||
		timeFrom, timeTo = timeTo, timeFrom
 | 
						||
	}
 | 
						||
 | 
						||
	if percentile <= 0 {
 | 
						||
		percentile = 95
 | 
						||
	}
 | 
						||
 | 
						||
	// 如果是100%以上,则快速返回
 | 
						||
	if percentile >= 100 {
 | 
						||
		one, err := this.Query(tx).
 | 
						||
			Table(this.partialTable(serverId)).
 | 
						||
			Attr("serverId", serverId).
 | 
						||
			Between("CONCAT(day, timeAt)", timeFrom, timeTo).
 | 
						||
			Desc(this.bytesOrderField(useAvg)).
 | 
						||
			Find()
 | 
						||
		if err != nil || one == nil {
 | 
						||
			return nil, err
 | 
						||
		}
 | 
						||
 | 
						||
		return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
 | 
						||
	}
 | 
						||
 | 
						||
	// 总数量
 | 
						||
	total, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("CONCAT(day, timeAt)", timeFrom, timeTo).
 | 
						||
		Count()
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	if total == 0 {
 | 
						||
		return nil, nil
 | 
						||
	}
 | 
						||
 | 
						||
	var offset int64
 | 
						||
 | 
						||
	if total > 1 {
 | 
						||
		offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
 | 
						||
	}
 | 
						||
 | 
						||
	// 查询 nth 位置
 | 
						||
	one, err := this.Query(tx).
 | 
						||
		Table(this.partialTable(serverId)).
 | 
						||
		Attr("serverId", serverId).
 | 
						||
		Between("CONCAT(day, timeAt)", timeFrom, timeTo).
 | 
						||
		Desc(this.bytesOrderField(useAvg)).
 | 
						||
		Offset(offset).
 | 
						||
		Find()
 | 
						||
	if err != nil || one == nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	return this.fixServerStat(one.(*ServerBandwidthStat), useAvg), nil
 | 
						||
}
 | 
						||
 | 
						||
// Clean 清理过期数据
 | 
						||
func (this *ServerBandwidthStatDAO) Clean(tx *dbs.Tx) error {
 | 
						||
	var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -100)) // 保留大约3个月的数据
 | 
						||
	return this.runBatch(func(table string, locker *sync.Mutex) error {
 | 
						||
		_, err := this.Query(tx).
 | 
						||
			Table(table).
 | 
						||
			Lt("day", day).
 | 
						||
			Delete()
 | 
						||
		return err
 | 
						||
	})
 | 
						||
}
 | 
						||
 | 
						||
// 批量执行
 | 
						||
func (this *ServerBandwidthStatDAO) runBatch(f func(table string, locker *sync.Mutex) error) error {
 | 
						||
	var locker = &sync.Mutex{}
 | 
						||
	var wg = sync.WaitGroup{}
 | 
						||
	wg.Add(ServerBandwidthStatTablePartials)
 | 
						||
	var resultErr error
 | 
						||
	for i := 0; i < ServerBandwidthStatTablePartials; i++ {
 | 
						||
		var table = this.partialTable(int64(i))
 | 
						||
		go func(table string) {
 | 
						||
			defer wg.Done()
 | 
						||
 | 
						||
			err := f(table, locker)
 | 
						||
			if err != nil {
 | 
						||
				resultErr = err
 | 
						||
			}
 | 
						||
		}(table)
 | 
						||
	}
 | 
						||
	wg.Wait()
 | 
						||
	return resultErr
 | 
						||
}
 | 
						||
 | 
						||
// 获取分区表
 | 
						||
func (this *ServerBandwidthStatDAO) partialTable(serverId int64) string {
 | 
						||
	return this.Table + "_" + types.String(serverId%int64(ServerBandwidthStatTablePartials))
 | 
						||
}
 | 
						||
 | 
						||
// 获取字节字段
 | 
						||
func (this *ServerBandwidthStatDAO) bytesField(useAvg bool) string {
 | 
						||
	if useAvg {
 | 
						||
		return "avgBytes AS bytes"
 | 
						||
	}
 | 
						||
	return "bytes"
 | 
						||
}
 | 
						||
 | 
						||
// 获取最大字节字段
 | 
						||
func (this *ServerBandwidthStatDAO) maxBytesField(useAvg bool) string {
 | 
						||
	if useAvg {
 | 
						||
		return "MAX(avgBytes) AS bytes"
 | 
						||
	}
 | 
						||
	return "MAX(bytes) AS bytes"
 | 
						||
}
 | 
						||
 | 
						||
// 获取排序字段
 | 
						||
func (this *ServerBandwidthStatDAO) bytesOrderField(useAvg bool) string {
 | 
						||
	if useAvg {
 | 
						||
		return "avgBytes"
 | 
						||
	}
 | 
						||
	return "bytes"
 | 
						||
}
 | 
						||
 | 
						||
func (this *ServerBandwidthStatDAO) fixServerStat(stat *ServerBandwidthStat, useAvg bool) *ServerBandwidthStat {
 | 
						||
	if stat == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	if useAvg {
 | 
						||
		stat.Bytes = stat.AvgBytes
 | 
						||
	}
 | 
						||
	return stat
 | 
						||
}
 | 
						||
 | 
						||
func (this *ServerBandwidthStatDAO) fixServerStats(stats []*ServerBandwidthStat, useAvg bool) {
 | 
						||
	if useAvg {
 | 
						||
		for _, stat := range stats {
 | 
						||
			stat.Bytes = stat.AvgBytes
 | 
						||
		}
 | 
						||
	}
 | 
						||
}
 |