mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	实现全局的TCP最大连接数
This commit is contained in:
		
							
								
								
									
										3
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								go.mod
									
									
									
									
									
								
							@@ -19,12 +19,9 @@ require (
 | 
				
			|||||||
	github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11
 | 
						github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11
 | 
				
			||||||
	github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3
 | 
						github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3
 | 
				
			||||||
	github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8
 | 
						github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8
 | 
				
			||||||
	github.com/json-iterator/go v1.1.12 // indirect
 | 
					 | 
				
			||||||
	github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirect
 | 
						github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirect
 | 
				
			||||||
	github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
 | 
					 | 
				
			||||||
	github.com/mattn/go-sqlite3 v1.14.9
 | 
						github.com/mattn/go-sqlite3 v1.14.9
 | 
				
			||||||
	github.com/miekg/dns v1.1.43
 | 
						github.com/miekg/dns v1.1.43
 | 
				
			||||||
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 | 
					 | 
				
			||||||
	github.com/mssola/user_agent v0.5.2
 | 
						github.com/mssola/user_agent v0.5.2
 | 
				
			||||||
	github.com/pires/go-proxyproto v0.6.1
 | 
						github.com/pires/go-proxyproto v0.6.1
 | 
				
			||||||
	github.com/shirou/gopsutil v3.21.5+incompatible
 | 
						github.com/shirou/gopsutil v3.21.5+incompatible
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -8,8 +8,10 @@ import (
 | 
				
			|||||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/monitor"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/monitor"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/ratelimit"
 | 
				
			||||||
	"github.com/iwind/TeaGo/maps"
 | 
						"github.com/iwind/TeaGo/maps"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"sync/atomic"
 | 
						"sync/atomic"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -44,18 +46,22 @@ func init() {
 | 
				
			|||||||
type ClientConn struct {
 | 
					type ClientConn struct {
 | 
				
			||||||
	rawConn  net.Conn
 | 
						rawConn  net.Conn
 | 
				
			||||||
	isClosed bool
 | 
						isClosed bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						once    sync.Once
 | 
				
			||||||
 | 
						limiter *ratelimit.Counter
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewClientConn(conn net.Conn, quickClose bool) net.Conn {
 | 
					func NewClientConn(conn net.Conn, quickClose bool, limiter *ratelimit.Counter) net.Conn {
 | 
				
			||||||
	if quickClose {
 | 
						if quickClose {
 | 
				
			||||||
 | 
							// TCP
 | 
				
			||||||
		tcpConn, ok := conn.(*net.TCPConn)
 | 
							tcpConn, ok := conn.(*net.TCPConn)
 | 
				
			||||||
		if ok {
 | 
							if ok {
 | 
				
			||||||
			// TODO 可以设置此值
 | 
								// TODO 可以设置此值
 | 
				
			||||||
			_ = tcpConn.SetLinger(3)
 | 
								_ = tcpConn.SetLinger(nodeconfigs.DefaultTCPLinger)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return &ClientConn{rawConn: conn}
 | 
						return &ClientConn{rawConn: conn, limiter: limiter}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *ClientConn) Read(b []byte) (n int, err error) {
 | 
					func (this *ClientConn) Read(b []byte) (n int, err error) {
 | 
				
			||||||
@@ -76,6 +82,11 @@ func (this *ClientConn) Write(b []byte) (n int, err error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (this *ClientConn) Close() error {
 | 
					func (this *ClientConn) Close() error {
 | 
				
			||||||
	this.isClosed = true
 | 
						this.isClosed = true
 | 
				
			||||||
 | 
						this.once.Do(func() {
 | 
				
			||||||
 | 
							if this.limiter != nil {
 | 
				
			||||||
 | 
								this.limiter.Release()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
	return this.rawConn.Close()
 | 
						return this.rawConn.Close()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,12 +3,16 @@
 | 
				
			|||||||
package nodes
 | 
					package nodes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/ratelimit"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/waf"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/waf"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var sharedConnectionsLimiter = ratelimit.NewCounter(nodeconfigs.DefaultTCPMaxConnections)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ClientListener 客户端网络监听
 | 
					// ClientListener 客户端网络监听
 | 
				
			||||||
type ClientListener struct {
 | 
					type ClientListener struct {
 | 
				
			||||||
	rawListener net.Listener
 | 
						rawListener net.Listener
 | 
				
			||||||
@@ -23,10 +27,21 @@ func NewClientListener(listener net.Listener, quickClose bool) net.Listener {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *ClientListener) Accept() (net.Conn, error) {
 | 
					func (this *ClientListener) Accept() (net.Conn, error) {
 | 
				
			||||||
 | 
						// 限制并发连接数
 | 
				
			||||||
 | 
						var isOk = false
 | 
				
			||||||
 | 
						var limiter = sharedConnectionsLimiter
 | 
				
			||||||
 | 
						limiter.Ack()
 | 
				
			||||||
 | 
						defer func() {
 | 
				
			||||||
 | 
							if !isOk {
 | 
				
			||||||
 | 
								limiter.Release()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	conn, err := this.rawListener.Accept()
 | 
						conn, err := this.rawListener.Accept()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 是否在WAF名单中
 | 
						// 是否在WAF名单中
 | 
				
			||||||
	ip, _, err := net.SplitHostPort(conn.RemoteAddr().String())
 | 
						ip, _, err := net.SplitHostPort(conn.RemoteAddr().String())
 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
@@ -42,7 +57,8 @@ func (this *ClientListener) Accept() (net.Conn, error) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return NewClientConn(conn, this.quickClose), nil
 | 
						isOk = true
 | 
				
			||||||
 | 
						return NewClientConn(conn, this.quickClose, limiter), nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *ClientListener) Close() error {
 | 
					func (this *ClientListener) Close() error {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -14,6 +14,7 @@ import (
 | 
				
			|||||||
	"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/metrics"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/metrics"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/ratelimit"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/stats"
 | 
						"github.com/TeaOSLab/EdgeNode/internal/stats"
 | 
				
			||||||
@@ -51,12 +52,15 @@ type Node struct {
 | 
				
			|||||||
	sock     *gosock.Sock
 | 
						sock     *gosock.Sock
 | 
				
			||||||
	locker   sync.Mutex
 | 
						locker   sync.Mutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timezone string
 | 
						maxCPU     int32
 | 
				
			||||||
 | 
						maxThreads int
 | 
				
			||||||
 | 
						timezone   string
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewNode() *Node {
 | 
					func NewNode() *Node {
 | 
				
			||||||
	return &Node{
 | 
						return &Node{
 | 
				
			||||||
		sock: gosock.NewTmpSock(teaconst.ProcessName),
 | 
							sock:       gosock.NewTmpSock(teaconst.ProcessName),
 | 
				
			||||||
 | 
							maxThreads: -1,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -637,18 +641,39 @@ func (this *Node) listenSock() error {
 | 
				
			|||||||
// 重载配置调用
 | 
					// 重载配置调用
 | 
				
			||||||
func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
 | 
					func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
 | 
				
			||||||
	// max cpu
 | 
						// max cpu
 | 
				
			||||||
	if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {
 | 
						if config.MaxCPU != this.maxCPU {
 | 
				
			||||||
		runtime.GOMAXPROCS(int(config.MaxCPU))
 | 
							if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {
 | 
				
			||||||
	} else {
 | 
								runtime.GOMAXPROCS(int(config.MaxCPU))
 | 
				
			||||||
		runtime.GOMAXPROCS(runtime.NumCPU())
 | 
								remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(config.MaxCPU)+"'")
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								runtime.GOMAXPROCS(runtime.NumCPU())
 | 
				
			||||||
 | 
								remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(runtime.NumCPU())+"'")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							this.maxCPU = config.MaxCPU
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// max threads
 | 
						// max threads
 | 
				
			||||||
	if config.MaxThreads > 0 {
 | 
						if config.MaxThreads != this.maxThreads {
 | 
				
			||||||
		debug.SetMaxThreads(config.MaxThreads)
 | 
							if config.MaxThreads > 0 {
 | 
				
			||||||
		remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(config.MaxThreads)+"'")
 | 
								debug.SetMaxThreads(config.MaxThreads)
 | 
				
			||||||
	} else {
 | 
								remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(config.MaxThreads)+"'")
 | 
				
			||||||
		debug.SetMaxThreads(nodeconfigs.DefaultMaxThreads)
 | 
							} else {
 | 
				
			||||||
 | 
								debug.SetMaxThreads(nodeconfigs.DefaultMaxThreads)
 | 
				
			||||||
 | 
								remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(nodeconfigs.DefaultMaxThreads)+"'")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							this.maxThreads = config.MaxThreads
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// max tcp connections
 | 
				
			||||||
 | 
						if config.TCPMaxConnections <= 0 {
 | 
				
			||||||
 | 
							config.TCPMaxConnections = nodeconfigs.DefaultTCPMaxConnections
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if config.TCPMaxConnections != sharedConnectionsLimiter.Count() {
 | 
				
			||||||
 | 
							remotelogs.Println("NODE", "[TCP]changed tcp max connections to '"+types.String(config.TCPMaxConnections)+"'")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							sharedConnectionsLimiter.Close()
 | 
				
			||||||
 | 
							sharedConnectionsLimiter = ratelimit.NewCounter(config.TCPMaxConnections)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// timezone
 | 
						// timezone
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										56
									
								
								internal/ratelimit/counter.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								internal/ratelimit/counter.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,56 @@
 | 
				
			|||||||
 | 
					// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package ratelimit
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/zero"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type Counter struct {
 | 
				
			||||||
 | 
						count     int
 | 
				
			||||||
 | 
						sem       chan zero.Zero
 | 
				
			||||||
 | 
						done      chan zero.Zero
 | 
				
			||||||
 | 
						closeOnce sync.Once
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewCounter(count int) *Counter {
 | 
				
			||||||
 | 
						return &Counter{
 | 
				
			||||||
 | 
							count: count,
 | 
				
			||||||
 | 
							sem:   make(chan zero.Zero, count),
 | 
				
			||||||
 | 
							done:  make(chan zero.Zero),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Counter) Count() int {
 | 
				
			||||||
 | 
						return this.count
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Len 已占用数量
 | 
				
			||||||
 | 
					// 注意:非线程安全
 | 
				
			||||||
 | 
					func (this *Counter) Len() int {
 | 
				
			||||||
 | 
						return len(this.sem)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Counter) Ack() bool {
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case this.sem <- zero.New():
 | 
				
			||||||
 | 
							return true
 | 
				
			||||||
 | 
						case <-this.done:
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Counter) Release() {
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-this.sem:
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							// 总是能Release成功
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *Counter) Close() {
 | 
				
			||||||
 | 
						this.closeOnce.Do(func() {
 | 
				
			||||||
 | 
							close(this.done)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										38
									
								
								internal/ratelimit/counter_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								internal/ratelimit/counter_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,38 @@
 | 
				
			|||||||
 | 
					// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package ratelimit
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestCounter_ACK(t *testing.T) {
 | 
				
			||||||
 | 
						var counter = NewCounter(10)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							for i := 0; i < 10; i++ {
 | 
				
			||||||
 | 
								counter.Ack()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							//counter.Release()
 | 
				
			||||||
 | 
							t.Log("waiting", time.Now().Unix())
 | 
				
			||||||
 | 
							counter.Ack()
 | 
				
			||||||
 | 
							t.Log("done", time.Now().Unix())
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						time.Sleep(1 * time.Second)
 | 
				
			||||||
 | 
						counter.Close()
 | 
				
			||||||
 | 
						time.Sleep(1 * time.Second)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestCounter_Release(t *testing.T) {
 | 
				
			||||||
 | 
						var counter = NewCounter(10)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
 | 
							counter.Ack()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for i := 0; i < 10; i++ {
 | 
				
			||||||
 | 
							counter.Release()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Log(len(counter.sem))
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user