!85 fix: 修复 BINLOG同步任务加载问题

* Merge branch 'dev' of gitee.com:dromara/mayfly-go into feat-db-bak
* fix: 修复 BINLOG 同步任务加载问题
This commit is contained in:
kanzihuang
2024-01-19 00:40:44 +00:00
committed by Coder慌
parent 7c53353c60
commit b017b902f8
7 changed files with 48 additions and 44 deletions

View File

@@ -24,9 +24,11 @@ func (b *DbBackup) GetDbName() string {
} }
func (b *DbBackup) Schedule() (time.Time, error) { func (b *DbBackup) Schedule() (time.Time, error) {
var deadline time.Time if b.IsFinished() {
if b.IsFinished() || !b.Enabled { return time.Time{}, runner.ErrJobFinished
return deadline, runner.ErrFinished }
if !b.Enabled {
return time.Time{}, runner.ErrJobDisabled
} }
switch b.LastStatus { switch b.LastStatus {
case DbJobSuccess: case DbJobSuccess:
@@ -34,13 +36,12 @@ func (b *DbBackup) Schedule() (time.Time, error) {
if lastTime.Before(b.StartTime) { if lastTime.Before(b.StartTime) {
lastTime = b.StartTime.Add(-b.Interval) lastTime = b.StartTime.Add(-b.Interval)
} }
deadline = lastTime.Add(b.Interval - lastTime.Sub(b.StartTime)%b.Interval) return lastTime.Add(b.Interval - lastTime.Sub(b.StartTime)%b.Interval), nil
case DbJobFailed: case DbJobFailed:
deadline = time.Now().Add(time.Minute) return time.Now().Add(time.Minute), nil
default: default:
deadline = b.StartTime return b.StartTime, nil
} }
return deadline, nil
} }
func (b *DbBackup) IsFinished() bool { func (b *DbBackup) IsFinished() bool {

View File

@@ -43,9 +43,8 @@ func (b *DbBinlog) GetDbName() string {
func (b *DbBinlog) Schedule() (time.Time, error) { func (b *DbBinlog) Schedule() (time.Time, error) {
switch b.GetJobBase().LastStatus { switch b.GetJobBase().LastStatus {
case DbJobSuccess: case DbJobSuccess:
return time.Time{}, runner.ErrFinished return time.Time{}, runner.ErrJobFinished
case DbJobFailed: case DbJobFailed:
return time.Now().Add(BinlogDownloadInterval), nil return time.Now().Add(BinlogDownloadInterval), nil
default: default:
return time.Now(), nil return time.Now(), nil

View File

@@ -28,23 +28,18 @@ func (r *DbRestore) GetDbName() string {
} }
func (r *DbRestore) Schedule() (time.Time, error) { func (r *DbRestore) Schedule() (time.Time, error) {
var deadline time.Time if !r.Enabled {
if r.IsFinished() || !r.Enabled { return time.Time{}, runner.ErrJobDisabled
return deadline, runner.ErrFinished
} }
switch r.LastStatus { switch r.LastStatus {
case DbJobSuccess: case DbJobSuccess, DbJobFailed:
lastTime := r.LastTime.Time return time.Time{}, runner.ErrJobFinished
if lastTime.Before(r.StartTime) {
lastTime = r.StartTime.Add(-r.Interval)
}
deadline = lastTime.Add(r.Interval - lastTime.Sub(r.StartTime)%r.Interval)
case DbJobFailed:
deadline = time.Now().Add(time.Minute)
default: default:
deadline = r.StartTime if time.Now().Sub(r.StartTime) > time.Hour {
return time.Time{}, runner.ErrJobTimeout
}
return r.StartTime, nil
} }
return deadline, nil
} }
func (r *DbRestore) IsEnabled() bool { func (r *DbRestore) IsEnabled() bool {

View File

@@ -22,11 +22,13 @@ func NewDbBackupRepo() repository.DbBackup {
func (d *dbBackupRepoImpl) GetDbNamesWithoutBackup(instanceId uint64, dbNames []string) ([]string, error) { func (d *dbBackupRepoImpl) GetDbNamesWithoutBackup(instanceId uint64, dbNames []string) ([]string, error) {
var dbNamesWithBackup []string var dbNamesWithBackup []string
query := gormx.NewQuery(d.GetModel()). err := global.Db.Model(d.GetModel()).
Eq("db_instance_id", instanceId). Where("db_instance_id = ?", instanceId).
Eq("repeated", true). Where("repeated = ?", true).
Undeleted() Scopes(gormx.UndeleteScope).
if err := query.GenGdb().Pluck("db_name", &dbNamesWithBackup).Error; err != nil { Pluck("db_name", &dbNamesWithBackup).
Error
if err != nil {
return nil, err return nil, err
} }
result := make([]string, 0, len(dbNames)) result := make([]string, 0, len(dbNames))
@@ -39,11 +41,13 @@ func (d *dbBackupRepoImpl) GetDbNamesWithoutBackup(instanceId uint64, dbNames []
} }
func (d *dbBackupRepoImpl) ListDbInstances(enabled bool, repeated bool, instanceIds *[]uint64) error { func (d *dbBackupRepoImpl) ListDbInstances(enabled bool, repeated bool, instanceIds *[]uint64) error {
query := gormx.NewQuery(d.GetModel()). return global.Db.Model(d.GetModel()).
Eq0("enabled", enabled). Where("enabled = ?", enabled).
Eq0("repeated", repeated). Where("repeated = ?", repeated).
Undeleted() Scopes(gormx.UndeleteScope).
return query.GenGdb().Distinct().Pluck("db_instance_id", &instanceIds).Error Distinct().
Pluck("db_instance_id", &instanceIds).
Error
} }
func (d *dbBackupRepoImpl) ListToDo(jobs any) error { func (d *dbBackupRepoImpl) ListToDo(jobs any) error {

View File

@@ -22,13 +22,16 @@ func NewDbRestoreRepo() repository.DbRestore {
func (d *dbRestoreRepoImpl) GetDbNamesWithoutRestore(instanceId uint64, dbNames []string) ([]string, error) { func (d *dbRestoreRepoImpl) GetDbNamesWithoutRestore(instanceId uint64, dbNames []string) ([]string, error) {
var dbNamesWithRestore []string var dbNamesWithRestore []string
query := gormx.NewQuery(d.GetModel()). err := global.Db.Model(d.GetModel()).
Eq("db_instance_id", instanceId). Where("db_instance_id = ?", instanceId).
Eq("repeated", true). Where("repeated = ?", true).
Undeleted() Scopes(gormx.UndeleteScope).
if err := query.GenGdb().Pluck("db_name", &dbNamesWithRestore).Error; err != nil { Pluck("db_name", &dbNamesWithRestore).
Error
if err != nil {
return nil, err return nil, err
} }
result := make([]string, 0, len(dbNames)) result := make([]string, 0, len(dbNames))
for _, name := range dbNames { for _, name := range dbNames {
if !slices.Contains(dbNamesWithRestore, name) { if !slices.Contains(dbNamesWithRestore, name) {

View File

@@ -11,9 +11,11 @@ import (
) )
var ( var (
ErrNotFound = errors.New("job not found") ErrJobNotFound = errors.New("job not found")
ErrExist = errors.New("job already exists") ErrJobExist = errors.New("job already exists")
ErrFinished = errors.New("job already finished") ErrJobFinished = errors.New("job already finished")
ErrJobDisabled = errors.New("job has been disabled")
ErrJobTimeout = errors.New("job has timed out")
) )
type JobKey = string type JobKey = string
@@ -236,7 +238,7 @@ func (r *Runner[T]) afterRun(wrap *wrapper[T]) {
func (r *Runner[T]) doScheduleJob(job T, finished bool) (time.Time, error) { func (r *Runner[T]) doScheduleJob(job T, finished bool) (time.Time, error) {
if r.scheduleJob == nil { if r.scheduleJob == nil {
if finished { if finished {
return time.Time{}, ErrFinished return time.Time{}, ErrJobFinished
} }
return time.Now(), nil return time.Now(), nil
} }
@@ -293,7 +295,7 @@ func (r *Runner[T]) Add(ctx context.Context, job T) error {
defer r.mutex.Unlock() defer r.mutex.Unlock()
if _, ok := r.all[job.GetKey()]; ok { if _, ok := r.all[job.GetKey()]; ok {
return ErrExist return ErrJobExist
} }
deadline, err := r.doScheduleJob(job, false) deadline, err := r.doScheduleJob(job, false)
if err != nil { if err != nil {
@@ -358,7 +360,7 @@ func (r *Runner[T]) Remove(ctx context.Context, key JobKey) error {
wrap, ok := r.all[key] wrap, ok := r.all[key]
if !ok { if !ok {
return ErrNotFound return ErrJobNotFound
} }
switch wrap.status { switch wrap.status {
case JobDelaying: case JobDelaying:

View File

@@ -75,7 +75,7 @@ func TestRunner_AddJob(t *testing.T) {
{ {
name: "repetitive job", name: "repetitive job",
job: newTestJob("dual"), job: newTestJob("dual"),
want: ErrExist, want: ErrJobExist,
}, },
} }
runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) { runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) {