diff --git a/mayfly_go_web/src/views/ops/db/DbRestoreEdit.vue b/mayfly_go_web/src/views/ops/db/DbRestoreEdit.vue index 705d67be..38a3107d 100644 --- a/mayfly_go_web/src/views/ops/db/DbRestoreEdit.vue +++ b/mayfly_go_web/src/views/ops/db/DbRestoreEdit.vue @@ -35,7 +35,13 @@ clearable class="w100" > - + + @@ -83,20 +89,30 @@ const visible = defineModel('visible', { }); const validatePointInTime = (rule: any, value: any, callback: any) => { - if (!state.histories || state.histories.length == 0) { - callback(new Error('数据库没有备份记录')); - return; - } - const history = state.histories[state.histories.length - 1]; - if (value < new Date(history.createTime)) { - callback(new Error('在此之前数据库没有备份记录')); - return; - } if (value > new Date()) { callback(new Error('恢复时间点晚于当前时间')); return; } - callback(); + if (!state.histories || state.histories.length == 0) { + callback(new Error('数据库没有备份记录')); + return; + } + let last = null; + for (const history of state.histories) { + if (!history.binlogFileName || history.binlogFileName.length === 0) { + break; + } + if (new Date(history.createTime) < value) { + callback(); + return; + } + last = history; + } + if (!last) { + callback(new Error('现有数据库备份不支持指定时间恢复')); + return; + } + callback(last.name + ' 之前的数据库备份不支持指定时间恢复'); }; const rules = { @@ -110,7 +126,6 @@ const rules = { pointInTime: [ { required: true, - // message: '请选择恢复时间点', validator: validatePointInTime, trigger: ['change', 'blur'], }, diff --git a/server/config.yml.example b/server/config.yml.example index d7911118..bb40243a 100644 --- a/server/config.yml.example +++ b/server/config.yml.example @@ -46,13 +46,3 @@ log: # file: # path: ./ # name: mayfly-go.log -db: - backup-path: ./backup - mysqlutil-path: - mysql: ./mysqlutil/bin/mysql - mysqldump: ./mysqlutil/bin/mysqldump - mysqlbinlog: ./mysqlutil/bin/mysqlbinlog - mariadbutil-path: - mysql: ./mariadbutil/bin/mariadb - mysqldump: ./mariadbutil/bin/mariadb-dump - mysqlbinlog: ./mariadbutil/bin/mariadb-binlog \ No newline at end of file diff --git a/server/internal/db/api/db_backup.go b/server/internal/db/api/db_backup.go index 0c4d8cc2..8c6e6ec3 100644 --- a/server/internal/db/api/db_backup.go +++ b/server/internal/db/api/db_backup.go @@ -55,13 +55,13 @@ func (d *DbBackup) Create(rc *req.Ctx) { for _, dbName := range dbNames { job := &entity.DbBackup{ DbJobBaseImpl: entity.NewDbBJobBase(db.InstanceId, entity.DbJobTypeBackup), + DbName: dbName, Enabled: true, Repeated: backupForm.Repeated, StartTime: backupForm.StartTime, Interval: backupForm.Interval, Name: backupForm.Name, } - job.DbName = dbName jobs = append(jobs, job) } biz.ErrIsNilAppendErr(d.DbBackupApp.Create(rc.MetaCtx, jobs), "添加数据库备份任务失败: %v") diff --git a/server/internal/db/api/db_restore.go b/server/internal/db/api/db_restore.go index 060d6e48..1c8d0e14 100644 --- a/server/internal/db/api/db_restore.go +++ b/server/internal/db/api/db_restore.go @@ -128,7 +128,7 @@ func (d *DbRestore) GetDbNamesWithoutRestore(rc *req.Ctx) { rc.ResData = dbNamesWithoutRestore } -// 获取数据库备份历史 +// GetHistoryPageList 获取数据库备份历史 // @router /api/dbs/:dbId/restores/:restoreId/histories [GET] func (d *DbRestore) GetHistoryPageList(rc *req.Ctx) { queryCond := &entity.DbRestoreHistoryQuery{ diff --git a/server/internal/db/api/vo/db_backup.go b/server/internal/db/api/vo/db_backup.go index 601c9181..17d0e2c9 100644 --- a/server/internal/db/api/vo/db_backup.go +++ b/server/internal/db/api/vo/db_backup.go @@ -30,9 +30,10 @@ func (backup *DbBackup) MarshalJSON() ([]byte, error) { // DbBackupHistory 数据库备份历史 type DbBackupHistory struct { - Id uint64 `json:"id"` - DbBackupId uint64 `json:"dbBackupId"` - CreateTime time.Time `json:"createTime"` - DbName string `json:"dbName"` // 数据库名称 - Name string `json:"name"` // 备份历史名称 + Id uint64 `json:"id"` + DbBackupId uint64 `json:"dbBackupId"` + CreateTime time.Time `json:"createTime"` + DbName string `json:"dbName"` // 数据库名称 + Name string `json:"name"` // 备份历史名称 + BinlogFileName string `json:"binlogFileName"` } diff --git a/server/internal/db/application/db_scheduler.go b/server/internal/db/application/db_scheduler.go index dc12b2a3..cec05fd8 100644 --- a/server/internal/db/application/db_scheduler.go +++ b/server/internal/db/application/db_scheduler.go @@ -219,6 +219,17 @@ func (s *dbScheduler) restoreMysql(ctx context.Context, job entity.DbJob) error } dbProgram := conn.GetDialect().GetDbProgram() if restore.PointInTime.Valid { + if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil { + return err + } else if !enabled { + return errors.New("数据库未启用 BINLOG") + } + if enabled, err := dbProgram.CheckBinlogRowFormat(ctx); err != nil { + return err + } else if !enabled { + return errors.New("数据库未启用 BINLOG 行模式") + } + latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1) binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(restore.DbInstanceId) if err != nil { @@ -363,7 +374,25 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, program dbi.DbProg if err := program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid); err != nil { return err } - return program.ReplayBinlog(ctx, job.DbName, job.DbName, restoreInfo) + if err := program.ReplayBinlog(ctx, job.DbName, job.DbName, restoreInfo); err != nil { + return err + } + + // 由于 ReplayBinlog 未记录 BINLOG 事件,系统自动备份,避免数据丢失 + backup := &entity.DbBackup{ + DbJobBaseImpl: entity.NewDbBJobBase(backupHistory.DbInstanceId, entity.DbJobTypeBackup), + DbName: backupHistory.DbName, + Enabled: true, + Repeated: false, + StartTime: time.Now(), + Interval: 0, + Name: fmt.Sprintf("%s-系统自动备份", backupHistory.DbName), + } + backup.Id = backupHistory.DbBackupId + if err := s.backupMysql(ctx, backup); err != nil { + return err + } + return nil } func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, job *entity.DbRestore) error { @@ -399,8 +428,8 @@ func (s *dbScheduler) fetchBinlogMysql(ctx context.Context, backup entity.DbJob) } dbProgram := conn.GetDialect().GetDbProgram() binlogFiles, err := dbProgram.FetchBinlogs(ctx, false, earliestBackupSequence, latestBinlogSequence) - if err == nil { - err = s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, binlogFiles) + if err != nil { + return err } - return nil + return s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, binlogFiles) } diff --git a/server/internal/db/dbm/dbi/db_program.go b/server/internal/db/dbm/dbi/db_program.go index 938e4dd8..8de5063f 100644 --- a/server/internal/db/dbm/dbi/db_program.go +++ b/server/internal/db/dbm/dbi/db_program.go @@ -8,6 +8,9 @@ import ( ) type DbProgram interface { + CheckBinlogEnabled(ctx context.Context) (bool, error) + CheckBinlogRowFormat(ctx context.Context) (bool, error) + Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error) FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error) diff --git a/server/internal/db/dbm/mysql/program.go b/server/internal/db/dbm/mysql/program.go index 2b33c4e0..fbc0fb43 100644 --- a/server/internal/db/dbm/mysql/program.go +++ b/server/internal/db/dbm/mysql/program.go @@ -77,6 +77,15 @@ func (svc *DbProgramMysql) GetBinlogFilePath(fileName string) string { } func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error) { + binlogEnabled, err := svc.CheckBinlogEnabled(ctx) + if err != nil { + return nil, err + } + rowFormatEnabled, err := svc.CheckBinlogRowFormat(ctx) + if err != nil { + return nil, err + } + dir := svc.getDbBackupDir(backupHistory.DbInstanceId, backupHistory.DbBackupId) if err := os.MkdirAll(dir, os.ModePerm); err != nil { return nil, err @@ -95,10 +104,11 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB "--add-drop-database", "--result-file", tmpFile, "--single-transaction", - "--master-data=2", "--databases", backupHistory.DbName, } - + if binlogEnabled && rowFormatEnabled { + args = append(args, "--master-data=2") + } cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqldumpPath, args...) logx.Debugf("backup database using mysqldump binary: %s", cmd.String()) if err := runCmd(cmd); err != nil { @@ -115,7 +125,10 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB if err != nil { return nil, err } - binlogInfo, err := readBinlogInfoFromBackup(reader) + binlogInfo := &entity.BinlogInfo{} + if binlogEnabled && rowFormatEnabled { + binlogInfo, err = readBinlogInfoFromBackup(reader) + } _ = reader.Close() if err != nil { return nil, errors.Wrapf(err, "从备份文件中读取 binlog 信息失败") @@ -568,18 +581,24 @@ func (svc *DbProgramMysql) ReplayBinlog(ctx context.Context, originalDatabase, t return errors.Wrap(err, "启动 mysqlbinlog 程序失败") } defer func() { + _ = mysqlbinlogCmd.Cancel() if err := mysqlbinlogCmd.Wait(); err != nil { - if replayErr != nil { - replayErr = errors.Wrap(replayErr, "运行 mysqlbinlog 程序失败") - } else { - replayErr = errors.Errorf("运行 mysqlbinlog 程序失败: %s", mysqlbinlogErr.String()) + if mysqlbinlogErr.Len() > 0 { + logx.Errorf("运行 mysqlbinlog 程序失败", mysqlbinlogErr.String()) + if replayErr != nil { + replayErr = errors.Wrap(replayErr, "运行 mysqlbinlog 程序失败: "+mysqlbinlogErr.String()) + } else { + replayErr = errors.Errorf("运行 mysqlbinlog 程序失败: %s", mysqlbinlogErr.String()) + } } } }() if err := mysqlCmd.Start(); err != nil { + logx.Error("启动 mysql 程序失败") return errors.Wrap(err, "启动 mysql 程序失败") } if err := mysqlCmd.Wait(); err != nil { + logx.Errorf("运行 mysql 程序失败: %s", mysqlErr.String()) return errors.Errorf("运行 mysql 程序失败: %s", mysqlErr.String()) } @@ -606,27 +625,30 @@ func (svc *DbProgramMysql) getServerVariable(_ context.Context, varName string) } // CheckBinlogEnabled checks whether binlog is enabled for the current instance. -func (svc *DbProgramMysql) CheckBinlogEnabled(ctx context.Context) error { +func (svc *DbProgramMysql) CheckBinlogEnabled(ctx context.Context) (bool, error) { value, err := svc.getServerVariable(ctx, "log_bin") - if err != nil { - return err + switch { + case err == nil: + return strings.ToUpper(value) == "ON", nil + case errors.Is(err, sql.ErrNoRows): + return false, nil + default: + return false, err } - if strings.ToUpper(value) != "ON" { - return errors.Errorf("数据库未启用 binlog") - } - return nil } -// CheckBinlogRowFormat checks whether the binlog format is ROW. -func (svc *DbProgramMysql) CheckBinlogRowFormat(ctx context.Context) error { +// CheckBinlogRowFormat checks whether the binlog format is ROW or MIXED. +func (svc *DbProgramMysql) CheckBinlogRowFormat(ctx context.Context) (bool, error) { value, err := svc.getServerVariable(ctx, "binlog_format") - if err != nil { - return err + switch { + case err == nil: + value = strings.ToUpper(value) + return value == "ROW" || value == "MIXED", nil + case errors.Is(err, sql.ErrNoRows): + return false, nil + default: + return false, err } - if strings.ToUpper(value) != "ROW" { - return errors.Errorf("binlog 格式 %s 不是行模式", value) - } - return nil } func runCmd(cmd *exec.Cmd) error { diff --git a/server/internal/db/dbm/mysql/program_e2e_test.go b/server/internal/db/dbm/mysql/program_e2e_test.go index 8fbb5ea3..8a99d46f 100644 --- a/server/internal/db/dbm/mysql/program_e2e_test.go +++ b/server/internal/db/dbm/mysql/program_e2e_test.go @@ -153,8 +153,9 @@ func (s *DbInstanceSuite) TestRestorePontInTime() { s.createTable(dbNameBackupTest, tableNameRestorePITTest, "") s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "") time.Sleep(time.Second) - targetTime := time.Now() + // 首次恢复数据库 + firstTargetTime := time.Now() s.dropTable(dbNameBackupTest, tableNameBackupTest, "") s.selectTable(dbNameBackupTest, tableNameBackupTest, "运行 mysql 程序失败") s.createTable(dbNameBackupTest, tableNameNoBackupTest, "") @@ -165,7 +166,28 @@ func (s *DbInstanceSuite) TestRestorePontInTime() { s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "运行 mysql 程序失败") s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败") - s.testReplayBinlog(backupHistory, targetTime) + s.testReplayBinlog(backupHistory, firstTargetTime) + s.selectTable(dbNameBackupTest, tableNameBackupTest, "") + s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "") + s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败") + + s.testBackup(backupHistory) + + // 再次恢复数据库 + secondTargetTime := time.Now() + s.dropTable(dbNameBackupTest, tableNameBackupTest, "") + s.selectTable(dbNameBackupTest, tableNameBackupTest, "运行 mysql 程序失败") + s.dropTable(dbNameBackupTest, tableNameRestorePITTest, "") + s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "运行 mysql 程序失败") + s.createTable(dbNameBackupTest, tableNameNoBackupTest, "") + s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "") + + s.testRestore(backupHistory) + s.selectTable(dbNameBackupTest, tableNameBackupTest, "") + s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "") + s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败") + + s.testReplayBinlog(backupHistory, secondTargetTime) s.selectTable(dbNameBackupTest, tableNameBackupTest, "") s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "") s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败") diff --git a/server/internal/db/domain/entity/db_job.go b/server/internal/db/domain/entity/db_job.go index b73f6f14..68b83d15 100644 --- a/server/internal/db/domain/entity/db_job.go +++ b/server/internal/db/domain/entity/db_job.go @@ -21,7 +21,11 @@ const ( DbJobFailed ) -type DbJobType = string +type DbJobType string + +func (typ DbJobType) String() string { + return string(typ) +} const ( DbJobTypeBackup DbJobType = "db-backup" @@ -121,10 +125,10 @@ func (d *DbJobBaseImpl) SetLastStatus(status DbJobStatus, err error) { jobName = DbJobNameBackup case DbJobTypeRestore: jobName = DbJobNameRestore - case DbJobNameBinlog: + case DbJobTypeBinlog: jobName = DbJobNameBinlog default: - jobName = d.jobType + jobName = d.jobType.String() } d.LastStatus = status var result = jobName + statusName diff --git a/server/internal/db/domain/repository/db_backup_history.go b/server/internal/db/domain/repository/db_backup_history.go index 83652705..0f0c97c9 100644 --- a/server/internal/db/domain/repository/db_backup_history.go +++ b/server/internal/db/domain/repository/db_backup_history.go @@ -9,7 +9,7 @@ import ( type DbBackupHistory interface { base.Repo[*entity.DbBackupHistory] - // GetDbBackupHistories 分页获取数据备份历史 + // GetHistories 分页获取数据备份历史 GetHistories(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error)