反向代理实现AutoFlush/修复配置自动加载问题

This commit is contained in:
GoEdgeLab
2020-09-27 18:41:56 +08:00
parent ee2281b581
commit 6f3d688ece
14 changed files with 101 additions and 23 deletions

View File

@@ -7,7 +7,6 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"net"
"net/http"
@@ -141,9 +140,6 @@ func (this *HTTPRequest) doBegin() {
// Fastcgi
// TODO
// Server Event Sent
// TODO 实现Location的AutoFlush
// 返回404页面
this.write404()
}
@@ -234,7 +230,6 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
if !location.IsOn {
continue
}
logs.Println("rawPath:", rawPath, "location:", location.Pattern) // TODO
if varMapping, isMatched := location.Match(rawPath, this.Format); isMatched {
if len(varMapping) > 0 {
this.addVarMapping(varMapping)

View File

@@ -190,7 +190,7 @@ func (this *HTTPRequest) doReverseProxy() {
this.processResponseHeaders(resp.StatusCode)
// 是否需要刷新
shouldFlush := this.RawReq.Header.Get("Accept") == "text/event-stream"
shouldAutoFlush := this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
// 准备
this.writer.Prepare(resp.ContentLength)
@@ -201,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() {
// 输出到客户端
pool := this.bytePool(resp.ContentLength)
buf := pool.Get()
if shouldFlush {
if shouldAutoFlush {
for {
n, readErr := resp.Body.Read(buf)
if n > 0 {
@@ -226,7 +226,7 @@ func (this *HTTPRequest) doReverseProxy() {
logs.Error(err1)
}
if err != nil {
if err != nil && err != io.EOF {
logs.Error(err)
this.addError(err)
}

View File

@@ -12,7 +12,7 @@ import (
type Listener struct {
group *serverconfigs.ServerGroup
isListening bool
listener ListenerImpl // 监听器
listener ListenerInterface // 监听器
locker sync.RWMutex
}
@@ -23,8 +23,11 @@ func NewListener() *Listener {
func (this *Listener) Reload(group *serverconfigs.ServerGroup) {
this.locker.Lock()
defer this.locker.Unlock()
this.group = group
if this.listener != nil {
this.listener.Reload(group)
}
this.locker.Unlock()
}
func (this *Listener) FullAddr() string {

View File

@@ -20,6 +20,13 @@ func (this *BaseListener) Init() {
this.namedServers = map[string]*NamedServer{}
}
// 清除既有配置
func (this *BaseListener) Reset() {
this.namedServersLocker.Lock()
this.namedServers = map[string]*NamedServer{}
this.namedServersLocker.Unlock()
}
// 构造TLS配置
func (this *BaseListener) buildTLSConfig(group *serverconfigs.ServerGroup) *tls.Config {
return &tls.Config{

View File

@@ -73,6 +73,16 @@ func (this *HTTPListener) Close() error {
return this.Listener.Close()
}
func (this *HTTPListener) Reload(group *serverconfigs.ServerGroup) {
this.Group = group
if this.isHTTPS {
this.httpServer.TLSConfig = this.buildTLSConfig(this.Group)
}
this.Reset()
}
// 处理HTTP请求
func (this *HTTPListener) handleHTTP(rawWriter http.ResponseWriter, rawReq *http.Request) {
// 域名

View File

@@ -1,13 +0,0 @@
package nodes
// 各协议监听器的具体实现
type ListenerImpl interface {
// 初始化
Init()
// 监听
Serve() error
// 关闭
Close() error
}

View File

@@ -0,0 +1,18 @@
package nodes
import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
// 各协议监听器的接口
type ListenerInterface interface {
// 初始化
Init()
// 监听
Serve() error
// 关闭
Close() error
// 重载配置
Reload(serverGroup *serverconfigs.ServerGroup)
}

View File

@@ -11,18 +11,21 @@ import (
var sharedListenerManager = NewListenerManager()
// 端口监听管理器
type ListenerManager struct {
listenersMap map[string]*Listener // addr => *Listener
locker sync.Mutex
lastConfig *nodeconfigs.NodeConfig
}
// 获取新对象
func NewListenerManager() *ListenerManager {
return &ListenerManager{
listenersMap: map[string]*Listener{},
}
}
// 启动监听
func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -78,6 +81,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
return nil
}
// 返回更加友好格式的地址
func (this *ListenerManager) prettyAddress(addr string) string {
u, err := url.Parse(addr)
if err != nil {

View File

@@ -29,6 +29,11 @@ func (this *TCPListener) Serve() error {
return nil
}
func (this *TCPListener) Reload(group *serverconfigs.ServerGroup) {
this.Group = group
this.Reset()
}
func (this *TCPListener) handleConn(conn net.Conn) error {
firstServer := this.Group.FirstServer()
if firstServer == nil {

View File

@@ -21,3 +21,8 @@ func (this *UDPListener) Close() error {
// TODO
return nil
}
func (this *UDPListener) Reload(group *serverconfigs.ServerGroup) {
this.Group = group
this.Reset()
}

View File

@@ -21,3 +21,8 @@ func (this *UnixListener) Close() error {
// TODO
return nil
}
func (this *UnixListener) Reload(group *serverconfigs.ServerGroup) {
this.Group = group
this.Reset()
}

24
internal/utils/net.go Normal file
View File

@@ -0,0 +1,24 @@
package utils
import (
"context"
"github.com/iwind/TeaGo/logs"
"net"
"syscall"
)
// 监听可重用的端口
func ListenReuseAddr(network string, addr string) (net.Listener, error) {
config := &net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, SO_REUSEPORT, 1)
if err != nil {
logs.Println("[LISTEN]" + err.Error())
}
})
},
KeepAlive: 0,
}
return config.Listen(context.Background(), network, addr)
}

View File

@@ -0,0 +1,9 @@
// +build darwin
package utils
import (
"syscall"
)
const SO_REUSEPORT = syscall.SO_REUSEPORT

View File

@@ -0,0 +1,6 @@
// +build linux
// 可以在 /usr/include/asm-generic/socket.h 中找到 SO_REUSEPORT 值
package utils
const SO_REUSEPORT = 15