Files
mayfly-go/server/internal/db/application/db_scheduler.go
kanzihuang 3f828cc5b0 !96 删除数据库备份和恢复历史
* feat: 删除数据库备份历史
* refactor dbScheduler
* feat: 从数据库备份历史中恢复数据库
* feat: 删除数据库恢复历史记录
* refactor dbScheduler
2024-01-30 13:12:43 +00:00

423 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package application
import (
"context"
"errors"
"fmt"
"gorm.io/gorm"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/runner"
"reflect"
"sync"
"time"
)
// maxRunning is the first argument given to runner.NewRunner when the
// scheduler is constructed.
// NOTE(review): presumably the cap on concurrently running jobs — confirm
// against the runner package.
const maxRunning = 8
// dbScheduler schedules and executes database jobs (backup, restore and
// BINLOG synchronization) through a generic job runner.
//
// The fields carrying an `inject` tag are not set by newDbScheduler.
// NOTE(review): presumably they are populated by the project's
// dependency-injection container — confirm where that happens.
type dbScheduler struct {
// mutex is held by UpdateJob, AddJob, RemoveJob, EnableJob, DisableJob and
// StartJobNow while they call into runner.
mutex sync.Mutex
// runner is created in newDbScheduler and drives the scheduler's callbacks
// (runJob, scheduleJob, runnableJob, updateJob).
runner *runner.Runner[entity.DbJob]
// dbApp is used by runJob to obtain a connection for the job's instance.
dbApp Db `inject:"DbApp"`
// backupRepo persists backup jobs (see updateJob).
backupRepo repository.DbBackup `inject:"DbBackupRepo"`
// backupHistoryRepo stores the records of completed backups.
backupHistoryRepo repository.DbBackupHistory `inject:"DbBackupHistoryRepo"`
// restoreRepo persists restore jobs (see updateJob).
restoreRepo repository.DbRestore `inject:"DbRestoreRepo"`
// restoreHistoryRepo stores the records of completed restores.
restoreHistoryRepo repository.DbRestoreHistory `inject:"DbRestoreHistoryRepo"`
// binlogRepo persists BINLOG synchronization jobs (see updateJob).
binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
// binlogHistoryRepo stores the records of fetched BINLOG files.
binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
}
// newDbScheduler returns a dbScheduler whose runner is wired to the
// scheduler's own callbacks: runJob executes a job, scheduleJob computes its
// next run time, runnableJob decides whether it may start, and updateJob
// persists it. Only the runner field is initialized here.
func newDbScheduler() *dbScheduler {
	s := new(dbScheduler)
	s.runner = runner.NewRunner[entity.DbJob](
		maxRunning,
		s.runJob,
		runner.WithScheduleJob[entity.DbJob](s.scheduleJob),
		runner.WithRunnableJob[entity.DbJob](s.runnableJob),
		runner.WithUpdateJob[entity.DbJob](s.updateJob),
	)
	return s
}
// scheduleJob is the runner's scheduling callback; it delegates to the job's
// own Schedule method and returns its result unchanged.
func (s *dbScheduler) scheduleJob(job entity.DbJob) (time.Time, error) {
	next, err := job.Schedule()
	return next, err
}
// UpdateJob forwards the job to the runner's Update while holding the
// scheduler mutex. It always returns nil.
func (s *dbScheduler) UpdateJob(ctx context.Context, job entity.DbJob) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// The runner's error is discarded on purpose; callers only ever see nil.
	_ = s.runner.Update(ctx, job)

	return nil
}
// Close closes the underlying runner. It does not take the scheduler mutex.
func (s *dbScheduler) Close() {
	s.runner.Close()
}
// AddJob registers jobs with the runner while holding the scheduler mutex.
//
// jobs must be either a single entity.DbJob or a slice/array whose elements
// all implement entity.DbJob. Any other value (including nil or a nil
// element) yields an error instead of the panic an unchecked type assertion
// would cause. Every element is validated before the first one is added, so
// an invalid batch is rejected as a whole.
//
// Errors returned by the runner's Add are still discarded, as before.
func (s *dbScheduler) AddJob(ctx context.Context, jobs any) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	var batch []entity.DbJob
	rv := reflect.ValueOf(jobs)
	switch rv.Kind() {
	case reflect.Array, reflect.Slice:
		n := rv.Len()
		batch = make([]entity.DbJob, 0, n)
		for i := 0; i < n; i++ {
			elem := rv.Index(i).Interface()
			job, ok := elem.(entity.DbJob)
			if !ok {
				return fmt.Errorf("无效的数据库任务: %T", elem)
			}
			batch = append(batch, job)
		}
	default:
		job, ok := jobs.(entity.DbJob)
		if !ok {
			return fmt.Errorf("无效的数据库任务: %T", jobs)
		}
		batch = append(batch, job)
	}

	for _, job := range batch {
		// Best effort: the runner's result is intentionally ignored.
		_ = s.runner.Add(ctx, job)
	}
	return nil
}
// RemoveJob removes the job identified by jobType and jobId from the runner
// while holding the scheduler mutex. Unlike DisableJob, the runner's error is
// returned to the caller.
func (s *dbScheduler) RemoveJob(ctx context.Context, jobType entity.DbJobType, jobId uint64) error {
	// TODO: delete the database backup history files.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	key := entity.FormatJobKey(jobType, jobId)
	err := s.runner.Remove(ctx, key)
	if err != nil {
		return err
	}
	return nil
}
// EnableJob adds the job to the runner while holding the scheduler mutex.
// It always returns nil.
func (s *dbScheduler) EnableJob(ctx context.Context, job entity.DbJob) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// The runner's error is discarded on purpose; callers only ever see nil.
	_ = s.runner.Add(ctx, job)

	return nil
}
// DisableJob removes the job identified by jobType and jobId from the runner
// while holding the scheduler mutex. It always returns nil; the runner's
// error is discarded.
func (s *dbScheduler) DisableJob(ctx context.Context, jobType entity.DbJobType, jobId uint64) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	key := entity.FormatJobKey(jobType, jobId)
	_ = s.runner.Remove(ctx, key)

	return nil
}
// StartJobNow passes the job to the runner's StartNow while holding the
// scheduler mutex. It always returns nil.
func (s *dbScheduler) StartJobNow(ctx context.Context, job entity.DbJob) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// The runner's error is discarded on purpose; callers only ever see nil.
	_ = s.runner.StartNow(ctx, job)

	return nil
}
// backup runs a database backup for job, which must be an *entity.DbBackup,
// and stores a DbBackupHistory record describing the result.
//
// The history record gets a freshly generated UUID, a display name of the
// form "<db>[<time>]" or "<db>-<backup name>[<time>]", and the binlog
// coordinates reported by dbProgram.Backup. Any error from UUID generation,
// the backup itself, or the insert is returned as is.
func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
	uuid, err := NewIncUUID()
	if err != nil {
		return err
	}

	task := job.(*entity.DbBackup)
	record := &entity.DbBackupHistory{
		Uuid:         uuid.String(),
		DbBackupId:   task.Id,
		DbInstanceId: task.DbInstanceId,
		DbName:       task.DbName,
	}

	info, err := dbProgram.Backup(ctx, record)
	if err != nil {
		return err
	}

	// The timestamp is taken after the backup has finished.
	finishedAt := time.Now()

	label := task.DbName
	if len(task.Name) != 0 {
		label = fmt.Sprintf("%s-%s", task.DbName, task.Name)
	}

	record.Name = fmt.Sprintf("%s[%s]", label, finishedAt.Format(time.DateTime))
	record.CreateTime = finishedAt
	record.BinlogFileName = info.FileName
	record.BinlogSequence = info.Sequence
	record.BinlogPosition = info.Position

	if err := s.backupHistoryRepo.Insert(ctx, record); err != nil {
		return err
	}
	return nil
}
// restore executes a restore job, which must be an *entity.DbRestore, and
// records a DbRestoreHistory entry once the restore has succeeded.
//
// Two modes exist:
//   - PointInTime is valid: the binlog is synchronized first (fetchBinlog with
//     the latest binlog file included), then restorePointInTime is run.
//   - otherwise: the backup history referenced by DbBackupHistoryId is loaded
//     and restored directly. A missing record is reported as
//     "备份历史已删除".
func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
	task := job.(*entity.DbRestore)

	if !task.PointInTime.Valid {
		snapshot := new(entity.DbBackupHistory)
		if err := s.backupHistoryRepo.GetById(snapshot, task.DbBackupHistoryId); err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return errors.New("备份历史已删除")
			}
			return err
		}
		if err := s.restoreBackupHistory(ctx, dbProgram, snapshot); err != nil {
			return err
		}
	} else {
		// Binlog checks and download are delegated to fetchBinlog.
		if err := s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), true); err != nil {
			return err
		}
		if err := s.restorePointInTime(ctx, dbProgram, task); err != nil {
			return err
		}
	}

	entry := &entity.DbRestoreHistory{
		CreateTime:  time.Now(),
		DbRestoreId: task.Id,
	}
	if err := s.restoreHistoryRepo.Insert(ctx, entry); err != nil {
		return err
	}
	return nil
}
//func (s *dbScheduler) updateLastStatus(ctx context.Context, job entity.DbJob) error {
// switch typ := job.GetJobType(); typ {
// case entity.DbJobTypeBackup:
// return s.backupRepo.UpdateLastStatus(ctx, job)
// case entity.DbJobTypeRestore:
// return s.restoreRepo.UpdateLastStatus(ctx, job)
// case entity.DbJobTypeBinlog:
// return s.binlogRepo.UpdateLastStatus(ctx, job)
// default:
// panic(fmt.Errorf("无效的数据库任务类型: %v", typ))
// }
//}
// updateJob is the runner's persistence callback. It saves the job through
// the repository that matches its job type and returns an error for an
// unknown type.
func (s *dbScheduler) updateJob(ctx context.Context, job entity.DbJob) error {
	jobType := job.GetJobType()

	switch {
	case jobType == entity.DbJobTypeBackup:
		return s.backupRepo.UpdateById(ctx, job)
	case jobType == entity.DbJobTypeRestore:
		return s.restoreRepo.UpdateById(ctx, job)
	case jobType == entity.DbJobTypeBinlog:
		return s.binlogRepo.UpdateById(ctx, job)
	default:
		return fmt.Errorf("无效的数据库任务类型: %v", jobType)
	}
}
// runJob is the runner's execution callback. It obtains a connection for the
// job's database instance, takes the dialect's DbProgram from it, and
// dispatches on the job type:
//   - backup  -> s.backup
//   - restore -> s.restore
//   - binlog  -> s.fetchBinlog (without downloading the latest binlog file)
//
// An unknown job type, or a failure to obtain the connection, is returned as
// an error. No job status is written here; only the error is returned.
func (s *dbScheduler) runJob(ctx context.Context, job entity.DbJob) error {
	conn, err := s.dbApp.GetDbConnByInstanceId(job.GetInstanceId())
	if err != nil {
		return err
	}

	dialect := conn.GetDialect()
	program := dialect.GetDbProgram()

	jobType := job.GetJobType()
	switch jobType {
	case entity.DbJobTypeBackup:
		return s.backup(ctx, program, job)
	case entity.DbJobTypeRestore:
		return s.restore(ctx, program, job)
	case entity.DbJobTypeBinlog:
		return s.fetchBinlog(ctx, program, job.GetInstanceId(), false)
	default:
		return fmt.Errorf("无效的数据库任务类型: %v", jobType)
	}
}
// runnableJob is the runner's admission callback: it reports whether job may
// start now, given the other jobs yielded by next.
//
// An expired job is rejected with runner.ErrJobExpired. Otherwise the jobs
// from next that share job's instance id are examined, and job is refused
// (false, nil) as soon as one of these holds:
//   - maxCountByInstanceId such jobs have been seen;
//   - both job and the other job are binlog-related (restore or binlog);
//   - maxCountByDbName such jobs also target the same database name.
func (s *dbScheduler) runnableJob(job entity.DbJob, next runner.NextJobFunc[entity.DbJob]) (bool, error) {
	if job.IsExpired() {
		return false, runner.ErrJobExpired
	}

	const (
		maxCountByInstanceId = 4
		maxCountByDbName     = 1
	)

	sameInstance, sameDb := 0, 0
	for {
		other, ok := next()
		if !ok {
			break
		}
		if job.GetInstanceId() != other.GetInstanceId() {
			continue
		}

		sameInstance++
		if sameInstance >= maxCountByInstanceId {
			return false, nil
		}

		// TODO: trigger a BINLOG sync before restoring a database; the restore
		// may only start once the BINLOG sync has completed.
		if relatedToBinlog(job.GetJobType()) && relatedToBinlog(other.GetJobType()) {
			return false, nil
		}

		if job.GetDbName() != other.GetDbName() {
			continue
		}
		sameDb++
		if sameDb >= maxCountByDbName {
			return false, nil
		}
	}
	return true, nil
}
// relatedToBinlog reports whether typ is a restore job or a binlog job.
func relatedToBinlog(typ entity.DbJobType) bool {
	switch typ {
	case entity.DbJobTypeRestore, entity.DbJobTypeBinlog:
		return true
	default:
		return false
	}
}
// restorePointInTime restores job.DbName to its state at job.PointInTime.
//
// Steps:
//  1. Look up the binlog history covering the requested time and resolve the
//     position of the first event at or after that time; this is the replay
//     target.
//  2. Fetch the backup history to start from via GetLatestHistory, passing
//     the target (presumably the most recent backup not past the target —
//     confirm against the repository). Its recorded binlog coordinates are
//     the replay start.
//  3. Restore that backup, then replay the binlog histories between start
//     and target on top of it.
//  4. Take a one-off system backup, because the replay is not itself
//     recorded in the BINLOG.
//
// The backup is restored BEFORE the binlog is replayed. Doing it the other
// way round would apply the binlog events to the current data and then
// overwrite them with the backup, leaving the database at the backup's state
// rather than at the requested point in time.
//
// The first error encountered is returned unchanged.
func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbProgram, job *entity.DbRestore) error {
	binlogHistory, err := s.binlogHistoryRepo.GetHistoryByTime(job.DbInstanceId, job.PointInTime.Time)
	if err != nil {
		return err
	}
	position, err := dbProgram.GetBinlogEventPositionAtOrAfterTime(ctx, binlogHistory.FileName, job.PointInTime.Time)
	if err != nil {
		return err
	}
	target := &entity.BinlogInfo{
		FileName: binlogHistory.FileName,
		Sequence: binlogHistory.Sequence,
		Position: position,
	}

	backupHistory, err := s.backupHistoryRepo.GetLatestHistory(job.DbInstanceId, job.DbName, target)
	if err != nil {
		return err
	}
	start := &entity.BinlogInfo{
		FileName: backupHistory.BinlogFileName,
		Sequence: backupHistory.BinlogSequence,
		Position: backupHistory.BinlogPosition,
	}

	binlogHistories, err := s.binlogHistoryRepo.GetHistories(job.DbInstanceId, start, target)
	if err != nil {
		return err
	}
	restoreInfo := &dbi.RestoreInfo{
		BackupHistory:   backupHistory,
		BinlogHistories: binlogHistories,
		StartPosition:   backupHistory.BinlogPosition,
		TargetPosition:  target.Position,
		TargetTime:      job.PointInTime.Time,
	}

	// Restore the base backup first, then roll forward with the binlog.
	if err := s.restoreBackupHistory(ctx, dbProgram, backupHistory); err != nil {
		return err
	}
	if err := dbProgram.ReplayBinlog(ctx, job.DbName, job.DbName, restoreInfo); err != nil {
		return err
	}

	// ReplayBinlog does not record BINLOG events, so take a system backup
	// right away to avoid data loss.
	backup := &entity.DbBackup{
		DbInstanceId: backupHistory.DbInstanceId,
		DbName:       backupHistory.DbName,
		Enabled:      true,
		Repeated:     false,
		StartTime:    time.Now(),
		Interval:     0,
		Name:         "系统备份",
	}
	backup.Id = backupHistory.DbBackupId
	if err := s.backup(ctx, dbProgram, backup); err != nil {
		return err
	}
	return nil
}
// restoreBackupHistory restores the database from the given backup history.
//
// The history's "restoring" flag is set before the restore and cleared again
// in a deferred call, whatever the outcome. If setting the flag reports that
// nothing was updated, the function returns "关联的数据库备份历史已删除"
// without running the restore (the deferred clear still runs). A failure to
// clear the flag becomes the returned error, or is wrapped together with the
// restore error when both occurred.
func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, backupHistory *entity.DbBackupHistory) (retErr error) {
	marked, err := s.backupHistoryRepo.UpdateRestoring(true, backupHistory.Id)
	if err != nil {
		return err
	}

	defer func() {
		_, clearErr := s.backupHistoryRepo.UpdateRestoring(false, backupHistory.Id)
		switch {
		case clearErr == nil:
			// Flag cleared; keep whatever the restore returned.
		case retErr == nil:
			retErr = clearErr
		default:
			retErr = fmt.Errorf("%w, %w", retErr, clearErr)
		}
	}()

	if !marked {
		return errors.New("关联的数据库备份历史已删除")
	}
	return program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid)
}
// fetchBinlog downloads binlog files for the instance and records them in the
// binlog history.
//
// It first requires that BINLOG is enabled and uses row format, returning an
// error otherwise. The fetch range is then chosen as follows:
//   - a binlog history exists: its latest sequence is passed as the latest
//     binlog sequence;
//   - none exists but a backup history does: the earliest backup's binlog
//     sequence is passed as the earliest backup sequence;
//   - neither exists: nothing is fetched and nil is returned.
//
// The sequence that is not set stays at -1. downloadLatestBinlogFile is
// forwarded to dbProgram.FetchBinlogs unchanged.
func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool) error {
	binlogOn, err := dbProgram.CheckBinlogEnabled(ctx)
	if err != nil {
		return err
	}
	if !binlogOn {
		return errors.New("数据库未启用 BINLOG")
	}

	rowFormat, err := dbProgram.CheckBinlogRowFormat(ctx)
	if err != nil {
		return err
	}
	if !rowFormat {
		return errors.New("数据库未启用 BINLOG 行模式")
	}

	latestBinlogSequence := int64(-1)
	earliestBackupSequence := int64(-1)

	lastBinlog, found, err := s.binlogHistoryRepo.GetLatestHistory(instanceId)
	if err != nil {
		return err
	}
	if found {
		latestBinlogSequence = lastBinlog.Sequence
	} else {
		firstBackup, exists, err := s.backupHistoryRepo.GetEarliestHistory(instanceId)
		if err != nil {
			return err
		}
		if !exists {
			return nil
		}
		earliestBackupSequence = firstBackup.BinlogSequence
	}

	files, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, latestBinlogSequence)
	if err != nil {
		return err
	}
	return s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, files)
}