使用版本号来读取节点任务,提升任务同步稳定性

This commit is contained in:
GoEdgeLab
2022-11-06 12:03:11 +08:00
parent d916468974
commit 4cadf38c98
6 changed files with 75 additions and 17 deletions

View File

@@ -2022,7 +2022,7 @@ func (this *NodeDAO) UpdateNodeDDoSProtection(tx *dbs.Tx, nodeId int64, ddosProt
return err return err
} }
if clusterId > 0 { if clusterId > 0 {
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeDDosProtectionChanged, 0) return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeDDosProtectionChanged)
} }
return nil return nil
} }
@@ -2034,7 +2034,7 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
if err != nil { if err != nil {
return err return err
} }
err = SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeConfigChanged, 0) err = SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -57,7 +57,7 @@ func init() {
} }
// CreateNodeTask 创建单个节点任务 // CreateNodeTask 创建单个节点任务
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, userId int64, serverId int64, taskType NodeTaskType, version int64) error { func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, userId int64, serverId int64, taskType NodeTaskType) error {
if clusterId <= 0 || nodeId <= 0 { if clusterId <= 0 || nodeId <= 0 {
return nil return nil
} }
@@ -69,8 +69,13 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64
uniqueId += "@" + types.String(userId) uniqueId += "@" + types.String(userId)
} }
version, err := this.increaseVersion(tx)
if err != nil {
return err
}
var updatedAt = time.Now().Unix() var updatedAt = time.Now().Unix()
_, _, err := this.Query(tx). _, _, err = this.Query(tx).
InsertOrUpdate(maps.Map{ InsertOrUpdate(maps.Map{
"role": role, "role": role,
"clusterId": clusterId, "clusterId": clusterId,
@@ -157,9 +162,8 @@ func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, use
return err return err
} }
var version = time.Now().UnixNano()
for _, nodeId := range nodeIds { for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, userId, serverId, taskType, version) err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, userId, serverId, taskType)
if err != nil { if err != nil {
return err return err
} }
@@ -225,14 +229,22 @@ func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, role string, nodeId int64)
} }
// FindDoingNodeTasks 查询一个节点的所有任务 // FindDoingNodeTasks 查询一个节点的所有任务
func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int64) (result []*NodeTask, err error) { func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int64, version int64) (result []*NodeTask, err error) {
if nodeId <= 0 { if nodeId <= 0 {
return return
} }
_, err = this.Query(tx). var query = this.Query(tx).
Attr("role", role). Attr("role", role).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Where("(isDone=0 OR (isDone=1 AND isOk=0))"). Asc("version")
if version > 0 {
query.Lt("LENGTH(version)", 19) // 兼容以往版本
query.Gt("version", version)
} else {
// 第一次访问时只取当前正在执行的或者执行失败的
query.Where("(isDone=0 OR (isDone=1 AND isOk=0))")
}
_, err = query.
Slice(&result). Slice(&result).
FindAll() FindAll()
return return
@@ -240,8 +252,16 @@ func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int6
// UpdateNodeTaskDone 修改节点任务的完成状态 // UpdateNodeTaskDone 修改节点任务的完成状态
func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool, errorMessage string) error { func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool, errorMessage string) error {
_, err := this.Query(tx). var query = this.Query(tx).
Pk(taskId). Pk(taskId)
if !isOk {
version, err := this.increaseVersion(tx)
if err != nil {
return err
}
query.Set("version", version)
}
_, err := query.
Set("isDone", 1). Set("isDone", 1).
Set("isOk", isOk). Set("isOk", isOk).
Set("error", errorMessage). Set("error", errorMessage).
@@ -373,3 +393,8 @@ func (this *NodeTaskDAO) UpdateTasksNotified(tx *dbs.Tx, taskIds []int64) error
} }
return nil return nil
} }
// 生成一个版本号
func (this *NodeTaskDAO) increaseVersion(tx *dbs.Tx) (version int64, err error) {
return SharedSysLockerDAO.Increase(tx, "NODE_TASK_VERSION", 0)
}

View File

@@ -11,7 +11,7 @@ func TestNodeTaskDAO_CreateNodeTask(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
var tx *dbs.Tx var tx *dbs.Tx
err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, 0, NodeTaskTypeConfigChanged, 0) err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, 0, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -24,10 +24,8 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode
return nil, err return nil, err
} }
_ = req
var tx = this.NullTx() var tx = this.NullTx()
tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeType, nodeId) tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeType, nodeId, req.Version)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -83,10 +83,13 @@ var upgradeFuncs = []*upgradeVersion{
"0.4.11", upgradeV0_4_11, "0.4.11", upgradeV0_4_11,
}, },
{ {
"v0.5.3", upgradeV0_5_3, "0.5.3", upgradeV0_5_3,
}, },
{ {
"v0.5.6", upgradeV0_5_6, "0.5.6", upgradeV0_5_6,
},
{
"0.5.7", upgradeV0_5_7,
}, },
} }
@@ -716,3 +719,16 @@ func upgradeV0_4_11(db *dbs.DB) error {
return nil return nil
} }
// v0.5.7
func upgradeV0_5_7(db *dbs.DB) error {
// node task versions
{
_, err := db.Exec("UPDATE edgeNodeTasks SET version=0 WHERE LENGTH(version)=19")
if err != nil {
return err
}
}
return nil
}

View File

@@ -251,3 +251,22 @@ func TestUpgradeSQLData_v0_5_3(t *testing.T) {
} }
t.Log("ok") t.Log("ok")
} }
func TestUpgradeSQLData_v0_5_7(t *testing.T) {
db, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{
Driver: "mysql",
Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge?charset=utf8mb4&timeout=30s",
Prefix: "edge",
})
if err != nil {
t.Fatal(err)
}
defer func() {
_ = db.Close()
}()
err = upgradeV0_5_7(db)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}