mirror of
https://gitee.com/dromara/mayfly-go
synced 2026-03-23 10:25:53 +08:00
!100 定时清理数据库备份数据
* feat: 优化数据库 BINLOG 同步机制 * feat: 删除数据库实例前需删除关联的数据库备份与恢复任务 * refactor: 重构数据库备份与恢复模块 * feat: 定时清理数据库备份历史和本地 Binlog 文件 * feat: 压缩数据库备份文件
This commit is contained in:
@@ -25,10 +25,13 @@ func InitIoc() {
|
||||
func Init() {
|
||||
sync.OnceFunc(func() {
|
||||
if err := GetDbBackupApp().Init(); err != nil {
|
||||
panic(fmt.Sprintf("初始化 dbBackupApp 失败: %v", err))
|
||||
panic(fmt.Sprintf("初始化 DbBackupApp 失败: %v", err))
|
||||
}
|
||||
if err := GetDbRestoreApp().Init(); err != nil {
|
||||
panic(fmt.Sprintf("初始化 dbRestoreApp 失败: %v", err))
|
||||
panic(fmt.Sprintf("初始化 DbRestoreApp 失败: %v", err))
|
||||
}
|
||||
if err := GetDbBinlogApp().Init(); err != nil {
|
||||
panic(fmt.Sprintf("初始化 DbBinlogApp 失败: %v", err))
|
||||
}
|
||||
GetDataSyncTaskApp().InitCronJob()
|
||||
})()
|
||||
|
||||
@@ -6,15 +6,24 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"gorm.io/gorm"
|
||||
"math"
|
||||
"mayfly-go/internal/db/domain/entity"
|
||||
"mayfly-go/internal/db/domain/repository"
|
||||
"mayfly-go/pkg/logx"
|
||||
"mayfly-go/pkg/model"
|
||||
"mayfly-go/pkg/utils/timex"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const maxBackupHistoryDays = 30
|
||||
|
||||
var (
|
||||
errRestoringBackupHistory = errors.New("正在从备份历史中恢复数据库")
|
||||
)
|
||||
|
||||
type DbBackupApp struct {
|
||||
scheduler *dbScheduler `inject:"DbScheduler"`
|
||||
backupRepo repository.DbBackup `inject:"DbBackupRepo"`
|
||||
@@ -22,6 +31,10 @@ type DbBackupApp struct {
|
||||
restoreRepo repository.DbRestore `inject:"DbRestoreRepo"`
|
||||
dbApp Db `inject:"DbApp"`
|
||||
mutex sync.Mutex
|
||||
closed chan struct{}
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (app *DbBackupApp) Init() error {
|
||||
@@ -32,11 +45,68 @@ func (app *DbBackupApp) Init() error {
|
||||
if err := app.scheduler.AddJob(context.Background(), jobs); err != nil {
|
||||
return err
|
||||
}
|
||||
app.ctx, app.cancel = context.WithCancel(context.Background())
|
||||
app.wg.Add(1)
|
||||
go func() {
|
||||
defer app.wg.Done()
|
||||
for app.ctx.Err() == nil {
|
||||
if err := app.prune(app.ctx); err != nil {
|
||||
logx.Errorf("清理数据库备份历史失败: %s", err.Error())
|
||||
timex.SleepWithContext(app.ctx, time.Minute*15)
|
||||
continue
|
||||
}
|
||||
timex.SleepWithContext(app.ctx, time.Hour*24)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *DbBackupApp) prune(ctx context.Context) error {
|
||||
var jobs []*entity.DbBackup
|
||||
if err := app.backupRepo.ListByCond(map[string]any{}, &jobs); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, job := range jobs {
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
var histories []*entity.DbBackupHistory
|
||||
historyCond := map[string]any{
|
||||
"db_backup_id": job.Id,
|
||||
}
|
||||
if err := app.backupHistoryRepo.ListByCondOrder(historyCond, &histories, "id"); err != nil {
|
||||
return err
|
||||
}
|
||||
expiringTime := time.Now().Add(-math.MaxInt64)
|
||||
if job.MaxSaveDays > 0 {
|
||||
expiringTime = time.Now().Add(-time.Hour * 24 * time.Duration(job.MaxSaveDays+1))
|
||||
}
|
||||
for _, history := range histories {
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
if history.CreateTime.After(expiringTime) {
|
||||
break
|
||||
}
|
||||
err := app.DeleteHistory(ctx, history.Id)
|
||||
if errors.Is(err, errRestoringBackupHistory) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *DbBackupApp) Close() {
|
||||
app.scheduler.Close()
|
||||
if app.cancel != nil {
|
||||
app.cancel()
|
||||
app.cancel = nil
|
||||
}
|
||||
app.wg.Wait()
|
||||
}
|
||||
|
||||
func (app *DbBackupApp) Create(ctx context.Context, jobs []*entity.DbBackup) error {
|
||||
@@ -61,7 +131,6 @@ func (app *DbBackupApp) Update(ctx context.Context, job *entity.DbBackup) error
|
||||
}
|
||||
|
||||
func (app *DbBackupApp) Delete(ctx context.Context, jobId uint64) error {
|
||||
// todo: 删除数据库备份历史文件
|
||||
app.mutex.Lock()
|
||||
defer app.mutex.Unlock()
|
||||
|
||||
@@ -76,7 +145,7 @@ func (app *DbBackupApp) Delete(ctx context.Context, jobId uint64) error {
|
||||
default:
|
||||
return err
|
||||
case err == nil:
|
||||
return fmt.Errorf("数据库备份存在历史记录【%s】,无法删除该任务", history.Name)
|
||||
return fmt.Errorf("请先删除关联的数据库备份历史【%s】", history.Name)
|
||||
case errors.Is(err, gorm.ErrRecordNotFound):
|
||||
}
|
||||
if err := app.backupRepo.DeleteById(ctx, jobId); err != nil {
|
||||
@@ -184,27 +253,18 @@ func NewIncUUID() (uuid.UUID, error) {
|
||||
}
|
||||
|
||||
func (app *DbBackupApp) DeleteHistory(ctx context.Context, historyId uint64) (retErr error) {
|
||||
// todo: 删除数据库备份历史文件
|
||||
app.mutex.Lock()
|
||||
defer app.mutex.Unlock()
|
||||
|
||||
if _, err := app.backupHistoryRepo.UpdateDeleting(false, historyId); err != nil {
|
||||
return err
|
||||
}
|
||||
ok, err := app.backupHistoryRepo.UpdateDeleting(true, historyId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_, err = app.backupHistoryRepo.UpdateDeleting(false, historyId)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if retErr == nil {
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
retErr = fmt.Errorf("%w, %w", retErr, err)
|
||||
}()
|
||||
if !ok {
|
||||
return errors.New("正在从备份历史中恢复数据库")
|
||||
return errRestoringBackupHistory
|
||||
}
|
||||
job := &entity.DbBackupHistory{}
|
||||
if err := app.backupHistoryRepo.GetById(job, historyId); err != nil {
|
||||
@@ -214,7 +274,10 @@ func (app *DbBackupApp) DeleteHistory(ctx context.Context, historyId uint64) (re
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dbProgram := conn.GetDialect().GetDbProgram()
|
||||
dbProgram, err := conn.GetDialect().GetDbProgram()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dbProgram.RemoveBackupHistory(ctx, job.DbBackupId, job.Uuid); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package application
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"mayfly-go/internal/db/domain/entity"
|
||||
"mayfly-go/internal/db/domain/repository"
|
||||
"mayfly-go/pkg/logx"
|
||||
@@ -11,9 +12,13 @@ import (
|
||||
)
|
||||
|
||||
type DbBinlogApp struct {
|
||||
scheduler *dbScheduler `inject:"DbScheduler"`
|
||||
binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
|
||||
backupRepo repository.DbBackup `inject:"DbBackupRepo"`
|
||||
scheduler *dbScheduler `inject:"DbScheduler"`
|
||||
binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
|
||||
binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
|
||||
backupRepo repository.DbBackup `inject:"DbBackupRepo"`
|
||||
backupHistoryRepo repository.DbBackupHistory `inject:"DbBackupHistoryRepo"`
|
||||
instanceRepo repository.Instance `inject:"DbInstanceRepo"`
|
||||
dbApp Db `inject:"DbApp"`
|
||||
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -26,41 +31,113 @@ func newDbBinlogApp() *DbBinlogApp {
|
||||
context: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
svc.waitGroup.Add(1)
|
||||
go svc.run()
|
||||
return svc
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) Init() error {
|
||||
app.context, app.cancel = context.WithCancel(context.Background())
|
||||
app.waitGroup.Add(1)
|
||||
go app.run()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) run() {
|
||||
defer app.waitGroup.Done()
|
||||
|
||||
// todo: 实现 binlog 并发下载
|
||||
timex.SleepWithContext(app.context, time.Minute)
|
||||
for !app.closed() {
|
||||
jobs, err := app.loadJobs()
|
||||
if err != nil {
|
||||
logx.Errorf("DbBinlogApp: 加载 BINLOG 同步任务失败: %s", err.Error())
|
||||
for app.context.Err() == nil {
|
||||
if err := app.fetchBinlog(app.context); err != nil {
|
||||
timex.SleepWithContext(app.context, time.Minute)
|
||||
continue
|
||||
}
|
||||
if app.closed() {
|
||||
break
|
||||
}
|
||||
if err := app.scheduler.AddJob(app.context, jobs); err != nil {
|
||||
logx.Error("DbBinlogApp: 添加 BINLOG 同步任务失败: ", err.Error())
|
||||
if err := app.pruneBinlog(app.context); err != nil {
|
||||
timex.SleepWithContext(app.context, time.Minute)
|
||||
continue
|
||||
}
|
||||
timex.SleepWithContext(app.context, entity.BinlogDownloadInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) loadJobs() ([]*entity.DbBinlog, error) {
|
||||
func (app *DbBinlogApp) fetchBinlog(ctx context.Context) error {
|
||||
jobs, err := app.loadJobs(ctx)
|
||||
if err != nil {
|
||||
logx.Errorf("DbBinlogApp: 加载 BINLOG 同步任务失败: %s", err.Error())
|
||||
timex.SleepWithContext(app.context, time.Minute)
|
||||
return err
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if err := app.scheduler.AddJob(app.context, jobs); err != nil {
|
||||
logx.Error("DbBinlogApp: 添加 BINLOG 同步任务失败: ", err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) pruneBinlog(ctx context.Context) error {
|
||||
var jobs []*entity.DbBinlog
|
||||
if err := app.binlogRepo.ListByCond(map[string]any{}, &jobs); err != nil {
|
||||
logx.Error("DbBinlogApp: 获取 BINLOG 同步任务失败: ", err.Error())
|
||||
return err
|
||||
}
|
||||
for _, instance := range jobs {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
var histories []*entity.DbBinlogHistory
|
||||
backupHistory, backupHistoryExists, err := app.backupHistoryRepo.GetEarliestHistoryForBinlog(instance.Id)
|
||||
if err != nil {
|
||||
logx.Errorf("DbBinlogApp: 获取数据库备份历史失败: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
var binlogSeq int64 = math.MaxInt64
|
||||
if backupHistoryExists {
|
||||
binlogSeq = backupHistory.BinlogSequence
|
||||
}
|
||||
if err := app.binlogHistoryRepo.GetHistoriesBeforeSequence(ctx, instance.Id, binlogSeq, &histories); err != nil {
|
||||
logx.Errorf("DbBinlogApp: 获取数据库 BINLOG 历史失败: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
conn, err := app.dbApp.GetDbConnByInstanceId(instance.Id)
|
||||
if err != nil {
|
||||
logx.Errorf("DbBinlogApp: 创建数据库连接失败: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
dbProgram, err := conn.GetDialect().GetDbProgram()
|
||||
if err != nil {
|
||||
logx.Errorf("DbBinlogApp: 获取数据库备份与恢复程序失败: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
for i, history := range histories {
|
||||
// todo: 在避免并发访问的前提下删除本地最新的 BINLOG 文件
|
||||
if !backupHistoryExists && i == len(histories)-1 {
|
||||
// 暂不删除本地最新的 BINLOG 文件
|
||||
break
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if err := dbProgram.PruneBinlog(history); err != nil {
|
||||
logx.Errorf("清理 BINLOG 文件失败: %v", err)
|
||||
continue
|
||||
}
|
||||
if err := app.binlogHistoryRepo.DeleteById(ctx, history.Id); err != nil {
|
||||
logx.Errorf("删除 BINLOG 历史失败: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) loadJobs(ctx context.Context) ([]*entity.DbBinlog, error) {
|
||||
var instanceIds []uint64
|
||||
if err := app.backupRepo.ListDbInstances(true, true, &instanceIds); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobs := make([]*entity.DbBinlog, 0, len(instanceIds))
|
||||
for _, id := range instanceIds {
|
||||
if app.closed() {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
binlog := entity.NewDbBinlog(id)
|
||||
@@ -73,14 +150,15 @@ func (app *DbBinlogApp) loadJobs() ([]*entity.DbBinlog, error) {
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) Close() {
|
||||
app.cancel()
|
||||
cancel := app.cancel
|
||||
if cancel == nil {
|
||||
return
|
||||
}
|
||||
app.cancel = nil
|
||||
cancel()
|
||||
app.waitGroup.Wait()
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) closed() bool {
|
||||
return app.context.Err() != nil
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) AddJobIfNotExists(ctx context.Context, job *entity.DbBinlog) error {
|
||||
if err := app.binlogRepo.AddJobIfNotExists(ctx, job); err != nil {
|
||||
return err
|
||||
@@ -90,11 +168,3 @@ func (app *DbBinlogApp) AddJobIfNotExists(ctx context.Context, job *entity.DbBin
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *DbBinlogApp) Delete(ctx context.Context, jobId uint64) error {
|
||||
// todo: 删除 Binlog 历史文件
|
||||
if err := app.binlogRepo.DeleteById(ctx, jobId); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2,11 +2,14 @@ package application
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"gorm.io/gorm"
|
||||
"mayfly-go/internal/db/dbm"
|
||||
"mayfly-go/internal/db/dbm/dbi"
|
||||
"mayfly-go/internal/db/domain/entity"
|
||||
"mayfly-go/internal/db/domain/repository"
|
||||
"mayfly-go/pkg/base"
|
||||
"mayfly-go/pkg/biz"
|
||||
"mayfly-go/pkg/errorx"
|
||||
"mayfly-go/pkg/model"
|
||||
)
|
||||
@@ -32,6 +35,10 @@ type Instance interface {
|
||||
|
||||
type instanceAppImpl struct {
|
||||
base.AppImpl[*entity.DbInstance, repository.Instance]
|
||||
|
||||
dbApp Db `inject:"DbApp"`
|
||||
backupApp *DbBackupApp `inject:"DbBackupApp"`
|
||||
restoreApp *DbRestoreApp `inject:"DbRestoreApp"`
|
||||
}
|
||||
|
||||
// 注入DbInstanceRepo
|
||||
@@ -96,8 +103,50 @@ func (app *instanceAppImpl) Save(ctx context.Context, instanceEntity *entity.DbI
|
||||
return app.UpdateById(ctx, instanceEntity)
|
||||
}
|
||||
|
||||
func (app *instanceAppImpl) Delete(ctx context.Context, id uint64) error {
|
||||
return app.DeleteById(ctx, id)
|
||||
func (app *instanceAppImpl) Delete(ctx context.Context, instanceId uint64) error {
|
||||
instance, err := app.GetById(new(entity.DbInstance), instanceId, "name")
|
||||
biz.ErrIsNil(err, "获取数据库实例错误,数据库实例ID为: %d", instance.Id)
|
||||
|
||||
restore := &entity.DbRestore{
|
||||
DbInstanceId: instanceId,
|
||||
}
|
||||
err = app.restoreApp.restoreRepo.GetBy(restore)
|
||||
switch {
|
||||
case err == nil:
|
||||
biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库恢复任务。", instance.Name)
|
||||
case errors.Is(err, gorm.ErrRecordNotFound):
|
||||
break
|
||||
default:
|
||||
biz.ErrIsNil(err, "删除数据库实例失败: %v", err)
|
||||
}
|
||||
|
||||
backup := &entity.DbBackup{
|
||||
DbInstanceId: instanceId,
|
||||
}
|
||||
err = app.backupApp.backupRepo.GetBy(backup)
|
||||
switch {
|
||||
case err == nil:
|
||||
biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库备份任务。", instance.Name)
|
||||
case errors.Is(err, gorm.ErrRecordNotFound):
|
||||
break
|
||||
default:
|
||||
biz.ErrIsNil(err, "删除数据库实例失败: %v", err)
|
||||
}
|
||||
|
||||
db := &entity.Db{
|
||||
InstanceId: instanceId,
|
||||
}
|
||||
err = app.dbApp.GetBy(db)
|
||||
switch {
|
||||
case err == nil:
|
||||
biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库资源。", instance.Name)
|
||||
case errors.Is(err, gorm.ErrRecordNotFound):
|
||||
break
|
||||
default:
|
||||
biz.ErrIsNil(err, "删除数据库实例失败: %v", err)
|
||||
}
|
||||
|
||||
return app.DeleteById(ctx, instanceId)
|
||||
}
|
||||
|
||||
func (app *instanceAppImpl) GetDatabases(ed *entity.DbInstance) ([]string, error) {
|
||||
@@ -55,7 +55,6 @@ func (app *DbRestoreApp) Update(ctx context.Context, job *entity.DbRestore) erro
|
||||
}
|
||||
|
||||
func (app *DbRestoreApp) Delete(ctx context.Context, jobId uint64) error {
|
||||
// todo: 删除数据库恢复历史文件
|
||||
app.mutex.Lock()
|
||||
defer app.mutex.Unlock()
|
||||
|
||||
|
||||
@@ -4,12 +4,14 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"gorm.io/gorm"
|
||||
"mayfly-go/internal/db/dbm/dbi"
|
||||
"mayfly-go/internal/db/domain/entity"
|
||||
"mayfly-go/internal/db/domain/repository"
|
||||
"mayfly-go/pkg/runner"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -28,6 +30,7 @@ type dbScheduler struct {
|
||||
restoreHistoryRepo repository.DbRestoreHistory `inject:"DbRestoreHistoryRepo"`
|
||||
binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
|
||||
binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
|
||||
sfGroup singleflight.Group
|
||||
}
|
||||
|
||||
func newDbScheduler() *dbScheduler {
|
||||
@@ -76,7 +79,6 @@ func (s *dbScheduler) AddJob(ctx context.Context, jobs any) error {
|
||||
}
|
||||
|
||||
func (s *dbScheduler) RemoveJob(ctx context.Context, jobType entity.DbJobType, jobId uint64) error {
|
||||
// todo: 删除数据库备份历史文件
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
@@ -110,12 +112,11 @@ func (s *dbScheduler) StartJobNow(ctx context.Context, job entity.DbJob) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
|
||||
func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, backup *entity.DbBackup) error {
|
||||
id, err := NewIncUUID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
backup := job.(*entity.DbBackup)
|
||||
history := &entity.DbBackupHistory{
|
||||
Uuid: id.String(),
|
||||
DbBackupId: backup.Id,
|
||||
@@ -143,45 +144,29 @@ func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
|
||||
restore := job.(*entity.DbRestore)
|
||||
func (s *dbScheduler) singleFlightFetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, targetTime time.Time) error {
|
||||
key := strconv.FormatUint(instanceId, 10)
|
||||
for ctx.Err() == nil {
|
||||
c := s.sfGroup.DoChan(key, func() (interface{}, error) {
|
||||
if err := s.fetchBinlog(ctx, dbProgram, instanceId, true, targetTime); err != nil {
|
||||
return targetTime, err
|
||||
}
|
||||
return targetTime, nil
|
||||
})
|
||||
select {
|
||||
case res := <-c:
|
||||
if targetTime.Compare(res.Val.(time.Time)) <= 0 {
|
||||
return res.Err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, restore *entity.DbRestore) error {
|
||||
if restore.PointInTime.Valid {
|
||||
//if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil {
|
||||
// return err
|
||||
//} else if !enabled {
|
||||
// return errors.New("数据库未启用 BINLOG")
|
||||
//}
|
||||
//if enabled, err := dbProgram.CheckBinlogRowFormat(ctx); err != nil {
|
||||
// return err
|
||||
//} else if !enabled {
|
||||
// return errors.New("数据库未启用 BINLOG 行模式")
|
||||
//}
|
||||
//
|
||||
//latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
|
||||
//binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(restore.DbInstanceId)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
//if ok {
|
||||
// latestBinlogSequence = binlogHistory.Sequence
|
||||
//} else {
|
||||
// backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(restore.DbInstanceId)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if !ok {
|
||||
// return nil
|
||||
// }
|
||||
// earliestBackupSequence = backupHistory.BinlogSequence
|
||||
//}
|
||||
//binlogFiles, err := dbProgram.FetchBinlogs(ctx, true, earliestBackupSequence, latestBinlogSequence)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
//if err := s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, restore.DbInstanceId, binlogFiles); err != nil {
|
||||
// return err
|
||||
//}
|
||||
if err := s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), true); err != nil {
|
||||
if err := s.fetchBinlog(ctx, dbProgram, restore.DbInstanceId, true, restore.PointInTime.Time); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.restorePointInTime(ctx, dbProgram, restore); err != nil {
|
||||
@@ -210,102 +195,68 @@ func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job
|
||||
return nil
|
||||
}
|
||||
|
||||
//func (s *dbScheduler) updateLastStatus(ctx context.Context, job entity.DbJob) error {
|
||||
// switch typ := job.GetJobType(); typ {
|
||||
// case entity.DbJobTypeBackup:
|
||||
// return s.backupRepo.UpdateLastStatus(ctx, job)
|
||||
// case entity.DbJobTypeRestore:
|
||||
// return s.restoreRepo.UpdateLastStatus(ctx, job)
|
||||
// case entity.DbJobTypeBinlog:
|
||||
// return s.binlogRepo.UpdateLastStatus(ctx, job)
|
||||
// default:
|
||||
// panic(fmt.Errorf("无效的数据库任务类型: %v", typ))
|
||||
// }
|
||||
//}
|
||||
|
||||
func (s *dbScheduler) updateJob(ctx context.Context, job entity.DbJob) error {
|
||||
switch typ := job.GetJobType(); typ {
|
||||
case entity.DbJobTypeBackup:
|
||||
return s.backupRepo.UpdateById(ctx, job)
|
||||
case entity.DbJobTypeRestore:
|
||||
return s.restoreRepo.UpdateById(ctx, job)
|
||||
case entity.DbJobTypeBinlog:
|
||||
return s.binlogRepo.UpdateById(ctx, job)
|
||||
switch t := job.(type) {
|
||||
case *entity.DbBackup:
|
||||
return s.backupRepo.UpdateById(ctx, t)
|
||||
case *entity.DbRestore:
|
||||
return s.restoreRepo.UpdateById(ctx, t)
|
||||
case *entity.DbBinlog:
|
||||
return s.binlogRepo.UpdateById(ctx, t)
|
||||
default:
|
||||
return fmt.Errorf("无效的数据库任务类型: %v", typ)
|
||||
return fmt.Errorf("无效的数据库任务类型: %T", t)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *dbScheduler) runJob(ctx context.Context, job entity.DbJob) error {
|
||||
//job.SetLastStatus(entity.DbJobRunning, nil)
|
||||
//if err := s.updateLastStatus(ctx, job); err != nil {
|
||||
// logx.Errorf("failed to update job status: %v", err)
|
||||
// return
|
||||
//}
|
||||
|
||||
//var errRun error
|
||||
conn, err := s.dbApp.GetDbConnByInstanceId(job.GetInstanceId())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dbProgram := conn.GetDialect().GetDbProgram()
|
||||
switch typ := job.GetJobType(); typ {
|
||||
case entity.DbJobTypeBackup:
|
||||
return s.backup(ctx, dbProgram, job)
|
||||
case entity.DbJobTypeRestore:
|
||||
return s.restore(ctx, dbProgram, job)
|
||||
case entity.DbJobTypeBinlog:
|
||||
return s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), false)
|
||||
default:
|
||||
return fmt.Errorf("无效的数据库任务类型: %v", typ)
|
||||
dbProgram, err := conn.GetDialect().GetDbProgram()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch t := job.(type) {
|
||||
case *entity.DbBackup:
|
||||
return s.backup(ctx, dbProgram, t)
|
||||
case *entity.DbRestore:
|
||||
return s.restore(ctx, dbProgram, t)
|
||||
case *entity.DbBinlog:
|
||||
return s.fetchBinlog(ctx, dbProgram, t.DbInstanceId, false, time.Now())
|
||||
default:
|
||||
return fmt.Errorf("无效的数据库任务类型: %T", t)
|
||||
}
|
||||
//status := entity.DbJobSuccess
|
||||
//if errRun != nil {
|
||||
// status = entity.DbJobFailed
|
||||
//}
|
||||
//job.SetLastStatus(status, errRun)
|
||||
//if err := s.updateLastStatus(ctx, job); err != nil {
|
||||
// logx.Errorf("failed to update job status: %v", err)
|
||||
// return
|
||||
//}
|
||||
}
|
||||
|
||||
func (s *dbScheduler) runnableJob(job entity.DbJob, next runner.NextJobFunc[entity.DbJob]) (bool, error) {
|
||||
func (s *dbScheduler) runnableJob(job entity.DbJob, nextRunning runner.NextJobFunc[entity.DbJob]) (bool, error) {
|
||||
if job.IsExpired() {
|
||||
return false, runner.ErrJobExpired
|
||||
}
|
||||
const maxCountByInstanceId = 4
|
||||
const maxCountByDbName = 1
|
||||
var countByInstanceId, countByDbName int
|
||||
for item, ok := next(); ok; item, ok = next() {
|
||||
for item, ok := nextRunning(); ok; item, ok = nextRunning() {
|
||||
if job.GetInstanceId() == item.GetInstanceId() {
|
||||
countByInstanceId++
|
||||
if countByInstanceId >= maxCountByInstanceId {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if relatedToBinlog(job.GetJobType()) {
|
||||
// todo: 恢复数据库前触发 BINLOG 同步,BINLOG 同步完成后才能恢复数据库
|
||||
if relatedToBinlog(item.GetJobType()) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
if job.GetDbName() == item.GetDbName() {
|
||||
countByDbName++
|
||||
if countByDbName >= maxCountByDbName {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
if (job.GetJobType() == entity.DbJobTypeBinlog && item.GetJobType() == entity.DbJobTypeRestore) ||
|
||||
(job.GetJobType() == entity.DbJobTypeRestore && item.GetJobType() == entity.DbJobTypeBinlog) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func relatedToBinlog(typ entity.DbJobType) bool {
|
||||
return typ == entity.DbJobTypeRestore || typ == entity.DbJobTypeBinlog
|
||||
}
|
||||
|
||||
func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbProgram, job *entity.DbRestore) error {
|
||||
binlogHistory, err := s.binlogHistoryRepo.GetHistoryByTime(job.DbInstanceId, job.PointInTime.Time)
|
||||
if err != nil {
|
||||
@@ -320,7 +271,7 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbPr
|
||||
Sequence: binlogHistory.Sequence,
|
||||
Position: position,
|
||||
}
|
||||
backupHistory, err := s.backupHistoryRepo.GetLatestHistory(job.DbInstanceId, job.DbName, target)
|
||||
backupHistory, err := s.backupHistoryRepo.GetLatestHistoryForBinlog(job.DbInstanceId, job.DbName, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -364,6 +315,9 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbPr
|
||||
}
|
||||
|
||||
func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, backupHistory *entity.DbBackupHistory) (retErr error) {
|
||||
if _, err := s.backupHistoryRepo.UpdateRestoring(false, backupHistory.Id); err != nil {
|
||||
return err
|
||||
}
|
||||
ok, err := s.backupHistoryRepo.UpdateRestoring(true, backupHistory.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -385,7 +339,7 @@ func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbPr
|
||||
return program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid)
|
||||
}
|
||||
|
||||
func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool) error {
|
||||
func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool, targetTime time.Time) error {
|
||||
if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil {
|
||||
return err
|
||||
} else if !enabled {
|
||||
@@ -397,15 +351,17 @@ func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram,
|
||||
return errors.New("数据库未启用 BINLOG 行模式")
|
||||
}
|
||||
|
||||
latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
|
||||
earliestBackupSequence := int64(-1)
|
||||
binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(instanceId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
latestBinlogSequence = binlogHistory.Sequence
|
||||
} else {
|
||||
backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(instanceId)
|
||||
if downloadLatestBinlogFile && targetTime.Before(binlogHistory.LastEventTime) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !ok {
|
||||
backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistoryForBinlog(instanceId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -414,7 +370,9 @@ func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram,
|
||||
}
|
||||
earliestBackupSequence = backupHistory.BinlogSequence
|
||||
}
|
||||
binlogFiles, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, latestBinlogSequence)
|
||||
|
||||
// todo: 将循环从 dbProgram.FetchBinlogs 中提取出来,实现 BINLOG 同步成功后逐一保存 binlogHistory
|
||||
binlogFiles, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, binlogHistory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user