优化源站调度

This commit is contained in:
GoEdgeLab
2021-08-01 21:56:02 +08:00
parent e723a64e68
commit 8d49c380d7
5 changed files with 244 additions and 34 deletions

View File

@@ -160,6 +160,9 @@ func (this *HTTPRequest) doReverseProxy() {
httpErr, ok := err.(*url.Error)
if !ok || httpErr.Err != context.Canceled {
// TODO 如果超过最大失败次数,则下线
SharedOriginStateManager.Fail(origin, this.reverseProxy, func() {
this.reverseProxy.ResetScheduling()
})
this.write502(err)
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error())
@@ -183,6 +186,11 @@ func (this *HTTPRequest) doReverseProxy() {
}
return
}
if !origin.IsOk {
SharedOriginStateManager.Success(origin, func() {
this.reverseProxy.ResetScheduling()
})
}
// WAF对出站进行检查
if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn {

View File

@@ -0,0 +1,12 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
type OriginState struct {
CountFails int64
UpdatedAt int64
Config *serverconfigs.OriginConfig
ReverseProxy *serverconfigs.ReverseProxyConfig
}

View File

@@ -0,0 +1,172 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"sync"
"time"
)
var SharedOriginStateManager = NewOriginStateManager()
func init() {
events.On(events.EventLoaded, func() {
go SharedOriginStateManager.Start()
})
}
// OriginStateManager 源站状态管理
type OriginStateManager struct {
stateMap map[int64]*OriginState // originId => *OriginState
ticker *time.Ticker
locker sync.RWMutex
}
// NewOriginStateManager 获取新管理对象
func NewOriginStateManager() *OriginStateManager {
return &OriginStateManager{
stateMap: map[int64]*OriginState{},
ticker: time.NewTicker(60 * time.Second),
}
}
// Start 启动
func (this *OriginStateManager) Start() {
events.On(events.EventReload, func() {
this.locker.Lock()
this.stateMap = map[int64]*OriginState{}
this.locker.Unlock()
})
if Tea.IsTesting() {
this.ticker = time.NewTicker(10 * time.Second)
}
for range this.ticker.C {
err := this.Loop()
if err != nil {
remotelogs.Error("ORIGIN_MANAGER", err.Error())
}
}
}
// Loop 单次循环检查
func (this *OriginStateManager) Loop() error {
if sharedNodeConfig == nil {
return nil
}
var currentStates = []*OriginState{}
this.locker.Lock()
for originId, state := range this.stateMap {
// 检查Origin是否正在使用
config := sharedNodeConfig.FindOrigin(originId)
if config == nil || !config.IsOn {
delete(this.stateMap, originId)
continue
}
state.Config = config
currentStates = append(currentStates, state)
}
this.locker.Unlock()
if len(currentStates) == 0 {
return nil
}
var count = len(currentStates)
wg := &sync.WaitGroup{}
wg.Add(count)
for _, state := range currentStates {
go func(state *OriginState) {
defer wg.Done()
conn, err := OriginConnect(state.Config, "")
if err == nil {
_ = conn.Close()
// 已经恢复正常
this.locker.Lock()
state.Config.IsOk = true
delete(this.stateMap, state.Config.Id)
this.locker.Unlock()
var reverseProxy = state.ReverseProxy
if reverseProxy != nil {
reverseProxy.ResetScheduling()
}
}
}(state)
}
wg.Wait()
return nil
}
// Fail 添加失败的源站
func (this *OriginStateManager) Fail(origin *serverconfigs.OriginConfig, reverseProxy *serverconfigs.ReverseProxyConfig, callback func()) {
if origin == nil {
return
}
this.locker.Lock()
state, ok := this.stateMap[origin.Id]
var timestamp = time.Now().Unix()
if ok {
if state.UpdatedAt < timestamp-300 { // N 分钟之后重新计数
state.CountFails = 0
state.Config.IsOk = true
}
state.CountFails++
state.Config = origin
state.ReverseProxy = reverseProxy
state.UpdatedAt = timestamp
if origin.IsOk {
origin.IsOk = state.CountFails > 5 // 超过 N 次之后认为是异常
if !origin.IsOk {
if callback != nil {
callback()
}
}
}
} else {
this.stateMap[origin.Id] = &OriginState{
CountFails: 1,
Config: origin,
ReverseProxy: reverseProxy,
UpdatedAt: timestamp,
}
origin.IsOk = true
}
this.locker.Unlock()
}
// Success 添加成功的源站
func (this *OriginStateManager) Success(origin *serverconfigs.OriginConfig, callback func()) {
if origin == nil {
return
}
if !origin.IsOk {
if callback != nil {
callback()
}
}
origin.IsOk = true
this.locker.Lock()
delete(this.stateMap, origin.Id)
this.locker.Unlock()
}
// IsAvailable 检查是否正常
func (this *OriginStateManager) IsAvailable(originId int64) bool {
this.locker.RLock()
_, ok := this.stateMap[originId]
this.locker.RUnlock()
return !ok
}

View File

@@ -0,0 +1,15 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import "testing"
func TestOriginManager_Loop(t *testing.T) {
var manager = NewOriginStateManager()
err := manager.Loop()
if err != nil {
t.Fatal(err)
}
t.Log(manager.stateMap)
}

View File

@@ -16,41 +16,44 @@ func OriginConnect(origin *serverconfigs.OriginConfig, remoteAddr string) (net.C
}
// 支持TOA的连接
toaConfig := sharedTOAManager.Config()
if toaConfig != nil && toaConfig.IsOn {
retries := 3
for i := 1; i <= retries; i++ {
port := int(toaConfig.RandLocalPort())
err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + remoteAddr)
if err != nil {
remotelogs.Error("TOA", "add failed: "+err.Error())
} else {
dialer := net.Dialer{
Timeout: origin.ConnTimeoutDuration(),
LocalAddr: &net.TCPAddr{
Port: port,
},
}
var conn net.Conn
switch origin.Addr.Protocol {
case "", serverconfigs.ProtocolTCP, serverconfigs.ProtocolHTTP:
// TODO 支持TCP4/TCP6
// TODO 支持指定特定网卡
// TODO Addr支持端口范围如果有多个端口时随机一个端口使用
conn, err = dialer.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange)
case serverconfigs.ProtocolTLS, serverconfigs.ProtocolHTTPS:
// TODO 支持TCP4/TCP6
// TODO 支持指定特定网卡
// TODO Addr支持端口范围如果有多个端口时随机一个端口使用
// TODO 支持使用证书
conn, err = tls.DialWithDialer(&dialer, "tcp", origin.Addr.Host+":"+origin.Addr.PortRange, &tls.Config{
InsecureSkipVerify: true,
})
}
// 这个条件很重要如果没有传递remoteAddr表示不使用TOA
if len(remoteAddr) > 0 {
toaConfig := sharedTOAManager.Config()
if toaConfig != nil && toaConfig.IsOn {
retries := 3
for i := 1; i <= retries; i++ {
port := int(toaConfig.RandLocalPort())
err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + remoteAddr)
if err != nil {
remotelogs.Error("TOA", "add failed: "+err.Error())
} else {
dialer := net.Dialer{
Timeout: origin.ConnTimeoutDuration(),
LocalAddr: &net.TCPAddr{
Port: port,
},
}
var conn net.Conn
switch origin.Addr.Protocol {
case "", serverconfigs.ProtocolTCP, serverconfigs.ProtocolHTTP:
// TODO 支持TCP4/TCP6
// TODO 支持指定特定网卡
// TODO Addr支持端口范围如果有多个端口时随机一个端口使用
conn, err = dialer.Dial("tcp", origin.Addr.Host+":"+origin.Addr.PortRange)
case serverconfigs.ProtocolTLS, serverconfigs.ProtocolHTTPS:
// TODO 支持TCP4/TCP6
// TODO 支持指定特定网卡
// TODO Addr支持端口范围如果有多个端口时随机一个端口使用
// TODO 支持使用证书
conn, err = tls.DialWithDialer(&dialer, "tcp", origin.Addr.Host+":"+origin.Addr.PortRange, &tls.Config{
InsecureSkipVerify: true,
})
}
// TODO 需要在合适的时机删除TOA记录
if err == nil || i == retries {
return conn, err
// TODO 需要在合适的时机删除TOA记录
if err == nil || i == retries {
return conn, err
}
}
}
}