From b017b902f86a01b2cbce9c4bed451fdc5cb6aee5 Mon Sep 17 00:00:00 2001 From: kanzihuang Date: Fri, 19 Jan 2024 00:40:44 +0000 Subject: [PATCH] =?UTF-8?q?!85=20fix:=20=E4=BF=AE=E5=A4=8D=20BINLOG?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E4=BB=BB=E5=8A=A1=E5=8A=A0=E8=BD=BD=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20*=20Merge=20branch=20'dev'=20of=20gitee.com:dromara?= =?UTF-8?q?/mayfly-go=20into=20feat-db-bak=20*=20fix:=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=20BINLOG=20=E5=90=8C=E6=AD=A5=E4=BB=BB=E5=8A=A1=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/internal/db/domain/entity/db_backup.go | 15 ++++++------ server/internal/db/domain/entity/db_binlog.go | 3 +-- .../internal/db/domain/entity/db_restore.go | 21 +++++++--------- .../infrastructure/persistence/db_backup.go | 24 +++++++++++-------- .../infrastructure/persistence/db_restore.go | 13 ++++++---- server/pkg/runner/runner.go | 14 ++++++----- server/pkg/runner/runner_test.go | 2 +- 7 files changed, 48 insertions(+), 44 deletions(-) diff --git a/server/internal/db/domain/entity/db_backup.go b/server/internal/db/domain/entity/db_backup.go index c1193760..327e4548 100644 --- a/server/internal/db/domain/entity/db_backup.go +++ b/server/internal/db/domain/entity/db_backup.go @@ -24,9 +24,11 @@ func (b *DbBackup) GetDbName() string { } func (b *DbBackup) Schedule() (time.Time, error) { - var deadline time.Time - if b.IsFinished() || !b.Enabled { - return deadline, runner.ErrFinished + if b.IsFinished() { + return time.Time{}, runner.ErrJobFinished + } + if !b.Enabled { + return time.Time{}, runner.ErrJobDisabled } switch b.LastStatus { case DbJobSuccess: @@ -34,13 +36,12 @@ func (b *DbBackup) Schedule() (time.Time, error) { if lastTime.Before(b.StartTime) { lastTime = b.StartTime.Add(-b.Interval) } - deadline = lastTime.Add(b.Interval - lastTime.Sub(b.StartTime)%b.Interval) + return lastTime.Add(b.Interval - lastTime.Sub(b.StartTime)%b.Interval), nil case DbJobFailed: - deadline = time.Now().Add(time.Minute) + return time.Now().Add(time.Minute), nil default: - deadline = b.StartTime + return b.StartTime, nil } - return deadline, nil } func (b *DbBackup) IsFinished() bool { diff --git a/server/internal/db/domain/entity/db_binlog.go b/server/internal/db/domain/entity/db_binlog.go index 77d6586d..4a343eb2 100644 --- a/server/internal/db/domain/entity/db_binlog.go +++ b/server/internal/db/domain/entity/db_binlog.go @@ -43,9 +43,8 @@ func (b *DbBinlog) GetDbName() string { func (b *DbBinlog) Schedule() (time.Time, error) { switch b.GetJobBase().LastStatus { case DbJobSuccess: - return time.Time{}, runner.ErrFinished + return time.Time{}, runner.ErrJobFinished case DbJobFailed: - return time.Now().Add(BinlogDownloadInterval), nil default: return time.Now(), nil diff --git a/server/internal/db/domain/entity/db_restore.go b/server/internal/db/domain/entity/db_restore.go index 92354e8a..e81817dc 100644 --- a/server/internal/db/domain/entity/db_restore.go +++ b/server/internal/db/domain/entity/db_restore.go @@ -28,23 +28,18 @@ func (r *DbRestore) GetDbName() string { } func (r *DbRestore) Schedule() (time.Time, error) { - var deadline time.Time - if r.IsFinished() || !r.Enabled { - return deadline, runner.ErrFinished + if !r.Enabled { + return time.Time{}, runner.ErrJobDisabled } switch r.LastStatus { - case DbJobSuccess: - lastTime := r.LastTime.Time - if lastTime.Before(r.StartTime) { - lastTime = r.StartTime.Add(-r.Interval) - } - deadline = lastTime.Add(r.Interval - lastTime.Sub(r.StartTime)%r.Interval) - case DbJobFailed: - deadline = time.Now().Add(time.Minute) + case DbJobSuccess, DbJobFailed: + return time.Time{}, runner.ErrJobFinished default: - deadline = r.StartTime + if time.Now().Sub(r.StartTime) > time.Hour { + return time.Time{}, runner.ErrJobTimeout + } + return r.StartTime, nil } - return deadline, nil } func (r *DbRestore) IsEnabled() bool { diff --git a/server/internal/db/infrastructure/persistence/db_backup.go b/server/internal/db/infrastructure/persistence/db_backup.go index a42ebd3e..d520d03c 100644 --- a/server/internal/db/infrastructure/persistence/db_backup.go +++ b/server/internal/db/infrastructure/persistence/db_backup.go @@ -22,11 +22,13 @@ func NewDbBackupRepo() repository.DbBackup { func (d *dbBackupRepoImpl) GetDbNamesWithoutBackup(instanceId uint64, dbNames []string) ([]string, error) { var dbNamesWithBackup []string - query := gormx.NewQuery(d.GetModel()). - Eq("db_instance_id", instanceId). - Eq("repeated", true). - Undeleted() - if err := query.GenGdb().Pluck("db_name", &dbNamesWithBackup).Error; err != nil { + err := global.Db.Model(d.GetModel()). + Where("db_instance_id = ?", instanceId). + Where("repeated = ?", true). + Scopes(gormx.UndeleteScope). + Pluck("db_name", &dbNamesWithBackup). + Error + if err != nil { return nil, err } result := make([]string, 0, len(dbNames)) @@ -39,11 +41,13 @@ func (d *dbBackupRepoImpl) GetDbNamesWithoutBackup(instanceId uint64, dbNames [] } func (d *dbBackupRepoImpl) ListDbInstances(enabled bool, repeated bool, instanceIds *[]uint64) error { - query := gormx.NewQuery(d.GetModel()). - Eq0("enabled", enabled). - Eq0("repeated", repeated). - Undeleted() - return query.GenGdb().Distinct().Pluck("db_instance_id", &instanceIds).Error + return global.Db.Model(d.GetModel()). + Where("enabled = ?", enabled). + Where("repeated = ?", repeated). + Scopes(gormx.UndeleteScope). + Distinct(). + Pluck("db_instance_id", &instanceIds). + Error } func (d *dbBackupRepoImpl) ListToDo(jobs any) error { diff --git a/server/internal/db/infrastructure/persistence/db_restore.go b/server/internal/db/infrastructure/persistence/db_restore.go index b0884386..57b36c92 100644 --- a/server/internal/db/infrastructure/persistence/db_restore.go +++ b/server/internal/db/infrastructure/persistence/db_restore.go @@ -22,13 +22,16 @@ func NewDbRestoreRepo() repository.DbRestore { func (d *dbRestoreRepoImpl) GetDbNamesWithoutRestore(instanceId uint64, dbNames []string) ([]string, error) { var dbNamesWithRestore []string - query := gormx.NewQuery(d.GetModel()). - Eq("db_instance_id", instanceId). - Eq("repeated", true). - Undeleted() - if err := query.GenGdb().Pluck("db_name", &dbNamesWithRestore).Error; err != nil { + err := global.Db.Model(d.GetModel()). + Where("db_instance_id = ?", instanceId). + Where("repeated = ?", true). + Scopes(gormx.UndeleteScope). + Pluck("db_name", &dbNamesWithRestore). + Error + if err != nil { return nil, err } + result := make([]string, 0, len(dbNames)) for _, name := range dbNames { if !slices.Contains(dbNamesWithRestore, name) { diff --git a/server/pkg/runner/runner.go b/server/pkg/runner/runner.go index 44187b97..65fce064 100644 --- a/server/pkg/runner/runner.go +++ b/server/pkg/runner/runner.go @@ -11,9 +11,11 @@ import ( ) var ( - ErrNotFound = errors.New("job not found") - ErrExist = errors.New("job already exists") - ErrFinished = errors.New("job already finished") + ErrJobNotFound = errors.New("job not found") + ErrJobExist = errors.New("job already exists") + ErrJobFinished = errors.New("job already finished") + ErrJobDisabled = errors.New("job has been disabled") + ErrJobTimeout = errors.New("job has timed out") ) type JobKey = string @@ -236,7 +238,7 @@ func (r *Runner[T]) afterRun(wrap *wrapper[T]) { func (r *Runner[T]) doScheduleJob(job T, finished bool) (time.Time, error) { if r.scheduleJob == nil { if finished { - return time.Time{}, ErrFinished + return time.Time{}, ErrJobFinished } return time.Now(), nil } @@ -293,7 +295,7 @@ func (r *Runner[T]) Add(ctx context.Context, job T) error { defer r.mutex.Unlock() if _, ok := r.all[job.GetKey()]; ok { - return ErrExist + return ErrJobExist } deadline, err := r.doScheduleJob(job, false) if err != nil { @@ -358,7 +360,7 @@ func (r *Runner[T]) Remove(ctx context.Context, key JobKey) error { wrap, ok := r.all[key] if !ok { - return ErrNotFound + return ErrJobNotFound } switch wrap.status { case JobDelaying: diff --git a/server/pkg/runner/runner_test.go b/server/pkg/runner/runner_test.go index 2eedc8b0..5be0a356 100644 --- a/server/pkg/runner/runner_test.go +++ b/server/pkg/runner/runner_test.go @@ -75,7 +75,7 @@ func TestRunner_AddJob(t *testing.T) { { name: "repetitive job", job: newTestJob("dual"), - want: ErrExist, + want: ErrJobExist, }, } runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) {