diff --git a/internal/db/models/dns/dns_task_dao.go b/internal/db/models/dns/dns_task_dao.go index 21ef68ec..0b02eef6 100644 --- a/internal/db/models/dns/dns_task_dao.go +++ b/internal/db/models/dns/dns_task_dao.go @@ -156,10 +156,24 @@ func (this *DNSTaskDAO) UpdateDNSTaskError(tx *dbs.Tx, taskId int64, err string) } // UpdateDNSTaskDone 设置任务完成 -func (this *DNSTaskDAO) UpdateDNSTaskDone(tx *dbs.Tx, taskId int64) error { +func (this *DNSTaskDAO) UpdateDNSTaskDone(tx *dbs.Tx, taskId int64, taskVersion int64) error { if taskId <= 0 { return errors.New("invalid taskId") } + + currentVersion, err := this.Query(tx). + Pk(taskId). + Result("version"). + FindInt64Col(0) + if err != nil { + return err + } + + // 如果版本号发生变化,则说明有新的要执行的任务 + if taskVersion > 0 && currentVersion > 0 && currentVersion != taskVersion { + return nil + } + var op = NewDNSTaskOperator() op.Id = taskId op.IsDone = true diff --git a/internal/tasks/dns_task_executor.go b/internal/tasks/dns_task_executor.go index 1ec69dc0..f53f7b06 100644 --- a/internal/tasks/dns_task_executor.go +++ b/internal/tasks/dns_task_executor.go @@ -20,7 +20,7 @@ import ( func init() { dbs.OnReadyDone(func() { goman.New(func() { - NewDNSTaskExecutor(10 * time.Second).Start() + NewDNSTaskExecutor(20 * time.Second).Start() }) }) } @@ -60,11 +60,13 @@ func (this *DNSTaskExecutor) loop() error { if err != nil { return err } + for _, task := range tasks { - taskId := int64(task.Id) + var taskId = int64(task.Id) + var taskVersion = int64(task.Version) switch task.Type { case dnsmodels.DNSTaskTypeServerChange: - err = this.doServer(taskId, int64(task.ClusterId), int64(task.ServerId)) + err = this.doServer(taskId, int64(task.Version), int64(task.ClusterId), int64(task.ServerId)) if err != nil { err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) if err != nil { @@ -72,7 +74,7 @@ func (this *DNSTaskExecutor) loop() error { } } case dnsmodels.DNSTaskTypeNodeChange: - err = this.doNode(taskId, int64(task.ClusterId), int64(task.NodeId)) + err = this.doNode(taskId, taskVersion, int64(task.ClusterId), int64(task.NodeId)) if err != nil { err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) if err != nil { @@ -80,7 +82,7 @@ func (this *DNSTaskExecutor) loop() error { } } case dnsmodels.DNSTaskTypeClusterChange: - err = this.doCluster(taskId, int64(task.ClusterId)) + err = this.doCluster(taskId, taskVersion, int64(task.ClusterId)) if err != nil { err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) if err != nil { @@ -88,7 +90,7 @@ func (this *DNSTaskExecutor) loop() error { } } case dnsmodels.DNSTaskTypeClusterRemoveDomain: - err = this.doClusterRemove(taskId, int64(task.ClusterId), int64(task.DomainId), task.RecordName) + err = this.doClusterRemove(taskId, taskVersion, int64(task.ClusterId), int64(task.DomainId), task.RecordName) if err != nil { err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) if err != nil { @@ -96,7 +98,7 @@ func (this *DNSTaskExecutor) loop() error { } } case dnsmodels.DNSTaskTypeDomainChange: - err = this.doDomainWithTask(taskId, int64(task.DomainId)) + err = this.doDomainWithTask(taskId, taskVersion, int64(task.DomainId)) if err != nil { err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) if err != nil { @@ -110,13 +112,13 @@ func (this *DNSTaskExecutor) loop() error { } // 修改服务相关记录 -func (this *DNSTaskExecutor) doServer(taskId int64, oldClusterId int64, serverId int64) error { +func (this *DNSTaskExecutor) doServer(taskId int64, taskVersion int64, oldClusterId int64, serverId int64) error { var tx *dbs.Tx - isOk := false + var isOk = false defer func() { if isOk { - err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId) + err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId, taskVersion) if err != nil { this.logErr("DNSTaskExecutor", err.Error()) } @@ -269,11 +271,11 @@ func (this *DNSTaskExecutor) doServer(taskId int64, oldClusterId int64, serverId } // 修改节点相关记录 -func (this *DNSTaskExecutor) doNode(taskId int64, nodeClusterId int64, nodeId int64) error { +func (this *DNSTaskExecutor) doNode(taskId int64, taskVersion int64, nodeClusterId int64, nodeId int64) error { var isOk = false defer func() { if isOk { - err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) + err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId, taskVersion) if err != nil { this.logErr("DNSTaskExecutor", err.Error()) } @@ -315,11 +317,11 @@ func (this *DNSTaskExecutor) doNode(taskId int64, nodeClusterId int64, nodeId in } // 修改集群相关记录 -func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error { +func (this *DNSTaskExecutor) doCluster(taskId int64, taskVersion int64, clusterId int64) error { isOk := false defer func() { if isOk { - err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) + err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId, taskVersion) if err != nil { this.logErr("DNSTaskExecutor", err.Error()) } @@ -539,11 +541,11 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error { return nil } -func (this *DNSTaskExecutor) doClusterRemove(taskId int64, clusterId int64, domainId int64, dnsName string) error { +func (this *DNSTaskExecutor) doClusterRemove(taskId int64, taskVersion int64, clusterId int64, domainId int64, dnsName string) error { var isOk = false defer func() { if isOk { - err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) + err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId, taskVersion) if err != nil { this.logErr("DNSTaskExecutor", err.Error()) } @@ -625,17 +627,17 @@ func (this *DNSTaskExecutor) doClusterRemove(taskId int64, clusterId int64, doma } func (this *DNSTaskExecutor) doDomain(domainId int64) error { - return this.doDomainWithTask(0, domainId) + return this.doDomainWithTask(0, 0, domainId) } -func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) error { +func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, taskVersion int64, domainId int64) error { var tx *dbs.Tx - isOk := false + var isOk = false defer func() { if isOk { if taskId > 0 { - err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId) + err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId, taskVersion) if err != nil { this.logErr("DNSTaskExecutor", err.Error()) } @@ -651,7 +653,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro isOk = true return nil } - providerId := int64(dnsDomain.ProviderId) + var providerId = int64(dnsDomain.ProviderId) if providerId <= 0 { isOk = true return nil @@ -666,7 +668,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro return nil } - manager := dnsclients.FindProvider(provider.Type, int64(provider.Id)) + var manager = dnsclients.FindProvider(provider.Type, int64(provider.Id)) if manager == nil { this.logErr("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'") isOk = true