diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index 885c540..399d840 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -11,6 +11,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/waf" + "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" "net" "os" @@ -22,6 +23,8 @@ import ( // ClientConn 客户端连接 type ClientConn struct { + BaseClientConn + once sync.Once isTLS bool @@ -31,8 +34,6 @@ type ClientConn struct { isLO bool // 是否为环路 hasResetSYNFlood bool - - BaseClientConn } func NewClientConn(conn net.Conn, isTLS bool, quickClose bool) net.Conn { @@ -110,11 +111,10 @@ func (this *ClientConn) Read(b []byte) (n int, err error) { func (this *ClientConn) Write(b []byte) (n int, err error) { n, err = this.rawConn.Write(b) if n > 0 { - atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n)) - // 统计当前服务带宽 if this.serverId > 0 { - if !this.isLO { // 环路不统计带宽,避免缓存预热等行为产生带宽 + if !this.isLO || Tea.IsTesting() { // 环路不统计带宽,避免缓存预热等行为产生带宽 + atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n)) stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(n)) } } diff --git a/internal/nodes/client_conn_base.go b/internal/nodes/client_conn_base.go index fc8f6b6..0341070 100644 --- a/internal/nodes/client_conn_base.go +++ b/internal/nodes/client_conn_base.go @@ -2,7 +2,10 @@ package nodes -import "net" +import ( + "crypto/tls" + "net" +) type BaseClientConn struct { rawConn net.Conn @@ -42,6 +45,17 @@ func (this *BaseClientConn) Bind(serverId int64, remoteAddr string, maxConnsPerS // SetServerId 设置服务ID func (this *BaseClientConn) SetServerId(serverId int64) { this.serverId = serverId + + // 设置包装前连接 + switch conn := this.rawConn.(type) { + case *tls.Conn: + nativeConn, ok := conn.NetConn().(ClientConnInterface) + if ok { + nativeConn.SetServerId(serverId) + } + case *ClientConn: + conn.SetServerId(serverId) + } } // ServerId 读取当前连接绑定的服务ID @@ -52,6 +66,17 @@ func (this *BaseClientConn) ServerId() int64 { // SetUserId 设置所属服务的用户ID func (this *BaseClientConn) SetUserId(userId int64) { this.userId = userId + + // 设置包装前连接 + switch conn := this.rawConn.(type) { + case *tls.Conn: + nativeConn, ok := conn.NetConn().(ClientConnInterface) + if ok { + nativeConn.SetUserId(userId) + } + case *ClientConn: + conn.SetUserId(userId) + } } // UserId 获取当前连接所属服务的用户ID @@ -66,9 +91,15 @@ func (this *BaseClientConn) RawIP() string { } // TCPConn 转换为TCPConn -func (this *BaseClientConn) TCPConn() (*net.TCPConn, bool) { - conn, ok := this.rawConn.(*net.TCPConn) - return conn, ok +func (this *BaseClientConn) TCPConn() (tcpConn *net.TCPConn, ok bool) { + // 设置包装前连接 + switch conn := this.rawConn.(type) { + case *tls.Conn: + tcpConn, ok = conn.NetConn().(*net.TCPConn) + default: + tcpConn, ok = this.rawConn.(*net.TCPConn) + } + return } // SetLinger 设置Linger diff --git a/internal/nodes/node.go b/internal/nodes/node.go index fcf81d1..36a18f9 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -869,6 +869,11 @@ func (this *Node) listenSock() error { } else { _ = cmd.ReplyOk() } + case "bandwidth": + var m = stats.SharedBandwidthStatManager.Map() + _ = cmd.Reply(&gosock.Command{Params: maps.Map{ + "stats": m, + }}) } }) diff --git a/internal/stats/bandwidth_stat_manager.go b/internal/stats/bandwidth_stat_manager.go index 088ba5f..5e3b848 100644 --- a/internal/stats/bandwidth_stat_manager.go +++ b/internal/stats/bandwidth_stat_manager.go @@ -17,6 +17,8 @@ import ( var SharedBandwidthStatManager = NewBandwidthStatManager() +const bandwidthTimestampDelim = 2 // N秒平均,更为精确 + func init() { events.On(events.EventLoaded, func() { goman.New(func() { @@ -83,7 +85,7 @@ func (this *BandwidthStatManager) Loop() error { ServerId: stat.ServerId, Day: stat.Day, TimeAt: stat.TimeAt, - Bytes: stat.MaxBytes, + Bytes: stat.MaxBytes / bandwidthTimestampDelim, }) delete(this.m, key) } @@ -112,11 +114,18 @@ func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64) } var now = time.Now() - var timestamp = now.Unix() + var timestamp = now.Unix() / bandwidthTimestampDelim * bandwidthTimestampDelim // 将时间戳均分成N等份 var day = timeutil.Format("Ymd", now) var timeAt = timeutil.FormatTime("Hi", now.Unix()/300*300) var key = types.String(serverId) + "@" + day + "@" + timeAt + // 增加TCP Header尺寸,这里默认MTU为1500,且默认为IPv4 + const mtu = 1500 + const tcpHeaderSize = 20 + if bytes > mtu { + bytes += bytes * tcpHeaderSize / mtu + } + this.locker.Lock() stat, ok := this.m[key] if ok { @@ -150,3 +159,15 @@ func (this *BandwidthStatManager) Inspect() { logs.PrintAsJSON(this.m) this.locker.Unlock() } + +func (this *BandwidthStatManager) Map() map[int64]int64 /** serverId => max bytes **/ { + this.locker.Lock() + defer this.locker.Unlock() + + var m = map[int64]int64{} + for _, v := range this.m { + m[v.ServerId] = v.MaxBytes / bandwidthTimestampDelim + } + + return m +} diff --git a/internal/stats/bandwidth_stat_manager_test.go b/internal/stats/bandwidth_stat_manager_test.go index 1169bd7..d22cd7f 100644 --- a/internal/stats/bandwidth_stat_manager_test.go +++ b/internal/stats/bandwidth_stat_manager_test.go @@ -14,7 +14,7 @@ func TestBandwidthStatManager_Add(t *testing.T) { manager.Add(1, 1, 10) manager.Add(1, 1, 10) time.Sleep(1 * time.Second) - manager.Add(1, 1, 15) + manager.Add(1, 1, 85) time.Sleep(1 * time.Second) manager.Add(1, 1, 25) manager.Add(1, 1, 75) diff --git a/internal/stats/http_request_stat_manager_test.go b/internal/stats/http_request_stat_manager_test.go index d0b1391..7fd3cf6 100644 --- a/internal/stats/http_request_stat_manager_test.go +++ b/internal/stats/http_request_stat_manager_test.go @@ -1,19 +1,19 @@ package stats import ( - "github.com/TeaOSLab/EdgeNode/internal/iplibrary" + iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary" + _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/logs" "testing" ) func TestHTTPRequestStatManager_Loop_Region(t *testing.T) { - library, err := iplibrary.SharedManager.Load() + err := iplib.InitDefault() if err != nil { t.Fatal(err) } - iplibrary.SharedLibrary = library - manager := NewHTTPRequestStatManager() + var manager = NewHTTPRequestStatManager() manager.AddRemoteAddr(11, "202.196.0.20", 0, false) manager.AddRemoteAddr(11, "202.196.0.20", 0, false) // 重复添加一个测试相加 manager.AddRemoteAddr(11, "8.8.8.8", 0, false) @@ -36,19 +36,13 @@ func TestHTTPRequestStatManager_Loop_Region(t *testing.T) { } func TestHTTPRequestStatManager_Loop_UserAgent(t *testing.T) { - library, err := iplibrary.SharedManager.Load() - if err != nil { - t.Fatal(err) - } - iplibrary.SharedLibrary = library - - manager := NewHTTPRequestStatManager() + var manager = NewHTTPRequestStatManager() manager.AddUserAgent(1, "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36") manager.AddUserAgent(1, "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36") manager.AddUserAgent(1, "Mozilla/5.0 (Macintosh; Intel Mac OS X 11) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76 Safari/537.36") manager.AddUserAgent(1, "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:49.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36") manager.AddUserAgent(1, "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko") - err = manager.Loop() + err := manager.Loop() if err != nil { t.Fatal(err) }