使用版本号来读取节点任务,提升任务同步稳定性

This commit is contained in:
刘祥超
2022-11-06 12:07:26 +08:00
parent 258380f75c
commit 534f013f59

View File

@@ -29,7 +29,6 @@ import (
"github.com/andybalholm/brotli" "github.com/andybalholm/brotli"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"github.com/iwind/gosock/pkg/gosock" "github.com/iwind/gosock/pkg/gosock"
@@ -64,6 +63,8 @@ type Node struct {
timezone string timezone string
updatingServerMap map[int64]*serverconfigs.ServerConfig updatingServerMap map[int64]*serverconfigs.ServerConfig
lastTaskVersion int64
} }
func NewNode() *Node { func NewNode() *Node {
@@ -312,7 +313,9 @@ func (this *Node) loop() error {
} }
var nodeCtx = rpcClient.Context() var nodeCtx = rpcClient.Context()
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{}) tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{
Version: this.lastTaskVersion,
})
if err != nil { if err != nil {
if rpc.IsConnError(err) && !Tea.IsTesting() { if rpc.IsConnError(err) && !Tea.IsTesting() {
return nil return nil
@@ -321,7 +324,10 @@ func (this *Node) loop() error {
} }
for _, task := range tasksResp.NodeTasks { for _, task := range tasksResp.NodeTasks {
err := this.execTask(rpcClient, nodeCtx, task) err := this.execTask(rpcClient, nodeCtx, task)
this.finishTask(task.Id, err) if !this.finishTask(task.Id, task.Version, err) {
// 防止失败的任务无法重试
break
}
} }
return nil return nil
@@ -452,20 +458,24 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
} }
// 标记任务完成 // 标记任务完成
func (this *Node) finishTask(taskId int64, taskErr error) { func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (success bool) {
if taskId <= 0 { if taskId <= 0 {
return return true
} }
rpcClient, err := rpc.SharedRPC() rpcClient, err := rpc.SharedRPC()
if err != nil { if err != nil {
logs.Println("[NODE]", "create rpc client failed: "+err.Error()) remotelogs.Debug("NODE", "create rpc client failed: "+err.Error())
return return false
} }
var nodeCtx = rpcClient.Context() var nodeCtx = rpcClient.Context()
var isOk = taskErr == nil var isOk = taskErr == nil
if isOk && taskVersion > this.lastTaskVersion {
this.lastTaskVersion = taskVersion
}
var errMsg = "" var errMsg = ""
if taskErr != nil { if taskErr != nil {
errMsg = taskErr.Error() errMsg = taskErr.Error()
@@ -476,15 +486,18 @@ func (this *Node) finishTask(taskId int64, taskErr error) {
IsOk: isOk, IsOk: isOk,
Error: errMsg, Error: errMsg,
}) })
success = err == nil
if err != nil { if err != nil {
// 不需要上报到服务中心 // 连接错误不需要上报到服务中心
if rpc.IsConnError(err) { if rpc.IsConnError(err) {
logs.Println("[NODE]", "report task done failed: "+err.Error()) remotelogs.Debug("NODE", "report task done failed: "+err.Error())
} else { } else {
remotelogs.Error("NODE", "report task done failed: "+err.Error()) remotelogs.Error("NODE", "report task done failed: "+err.Error())
} }
} }
return success
} }
// 读取API配置 // 读取API配置
@@ -720,12 +733,12 @@ func (this *Node) checkClusterConfig() error {
return err return err
} }
logs.Println("[NODE]registering node to cluster ...") remotelogs.Debug("NODE", "registering node to cluster ...")
resp, err := rpcClient.NodeRPC.RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME}) resp, err := rpcClient.NodeRPC.RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
if err != nil { if err != nil {
return err return err
} }
logs.Println("[NODE]registered successfully") remotelogs.Debug("NODE", "registered successfully")
// 写入到配置文件中 // 写入到配置文件中
if len(resp.Endpoints) == 0 { if len(resp.Endpoints) == 0 {
@@ -742,12 +755,12 @@ func (this *Node) checkClusterConfig() error {
NodeId: resp.UniqueId, NodeId: resp.UniqueId,
Secret: resp.Secret, Secret: resp.Secret,
} }
logs.Println("[NODE]writing 'configs/api.yaml' ...") remotelogs.Debug("NODE", "writing 'configs/api.yaml' ...")
err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml")) err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml"))
if err != nil { if err != nil {
return err return err
} }
logs.Println("[NODE]wrote 'configs/api.yaml' successfully") remotelogs.Debug("NODE", "wrote 'configs/api.yaml' successfully")
return nil return nil
} }
@@ -952,12 +965,12 @@ func (this *Node) listenSock() error {
err := this.sock.Listen() err := this.sock.Listen()
if err != nil { if err != nil {
logs.Println("NODE", err.Error()) remotelogs.Debug("NODE", err.Error())
} }
}) })
events.OnKey(events.EventQuit, this, func() { events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("NODE", "quit unix sock") remotelogs.Debug("NODE", "quit unix sock")
_ = this.sock.Close() _ = this.sock.Close()
}) })