diff --git a/internal/stats/bandwidth_stat_manager_test.go b/internal/stats/bandwidth_stat_manager_test.go index b0750aa..5e2d3b1 100644 --- a/internal/stats/bandwidth_stat_manager_test.go +++ b/internal/stats/bandwidth_stat_manager_test.go @@ -10,22 +10,22 @@ import ( func TestBandwidthStatManager_Add(t *testing.T) { var manager = stats.NewBandwidthStatManager() - manager.Add(1, 1, 10, 10) - manager.Add(1, 1, 10, 10) - manager.Add(1, 1, 10, 10) + manager.AddBandwidth(1, 1, 10, 10) + manager.AddBandwidth(1, 1, 10, 10) + manager.AddBandwidth(1, 1, 10, 10) time.Sleep(1 * time.Second) - manager.Add(1, 1, 85, 85) + manager.AddBandwidth(1, 1, 85, 85) time.Sleep(1 * time.Second) - manager.Add(1, 1, 25, 25) - manager.Add(1, 1, 75, 75) + manager.AddBandwidth(1, 1, 25, 25) + manager.AddBandwidth(1, 1, 75, 75) manager.Inspect() } func TestBandwidthStatManager_Loop(t *testing.T) { var manager = stats.NewBandwidthStatManager() - manager.Add(1, 1, 10, 10) - manager.Add(1, 1, 10, 10) - manager.Add(1, 1, 10, 10) + manager.AddBandwidth(1, 1, 10, 10) + manager.AddBandwidth(1, 1, 10, 10) + manager.AddBandwidth(1, 1, 10, 10) err := manager.Loop() if err != nil { t.Fatal(err) diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index 4bdd2e8..eb44004 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -12,6 +12,7 @@ import ( "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/types" + "sort" "strconv" "strings" "sync" @@ -44,8 +45,8 @@ const trafficStatsMaxLife = 1200 // 最大只保存20分钟内的数据 // TrafficStatManager 区域流量统计 type TrafficStatManager struct { - itemMap map[string]*TrafficItem // [timestamp serverId] => *TrafficItem - domainsMap map[string]*TrafficItem // timestamp @ serverId @ domain => *TrafficItem + itemMap map[string]*TrafficItem // [timestamp serverId] => *TrafficItem + domainsMap map[int64]map[string]*TrafficItem // serverIde => { timestamp @ domain => *TrafficItem } pbItems []*pb.ServerDailyStat pbDomainItems []*pb.UploadServerDailyStatsRequest_DomainStat @@ -59,7 +60,7 @@ type TrafficStatManager struct { func NewTrafficStatManager() *TrafficStatManager { var manager = &TrafficStatManager{ itemMap: map[string]*TrafficItem{}, - domainsMap: map[string]*TrafficItem{}, + domainsMap: map[int64]map[string]*TrafficItem{}, } return manager @@ -140,11 +141,17 @@ func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, item.PlanId = planId // 单个域名流量 - var domainKey = strconv.FormatInt(timestamp, 10) + "@" + strconv.FormatInt(serverId, 10) + "@" + domain - domainItem, ok := this.domainsMap[domainKey] + var domainKey = types.String(timestamp) + "@" + domain + serverDomainMap, ok := this.domainsMap[serverId] + if !ok { + serverDomainMap = map[string]*TrafficItem{} + this.domainsMap[serverId] = serverDomainMap + } + + domainItem, ok := serverDomainMap[domainKey] if !ok { domainItem = &TrafficItem{} - this.domainsMap[domainKey] = domainItem + serverDomainMap[domainKey] = domainItem } domainItem.Bytes += bytes domainItem.CachedBytes += cachedBytes @@ -176,7 +183,7 @@ func (this *TrafficStatManager) Upload() error { // reset this.itemMap = map[string]*TrafficItem{} - this.domainsMap = map[string]*TrafficItem{} + this.domainsMap = map[int64]map[string]*TrafficItem{} this.locker.Unlock() @@ -208,23 +215,43 @@ func (this *TrafficStatManager) Upload() error { } // 域名统计 + const maxDomainsPerServer = 20 var pbDomainStats = []*pb.UploadServerDailyStatsRequest_DomainStat{} - for key, item := range domainMap { - var pieces = strings.SplitN(key, "@", 3) - if len(pieces) != 3 { - continue + for serverId, serverDomainMap := range domainMap { + // 如果超过单个服务最大值,则只取前N个 + var shouldTrim = len(serverDomainMap) > maxDomainsPerServer + var tempItems []*pb.UploadServerDailyStatsRequest_DomainStat + + for key, item := range serverDomainMap { + var pieces = strings.SplitN(key, "@", 2) + if len(pieces) != 2 { + continue + } + var pbItem = &pb.UploadServerDailyStatsRequest_DomainStat{ + ServerId: serverId, + Domain: pieces[1], + Bytes: item.Bytes, + CachedBytes: item.CachedBytes, + CountRequests: item.CountRequests, + CountCachedRequests: item.CountCachedRequests, + CountAttackRequests: item.CountAttackRequests, + AttackBytes: item.AttackBytes, + CreatedAt: types.Int64(pieces[0]), + } + if !shouldTrim { + pbDomainStats = append(pbDomainStats, pbItem) + } else { + tempItems = append(tempItems, pbItem) + } + } + + if shouldTrim { + sort.Slice(tempItems, func(i, j int) bool { + return tempItems[i].CountRequests > tempItems[j].CountRequests + }) + + pbDomainStats = append(pbDomainStats, tempItems[:maxDomainsPerServer]...) } - pbDomainStats = append(pbDomainStats, &pb.UploadServerDailyStatsRequest_DomainStat{ - ServerId: types.Int64(pieces[1]), - Domain: pieces[2], - Bytes: item.Bytes, - CachedBytes: item.CachedBytes, - CountRequests: item.CountRequests, - CountCachedRequests: item.CountCachedRequests, - CountAttackRequests: item.CountAttackRequests, - AttackBytes: item.AttackBytes, - CreatedAt: types.Int64(pieces[0]), - }) } // 历史未提交记录 diff --git a/internal/stats/traffic_stat_manager_test.go b/internal/stats/traffic_stat_manager_test.go index 4dadec3..4d632ca 100644 --- a/internal/stats/traffic_stat_manager_test.go +++ b/internal/stats/traffic_stat_manager_test.go @@ -1,6 +1,8 @@ package stats import ( + "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" "runtime" "testing" ) @@ -16,7 +18,7 @@ func TestTrafficStatManager_Add(t *testing.T) { func TestTrafficStatManager_Upload(t *testing.T) { manager := NewTrafficStatManager() for i := 0; i < 100; i++ { - manager.Add(1, "goedge.cn", 1, 0, 0, 0, 0, 0, false, 0) + manager.Add(1, "goedge.cn"+types.String(rands.Int(0, 10)), 1, 0, 1, 0, 0, 0, false, 0) } err := manager.Upload() if err != nil {