From 4cadf38c98a9c19ee4b0c9bd7d17a90061e6de7d Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 6 Nov 2022 12:03:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E7=89=88=E6=9C=AC=E5=8F=B7?= =?UTF-8?q?=E6=9D=A5=E8=AF=BB=E5=8F=96=E8=8A=82=E7=82=B9=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=EF=BC=8C=E6=8F=90=E5=8D=87=E4=BB=BB=E5=8A=A1=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E7=A8=B3=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/node_dao.go | 4 +- internal/db/models/node_task_dao.go | 43 +++++++++++++++++----- internal/db/models/node_task_dao_test.go | 2 +- internal/rpc/services/service_node_task.go | 4 +- internal/setup/sql_upgrade.go | 20 +++++++++- internal/setup/sql_upgrade_test.go | 19 ++++++++++ 6 files changed, 75 insertions(+), 17 deletions(-) diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 59e5f99b..45298c0a 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -2022,7 +2022,7 @@ func (this *NodeDAO) UpdateNodeDDoSProtection(tx *dbs.Tx, nodeId int64, ddosProt return err } if clusterId > 0 { - return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeDDosProtectionChanged, 0) + return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeDDosProtectionChanged) } return nil } @@ -2034,7 +2034,7 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error { if err != nil { return err } - err = SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeConfigChanged, 0) + err = SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeConfigChanged) if err != nil { return err } diff --git a/internal/db/models/node_task_dao.go b/internal/db/models/node_task_dao.go index f72348a8..af004307 100644 --- a/internal/db/models/node_task_dao.go +++ b/internal/db/models/node_task_dao.go @@ -57,7 +57,7 @@ func init() { } // CreateNodeTask 创建单个节点任务 -func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, userId int64, serverId int64, taskType NodeTaskType, version int64) error { +func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, userId int64, serverId int64, taskType NodeTaskType) error { if clusterId <= 0 || nodeId <= 0 { return nil } @@ -69,8 +69,13 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64 uniqueId += "@" + types.String(userId) } + version, err := this.increaseVersion(tx) + if err != nil { + return err + } + var updatedAt = time.Now().Unix() - _, _, err := this.Query(tx). + _, _, err = this.Query(tx). InsertOrUpdate(maps.Map{ "role": role, "clusterId": clusterId, @@ -157,9 +162,8 @@ func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, use return err } - var version = time.Now().UnixNano() for _, nodeId := range nodeIds { - err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, userId, serverId, taskType, version) + err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, userId, serverId, taskType) if err != nil { return err } @@ -225,14 +229,22 @@ func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, role string, nodeId int64) } // FindDoingNodeTasks 查询一个节点的所有任务 -func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int64) (result []*NodeTask, err error) { +func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int64, version int64) (result []*NodeTask, err error) { if nodeId <= 0 { return } - _, err = this.Query(tx). + var query = this.Query(tx). Attr("role", role). Attr("nodeId", nodeId). - Where("(isDone=0 OR (isDone=1 AND isOk=0))"). + Asc("version") + if version > 0 { + query.Lt("LENGTH(version)", 19) // 兼容以往版本 + query.Gt("version", version) + } else { + // 第一次访问时只取当前正在执行的或者执行失败的 + query.Where("(isDone=0 OR (isDone=1 AND isOk=0))") + } + _, err = query. Slice(&result). FindAll() return @@ -240,8 +252,16 @@ func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int6 // UpdateNodeTaskDone 修改节点任务的完成状态 func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool, errorMessage string) error { - _, err := this.Query(tx). - Pk(taskId). + var query = this.Query(tx). + Pk(taskId) + if !isOk { + version, err := this.increaseVersion(tx) + if err != nil { + return err + } + query.Set("version", version) + } + _, err := query. Set("isDone", 1). Set("isOk", isOk). Set("error", errorMessage). @@ -373,3 +393,8 @@ func (this *NodeTaskDAO) UpdateTasksNotified(tx *dbs.Tx, taskIds []int64) error } return nil } + +// 生成一个版本号 +func (this *NodeTaskDAO) increaseVersion(tx *dbs.Tx) (version int64, err error) { + return SharedSysLockerDAO.Increase(tx, "NODE_TASK_VERSION", 0) +} diff --git a/internal/db/models/node_task_dao_test.go b/internal/db/models/node_task_dao_test.go index 7848f2b1..e3f723e8 100644 --- a/internal/db/models/node_task_dao_test.go +++ b/internal/db/models/node_task_dao_test.go @@ -11,7 +11,7 @@ func TestNodeTaskDAO_CreateNodeTask(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, 0, NodeTaskTypeConfigChanged, 0) + err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, 0, NodeTaskTypeConfigChanged) if err != nil { t.Fatal(err) } diff --git a/internal/rpc/services/service_node_task.go b/internal/rpc/services/service_node_task.go index 830f1637..63f015b2 100644 --- a/internal/rpc/services/service_node_task.go +++ b/internal/rpc/services/service_node_task.go @@ -24,10 +24,8 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode return nil, err } - _ = req - var tx = this.NullTx() - tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeType, nodeId) + tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeType, nodeId, req.Version) if err != nil { return nil, err } diff --git a/internal/setup/sql_upgrade.go b/internal/setup/sql_upgrade.go index 7c77e9af..3ca96da1 100644 --- a/internal/setup/sql_upgrade.go +++ b/internal/setup/sql_upgrade.go @@ -83,10 +83,13 @@ var upgradeFuncs = []*upgradeVersion{ "0.4.11", upgradeV0_4_11, }, { - "v0.5.3", upgradeV0_5_3, + "0.5.3", upgradeV0_5_3, }, { - "v0.5.6", upgradeV0_5_6, + "0.5.6", upgradeV0_5_6, + }, + { + "0.5.7", upgradeV0_5_7, }, } @@ -716,3 +719,16 @@ func upgradeV0_4_11(db *dbs.DB) error { return nil } + +// v0.5.7 +func upgradeV0_5_7(db *dbs.DB) error { + // node task versions + { + _, err := db.Exec("UPDATE edgeNodeTasks SET version=0 WHERE LENGTH(version)=19") + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/setup/sql_upgrade_test.go b/internal/setup/sql_upgrade_test.go index e8424ea3..2022fd67 100644 --- a/internal/setup/sql_upgrade_test.go +++ b/internal/setup/sql_upgrade_test.go @@ -251,3 +251,22 @@ func TestUpgradeSQLData_v0_5_3(t *testing.T) { } t.Log("ok") } + +func TestUpgradeSQLData_v0_5_7(t *testing.T) { + db, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{ + Driver: "mysql", + Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge?charset=utf8mb4&timeout=30s", + Prefix: "edge", + }) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = db.Close() + }() + err = upgradeV0_5_7(db) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +}