diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 8bfde85..865f9d6 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -243,9 +243,9 @@ func (this *HTTPRequest) doEnd() { // TODO 增加是否开启开关 if this.Server != nil { if this.isCached { - stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1) + stats.SharedTrafficStatManager.Add(this.Server.Id, this.Host, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1) } else { - stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, 0, 1, 0) + stats.SharedTrafficStatManager.Add(this.Server.Id, this.Host, this.writer.sentBodyBytes, 0, 1, 0) } } diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 235e31e..045989f 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -80,7 +80,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { } // 记录流量 - stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n), 0, 0, 0) + stats.SharedTrafficStatManager.Add(firstServer.Id, "", int64(n), 0, 0, 0) } if err != nil { closer() diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index f2bdc85..3e17968 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -164,7 +164,7 @@ func NewUDPConn(serverId int64, addr net.Addr, proxyConn *net.UDPConn, serverCon } // 记录流量 - stats.SharedTrafficStatManager.Add(serverId, int64(n), 0, 0, 0) + stats.SharedTrafficStatManager.Add(serverId, "", int64(n), 0, 0, 0) } if err != nil { conn.isOk = false diff --git a/internal/stats/http_request_stat_manager.go b/internal/stats/http_request_stat_manager.go index ce1a7a2..2bce948 100644 --- a/internal/stats/http_request_stat_manager.go +++ b/internal/stats/http_request_stat_manager.go @@ -38,10 +38,10 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager { ipChan: make(chan string, 10_000), // TODO 将来可以配置容量 userAgentChan: make(chan string, 10_000), // TODO 将来可以配置容量 firewallRuleGroupChan: make(chan string, 10_000), // TODO 将来可以配置容量 - cityMap: map[string]int64{}, - providerMap: map[string]int64{}, - systemMap: map[string]int64{}, - browserMap: map[string]int64{}, + cityMap: map[string]int64{}, + providerMap: map[string]int64{}, + systemMap: map[string]int64{}, + browserMap: map[string]int64{}, dailyFirewallRuleGroupMap: map[string]int64{}, } } diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index af50616..a2452eb 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -8,7 +8,9 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" "strconv" + "strings" "sync" "time" ) @@ -24,7 +26,8 @@ type TrafficItem struct { // TrafficStatManager 区域流量统计 type TrafficStatManager struct { - itemMap map[string]*TrafficItem // [timestamp serverId] => bytes + itemMap map[string]*TrafficItem // [timestamp serverId] => *TrafficItem + domainsMap map[string]*TrafficItem // timestamp @ serverId @ domain => *TrafficItem locker sync.Mutex configFunc func() *nodeconfigs.NodeConfig } @@ -32,7 +35,8 @@ type TrafficStatManager struct { // NewTrafficStatManager 获取新对象 func NewTrafficStatManager() *TrafficStatManager { manager := &TrafficStatManager{ - itemMap: map[string]*TrafficItem{}, + itemMap: map[string]*TrafficItem{}, + domainsMap: map[string]*TrafficItem{}, } return manager @@ -62,7 +66,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) } // Add 添加流量 -func (this *TrafficStatManager) Add(serverId int64, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64) { +func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64) { if bytes == 0 { return } @@ -71,6 +75,8 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64, cachedBytes int key := strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10) this.locker.Lock() + + // 总的流量 item, ok := this.itemMap[key] if !ok { item = &TrafficItem{} @@ -80,6 +86,19 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64, cachedBytes int item.CachedBytes += cachedBytes item.CountRequests += countRequests item.CountCachedRequests += countCachedRequests + + // 单个域名流量 + var domainKey = strconv.FormatInt(timestamp, 10) + "@" + strconv.FormatInt(serverId, 10) + "@" + domain + domainItem, ok := this.domainsMap[domainKey] + if !ok { + domainItem = &TrafficItem{} + this.domainsMap[domainKey] = domainItem + } + domainItem.Bytes += bytes + domainItem.CachedBytes += cachedBytes + domainItem.CountRequests += countRequests + domainItem.CountCachedRequests += countCachedRequests + this.locker.Unlock() } @@ -96,12 +115,15 @@ func (this *TrafficStatManager) Upload() error { } this.locker.Lock() - m := this.itemMap + itemMap := this.itemMap + domainMap := this.domainsMap this.itemMap = map[string]*TrafficItem{} + this.domainsMap = map[string]*TrafficItem{} this.locker.Unlock() - pbStats := []*pb.ServerDailyStat{} - for key, item := range m { + // 服务统计 + var pbServerStats = []*pb.ServerDailyStat{} + for key, item := range itemMap { timestamp, err := strconv.ParseInt(key[:10], 10, 64) if err != nil { return err @@ -111,7 +133,7 @@ func (this *TrafficStatManager) Upload() error { return err } - pbStats = append(pbStats, &pb.ServerDailyStat{ + pbServerStats = append(pbServerStats, &pb.ServerDailyStat{ ServerId: serverId, RegionId: config.RegionId, Bytes: item.Bytes, @@ -121,9 +143,31 @@ func (this *TrafficStatManager) Upload() error { CreatedAt: timestamp, }) } - if len(pbStats) == 0 { + if len(pbServerStats) == 0 { return nil } - _, err = client.ServerDailyStatRPC().UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{Stats: pbStats}) + + // 域名统计 + var pbDomainStats = []*pb.UploadServerDailyStatsRequest_DomainStat{} + for key, item := range domainMap { + var pieces = strings.SplitN(key, "@", 3) + if len(pieces) != 3 { + continue + } + pbDomainStats = append(pbDomainStats, &pb.UploadServerDailyStatsRequest_DomainStat{ + ServerId: types.Int64(pieces[1]), + Domain: pieces[2], + Bytes: item.Bytes, + CachedBytes: item.CachedBytes, + CountRequests: item.CountRequests, + CountCachedRequests: item.CountCachedRequests, + CreatedAt: types.Int64(pieces[0]), + }) + } + + _, err = client.ServerDailyStatRPC().UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{ + Stats: pbServerStats, + DomainStats: pbDomainStats, + }) return err }