diff --git a/internal/nodes/listener_interface.go b/internal/nodes/listener_interface.go index 01ae031..f346b72 100644 --- a/internal/nodes/listener_interface.go +++ b/internal/nodes/listener_interface.go @@ -2,20 +2,20 @@ package nodes import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" -// 各协议监听器的接口 +// ListenerInterface 各协议监听器的接口 type ListenerInterface interface { - // 初始化 + // Init 初始化 Init() - // 监听 + // Serve 监听 Serve() error - // 关闭 + // Close 关闭 Close() error - // 重载配置 + // Reload 重载配置 Reload(serverGroup *serverconfigs.ServerGroup) - // 获取当前活跃的连接数 + // CountActiveListeners 获取当前活跃的连接数 CountActiveListeners() int } diff --git a/internal/nodes/listener_manager.go b/internal/nodes/listener_manager.go index 591d125..4056822 100644 --- a/internal/nodes/listener_manager.go +++ b/internal/nodes/listener_manager.go @@ -4,10 +4,12 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/lists" "net/url" "regexp" "sync" + "time" ) var sharedListenerManager = NewListenerManager() @@ -17,13 +19,31 @@ type ListenerManager struct { listenersMap map[string]*Listener // addr => *Listener locker sync.Mutex lastConfig *nodeconfigs.NodeConfig + + retryListenerMap map[string]*Listener // 需要重试的监听器 addr => Listener + ticker *time.Ticker } // NewListenerManager 获取新对象 func NewListenerManager() *ListenerManager { - return &ListenerManager{ - listenersMap: map[string]*Listener{}, + manager := &ListenerManager{ + listenersMap: map[string]*Listener{}, + retryListenerMap: map[string]*Listener{}, + ticker: time.NewTicker(1 * time.Minute), } + + // 提升测试效率 + if Tea.IsTesting() { + manager.ticker = time.NewTicker(5 * time.Second) + } + + go func() { + for range manager.ticker.C { + manager.retryListeners() + } + }() + + return manager } // Start 启动监听 @@ -31,6 +51,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { this.locker.Lock() defer this.locker.Unlock() + // 重置数据 + this.retryListenerMap = map[string]*Listener{} + // 检查是否有变化 /**if this.lastConfig != nil && this.lastConfig.Version == node.Version { return nil @@ -83,6 +106,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { listener.Reload(group) err := listener.Listen() if err != nil { + // 放入到重试队列中 + this.retryListenerMap[addr] = listener + firstServer := group.FirstServer() if firstServer == nil { remotelogs.Error("LISTENER_MANAGER", err.Error()) @@ -122,3 +148,18 @@ func (this *ListenerManager) prettyAddress(addr string) string { } return u.String() } + +// 重试失败的Listener +func (this *ListenerManager) retryListeners() { + this.locker.Lock() + defer this.locker.Unlock() + + for addr, listener := range this.retryListenerMap { + err := listener.Listen() + if err == nil { + delete(this.retryListenerMap, addr) + this.listenersMap[addr] = listener + remotelogs.ServerSuccess(listener.group.FirstServer().Id, "LISTENER_MANAGER", "retry to listen '"+addr+"' successfully") + } + } +} diff --git a/internal/remotelogs/utils.go b/internal/remotelogs/utils.go index 3c82065..8ac79b2 100644 --- a/internal/remotelogs/utils.go +++ b/internal/remotelogs/utils.go @@ -93,7 +93,7 @@ func Error(tag string, description string) { } } -// ServerError 打印错误信息 +// ServerError 打印服务相关错误信息 func ServerError(serverId int64, tag string, description string) { logs.Println("[" + tag + "]" + description) @@ -117,6 +117,30 @@ func ServerError(serverId int64, tag string, description string) { } } +// ServerSuccess 打印服务相关成功信息 +func ServerSuccess(serverId int64, tag string, description string) { + logs.Println("[" + tag + "]" + description) + + nodeConfig, _ := nodeconfigs.SharedNodeConfig() + if nodeConfig == nil { + return + } + + select { + case logChan <- &pb.NodeLog{ + Role: teaconst.Role, + Tag: tag, + Description: description, + Level: "success", + NodeId: nodeConfig.Id, + ServerId: serverId, + CreatedAt: time.Now().Unix(), + }: + default: + + } +} + // 上传日志 func uploadLogs() error { logList := []*pb.NodeLog{}