diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index 767de82..0f759d9 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -182,14 +182,15 @@ func (this *ClientConn) Write(b []byte) (n int, err error) { if n > 0 { // 统计当前服务带宽 if this.serverId > 0 { + // TODO 需要加入在serverId绑定之前的带宽 if !this.isLO || Tea.IsTesting() { // 环路不统计带宽,避免缓存预热等行为产生带宽 atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n)) var cost = time.Since(before).Seconds() if cost > 1 { - stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(float64(n)/cost), int64(n)) + stats.SharedBandwidthStatManager.AddBandwidth(this.userId, this.serverId, int64(float64(n)/cost), int64(n)) } else { - stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(n), int64(n)) + stats.SharedBandwidthStatManager.AddBandwidth(this.userId, this.serverId, int64(n), int64(n)) } } } diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index 065f9b2..1760515 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -404,7 +404,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyListener stats.SharedTrafficStatManager.Add(server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) // 带宽 - stats.SharedBandwidthStatManager.Add(server.UserId, server.Id, int64(n), int64(n)) + stats.SharedBandwidthStatManager.AddBandwidth(server.UserId, server.Id, int64(n), int64(n)) } } if err != nil { diff --git a/internal/stats/bandwidth_stat_manager.go b/internal/stats/bandwidth_stat_manager.go index ee69981..64e82cc 100644 --- a/internal/stats/bandwidth_stat_manager.go +++ b/internal/stats/bandwidth_stat_manager.go @@ -43,11 +43,17 @@ type BandwidthStat struct { CurrentTimestamp int64 MaxBytes int64 TotalBytes int64 + + CachedBytes int64 + AttackBytes int64 + CountRequests int64 + CountCachedRequests int64 + CountAttackRequests int64 } // BandwidthStatManager 服务带宽统计 type BandwidthStatManager struct { - m map[string]*BandwidthStat // key => *BandwidthStat + m map[string]*BandwidthStat // serverId@day@time => *BandwidthStat pbStats []*pb.ServerBandwidthStat @@ -82,7 +88,7 @@ func (this *BandwidthStatManager) Loop() error { var now = time.Now() var day = timeutil.Format("Ymd", now) - var currentTime = timeutil.FormatTime("Hi", now.Unix()/300*300) + var currentTime = timeutil.FormatTime("Hi", now.Unix()/300*300) // 300s = 5 minutes if this.lastTime == currentTime { return nil @@ -106,15 +112,28 @@ func (this *BandwidthStatManager) Loop() error { this.locker.Lock() for key, stat := range this.m { if stat.Day < day || stat.TimeAt < currentTime { + // 防止数据出现错误 + if stat.CachedBytes > stat.TotalBytes { + stat.CachedBytes = stat.TotalBytes + } + if stat.AttackBytes > stat.TotalBytes { + stat.AttackBytes = stat.TotalBytes + } + pbStats = append(pbStats, &pb.ServerBandwidthStat{ - Id: 0, - UserId: stat.UserId, - ServerId: stat.ServerId, - Day: stat.Day, - TimeAt: stat.TimeAt, - Bytes: stat.MaxBytes / bandwidthTimestampDelim, - TotalBytes: stat.TotalBytes, - NodeRegionId: regionId, + Id: 0, + UserId: stat.UserId, + ServerId: stat.ServerId, + Day: stat.Day, + TimeAt: stat.TimeAt, + Bytes: stat.MaxBytes / bandwidthTimestampDelim, + TotalBytes: stat.TotalBytes, + CachedBytes: stat.CachedBytes, + AttackBytes: stat.AttackBytes, + CountRequests: stat.CountRequests, + CountCachedRequests: stat.CountCachedRequests, + CountAttackRequests: stat.CountAttackRequests, + NodeRegionId: regionId, }) delete(this.m, key) } @@ -138,8 +157,8 @@ func (this *BandwidthStatManager) Loop() error { return nil } -// Add 添加带宽数据 -func (this *BandwidthStatManager) Add(userId int64, serverId int64, peekBytes int64, totalBytes int64) { +// AddBandwidth 添加带宽数据 +func (this *BandwidthStatManager) AddBandwidth(userId int64, serverId int64, peekBytes int64, totalBytes int64) { if serverId <= 0 || (peekBytes == 0 && totalBytes == 0) { return } @@ -188,6 +207,25 @@ func (this *BandwidthStatManager) Add(userId int64, serverId int64, peekBytes in this.locker.Unlock() } +// AddTraffic 添加请求数据 +func (this *BandwidthStatManager) AddTraffic(serverId int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64) { + var now = time.Now() + var day = timeutil.Format("Ymd", now) + var timeAt = timeutil.FormatTime("Hi", now.Unix()/300*300) + var key = types.String(serverId) + "@" + day + "@" + timeAt + this.locker.Lock() + // 只有有记录了才会添加 + stat, ok := this.m[key] + if ok { + stat.CachedBytes += cachedBytes + stat.CountRequests += countRequests + stat.CountCachedRequests += countCachedRequests + stat.CountAttackRequests += countAttacks + stat.AttackBytes += attackBytes + } + this.locker.Unlock() +} + func (this *BandwidthStatManager) Inspect() { this.locker.Lock() logs.PrintAsJSON(this.m) diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index b0a5bf6..4bdd2e8 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -111,6 +111,9 @@ func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, return } + // 添加到带宽 + SharedBandwidthStatManager.AddTraffic(serverId, cachedBytes, countRequests, countCachedRequests, countAttacks, attackBytes) + if bytes == 0 && countRequests == 0 { return }