From 6f3d688ece93893cb862b5ace0d42ed7acb87a9f Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 27 Sep 2020 18:41:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=8D=E5=90=91=E4=BB=A3=E7=90=86=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0AutoFlush/=E4=BF=AE=E5=A4=8D=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=8A=A0=E8=BD=BD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/http_request.go | 5 ---- internal/nodes/http_request_reverse_proxy.go | 6 ++--- internal/nodes/listener.go | 7 ++++-- internal/nodes/listener_base.go | 7 ++++++ internal/nodes/listener_http.go | 10 ++++++++ internal/nodes/listener_impl.go | 13 ----------- internal/nodes/listener_interface.go | 18 +++++++++++++++ internal/nodes/listener_manager.go | 4 ++++ internal/nodes/listener_tcp.go | 5 ++++ internal/nodes/listener_udp.go | 5 ++++ internal/nodes/listener_unix.go | 5 ++++ internal/utils/net.go | 24 ++++++++++++++++++++ internal/utils/net_darwin.go | 9 ++++++++ internal/utils/net_linux.go | 6 +++++ 14 files changed, 101 insertions(+), 23 deletions(-) delete mode 100644 internal/nodes/listener_impl.go create mode 100644 internal/nodes/listener_interface.go create mode 100644 internal/utils/net.go create mode 100644 internal/utils/net_darwin.go create mode 100644 internal/utils/net_linux.go diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 7d5c496..9f000d3 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -7,7 +7,6 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/utils" - "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/types" "net" "net/http" @@ -141,9 +140,6 @@ func (this *HTTPRequest) doBegin() { // Fastcgi // TODO - // Server Event Sent - // TODO 实现Location的AutoFlush - // 返回404页面 this.write404() } @@ -234,7 +230,6 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo if !location.IsOn { continue } - logs.Println("rawPath:", rawPath, "location:", location.Pattern) // TODO if varMapping, isMatched := location.Match(rawPath, this.Format); isMatched { if len(varMapping) > 0 { this.addVarMapping(varMapping) diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index a475b42..84672ac 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -190,7 +190,7 @@ func (this *HTTPRequest) doReverseProxy() { this.processResponseHeaders(resp.StatusCode) // 是否需要刷新 - shouldFlush := this.RawReq.Header.Get("Accept") == "text/event-stream" + shouldAutoFlush := this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream" // 准备 this.writer.Prepare(resp.ContentLength) @@ -201,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() { // 输出到客户端 pool := this.bytePool(resp.ContentLength) buf := pool.Get() - if shouldFlush { + if shouldAutoFlush { for { n, readErr := resp.Body.Read(buf) if n > 0 { @@ -226,7 +226,7 @@ func (this *HTTPRequest) doReverseProxy() { logs.Error(err1) } - if err != nil { + if err != nil && err != io.EOF { logs.Error(err) this.addError(err) } diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index f4c7092..d0e5752 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -12,7 +12,7 @@ import ( type Listener struct { group *serverconfigs.ServerGroup isListening bool - listener ListenerImpl // 监听器 + listener ListenerInterface // 监听器 locker sync.RWMutex } @@ -23,8 +23,11 @@ func NewListener() *Listener { func (this *Listener) Reload(group *serverconfigs.ServerGroup) { this.locker.Lock() - defer this.locker.Unlock() this.group = group + if this.listener != nil { + this.listener.Reload(group) + } + this.locker.Unlock() } func (this *Listener) FullAddr() string { diff --git a/internal/nodes/listener_base.go b/internal/nodes/listener_base.go index 920f674..fa47013 100644 --- a/internal/nodes/listener_base.go +++ b/internal/nodes/listener_base.go @@ -20,6 +20,13 @@ func (this *BaseListener) Init() { this.namedServers = map[string]*NamedServer{} } +// 清除既有配置 +func (this *BaseListener) Reset() { + this.namedServersLocker.Lock() + this.namedServers = map[string]*NamedServer{} + this.namedServersLocker.Unlock() +} + // 构造TLS配置 func (this *BaseListener) buildTLSConfig(group *serverconfigs.ServerGroup) *tls.Config { return &tls.Config{ diff --git a/internal/nodes/listener_http.go b/internal/nodes/listener_http.go index dbef9ae..bfdda62 100644 --- a/internal/nodes/listener_http.go +++ b/internal/nodes/listener_http.go @@ -73,6 +73,16 @@ func (this *HTTPListener) Close() error { return this.Listener.Close() } +func (this *HTTPListener) Reload(group *serverconfigs.ServerGroup) { + this.Group = group + + if this.isHTTPS { + this.httpServer.TLSConfig = this.buildTLSConfig(this.Group) + } + + this.Reset() +} + // 处理HTTP请求 func (this *HTTPListener) handleHTTP(rawWriter http.ResponseWriter, rawReq *http.Request) { // 域名 diff --git a/internal/nodes/listener_impl.go b/internal/nodes/listener_impl.go deleted file mode 100644 index 38ed2ba..0000000 --- a/internal/nodes/listener_impl.go +++ /dev/null @@ -1,13 +0,0 @@ -package nodes - -// 各协议监听器的具体实现 -type ListenerImpl interface { - // 初始化 - Init() - - // 监听 - Serve() error - - // 关闭 - Close() error -} diff --git a/internal/nodes/listener_interface.go b/internal/nodes/listener_interface.go new file mode 100644 index 0000000..861ccfc --- /dev/null +++ b/internal/nodes/listener_interface.go @@ -0,0 +1,18 @@ +package nodes + +import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + +// 各协议监听器的接口 +type ListenerInterface interface { + // 初始化 + Init() + + // 监听 + Serve() error + + // 关闭 + Close() error + + // 重载配置 + Reload(serverGroup *serverconfigs.ServerGroup) +} diff --git a/internal/nodes/listener_manager.go b/internal/nodes/listener_manager.go index 424b219..c6a346a 100644 --- a/internal/nodes/listener_manager.go +++ b/internal/nodes/listener_manager.go @@ -11,18 +11,21 @@ import ( var sharedListenerManager = NewListenerManager() +// 端口监听管理器 type ListenerManager struct { listenersMap map[string]*Listener // addr => *Listener locker sync.Mutex lastConfig *nodeconfigs.NodeConfig } +// 获取新对象 func NewListenerManager() *ListenerManager { return &ListenerManager{ listenersMap: map[string]*Listener{}, } } +// 启动监听 func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { this.locker.Lock() defer this.locker.Unlock() @@ -78,6 +81,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { return nil } +// 返回更加友好格式的地址 func (this *ListenerManager) prettyAddress(addr string) string { u, err := url.Parse(addr) if err != nil { diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index b44d664..ee70d57 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -29,6 +29,11 @@ func (this *TCPListener) Serve() error { return nil } +func (this *TCPListener) Reload(group *serverconfigs.ServerGroup) { + this.Group = group + this.Reset() +} + func (this *TCPListener) handleConn(conn net.Conn) error { firstServer := this.Group.FirstServer() if firstServer == nil { diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index 56e79b1..4d4537b 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -21,3 +21,8 @@ func (this *UDPListener) Close() error { // TODO return nil } + +func (this *UDPListener) Reload(group *serverconfigs.ServerGroup) { + this.Group = group + this.Reset() +} diff --git a/internal/nodes/listener_unix.go b/internal/nodes/listener_unix.go index 170b6b7..f0d9b9a 100644 --- a/internal/nodes/listener_unix.go +++ b/internal/nodes/listener_unix.go @@ -21,3 +21,8 @@ func (this *UnixListener) Close() error { // TODO return nil } + +func (this *UnixListener) Reload(group *serverconfigs.ServerGroup) { + this.Group = group + this.Reset() +} diff --git a/internal/utils/net.go b/internal/utils/net.go new file mode 100644 index 0000000..9486889 --- /dev/null +++ b/internal/utils/net.go @@ -0,0 +1,24 @@ +package utils + +import ( + "context" + "github.com/iwind/TeaGo/logs" + "net" + "syscall" +) + +// 监听可重用的端口 +func ListenReuseAddr(network string, addr string) (net.Listener, error) { + config := &net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + return c.Control(func(fd uintptr) { + err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, SO_REUSEPORT, 1) + if err != nil { + logs.Println("[LISTEN]" + err.Error()) + } + }) + }, + KeepAlive: 0, + } + return config.Listen(context.Background(), network, addr) +} diff --git a/internal/utils/net_darwin.go b/internal/utils/net_darwin.go new file mode 100644 index 0000000..e6a5935 --- /dev/null +++ b/internal/utils/net_darwin.go @@ -0,0 +1,9 @@ +// +build darwin + +package utils + +import ( + "syscall" +) + +const SO_REUSEPORT = syscall.SO_REUSEPORT diff --git a/internal/utils/net_linux.go b/internal/utils/net_linux.go new file mode 100644 index 0000000..46b0b5c --- /dev/null +++ b/internal/utils/net_linux.go @@ -0,0 +1,6 @@ +// +build linux +// 可以在 /usr/include/asm-generic/socket.h 中找到 SO_REUSEPORT 值 + +package utils + +const SO_REUSEPORT = 15