From e2d504d9e6391cd05f071dc0e3242cee13f015e2 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Thu, 9 Dec 2021 17:34:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=85=A8=E5=B1=80=E7=9A=84TC?= =?UTF-8?q?P=E6=9C=80=E5=A4=A7=E8=BF=9E=E6=8E=A5=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 3 -- internal/nodes/client_conn.go | 17 +++++++-- internal/nodes/client_listener.go | 18 +++++++++- internal/nodes/node.go | 47 +++++++++++++++++++------ internal/ratelimit/counter.go | 56 ++++++++++++++++++++++++++++++ internal/ratelimit/counter_test.go | 38 ++++++++++++++++++++ 6 files changed, 161 insertions(+), 18 deletions(-) create mode 100644 internal/ratelimit/counter.go create mode 100644 internal/ratelimit/counter_test.go diff --git a/go.mod b/go.mod index 7e45ca1..7967078 100644 --- a/go.mod +++ b/go.mod @@ -19,12 +19,9 @@ require ( github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3 github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8 - github.com/json-iterator/go v1.1.12 // indirect github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirect - github.com/lionsoul2014/ip2region v2.2.0-release+incompatible github.com/mattn/go-sqlite3 v1.14.9 github.com/miekg/dns v1.1.43 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/mssola/user_agent v0.5.2 github.com/pires/go-proxyproto v0.6.1 github.com/shirou/gopsutil v3.21.5+incompatible diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index 639f73e..a28c13c 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -8,8 +8,10 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/monitor" + "github.com/TeaOSLab/EdgeNode/internal/ratelimit" "github.com/iwind/TeaGo/maps" "net" + "sync" "sync/atomic" "time" ) @@ -44,18 +46,22 @@ func init() { type ClientConn struct { rawConn net.Conn isClosed bool + + once sync.Once + limiter *ratelimit.Counter } -func NewClientConn(conn net.Conn, quickClose bool) net.Conn { +func NewClientConn(conn net.Conn, quickClose bool, limiter *ratelimit.Counter) net.Conn { if quickClose { + // TCP tcpConn, ok := conn.(*net.TCPConn) if ok { // TODO 可以设置此值 - _ = tcpConn.SetLinger(3) + _ = tcpConn.SetLinger(nodeconfigs.DefaultTCPLinger) } } - return &ClientConn{rawConn: conn} + return &ClientConn{rawConn: conn, limiter: limiter} } func (this *ClientConn) Read(b []byte) (n int, err error) { @@ -76,6 +82,11 @@ func (this *ClientConn) Write(b []byte) (n int, err error) { func (this *ClientConn) Close() error { this.isClosed = true + this.once.Do(func() { + if this.limiter != nil { + this.limiter.Release() + } + }) return this.rawConn.Close() } diff --git a/internal/nodes/client_listener.go b/internal/nodes/client_listener.go index 1f798e0..f607ffc 100644 --- a/internal/nodes/client_listener.go +++ b/internal/nodes/client_listener.go @@ -3,12 +3,16 @@ package nodes import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" + "github.com/TeaOSLab/EdgeNode/internal/ratelimit" "github.com/TeaOSLab/EdgeNode/internal/waf" "net" ) +var sharedConnectionsLimiter = ratelimit.NewCounter(nodeconfigs.DefaultTCPMaxConnections) + // ClientListener 客户端网络监听 type ClientListener struct { rawListener net.Listener @@ -23,10 +27,21 @@ func NewClientListener(listener net.Listener, quickClose bool) net.Listener { } func (this *ClientListener) Accept() (net.Conn, error) { + // 限制并发连接数 + var isOk = false + var limiter = sharedConnectionsLimiter + limiter.Ack() + defer func() { + if !isOk { + limiter.Release() + } + }() + conn, err := this.rawListener.Accept() if err != nil { return nil, err } + // 是否在WAF名单中 ip, _, err := net.SplitHostPort(conn.RemoteAddr().String()) if err == nil { @@ -42,7 +57,8 @@ func (this *ClientListener) Accept() (net.Conn, error) { } } - return NewClientConn(conn, this.quickClose), nil + isOk = true + return NewClientConn(conn, this.quickClose, limiter), nil } func (this *ClientListener) Close() error { diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 7936051..01fe7ed 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -14,6 +14,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/metrics" + "github.com/TeaOSLab/EdgeNode/internal/ratelimit" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/stats" @@ -51,12 +52,15 @@ type Node struct { sock *gosock.Sock locker sync.Mutex - timezone string + maxCPU int32 + maxThreads int + timezone string } func NewNode() *Node { return &Node{ - sock: gosock.NewTmpSock(teaconst.ProcessName), + sock: gosock.NewTmpSock(teaconst.ProcessName), + maxThreads: -1, } } @@ -637,18 +641,39 @@ func (this *Node) listenSock() error { // 重载配置调用 func (this *Node) onReload(config *nodeconfigs.NodeConfig) { // max cpu - if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) { - runtime.GOMAXPROCS(int(config.MaxCPU)) - } else { - runtime.GOMAXPROCS(runtime.NumCPU()) + if config.MaxCPU != this.maxCPU { + if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) { + runtime.GOMAXPROCS(int(config.MaxCPU)) + remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(config.MaxCPU)+"'") + } else { + runtime.GOMAXPROCS(runtime.NumCPU()) + remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(runtime.NumCPU())+"'") + } + + this.maxCPU = config.MaxCPU } // max threads - if config.MaxThreads > 0 { - debug.SetMaxThreads(config.MaxThreads) - remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(config.MaxThreads)+"'") - } else { - debug.SetMaxThreads(nodeconfigs.DefaultMaxThreads) + if config.MaxThreads != this.maxThreads { + if config.MaxThreads > 0 { + debug.SetMaxThreads(config.MaxThreads) + remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(config.MaxThreads)+"'") + } else { + debug.SetMaxThreads(nodeconfigs.DefaultMaxThreads) + remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(nodeconfigs.DefaultMaxThreads)+"'") + } + this.maxThreads = config.MaxThreads + } + + // max tcp connections + if config.TCPMaxConnections <= 0 { + config.TCPMaxConnections = nodeconfigs.DefaultTCPMaxConnections + } + if config.TCPMaxConnections != sharedConnectionsLimiter.Count() { + remotelogs.Println("NODE", "[TCP]changed tcp max connections to '"+types.String(config.TCPMaxConnections)+"'") + + sharedConnectionsLimiter.Close() + sharedConnectionsLimiter = ratelimit.NewCounter(config.TCPMaxConnections) } // timezone diff --git a/internal/ratelimit/counter.go b/internal/ratelimit/counter.go new file mode 100644 index 0000000..7bf012a --- /dev/null +++ b/internal/ratelimit/counter.go @@ -0,0 +1,56 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package ratelimit + +import ( + "github.com/TeaOSLab/EdgeNode/internal/zero" + "sync" +) + +type Counter struct { + count int + sem chan zero.Zero + done chan zero.Zero + closeOnce sync.Once +} + +func NewCounter(count int) *Counter { + return &Counter{ + count: count, + sem: make(chan zero.Zero, count), + done: make(chan zero.Zero), + } +} + +func (this *Counter) Count() int { + return this.count +} + +// Len 已占用数量 +// 注意:非线程安全 +func (this *Counter) Len() int { + return len(this.sem) +} + +func (this *Counter) Ack() bool { + select { + case this.sem <- zero.New(): + return true + case <-this.done: + return false + } +} + +func (this *Counter) Release() { + select { + case <-this.sem: + default: + // 总是能Release成功 + } +} + +func (this *Counter) Close() { + this.closeOnce.Do(func() { + close(this.done) + }) +} diff --git a/internal/ratelimit/counter_test.go b/internal/ratelimit/counter_test.go new file mode 100644 index 0000000..9587064 --- /dev/null +++ b/internal/ratelimit/counter_test.go @@ -0,0 +1,38 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package ratelimit + +import ( + "testing" + "time" +) + +func TestCounter_ACK(t *testing.T) { + var counter = NewCounter(10) + + go func() { + for i := 0; i < 10; i++ { + counter.Ack() + } + //counter.Release() + t.Log("waiting", time.Now().Unix()) + counter.Ack() + t.Log("done", time.Now().Unix()) + }() + + time.Sleep(1 * time.Second) + counter.Close() + time.Sleep(1 * time.Second) +} + +func TestCounter_Release(t *testing.T) { + var counter = NewCounter(10) + + for i := 0; i < 10; i++ { + counter.Ack() + } + for i := 0; i < 10; i++ { + counter.Release() + } + t.Log(len(counter.sem)) +}