当服务配置变化时创建单个服务通知任务

This commit is contained in:
刘祥超
2022-01-19 22:15:01 +08:00
parent 5205136809
commit ef045e90f2
16 changed files with 60 additions and 26 deletions

View File

@@ -49,7 +49,7 @@ func init() {
}
// CreateNodeTask 创建单个节点任务
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, taskType NodeTaskType, version int64) error {
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, taskType NodeTaskType, version int64) error {
if clusterId <= 0 || nodeId <= 0 {
return nil
}
@@ -60,6 +60,7 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64
"role": role,
"clusterId": clusterId,
"nodeId": nodeId,
"serverId": serverId,
"type": taskType,
"uniqueId": uniqueId,
"updatedAt": updatedAt,
@@ -80,17 +81,18 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64
}
// CreateClusterTask 创建集群任务
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, taskType NodeTaskType) error {
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, serverId int64, taskType NodeTaskType) error {
if clusterId <= 0 {
return nil
}
uniqueId := role + "@" + types.String(clusterId) + "@cluster@" + taskType
uniqueId := role + "@" + types.String(clusterId) + "@" + types.String(serverId) + "@cluster@" + taskType
updatedAt := time.Now().Unix()
_, _, err := this.Query(tx).
InsertOrUpdate(maps.Map{
"role": role,
"clusterId": clusterId,
"serverId": serverId,
"nodeId": 0,
"type": taskType,
"uniqueId": uniqueId,
@@ -112,7 +114,7 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId in
}
// ExtractNodeClusterTask 分解边缘节点集群任务
func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, serverId int64, taskType NodeTaskType) error {
nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
if err != nil {
return err
@@ -131,7 +133,7 @@ func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, tas
var version = time.Now().UnixNano()
for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, taskType, version)
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, serverId, taskType, version)
if err != nil {
return err
}
@@ -170,7 +172,7 @@ func (this *NodeTaskDAO) ExtractNSClusterTask(tx *dbs.Tx, clusterId int64, taskT
var version = time.Now().UnixNano()
for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, taskType, version)
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, 0, taskType, version)
if err != nil {
return err
}
@@ -202,7 +204,8 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx, role string) error {
clusterId := int64(one.(*NodeTask).ClusterId)
switch role {
case nodeconfigs.NodeRoleNode:
err = this.ExtractNodeClusterTask(tx, clusterId, one.(*NodeTask).Type)
var nodeTask = one.(*NodeTask)
err = this.ExtractNodeClusterTask(tx, clusterId, int64(nodeTask.ServerId), nodeTask.Type)
if err != nil {
return err
}