diff --git a/Dockerfile b/Dockerfile index 59cc1e47..ccc92616 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,9 +5,9 @@ WORKDIR /mayfly COPY mayfly_go_web . -RUN yarn install - -RUN yarn build +RUN yarn config set registry 'https://registry.npm.taobao.org' && \ + yarn install && \ + yarn build # 构建后端资源 FROM golang:1.21.5 as be-builder diff --git a/server/internal/db/application/application.go b/server/internal/db/application/application.go index 4f231c05..94afbae0 100644 --- a/server/internal/db/application/application.go +++ b/server/internal/db/application/application.go @@ -19,10 +19,6 @@ var ( dataSyncApp DataSyncTask ) -//var repositories *repository.Repositories -//var scheduler *dbScheduler[*entity.DbBackup] -//var scheduler1 *dbScheduler[*entity.DbRestore] - func Init() { sync.OnceFunc(func() { repositories := &repository.Repositories{ diff --git a/server/internal/db/application/db_binlog.go b/server/internal/db/application/db_binlog.go index 35e4a7c3..33cbd644 100644 --- a/server/internal/db/application/db_binlog.go +++ b/server/internal/db/application/db_binlog.go @@ -65,10 +65,13 @@ func (app *DbBinlogApp) fetchBinlog(ctx context.Context, backup *entity.DbBackup if ok { latestBinlogSequence = binlogHistory.Sequence } else { - backupHistory, err := app.backupHistoryRepo.GetEarliestHistory(backup.DbInstanceId) + backupHistory, ok, err := app.backupHistoryRepo.GetEarliestHistory(backup.DbInstanceId) if err != nil { return err } + if !ok { + return nil + } earliestBackupSequence = backupHistory.BinlogSequence } conn, err := app.dbApp.GetDbConnByInstanceId(backup.DbInstanceId) @@ -92,6 +95,8 @@ func (app *DbBinlogApp) fetchBinlog(ctx context.Context, backup *entity.DbBackup func (app *DbBinlogApp) run() { defer app.waitGroup.Done() + // todo: 实现 binlog 并发下载 + timex.SleepWithContext(app.context, time.Minute) for !app.closed() { app.fetchFromAllInstances() timex.SleepWithContext(app.context, binlogDownloadInterval) diff --git a/server/internal/db/application/db_scheduler.go b/server/internal/db/application/db_scheduler.go index 52ca044d..9a40e16b 100644 --- a/server/internal/db/application/db_scheduler.go +++ b/server/internal/db/application/db_scheduler.go @@ -232,10 +232,13 @@ func (s *dbScheduler) restoreMysql(ctx context.Context, job entity.DbJob) error if ok { latestBinlogSequence = binlogHistory.Sequence } else { - backupHistory, err := s.backupHistoryRepo.GetEarliestHistory(restore.DbInstanceId) + backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(restore.DbInstanceId) if err != nil { return err } + if !ok { + return nil + } earliestBackupSequence = backupHistory.BinlogSequence } binlogFiles, err := dbProgram.FetchBinlogs(ctx, true, earliestBackupSequence, latestBinlogSequence) @@ -300,12 +303,12 @@ func (s *dbScheduler) runnable(job entity.DbJob, next runner.NextFunc) bool { itemBase := item.(entity.DbJob).GetJobBase() if jobBase.DbInstanceId == itemBase.DbInstanceId { countByInstanceId++ - if countByInstanceId > maxCountByInstanceId { + if countByInstanceId >= maxCountByInstanceId { return false } if jobBase.DbName == itemBase.DbName { countByDbName++ - if countByDbName > maxCountByDbName { + if countByDbName >= maxCountByDbName { return false } } diff --git a/server/internal/db/domain/entity/db_job.go b/server/internal/db/domain/entity/db_job.go index 29233907..15c354dc 100644 --- a/server/internal/db/domain/entity/db_job.go +++ b/server/internal/db/domain/entity/db_job.go @@ -187,7 +187,7 @@ func (d *DbJobBaseImpl) IsFinished() bool { return !d.Repeated && d.LastStatus == DbJobSuccess } -func (d *DbJobBaseImpl) Renew(job runner.Job) { +func (d *DbJobBaseImpl) Update(job runner.Job) { jobBase := job.(DbJob).GetJobBase() d.StartTime = jobBase.StartTime d.Interval = jobBase.Interval diff --git a/server/internal/db/domain/repository/db_backup_history.go b/server/internal/db/domain/repository/db_backup_history.go index 46967dae..83652705 100644 --- a/server/internal/db/domain/repository/db_backup_history.go +++ b/server/internal/db/domain/repository/db_backup_history.go @@ -14,5 +14,5 @@ type DbBackupHistory interface { GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) - GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, error) + GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, bool, error) } diff --git a/server/internal/db/infrastructure/persistence/db_backup_history.go b/server/internal/db/infrastructure/persistence/db_backup_history.go index 138fb2ad..3bb32571 100644 --- a/server/internal/db/infrastructure/persistence/db_backup_history.go +++ b/server/internal/db/infrastructure/persistence/db_backup_history.go @@ -1,6 +1,8 @@ package persistence import ( + "errors" + "gorm.io/gorm" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" "mayfly-go/pkg/base" @@ -47,15 +49,19 @@ func (repo *dbBackupHistoryRepoImpl) GetLatestHistory(instanceId uint64, dbName return history, err } -func (repo *dbBackupHistoryRepoImpl) GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, error) { +func (repo *dbBackupHistoryRepoImpl) GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, bool, error) { history := &entity.DbBackupHistory{} db := global.Db.Model(repo.GetModel()) err := db.Where("db_instance_id = ?", instanceId). Scopes(gormx.UndeleteScope). Order("binlog_sequence"). First(history).Error - if err != nil { - return nil, err + switch { + case err == nil: + return history, true, nil + case errors.Is(err, gorm.ErrRecordNotFound): + return history, false, nil + default: + return nil, false, err } - return history, nil } diff --git a/server/internal/db/infrastructure/persistence/db_binlog_history.go b/server/internal/db/infrastructure/persistence/db_binlog_history.go index 29c3352f..09a11444 100644 --- a/server/internal/db/infrastructure/persistence/db_binlog_history.go +++ b/server/internal/db/infrastructure/persistence/db_binlog_history.go @@ -94,7 +94,7 @@ func (repo *dbBinlogHistoryRepoImpl) InsertWithBinlogFiles(ctx context.Context, if len(binlogFiles) == 0 { return nil } - histories := make([]any, 0, len(binlogFiles)) + histories := make([]*entity.DbBinlogHistory, 0, len(binlogFiles)) for _, fileOnServer := range binlogFiles { if !fileOnServer.Downloaded { break @@ -115,7 +115,7 @@ func (repo *dbBinlogHistoryRepoImpl) InsertWithBinlogFiles(ctx context.Context, } } if len(histories) > 0 { - if err := repo.Upsert(ctx, histories[len(histories)-1].(*entity.DbBinlogHistory)); err != nil { + if err := repo.Upsert(ctx, histories[len(histories)-1]); err != nil { return err } } diff --git a/server/pkg/runner/delay_queue.go b/server/pkg/runner/delay_queue.go index 7d786492..9ae4b661 100644 --- a/server/pkg/runner/delay_queue.go +++ b/server/pkg/runner/delay_queue.go @@ -114,13 +114,16 @@ func (s *DelayQueue[T]) Dequeue(ctx context.Context) (T, bool) { // 等待时间到期或新元素加入 timer := time.NewTimer(delay) select { - case elm := <-s.transferChan: - return elm, true - case <-s.enqueuedSignal: - continue case <-timer.C: continue + case elm := <-s.transferChan: + timer.Stop() + return elm, true + case <-s.enqueuedSignal: + timer.Stop() + continue case <-ctx.Done(): + timer.Stop() return s.zero, false } } @@ -187,12 +190,14 @@ func (s *DelayQueue[T]) Enqueue(ctx context.Context, val T) bool { // 新元素需要延迟,等待退出信号、出队信号和到期信号 timer := time.NewTimer(delay) select { - case <-s.dequeuedSignal: - // 收到出队信号,从头开始尝试入队 - continue case <-timer.C: // 新元素不再需要延迟 + case <-s.dequeuedSignal: + // 收到出队信号,从头开始尝试入队 + timer.Stop() + continue case <-ctx.Done(): + timer.Stop() return false } } else { diff --git a/server/pkg/runner/runner.go b/server/pkg/runner/runner.go index d6813dfa..0e5d153d 100644 --- a/server/pkg/runner/runner.go +++ b/server/pkg/runner/runner.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/emirpasic/gods/maps/linkedhashmap" "mayfly-go/pkg/logx" + "mayfly-go/pkg/utils/timex" "sync" "time" ) @@ -32,7 +33,7 @@ type Job interface { Runnable(next NextFunc) bool GetDeadline() time.Time Schedule() bool - Renew(job Job) + Update(job Job) } type iterator[T Job] struct { @@ -138,6 +139,7 @@ func NewRunner[T Job](maxRunning int) *Runner[T] { } go func() { defer runner.wg.Done() + timex.SleepWithContext(runner.context, time.Second*10) for runner.context.Err() == nil { job, ok := runner.delayQueue.Dequeue(ctx) if !ok { @@ -277,7 +279,7 @@ func (r *Runner[T]) UpdateOrAdd(ctx context.Context, job T) error { defer r.mutex.Unlock() if old, ok := r.all[job.GetKey()]; ok { - old.Renew(job) + old.Update(job) job = old } r.schedule(ctx, job) diff --git a/server/pkg/runner/runner_test.go b/server/pkg/runner/runner_test.go index cf48e80c..c5443037 100644 --- a/server/pkg/runner/runner_test.go +++ b/server/pkg/runner/runner_test.go @@ -29,7 +29,7 @@ type testJob struct { deadline time.Time } -func (t *testJob) Renew(job Job) { +func (t *testJob) Update(_ Job) { } func (t *testJob) GetDeadline() time.Time { @@ -82,6 +82,7 @@ func TestRunner_Close(t *testing.T) { }() waiting.Wait() timer := time.NewTimer(time.Microsecond * 10) + defer timer.Stop() runner.Close() select { case <-timer.C: diff --git a/server/pkg/utils/timex/timex.go b/server/pkg/utils/timex/timex.go index e787a2ac..fa2ff905 100644 --- a/server/pkg/utils/timex/timex.go +++ b/server/pkg/utils/timex/timex.go @@ -54,7 +54,10 @@ func (nt *NullTime) MarshalJSON() ([]byte, error) { } func SleepWithContext(ctx context.Context, d time.Duration) { - ctx, cancel := context.WithTimeout(ctx, d) - <-ctx.Done() - cancel() + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-timer.C: + case <-ctx.Done(): + } }