支持PROXY Protocol/修复UDP源站无法修改的问题

This commit is contained in:
刘祥超
2021-10-12 20:20:06 +08:00
parent bcce2a2767
commit f0d0a39a03
8 changed files with 115 additions and 14 deletions

1
go.mod
View File

@@ -23,6 +23,7 @@ require (
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
github.com/mattn/go-sqlite3 v2.0.3+incompatible github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/mssola/user_agent v0.5.2 github.com/mssola/user_agent v0.5.2
github.com/pires/go-proxyproto v0.6.1
github.com/shirou/gopsutil v3.21.5+incompatible github.com/shirou/gopsutil v3.21.5+incompatible
github.com/tklauser/go-sysconf v0.3.6 // indirect github.com/tklauser/go-sysconf v0.3.6 // indirect
golang.org/x/image v0.0.0-20190802002840-cff245a6509b golang.org/x/image v0.0.0-20190802002840-cff245a6509b

2
go.sum
View File

@@ -110,6 +110,8 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pires/go-proxyproto v0.6.1 h1:EBupykFmo22SDjv4fQVQd2J9NOoLPmyZA/15ldOGkPw=
github.com/pires/go-proxyproto v0.6.1/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=

View File

@@ -6,10 +6,12 @@ import (
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/pires/go-proxyproto"
"net" "net"
"net/http" "net/http"
"runtime" "runtime"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
) )
@@ -37,7 +39,7 @@ func NewHTTPClientPool() *HTTPClientPool {
} }
// Client 根据地址获取客户端 // Client 根据地址获取客户端
func (this *HTTPClientPool) Client(req *http.Request, origin *serverconfigs.OriginConfig, originAddr string) (rawClient *http.Client, err error) { func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.OriginConfig, originAddr string, proxyProtocol *serverconfigs.ProxyProtocolConfig) (rawClient *http.Client, err error) {
if origin.Addr == nil { if origin.Addr == nil {
return nil, errors.New("origin addr should not be empty (originId:" + strconv.FormatInt(origin.Id, 10) + ")") return nil, errors.New("origin addr should not be empty (originId:" + strconv.FormatInt(origin.Id, 10) + ")")
} }
@@ -105,7 +107,7 @@ func (this *HTTPClientPool) Client(req *http.Request, origin *serverconfigs.Orig
for i := 1; i <= retries; i++ { for i := 1; i <= retries; i++ {
port := int(toaConfig.RandLocalPort()) port := int(toaConfig.RandLocalPort())
// TODO 思考是否支持X-Real-IP/X-Forwarded-IP // TODO 思考是否支持X-Real-IP/X-Forwarded-IP
err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + req.RemoteAddr) err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + req.requestRemoteAddr(true))
if err != nil { if err != nil {
remotelogs.Error("TOA", "add failed: "+err.Error()) remotelogs.Error("TOA", "add failed: "+err.Error())
} else { } else {
@@ -126,10 +128,43 @@ func (this *HTTPClientPool) Client(req *http.Request, origin *serverconfigs.Orig
} }
// 普通的连接 // 普通的连接
return (&net.Dialer{ conn, err := (&net.Dialer{
Timeout: connectionTimeout, Timeout: connectionTimeout,
KeepAlive: 1 * time.Minute, KeepAlive: 1 * time.Minute,
}).DialContext(ctx, network, originAddr) }).DialContext(ctx, network, originAddr)
if err != nil {
return nil, err
}
if proxyProtocol != nil && proxyProtocol.IsOn && (proxyProtocol.Version == serverconfigs.ProxyProtocolVersion1 || proxyProtocol.Version == serverconfigs.ProxyProtocolVersion2) {
var remoteAddr = req.requestRemoteAddr(true)
var transportProtocol = proxyproto.TCPv4
if strings.Contains(remoteAddr, ":") {
transportProtocol = proxyproto.TCPv6
}
var destAddr = conn.RemoteAddr()
var reqConn = req.RawReq.Context().Value(HTTPConnContextKey)
if reqConn != nil {
destAddr = reqConn.(net.Conn).LocalAddr()
}
header := proxyproto.Header{
Version: byte(proxyProtocol.Version),
Command: proxyproto.PROXY,
TransportProtocol: transportProtocol,
SourceAddr: &net.TCPAddr{
IP: net.ParseIP(remoteAddr),
Port: req.requestRemotePort(),
},
DestinationAddr: destAddr,
}
_, err = header.WriteTo(conn)
if err != nil {
_ = conn.Close()
return nil, err
}
}
return conn, nil
}, },
MaxIdleConns: 0, MaxIdleConns: 0,
MaxIdleConnsPerHost: idleConns, MaxIdleConnsPerHost: idleConns,

View File

@@ -21,14 +21,14 @@ func TestHTTPClientPool_Client(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
{ {
client, err := pool.Client(nil, origin, origin.Addr.PickAddress()) client, err := pool.Client(nil, origin, origin.Addr.PickAddress(), nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log("client:", client) t.Log("client:", client)
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
client, err := pool.Client(nil, origin, origin.Addr.PickAddress()) client, err := pool.Client(nil, origin, origin.Addr.PickAddress(), nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -53,7 +53,7 @@ func TestHTTPClientPool_cleanClients(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
t.Log("get", i) t.Log("get", i)
_, _ = pool.Client(nil, origin, origin.Addr.PickAddress()) _, _ = pool.Client(nil, origin, origin.Addr.PickAddress(), nil)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }
@@ -73,6 +73,6 @@ func BenchmarkHTTPClientPool_Client(b *testing.B) {
pool := NewHTTPClientPool() pool := NewHTTPClientPool()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, _ = pool.Client(nil, origin, origin.Addr.PickAddress()) _, _ = pool.Client(nil, origin, origin.Addr.PickAddress(), nil)
} }
} }

View File

@@ -145,7 +145,7 @@ func (this *HTTPRequest) doReverseProxy() {
} }
// 获取请求客户端 // 获取请求客户端
client, err := SharedHTTPClientPool.Client(this.RawReq, origin, originAddr) client, err := SharedHTTPClientPool.Client(this, origin, originAddr, this.reverseProxy.ProxyProtocol)
if err != nil { if err != nil {
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error()) remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write50x(err, http.StatusBadGateway) this.write50x(err, http.StatusBadGateway)

View File

@@ -6,7 +6,9 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/pires/go-proxyproto"
"net" "net"
"strings"
"sync/atomic" "sync/atomic"
) )
@@ -83,6 +85,31 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
_ = originConn.Close() _ = originConn.Close()
} }
// PROXY Protocol
if firstServer.ReverseProxy != nil &&
firstServer.ReverseProxy.ProxyProtocol != nil &&
firstServer.ReverseProxy.ProxyProtocol.IsOn &&
(firstServer.ReverseProxy.ProxyProtocol.Version == serverconfigs.ProxyProtocolVersion1 || firstServer.ReverseProxy.ProxyProtocol.Version == serverconfigs.ProxyProtocolVersion2) {
var remoteAddr = conn.RemoteAddr()
var transportProtocol = proxyproto.TCPv4
if strings.Contains(remoteAddr.String(), "[") {
transportProtocol = proxyproto.TCPv6
}
header := proxyproto.Header{
Version: byte(firstServer.ReverseProxy.ProxyProtocol.Version),
Command: proxyproto.PROXY,
TransportProtocol: transportProtocol,
SourceAddr: remoteAddr,
DestinationAddr: conn.LocalAddr(),
}
_, err = header.WriteTo(originConn)
if err != nil {
closer()
return err
}
}
// 从源站读取
go func() { go func() {
originBuffer := bytePool32k.Get() originBuffer := bytePool32k.Get()
defer func() { defer func() {
@@ -107,6 +134,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
} }
}() }()
// 从客户端读取
clientBuffer := bytePool32k.Get() clientBuffer := bytePool32k.Get()
defer func() { defer func() {
bytePool32k.Put(clientBuffer) bytePool32k.Put(clientBuffer)

View File

@@ -8,7 +8,7 @@ import (
func TestListener_Listen(t *testing.T) { func TestListener_Listen(t *testing.T) {
listener := NewListener() listener := NewListener()
group := serverconfigs.NewServerGroup("http://:1234") group := serverconfigs.NewServerAddressGroup("https://:1234")
listener.Reload(group) listener.Reload(group)
err := listener.Listen() err := listener.Listen()

View File

@@ -6,7 +6,9 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/pires/go-proxyproto"
"net" "net"
"strings"
"sync" "sync"
"time" "time"
) )
@@ -19,6 +21,8 @@ type UDPListener struct {
connMap map[string]*UDPConn connMap map[string]*UDPConn
connLocker sync.Mutex connLocker sync.Mutex
connTicker *utils.Ticker connTicker *utils.Ticker
reverseProxy *serverconfigs.ReverseProxyConfig
} }
func (this *UDPListener) Serve() error { func (this *UDPListener) Serve() error {
@@ -26,8 +30,9 @@ func (this *UDPListener) Serve() error {
if firstServer == nil { if firstServer == nil {
return errors.New("no server available") return errors.New("no server available")
} }
if firstServer.ReverseProxy == nil { this.reverseProxy = firstServer.ReverseProxy
return errors.New("no ReverseProxy configured for the server") if this.reverseProxy == nil {
return errors.New("no ReverseProxy configured for the server '" + firstServer.Name + "'")
} }
this.connMap = map[string]*UDPConn{} this.connMap = map[string]*UDPConn{}
@@ -50,7 +55,7 @@ func (this *UDPListener) Serve() error {
ok = false ok = false
} }
if !ok { if !ok {
originConn, err := this.connectOrigin(firstServer.ReverseProxy, "") originConn, err := this.connectOrigin(this.reverseProxy, addr)
if err != nil { if err != nil {
remotelogs.Error("UDP_LISTENER", "unable to connect to origin server: "+err.Error()) remotelogs.Error("UDP_LISTENER", "unable to connect to origin server: "+err.Error())
continue continue
@@ -87,9 +92,16 @@ func (this *UDPListener) Close() error {
func (this *UDPListener) Reload(group *serverconfigs.ServerAddressGroup) { func (this *UDPListener) Reload(group *serverconfigs.ServerAddressGroup) {
this.Group = group this.Group = group
this.Reset() this.Reset()
// 重置配置
firstServer := this.Group.FirstServer()
if firstServer == nil {
return
}
this.reverseProxy = firstServer.ReverseProxy
} }
func (this *UDPListener) connectOrigin(reverseProxy *serverconfigs.ReverseProxyConfig, remoteAddr string) (conn net.Conn, err error) { func (this *UDPListener) connectOrigin(reverseProxy *serverconfigs.ReverseProxyConfig, remoteAddr net.Addr) (conn net.Conn, err error) {
if reverseProxy == nil { if reverseProxy == nil {
return nil, errors.New("no reverse proxy config") return nil, errors.New("no reverse proxy config")
} }
@@ -100,11 +112,34 @@ func (this *UDPListener) connectOrigin(reverseProxy *serverconfigs.ReverseProxyC
if origin == nil { if origin == nil {
continue continue
} }
conn, err = OriginConnect(origin, remoteAddr) conn, err = OriginConnect(origin, remoteAddr.String())
if err != nil { if err != nil {
remotelogs.Error("UDP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error()) remotelogs.Error("UDP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error())
continue continue
} else { } else {
// PROXY Protocol
if reverseProxy != nil &&
reverseProxy.ProxyProtocol != nil &&
reverseProxy.ProxyProtocol.IsOn &&
(reverseProxy.ProxyProtocol.Version == serverconfigs.ProxyProtocolVersion1 || reverseProxy.ProxyProtocol.Version == serverconfigs.ProxyProtocolVersion2) {
var transportProtocol = proxyproto.UDPv4
if strings.Contains(remoteAddr.String(), "[") {
transportProtocol = proxyproto.UDPv6
}
header := proxyproto.Header{
Version: byte(reverseProxy.ProxyProtocol.Version),
Command: proxyproto.PROXY,
TransportProtocol: transportProtocol,
SourceAddr: remoteAddr,
DestinationAddr: this.Listener.LocalAddr(),
}
_, err = header.WriteTo(conn)
if err != nil {
_ = conn.Close()
return nil, err
}
}
return return
} }
} }