diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 91a158d..ee3fbdc 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -205,9 +205,7 @@ func (this *Node) Start() { // 统计 goman.New(func() { - stats.SharedTrafficStatManager.Start(func() *nodeconfigs.NodeConfig { - return sharedNodeConfig - }) + stats.SharedTrafficStatManager.Start() }) goman.New(func() { stats.SharedHTTPRequestStatManager.Start() diff --git a/internal/stats/bandwidth_stat_manager.go b/internal/stats/bandwidth_stat_manager.go index ce9a3cd..266e232 100644 --- a/internal/stats/bandwidth_stat_manager.go +++ b/internal/stats/bandwidth_stat_manager.go @@ -5,6 +5,8 @@ package stats import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/iwind/TeaGo/logs" @@ -18,6 +20,14 @@ var SharedBandwidthStatManager = NewBandwidthStatManager() const bandwidthTimestampDelim = 2 // N秒平均,更为精确 +func init() { + events.On(events.EventLoaded, func() { + goman.New(func() { + SharedBandwidthStatManager.Start() + }) + }) +} + type BandwidthStat struct { Day string TimeAt string @@ -33,6 +43,8 @@ type BandwidthStat struct { type BandwidthStatManager struct { m map[string]*BandwidthStat // key => *BandwidthStat + pbStats []*pb.ServerBandwidthStat + lastTime string // 上一次执行的时间 ticker *time.Ticker @@ -73,6 +85,18 @@ func (this *BandwidthStatManager) Loop() error { var pbStats = []*pb.ServerBandwidthStat{} + // 历史未提交记录 + if len(this.pbStats) > 0 { + var expiredTime = timeutil.FormatTime("Hi", time.Now().Unix()-1200) // 只保留20分钟 + + for _, stat := range this.pbStats { + if stat.TimeAt > expiredTime { + pbStats = append(pbStats, stat) + } + } + this.pbStats = nil + } + this.locker.Lock() for key, stat := range this.m { if stat.Day < day || stat.TimeAt < currentTime { @@ -98,6 +122,8 @@ func (this *BandwidthStatManager) Loop() error { } _, err = rpcClient.ServerBandwidthStatRPC.UploadServerBandwidthStats(rpcClient.Context(), &pb.UploadServerBandwidthStatsRequest{ServerBandwidthStats: pbStats}) if err != nil { + this.pbStats = pbStats + return err } } diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index 3fca488..b0a5bf6 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -31,19 +31,33 @@ type TrafficItem struct { CheckingTrafficLimit bool } +func (this *TrafficItem) Add(anotherItem *TrafficItem) { + this.Bytes += anotherItem.Bytes + this.CachedBytes += anotherItem.CachedBytes + this.CountRequests += anotherItem.CountRequests + this.CountCachedRequests += anotherItem.CountCachedRequests + this.CountAttackRequests += anotherItem.CountAttackRequests + this.AttackBytes += anotherItem.AttackBytes +} + +const trafficStatsMaxLife = 1200 // 最大只保存20分钟内的数据 + // TrafficStatManager 区域流量统计 type TrafficStatManager struct { itemMap map[string]*TrafficItem // [timestamp serverId] => *TrafficItem domainsMap map[string]*TrafficItem // timestamp @ serverId @ domain => *TrafficItem - locker sync.Mutex - configFunc func() *nodeconfigs.NodeConfig + + pbItems []*pb.ServerDailyStat + pbDomainItems []*pb.UploadServerDailyStatsRequest_DomainStat + + locker sync.Mutex totalRequests int64 } // NewTrafficStatManager 获取新对象 func NewTrafficStatManager() *TrafficStatManager { - manager := &TrafficStatManager{ + var manager = &TrafficStatManager{ itemMap: map[string]*TrafficItem{}, domainsMap: map[string]*TrafficItem{}, } @@ -52,9 +66,7 @@ func NewTrafficStatManager() *TrafficStatManager { } // Start 启动自动任务 -func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) { - this.configFunc = configFunc - +func (this *TrafficStatManager) Start() { // 上传请求总数 var monitorTicker = time.NewTicker(1 * time.Minute) events.OnKey(events.EventQuit, this, func() { @@ -70,7 +82,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) }) // 上传统计数据 - duration := 5 * time.Minute + var duration = 5 * time.Minute if Tea.IsTesting() { // 测试环境缩短上传时间,方便我们调试 duration = 30 * time.Second @@ -143,9 +155,10 @@ func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, // Upload 上传流量 func (this *TrafficStatManager) Upload() error { - var config = this.configFunc() - if config == nil { - return nil + var regionId int64 + nodeConfig, _ := nodeconfigs.SharedNodeConfig() + if nodeConfig != nil { + regionId = nodeConfig.RegionId } client, err := rpc.SharedRPC() @@ -154,10 +167,14 @@ func (this *TrafficStatManager) Upload() error { } this.locker.Lock() + var itemMap = this.itemMap var domainMap = this.domainsMap + + // reset this.itemMap = map[string]*TrafficItem{} this.domainsMap = map[string]*TrafficItem{} + this.locker.Unlock() // 服务统计 @@ -174,7 +191,7 @@ func (this *TrafficStatManager) Upload() error { pbServerStats = append(pbServerStats, &pb.ServerDailyStat{ ServerId: serverId, - NodeRegionId: config.RegionId, + NodeRegionId: regionId, Bytes: item.Bytes, CachedBytes: item.CachedBytes, CountRequests: item.CountRequests, @@ -186,9 +203,6 @@ func (this *TrafficStatManager) Upload() error { CreatedAt: timestamp, }) } - if len(pbServerStats) == 0 { - return nil - } // 域名统计 var pbDomainStats = []*pb.UploadServerDailyStatsRequest_DomainStat{} @@ -210,9 +224,40 @@ func (this *TrafficStatManager) Upload() error { }) } + // 历史未提交记录 + if len(this.pbItems) > 0 || len(this.pbDomainItems) > 0 { + var expiredAt = time.Now().Unix() - 1200 // 只保留20分钟 + + for _, item := range this.pbItems { + if item.CreatedAt > expiredAt { + pbServerStats = append(pbServerStats, item) + } + } + this.pbItems = nil + + for _, item := range this.pbDomainItems { + if item.CreatedAt > expiredAt { + pbDomainStats = append(pbDomainStats, item) + } + } + this.pbDomainItems = nil + } + + if len(pbServerStats) == 0 && len(pbDomainStats) == 0 { + return nil + } + _, err = client.ServerDailyStatRPC.UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{ Stats: pbServerStats, DomainStats: pbDomainStats, }) - return err + if err != nil { + // 加回历史记录 + this.pbItems = pbServerStats + this.pbDomainItems = pbDomainStats + + return err + } + + return nil }