diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 0e07573..a90a45b 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -160,6 +160,9 @@ func (this *HTTPRequest) doReverseProxy() { httpErr, ok := err.(*url.Error) if !ok || httpErr.Err != context.Canceled { // TODO 如果超过最大失败次数,则下线 + SharedOriginStateManager.Fail(origin, this.reverseProxy, func() { + this.reverseProxy.ResetScheduling() + }) this.write502(err) remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error()) @@ -183,6 +186,11 @@ func (this *HTTPRequest) doReverseProxy() { } return } + if !origin.IsOk { + SharedOriginStateManager.Success(origin, func() { + this.reverseProxy.ResetScheduling() + }) + } // WAF对出站进行检查 if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn { diff --git a/internal/nodes/origin_state.go b/internal/nodes/origin_state.go new file mode 100644 index 0000000..17dfa8a --- /dev/null +++ b/internal/nodes/origin_state.go @@ -0,0 +1,12 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + +type OriginState struct { + CountFails int64 + UpdatedAt int64 + Config *serverconfigs.OriginConfig + ReverseProxy *serverconfigs.ReverseProxyConfig +} diff --git a/internal/nodes/origin_state_manager.go b/internal/nodes/origin_state_manager.go new file mode 100644 index 0000000..17f9614 --- /dev/null +++ b/internal/nodes/origin_state_manager.go @@ -0,0 +1,172 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/iwind/TeaGo/Tea" + "sync" + "time" +) + +var SharedOriginStateManager = NewOriginStateManager() + +func init() { + events.On(events.EventLoaded, func() { + go SharedOriginStateManager.Start() + }) +} + +// OriginStateManager 源站状态管理 +type OriginStateManager struct { + stateMap map[int64]*OriginState // originId => *OriginState + + ticker *time.Ticker + locker sync.RWMutex +} + +// NewOriginStateManager 获取新管理对象 +func NewOriginStateManager() *OriginStateManager { + return &OriginStateManager{ + stateMap: map[int64]*OriginState{}, + ticker: time.NewTicker(60 * time.Second), + } +} + +// Start 启动 +func (this *OriginStateManager) Start() { + events.On(events.EventReload, func() { + this.locker.Lock() + this.stateMap = map[int64]*OriginState{} + this.locker.Unlock() + }) + + if Tea.IsTesting() { + this.ticker = time.NewTicker(10 * time.Second) + } + for range this.ticker.C { + err := this.Loop() + if err != nil { + remotelogs.Error("ORIGIN_MANAGER", err.Error()) + } + } +} + +// Loop 单次循环检查 +func (this *OriginStateManager) Loop() error { + if sharedNodeConfig == nil { + return nil + } + + var currentStates = []*OriginState{} + this.locker.Lock() + for originId, state := range this.stateMap { + // 检查Origin是否正在使用 + config := sharedNodeConfig.FindOrigin(originId) + if config == nil || !config.IsOn { + delete(this.stateMap, originId) + continue + } + state.Config = config + currentStates = append(currentStates, state) + } + this.locker.Unlock() + + if len(currentStates) == 0 { + return nil + } + + var count = len(currentStates) + wg := &sync.WaitGroup{} + wg.Add(count) + for _, state := range currentStates { + go func(state *OriginState) { + defer wg.Done() + conn, err := OriginConnect(state.Config, "") + if err == nil { + _ = conn.Close() + + // 已经恢复正常 + this.locker.Lock() + state.Config.IsOk = true + delete(this.stateMap, state.Config.Id) + this.locker.Unlock() + + var reverseProxy = state.ReverseProxy + if reverseProxy != nil { + reverseProxy.ResetScheduling() + } + } + }(state) + } + wg.Wait() + + return nil +} + +// Fail 添加失败的源站 +func (this *OriginStateManager) Fail(origin *serverconfigs.OriginConfig, reverseProxy *serverconfigs.ReverseProxyConfig, callback func()) { + if origin == nil { + return + } + this.locker.Lock() + state, ok := this.stateMap[origin.Id] + var timestamp = time.Now().Unix() + if ok { + if state.UpdatedAt < timestamp-300 { // N 分钟之后重新计数 + state.CountFails = 0 + state.Config.IsOk = true + } + + state.CountFails++ + state.Config = origin + state.ReverseProxy = reverseProxy + state.UpdatedAt = timestamp + + if origin.IsOk { + origin.IsOk = state.CountFails > 5 // 超过 N 次之后认为是异常 + + if !origin.IsOk { + if callback != nil { + callback() + } + } + } + } else { + this.stateMap[origin.Id] = &OriginState{ + CountFails: 1, + Config: origin, + ReverseProxy: reverseProxy, + UpdatedAt: timestamp, + } + origin.IsOk = true + } + this.locker.Unlock() +} + +// Success 添加成功的源站 +func (this *OriginStateManager) Success(origin *serverconfigs.OriginConfig, callback func()) { + if origin == nil { + return + } + if !origin.IsOk { + if callback != nil { + callback() + } + } + origin.IsOk = true + this.locker.Lock() + delete(this.stateMap, origin.Id) + this.locker.Unlock() +} + +// IsAvailable 检查是否正常 +func (this *OriginStateManager) IsAvailable(originId int64) bool { + this.locker.RLock() + _, ok := this.stateMap[originId] + this.locker.RUnlock() + + return !ok +} diff --git a/internal/nodes/origin_state_manager_test.go b/internal/nodes/origin_state_manager_test.go new file mode 100644 index 0000000..79ab3e3 --- /dev/null +++ b/internal/nodes/origin_state_manager_test.go @@ -0,0 +1,15 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import "testing" + +func TestOriginManager_Loop(t *testing.T) { + var manager = NewOriginStateManager() + err := manager.Loop() + if err != nil { + t.Fatal(err) + } + + t.Log(manager.stateMap) +} diff --git a/internal/nodes/origin_utils.go b/internal/nodes/origin_utils.go index 0db48cc..ac9e66b 100644 --- a/internal/nodes/origin_utils.go +++ b/internal/nodes/origin_utils.go @@ -16,41 +16,44 @@ func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string) (net.C } // 支持TOA的连接 - toaConfig := sharedTOAManager.Config() - if toaConfig != nil && toaConfig.IsOn { - retries := 3 - for i := 1; i <= retries; i++ { - port := int(toaConfig.RandLocalPort()) - err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + remoteAddr) - if err != nil { - remotelogs.Error("TOA", "add failed: "+err.Error()) - } else { - dialer := net.Dialer{ - Timeout: origin.ConnTimeoutDuration(), - LocalAddr: &net.TCPAddr{ - Port: port, - }, - } - var conn net.Conn - switch origin.Addr.Protocol { - case "", serverconfigs.ProtocolTCP, serverconfigs.ProtocolHTTP: - // TODO 支持TCP4/TCP6 - // TODO 支持指定特定网卡 - // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 - conn, err = dialer.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange) - case serverconfigs.ProtocolTLS, serverconfigs.ProtocolHTTPS: - // TODO 支持TCP4/TCP6 - // TODO 支持指定特定网卡 - // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 - // TODO 支持使用证书 - conn, err = tls.DialWithDialer(&dialer, "tcp", origin.Addr.Host+":"+origin.Addr.PortRange, &tls.Config{ - InsecureSkipVerify: true, - }) - } + // 这个条件很重要,如果没有传递remoteAddr,表示不使用TOA + if len(remoteAddr) > 0 { + toaConfig := sharedTOAManager.Config() + if toaConfig != nil && toaConfig.IsOn { + retries := 3 + for i := 1; i <= retries; i++ { + port := int(toaConfig.RandLocalPort()) + err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + remoteAddr) + if err != nil { + remotelogs.Error("TOA", "add failed: "+err.Error()) + } else { + dialer := net.Dialer{ + Timeout: origin.ConnTimeoutDuration(), + LocalAddr: &net.TCPAddr{ + Port: port, + }, + } + var conn net.Conn + switch origin.Addr.Protocol { + case "", serverconfigs.ProtocolTCP, serverconfigs.ProtocolHTTP: + // TODO 支持TCP4/TCP6 + // TODO 支持指定特定网卡 + // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 + conn, err = dialer.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange) + case serverconfigs.ProtocolTLS, serverconfigs.ProtocolHTTPS: + // TODO 支持TCP4/TCP6 + // TODO 支持指定特定网卡 + // TODO Addr支持端口范围,如果有多个端口时,随机一个端口使用 + // TODO 支持使用证书 + conn, err = tls.DialWithDialer(&dialer, "tcp", origin.Addr.Host+":"+origin.Addr.PortRange, &tls.Config{ + InsecureSkipVerify: true, + }) + } - // TODO 需要在合适的时机删除TOA记录 - if err == nil || i == retries { - return conn, err + // TODO 需要在合适的时机删除TOA记录 + if err == nil || i == retries { + return conn, err + } } } }