diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index c25834b..dfda90d 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -188,6 +188,8 @@ func (this *ClientConn) Write(b []byte) (n int, err error) { var before = time.Now() n, err = this.rawConn.Write(b) if n > 0 { + atomic.AddInt64(&this.totalSentBytes, int64(n)) + // 统计当前服务带宽 if this.serverId > 0 { // TODO 需要加入在serverId绑定之前的带宽 diff --git a/internal/nodes/client_conn_base.go b/internal/nodes/client_conn_base.go index d193542..0dc85b4 100644 --- a/internal/nodes/client_conn_base.go +++ b/internal/nodes/client_conn_base.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/firewalls" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" "net" + "sync/atomic" "time" ) @@ -25,6 +26,8 @@ type BaseClientConn struct { isClosed bool rawIP string + + totalSentBytes int64 } func (this *BaseClientConn) IsClosed() bool { @@ -160,3 +163,10 @@ func (this *BaseClientConn) SetFingerprint(fingerprint []byte) { func (this *BaseClientConn) Fingerprint() []byte { return this.fingerprint } + +// LastRequestBytes 读取上一次请求发送的字节数 +func (this *BaseClientConn) LastRequestBytes() int64 { + var result = atomic.LoadInt64(&this.totalSentBytes) + atomic.StoreInt64(&this.totalSentBytes, 0) + return result +} diff --git a/internal/nodes/client_conn_interface.go b/internal/nodes/client_conn_interface.go index 738602e..651f4b0 100644 --- a/internal/nodes/client_conn_interface.go +++ b/internal/nodes/client_conn_interface.go @@ -32,4 +32,7 @@ type ClientConnInterface interface { // Fingerprint 读取指纹信息 Fingerprint() []byte + + // LastRequestBytes 读取上一次请求发送的字节数 + LastRequestBytes() int64 } diff --git a/internal/nodes/client_tls_conn.go b/internal/nodes/client_tls_conn.go index 98db73b..cf51f10 100644 --- a/internal/nodes/client_tls_conn.go +++ b/internal/nodes/client_tls_conn.go @@ -82,3 +82,18 @@ func (this *ClientTLSConn) Fingerprint() []byte { } return nil } + +// LastRequestBytes 读取上一次请求发送的字节数 +func (this *ClientTLSConn) LastRequestBytes() int64 { + tlsConn, ok := this.rawConn.(*tls.Conn) + if ok { + var rawConn = tlsConn.NetConn() + if rawConn != nil { + clientConn, ok := rawConn.(*ClientConn) + if ok { + return clientConn.LastRequestBytes() + } + } + } + return 0 +} diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 9809dfb..0c25299 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -389,6 +389,21 @@ func (this *HTTPRequest) doEnd() { // 流量统计 // TODO 增加是否开启开关 if this.ReqServer != nil && this.ReqServer.Id > 0 { + var totalBytes int64 = 0 + + var requestConn = this.RawReq.Context().Value(HTTPConnContextKey) + if requestConn != nil { + requestClientConn, ok := requestConn.(ClientConnInterface) + if ok { + // 这里读取的其实是上一个请求消耗的流量,不是当前请求消耗的流量,只不过单个请求的流量统计不需要特别精确,整体趋于一致即可 + totalBytes = requestClientConn.LastRequestBytes() + } + } + + if totalBytes == 0 { + totalBytes = this.writer.SentBodyBytes() + this.writer.SentHeaderBytes() + } + var countCached int64 = 0 var cachedBytes int64 = 0 @@ -397,14 +412,17 @@ func (this *HTTPRequest) doEnd() { if this.isCached { countCached = 1 - cachedBytes = this.writer.SentBodyBytes() + this.writer.SentHeaderBytes() + cachedBytes = totalBytes } if this.isAttack { countAttacks = 1 attackBytes = this.CalculateSize() + if attackBytes < totalBytes { + attackBytes = totalBytes + } } - stats.SharedTrafficStatManager.Add(this.ReqServer.UserId, this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes()+this.writer.SentHeaderBytes(), cachedBytes, 1, countCached, countAttacks, attackBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId()) + stats.SharedTrafficStatManager.Add(this.ReqServer.UserId, this.ReqServer.Id, this.ReqHost, totalBytes, cachedBytes, 1, countCached, countAttacks, attackBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId()) // 指标 if metrics.SharedManager.HasHTTPMetrics() { diff --git a/internal/nodes/listener_base.go b/internal/nodes/listener_base.go index 6ca4392..4d7c0f4 100644 --- a/internal/nodes/listener_base.go +++ b/internal/nodes/listener_base.go @@ -38,7 +38,7 @@ func (this *BaseListener) buildTLSConfig() *tls.Config { GetConfigForClient: func(clientInfo *tls.ClientHelloInfo) (config *tls.Config, e error) { // 指纹信息 var fingerprint = this.calculateFingerprint(clientInfo) - if len(fingerprint) > 0 { + if len(fingerprint) > 0 && clientInfo.Conn != nil { clientConn, ok := clientInfo.Conn.(ClientConnInterface) if ok { clientConn.SetFingerprint(fingerprint) @@ -61,7 +61,7 @@ func (this *BaseListener) buildTLSConfig() *tls.Config { GetCertificate: func(clientInfo *tls.ClientHelloInfo) (certificate *tls.Certificate, e error) { // 指纹信息 var fingerprint = this.calculateFingerprint(clientInfo) - if len(fingerprint) > 0 { + if len(fingerprint) > 0 && clientInfo.Conn != nil { clientConn, ok := clientInfo.Conn.(ClientConnInterface) if ok { clientConn.SetFingerprint(fingerprint) @@ -235,7 +235,7 @@ func (this *BaseListener) findNamedServerMatched(name string) (serverConfig *ser // 从Hello信息中获取服务名称 func (this *BaseListener) helloServerName(clientInfo *tls.ClientHelloInfo) string { var serverName = clientInfo.ServerName - if len(serverName) == 0 { + if len(serverName) == 0 && clientInfo.Conn != nil { var localAddr = clientInfo.Conn.LocalAddr() if localAddr != nil { tcpAddr, ok := localAddr.(*net.TCPAddr) diff --git a/internal/stats/bandwidth_stat_manager.go b/internal/stats/bandwidth_stat_manager.go index b3f100d..b385963 100644 --- a/internal/stats/bandwidth_stat_manager.go +++ b/internal/stats/bandwidth_stat_manager.go @@ -132,9 +132,10 @@ func (this *BandwidthStatManager) Loop() error { for key, stat := range this.m { if stat.Day < day || stat.TimeAt < currentTime { // 防止数据出现错误 - if stat.CachedBytes > stat.TotalBytes { + if stat.CachedBytes > stat.TotalBytes || stat.CountCachedRequests == stat.CountRequests { stat.CachedBytes = stat.TotalBytes } + if stat.AttackBytes > stat.TotalBytes { stat.AttackBytes = stat.TotalBytes } diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index 68ced9e..d171df9 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -42,8 +42,6 @@ func (this *TrafficItem) Add(anotherItem *TrafficItem) { this.AttackBytes += anotherItem.AttackBytes } -const trafficStatsMaxLife = 1200 // 最大只保存20分钟内的数据 - // TrafficStatManager 区域流量统计 type TrafficStatManager struct { itemMap map[string]*TrafficItem // [timestamp serverId] => *TrafficItem @@ -231,6 +229,12 @@ func (this *TrafficStatManager) Upload() error { if len(pieces) != 2 { continue } + + // 修正数据 + if item.CachedBytes > item.Bytes || item.CountCachedRequests == item.CountRequests { + item.CachedBytes = item.Bytes + } + var pbItem = &pb.UploadServerDailyStatsRequest_DomainStat{ ServerId: serverId, Domain: pieces[1],