diff --git a/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue b/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue
index 6ad17b2b..3ea74435 100644
--- a/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue
+++ b/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue
@@ -28,8 +28,11 @@
-
-
+
+
+
+
+
@@ -92,6 +95,14 @@ const rules = {
trigger: ['change', 'blur'],
},
],
+ maxSaveDays: [
+ {
+ required: true,
+ pattern: /^[0-9]\d*$/,
+ message: '请输入非负整数',
+ trigger: ['change', 'blur'],
+ },
+ ],
};
const backupForm: any = ref(null);
@@ -102,9 +113,10 @@ const state = reactive({
dbId: 0,
dbNames: '',
name: '',
- intervalDay: null,
+ intervalDay: 1,
startTime: null as any,
- repeated: null as any,
+ repeated: true,
+ maxSaveDays: 0,
},
btnLoading: false,
dbNamesSelected: [] as any,
@@ -137,12 +149,14 @@ const init = (data: any) => {
state.form.name = data.name;
state.form.intervalDay = data.intervalDay;
state.form.startTime = data.startTime;
+ state.form.maxSaveDays = data.maxSaveDays;
} else {
state.editOrCreate = false;
state.form.name = '';
- state.form.intervalDay = null;
+ state.form.intervalDay = 1;
const now = new Date();
state.form.startTime = new Date(now.getFullYear(), now.getMonth(), now.getDate() + 1);
+ state.form.maxSaveDays = 0;
getDbNamesWithoutBackup();
}
};
diff --git a/mayfly_go_web/src/views/ops/db/InstanceList.vue b/mayfly_go_web/src/views/ops/db/InstanceList.vue
index ecebb0cf..029ddd8b 100644
--- a/mayfly_go_web/src/views/ops/db/InstanceList.vue
+++ b/mayfly_go_web/src/views/ops/db/InstanceList.vue
@@ -25,6 +25,7 @@
详情
编辑
+ 删除
@@ -91,7 +92,7 @@ const columns = ref([
]);
// 该用户拥有的的操作列按钮权限
-const actionBtns = hasPerms([perms.saveInstance]);
+const actionBtns = hasPerms(Object.values(perms));
const actionColumn = TableColumn.new('action', '操作').isSlot().setMinWidth(110).fixedRight().alignCenter();
const pageTableRef: Ref = ref(null);
@@ -150,14 +151,26 @@ const editInstance = async (data: any) => {
state.instanceEditDialog.visible = true;
};
-const deleteInstance = async () => {
+const deleteInstance = async (data: any) => {
try {
- await ElMessageBox.confirm(`确定删除数据库实例【${state.selectionData.map((x: any) => x.name).join(', ')}】?`, '提示', {
+ let instanceName: string;
+ if (data) {
+ instanceName = data.name;
+ } else {
+ instanceName = state.selectionData.map((x: any) => x.name).join(', ');
+ }
+ await ElMessageBox.confirm(`确定删除数据库实例【${instanceName}】?`, '提示', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning',
});
- await dbApi.deleteInstance.request({ id: state.selectionData.map((x: any) => x.id).join(',') });
+ let instanceId: string;
+ if (data) {
+ instanceId = data.id;
+ } else {
+ instanceId = state.selectionData.map((x: any) => x.id).join(',');
+ }
+ await dbApi.deleteInstance.request({ id: instanceId });
ElMessage.success('删除成功');
search();
} catch (err) {
diff --git a/server/go.mod b/server/go.mod
index 7db5b09f..0e2ee3ff 100644
--- a/server/go.mod
+++ b/server/go.mod
@@ -38,7 +38,6 @@ require (
// gorm
gorm.io/driver/mysql v1.5.2
gorm.io/gorm v1.25.6
-
)
require (
diff --git a/server/internal/db/api/db.go b/server/internal/db/api/db.go
index fff38878..c4a8f679 100644
--- a/server/internal/db/api/db.go
+++ b/server/internal/db/api/db.go
@@ -78,8 +78,6 @@ func (d *Db) DeleteDb(rc *req.Ctx) {
d.DbApp.Delete(ctx, dbId)
// 删除该库的sql执行记录
d.DbSqlExecApp.DeleteBy(ctx, &entity.DbSqlExec{DbId: dbId})
-
- // todo delete restore task and histories
}
}
diff --git a/server/internal/db/api/db_backup.go b/server/internal/db/api/db_backup.go
index 6609a8c1..49566203 100644
--- a/server/internal/db/api/db_backup.go
+++ b/server/internal/db/api/db_backup.go
@@ -81,6 +81,7 @@ func (d *DbBackup) Update(rc *req.Ctx) {
job.Name = backupForm.Name
job.StartTime = backupForm.StartTime
job.Interval = backupForm.Interval
+ job.MaxSaveDays = backupForm.MaxSaveDays
biz.ErrIsNilAppendErr(d.backupApp.Update(rc.MetaCtx, job), "保存数据库备份任务失败: %v")
}
@@ -178,7 +179,7 @@ func (d *DbBackup) GetHistoryPageList(rc *req.Ctx) {
rc.ResData = res
}
-// RestoreHistories 删除数据库备份历史
+// RestoreHistories 从数据库备份历史中恢复数据库
// @router /api/dbs/:dbId/backup-histories/:backupHistoryId/restore [POST]
func (d *DbBackup) RestoreHistories(rc *req.Ctx) {
pm := ginx.PathParam(rc.GinCtx, "backupHistoryId")
diff --git a/server/internal/db/api/instance.go b/server/internal/db/api/db_instance.go
similarity index 87%
rename from server/internal/db/api/instance.go
rename to server/internal/db/api/db_instance.go
index c0cb4159..c95af839 100644
--- a/server/internal/db/api/instance.go
+++ b/server/internal/db/api/db_instance.go
@@ -87,16 +87,10 @@ func (d *Instance) DeleteInstance(rc *req.Ctx) {
for _, v := range ids {
value, err := strconv.Atoi(v)
- biz.ErrIsNilAppendErr(err, "string类型转换为int异常: %s")
+ biz.ErrIsNilAppendErr(err, "删除数据库实例失败: %s")
instanceId := uint64(value)
- if d.DbApp.Count(&entity.DbQuery{InstanceId: instanceId}) != 0 {
- instance, err := d.InstanceApp.GetById(new(entity.DbInstance), instanceId, "name")
- biz.ErrIsNil(err, "获取数据库实例错误,数据库实例ID为: %d", instance.Id)
- biz.IsTrue(false, "不能删除数据库实例【%s】,请先删除其关联的数据库资源。", instance.Name)
- }
- // todo check if backup task has been disabled and backup histories have been deleted
-
- d.InstanceApp.Delete(rc.MetaCtx, instanceId)
+ err = d.InstanceApp.Delete(rc.MetaCtx, instanceId)
+ biz.ErrIsNilAppendErr(err, "删除数据库实例失败: %s")
}
}
diff --git a/server/internal/db/api/form/db_backup.go b/server/internal/db/api/form/db_backup.go
index dacd0571..9c5747a5 100644
--- a/server/internal/db/api/form/db_backup.go
+++ b/server/internal/db/api/form/db_backup.go
@@ -14,6 +14,7 @@ type DbBackupForm struct {
Interval time.Duration `json:"-"` // 间隔时间: 为零表示单次执行,为正表示反复执行
IntervalDay uint64 `json:"intervalDay"` // 间隔天数: 为零表示单次执行,为正表示反复执行
Repeated bool `json:"repeated"` // 是否重复执行
+ MaxSaveDays int `json:"maxSaveDays"` // 数据库备份历史保留天数,过期将自动删除
}
func (restore *DbBackupForm) UnmarshalJSON(data []byte) error {
diff --git a/server/internal/db/api/vo/db_backup.go b/server/internal/db/api/vo/db_backup.go
index fdfe17e7..6579398a 100644
--- a/server/internal/db/api/vo/db_backup.go
+++ b/server/internal/db/api/vo/db_backup.go
@@ -15,6 +15,7 @@ type DbBackup struct {
StartTime time.Time `json:"startTime"` // 开始时间
Interval time.Duration `json:"-"` // 间隔时间
IntervalDay uint64 `json:"intervalDay" gorm:"-"` // 间隔天数
+ MaxSaveDays int `json:"maxSaveDays"` // 数据库备份历史保留天数,过期将自动删除
Enabled bool `json:"enabled"` // 是否启用
EnabledDesc string `json:"enabledDesc"` // 启用状态描述
LastTime timex.NullTime `json:"lastTime"` // 最近一次执行时间
@@ -29,9 +30,9 @@ func (backup *DbBackup) MarshalJSON() ([]byte, error) {
backup.IntervalDay = uint64(backup.Interval / time.Hour / 24)
if len(backup.EnabledDesc) == 0 {
if backup.Enabled {
- backup.EnabledDesc = "任务已启用"
+ backup.EnabledDesc = "已启用"
} else {
- backup.EnabledDesc = "任务已禁用"
+ backup.EnabledDesc = "已禁用"
}
}
return json.Marshal((*dbBackup)(backup))
diff --git a/server/internal/db/api/vo/db_restore.go b/server/internal/db/api/vo/db_restore.go
index 99885c45..ae188cf6 100644
--- a/server/internal/db/api/vo/db_restore.go
+++ b/server/internal/db/api/vo/db_restore.go
@@ -30,9 +30,9 @@ func (restore *DbRestore) MarshalJSON() ([]byte, error) {
restore.IntervalDay = uint64(restore.Interval / time.Hour / 24)
if len(restore.EnabledDesc) == 0 {
if restore.Enabled {
- restore.EnabledDesc = "任务已启用"
+ restore.EnabledDesc = "已启用"
} else {
- restore.EnabledDesc = "任务已禁用"
+ restore.EnabledDesc = "已禁用"
}
}
return json.Marshal((*dbBackup)(restore))
diff --git a/server/internal/db/application/application.go b/server/internal/db/application/application.go
index a57a335a..adb67ec4 100644
--- a/server/internal/db/application/application.go
+++ b/server/internal/db/application/application.go
@@ -25,10 +25,13 @@ func InitIoc() {
func Init() {
sync.OnceFunc(func() {
if err := GetDbBackupApp().Init(); err != nil {
- panic(fmt.Sprintf("初始化 dbBackupApp 失败: %v", err))
+ panic(fmt.Sprintf("初始化 DbBackupApp 失败: %v", err))
}
if err := GetDbRestoreApp().Init(); err != nil {
- panic(fmt.Sprintf("初始化 dbRestoreApp 失败: %v", err))
+ panic(fmt.Sprintf("初始化 DbRestoreApp 失败: %v", err))
+ }
+ if err := GetDbBinlogApp().Init(); err != nil {
+ panic(fmt.Sprintf("初始化 DbBinlogApp 失败: %v", err))
}
GetDataSyncTaskApp().InitCronJob()
})()
diff --git a/server/internal/db/application/db_backup.go b/server/internal/db/application/db_backup.go
index 29a48208..25c0c267 100644
--- a/server/internal/db/application/db_backup.go
+++ b/server/internal/db/application/db_backup.go
@@ -6,15 +6,24 @@ import (
"errors"
"fmt"
"gorm.io/gorm"
+ "math"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
+ "mayfly-go/pkg/utils/timex"
"sync"
+ "time"
"github.com/google/uuid"
)
+const maxBackupHistoryDays = 30
+
+var (
+ errRestoringBackupHistory = errors.New("正在从备份历史中恢复数据库")
+)
+
type DbBackupApp struct {
scheduler *dbScheduler `inject:"DbScheduler"`
backupRepo repository.DbBackup `inject:"DbBackupRepo"`
@@ -22,6 +31,10 @@ type DbBackupApp struct {
restoreRepo repository.DbRestore `inject:"DbRestoreRepo"`
dbApp Db `inject:"DbApp"`
mutex sync.Mutex
+ closed chan struct{}
+ wg sync.WaitGroup
+ ctx context.Context
+ cancel context.CancelFunc
}
func (app *DbBackupApp) Init() error {
@@ -32,11 +45,68 @@ func (app *DbBackupApp) Init() error {
if err := app.scheduler.AddJob(context.Background(), jobs); err != nil {
return err
}
+ app.ctx, app.cancel = context.WithCancel(context.Background())
+ app.wg.Add(1)
+ go func() {
+ defer app.wg.Done()
+ for app.ctx.Err() == nil {
+ if err := app.prune(app.ctx); err != nil {
+ logx.Errorf("清理数据库备份历史失败: %s", err.Error())
+ timex.SleepWithContext(app.ctx, time.Minute*15)
+ continue
+ }
+ timex.SleepWithContext(app.ctx, time.Hour*24)
+ }
+ }()
+ return nil
+}
+
+func (app *DbBackupApp) prune(ctx context.Context) error {
+ var jobs []*entity.DbBackup
+ if err := app.backupRepo.ListByCond(map[string]any{}, &jobs); err != nil {
+ return err
+ }
+ for _, job := range jobs {
+ if ctx.Err() != nil {
+ return nil
+ }
+ var histories []*entity.DbBackupHistory
+ historyCond := map[string]any{
+ "db_backup_id": job.Id,
+ }
+ if err := app.backupHistoryRepo.ListByCondOrder(historyCond, &histories, "id"); err != nil {
+ return err
+ }
+ expiringTime := time.Now().Add(-math.MaxInt64)
+ if job.MaxSaveDays > 0 {
+ expiringTime = time.Now().Add(-time.Hour * 24 * time.Duration(job.MaxSaveDays+1))
+ }
+ for _, history := range histories {
+ if ctx.Err() != nil {
+ return nil
+ }
+ if history.CreateTime.After(expiringTime) {
+ break
+ }
+ err := app.DeleteHistory(ctx, history.Id)
+ if errors.Is(err, errRestoringBackupHistory) {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ }
+ }
return nil
}
func (app *DbBackupApp) Close() {
app.scheduler.Close()
+ if app.cancel != nil {
+ app.cancel()
+ app.cancel = nil
+ }
+ app.wg.Wait()
}
func (app *DbBackupApp) Create(ctx context.Context, jobs []*entity.DbBackup) error {
@@ -61,7 +131,6 @@ func (app *DbBackupApp) Update(ctx context.Context, job *entity.DbBackup) error
}
func (app *DbBackupApp) Delete(ctx context.Context, jobId uint64) error {
- // todo: 删除数据库备份历史文件
app.mutex.Lock()
defer app.mutex.Unlock()
@@ -76,7 +145,7 @@ func (app *DbBackupApp) Delete(ctx context.Context, jobId uint64) error {
default:
return err
case err == nil:
- return fmt.Errorf("数据库备份存在历史记录【%s】,无法删除该任务", history.Name)
+ return fmt.Errorf("请先删除关联的数据库备份历史【%s】", history.Name)
case errors.Is(err, gorm.ErrRecordNotFound):
}
if err := app.backupRepo.DeleteById(ctx, jobId); err != nil {
@@ -184,27 +253,18 @@ func NewIncUUID() (uuid.UUID, error) {
}
func (app *DbBackupApp) DeleteHistory(ctx context.Context, historyId uint64) (retErr error) {
- // todo: 删除数据库备份历史文件
app.mutex.Lock()
defer app.mutex.Unlock()
+ if _, err := app.backupHistoryRepo.UpdateDeleting(false, historyId); err != nil {
+ return err
+ }
ok, err := app.backupHistoryRepo.UpdateDeleting(true, historyId)
if err != nil {
return err
}
- defer func() {
- _, err = app.backupHistoryRepo.UpdateDeleting(false, historyId)
- if err == nil {
- return
- }
- if retErr == nil {
- retErr = err
- return
- }
- retErr = fmt.Errorf("%w, %w", retErr, err)
- }()
if !ok {
- return errors.New("正在从备份历史中恢复数据库")
+ return errRestoringBackupHistory
}
job := &entity.DbBackupHistory{}
if err := app.backupHistoryRepo.GetById(job, historyId); err != nil {
@@ -214,7 +274,10 @@ func (app *DbBackupApp) DeleteHistory(ctx context.Context, historyId uint64) (re
if err != nil {
return err
}
- dbProgram := conn.GetDialect().GetDbProgram()
+ dbProgram, err := conn.GetDialect().GetDbProgram()
+ if err != nil {
+ return err
+ }
if err := dbProgram.RemoveBackupHistory(ctx, job.DbBackupId, job.Uuid); err != nil {
return err
}
diff --git a/server/internal/db/application/db_binlog.go b/server/internal/db/application/db_binlog.go
index 844a110d..121f0849 100644
--- a/server/internal/db/application/db_binlog.go
+++ b/server/internal/db/application/db_binlog.go
@@ -2,6 +2,7 @@ package application
import (
"context"
+ "math"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/logx"
@@ -11,9 +12,13 @@ import (
)
type DbBinlogApp struct {
- scheduler *dbScheduler `inject:"DbScheduler"`
- binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
- backupRepo repository.DbBackup `inject:"DbBackupRepo"`
+ scheduler *dbScheduler `inject:"DbScheduler"`
+ binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
+ binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
+ backupRepo repository.DbBackup `inject:"DbBackupRepo"`
+ backupHistoryRepo repository.DbBackupHistory `inject:"DbBackupHistoryRepo"`
+ instanceRepo repository.Instance `inject:"DbInstanceRepo"`
+ dbApp Db `inject:"DbApp"`
context context.Context
cancel context.CancelFunc
@@ -26,41 +31,113 @@ func newDbBinlogApp() *DbBinlogApp {
context: ctx,
cancel: cancel,
}
- svc.waitGroup.Add(1)
- go svc.run()
return svc
}
+func (app *DbBinlogApp) Init() error {
+ app.context, app.cancel = context.WithCancel(context.Background())
+ app.waitGroup.Add(1)
+ go app.run()
+ return nil
+}
+
func (app *DbBinlogApp) run() {
defer app.waitGroup.Done()
- // todo: 实现 binlog 并发下载
- timex.SleepWithContext(app.context, time.Minute)
- for !app.closed() {
- jobs, err := app.loadJobs()
- if err != nil {
- logx.Errorf("DbBinlogApp: 加载 BINLOG 同步任务失败: %s", err.Error())
+ for app.context.Err() == nil {
+ if err := app.fetchBinlog(app.context); err != nil {
timex.SleepWithContext(app.context, time.Minute)
continue
}
- if app.closed() {
- break
- }
- if err := app.scheduler.AddJob(app.context, jobs); err != nil {
- logx.Error("DbBinlogApp: 添加 BINLOG 同步任务失败: ", err.Error())
+ if err := app.pruneBinlog(app.context); err != nil {
+ timex.SleepWithContext(app.context, time.Minute)
+ continue
}
timex.SleepWithContext(app.context, entity.BinlogDownloadInterval)
}
}
-func (app *DbBinlogApp) loadJobs() ([]*entity.DbBinlog, error) {
+func (app *DbBinlogApp) fetchBinlog(ctx context.Context) error {
+ jobs, err := app.loadJobs(ctx)
+ if err != nil {
+ logx.Errorf("DbBinlogApp: 加载 BINLOG 同步任务失败: %s", err.Error())
+ timex.SleepWithContext(app.context, time.Minute)
+ return err
+ }
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ if err := app.scheduler.AddJob(app.context, jobs); err != nil {
+ logx.Error("DbBinlogApp: 添加 BINLOG 同步任务失败: ", err.Error())
+ return err
+ }
+ return nil
+}
+
+func (app *DbBinlogApp) pruneBinlog(ctx context.Context) error {
+ var jobs []*entity.DbBinlog
+ if err := app.binlogRepo.ListByCond(map[string]any{}, &jobs); err != nil {
+ logx.Error("DbBinlogApp: 获取 BINLOG 同步任务失败: ", err.Error())
+ return err
+ }
+ for _, instance := range jobs {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ var histories []*entity.DbBinlogHistory
+ backupHistory, backupHistoryExists, err := app.backupHistoryRepo.GetEarliestHistoryForBinlog(instance.Id)
+ if err != nil {
+ logx.Errorf("DbBinlogApp: 获取数据库备份历史失败: %s", err.Error())
+ return err
+ }
+ var binlogSeq int64 = math.MaxInt64
+ if backupHistoryExists {
+ binlogSeq = backupHistory.BinlogSequence
+ }
+ if err := app.binlogHistoryRepo.GetHistoriesBeforeSequence(ctx, instance.Id, binlogSeq, &histories); err != nil {
+ logx.Errorf("DbBinlogApp: 获取数据库 BINLOG 历史失败: %s", err.Error())
+ return err
+ }
+ conn, err := app.dbApp.GetDbConnByInstanceId(instance.Id)
+ if err != nil {
+ logx.Errorf("DbBinlogApp: 创建数据库连接失败: %s", err.Error())
+ return err
+ }
+ dbProgram, err := conn.GetDialect().GetDbProgram()
+ if err != nil {
+ logx.Errorf("DbBinlogApp: 获取数据库备份与恢复程序失败: %s", err.Error())
+ return err
+ }
+ for i, history := range histories {
+ // todo: 在避免并发访问的前提下删除本地最新的 BINLOG 文件
+ if !backupHistoryExists && i == len(histories)-1 {
+ // 暂不删除本地最新的 BINLOG 文件
+ break
+ }
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ if err := dbProgram.PruneBinlog(history); err != nil {
+ logx.Errorf("清理 BINLOG 文件失败: %v", err)
+ continue
+ }
+ if err := app.binlogHistoryRepo.DeleteById(ctx, history.Id); err != nil {
+ logx.Errorf("删除 BINLOG 历史失败: %v", err)
+ continue
+ }
+ }
+ }
+ return nil
+}
+
+func (app *DbBinlogApp) loadJobs(ctx context.Context) ([]*entity.DbBinlog, error) {
var instanceIds []uint64
if err := app.backupRepo.ListDbInstances(true, true, &instanceIds); err != nil {
return nil, err
}
jobs := make([]*entity.DbBinlog, 0, len(instanceIds))
for _, id := range instanceIds {
- if app.closed() {
+ if ctx.Err() != nil {
break
}
binlog := entity.NewDbBinlog(id)
@@ -73,14 +150,15 @@ func (app *DbBinlogApp) loadJobs() ([]*entity.DbBinlog, error) {
}
func (app *DbBinlogApp) Close() {
- app.cancel()
+ cancel := app.cancel
+ if cancel == nil {
+ return
+ }
+ app.cancel = nil
+ cancel()
app.waitGroup.Wait()
}
-func (app *DbBinlogApp) closed() bool {
- return app.context.Err() != nil
-}
-
func (app *DbBinlogApp) AddJobIfNotExists(ctx context.Context, job *entity.DbBinlog) error {
if err := app.binlogRepo.AddJobIfNotExists(ctx, job); err != nil {
return err
@@ -90,11 +168,3 @@ func (app *DbBinlogApp) AddJobIfNotExists(ctx context.Context, job *entity.DbBin
}
return nil
}
-
-func (app *DbBinlogApp) Delete(ctx context.Context, jobId uint64) error {
- // todo: 删除 Binlog 历史文件
- if err := app.binlogRepo.DeleteById(ctx, jobId); err != nil {
- return err
- }
- return nil
-}
diff --git a/server/internal/db/application/instance.go b/server/internal/db/application/db_instance.go
similarity index 66%
rename from server/internal/db/application/instance.go
rename to server/internal/db/application/db_instance.go
index 94571cc1..39574b7b 100644
--- a/server/internal/db/application/instance.go
+++ b/server/internal/db/application/db_instance.go
@@ -2,11 +2,14 @@ package application
import (
"context"
+ "errors"
+ "gorm.io/gorm"
"mayfly-go/internal/db/dbm"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/base"
+ "mayfly-go/pkg/biz"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/model"
)
@@ -32,6 +35,10 @@ type Instance interface {
type instanceAppImpl struct {
base.AppImpl[*entity.DbInstance, repository.Instance]
+
+ dbApp Db `inject:"DbApp"`
+ backupApp *DbBackupApp `inject:"DbBackupApp"`
+ restoreApp *DbRestoreApp `inject:"DbRestoreApp"`
}
// 注入DbInstanceRepo
@@ -96,8 +103,50 @@ func (app *instanceAppImpl) Save(ctx context.Context, instanceEntity *entity.DbI
return app.UpdateById(ctx, instanceEntity)
}
-func (app *instanceAppImpl) Delete(ctx context.Context, id uint64) error {
- return app.DeleteById(ctx, id)
+func (app *instanceAppImpl) Delete(ctx context.Context, instanceId uint64) error {
+ instance, err := app.GetById(new(entity.DbInstance), instanceId, "name")
+ biz.ErrIsNil(err, "获取数据库实例错误,数据库实例ID为: %d", instance.Id)
+
+ restore := &entity.DbRestore{
+ DbInstanceId: instanceId,
+ }
+ err = app.restoreApp.restoreRepo.GetBy(restore)
+ switch {
+ case err == nil:
+ biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库恢复任务。", instance.Name)
+ case errors.Is(err, gorm.ErrRecordNotFound):
+ break
+ default:
+ biz.ErrIsNil(err, "删除数据库实例失败: %v", err)
+ }
+
+ backup := &entity.DbBackup{
+ DbInstanceId: instanceId,
+ }
+ err = app.backupApp.backupRepo.GetBy(backup)
+ switch {
+ case err == nil:
+ biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库备份任务。", instance.Name)
+ case errors.Is(err, gorm.ErrRecordNotFound):
+ break
+ default:
+ biz.ErrIsNil(err, "删除数据库实例失败: %v", err)
+ }
+
+ db := &entity.Db{
+ InstanceId: instanceId,
+ }
+ err = app.dbApp.GetBy(db)
+ switch {
+ case err == nil:
+ biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库资源。", instance.Name)
+ case errors.Is(err, gorm.ErrRecordNotFound):
+ break
+ default:
+ biz.ErrIsNil(err, "删除数据库实例失败: %v", err)
+ }
+
+ return app.DeleteById(ctx, instanceId)
}
func (app *instanceAppImpl) GetDatabases(ed *entity.DbInstance) ([]string, error) {
diff --git a/server/internal/db/application/db_restore.go b/server/internal/db/application/db_restore.go
index cf57d312..1a9288d5 100644
--- a/server/internal/db/application/db_restore.go
+++ b/server/internal/db/application/db_restore.go
@@ -55,7 +55,6 @@ func (app *DbRestoreApp) Update(ctx context.Context, job *entity.DbRestore) erro
}
func (app *DbRestoreApp) Delete(ctx context.Context, jobId uint64) error {
- // todo: 删除数据库恢复历史文件
app.mutex.Lock()
defer app.mutex.Unlock()
diff --git a/server/internal/db/application/db_scheduler.go b/server/internal/db/application/db_scheduler.go
index 930d7fd5..45700547 100644
--- a/server/internal/db/application/db_scheduler.go
+++ b/server/internal/db/application/db_scheduler.go
@@ -4,12 +4,14 @@ import (
"context"
"errors"
"fmt"
+ "golang.org/x/sync/singleflight"
"gorm.io/gorm"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/runner"
"reflect"
+ "strconv"
"sync"
"time"
)
@@ -28,6 +30,7 @@ type dbScheduler struct {
restoreHistoryRepo repository.DbRestoreHistory `inject:"DbRestoreHistoryRepo"`
binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"`
binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"`
+ sfGroup singleflight.Group
}
func newDbScheduler() *dbScheduler {
@@ -76,7 +79,6 @@ func (s *dbScheduler) AddJob(ctx context.Context, jobs any) error {
}
func (s *dbScheduler) RemoveJob(ctx context.Context, jobType entity.DbJobType, jobId uint64) error {
- // todo: 删除数据库备份历史文件
s.mutex.Lock()
defer s.mutex.Unlock()
@@ -110,12 +112,11 @@ func (s *dbScheduler) StartJobNow(ctx context.Context, job entity.DbJob) error {
return nil
}
-func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
+func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, backup *entity.DbBackup) error {
id, err := NewIncUUID()
if err != nil {
return err
}
- backup := job.(*entity.DbBackup)
history := &entity.DbBackupHistory{
Uuid: id.String(),
DbBackupId: backup.Id,
@@ -143,45 +144,29 @@ func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job e
return nil
}
-func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error {
- restore := job.(*entity.DbRestore)
+func (s *dbScheduler) singleFlightFetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, targetTime time.Time) error {
+ key := strconv.FormatUint(instanceId, 10)
+ for ctx.Err() == nil {
+ c := s.sfGroup.DoChan(key, func() (interface{}, error) {
+ if err := s.fetchBinlog(ctx, dbProgram, instanceId, true, targetTime); err != nil {
+ return targetTime, err
+ }
+ return targetTime, nil
+ })
+ select {
+ case res := <-c:
+ if targetTime.Compare(res.Val.(time.Time)) <= 0 {
+ return res.Err
+ }
+ case <-ctx.Done():
+ }
+ }
+ return ctx.Err()
+}
+
+func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, restore *entity.DbRestore) error {
if restore.PointInTime.Valid {
- //if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil {
- // return err
- //} else if !enabled {
- // return errors.New("数据库未启用 BINLOG")
- //}
- //if enabled, err := dbProgram.CheckBinlogRowFormat(ctx); err != nil {
- // return err
- //} else if !enabled {
- // return errors.New("数据库未启用 BINLOG 行模式")
- //}
- //
- //latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
- //binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(restore.DbInstanceId)
- //if err != nil {
- // return err
- //}
- //if ok {
- // latestBinlogSequence = binlogHistory.Sequence
- //} else {
- // backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(restore.DbInstanceId)
- // if err != nil {
- // return err
- // }
- // if !ok {
- // return nil
- // }
- // earliestBackupSequence = backupHistory.BinlogSequence
- //}
- //binlogFiles, err := dbProgram.FetchBinlogs(ctx, true, earliestBackupSequence, latestBinlogSequence)
- //if err != nil {
- // return err
- //}
- //if err := s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, restore.DbInstanceId, binlogFiles); err != nil {
- // return err
- //}
- if err := s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), true); err != nil {
+ if err := s.fetchBinlog(ctx, dbProgram, restore.DbInstanceId, true, restore.PointInTime.Time); err != nil {
return err
}
if err := s.restorePointInTime(ctx, dbProgram, restore); err != nil {
@@ -210,102 +195,68 @@ func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job
return nil
}
-//func (s *dbScheduler) updateLastStatus(ctx context.Context, job entity.DbJob) error {
-// switch typ := job.GetJobType(); typ {
-// case entity.DbJobTypeBackup:
-// return s.backupRepo.UpdateLastStatus(ctx, job)
-// case entity.DbJobTypeRestore:
-// return s.restoreRepo.UpdateLastStatus(ctx, job)
-// case entity.DbJobTypeBinlog:
-// return s.binlogRepo.UpdateLastStatus(ctx, job)
-// default:
-// panic(fmt.Errorf("无效的数据库任务类型: %v", typ))
-// }
-//}
-
func (s *dbScheduler) updateJob(ctx context.Context, job entity.DbJob) error {
- switch typ := job.GetJobType(); typ {
- case entity.DbJobTypeBackup:
- return s.backupRepo.UpdateById(ctx, job)
- case entity.DbJobTypeRestore:
- return s.restoreRepo.UpdateById(ctx, job)
- case entity.DbJobTypeBinlog:
- return s.binlogRepo.UpdateById(ctx, job)
+ switch t := job.(type) {
+ case *entity.DbBackup:
+ return s.backupRepo.UpdateById(ctx, t)
+ case *entity.DbRestore:
+ return s.restoreRepo.UpdateById(ctx, t)
+ case *entity.DbBinlog:
+ return s.binlogRepo.UpdateById(ctx, t)
default:
- return fmt.Errorf("无效的数据库任务类型: %v", typ)
+ return fmt.Errorf("无效的数据库任务类型: %T", t)
}
}
func (s *dbScheduler) runJob(ctx context.Context, job entity.DbJob) error {
- //job.SetLastStatus(entity.DbJobRunning, nil)
- //if err := s.updateLastStatus(ctx, job); err != nil {
- // logx.Errorf("failed to update job status: %v", err)
- // return
- //}
-
- //var errRun error
conn, err := s.dbApp.GetDbConnByInstanceId(job.GetInstanceId())
if err != nil {
return err
}
- dbProgram := conn.GetDialect().GetDbProgram()
- switch typ := job.GetJobType(); typ {
- case entity.DbJobTypeBackup:
- return s.backup(ctx, dbProgram, job)
- case entity.DbJobTypeRestore:
- return s.restore(ctx, dbProgram, job)
- case entity.DbJobTypeBinlog:
- return s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), false)
- default:
- return fmt.Errorf("无效的数据库任务类型: %v", typ)
+ dbProgram, err := conn.GetDialect().GetDbProgram()
+ if err != nil {
+ return err
+ }
+ switch t := job.(type) {
+ case *entity.DbBackup:
+ return s.backup(ctx, dbProgram, t)
+ case *entity.DbRestore:
+ return s.restore(ctx, dbProgram, t)
+ case *entity.DbBinlog:
+ return s.fetchBinlog(ctx, dbProgram, t.DbInstanceId, false, time.Now())
+ default:
+ return fmt.Errorf("无效的数据库任务类型: %T", t)
}
- //status := entity.DbJobSuccess
- //if errRun != nil {
- // status = entity.DbJobFailed
- //}
- //job.SetLastStatus(status, errRun)
- //if err := s.updateLastStatus(ctx, job); err != nil {
- // logx.Errorf("failed to update job status: %v", err)
- // return
- //}
}
-func (s *dbScheduler) runnableJob(job entity.DbJob, next runner.NextJobFunc[entity.DbJob]) (bool, error) {
+func (s *dbScheduler) runnableJob(job entity.DbJob, nextRunning runner.NextJobFunc[entity.DbJob]) (bool, error) {
if job.IsExpired() {
return false, runner.ErrJobExpired
}
const maxCountByInstanceId = 4
const maxCountByDbName = 1
var countByInstanceId, countByDbName int
- for item, ok := next(); ok; item, ok = next() {
+ for item, ok := nextRunning(); ok; item, ok = nextRunning() {
if job.GetInstanceId() == item.GetInstanceId() {
countByInstanceId++
if countByInstanceId >= maxCountByInstanceId {
return false, nil
}
-
- if relatedToBinlog(job.GetJobType()) {
- // todo: 恢复数据库前触发 BINLOG 同步,BINLOG 同步完成后才能恢复数据库
- if relatedToBinlog(item.GetJobType()) {
- return false, nil
- }
- }
-
if job.GetDbName() == item.GetDbName() {
countByDbName++
if countByDbName >= maxCountByDbName {
return false, nil
}
}
+ if (job.GetJobType() == entity.DbJobTypeBinlog && item.GetJobType() == entity.DbJobTypeRestore) ||
+ (job.GetJobType() == entity.DbJobTypeRestore && item.GetJobType() == entity.DbJobTypeBinlog) {
+ return false, nil
+ }
}
}
return true, nil
}
-func relatedToBinlog(typ entity.DbJobType) bool {
- return typ == entity.DbJobTypeRestore || typ == entity.DbJobTypeBinlog
-}
-
func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbProgram, job *entity.DbRestore) error {
binlogHistory, err := s.binlogHistoryRepo.GetHistoryByTime(job.DbInstanceId, job.PointInTime.Time)
if err != nil {
@@ -320,7 +271,7 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbPr
Sequence: binlogHistory.Sequence,
Position: position,
}
- backupHistory, err := s.backupHistoryRepo.GetLatestHistory(job.DbInstanceId, job.DbName, target)
+ backupHistory, err := s.backupHistoryRepo.GetLatestHistoryForBinlog(job.DbInstanceId, job.DbName, target)
if err != nil {
return err
}
@@ -364,6 +315,9 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbPr
}
func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, backupHistory *entity.DbBackupHistory) (retErr error) {
+ if _, err := s.backupHistoryRepo.UpdateRestoring(false, backupHistory.Id); err != nil {
+ return err
+ }
ok, err := s.backupHistoryRepo.UpdateRestoring(true, backupHistory.Id)
if err != nil {
return err
@@ -385,7 +339,7 @@ func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbPr
return program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid)
}
-func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool) error {
+func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool, targetTime time.Time) error {
if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil {
return err
} else if !enabled {
@@ -397,15 +351,17 @@ func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram,
return errors.New("数据库未启用 BINLOG 行模式")
}
- latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1)
+ earliestBackupSequence := int64(-1)
binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(instanceId)
if err != nil {
return err
}
- if ok {
- latestBinlogSequence = binlogHistory.Sequence
- } else {
- backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(instanceId)
+ if downloadLatestBinlogFile && targetTime.Before(binlogHistory.LastEventTime) {
+ return nil
+ }
+
+ if !ok {
+ backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistoryForBinlog(instanceId)
if err != nil {
return err
}
@@ -414,7 +370,9 @@ func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram,
}
earliestBackupSequence = backupHistory.BinlogSequence
}
- binlogFiles, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, latestBinlogSequence)
+
+ // todo: 将循环从 dbProgram.FetchBinlogs 中提取出来,实现 BINLOG 同步成功后逐一保存 binlogHistory
+ binlogFiles, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, binlogHistory)
if err != nil {
return err
}
diff --git a/server/internal/db/dbm/dbi/db_program.go b/server/internal/db/dbm/dbi/db_program.go
index 9d76319c..30dfa293 100644
--- a/server/internal/db/dbm/dbi/db_program.go
+++ b/server/internal/db/dbm/dbi/db_program.go
@@ -13,7 +13,7 @@ type DbProgram interface {
Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error)
- FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error)
+ FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence int64, latestBinlogHistory *entity.DbBinlogHistory) ([]*entity.BinlogFile, error)
ReplayBinlog(ctx context.Context, originalDatabase, targetDatabase string, restoreInfo *RestoreInfo) error
@@ -22,6 +22,8 @@ type DbProgram interface {
RemoveBackupHistory(ctx context.Context, dbBackupId uint64, dbBackupHistoryUuid string) error
GetBinlogEventPositionAtOrAfterTime(ctx context.Context, binlogName string, targetTime time.Time) (position int64, parseErr error)
+
+ PruneBinlog(history *entity.DbBinlogHistory) error
}
type RestoreInfo struct {
diff --git a/server/internal/db/dbm/dbi/db_type.go b/server/internal/db/dbm/dbi/db_type.go
index 1a020fe6..548c90b7 100644
--- a/server/internal/db/dbm/dbi/db_type.go
+++ b/server/internal/db/dbm/dbi/db_type.go
@@ -141,3 +141,12 @@ func (dbType DbType) StmtUseDatabase(dbName string) string {
return ""
}
}
+
+func (dbType DbType) SupportingBackup() bool {
+ switch dbType {
+ case DbTypeMysql, DbTypeMariadb:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/server/internal/db/dbm/dbi/dialect.go b/server/internal/db/dbm/dbi/dialect.go
index 4410c510..db25cf45 100644
--- a/server/internal/db/dbm/dbi/dialect.go
+++ b/server/internal/db/dbm/dbi/dialect.go
@@ -108,7 +108,7 @@ type Dialect interface {
GetSchemas() ([]string, error)
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
- GetDbProgram() DbProgram
+ GetDbProgram() (DbProgram, error)
// 批量保存数据
BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error)
diff --git a/server/internal/db/dbm/dm/dialect.go b/server/internal/db/dbm/dm/dialect.go
index 57448e1b..ecd0cf18 100644
--- a/server/internal/db/dbm/dm/dialect.go
+++ b/server/internal/db/dbm/dm/dialect.go
@@ -248,8 +248,8 @@ func (dd *DMDialect) GetSchemas() ([]string, error) {
}
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
-func (dd *DMDialect) GetDbProgram() dbi.DbProgram {
- panic("implement me")
+func (dd *DMDialect) GetDbProgram() (dbi.DbProgram, error) {
+ return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", dd.dc.Info.Type)
}
var (
diff --git a/server/internal/db/dbm/mssql/dialect.go b/server/internal/db/dbm/mssql/dialect.go
index c45a7be9..7cf3b725 100644
--- a/server/internal/db/dbm/mssql/dialect.go
+++ b/server/internal/db/dbm/mssql/dialect.go
@@ -285,8 +285,8 @@ func (md *MssqlDialect) GetSchemas() ([]string, error) {
}
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
-func (md *MssqlDialect) GetDbProgram() dbi.DbProgram {
- panic("implement me")
+func (md *MssqlDialect) GetDbProgram() (dbi.DbProgram, error) {
+ return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", md.dc.Info.Type)
}
func (md *MssqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
diff --git a/server/internal/db/dbm/mysql/dialect.go b/server/internal/db/dbm/mysql/dialect.go
index e93a2bda..66c17388 100644
--- a/server/internal/db/dbm/mysql/dialect.go
+++ b/server/internal/db/dbm/mysql/dialect.go
@@ -169,8 +169,8 @@ func (md *MysqlDialect) GetSchemas() ([]string, error) {
}
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
-func (md *MysqlDialect) GetDbProgram() dbi.DbProgram {
- return NewDbProgramMysql(md.dc)
+func (md *MysqlDialect) GetDbProgram() (dbi.DbProgram, error) {
+ return NewDbProgramMysql(md.dc), nil
}
func (md *MysqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
diff --git a/server/internal/db/dbm/mysql/program.go b/server/internal/db/dbm/mysql/program.go
index 4e9e4eb6..172781fa 100644
--- a/server/internal/db/dbm/mysql/program.go
+++ b/server/internal/db/dbm/mysql/program.go
@@ -2,6 +2,7 @@ package mysql
import (
"bufio"
+ "compress/gzip"
"context"
"database/sql"
"fmt"
@@ -130,22 +131,46 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB
if binlogEnabled && rowFormatEnabled {
binlogInfo, err = readBinlogInfoFromBackup(reader)
}
- _ = reader.Close()
if err != nil {
+ _ = reader.Close()
return nil, errors.Wrapf(err, "从备份文件中读取 binlog 信息失败")
}
- fileName := filepath.Join(dir, fmt.Sprintf("%s.sql", backupHistory.Uuid))
- if err := os.Rename(tmpFile, fileName); err != nil {
- return nil, errors.Wrap(err, "备份文件改名失败")
- }
+ if _, err := reader.Seek(0, io.SeekStart); err != nil {
+ _ = reader.Close()
+ return nil, errors.Wrapf(err, "跳转到备份文件开始处失败")
+ }
+ gzipTmpFile := tmpFile + ".gz"
+ writer, err := os.Create(gzipTmpFile)
+ if err != nil {
+ _ = reader.Close()
+ return nil, errors.Wrapf(err, "创建备份压缩文件失败")
+ }
+ defer func() {
+ _ = os.Remove(gzipTmpFile)
+ }()
+ gzipWriter := gzip.NewWriter(writer)
+ gzipWriter.Name = backupHistory.Uuid + ".sql"
+ _, err = io.Copy(gzipWriter, reader)
+ _ = gzipWriter.Close()
+ _ = writer.Close()
+ _ = reader.Close()
+ if err != nil {
+ return nil, errors.Wrapf(err, "压缩备份文件失败")
+ }
+ destPath := filepath.Join(dir, backupHistory.Uuid+".sql")
+ if err := os.Rename(gzipTmpFile, destPath+".gz"); err != nil {
+ return nil, errors.Wrap(err, "备份文件更名失败")
+ }
return binlogInfo, nil
}
func (svc *DbProgramMysql) RemoveBackupHistory(_ context.Context, dbBackupId uint64, dbBackupHistoryUuid string) error {
fileName := filepath.Join(svc.getDbBackupDir(svc.dbInfo().InstanceId, dbBackupId),
fmt.Sprintf("%v.sql", dbBackupHistoryUuid))
- return os.Remove(fileName)
+ _ = os.Remove(fileName)
+ _ = os.Remove(fileName + ".gz")
+ return nil
}
func (svc *DbProgramMysql) RestoreBackupHistory(ctx context.Context, dbName string, dbBackupId uint64, dbBackupHistoryUuid string) error {
@@ -158,18 +183,33 @@ func (svc *DbProgramMysql) RestoreBackupHistory(ctx context.Context, dbName stri
"--password=" + dbInfo.Password,
}
+ compressed := false
fileName := filepath.Join(svc.getDbBackupDir(svc.dbInfo().InstanceId, dbBackupId),
fmt.Sprintf("%v.sql", dbBackupHistoryUuid))
+ _, err := os.Stat(fileName)
+ if err != nil {
+ compressed = true
+ fileName += ".gz"
+ }
file, err := os.Open(fileName)
if err != nil {
return errors.Wrap(err, "打开备份文件失败")
}
- defer func() {
- _ = file.Close()
- }()
+ defer func() { _ = file.Close() }()
+
+ var reader io.ReadCloser
+ if compressed {
+ reader, err = gzip.NewReader(file)
+ if err != nil {
+ return errors.Wrap(err, "解压缩备份文件失败")
+ }
+ defer func() { _ = reader.Close() }()
+ } else {
+ reader = file
+ }
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlPath, args...)
- cmd.Stdin = file
+ cmd.Stdin = reader
logx.Debug("恢复数据库: ", cmd.String())
if err := runCmd(cmd); err != nil {
logx.Errorf("运行 mysql 程序失败: %v", err)
@@ -205,13 +245,17 @@ func (svc *DbProgramMysql) downloadBinlogFilesOnServer(ctx context.Context, binl
}
// Parse the first binlog eventTs of a local binlog file.
-func (svc *DbProgramMysql) parseLocalBinlogLastEventTime(ctx context.Context, filePath string) (eventTime time.Time, parseErr error) {
- // todo: implement me
- return time.Now(), nil
+func (svc *DbProgramMysql) parseLocalBinlogLastEventTime(ctx context.Context, filePath string, lastEventTime time.Time) (eventTime time.Time, parseErr error) {
+ return svc.parseLocalBinlogEventTime(ctx, filePath, false, lastEventTime)
}
// Parse the first binlog eventTs of a local binlog file.
func (svc *DbProgramMysql) parseLocalBinlogFirstEventTime(ctx context.Context, filePath string) (eventTime time.Time, parseErr error) {
+ return svc.parseLocalBinlogEventTime(ctx, filePath, true, time.Time{})
+}
+
+// Parse the first binlog eventTs of a local binlog file.
+func (svc *DbProgramMysql) parseLocalBinlogEventTime(ctx context.Context, filePath string, firstOrLast bool, startTime time.Time) (eventTime time.Time, parseErr error) {
args := []string{
// Local binlog file path.
filePath,
@@ -220,6 +264,9 @@ func (svc *DbProgramMysql) parseLocalBinlogFirstEventTime(ctx context.Context, f
// Tell mysqlbinlog to suppress the BINLOG statements for row events, which reduces the unneeded output.
"--base64-output=DECODE-ROWS",
}
+ if !startTime.IsZero() {
+ args = append(args, "--start-datetime", startTime.Local().Format(time.DateTime))
+ }
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, args...)
var stderr strings.Builder
cmd.Stderr = &stderr
@@ -237,22 +284,30 @@ func (svc *DbProgramMysql) parseLocalBinlogFirstEventTime(ctx context.Context, f
parseErr = errors.Wrap(parseErr, stderr.String())
}
}()
-
+ lastEventTime := time.Time{}
for s := bufio.NewScanner(pr); s.Scan(); {
line := s.Text()
eventTimeParsed, found, err := parseBinlogEventTimeInLine(line)
if err != nil {
return time.Time{}, errors.Wrap(err, "解析 binlog 文件失败")
}
- if found {
- return eventTimeParsed, nil
+ if !found {
+ continue
}
+ if !firstOrLast {
+ lastEventTime = eventTimeParsed
+ continue
+ }
+ return eventTimeParsed, nil
}
- return time.Time{}, errors.New("解析 binlog 文件失败")
+ if lastEventTime.IsZero() {
+ return time.Time{}, errors.New("解析 binlog 文件失败")
+ }
+ return lastEventTime, nil
}
// FetchBinlogs downloads binlog files from startingFileName on server to `binlogDir`.
-func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error) {
+func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence int64, latestBinlogHistory *entity.DbBinlogHistory) ([]*entity.BinlogFile, error) {
// Read binlog files list on server.
binlogFilesOnServerSorted, err := svc.GetSortedBinlogFilesOnServer(ctx)
if err != nil {
@@ -264,8 +319,11 @@ func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlo
}
indexHistory := -1
for i, file := range binlogFilesOnServerSorted {
- if latestBinlogSequence == file.Sequence {
+ if latestBinlogHistory.Sequence == file.Sequence {
indexHistory = i + 1
+ file.FirstEventTime = latestBinlogHistory.FirstEventTime
+ file.LastEventTime = latestBinlogHistory.LastEventTime
+ file.LocalSize = latestBinlogHistory.FileSize
break
}
if earliestBackupSequence == file.Sequence {
@@ -274,10 +332,15 @@ func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlo
}
}
if indexHistory < 0 {
- return nil, errors.New(fmt.Sprintf("在数据库服务器上未找到 binlog 文件: %d, %d", earliestBackupSequence, latestBinlogSequence))
+ // todo: 数据库服务器上的 binlog 序列已被删除, 导致 binlog 同步失败,如何处理?
+ return nil, errors.New(fmt.Sprintf("数据库服务器上的 binlog 序列已被删除: %d, %d", earliestBackupSequence, latestBinlogHistory.Sequence))
}
- if indexHistory > len(binlogFilesOnServerSorted)-1 {
+ if indexHistory >= len(binlogFilesOnServerSorted)-1 {
indexHistory = len(binlogFilesOnServerSorted) - 1
+ if binlogFilesOnServerSorted[indexHistory].LocalSize == binlogFilesOnServerSorted[indexHistory].RemoteSize {
+ // 没有新的事件,不需要重新下载
+ return nil, nil
+ }
}
binlogFilesOnServerSorted = binlogFilesOnServerSorted[indexHistory:]
@@ -331,13 +394,14 @@ func (svc *DbProgramMysql) downloadBinlogFile(ctx context.Context, binlogFileToD
logx.Error("未找到 binlog 文件", logx.String("path", binlogFilePathTemp), logx.String("error", err.Error()))
return errors.Wrapf(err, "未找到 binlog 文件: %q", binlogFilePathTemp)
}
- if !isLast && binlogFileTempInfo.Size() != binlogFileToDownload.Size {
+
+ if (isLast && binlogFileTempInfo.Size() < binlogFileToDownload.RemoteSize) || (!isLast && binlogFileTempInfo.Size() != binlogFileToDownload.RemoteSize) {
logx.Error("Downloaded archived binlog file size is not equal to size queried on the MySQL server earlier.",
logx.String("binlog", binlogFileToDownload.Name),
- logx.Int64("sizeInfo", binlogFileToDownload.Size),
+ logx.Int64("sizeInfo", binlogFileToDownload.RemoteSize),
logx.Int64("downloadedSize", binlogFileTempInfo.Size()),
)
- return errors.Errorf("下载的 binlog 文件 %q 与服务上的文件大小不一致 %d != %d", binlogFilePathTemp, binlogFileTempInfo.Size(), binlogFileToDownload.Size)
+ return errors.Errorf("下载的 binlog 文件 %q 与服务上的文件大小不一致 %d != %d", binlogFilePathTemp, binlogFileTempInfo.Size(), binlogFileToDownload.RemoteSize)
}
binlogFilePath := svc.GetBinlogFilePath(binlogFileToDownload.Name)
@@ -348,7 +412,7 @@ func (svc *DbProgramMysql) downloadBinlogFile(ctx context.Context, binlogFileToD
if err != nil {
return err
}
- lastEventTime, err := svc.parseLocalBinlogLastEventTime(ctx, binlogFilePath)
+ lastEventTime, err := svc.parseLocalBinlogLastEventTime(ctx, binlogFilePath, binlogFileToDownload.LastEventTime)
if err != nil {
return err
}
@@ -394,9 +458,9 @@ func (svc *DbProgramMysql) GetSortedBinlogFilesOnServer(_ context.Context) ([]*e
return nil, errors.Wrapf(err, "SQL 语句 %q 执行结果解析失败", query)
}
binlogFile := &entity.BinlogFile{
- Name: name,
- Size: int64(size),
- Sequence: seq,
+ Name: name,
+ RemoteSize: int64(size),
+ Sequence: seq,
}
binlogFiles = append(binlogFiles, binlogFile)
}
@@ -781,3 +845,9 @@ func (svc *DbProgramMysql) getDbBackupDir(instanceId, backupId uint64) string {
fmt.Sprintf("instance-%d", instanceId),
fmt.Sprintf("backup-%d", backupId))
}
+
+func (svc *DbProgramMysql) PruneBinlog(history *entity.DbBinlogHistory) error {
+ binlogFilePath := filepath.Join(svc.getBinlogDir(history.DbInstanceId), history.FileName)
+ _ = os.Remove(binlogFilePath)
+ return nil
+}
diff --git a/server/internal/db/dbm/mysql/program_e2e_test.go b/server/internal/db/dbm/mysql/program_e2e_test.go
index 8a99d46f..86780b80 100644
--- a/server/internal/db/dbm/mysql/program_e2e_test.go
+++ b/server/internal/db/dbm/mysql/program_e2e_test.go
@@ -47,11 +47,11 @@ func (s *DbInstanceSuite) SetupSuite() {
Username: "test",
Password: "test",
}
- dbConn, err := dbInfo.Conn(GetMeta())
+ dbConn, err := dbInfo.Conn(dbi.GetMeta(dbi.DbTypeMysql))
s.Require().NoError(err)
s.dbConn = dbConn
s.repositories = &repository.Repositories{
- Instance: persistence.GetInstanceRepo(),
+ Instance: persistence.NewInstanceRepo(),
Backup: persistence.NewDbBackupRepo(),
BackupHistory: persistence.NewDbBackupHistoryRepo(),
Restore: persistence.NewDbRestoreRepo(),
@@ -111,7 +111,7 @@ func (s *DbInstanceSuite) testBackup(backupHistory *entity.DbBackupHistory) {
binlogInfo, err := s.instanceSvc.Backup(context.Background(), backupHistory)
require.NoError(err)
- fileName := filepath.Join(s.instanceSvc.getDbBackupDir(s.dbConn.Info.InstanceId, backupHistory.Id), dbNameBackupTest+".sql")
+ fileName := filepath.Join(s.instanceSvc.getDbBackupDir(s.dbConn.Info.InstanceId, backupHistory.Id), dbNameBackupTest+".sql.gz")
_, err = os.Stat(fileName)
require.NoError(err)
diff --git a/server/internal/db/dbm/oracle/dialect.go b/server/internal/db/dbm/oracle/dialect.go
index 0e303d58..9814f4d2 100644
--- a/server/internal/db/dbm/oracle/dialect.go
+++ b/server/internal/db/dbm/oracle/dialect.go
@@ -248,8 +248,8 @@ func (od *OracleDialect) GetSchemas() ([]string, error) {
}
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
-func (od *OracleDialect) GetDbProgram() dbi.DbProgram {
- panic("implement me")
+func (od *OracleDialect) GetDbProgram() (dbi.DbProgram, error) {
+ return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", od.dc.Info.Type)
}
func (od *OracleDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
diff --git a/server/internal/db/dbm/postgres/dialect.go b/server/internal/db/dbm/postgres/dialect.go
index 173c93a2..33d79ac3 100644
--- a/server/internal/db/dbm/postgres/dialect.go
+++ b/server/internal/db/dbm/postgres/dialect.go
@@ -188,8 +188,8 @@ func (md *PgsqlDialect) GetSchemas() ([]string, error) {
}
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
-func (md *PgsqlDialect) GetDbProgram() dbi.DbProgram {
- panic("implement me")
+func (md *PgsqlDialect) GetDbProgram() (dbi.DbProgram, error) {
+ return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", md.dc.Info.Type)
}
func (md *PgsqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
diff --git a/server/internal/db/dbm/sqlite/dialect.go b/server/internal/db/dbm/sqlite/dialect.go
index 3b0887ff..ce26841e 100644
--- a/server/internal/db/dbm/sqlite/dialect.go
+++ b/server/internal/db/dbm/sqlite/dialect.go
@@ -180,8 +180,8 @@ func (sd *SqliteDialect) GetSchemas() ([]string, error) {
}
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
-func (sd *SqliteDialect) GetDbProgram() dbi.DbProgram {
- panic("implement me")
+func (sd *SqliteDialect) GetDbProgram() (dbi.DbProgram, error) {
+ return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", sd.dc.Info.Type)
}
func (sd *SqliteDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
diff --git a/server/internal/db/domain/entity/db_backup.go b/server/internal/db/domain/entity/db_backup.go
index 2455a49e..d8f62f2e 100644
--- a/server/internal/db/domain/entity/db_backup.go
+++ b/server/internal/db/domain/entity/db_backup.go
@@ -18,6 +18,7 @@ type DbBackup struct {
EnabledDesc string // 启用状态描述
StartTime time.Time // 开始时间
Interval time.Duration // 间隔时间
+ MaxSaveDays int // 数据库备份历史保留天数,过期将自动删除
Repeated bool // 是否重复执行
}
@@ -81,10 +82,6 @@ func (b *DbBackup) GetInterval() time.Duration {
return b.Interval
}
-func (b *DbBackup) SetLastStatus(status DbJobStatus, err error) {
- b.setLastStatus(b.GetJobType(), status, err)
-}
-
func (b *DbBackup) GetKey() DbJobKey {
return b.getKey(b.GetJobType())
}
diff --git a/server/internal/db/domain/entity/db_binlog.go b/server/internal/db/domain/entity/db_binlog.go
index c1d9d3b8..388f52d1 100644
--- a/server/internal/db/domain/entity/db_binlog.go
+++ b/server/internal/db/domain/entity/db_binlog.go
@@ -11,14 +11,16 @@ const (
// BinlogFile is the metadata of the MySQL binlog file.
type BinlogFile struct {
- Name string
- Size int64
+ Name string
+ RemoteSize int64
+ LocalSize int64
// Sequence is parsed from Name and is for the sorting purpose.
Sequence int64
FirstEventTime time.Time
LastEventTime time.Time
- Downloaded bool
+
+ Downloaded bool
}
var _ DbJob = (*DbBinlog)(nil)
@@ -76,10 +78,6 @@ func (b *DbBinlog) GetJobType() DbJobType {
return DbJobTypeBinlog
}
-func (b *DbBinlog) SetLastStatus(status DbJobStatus, err error) {
- b.setLastStatus(b.GetJobType(), status, err)
-}
-
func (b *DbBinlog) GetKey() DbJobKey {
return b.getKey(b.GetJobType())
}
diff --git a/server/internal/db/domain/entity/db_binlog_history.go b/server/internal/db/domain/entity/db_binlog_history.go
index 1a6a6fe6..43fcc46f 100644
--- a/server/internal/db/domain/entity/db_binlog_history.go
+++ b/server/internal/db/domain/entity/db_binlog_history.go
@@ -14,6 +14,7 @@ type DbBinlogHistory struct {
FileSize int64
Sequence int64
FirstEventTime time.Time
+ LastEventTime time.Time
DbInstanceId uint64 `json:"dbInstanceId"`
}
diff --git a/server/internal/db/domain/entity/db_job.go b/server/internal/db/domain/entity/db_job.go
index bf318662..d7dbfc2e 100644
--- a/server/internal/db/domain/entity/db_job.go
+++ b/server/internal/db/domain/entity/db_job.go
@@ -45,7 +45,6 @@ var _ runner.Job = (DbJob)(nil)
type DbJobBase interface {
model.ModelI
- GetLastStatus() DbJobStatus
}
type DbJob interface {
@@ -62,7 +61,6 @@ type DbJob interface {
SetEnabled(enabled bool, desc string)
Update(job runner.Job)
GetInterval() time.Duration
- SetLastStatus(status DbJobStatus, err error)
}
var _ DbJobBase = (*DbJobBaseImpl)(nil)
@@ -84,10 +82,6 @@ func (d *DbJobBaseImpl) getJobType() DbJobType {
return job.GetJobType()
}
-func (d *DbJobBaseImpl) GetLastStatus() DbJobStatus {
- return d.LastStatus
-}
-
func (d *DbJobBaseImpl) setLastStatus(jobType DbJobType, status DbJobStatus, err error) {
var statusName, jobName string
switch status {
diff --git a/server/internal/db/domain/entity/db_restore.go b/server/internal/db/domain/entity/db_restore.go
index 5eaa35f8..5660a338 100644
--- a/server/internal/db/domain/entity/db_restore.go
+++ b/server/internal/db/domain/entity/db_restore.go
@@ -79,10 +79,6 @@ func (r *DbRestore) GetJobType() DbJobType {
return DbJobTypeRestore
}
-func (r *DbRestore) SetLastStatus(status DbJobStatus, err error) {
- r.setLastStatus(r.GetJobType(), status, err)
-}
-
func (r *DbRestore) GetKey() DbJobKey {
return r.getKey(r.GetJobType())
}
diff --git a/server/internal/db/domain/repository/db_backup.go b/server/internal/db/domain/repository/db_backup.go
index 8eaad758..666dcc77 100644
--- a/server/internal/db/domain/repository/db_backup.go
+++ b/server/internal/db/domain/repository/db_backup.go
@@ -6,7 +6,7 @@ import (
)
type DbBackup interface {
- DbJob
+ DbJob[*entity.DbBackup]
ListToDo(jobs any) error
ListDbInstances(enabled bool, repeated bool, instanceIds *[]uint64) error
@@ -14,4 +14,6 @@ type DbBackup interface {
// GetPageList 分页获取数据库任务列表
GetPageList(condition *entity.DbBackupQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
+
+ ListByCond(cond any, listModels any, cols ...string) error
}
diff --git a/server/internal/db/domain/repository/db_backup_history.go b/server/internal/db/domain/repository/db_backup_history.go
index d3d678bc..4ae5ce87 100644
--- a/server/internal/db/domain/repository/db_backup_history.go
+++ b/server/internal/db/domain/repository/db_backup_history.go
@@ -12,12 +12,13 @@ type DbBackupHistory interface {
// GetPageList 分页获取数据备份历史
GetPageList(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
- GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error)
+ GetLatestHistoryForBinlog(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error)
- GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, bool, error)
+ GetEarliestHistoryForBinlog(instanceId uint64) (*entity.DbBackupHistory, bool, error)
GetHistories(backupHistoryIds []uint64, toEntity any) error
UpdateDeleting(deleting bool, backupHistoryId ...uint64) (bool, error)
UpdateRestoring(restoring bool, backupHistoryId ...uint64) (bool, error)
+ ZeroBinlogInfo(backupHistoryId uint64) error
}
diff --git a/server/internal/db/domain/repository/db_binlog.go b/server/internal/db/domain/repository/db_binlog.go
index 76c1122a..f3229c6e 100644
--- a/server/internal/db/domain/repository/db_binlog.go
+++ b/server/internal/db/domain/repository/db_binlog.go
@@ -6,7 +6,7 @@ import (
)
type DbBinlog interface {
- DbJob
+ DbJob[*entity.DbBinlog]
AddJobIfNotExists(ctx context.Context, job *entity.DbBinlog) error
}
diff --git a/server/internal/db/domain/repository/db_binlog_history.go b/server/internal/db/domain/repository/db_binlog_history.go
index 0085b52f..a93130d3 100644
--- a/server/internal/db/domain/repository/db_binlog_history.go
+++ b/server/internal/db/domain/repository/db_binlog_history.go
@@ -19,4 +19,6 @@ type DbBinlogHistory interface {
InsertWithBinlogFiles(ctx context.Context, instanceId uint64, binlogFiles []*entity.BinlogFile) error
Upsert(ctx context.Context, history *entity.DbBinlogHistory) error
+
+ GetHistoriesBeforeSequence(ctx context.Context, instanceId uint64, binlogSeq int64, histories *[]*entity.DbBinlogHistory) error
}
diff --git a/server/internal/db/domain/repository/db_job.go b/server/internal/db/domain/repository/db_job.go
index 76ced093..17063f58 100644
--- a/server/internal/db/domain/repository/db_job.go
+++ b/server/internal/db/domain/repository/db_job.go
@@ -3,24 +3,18 @@ package repository
import (
"context"
"mayfly-go/internal/db/domain/entity"
+ "mayfly-go/pkg/base"
)
-type DbJobBase interface {
- // GetById 根据实体id查询
- GetById(e entity.DbJob, id uint64, cols ...string) error
-
- // UpdateById 根据实体id更新实体信息
- UpdateById(ctx context.Context, e entity.DbJob, columns ...string) error
-
- // DeleteById 根据实体主键删除实体
- DeleteById(ctx context.Context, id uint64) error
+type DbJobBase[T entity.DbJob] interface {
+ base.Repo[T]
// UpdateLastStatus 更新任务执行状态
UpdateLastStatus(ctx context.Context, job entity.DbJob) error
}
-type DbJob interface {
- DbJobBase
+type DbJob[T entity.DbJob] interface {
+ DbJobBase[T]
// AddJob 添加数据库任务
AddJob(ctx context.Context, jobs any) error
diff --git a/server/internal/db/domain/repository/db_restore.go b/server/internal/db/domain/repository/db_restore.go
index 44fec3a4..4f30e67e 100644
--- a/server/internal/db/domain/repository/db_restore.go
+++ b/server/internal/db/domain/repository/db_restore.go
@@ -6,7 +6,7 @@ import (
)
type DbRestore interface {
- DbJob
+ DbJob[*entity.DbRestore]
ListToDo(jobs any) error
GetDbNamesWithoutRestore(instanceId uint64, dbNames []string) ([]string, error)
diff --git a/server/internal/db/infrastructure/persistence/db_backup.go b/server/internal/db/infrastructure/persistence/db_backup.go
index f7717b6d..632e7fda 100644
--- a/server/internal/db/infrastructure/persistence/db_backup.go
+++ b/server/internal/db/infrastructure/persistence/db_backup.go
@@ -64,7 +64,6 @@ func (d *dbBackupRepoImpl) ListToDo(jobs any) error {
// GetPageList 分页获取数据库备份任务列表
func (d *dbBackupRepoImpl) GetPageList(condition *entity.DbBackupQuery, pageParam *model.PageParam, toEntity any, _ ...string) (*model.PageResult[any], error) {
- d.GetModel()
qd := gormx.NewQuery(d.GetModel()).
Eq("id", condition.Id).
Eq0("db_instance_id", condition.DbInstanceId).
@@ -83,12 +82,16 @@ func (d *dbBackupRepoImpl) UpdateEnabled(_ context.Context, jobId uint64, enable
cond := map[string]any{
"id": jobId,
}
- desc := "任务已禁用"
+ desc := "已禁用"
if enabled {
- desc = "任务已启用"
+ desc = "已启用"
}
return d.Updates(cond, map[string]any{
"enabled": enabled,
"enabled_desc": desc,
})
}
+
+func (d *dbBackupRepoImpl) ListByCond(cond any, listModels any, cols ...string) error {
+ return d.dbJobBaseImpl.ListByCond(cond, listModels, cols...)
+}
diff --git a/server/internal/db/infrastructure/persistence/db_backup_history.go b/server/internal/db/infrastructure/persistence/db_backup_history.go
index 7ed915f9..ccc12b0a 100644
--- a/server/internal/db/infrastructure/persistence/db_backup_history.go
+++ b/server/internal/db/infrastructure/persistence/db_backup_history.go
@@ -34,12 +34,13 @@ func (repo *dbBackupHistoryRepoImpl) GetPageList(condition *entity.DbBackupHisto
func (repo *dbBackupHistoryRepoImpl) GetHistories(backupHistoryIds []uint64, toEntity any) error {
return global.Db.Model(repo.GetModel()).
Where("id in ?", backupHistoryIds).
+ Where("deleting = false").
Scopes(gormx.UndeleteScope).
Find(toEntity).
Error
}
-func (repo *dbBackupHistoryRepoImpl) GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) {
+func (repo *dbBackupHistoryRepoImpl) GetLatestHistoryForBinlog(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) {
history := &entity.DbBackupHistory{}
db := global.Db
err := db.Model(repo.GetModel()).
@@ -48,6 +49,8 @@ func (repo *dbBackupHistoryRepoImpl) GetLatestHistory(instanceId uint64, dbName
Where(db.Where("binlog_sequence < ?", bi.Sequence).
Or(db.Where("binlog_sequence = ?", bi.Sequence).
Where("binlog_position <= ?", bi.Position))).
+ Where("binlog_sequence > 0").
+ Where("deleting = false").
Scopes(gormx.UndeleteScope).
Order("binlog_sequence desc, binlog_position desc").
First(history).Error
@@ -57,10 +60,12 @@ func (repo *dbBackupHistoryRepoImpl) GetLatestHistory(instanceId uint64, dbName
return history, err
}
-func (repo *dbBackupHistoryRepoImpl) GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, bool, error) {
+func (repo *dbBackupHistoryRepoImpl) GetEarliestHistoryForBinlog(instanceId uint64) (*entity.DbBackupHistory, bool, error) {
history := &entity.DbBackupHistory{}
db := global.Db.Model(repo.GetModel())
err := db.Where("db_instance_id = ?", instanceId).
+ Where("binlog_sequence > 0").
+ Where("deleting = false").
Scopes(gormx.UndeleteScope).
Order("binlog_sequence").
First(history).Error
@@ -79,7 +84,7 @@ func (repo *dbBackupHistoryRepoImpl) UpdateDeleting(deleting bool, backupHistory
Where("id in ?", backupHistoryId).
Where("restoring = false").
Scopes(gormx.UndeleteScope).
- Update("restoring", deleting)
+ Update("deleting", deleting)
if db.Error != nil {
return false, db.Error
}
@@ -103,3 +108,15 @@ func (repo *dbBackupHistoryRepoImpl) UpdateRestoring(restoring bool, backupHisto
}
return true, nil
}
+
+func (repo *dbBackupHistoryRepoImpl) ZeroBinlogInfo(backupHistoryId uint64) error {
+ return global.Db.Model(repo.GetModel()).
+ Where("id = ?", backupHistoryId).
+ Where("restoring = false").
+ Scopes(gormx.UndeleteScope).
+ Updates(&map[string]any{
+ "binlog_file_name": "",
+ "binlog_sequence": 0,
+ "binlog_position": 0,
+ }).Error
+}
diff --git a/server/internal/db/infrastructure/persistence/db_binlog_history.go b/server/internal/db/infrastructure/persistence/db_binlog_history.go
index 42345257..6d6140cb 100644
--- a/server/internal/db/infrastructure/persistence/db_binlog_history.go
+++ b/server/internal/db/infrastructure/persistence/db_binlog_history.go
@@ -7,6 +7,7 @@ import (
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/base"
+ "mayfly-go/pkg/global"
"mayfly-go/pkg/gormx"
"time"
)
@@ -82,7 +83,7 @@ func (repo *dbBinlogHistoryRepoImpl) Upsert(_ context.Context, history *entity.D
First(old).Error
switch {
case err == nil:
- return db.Model(old).Select("create_time", "file_size", "first_event_time").Updates(history).Error
+ return db.Model(old).Select("create_time", "file_size", "first_event_time", "last_event_time").Updates(history).Error
case errors.Is(err, gorm.ErrRecordNotFound):
return db.Create(history).Error
default:
@@ -103,9 +104,10 @@ func (repo *dbBinlogHistoryRepoImpl) InsertWithBinlogFiles(ctx context.Context,
history := &entity.DbBinlogHistory{
CreateTime: time.Now(),
FileName: fileOnServer.Name,
- FileSize: fileOnServer.Size,
+ FileSize: fileOnServer.RemoteSize,
Sequence: fileOnServer.Sequence,
FirstEventTime: fileOnServer.FirstEventTime,
+ LastEventTime: fileOnServer.LastEventTime,
DbInstanceId: instanceId,
}
histories = append(histories, history)
@@ -122,3 +124,13 @@ func (repo *dbBinlogHistoryRepoImpl) InsertWithBinlogFiles(ctx context.Context,
}
return nil
}
+
+func (repo *dbBinlogHistoryRepoImpl) GetHistoriesBeforeSequence(ctx context.Context, instanceId uint64, binlogSeq int64, histories *[]*entity.DbBinlogHistory) error {
+ return global.Db.Model(repo.GetModel()).
+ Where("db_instance_id = ?", instanceId).
+ Where("sequence < ?", binlogSeq).
+ Scopes(gormx.UndeleteScope).
+ Order("id").
+ Find(histories).
+ Error
+}
diff --git a/server/internal/db/infrastructure/persistence/db_job_base.go b/server/internal/db/infrastructure/persistence/db_job_base.go
index 1e04d290..750ab377 100644
--- a/server/internal/db/infrastructure/persistence/db_job_base.go
+++ b/server/internal/db/infrastructure/persistence/db_job_base.go
@@ -12,20 +12,12 @@ import (
"reflect"
)
-var _ repository.DbJobBase = (*dbJobBaseImpl[entity.DbJob])(nil)
+var _ repository.DbJobBase[entity.DbJob] = (*dbJobBaseImpl[entity.DbJob])(nil)
type dbJobBaseImpl[T entity.DbJob] struct {
base.RepoImpl[T]
}
-func (d *dbJobBaseImpl[T]) GetById(e entity.DbJob, id uint64, cols ...string) error {
- return d.RepoImpl.GetById(e.(T), id, cols...)
-}
-
-func (d *dbJobBaseImpl[T]) UpdateById(ctx context.Context, e entity.DbJob, columns ...string) error {
- return d.RepoImpl.UpdateById(ctx, e.(T), columns...)
-}
-
func (d *dbJobBaseImpl[T]) UpdateLastStatus(ctx context.Context, job entity.DbJob) error {
return d.UpdateById(ctx, job.(T), "last_status", "last_result", "last_time")
}
diff --git a/server/internal/db/infrastructure/persistence/db_restore.go b/server/internal/db/infrastructure/persistence/db_restore.go
index d4e4ab07..c0ccb36e 100644
--- a/server/internal/db/infrastructure/persistence/db_restore.go
+++ b/server/internal/db/infrastructure/persistence/db_restore.go
@@ -84,9 +84,9 @@ func (d *dbRestoreRepoImpl) UpdateEnabled(_ context.Context, jobId uint64, enabl
cond := map[string]any{
"id": jobId,
}
- desc := "任务已禁用"
+ desc := "已禁用"
if enabled {
- desc = "任务已启用"
+ desc = "已启用"
}
return d.Updates(cond, map[string]any{
"enabled": enabled,
diff --git a/server/internal/db/infrastructure/persistence/instance.go b/server/internal/db/infrastructure/persistence/instance.go
index cd9e1d67..f4652088 100644
--- a/server/internal/db/infrastructure/persistence/instance.go
+++ b/server/internal/db/infrastructure/persistence/instance.go
@@ -12,7 +12,7 @@ type instanceRepoImpl struct {
base.RepoImpl[*entity.DbInstance]
}
-func newInstanceRepo() repository.Instance {
+func NewInstanceRepo() repository.Instance {
return &instanceRepoImpl{base.RepoImpl[*entity.DbInstance]{M: new(entity.DbInstance)}}
}
diff --git a/server/internal/db/infrastructure/persistence/persistence.go b/server/internal/db/infrastructure/persistence/persistence.go
index e0f04fe8..abb77248 100644
--- a/server/internal/db/infrastructure/persistence/persistence.go
+++ b/server/internal/db/infrastructure/persistence/persistence.go
@@ -5,7 +5,7 @@ import (
)
func Init() {
- ioc.Register(newInstanceRepo(), ioc.WithComponentName("DbInstanceRepo"))
+ ioc.Register(NewInstanceRepo(), ioc.WithComponentName("DbInstanceRepo"))
ioc.Register(newDbRepo(), ioc.WithComponentName("DbRepo"))
ioc.Register(newDbSqlRepo(), ioc.WithComponentName("DbSqlRepo"))
ioc.Register(newDbSqlExecRepo(), ioc.WithComponentName("DbSqlExecRepo"))
diff --git a/server/pkg/biz/assert.go b/server/pkg/biz/assert.go
index 74a5e76f..94ecdf59 100644
--- a/server/pkg/biz/assert.go
+++ b/server/pkg/biz/assert.go
@@ -24,6 +24,12 @@ func ErrIsNil(err error, msgAndParams ...any) {
}
}
+func ErrNotNil(err error, msg string, params ...any) {
+ if err == nil {
+ panic(errorx.NewBiz(fmt.Sprintf(msg, params...)))
+ }
+}
+
func ErrIsNilAppendErr(err error, msg string) {
if err != nil {
panic(errorx.NewBiz(fmt.Sprintf(msg, err.Error())))
diff --git a/server/pkg/runner/runner.go b/server/pkg/runner/runner.go
index f0fd4d1d..d7eb7e48 100644
--- a/server/pkg/runner/runner.go
+++ b/server/pkg/runner/runner.go
@@ -22,7 +22,7 @@ var (
type JobKey = string
type RunJobFunc[T Job] func(ctx context.Context, job T) error
type NextJobFunc[T Job] func() (T, bool)
-type RunnableJobFunc[T Job] func(job T, next NextJobFunc[T]) (bool, error)
+type RunnableJobFunc[T Job] func(job T, nextRunning NextJobFunc[T]) (bool, error)
type ScheduleJobFunc[T Job] func(job T) (deadline time.Time, err error)
type UpdateJobFunc[T Job] func(ctx context.Context, job T) error
diff --git a/server/resources/data/mayfly-go.sqlite b/server/resources/data/mayfly-go.sqlite
index 7ed16d6e..16c5a5df 100644
Binary files a/server/resources/data/mayfly-go.sqlite and b/server/resources/data/mayfly-go.sqlite differ
diff --git a/server/resources/script/sql/mayfly-go-sqlite.sql b/server/resources/script/sql/mayfly-go-sqlite.sql
index 13c36ece..2e4c3d27 100644
--- a/server/resources/script/sql/mayfly-go-sqlite.sql
+++ b/server/resources/script/sql/mayfly-go-sqlite.sql
@@ -50,6 +50,7 @@ CREATE TABLE IF NOT EXISTS "t_db_backup" (
"db_name" text(64) NOT NULL,
"repeated" integer(1),
"interval" integer(20),
+ "max_save_days" integer(8) NOT NULL DEFAULT '0',
"start_time" datetime,
"enabled" integer(1),
"enabled_desc" text(64),
@@ -81,8 +82,8 @@ CREATE TABLE IF NOT EXISTS "t_db_backup_history" (
"create_time" datetime,
"is_deleted" integer(1) NOT NULL,
"delete_time" datetime,
- "restoring" integer(1),
- "deleting" integer(1),
+ "restoring" integer(1) NOT NULL DEFAULT '0',
+ "deleting" integer(1) NOT NULL DEFAULT '0',
PRIMARY KEY ("id")
);
@@ -112,6 +113,7 @@ CREATE TABLE IF NOT EXISTS "t_db_binlog_history" (
"file_size" integer(20),
"sequence" integer(20),
"first_event_time" datetime,
+ "last_event_time" datetime,
"create_time" datetime,
"is_deleted" integer(4) NOT NULL,
"delete_time" datetime,
@@ -830,8 +832,8 @@ INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight,
INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (153, 150, 'Jra0n7De/pLOA2UYz/', 2, 1, '删除', 'db:sync:del', 1703641342, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:42:22', '2023-12-27 09:42:22', 0, NULL);
INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (154, 150, 'Jra0n7De/VBt68CDx/', 2, 1, '启停', 'db:sync:status', 1703641364, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:42:45', '2023-12-27 09:42:45', 0, NULL);
INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (155, 150, 'Jra0n7De/PigmSGVg/', 2, 1, '日志', 'db:sync:log', 1704266866, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2024-01-03 15:27:47', '2024-01-03 15:27:47', 0, NULL);
-INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (161, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL);
-INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (160, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL);
+INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (160, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL);
+INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (161, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL);
-- Table: t_sys_role
CREATE TABLE IF NOT EXISTS "t_sys_role" (
diff --git a/server/resources/script/sql/mayfly-go.sql b/server/resources/script/sql/mayfly-go.sql
index b6522bde..a66bb70c 100644
--- a/server/resources/script/sql/mayfly-go.sql
+++ b/server/resources/script/sql/mayfly-go.sql
@@ -108,6 +108,7 @@ CREATE TABLE `t_db_backup` (
`db_name` varchar(64) NOT NULL COMMENT '数据库名称',
`repeated` tinyint(1) DEFAULT NULL COMMENT '是否重复执行',
`interval` bigint(20) DEFAULT NULL COMMENT '备份周期',
+ `max_save_days` int(8) NOT NULL DEFAULT '0' COMMENT '最大保留天数',
`start_time` datetime DEFAULT NULL COMMENT '首次备份时间',
`enabled` tinyint(1) DEFAULT NULL COMMENT '是否启用',
`enabled_desc` varchar(64) NULL COMMENT '任务启用描述',
@@ -144,8 +145,8 @@ CREATE TABLE `t_db_backup_history` (
`create_time` datetime DEFAULT NULL COMMENT '历史备份创建时间',
`is_deleted` tinyint(1) NOT NULL DEFAULT 0,
`delete_time` datetime DEFAULT NULL,
- `restoring` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识',
- `deleting` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识',
+ `restoring` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识',
+ `deleting` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识',
PRIMARY KEY (`id`),
KEY `idx_db_backup_id` (`db_backup_id`) USING BTREE,
KEY `idx_db_instance_id` (`db_instance_id`) USING BTREE,
@@ -232,6 +233,7 @@ CREATE TABLE `t_db_binlog_history` (
`file_size` bigint(20) DEFAULT NULL COMMENT 'BINLOG文件大小',
`sequence` bigint(20) DEFAULT NULL COMMENT 'BINLOG序列号',
`first_event_time` datetime DEFAULT NULL COMMENT '首次事件时间',
+ `last_event_time` datetime DEFAULT NULL COMMENT '最新事件时间',
`create_time` datetime DEFAULT NULL,
`is_deleted` tinyint(4) NOT NULL DEFAULT 0,
`delete_time` datetime DEFAULT NULL,
@@ -792,8 +794,8 @@ INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight
INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(152, 150, 'Jra0n7De/zvAMo2vk/', 2, 1, '编辑', 'db:sync:save', 1703641320, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:42:00', '2023-12-27 09:42:12', 0, NULL);
INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(151, 150, 'Jra0n7De/uAnHZxEV/', 2, 1, '基本权限', 'db:sync', 1703641202, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:40:02', '2023-12-27 09:40:02', 0, NULL);
INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(150, 36, 'Jra0n7De/', 1, 1, '数据同步', 'sync', 1693040707, '{"component":"ops/db/SyncTaskList","icon":"Coin","isKeepAlive":true,"routeName":"SyncTaskList"}', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-22 09:51:34', '2023-12-27 10:16:57', 0, NULL);
-INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(161, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL);
-INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(160, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL);
+INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(160, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL);
+INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(161, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL);
COMMIT;
-- ----------------------------
diff --git a/server/resources/script/sql/v1.7/v1.7.2.sql b/server/resources/script/sql/v1.7/v1.7.2.sql
index 0af8c9ba..240c5b0a 100644
--- a/server/resources/script/sql/v1.7/v1.7.2.sql
+++ b/server/resources/script/sql/v1.7/v1.7.2.sql
@@ -1,6 +1,5 @@
-INSERT INTO `t_sys_resource` (`id`, `pid`, `ui_path`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `is_deleted`, `delete_time`)
- VALUES (161, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL),
- (160, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL);
+INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(160, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL);
+INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(161, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL);
ALTER TABLE `t_db_backup`
ADD COLUMN `enabled_desc` varchar(64) NULL COMMENT '任务启用描述' AFTER `enabled`;
@@ -9,5 +8,5 @@ ALTER TABLE `t_db_restore`
ADD COLUMN `enabled_desc` varchar(64) NULL COMMENT '任务启用描述' AFTER `enabled`;
ALTER TABLE `t_db_backup_history`
- ADD COLUMN `restoring` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识',
- ADD COLUMN `deleting` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识';
+ ADD COLUMN `restoring` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识',
+ ADD COLUMN `deleting` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识';
diff --git a/server/resources/script/sql/v1.7/v1.7.3.sql b/server/resources/script/sql/v1.7/v1.7.3.sql
new file mode 100644
index 00000000..9ad4b82f
--- /dev/null
+++ b/server/resources/script/sql/v1.7/v1.7.3.sql
@@ -0,0 +1,5 @@
+ALTER TABLE `t_db_backup`
+ ADD COLUMN `max_save_days` int(8) NOT NULL DEFAULT '0' COMMENT '最大保存天数' AFTER `interval`;
+
+ALTER TABLE `t_db_binlog_history`
+ ADD COLUMN `last_event_time` datetime NULL DEFAULT NULL COMMENT '最新事件时间' AFTER `first_event_time`;