!87 fix: 修复数据库备份与恢复问题

* feat: 修复数据库备份与恢复问题
* feat: 启用 BINLOG 支持全量备份和增量备份,未启用 BINLOG 仅支持全量备份
* feat: 数据库恢复后自动备份,避免数据丢失
This commit is contained in:
kanzihuang
2024-01-22 03:12:16 +00:00
committed by Coder慌
parent f27d3d200f
commit de5b9e46d3
11 changed files with 147 additions and 61 deletions

View File

@@ -35,7 +35,13 @@
clearable
class="w100"
>
<el-option v-for="item in state.histories" :key="item.id" :label="item.name" :value="item"> </el-option>
<el-option
v-for="item in state.histories"
:key="item.id"
:label="item.name + (item.binlogFileName ? ' ' : ' 不') + '支持指定时间点恢复'"
:value="item"
>
</el-option>
</el-select>
</el-form-item>
<el-form-item prop="startTime" label="开始时间">
@@ -83,20 +89,30 @@ const visible = defineModel<boolean>('visible', {
});
const validatePointInTime = (rule: any, value: any, callback: any) => {
if (!state.histories || state.histories.length == 0) {
callback(new Error('数据库没有备份记录'));
return;
}
const history = state.histories[state.histories.length - 1];
if (value < new Date(history.createTime)) {
callback(new Error('在此之前数据库没有备份记录'));
return;
}
if (value > new Date()) {
callback(new Error('恢复时间点晚于当前时间'));
return;
}
callback();
if (!state.histories || state.histories.length == 0) {
callback(new Error('数据库没有备份记录'));
return;
}
let last = null;
for (const history of state.histories) {
if (!history.binlogFileName || history.binlogFileName.length === 0) {
break;
}
if (new Date(history.createTime) < value) {
callback();
return;
}
last = history;
}
if (!last) {
callback(new Error('现有数据库备份不支持指定时间恢复'));
return;
}
callback(last.name + ' 之前的数据库备份不支持指定时间恢复');
};
const rules = {
@@ -110,7 +126,6 @@ const rules = {
pointInTime: [
{
required: true,
// message: '请选择恢复时间点',
validator: validatePointInTime,
trigger: ['change', 'blur'],
},

View File

@@ -46,13 +46,3 @@ log:
# file:
# path: ./
# name: mayfly-go.log
db:
backup-path: ./backup
mysqlutil-path:
mysql: ./mysqlutil/bin/mysql
mysqldump: ./mysqlutil/bin/mysqldump
mysqlbinlog: ./mysqlutil/bin/mysqlbinlog
mariadbutil-path:
mysql: ./mariadbutil/bin/mariadb
mysqldump: ./mariadbutil/bin/mariadb-dump
mysqlbinlog: ./mariadbutil/bin/mariadb-binlog

View File

@@ -55,13 +55,13 @@ func (d *DbBackup) Create(rc *req.Ctx) {
for _, dbName := range dbNames {
job := &entity.DbBackup{
DbJobBaseImpl: entity.NewDbBJobBase(db.InstanceId, entity.DbJobTypeBackup),
DbName: dbName,
Enabled: true,
Repeated: backupForm.Repeated,
StartTime: backupForm.StartTime,
Interval: backupForm.Interval,
Name: backupForm.Name,
}
job.DbName = dbName
jobs = append(jobs, job)
}
biz.ErrIsNilAppendErr(d.DbBackupApp.Create(rc.MetaCtx, jobs), "添加数据库备份任务失败: %v")

View File

@@ -128,7 +128,7 @@ func (d *DbRestore) GetDbNamesWithoutRestore(rc *req.Ctx) {
rc.ResData = dbNamesWithoutRestore
}
// 获取数据库备份历史
// GetHistoryPageList 获取数据库备份历史
// @router /api/dbs/:dbId/restores/:restoreId/histories [GET]
func (d *DbRestore) GetHistoryPageList(rc *req.Ctx) {
queryCond := &entity.DbRestoreHistoryQuery{

View File

@@ -30,9 +30,10 @@ func (backup *DbBackup) MarshalJSON() ([]byte, error) {
// DbBackupHistory 数据库备份历史
type DbBackupHistory struct {
Id uint64 `json:"id"`
DbBackupId uint64 `json:"dbBackupId"`
CreateTime time.Time `json:"createTime"`
DbName string `json:"dbName"` // 数据库名称
Name string `json:"name"` // 备份历史名称
Id uint64 `json:"id"`
DbBackupId uint64 `json:"dbBackupId"`
CreateTime time.Time `json:"createTime"`
DbName string `json:"dbName"` // 数据库名称
Name string `json:"name"` // 备份历史名称
BinlogFileName string `json:"binlogFileName"`
}

View File

@@ -219,6 +219,17 @@ func (s *dbScheduler) restoreMysql(ctx context.Context, job entity.DbJob) error
}
dbProgram := conn.GetDialect().GetDbProgram()
if restore.PointInTime.Valid {
if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil {
return err
} else if !enabled {
return errors.New("数据库未启用 BINLOG")
}
if enabled, err := dbProgram.CheckBinlogRowFormat(ctx); err != nil {
return err
} else if !enabled {
return errors.New("数据库未启用 BINLOG 行模式")
}
latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(restore.DbInstanceId)
if err != nil {
@@ -363,7 +374,25 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, program dbi.DbProg
if err := program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid); err != nil {
return err
}
return program.ReplayBinlog(ctx, job.DbName, job.DbName, restoreInfo)
if err := program.ReplayBinlog(ctx, job.DbName, job.DbName, restoreInfo); err != nil {
return err
}
// 由于 ReplayBinlog 未记录 BINLOG 事件,系统自动备份,避免数据丢失
backup := &entity.DbBackup{
DbJobBaseImpl: entity.NewDbBJobBase(backupHistory.DbInstanceId, entity.DbJobTypeBackup),
DbName: backupHistory.DbName,
Enabled: true,
Repeated: false,
StartTime: time.Now(),
Interval: 0,
Name: fmt.Sprintf("%s-系统自动备份", backupHistory.DbName),
}
backup.Id = backupHistory.DbBackupId
if err := s.backupMysql(ctx, backup); err != nil {
return err
}
return nil
}
func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, job *entity.DbRestore) error {
@@ -399,8 +428,8 @@ func (s *dbScheduler) fetchBinlogMysql(ctx context.Context, backup entity.DbJob)
}
dbProgram := conn.GetDialect().GetDbProgram()
binlogFiles, err := dbProgram.FetchBinlogs(ctx, false, earliestBackupSequence, latestBinlogSequence)
if err == nil {
err = s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, binlogFiles)
if err != nil {
return err
}
return nil
return s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, instanceId, binlogFiles)
}

View File

@@ -8,6 +8,9 @@ import (
)
type DbProgram interface {
CheckBinlogEnabled(ctx context.Context) (bool, error)
CheckBinlogRowFormat(ctx context.Context) (bool, error)
Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error)
FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error)

View File

@@ -77,6 +77,15 @@ func (svc *DbProgramMysql) GetBinlogFilePath(fileName string) string {
}
func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error) {
binlogEnabled, err := svc.CheckBinlogEnabled(ctx)
if err != nil {
return nil, err
}
rowFormatEnabled, err := svc.CheckBinlogRowFormat(ctx)
if err != nil {
return nil, err
}
dir := svc.getDbBackupDir(backupHistory.DbInstanceId, backupHistory.DbBackupId)
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return nil, err
@@ -95,10 +104,11 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB
"--add-drop-database",
"--result-file", tmpFile,
"--single-transaction",
"--master-data=2",
"--databases", backupHistory.DbName,
}
if binlogEnabled && rowFormatEnabled {
args = append(args, "--master-data=2")
}
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqldumpPath, args...)
logx.Debugf("backup database using mysqldump binary: %s", cmd.String())
if err := runCmd(cmd); err != nil {
@@ -115,7 +125,10 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB
if err != nil {
return nil, err
}
binlogInfo, err := readBinlogInfoFromBackup(reader)
binlogInfo := &entity.BinlogInfo{}
if binlogEnabled && rowFormatEnabled {
binlogInfo, err = readBinlogInfoFromBackup(reader)
}
_ = reader.Close()
if err != nil {
return nil, errors.Wrapf(err, "从备份文件中读取 binlog 信息失败")
@@ -568,18 +581,24 @@ func (svc *DbProgramMysql) ReplayBinlog(ctx context.Context, originalDatabase, t
return errors.Wrap(err, "启动 mysqlbinlog 程序失败")
}
defer func() {
_ = mysqlbinlogCmd.Cancel()
if err := mysqlbinlogCmd.Wait(); err != nil {
if replayErr != nil {
replayErr = errors.Wrap(replayErr, "运行 mysqlbinlog 程序失败")
} else {
replayErr = errors.Errorf("运行 mysqlbinlog 程序失败: %s", mysqlbinlogErr.String())
if mysqlbinlogErr.Len() > 0 {
logx.Errorf("运行 mysqlbinlog 程序失败", mysqlbinlogErr.String())
if replayErr != nil {
replayErr = errors.Wrap(replayErr, "运行 mysqlbinlog 程序失败: "+mysqlbinlogErr.String())
} else {
replayErr = errors.Errorf("运行 mysqlbinlog 程序失败: %s", mysqlbinlogErr.String())
}
}
}
}()
if err := mysqlCmd.Start(); err != nil {
logx.Error("启动 mysql 程序失败")
return errors.Wrap(err, "启动 mysql 程序失败")
}
if err := mysqlCmd.Wait(); err != nil {
logx.Errorf("运行 mysql 程序失败: %s", mysqlErr.String())
return errors.Errorf("运行 mysql 程序失败: %s", mysqlErr.String())
}
@@ -606,27 +625,30 @@ func (svc *DbProgramMysql) getServerVariable(_ context.Context, varName string)
}
// CheckBinlogEnabled checks whether binlog is enabled for the current instance.
func (svc *DbProgramMysql) CheckBinlogEnabled(ctx context.Context) error {
func (svc *DbProgramMysql) CheckBinlogEnabled(ctx context.Context) (bool, error) {
value, err := svc.getServerVariable(ctx, "log_bin")
if err != nil {
return err
switch {
case err == nil:
return strings.ToUpper(value) == "ON", nil
case errors.Is(err, sql.ErrNoRows):
return false, nil
default:
return false, err
}
if strings.ToUpper(value) != "ON" {
return errors.Errorf("数据库未启用 binlog")
}
return nil
}
// CheckBinlogRowFormat checks whether the binlog format is ROW.
func (svc *DbProgramMysql) CheckBinlogRowFormat(ctx context.Context) error {
// CheckBinlogRowFormat checks whether the binlog format is ROW or MIXED.
func (svc *DbProgramMysql) CheckBinlogRowFormat(ctx context.Context) (bool, error) {
value, err := svc.getServerVariable(ctx, "binlog_format")
if err != nil {
return err
switch {
case err == nil:
value = strings.ToUpper(value)
return value == "ROW" || value == "MIXED", nil
case errors.Is(err, sql.ErrNoRows):
return false, nil
default:
return false, err
}
if strings.ToUpper(value) != "ROW" {
return errors.Errorf("binlog 格式 %s 不是行模式", value)
}
return nil
}
func runCmd(cmd *exec.Cmd) error {

View File

@@ -153,8 +153,9 @@ func (s *DbInstanceSuite) TestRestorePontInTime() {
s.createTable(dbNameBackupTest, tableNameRestorePITTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "")
time.Sleep(time.Second)
targetTime := time.Now()
// 首次恢复数据库
firstTargetTime := time.Now()
s.dropTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameBackupTest, "运行 mysql 程序失败")
s.createTable(dbNameBackupTest, tableNameNoBackupTest, "")
@@ -165,7 +166,28 @@ func (s *DbInstanceSuite) TestRestorePontInTime() {
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "运行 mysql 程序失败")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败")
s.testReplayBinlog(backupHistory, targetTime)
s.testReplayBinlog(backupHistory, firstTargetTime)
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败")
s.testBackup(backupHistory)
// 再次恢复数据库
secondTargetTime := time.Now()
s.dropTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameBackupTest, "运行 mysql 程序失败")
s.dropTable(dbNameBackupTest, tableNameRestorePITTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "运行 mysql 程序失败")
s.createTable(dbNameBackupTest, tableNameNoBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "")
s.testRestore(backupHistory)
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败")
s.testReplayBinlog(backupHistory, secondTargetTime)
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败")

View File

@@ -21,7 +21,11 @@ const (
DbJobFailed
)
type DbJobType = string
type DbJobType string
func (typ DbJobType) String() string {
return string(typ)
}
const (
DbJobTypeBackup DbJobType = "db-backup"
@@ -121,10 +125,10 @@ func (d *DbJobBaseImpl) SetLastStatus(status DbJobStatus, err error) {
jobName = DbJobNameBackup
case DbJobTypeRestore:
jobName = DbJobNameRestore
case DbJobNameBinlog:
case DbJobTypeBinlog:
jobName = DbJobNameBinlog
default:
jobName = d.jobType
jobName = d.jobType.String()
}
d.LastStatus = status
var result = jobName + statusName

View File

@@ -9,7 +9,7 @@ import (
type DbBackupHistory interface {
base.Repo[*entity.DbBackupHistory]
// GetDbBackupHistories 分页获取数据备份历史
// GetHistories 分页获取数据备份历史
GetHistories(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error)