diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 88a33d3..8c8a3d7 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -10,7 +10,6 @@ import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/events" - "github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -97,10 +96,8 @@ func (this *APIStream) loop() error { err = this.handlePurgeCache(message) case messageconfigs.MessageCodePreheatCache: // 预热缓存 err = this.handlePreheatCache(message) - case messageconfigs.MessageCodeConfigChanged: // 配置变化 - err = this.handleConfigChanged(message) - case messageconfigs.MessageCodeIPListChanged: // IPList变化 - err = this.handleIPListChanged(message) + case messageconfigs.MessageCodeNewNodeTask: // 有新的任务 + err = this.handleNewNodeTask(message) case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务 err = this.handleCheckSystemdService(message) default: @@ -480,20 +477,9 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { } // 处理配置变化 -func (this *APIStream) handleConfigChanged(message *pb.NodeStreamMessage) error { +func (this *APIStream) handleNewNodeTask(message *pb.NodeStreamMessage) error { select { - case changeNotify <- true: - default: - - } - this.replyOk(message.RequestId, "ok") - return nil -} - -// 处理IPList变化 -func (this *APIStream) handleIPListChanged(message *pb.NodeStreamMessage) error { - select { - case iplibrary.IPListUpdateNotify <- true: + case nodeTaskNotify <- true: default: } diff --git a/internal/nodes/listener_manager.go b/internal/nodes/listener_manager.go index bcdb716..644ba59 100644 --- a/internal/nodes/listener_manager.go +++ b/internal/nodes/listener_manager.go @@ -32,9 +32,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { defer this.locker.Unlock() // 检查是否有变化 - if this.lastConfig != nil && this.lastConfig.Version == node.Version { + /**if this.lastConfig != nil && this.lastConfig.Version == node.Version { return nil - } + }**/ this.lastConfig = node // 初始化 diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 3a3dcd8..637c669 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -12,6 +12,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/configs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -30,12 +31,12 @@ import ( "time" ) -var lastVersion = int64(-1) var sharedNodeConfig *nodeconfigs.NodeConfig -var changeNotify = make(chan bool, 8) +var nodeTaskNotify = make(chan bool, 8) // 节点 type Node struct { + isLoaded bool } func NewNode() *Node { @@ -75,7 +76,7 @@ func (this *Node) Start() { // 读取API配置 tryTimes := 0 for { - err = this.syncConfig(false) + err = this.syncConfig() if err != nil { tryTimes++ @@ -227,8 +228,66 @@ func (this *Node) listenSignals() { }() } +// 循环 +func (this *Node) loop() error { + // 检查api.yaml是否存在 + apiConfigFile := Tea.ConfigFile("api.yaml") + _, err := os.Stat(apiConfigFile) + if err != nil { + return nil + } + + rpcClient, err := rpc.SharedRPC() + if err != nil { + return errors.New("create rpc client failed: " + err.Error()) + } + + nodeCtx := rpcClient.Context() + tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{}) + if err != nil { + return errors.New("read node tasks failed: " + err.Error()) + } + for _, task := range tasksResp.NodeTasks { + logs.Println("update task:", task.Type) // TODO + switch task.Type { + case "ipItemChanged": + iplibrary.IPListUpdateNotify <- true + + // 修改为已同步 + _, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ + NodeTaskId: task.Id, + IsOk: true, + Error: "", + }) + if err != nil { + return err + } + case "configChanged": + err := this.syncConfig() + if err != nil { + _, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ + NodeTaskId: task.Id, + IsOk: false, + Error: err.Error(), + }) + } else { + _, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ + NodeTaskId: task.Id, + IsOk: true, + Error: "", + }) + } + if err != nil { + return err + } + } + } + + return nil +} + // 读取API配置 -func (this *Node) syncConfig(isFirstTime bool) error { +func (this *Node) syncConfig() error { // 检查api.yaml是否存在 apiConfigFile := Tea.ConfigFile("api.yaml") _, err := os.Stat(apiConfigFile) @@ -250,9 +309,13 @@ func (this *Node) syncConfig(isFirstTime bool) error { if err != nil { return errors.New("create rpc client failed: " + err.Error()) } + + // 获取同步任务 + nodeCtx := rpcClient.Context() + // TODO 这里考虑只同步版本号有变更的 - configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{ - Version: lastVersion, + configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{ + Version: -1, // 更新所有版本 }) if err != nil { return errors.New("read config from rpc failed: " + err.Error()) @@ -274,12 +337,6 @@ func (this *Node) syncConfig(isFirstTime bool) error { return err } - // 如果版本相同,则只是保存 - if lastVersion == nodeConfig.Version { - return nil - } - lastVersion = nodeConfig.Version - err = nodeConfig.Init() if err != nil { return err @@ -293,7 +350,7 @@ func (this *Node) syncConfig(isFirstTime bool) error { } // 刷新配置 - if isFirstTime { + if this.isLoaded { remotelogs.Println("NODE", "reloading config ...") } else { remotelogs.Println("NODE", "loading config ...") @@ -315,10 +372,12 @@ func (this *Node) syncConfig(isFirstTime bool) error { // 发送事件 events.Notify(events.EventReload) - if !isFirstTime { + if this.isLoaded { return sharedListenerManager.Start(nodeConfig) } + this.isLoaded = true + return nil } @@ -334,13 +393,13 @@ func (this *Node) startSyncTimer() { for { select { case <-ticker.C: - err := this.syncConfig(false) + err := this.loop() if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) continue } - case <-changeNotify: - err := this.syncConfig(false) + case <-nodeTaskNotify: + err := this.loop() if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) continue diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index aea509b..468bef0 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -53,6 +53,10 @@ func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient { return pb.NewNodeLogServiceClient(this.pickConn()) } +func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient { + return pb.NewNodeTaskServiceClient(this.pickConn()) +} + func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient { return pb.NewHTTPAccessLogServiceClient(this.pickConn()) }