修复无法同时对相同对象执行多次DNS解析任务的问题

This commit is contained in:
GoEdgeLab
2023-03-18 16:40:00 +08:00
parent 67743c616e
commit 55d832cde9
2 changed files with 39 additions and 23 deletions

View File

@@ -156,10 +156,24 @@ func (this *DNSTaskDAO) UpdateDNSTaskError(tx *dbs.Tx, taskId int64, err string)
} }
// UpdateDNSTaskDone 设置任务完成 // UpdateDNSTaskDone 设置任务完成
func (this *DNSTaskDAO) UpdateDNSTaskDone(tx *dbs.Tx, taskId int64) error { func (this *DNSTaskDAO) UpdateDNSTaskDone(tx *dbs.Tx, taskId int64, taskVersion int64) error {
if taskId <= 0 { if taskId <= 0 {
return errors.New("invalid taskId") return errors.New("invalid taskId")
} }
currentVersion, err := this.Query(tx).
Pk(taskId).
Result("version").
FindInt64Col(0)
if err != nil {
return err
}
// 如果版本号发生变化,则说明有新的要执行的任务
if taskVersion > 0 && currentVersion > 0 && currentVersion != taskVersion {
return nil
}
var op = NewDNSTaskOperator() var op = NewDNSTaskOperator()
op.Id = taskId op.Id = taskId
op.IsDone = true op.IsDone = true

View File

@@ -20,7 +20,7 @@ import (
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewDNSTaskExecutor(10 * time.Second).Start() NewDNSTaskExecutor(20 * time.Second).Start()
}) })
}) })
} }
@@ -60,11 +60,13 @@ func (this *DNSTaskExecutor) loop() error {
if err != nil { if err != nil {
return err return err
} }
for _, task := range tasks { for _, task := range tasks {
taskId := int64(task.Id) var taskId = int64(task.Id)
var taskVersion = int64(task.Version)
switch task.Type { switch task.Type {
case dnsmodels.DNSTaskTypeServerChange: case dnsmodels.DNSTaskTypeServerChange:
err = this.doServer(taskId, int64(task.ClusterId), int64(task.ServerId)) err = this.doServer(taskId, int64(task.Version), int64(task.ClusterId), int64(task.ServerId))
if err != nil { if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil { if err != nil {
@@ -72,7 +74,7 @@ func (this *DNSTaskExecutor) loop() error {
} }
} }
case dnsmodels.DNSTaskTypeNodeChange: case dnsmodels.DNSTaskTypeNodeChange:
err = this.doNode(taskId, int64(task.ClusterId), int64(task.NodeId)) err = this.doNode(taskId, taskVersion, int64(task.ClusterId), int64(task.NodeId))
if err != nil { if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil { if err != nil {
@@ -80,7 +82,7 @@ func (this *DNSTaskExecutor) loop() error {
} }
} }
case dnsmodels.DNSTaskTypeClusterChange: case dnsmodels.DNSTaskTypeClusterChange:
err = this.doCluster(taskId, int64(task.ClusterId)) err = this.doCluster(taskId, taskVersion, int64(task.ClusterId))
if err != nil { if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil { if err != nil {
@@ -88,7 +90,7 @@ func (this *DNSTaskExecutor) loop() error {
} }
} }
case dnsmodels.DNSTaskTypeClusterRemoveDomain: case dnsmodels.DNSTaskTypeClusterRemoveDomain:
err = this.doClusterRemove(taskId, int64(task.ClusterId), int64(task.DomainId), task.RecordName) err = this.doClusterRemove(taskId, taskVersion, int64(task.ClusterId), int64(task.DomainId), task.RecordName)
if err != nil { if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil { if err != nil {
@@ -96,7 +98,7 @@ func (this *DNSTaskExecutor) loop() error {
} }
} }
case dnsmodels.DNSTaskTypeDomainChange: case dnsmodels.DNSTaskTypeDomainChange:
err = this.doDomainWithTask(taskId, int64(task.DomainId)) err = this.doDomainWithTask(taskId, taskVersion, int64(task.DomainId))
if err != nil { if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error()) err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil { if err != nil {
@@ -110,13 +112,13 @@ func (this *DNSTaskExecutor) loop() error {
} }
// 修改服务相关记录 // 修改服务相关记录
func (this *DNSTaskExecutor) doServer(taskId int64, oldClusterId int64, serverId int64) error { func (this *DNSTaskExecutor) doServer(taskId int64, taskVersion int64, oldClusterId int64, serverId int64) error {
var tx *dbs.Tx var tx *dbs.Tx
isOk := false var isOk = false
defer func() { defer func() {
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId, taskVersion)
if err != nil { if err != nil {
this.logErr("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
@@ -269,11 +271,11 @@ func (this *DNSTaskExecutor) doServer(taskId int64, oldClusterId int64, serverId
} }
// 修改节点相关记录 // 修改节点相关记录
func (this *DNSTaskExecutor) doNode(taskId int64, nodeClusterId int64, nodeId int64) error { func (this *DNSTaskExecutor) doNode(taskId int64, taskVersion int64, nodeClusterId int64, nodeId int64) error {
var isOk = false var isOk = false
defer func() { defer func() {
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId, taskVersion)
if err != nil { if err != nil {
this.logErr("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
@@ -315,11 +317,11 @@ func (this *DNSTaskExecutor) doNode(taskId int64, nodeClusterId int64, nodeId in
} }
// 修改集群相关记录 // 修改集群相关记录
func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error { func (this *DNSTaskExecutor) doCluster(taskId int64, taskVersion int64, clusterId int64) error {
isOk := false isOk := false
defer func() { defer func() {
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId, taskVersion)
if err != nil { if err != nil {
this.logErr("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
@@ -539,11 +541,11 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error {
return nil return nil
} }
func (this *DNSTaskExecutor) doClusterRemove(taskId int64, clusterId int64, domainId int64, dnsName string) error { func (this *DNSTaskExecutor) doClusterRemove(taskId int64, taskVersion int64, clusterId int64, domainId int64, dnsName string) error {
var isOk = false var isOk = false
defer func() { defer func() {
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId, taskVersion)
if err != nil { if err != nil {
this.logErr("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
@@ -625,17 +627,17 @@ func (this *DNSTaskExecutor) doClusterRemove(taskId int64, clusterId int64, doma
} }
func (this *DNSTaskExecutor) doDomain(domainId int64) error { func (this *DNSTaskExecutor) doDomain(domainId int64) error {
return this.doDomainWithTask(0, domainId) return this.doDomainWithTask(0, 0, domainId)
} }
func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) error { func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, taskVersion int64, domainId int64) error {
var tx *dbs.Tx var tx *dbs.Tx
isOk := false var isOk = false
defer func() { defer func() {
if isOk { if isOk {
if taskId > 0 { if taskId > 0 {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId, taskVersion)
if err != nil { if err != nil {
this.logErr("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
@@ -651,7 +653,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro
isOk = true isOk = true
return nil return nil
} }
providerId := int64(dnsDomain.ProviderId) var providerId = int64(dnsDomain.ProviderId)
if providerId <= 0 { if providerId <= 0 {
isOk = true isOk = true
return nil return nil
@@ -666,7 +668,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro
return nil return nil
} }
manager := dnsclients.FindProvider(provider.Type, int64(provider.Id)) var manager = dnsclients.FindProvider(provider.Type, int64(provider.Id))
if manager == nil { if manager == nil {
this.logErr("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'") this.logErr("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
isOk = true isOk = true