From ad1f51bf1f20072866a3b4686fd9c11ee2fb3f7d Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 6 Nov 2022 12:07:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E7=89=88=E6=9C=AC=E5=8F=B7?= =?UTF-8?q?=E6=9D=A5=E8=AF=BB=E5=8F=96=E8=8A=82=E7=82=B9=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=EF=BC=8C=E6=8F=90=E5=8D=87=E4=BB=BB=E5=8A=A1=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E7=A8=B3=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/node.go | 43 +++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 30d6e68..0845c0f 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -29,7 +29,6 @@ import ( "github.com/andybalholm/brotli" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/lists" - "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/types" "github.com/iwind/gosock/pkg/gosock" @@ -64,6 +63,8 @@ type Node struct { timezone string updatingServerMap map[int64]*serverconfigs.ServerConfig + + lastTaskVersion int64 } func NewNode() *Node { @@ -312,7 +313,9 @@ func (this *Node) loop() error { } var nodeCtx = rpcClient.Context() - tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{}) + tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{ + Version: this.lastTaskVersion, + }) if err != nil { if rpc.IsConnError(err) && !Tea.IsTesting() { return nil @@ -321,7 +324,10 @@ func (this *Node) loop() error { } for _, task := range tasksResp.NodeTasks { err := this.execTask(rpcClient, nodeCtx, task) - this.finishTask(task.Id, err) + if !this.finishTask(task.Id, task.Version, err) { + // 防止失败的任务无法重试 + break + } } return nil @@ -452,20 +458,24 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta } // 标记任务完成 -func (this *Node) finishTask(taskId int64, taskErr error) { +func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (success bool) { if taskId <= 0 { - return + return true } rpcClient, err := rpc.SharedRPC() if err != nil { - logs.Println("[NODE]", "create rpc client failed: "+err.Error()) - return + remotelogs.Debug("NODE", "create rpc client failed: "+err.Error()) + return false } var nodeCtx = rpcClient.Context() var isOk = taskErr == nil + if isOk && taskVersion > this.lastTaskVersion { + this.lastTaskVersion = taskVersion + } + var errMsg = "" if taskErr != nil { errMsg = taskErr.Error() @@ -476,15 +486,18 @@ func (this *Node) finishTask(taskId int64, taskErr error) { IsOk: isOk, Error: errMsg, }) + success = err == nil if err != nil { - // 不需要上报到服务中心 + // 连接错误不需要上报到服务中心 if rpc.IsConnError(err) { - logs.Println("[NODE]", "report task done failed: "+err.Error()) + remotelogs.Debug("NODE", "report task done failed: "+err.Error()) } else { remotelogs.Error("NODE", "report task done failed: "+err.Error()) } } + + return success } // 读取API配置 @@ -720,12 +733,12 @@ func (this *Node) checkClusterConfig() error { return err } - logs.Println("[NODE]registering node to cluster ...") + remotelogs.Debug("NODE", "registering node to cluster ...") resp, err := rpcClient.NodeRPC.RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME}) if err != nil { return err } - logs.Println("[NODE]registered successfully") + remotelogs.Debug("NODE", "registered successfully") // 写入到配置文件中 if len(resp.Endpoints) == 0 { @@ -742,12 +755,12 @@ func (this *Node) checkClusterConfig() error { NodeId: resp.UniqueId, Secret: resp.Secret, } - logs.Println("[NODE]writing 'configs/api.yaml' ...") + remotelogs.Debug("NODE", "writing 'configs/api.yaml' ...") err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml")) if err != nil { return err } - logs.Println("[NODE]wrote 'configs/api.yaml' successfully") + remotelogs.Debug("NODE", "wrote 'configs/api.yaml' successfully") return nil } @@ -952,12 +965,12 @@ func (this *Node) listenSock() error { err := this.sock.Listen() if err != nil { - logs.Println("NODE", err.Error()) + remotelogs.Debug("NODE", err.Error()) } }) events.OnKey(events.EventQuit, this, func() { - remotelogs.Println("NODE", "quit unix sock") + remotelogs.Debug("NODE", "quit unix sock") _ = this.sock.Close() })