From 146a947d0be6c1f8b82a496d8f6b38b46a43873a Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 29 Jun 2022 21:58:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E6=BA=90=E7=AB=99=E7=AB=AF?= =?UTF-8?q?=E5=8F=A3=E8=B7=9F=E9=9A=8F=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dist/.gitignore | 3 +- internal/nodes/http_request_reverse_proxy.go | 13 +++++ internal/nodes/http_request_websocket.go | 2 +- internal/nodes/listener.go | 2 +- internal/nodes/listener_base.go | 10 ++-- internal/nodes/listener_tcp.go | 23 ++++++-- internal/nodes/listener_udp.go | 26 ++++++--- internal/nodes/origin_state.go | 1 + internal/nodes/origin_state_manager.go | 2 +- internal/nodes/origin_utils.go | 55 +++++++++++++------- 10 files changed, 96 insertions(+), 41 deletions(-) diff --git a/dist/.gitignore b/dist/.gitignore index 6f66c74..1811c18 100644 --- a/dist/.gitignore +++ b/dist/.gitignore @@ -1 +1,2 @@ -*.zip \ No newline at end of file +*.zip +edge-node \ No newline at end of file diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 7d789ac..5ab2304 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/types" "io" "net/http" "net/url" @@ -124,6 +125,18 @@ func (this *HTTPRequest) doReverseProxy() { if origin.Addr.HostHasVariables() { originAddr = this.Format(originAddr) } + + // 端口跟随 + if origin.FollowPort { + var originHostIndex = strings.Index(originAddr, ":") + if originHostIndex < 0 { + var originErr = errors.New("invalid origin address '" + originAddr + "', lacking port") + remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", originErr.Error()) + this.write50x(originErr, http.StatusBadGateway, true) + return + } + originAddr = originAddr[:originHostIndex+1] + types.String(this.requestServerPort()) + } this.originAddr = originAddr // RequestHost diff --git a/internal/nodes/http_request_websocket.go b/internal/nodes/http_request_websocket.go index b159035..a1f6e19 100644 --- a/internal/nodes/http_request_websocket.go +++ b/internal/nodes/http_request_websocket.go @@ -41,7 +41,7 @@ func (this *HTTPRequest) doWebsocket(requestHost string) { } // TODO 增加N次错误重试,重试的时候需要尝试不同的源站 - originConn, err := OriginConnect(this.origin, this.RawReq.RemoteAddr, requestHost) + originConn, _, err := OriginConnect(this.origin, this.requestServerPort(), this.RawReq.RemoteAddr, requestHost) if err != nil { this.write50x(err, http.StatusBadGateway, false) diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index 5e99b31..7d27616 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -153,7 +153,7 @@ func (this *Listener) Close() error { // 创建TCP监听器 func (this *Listener) createTCPListener() (net.Listener, error) { - listenConfig := net.ListenConfig{ + var listenConfig = net.ListenConfig{ Control: nil, KeepAlive: 0, } diff --git a/internal/nodes/listener_base.go b/internal/nodes/listener_base.go index cf9718d..5046c05 100644 --- a/internal/nodes/listener_base.go +++ b/internal/nodes/listener_base.go @@ -107,7 +107,7 @@ func (this *BaseListener) matchSSL(domain string) (*sslconfigs.SSLPolicy, *tls.C } // 证书是否匹配 - sslConfig := server.SSLPolicy() + var sslConfig = server.SSLPolicy() cert, ok := sslConfig.MatchDomain(domain) if ok { return sslConfig, cert, nil @@ -127,7 +127,7 @@ func (this *BaseListener) findNamedServer(name string) (serverConfig *serverconf return } - matchDomainStrictly := sharedNodeConfig.GlobalConfig != nil && sharedNodeConfig.GlobalConfig.HTTPAll.MatchDomainStrictly + var matchDomainStrictly = sharedNodeConfig.GlobalConfig != nil && sharedNodeConfig.GlobalConfig.HTTPAll.MatchDomainStrictly if sharedNodeConfig.GlobalConfig != nil && len(sharedNodeConfig.GlobalConfig.HTTPAll.DefaultDomain) > 0 && @@ -144,9 +144,9 @@ func (this *BaseListener) findNamedServer(name string) (serverConfig *serverconf } // 如果没有找到,则匹配到第一个 - group := this.Group - currentServers := group.Servers() - countServers := len(currentServers) + var group = this.Group + var currentServers = group.Servers() + var countServers = len(currentServers) if countServers == 0 { return nil, "" } diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 19c6807..2210bca 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/types" "github.com/pires/go-proxyproto" "net" "strings" @@ -18,6 +19,8 @@ type TCPListener struct { BaseListener Listener net.Listener + + port int } func (this *TCPListener) Serve() error { @@ -26,6 +29,14 @@ func (this *TCPListener) Serve() error { listener = tls.NewListener(listener, this.buildTLSConfig()) } + // 获取分组端口 + var groupAddr = this.Group.Addr() + var portIndex = strings.LastIndex(groupAddr, ":") + if portIndex >= 0 { + var port = groupAddr[portIndex+1:] + this.port = types.Int(port) + } + for { conn, err := listener.Accept() if err != nil { @@ -52,6 +63,7 @@ func (this *TCPListener) Reload(group *serverconfigs.ServerAddressGroup) { } func (this *TCPListener) handleConn(conn net.Conn) error { + var server = this.Group.FirstServer() if server == nil { return errors.New("no server available") @@ -193,9 +205,10 @@ func (this *TCPListener) connectOrigin(serverId int64, reverseProxy *serverconfi return nil, errors.New("no reverse proxy config") } - retries := 3 + var retries = 3 + var addr string for i := 0; i < retries; i++ { - origin := reverseProxy.NextOrigin(nil) + var origin = reverseProxy.NextOrigin(nil) if origin == nil { continue } @@ -209,15 +222,15 @@ func (this *TCPListener) connectOrigin(serverId int64, reverseProxy *serverconfi requestHost = origin.RequestHost } - conn, err = OriginConnect(origin, remoteAddr, requestHost) + conn, addr, err = OriginConnect(origin, this.port, remoteAddr, requestHost) if err != nil { - remotelogs.ServerError(serverId, "TCP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error(), "", nil) + remotelogs.ServerError(serverId, "TCP_LISTENER", "unable to connect origin server: "+addr+": "+err.Error(), "", nil) continue } else { return } } - err = errors.New("no origin can be used") + err = errors.New("no origin server can be used") return } diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index f361054..416fff5 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/types" "github.com/pires/go-proxyproto" "net" "strings" @@ -25,11 +26,21 @@ type UDPListener struct { reverseProxy *serverconfigs.ReverseProxyConfig + port int + isClosed bool } func (this *UDPListener) Serve() error { - firstServer := this.Group.FirstServer() + // 获取分组端口 + var groupAddr = this.Group.Addr() + var portIndex = strings.LastIndex(groupAddr, ":") + if portIndex >= 0 { + var port = groupAddr[portIndex+1:] + this.port = types.Int(port) + } + + var firstServer = this.Group.FirstServer() if firstServer == nil { return errors.New("no server available") } @@ -110,7 +121,7 @@ func (this *UDPListener) Reload(group *serverconfigs.ServerAddressGroup) { this.Reset() // 重置配置 - firstServer := this.Group.FirstServer() + var firstServer = this.Group.FirstServer() if firstServer == nil { return } @@ -122,15 +133,16 @@ func (this *UDPListener) connectOrigin(serverId int64, reverseProxy *serverconfi return nil, errors.New("no reverse proxy config") } - retries := 3 + var retries = 3 + var addr string for i := 0; i < retries; i++ { - origin := reverseProxy.NextOrigin(nil) + var origin = reverseProxy.NextOrigin(nil) if origin == nil { continue } - conn, err = OriginConnect(origin, remoteAddr.String(), "") + conn, addr, err = OriginConnect(origin, this.port, remoteAddr.String(), "") if err != nil { - remotelogs.ServerError(serverId, "UDP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error(), "", nil) + remotelogs.ServerError(serverId, "UDP_LISTENER", "unable to connect origin server: "+addr+": "+err.Error(), "", nil) continue } else { // PROXY Protocol @@ -159,7 +171,7 @@ func (this *UDPListener) connectOrigin(serverId int64, reverseProxy *serverconfi return } } - err = errors.New("no origin can be used") + err = errors.New("no origin server can be used") return } diff --git a/internal/nodes/origin_state.go b/internal/nodes/origin_state.go index e3180d8..8d4b1b6 100644 --- a/internal/nodes/origin_state.go +++ b/internal/nodes/origin_state.go @@ -8,6 +8,7 @@ type OriginState struct { CountFails int64 UpdatedAt int64 Config *serverconfigs.OriginConfig + Addr string TLSHost string ReverseProxy *serverconfigs.ReverseProxyConfig } diff --git a/internal/nodes/origin_state_manager.go b/internal/nodes/origin_state_manager.go index 5b82fe7..0cefe81 100644 --- a/internal/nodes/origin_state_manager.go +++ b/internal/nodes/origin_state_manager.go @@ -99,7 +99,7 @@ func (this *OriginStateManager) Loop() error { for _, state := range currentStates { go func(state *OriginState) { defer wg.Done() - conn, err := OriginConnect(state.Config, "", state.TLSHost) + conn, _, err := OriginConnect(state.Config, 0, "", state.TLSHost) if err == nil { _ = conn.Close() diff --git a/internal/nodes/origin_utils.go b/internal/nodes/origin_utils.go index 9537012..313fc58 100644 --- a/internal/nodes/origin_utils.go +++ b/internal/nodes/origin_utils.go @@ -3,47 +3,54 @@ package nodes import ( "crypto/tls" "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/iwind/TeaGo/types" "net" "strconv" ) // OriginConnect 连接源站 -func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string, tlsHost string) (net.Conn, error) { +func OriginConnect(origin *serverconfigs.OriginConfig, serverPort int, remoteAddr string, tlsHost string) (originConn net.Conn, originAddr string, err error) { if origin.Addr == nil { - return nil, errors.New("origin server address should not be empty") + return nil, "", errors.New("origin server address should not be empty") } // 支持TOA的连接 // 这个条件很重要,如果没有传递remoteAddr,表示不使用TOA if len(remoteAddr) > 0 { - toaConfig := sharedTOAManager.Config() + var toaConfig = sharedTOAManager.Config() if toaConfig != nil && toaConfig.IsOn { - retries := 3 + var retries = 3 for i := 1; i <= retries; i++ { - port := int(toaConfig.RandLocalPort()) - err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + remoteAddr) + var port = int(toaConfig.RandLocalPort()) + err = sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + remoteAddr) if err != nil { remotelogs.Error("TOA", "add failed: "+err.Error()) } else { - dialer := net.Dialer{ + var dialer = net.Dialer{ Timeout: origin.ConnTimeoutDuration(), LocalAddr: &net.TCPAddr{ Port: port, }, } + originAddr = origin.Addr.PickAddress() + + // 端口跟随 + if origin.FollowPort && serverPort > 0 { + originAddr = configutils.QuoteIP(origin.Addr.Host) + ":" + types.String(serverPort) + } + var conn net.Conn switch origin.Addr.Protocol { case "", serverconfigs.ProtocolTCP, serverconfigs.ProtocolHTTP: // TODO 支持TCP4/TCP6 // TODO 支持指定特定网卡 - // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 - conn, err = dialer.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange) + conn, err = dialer.Dial("tcp", originAddr) case serverconfigs.ProtocolTLS, serverconfigs.ProtocolHTTPS: // TODO 支持TCP4/TCP6 // TODO 支持指定特定网卡 - // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 var tlsConfig = &tls.Config{ InsecureSkipVerify: true, @@ -62,28 +69,34 @@ func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string, tlsHos tlsConfig.ServerName = tlsHost } - conn, err = tls.DialWithDialer(&dialer, "tcp", origin.Addr.Host+":"+origin.Addr.PortRange, tlsConfig) + conn, err = tls.DialWithDialer(&dialer, "tcp", originAddr, tlsConfig) } // TODO 需要在合适的时机删除TOA记录 if err == nil || i == retries { - return conn, err + return conn, originAddr, err } } } } } + originAddr = origin.Addr.PickAddress() + + // 端口跟随 + if origin.FollowPort && serverPort > 0 { + originAddr = configutils.QuoteIP(origin.Addr.Host) + ":" + types.String(serverPort) + } + switch origin.Addr.Protocol { case "", serverconfigs.ProtocolTCP, serverconfigs.ProtocolHTTP: // TODO 支持TCP4/TCP6 // TODO 支持指定特定网卡 - // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 - return net.DialTimeout("tcp", origin.Addr.Host+":"+origin.Addr.PortRange, origin.ConnTimeoutDuration()) + originConn, err = net.DialTimeout("tcp", originAddr, origin.ConnTimeoutDuration()) + return originConn, originAddr, err case serverconfigs.ProtocolTLS, serverconfigs.ProtocolHTTPS: // TODO 支持TCP4/TCP6 // TODO 支持指定特定网卡 - // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 var tlsConfig = &tls.Config{ InsecureSkipVerify: true, @@ -102,16 +115,18 @@ func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string, tlsHos tlsConfig.ServerName = tlsHost } - return tls.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange, tlsConfig) + originConn, err = tls.Dial("tcp", originAddr, tlsConfig) + return originConn, originAddr, err case serverconfigs.ProtocolUDP: - addr, err := net.ResolveUDPAddr("udp", origin.Addr.Host+":"+origin.Addr.PortRange) + addr, err := net.ResolveUDPAddr("udp", originAddr) if err != nil { - return nil, err + return nil, originAddr, err } - return net.DialUDP("udp", nil, addr) + originConn, err = net.DialUDP("udp", nil, addr) + return originConn, originAddr, err } // TODO 支持从Unix、Pipe、HTTP、HTTPS中读取数据 - return nil, errors.New("invalid origin scheme '" + origin.Addr.Protocol.String() + "'") + return nil, originAddr, errors.New("invalid origin scheme '" + origin.Addr.Protocol.String() + "'") }