增加按用户统计带宽/修改其他相关代码

This commit is contained in:
刘祥超
2022-07-07 09:18:08 +08:00
parent c0a99a4ba3
commit c15cc6d75c
19 changed files with 470 additions and 113 deletions

View File

@@ -1,4 +1,4 @@
package stats
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
@@ -11,6 +11,7 @@ import (
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"math"
"sync"
"time"
)
@@ -56,7 +57,7 @@ func init() {
}
// UpdateServerBandwidth 写入数据
func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, serverId int64, day string, timeAt string, bytes int64) error {
func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, userId int64, serverId int64, day string, timeAt string, bytes int64) error {
if serverId <= 0 {
return errors.New("invalid server id '" + types.String(serverId) + "'")
}
@@ -65,6 +66,7 @@ func (this *ServerBandwidthStatDAO) UpdateServerBandwidth(tx *dbs.Tx, serverId i
Table(this.partialTable(serverId)).
Param("bytes", bytes).
InsertOrUpdateQuickly(maps.Map{
"userId": userId,
"serverId": serverId,
"day": day,
"timeAt": timeAt,
@@ -89,6 +91,58 @@ func (this *ServerBandwidthStatDAO) FindServerStats(tx *dbs.Tx, serverId int64,
return
}
// FindMonthlyPercentile 获取某月内百分位
func (this *ServerBandwidthStatDAO) FindMonthlyPercentile(tx *dbs.Tx, serverId int64, month string, percentile int) (result int64, err error) {
if percentile <= 0 {
percentile = 95
}
// 如果是100%以上,则快速返回
if percentile >= 100 {
result, err = this.Query(tx).
Table(this.partialTable(serverId)).
Result("bytes").
Attr("serverId", serverId).
Between("day", month+"01", month+"31").
Desc("bytes").
Limit(1).
FindInt64Col(0)
return
}
// 总数量
total, err := this.Query(tx).
Table(this.partialTable(serverId)).
Attr("serverId", serverId).
Between("day", month+"01", month+"31").
Count()
if err != nil {
return 0, err
}
if total == 0 {
return 0, nil
}
var offset int64
if total > 1 {
offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
}
// 查询 nth 位置
result, err = this.Query(tx).
Table(this.partialTable(serverId)).
Result("bytes").
Attr("serverId", serverId).
Between("day", month+"01", month+"31").
Desc("bytes").
Offset(offset).
Limit(1).
FindInt64Col(0)
return
}
// Clean 清理过期数据
func (this *ServerBandwidthStatDAO) Clean(tx *dbs.Tx) error {
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -62)) // 保留大约2个月的数据

View File

@@ -1,8 +1,8 @@
package stats_test
package models_test
import (
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/stats"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/dbs"
@@ -13,9 +13,9 @@ import (
)
func TestServerBandwidthStatDAO_UpdateServerBandwidth(t *testing.T) {
var dao = stats.NewServerBandwidthStatDAO()
var dao = models.NewServerBandwidthStatDAO()
var tx *dbs.Tx
err := dao.UpdateServerBandwidth(tx, 1, timeutil.Format("Ymd"), timeutil.Format("Hi"), 1024)
err := dao.UpdateServerBandwidth(tx, 1, 1, timeutil.Format("Ymd"), timeutil.Format("Hi"), 1024)
if err != nil {
t.Fatal(err)
}
@@ -23,12 +23,13 @@ func TestServerBandwidthStatDAO_UpdateServerBandwidth(t *testing.T) {
}
func TestSeverBandwidthStatDAO_InsertManyStats(t *testing.T) {
var dao = stats.NewServerBandwidthStatDAO()
var dao = models.NewServerBandwidthStatDAO()
var tx *dbs.Tx
for i := 0; i < 1_000_000; i++ {
var count = 1 // 测试时将此值设为一个比较大的数字
for i := 0; i < count; i++ {
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -rands.Int(0, 200)))
var minute = fmt.Sprintf("%02d%02d", rands.Int(0, 23), rands.Int(0, 59))
err := dao.UpdateServerBandwidth(tx, 1, day, minute, 1024)
err := dao.UpdateServerBandwidth(tx, 1, 1, day, minute, 1024)
if err != nil {
t.Fatal(err)
}
@@ -36,8 +37,14 @@ func TestSeverBandwidthStatDAO_InsertManyStats(t *testing.T) {
t.Log("ok")
}
func TestServerBandwidthStatDAO_FindMonthlyPercentile(t *testing.T) {
var dao = models.NewServerBandwidthStatDAO()
var tx *dbs.Tx
t.Log(dao.FindMonthlyPercentile(tx, 23, timeutil.Format("Ym"), 95))
}
func TestServerBandwidthStatDAO_Clean(t *testing.T) {
var dao = stats.NewServerBandwidthStatDAO()
var dao = models.NewServerBandwidthStatDAO()
var tx *dbs.Tx
var before = time.Now()
err := dao.Clean(tx)

View File

@@ -1,8 +1,9 @@
package stats
package models
// ServerBandwidthStat 服务峰值带宽统计
type ServerBandwidthStat struct {
Id uint64 `field:"id"` // ID
UserId uint64 `field:"userId"` // 用户ID
ServerId uint64 `field:"serverId"` // 服务ID
Day string `field:"day"` // 日期YYYYMMDD
TimeAt string `field:"timeAt"` // 时间点HHMM
@@ -11,6 +12,7 @@ type ServerBandwidthStat struct {
type ServerBandwidthStatOperator struct {
Id interface{} // ID
UserId interface{} // 用户ID
ServerId interface{} // 服务ID
Day interface{} // 日期YYYYMMDD
TimeAt interface{} // 时间点HHMM

View File

@@ -0,0 +1 @@
package models

View File

@@ -12,7 +12,6 @@ import (
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
timeutil "github.com/iwind/TeaGo/utils/time"
"math"
"regexp"
"strings"
"time"
@@ -405,45 +404,6 @@ func (this *ServerDailyStatDAO) SumMonthlyBytes(tx *dbs.Tx, serverId int64, mont
FindInt64Col(0)
}
// FindMonthlyPercentile 获取某月内百分位
func (this *ServerDailyStatDAO) FindMonthlyPercentile(tx *dbs.Tx, serverId int64, month string, percentile int) (result int64, err error) {
if percentile <= 0 {
percentile = 95
}
if percentile > 100 {
percentile = 100
}
total, err := this.Query(tx).
Attr("serverId", serverId).
Between("day", month+"01", month+"31").
Count()
if err != nil {
return 0, err
}
if total == 0 {
return 0, nil
}
var offset int64
if total > 1 {
offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
}
result, err = this.Query(tx).
Result("bytes").
Attr("serverId", serverId).
Between("day", month+"01", month+"31").
Desc("bytes").
Offset(offset).
Limit(1).
FindInt64Col(0)
// 因为是5分钟统计所以需要除以300
result = result / 300
return
}
// FindDailyStats 按天统计
func (this *ServerDailyStatDAO) FindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerDailyStat, err error) {
ones, err := this.Query(tx).

View File

@@ -83,13 +83,3 @@ func TestServerDailyStatDAO_FindDistinctPlanServerIdsBetweenDay(t *testing.T) {
}
t.Log(serverIds)
}
func TestServerDailyStatDAO_FindMonthlyPercentile(t *testing.T) {
var tx *dbs.Tx
var dao = NewServerDailyStatDAO()
result, err := dao.FindMonthlyPercentile(tx, 23, timeutil.Format("Ym"), 95)
if err != nil {
t.Fatal(err)
}
t.Log("result:", result)
}

View File

@@ -1039,9 +1039,10 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
return cache.(*serverconfigs.ServerConfig), nil
}
config := &serverconfigs.ServerConfig{}
var config = &serverconfigs.ServerConfig{}
config.Id = int64(server.Id)
config.ClusterId = int64(server.ClusterId)
config.UserId = int64(server.UserId)
config.Type = server.Type
config.IsOn = server.IsOn
config.Name = server.Name
@@ -1063,7 +1064,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// ServerNames
if IsNotNull(server.ServerNames) {
serverNames := []*serverconfigs.ServerNameConfig{}
var serverNames = []*serverconfigs.ServerNameConfig{}
err := json.Unmarshal(server.ServerNames, &serverNames)
if err != nil {
return nil, err
@@ -1092,7 +1093,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// HTTP
if IsNotNull(server.Http) {
httpConfig := &serverconfigs.HTTPProtocolConfig{}
var httpConfig = &serverconfigs.HTTPProtocolConfig{}
err := json.Unmarshal(server.Http, httpConfig)
if err != nil {
return nil, err
@@ -1102,7 +1103,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// HTTPS
if IsNotNull(server.Https) {
httpsConfig := &serverconfigs.HTTPSProtocolConfig{}
var httpsConfig = &serverconfigs.HTTPSProtocolConfig{}
err := json.Unmarshal(server.Https, httpsConfig)
if err != nil {
return nil, err
@@ -1124,7 +1125,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// TCP
if IsNotNull(server.Tcp) {
tcpConfig := &serverconfigs.TCPProtocolConfig{}
var tcpConfig = &serverconfigs.TCPProtocolConfig{}
err := json.Unmarshal(server.Tcp, tcpConfig)
if err != nil {
return nil, err
@@ -1134,7 +1135,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// TLS
if IsNotNull(server.Tls) {
tlsConfig := &serverconfigs.TLSProtocolConfig{}
var tlsConfig = &serverconfigs.TLSProtocolConfig{}
err := json.Unmarshal(server.Tls, tlsConfig)
if err != nil {
return nil, err
@@ -1156,7 +1157,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// Unix
if IsNotNull(server.Unix) {
unixConfig := &serverconfigs.UnixProtocolConfig{}
var unixConfig = &serverconfigs.UnixProtocolConfig{}
err := json.Unmarshal(server.Unix, unixConfig)
if err != nil {
return nil, err
@@ -1166,7 +1167,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// UDP
if IsNotNull(server.Udp) {
udpConfig := &serverconfigs.UDPProtocolConfig{}
var udpConfig = &serverconfigs.UDPProtocolConfig{}
err := json.Unmarshal(server.Udp, udpConfig)
if err != nil {
return nil, err
@@ -1187,7 +1188,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
// ReverseProxy
if IsNotNull(server.ReverseProxy) {
reverseProxyRef := &serverconfigs.ReverseProxyRef{}
var reverseProxyRef = &serverconfigs.ReverseProxyRef{}
err := json.Unmarshal(server.ReverseProxy, reverseProxyRef)
if err != nil {
return nil, err
@@ -1204,7 +1205,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
}
// WAF策略
clusterId := int64(server.ClusterId)
var clusterId = int64(server.ClusterId)
httpFirewallPolicyId, err := SharedNodeClusterDAO.FindClusterHTTPFirewallPolicyId(tx, clusterId, cacheMap)
if err != nil {
return nil, err

View File

@@ -1 +0,0 @@
package stats

View File

@@ -0,0 +1,143 @@
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"sync"
"time"
)
type UserBandwidthStatDAO dbs.DAO
const (
UserBandwidthStatTablePartials = 20
)
func init() {
dbs.OnReadyDone(func() {
// 清理数据任务
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
goman.New(func() {
for range ticker.C {
err := SharedUserBandwidthStatDAO.Clean(nil)
if err != nil {
remotelogs.Error("SharedUserBandwidthStatDAO", "clean expired data failed: "+err.Error())
}
}
})
})
}
func NewUserBandwidthStatDAO() *UserBandwidthStatDAO {
return dbs.NewDAO(&UserBandwidthStatDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeUserBandwidthStats",
Model: new(UserBandwidthStat),
PkName: "id",
},
}).(*UserBandwidthStatDAO)
}
var SharedUserBandwidthStatDAO *UserBandwidthStatDAO
func init() {
dbs.OnReady(func() {
SharedUserBandwidthStatDAO = NewUserBandwidthStatDAO()
})
}
// UpdateUserBandwidth 写入数据
func (this *UserBandwidthStatDAO) UpdateUserBandwidth(tx *dbs.Tx, userId int64, day string, timeAt string, bytes int64) error {
if userId <= 0 {
// 如果用户ID不大于0则说明服务不属于任何用户此时不需要处理
return nil
}
return this.Query(tx).
Table(this.partialTable(userId)).
Param("bytes", bytes).
InsertOrUpdateQuickly(maps.Map{
"userId": userId,
"day": day,
"timeAt": timeAt,
"bytes": bytes,
}, maps.Map{
"bytes": dbs.SQL("bytes+:bytes"),
})
}
// FindUserPeekBandwidthInMonth 读取某月带宽峰值
// month YYYYMM
func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInMonth(tx *dbs.Tx, userId int64, month string) (*UserBandwidthStat, error) {
one, err := this.Query(tx).
Table(this.partialTable(userId)).
Attr("userId", userId).
Between("day", month+"01", month+"31").
Desc("bytes").
Find()
if err != nil || one == nil {
return nil, err
}
return one.(*UserBandwidthStat), nil
}
// FindUserPeekBandwidthInDay 读取某日带宽峰值
// day YYYYMMDD
func (this *UserBandwidthStatDAO) FindUserPeekBandwidthInDay(tx *dbs.Tx, userId int64, day string) (*UserBandwidthStat, error) {
one, err := this.Query(tx).
Table(this.partialTable(userId)).
Attr("userId", userId).
Attr("day", day).
Desc("bytes").
Find()
if err != nil || one == nil {
return nil, err
}
return one.(*UserBandwidthStat), nil
}
// Clean 清理过期数据
func (this *UserBandwidthStatDAO) Clean(tx *dbs.Tx) error {
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -62)) // 保留大约2个月的数据
return this.runBatch(func(table string, locker *sync.Mutex) error {
_, err := this.Query(tx).
Table(table).
Lt("day", day).
Delete()
return err
})
}
// 批量执行
func (this *UserBandwidthStatDAO) runBatch(f func(table string, locker *sync.Mutex) error) error {
var locker = &sync.Mutex{}
var wg = sync.WaitGroup{}
wg.Add(UserBandwidthStatTablePartials)
var resultErr error
for i := 0; i < UserBandwidthStatTablePartials; i++ {
var table = this.partialTable(int64(i))
go func(table string) {
defer wg.Done()
err := f(table, locker)
if err != nil {
resultErr = err
}
}(table)
}
wg.Wait()
return resultErr
}
// 获取分区表
func (this *UserBandwidthStatDAO) partialTable(userId int64) string {
return this.Table + "_" + types.String(userId%int64(UserBandwidthStatTablePartials))
}

View File

@@ -0,0 +1,30 @@
package models_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/dbs"
timeutil "github.com/iwind/TeaGo/utils/time"
"testing"
)
func TestUserBandwidthStatDAO_UpdateServerBandwidth(t *testing.T) {
var dao = models.NewUserBandwidthStatDAO()
var tx *dbs.Tx
err := dao.UpdateUserBandwidth(tx, 1, timeutil.Format("Ymd"), timeutil.Format("Hi"), 1024)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestUserBandwidthStatDAO_Clean(t *testing.T) {
var dao = models.NewUserBandwidthStatDAO()
var tx *dbs.Tx
err := dao.Clean(tx)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -0,0 +1,22 @@
package models
// UserBandwidthStat 用户月带宽峰值
type UserBandwidthStat struct {
Id uint64 `field:"id"` // ID
UserId uint64 `field:"userId"` // 用户ID
Day string `field:"day"` // 日期YYYYMMDD
TimeAt string `field:"timeAt"` // 时间点HHII
Bytes uint64 `field:"bytes"` // 带宽
}
type UserBandwidthStatOperator struct {
Id interface{} // ID
UserId interface{} // 用户ID
Day interface{} // 日期YYYYMMDD
TimeAt interface{} // 时间点HHII
Bytes interface{} // 带宽
}
func NewUserBandwidthStatOperator() *UserBandwidthStatOperator {
return &UserBandwidthStatOperator{}
}

View File

@@ -0,0 +1 @@
package models

View File

@@ -236,7 +236,7 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
// 百分位
var percentile = 95
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
percentileBytes, err := SharedServerBandwidthStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
if err != nil {
return err
}
@@ -257,7 +257,7 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
percentile = 100
}
}
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
percentileBytes, err := SharedServerBandwidthStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
if err != nil {
return err
}
@@ -306,7 +306,7 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
// 百分位
var percentile = 95
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
percentileBytes, err := SharedServerBandwidthStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
if err != nil {
return err
}
@@ -343,7 +343,7 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
// 百分位
var percentile = 95
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
percentileBytes, err := SharedServerBandwidthStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
if err != nil {
return err
}
@@ -361,7 +361,7 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
// 百分位
var percentile = 95
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
percentileBytes, err := SharedServerBandwidthStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
if err != nil {
return err
}
@@ -382,7 +382,7 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
percentile = 100
}
}
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
percentileBytes, err := SharedServerBandwidthStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
if err != nil {
return err
}