From 9b6ab2fa8b22bad99c57840e7eba32483f45d422 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Fri, 14 Jan 2022 11:21:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/http_client_pool.go | 127 +++++++++++++++++------------ 1 file changed, 75 insertions(+), 52 deletions(-) diff --git a/internal/nodes/http_client_pool.go b/internal/nodes/http_client_pool.go index 58d0839..9918bc8 100644 --- a/internal/nodes/http_client_pool.go +++ b/internal/nodes/http_client_pool.go @@ -103,34 +103,13 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.Origi var transport = &http.Transport{ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { // 支持TOA的连接 - toaConfig := sharedTOAManager.Config() - if toaConfig != nil && toaConfig.IsOn { - retries := 3 - for i := 1; i <= retries; i++ { - port := int(toaConfig.RandLocalPort()) - // TODO 思考是否支持X-Real-IP/X-Forwarded-IP - err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + req.requestRemoteAddr(true)) - if err != nil { - remotelogs.Error("TOA", "add failed: "+err.Error()) - } else { - dialer := net.Dialer{ - Timeout: connectionTimeout, - KeepAlive: 1 * time.Minute, - LocalAddr: &net.TCPAddr{ - Port: port, - }, - } - conn, err := dialer.DialContext(ctx, network, originAddr) - // TODO 需要在合适的时机删除TOA记录 - if err == nil || i == retries { - return conn, err - } - } - } + conn, err := this.handleTOA(req, ctx, network, originAddr, connectionTimeout) + if conn != nil || err != nil { + return conn, err } // 普通的连接 - conn, err := (&net.Dialer{ + conn, err = (&net.Dialer{ Timeout: connectionTimeout, KeepAlive: 1 * time.Minute, }).DialContext(ctx, network, originAddr) @@ -138,32 +117,10 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.Origi return nil, err } - if proxyProtocol != nil && proxyProtocol.IsOn && (proxyProtocol.Version == serverconfigs.ProxyProtocolVersion1 || proxyProtocol.Version == serverconfigs.ProxyProtocolVersion2) { - var remoteAddr = req.requestRemoteAddr(true) - var transportProtocol = proxyproto.TCPv4 - if strings.Contains(remoteAddr, ":") { - transportProtocol = proxyproto.TCPv6 - } - var destAddr = conn.RemoteAddr() - var reqConn = req.RawReq.Context().Value(HTTPConnContextKey) - if reqConn != nil { - destAddr = reqConn.(net.Conn).LocalAddr() - } - header := proxyproto.Header{ - Version: byte(proxyProtocol.Version), - Command: proxyproto.PROXY, - TransportProtocol: transportProtocol, - SourceAddr: &net.TCPAddr{ - IP: net.ParseIP(remoteAddr), - Port: req.requestRemotePort(), - }, - DestinationAddr: destAddr, - } - _, err = header.WriteTo(conn) - if err != nil { - _ = conn.Close() - return nil, err - } + // 处理PROXY protocol + err = this.handlePROXYProtocol(conn, req, proxyProtocol) + if err != nil { + return nil, err } return conn, nil @@ -173,7 +130,7 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.Origi MaxConnsPerHost: maxConnections, IdleConnTimeout: idleTimeout, ExpectContinueTimeout: 1 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, + TLSHandshakeTimeout: 3 * time.Second, TLSClientConfig: tlsConfig, Proxy: nil, } @@ -207,3 +164,69 @@ func (this *HTTPClientPool) cleanClients() { this.locker.Unlock() } } + +// 支持TOA +func (this *HTTPClientPool) handleTOA(req *HTTPRequest, ctx context.Context, network string, originAddr string, connectionTimeout time.Duration) (net.Conn, error) { + // TODO 每个服务读取自身所属集群的TOA设置 + toaConfig := sharedTOAManager.Config() + if toaConfig != nil && toaConfig.IsOn { + retries := 3 + for i := 1; i <= retries; i++ { + port := int(toaConfig.RandLocalPort()) + // TODO 思考是否支持X-Real-IP/X-Forwarded-IP + err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + req.requestRemoteAddr(true)) + if err != nil { + remotelogs.Error("TOA", "add failed: "+err.Error()) + } else { + dialer := net.Dialer{ + Timeout: connectionTimeout, + KeepAlive: 1 * time.Minute, + LocalAddr: &net.TCPAddr{ + Port: port, + }, + } + conn, err := dialer.DialContext(ctx, network, originAddr) + // TODO 需要在合适的时机删除TOA记录 + if err == nil || i == retries { + return conn, err + } + } + } + } + + return nil, nil +} + +// 支持PROXY Protocol +func (this *HTTPClientPool) handlePROXYProtocol(conn net.Conn, req *HTTPRequest, proxyProtocol *serverconfigs.ProxyProtocolConfig) error { + if proxyProtocol != nil && proxyProtocol.IsOn && (proxyProtocol.Version == serverconfigs.ProxyProtocolVersion1 || proxyProtocol.Version == serverconfigs.ProxyProtocolVersion2) { + var remoteAddr = req.requestRemoteAddr(true) + var transportProtocol = proxyproto.TCPv4 + if strings.Contains(remoteAddr, ":") { + transportProtocol = proxyproto.TCPv6 + } + var destAddr = conn.RemoteAddr() + var reqConn = req.RawReq.Context().Value(HTTPConnContextKey) + if reqConn != nil { + destAddr = reqConn.(net.Conn).LocalAddr() + } + header := proxyproto.Header{ + Version: byte(proxyProtocol.Version), + Command: proxyproto.PROXY, + TransportProtocol: transportProtocol, + SourceAddr: &net.TCPAddr{ + IP: net.ParseIP(remoteAddr), + Port: req.requestRemotePort(), + }, + DestinationAddr: destAddr, + } + _, err := header.WriteTo(conn) + if err != nil { + _ = conn.Close() + return err + } + return nil + } + + return nil +}