mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 16:00:25 +08:00 
			
		
		
		
	修复HTTPS连接无法记录带宽的问题,优化带宽计算方法
This commit is contained in:
		@@ -11,6 +11,7 @@ import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/waf"
 | 
			
		||||
	"github.com/iwind/TeaGo/Tea"
 | 
			
		||||
	"github.com/iwind/TeaGo/types"
 | 
			
		||||
	"net"
 | 
			
		||||
	"os"
 | 
			
		||||
@@ -22,6 +23,8 @@ import (
 | 
			
		||||
 | 
			
		||||
// ClientConn 客户端连接
 | 
			
		||||
type ClientConn struct {
 | 
			
		||||
	BaseClientConn
 | 
			
		||||
 | 
			
		||||
	once sync.Once
 | 
			
		||||
 | 
			
		||||
	isTLS       bool
 | 
			
		||||
@@ -31,8 +34,6 @@ type ClientConn struct {
 | 
			
		||||
	isLO bool // 是否为环路
 | 
			
		||||
 | 
			
		||||
	hasResetSYNFlood bool
 | 
			
		||||
 | 
			
		||||
	BaseClientConn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewClientConn(conn net.Conn, isTLS bool, quickClose bool) net.Conn {
 | 
			
		||||
@@ -110,11 +111,10 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
 | 
			
		||||
func (this *ClientConn) Write(b []byte) (n int, err error) {
 | 
			
		||||
	n, err = this.rawConn.Write(b)
 | 
			
		||||
	if n > 0 {
 | 
			
		||||
		atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
 | 
			
		||||
 | 
			
		||||
		// 统计当前服务带宽
 | 
			
		||||
		if this.serverId > 0 {
 | 
			
		||||
			if !this.isLO { // 环路不统计带宽,避免缓存预热等行为产生带宽
 | 
			
		||||
			if !this.isLO || Tea.IsTesting() { // 环路不统计带宽,避免缓存预热等行为产生带宽
 | 
			
		||||
				atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
 | 
			
		||||
				stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(n))
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,10 @@
 | 
			
		||||
 | 
			
		||||
package nodes
 | 
			
		||||
 | 
			
		||||
import "net"
 | 
			
		||||
import (
 | 
			
		||||
	"crypto/tls"
 | 
			
		||||
	"net"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type BaseClientConn struct {
 | 
			
		||||
	rawConn net.Conn
 | 
			
		||||
@@ -42,6 +45,17 @@ func (this *BaseClientConn) Bind(serverId int64, remoteAddr string, maxConnsPerS
 | 
			
		||||
// SetServerId 设置服务ID
 | 
			
		||||
func (this *BaseClientConn) SetServerId(serverId int64) {
 | 
			
		||||
	this.serverId = serverId
 | 
			
		||||
 | 
			
		||||
	// 设置包装前连接
 | 
			
		||||
	switch conn := this.rawConn.(type) {
 | 
			
		||||
	case *tls.Conn:
 | 
			
		||||
		nativeConn, ok := conn.NetConn().(ClientConnInterface)
 | 
			
		||||
		if ok {
 | 
			
		||||
			nativeConn.SetServerId(serverId)
 | 
			
		||||
		}
 | 
			
		||||
	case *ClientConn:
 | 
			
		||||
		conn.SetServerId(serverId)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ServerId 读取当前连接绑定的服务ID
 | 
			
		||||
@@ -52,6 +66,17 @@ func (this *BaseClientConn) ServerId() int64 {
 | 
			
		||||
// SetUserId 设置所属服务的用户ID
 | 
			
		||||
func (this *BaseClientConn) SetUserId(userId int64) {
 | 
			
		||||
	this.userId = userId
 | 
			
		||||
 | 
			
		||||
	// 设置包装前连接
 | 
			
		||||
	switch conn := this.rawConn.(type) {
 | 
			
		||||
	case *tls.Conn:
 | 
			
		||||
		nativeConn, ok := conn.NetConn().(ClientConnInterface)
 | 
			
		||||
		if ok {
 | 
			
		||||
			nativeConn.SetUserId(userId)
 | 
			
		||||
		}
 | 
			
		||||
	case *ClientConn:
 | 
			
		||||
		conn.SetUserId(userId)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UserId 获取当前连接所属服务的用户ID
 | 
			
		||||
@@ -66,9 +91,15 @@ func (this *BaseClientConn) RawIP() string {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TCPConn 转换为TCPConn
 | 
			
		||||
func (this *BaseClientConn) TCPConn() (*net.TCPConn, bool) {
 | 
			
		||||
	conn, ok := this.rawConn.(*net.TCPConn)
 | 
			
		||||
	return conn, ok
 | 
			
		||||
func (this *BaseClientConn) TCPConn() (tcpConn *net.TCPConn, ok bool) {
 | 
			
		||||
	// 设置包装前连接
 | 
			
		||||
	switch conn := this.rawConn.(type) {
 | 
			
		||||
	case *tls.Conn:
 | 
			
		||||
		tcpConn, ok = conn.NetConn().(*net.TCPConn)
 | 
			
		||||
	default:
 | 
			
		||||
		tcpConn, ok = this.rawConn.(*net.TCPConn)
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetLinger 设置Linger
 | 
			
		||||
 
 | 
			
		||||
@@ -869,6 +869,11 @@ func (this *Node) listenSock() error {
 | 
			
		||||
				} else {
 | 
			
		||||
					_ = cmd.ReplyOk()
 | 
			
		||||
				}
 | 
			
		||||
			case "bandwidth":
 | 
			
		||||
				var m = stats.SharedBandwidthStatManager.Map()
 | 
			
		||||
				_ = cmd.Reply(&gosock.Command{Params: maps.Map{
 | 
			
		||||
					"stats": m,
 | 
			
		||||
				}})
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -17,6 +17,8 @@ import (
 | 
			
		||||
 | 
			
		||||
var SharedBandwidthStatManager = NewBandwidthStatManager()
 | 
			
		||||
 | 
			
		||||
const bandwidthTimestampDelim = 2 // N秒平均,更为精确
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	events.On(events.EventLoaded, func() {
 | 
			
		||||
		goman.New(func() {
 | 
			
		||||
@@ -83,7 +85,7 @@ func (this *BandwidthStatManager) Loop() error {
 | 
			
		||||
				ServerId: stat.ServerId,
 | 
			
		||||
				Day:      stat.Day,
 | 
			
		||||
				TimeAt:   stat.TimeAt,
 | 
			
		||||
				Bytes:    stat.MaxBytes,
 | 
			
		||||
				Bytes:    stat.MaxBytes / bandwidthTimestampDelim,
 | 
			
		||||
			})
 | 
			
		||||
			delete(this.m, key)
 | 
			
		||||
		}
 | 
			
		||||
@@ -112,11 +114,18 @@ func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var now = time.Now()
 | 
			
		||||
	var timestamp = now.Unix()
 | 
			
		||||
	var timestamp = now.Unix() / bandwidthTimestampDelim * bandwidthTimestampDelim // 将时间戳均分成N等份
 | 
			
		||||
	var day = timeutil.Format("Ymd", now)
 | 
			
		||||
	var timeAt = timeutil.FormatTime("Hi", now.Unix()/300*300)
 | 
			
		||||
	var key = types.String(serverId) + "@" + day + "@" + timeAt
 | 
			
		||||
 | 
			
		||||
	// 增加TCP Header尺寸,这里默认MTU为1500,且默认为IPv4
 | 
			
		||||
	const mtu = 1500
 | 
			
		||||
	const tcpHeaderSize = 20
 | 
			
		||||
	if bytes > mtu {
 | 
			
		||||
		bytes += bytes * tcpHeaderSize / mtu
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	this.locker.Lock()
 | 
			
		||||
	stat, ok := this.m[key]
 | 
			
		||||
	if ok {
 | 
			
		||||
@@ -150,3 +159,15 @@ func (this *BandwidthStatManager) Inspect() {
 | 
			
		||||
	logs.PrintAsJSON(this.m)
 | 
			
		||||
	this.locker.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *BandwidthStatManager) Map() map[int64]int64 /** serverId => max bytes **/ {
 | 
			
		||||
	this.locker.Lock()
 | 
			
		||||
	defer this.locker.Unlock()
 | 
			
		||||
 | 
			
		||||
	var m = map[int64]int64{}
 | 
			
		||||
	for _, v := range this.m {
 | 
			
		||||
		m[v.ServerId] = v.MaxBytes / bandwidthTimestampDelim
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return m
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -14,7 +14,7 @@ func TestBandwidthStatManager_Add(t *testing.T) {
 | 
			
		||||
	manager.Add(1, 1, 10)
 | 
			
		||||
	manager.Add(1, 1, 10)
 | 
			
		||||
	time.Sleep(1 * time.Second)
 | 
			
		||||
	manager.Add(1, 1, 15)
 | 
			
		||||
	manager.Add(1, 1, 85)
 | 
			
		||||
	time.Sleep(1 * time.Second)
 | 
			
		||||
	manager.Add(1, 1, 25)
 | 
			
		||||
	manager.Add(1, 1, 75)
 | 
			
		||||
 
 | 
			
		||||
@@ -1,19 +1,19 @@
 | 
			
		||||
package stats
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
 | 
			
		||||
	iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
 | 
			
		||||
	_ "github.com/iwind/TeaGo/bootstrap"
 | 
			
		||||
	"github.com/iwind/TeaGo/logs"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestHTTPRequestStatManager_Loop_Region(t *testing.T) {
 | 
			
		||||
	library, err := iplibrary.SharedManager.Load()
 | 
			
		||||
	err := iplib.InitDefault()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	iplibrary.SharedLibrary = library
 | 
			
		||||
 | 
			
		||||
	manager := NewHTTPRequestStatManager()
 | 
			
		||||
	var manager = NewHTTPRequestStatManager()
 | 
			
		||||
	manager.AddRemoteAddr(11, "202.196.0.20", 0, false)
 | 
			
		||||
	manager.AddRemoteAddr(11, "202.196.0.20", 0, false) // 重复添加一个测试相加
 | 
			
		||||
	manager.AddRemoteAddr(11, "8.8.8.8", 0, false)
 | 
			
		||||
@@ -36,19 +36,13 @@ func TestHTTPRequestStatManager_Loop_Region(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestHTTPRequestStatManager_Loop_UserAgent(t *testing.T) {
 | 
			
		||||
	library, err := iplibrary.SharedManager.Load()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	iplibrary.SharedLibrary = library
 | 
			
		||||
 | 
			
		||||
	manager := NewHTTPRequestStatManager()
 | 
			
		||||
	var manager = NewHTTPRequestStatManager()
 | 
			
		||||
	manager.AddUserAgent(1, "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36")
 | 
			
		||||
	manager.AddUserAgent(1, "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36")
 | 
			
		||||
	manager.AddUserAgent(1, "Mozilla/5.0 (Macintosh; Intel Mac OS X 11) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76 Safari/537.36")
 | 
			
		||||
	manager.AddUserAgent(1, "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:49.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36")
 | 
			
		||||
	manager.AddUserAgent(1, "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")
 | 
			
		||||
	err = manager.Loop()
 | 
			
		||||
	err := manager.Loop()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user