diff --git a/internal/nodes/http_client_pool.go b/internal/nodes/http_client_pool.go index c3cf403..4463c55 100644 --- a/internal/nodes/http_client_pool.go +++ b/internal/nodes/http_client_pool.go @@ -6,6 +6,7 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/pires/go-proxyproto" "golang.org/x/net/http2" "net" @@ -20,6 +21,8 @@ import ( // SharedHTTPClientPool HTTP客户端池单例 var SharedHTTPClientPool = NewHTTPClientPool() +const httpClientProxyProtocolTag = "@ProxyProtocol@" + // HTTPClientPool 客户端池 type HTTPClientPool struct { clientsMap map[string]*HTTPClient // backend key => client @@ -54,6 +57,14 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, } var key = origin.UniqueKey() + "@" + originAddr + + // if we are under available ProxyProtocol, we add client ip to key to make every client unique + var isProxyProtocol = false + if proxyProtocol != nil && proxyProtocol.IsOn { + key += httpClientProxyProtocolTag + req.requestRemoteAddr(true) + isProxyProtocol = true + } + var isLnRequest = origin.Id == 0 this.locker.RLock() @@ -102,8 +113,9 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, idleConns = numberCPU * 16 } - // 可以判断为Ln节点请求 - if isLnRequest { + if isProxyProtocol { // ProxyProtocol无需保持太多空闲连接 + idleConns = 3 + } else if isLnRequest { // 可以判断为Ln节点请求 maxConnections *= 8 idleConns *= 8 idleTimeout *= 4 @@ -195,22 +207,45 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, // 清理不使用的Client func (this *HTTPClientPool) cleanClients() { for range this.cleanTicker.C { - var nowTime = time.Now().Unix() + var nowTime = fasttime.Now().Unix() - this.locker.Lock() + var expiredKeys = []string{} + var expiredClients = []*HTTPClient{} + + // lookup expired clients + this.locker.RLock() for k, client := range this.clientsMap { - if client.AccessTime() < nowTime+86400 { // 超过 N 秒没有调用就关闭 + if client.AccessTime() < nowTime-86400 || + (strings.Contains(k, httpClientProxyProtocolTag) && client.AccessTime() < nowTime-3600) { // 超过 N 秒没有调用就关闭 + expiredKeys = append(expiredKeys, k) + expiredClients = append(expiredClients, client) + } + } + this.locker.RUnlock() + + // remove expired keys + if len(expiredKeys) > 0 { + this.locker.Lock() + for _, k := range expiredKeys { delete(this.clientsMap, k) + } + this.locker.Unlock() + } + + // close expired clients + if len(expiredClients) > 0 { + for _, client := range expiredClients { client.Close() } } - this.locker.Unlock() } } // 支持PROXY Protocol func (this *HTTPClientPool) handlePROXYProtocol(conn net.Conn, req *HTTPRequest, proxyProtocol *serverconfigs.ProxyProtocolConfig) error { - if proxyProtocol != nil && proxyProtocol.IsOn && (proxyProtocol.Version == serverconfigs.ProxyProtocolVersion1 || proxyProtocol.Version == serverconfigs.ProxyProtocolVersion2) { + if proxyProtocol != nil && + proxyProtocol.IsOn && + (proxyProtocol.Version == serverconfigs.ProxyProtocolVersion1 || proxyProtocol.Version == serverconfigs.ProxyProtocolVersion2) { var remoteAddr = req.requestRemoteAddr(true) var transportProtocol = proxyproto.TCPv4 if strings.Contains(remoteAddr, ":") {