fix: 依赖注入支持私有变量

This commit is contained in:
wanli
2024-01-23 16:29:41 +08:00
parent 3fc86f0fae
commit 070d4ea104
10 changed files with 107 additions and 112 deletions

View File

@@ -21,13 +21,13 @@ const (
type dbScheduler struct {
mutex sync.Mutex
runner *runner.Runner[entity.DbJob]
DbApp Db `inject:"DbApp"`
BackupRepo repository.DbBackup `inject:"DbBackupRepo"`
BackupHistoryRepo repository.DbBackupHistory `inject:"DbBackupHistoryRepo"`
RestoreRepo repository.DbRestore `inject:"DbRestoreRepo"`
RestoreHistoryRepo repository.DbRestoreHistory `inject:"DbRestoreHistoryRepo"`
BinlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
BinlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
dbApp Db `inject:"DbApp"`
backupRepo repository.DbBackup `inject:"DbBackupRepo"`
backupHistoryRepo repository.DbBackupHistory `inject:"DbBackupHistoryRepo"`
restoreRepo repository.DbRestore `inject:"DbRestoreRepo"`
restoreHistoryRepo repository.DbRestoreHistory `inject:"DbRestoreHistoryRepo"`
binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
binlogTimes map[uint64]time.Time
}
@@ -47,11 +47,11 @@ func (s *dbScheduler) scheduleJob(job entity.DbJob) (time.Time, error) {
func (s *dbScheduler) repo(typ entity.DbJobType) repository.DbJob {
switch typ {
case entity.DbJobTypeBackup:
return s.BackupRepo
return s.backupRepo
case entity.DbJobTypeRestore:
return s.RestoreRepo
return s.restoreRepo
case entity.DbJobTypeBinlog:
return s.BinlogRepo
return s.binlogRepo
default:
panic(errors.New(fmt.Sprintf("无效的数据库任务类型: %v", typ)))
}
@@ -177,7 +177,7 @@ func (s *dbScheduler) backupMysql(ctx context.Context, job entity.DbJob) error {
DbInstanceId: backup.DbInstanceId,
DbName: backup.DbName,
}
conn, err := s.DbApp.GetDbConnByInstanceId(backup.DbInstanceId)
conn, err := s.dbApp.GetDbConnByInstanceId(backup.DbInstanceId)
if err != nil {
return err
}
@@ -197,7 +197,7 @@ func (s *dbScheduler) backupMysql(ctx context.Context, job entity.DbJob) error {
history.BinlogSequence = binlogInfo.Sequence
history.BinlogPosition = binlogInfo.Position
if err := s.BackupHistoryRepo.Insert(ctx, history); err != nil {
if err := s.backupHistoryRepo.Insert(ctx, history); err != nil {
return err
}
return nil
@@ -205,7 +205,7 @@ func (s *dbScheduler) backupMysql(ctx context.Context, job entity.DbJob) error {
func (s *dbScheduler) restoreMysql(ctx context.Context, job entity.DbJob) error {
restore := job.(*entity.DbRestore)
conn, err := s.DbApp.GetDbConnByInstanceId(restore.DbInstanceId)
conn, err := s.dbApp.GetDbConnByInstanceId(restore.DbInstanceId)
if err != nil {
return err
}
@@ -223,14 +223,14 @@ func (s *dbScheduler) restoreMysql(ctx context.Context, job entity.DbJob) error
}
latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
binlogHistory, ok, err := s.BinlogHistoryRepo.GetLatestHistory(restore.DbInstanceId)
binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(restore.DbInstanceId)
if err != nil {
return err
}
if ok {
latestBinlogSequence = binlogHistory.Sequence
} else {
backupHistory, ok, err := s.BackupHistoryRepo.GetEarliestHistory(restore.DbInstanceId)
backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(restore.DbInstanceId)
if err != nil {
return err
}
@@ -243,7 +243,7 @@ func (s *dbScheduler) restoreMysql(ctx context.Context, job entity.DbJob) error
if err != nil {
return err
}
if err := s.BinlogHistoryRepo.InsertWithBinlogFiles(ctx, restore.DbInstanceId, binlogFiles); err != nil {
if err := s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, restore.DbInstanceId, binlogFiles); err != nil {
return err
}
if err := s.restorePointInTime(ctx, dbProgram, restore); err != nil {
@@ -259,7 +259,7 @@ func (s *dbScheduler) restoreMysql(ctx context.Context, job entity.DbJob) error
CreateTime: time.Now(),
DbRestoreId: restore.Id,
}
if err := s.RestoreHistoryRepo.Insert(ctx, history); err != nil {
if err := s.restoreHistoryRepo.Insert(ctx, history); err != nil {
return err
}
return nil
@@ -330,7 +330,7 @@ func relatedToBinlog(typ entity.DbJobType) bool {
}
func (s *dbScheduler) restorePointInTime(ctx context.Context, program dbi.DbProgram, job *entity.DbRestore) error {
binlogHistory, err := s.BinlogHistoryRepo.GetHistoryByTime(job.DbInstanceId, job.PointInTime.Time)
binlogHistory, err := s.binlogHistoryRepo.GetHistoryByTime(job.DbInstanceId, job.PointInTime.Time)
if err != nil {
return err
}
@@ -343,7 +343,7 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, program dbi.DbProg
Sequence: binlogHistory.Sequence,
Position: position,
}
backupHistory, err := s.BackupHistoryRepo.GetLatestHistory(job.DbInstanceId, job.DbName, target)
backupHistory, err := s.backupHistoryRepo.GetLatestHistory(job.DbInstanceId, job.DbName, target)
if err != nil {
return err
}
@@ -352,7 +352,7 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, program dbi.DbProg
Sequence: backupHistory.BinlogSequence,
Position: backupHistory.BinlogPosition,
}
binlogHistories, err := s.BinlogHistoryRepo.GetHistories(job.DbInstanceId, start, target)
binlogHistories, err := s.binlogHistoryRepo.GetHistories(job.DbInstanceId, start, target)
if err != nil {
return err
}
@@ -389,7 +389,7 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, program dbi.DbProg
func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, job *entity.DbRestore) error {
backupHistory := &entity.DbBackupHistory{}
if err := s.BackupHistoryRepo.GetById(backupHistory, job.DbBackupHistoryId); err != nil {
if err := s.backupHistoryRepo.GetById(backupHistory, job.DbBackupHistoryId); err != nil {
return err
}
return program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid)
@@ -398,14 +398,14 @@ func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbPr
func (s *dbScheduler) fetchBinlogMysql(ctx context.Context, backup entity.DbJob) error {
instanceId := backup.GetJobBase().DbInstanceId
latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
binlogHistory, ok, err := s.BinlogHistoryRepo.GetLatestHistory(instanceId)
binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(instanceId)
if err != nil {
return err
}
if ok {
latestBinlogSequence = binlogHistory.Sequence
} else {
backupHistory, ok, err := s.BackupHistoryRepo.GetEarliestHistory(instanceId)
backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(instanceId)
if err != nil {
return err
}
@@ -414,7 +414,7 @@ func (s *dbScheduler) fetchBinlogMysql(ctx context.Context, backup entity.DbJob)
}
earliestBackupSequence = backupHistory.BinlogSequence
}
conn, err := s.DbApp.GetDbConnByInstanceId(instanceId)
conn, err := s.dbApp.GetDbConnByInstanceId(instanceId)
if err != nil {
return err
}
@@ -423,5 +423,5 @@ func (s *dbScheduler) fetchBinlogMysql(ctx context.Context, backup entity.DbJob)
if err != nil {
return err
}
return s.BinlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, binlogFiles)
return s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, binlogFiles)
}