某个服务端口启动失败后,会自动重试

This commit is contained in:
刘祥超
2021-06-06 13:40:00 +08:00
parent aeb1bc08a7
commit 0df5dfad23
3 changed files with 74 additions and 9 deletions

View File

@@ -2,20 +2,20 @@ package nodes
import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
// 各协议监听器的接口 // ListenerInterface 各协议监听器的接口
type ListenerInterface interface { type ListenerInterface interface {
// 初始化 // Init 初始化
Init() Init()
// 监听 // Serve 监听
Serve() error Serve() error
// 关闭 // Close 关闭
Close() error Close() error
// 重载配置 // Reload 重载配置
Reload(serverGroup *serverconfigs.ServerGroup) Reload(serverGroup *serverconfigs.ServerGroup)
// 获取当前活跃的连接数 // CountActiveListeners 获取当前活跃的连接数
CountActiveListeners() int CountActiveListeners() int
} }

View File

@@ -4,10 +4,12 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
"net/url" "net/url"
"regexp" "regexp"
"sync" "sync"
"time"
) )
var sharedListenerManager = NewListenerManager() var sharedListenerManager = NewListenerManager()
@@ -17,13 +19,31 @@ type ListenerManager struct {
listenersMap map[string]*Listener // addr => *Listener listenersMap map[string]*Listener // addr => *Listener
locker sync.Mutex locker sync.Mutex
lastConfig *nodeconfigs.NodeConfig lastConfig *nodeconfigs.NodeConfig
retryListenerMap map[string]*Listener // 需要重试的监听器 addr => Listener
ticker *time.Ticker
} }
// NewListenerManager 获取新对象 // NewListenerManager 获取新对象
func NewListenerManager() *ListenerManager { func NewListenerManager() *ListenerManager {
return &ListenerManager{ manager := &ListenerManager{
listenersMap: map[string]*Listener{}, listenersMap: map[string]*Listener{},
retryListenerMap: map[string]*Listener{},
ticker: time.NewTicker(1 * time.Minute),
} }
// 提升测试效率
if Tea.IsTesting() {
manager.ticker = time.NewTicker(5 * time.Second)
}
go func() {
for range manager.ticker.C {
manager.retryListeners()
}
}()
return manager
} }
// Start 启动监听 // Start 启动监听
@@ -31,6 +51,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
// 重置数据
this.retryListenerMap = map[string]*Listener{}
// 检查是否有变化 // 检查是否有变化
/**if this.lastConfig != nil && this.lastConfig.Version == node.Version { /**if this.lastConfig != nil && this.lastConfig.Version == node.Version {
return nil return nil
@@ -83,6 +106,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
listener.Reload(group) listener.Reload(group)
err := listener.Listen() err := listener.Listen()
if err != nil { if err != nil {
// 放入到重试队列中
this.retryListenerMap[addr] = listener
firstServer := group.FirstServer() firstServer := group.FirstServer()
if firstServer == nil { if firstServer == nil {
remotelogs.Error("LISTENER_MANAGER", err.Error()) remotelogs.Error("LISTENER_MANAGER", err.Error())
@@ -122,3 +148,18 @@ func (this *ListenerManager) prettyAddress(addr string) string {
} }
return u.String() return u.String()
} }
// 重试失败的Listener
func (this *ListenerManager) retryListeners() {
this.locker.Lock()
defer this.locker.Unlock()
for addr, listener := range this.retryListenerMap {
err := listener.Listen()
if err == nil {
delete(this.retryListenerMap, addr)
this.listenersMap[addr] = listener
remotelogs.ServerSuccess(listener.group.FirstServer().Id, "LISTENER_MANAGER", "retry to listen '"+addr+"' successfully")
}
}
}

View File

@@ -93,7 +93,7 @@ func Error(tag string, description string) {
} }
} }
// ServerError 打印错误信息 // ServerError 打印服务相关错误信息
func ServerError(serverId int64, tag string, description string) { func ServerError(serverId int64, tag string, description string) {
logs.Println("[" + tag + "]" + description) logs.Println("[" + tag + "]" + description)
@@ -117,6 +117,30 @@ func ServerError(serverId int64, tag string, description string) {
} }
} }
// ServerSuccess 打印服务相关成功信息
func ServerSuccess(serverId int64, tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "success",
NodeId: nodeConfig.Id,
ServerId: serverId,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// 上传日志 // 上传日志
func uploadLogs() error { func uploadLogs() error {
logList := []*pb.NodeLog{} logList := []*pb.NodeLog{}