Files
mayfly-go/server/internal/db/application/db_scheduler.go

423 lines
12 KiB
Go
Raw Normal View History

package application
import (
"context"
"errors"
"fmt"
"gorm.io/gorm"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/runner"
"reflect"
"sync"
"time"
)
const (
maxRunning = 8
)
type dbScheduler struct {
mutex sync.Mutex
runner *runner.Runner[entity.DbJob]
2024-01-23 16:29:41 +08:00
dbApp Db `inject:"DbApp"`
backupRepo repository.DbBackup `inject:"DbBackupRepo"`
backupHistoryRepo repository.DbBackupHistory `inject:"DbBackupHistoryRepo"`
restoreRepo repository.DbRestore `inject:"DbRestoreRepo"`
restoreHistoryRepo repository.DbRestoreHistory `inject:"DbRestoreHistoryRepo"`
binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
}
func newDbScheduler() *dbScheduler {
scheduler := &dbScheduler{}
scheduler.runner = runner.NewRunner[entity.DbJob](maxRunning, scheduler.runJob,
runner.WithScheduleJob[entity.DbJob](scheduler.scheduleJob),
runner.WithRunnableJob[entity.DbJob](scheduler.runnableJob),
runner.WithUpdateJob[entity.DbJob](scheduler.updateJob),
)
return scheduler
}
func (s *dbScheduler) scheduleJob(job entity.DbJob) (time.Time, error) {
return job.Schedule()
}
func (s *dbScheduler) UpdateJob(ctx context.Context, job entity.DbJob) error {
s.mutex.Lock()
defer s.mutex.Unlock()
_ = s.runner.Update(ctx, job)
return nil
}
func (s *dbScheduler) Close() {
s.runner.Close()
}
func (s *dbScheduler) AddJob(ctx context.Context, jobs any) error {
s.mutex.Lock()
defer s.mutex.Unlock()
reflectValue := reflect.ValueOf(jobs)
switch reflectValue.Kind() {
case reflect.Array, reflect.Slice:
reflectLen := reflectValue.Len()
for i := 0; i < reflectLen; i++ {
job := reflectValue.Index(i).Interface().(entity.DbJob)
_ = s.runner.Add(ctx, job)
}
default:
job := jobs.(entity.DbJob)
_ = s.runner.Add(ctx, job)
}
return nil
}
func (s *dbScheduler) RemoveJob(ctx context.Context, jobType entity.DbJobType, jobId uint64) error {
// todo: 删除数据库备份历史文件
s.mutex.Lock()
defer s.mutex.Unlock()
if err := s.runner.Remove(ctx, entity.FormatJobKey(jobType, jobId)); err != nil {
return err
}
return nil
}
func (s *dbScheduler) EnableJob(ctx context.Context, job entity.DbJob) error {
s.mutex.Lock()
defer s.mutex.Unlock()
_ = s.runner.Add(ctx, job)
return nil
}
func (s *dbScheduler) DisableJob(ctx context.Context, jobType entity.DbJobType, jobId uint64) error {
s.mutex.Lock()
defer s.mutex.Unlock()
_ = s.runner.Remove(ctx, entity.FormatJobKey(jobType, jobId))
return nil
}
func (s *dbScheduler) StartJobNow(ctx context.Context, job entity.DbJob) error {
s.mutex.Lock()
defer s.mutex.Unlock()
_ = s.runner.StartNow(ctx, job)
return nil
}
func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
id, err := NewIncUUID()
if err != nil {
return err
}
backup := job.(*entity.DbBackup)
history := &entity.DbBackupHistory{
Uuid: id.String(),
DbBackupId: backup.Id,
DbInstanceId: backup.DbInstanceId,
DbName: backup.DbName,
}
binlogInfo, err := dbProgram.Backup(ctx, history)
if err != nil {
return err
}
now := time.Now()
name := backup.DbName
if len(backup.Name) > 0 {
name = fmt.Sprintf("%s-%s", backup.DbName, backup.Name)
}
history.Name = fmt.Sprintf("%s[%s]", name, now.Format(time.DateTime))
history.CreateTime = now
history.BinlogFileName = binlogInfo.FileName
history.BinlogSequence = binlogInfo.Sequence
history.BinlogPosition = binlogInfo.Position
2024-01-23 16:29:41 +08:00
if err := s.backupHistoryRepo.Insert(ctx, history); err != nil {
return err
}
return nil
}
func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
restore := job.(*entity.DbRestore)
if restore.PointInTime.Valid {
//if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil {
// return err
//} else if !enabled {
// return errors.New("数据库未启用 BINLOG")
//}
//if enabled, err := dbProgram.CheckBinlogRowFormat(ctx); err != nil {
// return err
//} else if !enabled {
// return errors.New("数据库未启用 BINLOG 行模式")
//}
//
//latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
//binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(restore.DbInstanceId)
//if err != nil {
// return err
//}
//if ok {
// latestBinlogSequence = binlogHistory.Sequence
//} else {
// backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(restore.DbInstanceId)
// if err != nil {
// return err
// }
// if !ok {
// return nil
// }
// earliestBackupSequence = backupHistory.BinlogSequence
//}
//binlogFiles, err := dbProgram.FetchBinlogs(ctx, true, earliestBackupSequence, latestBinlogSequence)
//if err != nil {
// return err
//}
//if err := s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, restore.DbInstanceId, binlogFiles); err != nil {
// return err
//}
if err := s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), true); err != nil {
return err
}
if err := s.restorePointInTime(ctx, dbProgram, restore); err != nil {
return err
}
} else {
backupHistory := &entity.DbBackupHistory{}
if err := s.backupHistoryRepo.GetById(backupHistory, restore.DbBackupHistoryId); err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
err = errors.New("备份历史已删除")
}
return err
}
if err := s.restoreBackupHistory(ctx, dbProgram, backupHistory); err != nil {
return err
}
}
history := &entity.DbRestoreHistory{
CreateTime: time.Now(),
DbRestoreId: restore.Id,
}
2024-01-23 16:29:41 +08:00
if err := s.restoreHistoryRepo.Insert(ctx, history); err != nil {
return err
}
return nil
}
//func (s *dbScheduler) updateLastStatus(ctx context.Context, job entity.DbJob) error {
// switch typ := job.GetJobType(); typ {
// case entity.DbJobTypeBackup:
// return s.backupRepo.UpdateLastStatus(ctx, job)
// case entity.DbJobTypeRestore:
// return s.restoreRepo.UpdateLastStatus(ctx, job)
// case entity.DbJobTypeBinlog:
// return s.binlogRepo.UpdateLastStatus(ctx, job)
// default:
// panic(fmt.Errorf("无效的数据库任务类型: %v", typ))
// }
//}
func (s *dbScheduler) updateJob(ctx context.Context, job entity.DbJob) error {
switch typ := job.GetJobType(); typ {
case entity.DbJobTypeBackup:
return s.backupRepo.UpdateById(ctx, job)
case entity.DbJobTypeRestore:
return s.restoreRepo.UpdateById(ctx, job)
case entity.DbJobTypeBinlog:
return s.binlogRepo.UpdateById(ctx, job)
default:
return fmt.Errorf("无效的数据库任务类型: %v", typ)
}
}
func (s *dbScheduler) runJob(ctx context.Context, job entity.DbJob) error {
//job.SetLastStatus(entity.DbJobRunning, nil)
//if err := s.updateLastStatus(ctx, job); err != nil {
// logx.Errorf("failed to update job status: %v", err)
// return
//}
//var errRun error
conn, err := s.dbApp.GetDbConnByInstanceId(job.GetInstanceId())
if err != nil {
return err
}
dbProgram := conn.GetDialect().GetDbProgram()
switch typ := job.GetJobType(); typ {
case entity.DbJobTypeBackup:
return s.backup(ctx, dbProgram, job)
case entity.DbJobTypeRestore:
return s.restore(ctx, dbProgram, job)
case entity.DbJobTypeBinlog:
return s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), false)
default:
return fmt.Errorf("无效的数据库任务类型: %v", typ)
}
//status := entity.DbJobSuccess
//if errRun != nil {
// status = entity.DbJobFailed
//}
//job.SetLastStatus(status, errRun)
//if err := s.updateLastStatus(ctx, job); err != nil {
// logx.Errorf("failed to update job status: %v", err)
// return
//}
}
func (s *dbScheduler) runnableJob(job entity.DbJob, next runner.NextJobFunc[entity.DbJob]) (bool, error) {
if job.IsExpired() {
return false, runner.ErrJobExpired
}
const maxCountByInstanceId = 4
const maxCountByDbName = 1
var countByInstanceId, countByDbName int
for item, ok := next(); ok; item, ok = next() {
if job.GetInstanceId() == item.GetInstanceId() {
countByInstanceId++
if countByInstanceId >= maxCountByInstanceId {
return false, nil
}
if relatedToBinlog(job.GetJobType()) {
// todo: 恢复数据库前触发 BINLOG 同步BINLOG 同步完成后才能恢复数据库
if relatedToBinlog(item.GetJobType()) {
return false, nil
}
}
if job.GetDbName() == item.GetDbName() {
countByDbName++
if countByDbName >= maxCountByDbName {
return false, nil
}
}
}
}
return true, nil
}
func relatedToBinlog(typ entity.DbJobType) bool {
return typ == entity.DbJobTypeRestore || typ == entity.DbJobTypeBinlog
}
func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbProgram, job *entity.DbRestore) error {
2024-01-23 16:29:41 +08:00
binlogHistory, err := s.binlogHistoryRepo.GetHistoryByTime(job.DbInstanceId, job.PointInTime.Time)
if err != nil {
return err
}
position, err := dbProgram.GetBinlogEventPositionAtOrAfterTime(ctx, binlogHistory.FileName, job.PointInTime.Time)
if err != nil {
return err
}
target := &entity.BinlogInfo{
FileName: binlogHistory.FileName,
Sequence: binlogHistory.Sequence,
Position: position,
}
2024-01-23 16:29:41 +08:00
backupHistory, err := s.backupHistoryRepo.GetLatestHistory(job.DbInstanceId, job.DbName, target)
if err != nil {
return err
}
start := &entity.BinlogInfo{
FileName: backupHistory.BinlogFileName,
Sequence: backupHistory.BinlogSequence,
Position: backupHistory.BinlogPosition,
}
2024-01-23 16:29:41 +08:00
binlogHistories, err := s.binlogHistoryRepo.GetHistories(job.DbInstanceId, start, target)
if err != nil {
return err
}
restoreInfo := &dbi.RestoreInfo{
BackupHistory: backupHistory,
BinlogHistories: binlogHistories,
StartPosition: backupHistory.BinlogPosition,
TargetPosition: target.Position,
TargetTime: job.PointInTime.Time,
}
if err := dbProgram.ReplayBinlog(ctx, job.DbName, job.DbName, restoreInfo); err != nil {
return err
}
if err := s.restoreBackupHistory(ctx, dbProgram, backupHistory); err != nil {
return err
}
// 由于 ReplayBinlog 未记录 BINLOG 事件,系统自动备份,避免数据丢失
backup := &entity.DbBackup{
DbInstanceId: backupHistory.DbInstanceId,
DbName: backupHistory.DbName,
Enabled: true,
Repeated: false,
StartTime: time.Now(),
Interval: 0,
Name: "系统备份",
}
backup.Id = backupHistory.DbBackupId
if err := s.backup(ctx, dbProgram, backup); err != nil {
return err
}
return nil
}
func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, backupHistory *entity.DbBackupHistory) (retErr error) {
ok, err := s.backupHistoryRepo.UpdateRestoring(true, backupHistory.Id)
if err != nil {
return err
}
defer func() {
_, err = s.backupHistoryRepo.UpdateRestoring(false, backupHistory.Id)
if err == nil {
return
}
if retErr == nil {
retErr = err
return
}
retErr = fmt.Errorf("%w, %w", retErr, err)
}()
if !ok {
return errors.New("关联的数据库备份历史已删除")
}
return program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid)
}
func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool) error {
if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil {
return err
} else if !enabled {
return errors.New("数据库未启用 BINLOG")
}
if enabled, err := dbProgram.CheckBinlogRowFormat(ctx); err != nil {
return err
} else if !enabled {
return errors.New("数据库未启用 BINLOG 行模式")
}
latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
2024-01-23 16:29:41 +08:00
binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(instanceId)
if err != nil {
return err
}
if ok {
latestBinlogSequence = binlogHistory.Sequence
} else {
2024-01-23 16:29:41 +08:00
backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(instanceId)
if err != nil {
return err
}
if !ok {
return nil
}
earliestBackupSequence = backupHistory.BinlogSequence
}
binlogFiles, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, latestBinlogSequence)
if err != nil {
return err
}
2024-01-23 16:29:41 +08:00
return s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, binlogFiles)
}