Files
EdgeNode/internal/stats/traffic_stat_manager.go

112 lines
2.6 KiB
Go
Raw Normal View History

2021-01-25 16:40:31 +08:00
package stats
import (
2021-01-25 16:40:31 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2021-01-25 16:40:31 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"strconv"
"sync"
"time"
)
// SharedTrafficStatManager is the package-level TrafficStatManager instance,
// created with NewTrafficStatManager when the package is initialized.
var SharedTrafficStatManager = NewTrafficStatManager()
2021-04-29 16:48:47 +08:00
// TrafficStatManager 区域流量统计
type TrafficStatManager struct {
2021-01-25 16:40:31 +08:00
m map[string]int64 // [timestamp serverId] => bytes
locker sync.Mutex
configFunc func() *nodeconfigs.NodeConfig
}
2021-04-29 16:48:47 +08:00
// NewTrafficStatManager returns a new manager whose counter map is
// initialized and empty.
func NewTrafficStatManager() *TrafficStatManager {
	return &TrafficStatManager{
		m: make(map[string]int64),
	}
}
2021-04-29 16:48:47 +08:00
// Start 启动自动任务
2021-01-25 16:40:31 +08:00
func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) {
this.configFunc = configFunc
duration := 5 * time.Minute
if Tea.IsTesting() {
// 测试环境缩短上传时间,方便我们调试
duration = 30 * time.Second
}
ticker := time.NewTicker(duration)
2021-01-25 16:40:31 +08:00
events.On(events.EventQuit, func() {
remotelogs.Println("TRAFFIC_STAT_MANAGER", "quit")
ticker.Stop()
})
2021-01-26 18:42:46 +08:00
remotelogs.Println("TRAFFIC_STA_MANAGER", "start ...")
for range ticker.C {
err := this.Upload()
if err != nil {
remotelogs.Error("TRAFFIC_STAT_MANAGER", "upload stats failed: "+err.Error())
}
}
}
2021-04-29 16:48:47 +08:00
// Add records bytes of traffic for serverId.
//
// The amount is added to the counter for the current 300-second window (the
// Unix time rounded down to a multiple of 300). A zero amount is ignored.
func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
	if bytes == 0 {
		return
	}

	// Round the current Unix time down to the start of its 300-second window.
	now := utils.UnixTime()
	window := now - now%300

	// Key layout: decimal window timestamp immediately followed by the
	// decimal server id, with no separator.
	buf := make([]byte, 0, 32)
	buf = strconv.AppendInt(buf, window, 10)
	buf = strconv.AppendInt(buf, serverId, 10)
	key := string(buf)

	this.locker.Lock()
	defer this.locker.Unlock()
	this.m[key] += bytes
}
2021-04-29 16:48:47 +08:00
// Upload 上传流量
func (this *TrafficStatManager) Upload() error {
2021-01-25 16:40:31 +08:00
config := this.configFunc()
if config == nil {
return nil
}
client, err := rpc.SharedRPC()
if err != nil {
return err
}
this.locker.Lock()
m := this.m
this.m = map[string]int64{}
this.locker.Unlock()
pbStats := []*pb.ServerDailyStat{}
for key, bytes := range m {
timestamp, err := strconv.ParseInt(key[:10], 10, 64)
if err != nil {
return err
}
serverId, err := strconv.ParseInt(key[10:], 10, 64)
if err != nil {
return err
}
pbStats = append(pbStats, &pb.ServerDailyStat{
ServerId: serverId,
2021-01-25 16:40:31 +08:00
RegionId: config.RegionId,
Bytes: bytes,
CreatedAt: timestamp,
})
}
if len(pbStats) == 0 {
return nil
}
_, err = client.ServerDailyStatRPC().UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{Stats: pbStats})
return err
}