From f2b6e15cf4a980d359545689981a058b1bfd2e3d Mon Sep 17 00:00:00 2001 From: kanzihuang Date: Tue, 6 Feb 2024 07:16:56 +0000 Subject: [PATCH] =?UTF-8?q?!100=20=E5=AE=9A=E6=97=B6=E6=B8=85=E7=90=86?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E5=A4=87=E4=BB=BD=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=20*=20feat:=20=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E5=BA=93=20?= =?UTF-8?q?BINLOG=20=E5=90=8C=E6=AD=A5=E6=9C=BA=E5=88=B6=20*=20feat:=20?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=95=B0=E6=8D=AE=E5=BA=93=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E5=89=8D=E9=9C=80=E5=88=A0=E9=99=A4=E5=85=B3=E8=81=94=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E5=A4=87=E4=BB=BD=E4=B8=8E=E6=81=A2?= =?UTF-8?q?=E5=A4=8D=E4=BB=BB=E5=8A=A1=20*=20refactor:=20=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E6=95=B0=E6=8D=AE=E5=BA=93=E5=A4=87=E4=BB=BD=E4=B8=8E?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=A8=A1=E5=9D=97=20*=20feat:=20=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E6=B8=85=E7=90=86=E6=95=B0=E6=8D=AE=E5=BA=93=E5=A4=87?= =?UTF-8?q?=E4=BB=BD=E5=8E=86=E5=8F=B2=E5=92=8C=E6=9C=AC=E5=9C=B0=20Binlog?= =?UTF-8?q?=20=E6=96=87=E4=BB=B6=20*=20feat:=20=E5=8E=8B=E7=BC=A9=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E5=A4=87=E4=BB=BD=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/views/ops/db/DbBackupEdit.vue | 24 ++- .../src/views/ops/db/InstanceList.vue | 21 ++- server/go.mod | 1 - server/internal/db/api/db.go | 2 - server/internal/db/api/db_backup.go | 3 +- .../db/api/{instance.go => db_instance.go} | 12 +- server/internal/db/api/form/db_backup.go | 1 + server/internal/db/api/vo/db_backup.go | 5 +- server/internal/db/api/vo/db_restore.go | 4 +- server/internal/db/application/application.go | 7 +- server/internal/db/application/db_backup.go | 95 ++++++++-- server/internal/db/application/db_binlog.go | 132 ++++++++++--- .../{instance.go => db_instance.go} | 53 +++++- server/internal/db/application/db_restore.go | 1 - .../internal/db/application/db_scheduler.go | 178 +++++++----------- server/internal/db/dbm/dbi/db_program.go | 4 +- server/internal/db/dbm/dbi/db_type.go | 9 + server/internal/db/dbm/dbi/dialect.go | 2 +- server/internal/db/dbm/dm/dialect.go | 4 +- server/internal/db/dbm/mssql/dialect.go | 4 +- server/internal/db/dbm/mysql/dialect.go | 4 +- server/internal/db/dbm/mysql/program.go | 126 ++++++++++--- .../internal/db/dbm/mysql/program_e2e_test.go | 6 +- server/internal/db/dbm/oracle/dialect.go | 4 +- server/internal/db/dbm/postgres/dialect.go | 4 +- server/internal/db/dbm/sqlite/dialect.go | 4 +- server/internal/db/domain/entity/db_backup.go | 5 +- server/internal/db/domain/entity/db_binlog.go | 12 +- .../db/domain/entity/db_binlog_history.go | 1 + server/internal/db/domain/entity/db_job.go | 6 - .../internal/db/domain/entity/db_restore.go | 4 - .../db/domain/repository/db_backup.go | 4 +- .../db/domain/repository/db_backup_history.go | 5 +- .../db/domain/repository/db_binlog.go | 2 +- .../db/domain/repository/db_binlog_history.go | 2 + .../internal/db/domain/repository/db_job.go | 16 +- .../db/domain/repository/db_restore.go | 2 +- .../infrastructure/persistence/db_backup.go | 9 +- .../persistence/db_backup_history.go | 23 ++- .../persistence/db_binlog_history.go | 16 +- .../infrastructure/persistence/db_job_base.go | 10 +- .../infrastructure/persistence/db_restore.go | 4 +- .../db/infrastructure/persistence/instance.go | 2 +- .../infrastructure/persistence/persistence.go | 2 +- server/pkg/biz/assert.go | 6 + server/pkg/runner/runner.go | 2 +- server/resources/data/mayfly-go.sqlite | Bin 233472 -> 237568 bytes .../resources/script/sql/mayfly-go-sqlite.sql | 10 +- server/resources/script/sql/mayfly-go.sql | 10 +- server/resources/script/sql/v1.7/v1.7.2.sql | 9 +- server/resources/script/sql/v1.7/v1.7.3.sql | 5 + 51 files changed, 572 insertions(+), 305 deletions(-) rename server/internal/db/api/{instance.go => db_instance.go} (87%) rename server/internal/db/application/{instance.go => db_instance.go} (66%) create mode 100644 server/resources/script/sql/v1.7/v1.7.3.sql diff --git a/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue b/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue index 6ad17b2b..3ea74435 100644 --- a/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue +++ b/mayfly_go_web/src/views/ops/db/DbBackupEdit.vue @@ -28,8 +28,11 @@ - - + + + + + @@ -92,6 +95,14 @@ const rules = { trigger: ['change', 'blur'], }, ], + maxSaveDays: [ + { + required: true, + pattern: /^[0-9]\d*$/, + message: '请输入非负整数', + trigger: ['change', 'blur'], + }, + ], }; const backupForm: any = ref(null); @@ -102,9 +113,10 @@ const state = reactive({ dbId: 0, dbNames: '', name: '', - intervalDay: null, + intervalDay: 1, startTime: null as any, - repeated: null as any, + repeated: true, + maxSaveDays: 0, }, btnLoading: false, dbNamesSelected: [] as any, @@ -137,12 +149,14 @@ const init = (data: any) => { state.form.name = data.name; state.form.intervalDay = data.intervalDay; state.form.startTime = data.startTime; + state.form.maxSaveDays = data.maxSaveDays; } else { state.editOrCreate = false; state.form.name = ''; - state.form.intervalDay = null; + state.form.intervalDay = 1; const now = new Date(); state.form.startTime = new Date(now.getFullYear(), now.getMonth(), now.getDate() + 1); + state.form.maxSaveDays = 0; getDbNamesWithoutBackup(); } }; diff --git a/mayfly_go_web/src/views/ops/db/InstanceList.vue b/mayfly_go_web/src/views/ops/db/InstanceList.vue index ecebb0cf..029ddd8b 100644 --- a/mayfly_go_web/src/views/ops/db/InstanceList.vue +++ b/mayfly_go_web/src/views/ops/db/InstanceList.vue @@ -25,6 +25,7 @@ @@ -91,7 +92,7 @@ const columns = ref([ ]); // 该用户拥有的的操作列按钮权限 -const actionBtns = hasPerms([perms.saveInstance]); +const actionBtns = hasPerms(Object.values(perms)); const actionColumn = TableColumn.new('action', '操作').isSlot().setMinWidth(110).fixedRight().alignCenter(); const pageTableRef: Ref = ref(null); @@ -150,14 +151,26 @@ const editInstance = async (data: any) => { state.instanceEditDialog.visible = true; }; -const deleteInstance = async () => { +const deleteInstance = async (data: any) => { try { - await ElMessageBox.confirm(`确定删除数据库实例【${state.selectionData.map((x: any) => x.name).join(', ')}】?`, '提示', { + let instanceName: string; + if (data) { + instanceName = data.name; + } else { + instanceName = state.selectionData.map((x: any) => x.name).join(', '); + } + await ElMessageBox.confirm(`确定删除数据库实例【${instanceName}】?`, '提示', { confirmButtonText: '确定', cancelButtonText: '取消', type: 'warning', }); - await dbApi.deleteInstance.request({ id: state.selectionData.map((x: any) => x.id).join(',') }); + let instanceId: string; + if (data) { + instanceId = data.id; + } else { + instanceId = state.selectionData.map((x: any) => x.id).join(','); + } + await dbApi.deleteInstance.request({ id: instanceId }); ElMessage.success('删除成功'); search(); } catch (err) { diff --git a/server/go.mod b/server/go.mod index 7db5b09f..0e2ee3ff 100644 --- a/server/go.mod +++ b/server/go.mod @@ -38,7 +38,6 @@ require ( // gorm gorm.io/driver/mysql v1.5.2 gorm.io/gorm v1.25.6 - ) require ( diff --git a/server/internal/db/api/db.go b/server/internal/db/api/db.go index fff38878..c4a8f679 100644 --- a/server/internal/db/api/db.go +++ b/server/internal/db/api/db.go @@ -78,8 +78,6 @@ func (d *Db) DeleteDb(rc *req.Ctx) { d.DbApp.Delete(ctx, dbId) // 删除该库的sql执行记录 d.DbSqlExecApp.DeleteBy(ctx, &entity.DbSqlExec{DbId: dbId}) - - // todo delete restore task and histories } } diff --git a/server/internal/db/api/db_backup.go b/server/internal/db/api/db_backup.go index 6609a8c1..49566203 100644 --- a/server/internal/db/api/db_backup.go +++ b/server/internal/db/api/db_backup.go @@ -81,6 +81,7 @@ func (d *DbBackup) Update(rc *req.Ctx) { job.Name = backupForm.Name job.StartTime = backupForm.StartTime job.Interval = backupForm.Interval + job.MaxSaveDays = backupForm.MaxSaveDays biz.ErrIsNilAppendErr(d.backupApp.Update(rc.MetaCtx, job), "保存数据库备份任务失败: %v") } @@ -178,7 +179,7 @@ func (d *DbBackup) GetHistoryPageList(rc *req.Ctx) { rc.ResData = res } -// RestoreHistories 删除数据库备份历史 +// RestoreHistories 从数据库备份历史中恢复数据库 // @router /api/dbs/:dbId/backup-histories/:backupHistoryId/restore [POST] func (d *DbBackup) RestoreHistories(rc *req.Ctx) { pm := ginx.PathParam(rc.GinCtx, "backupHistoryId") diff --git a/server/internal/db/api/instance.go b/server/internal/db/api/db_instance.go similarity index 87% rename from server/internal/db/api/instance.go rename to server/internal/db/api/db_instance.go index c0cb4159..c95af839 100644 --- a/server/internal/db/api/instance.go +++ b/server/internal/db/api/db_instance.go @@ -87,16 +87,10 @@ func (d *Instance) DeleteInstance(rc *req.Ctx) { for _, v := range ids { value, err := strconv.Atoi(v) - biz.ErrIsNilAppendErr(err, "string类型转换为int异常: %s") + biz.ErrIsNilAppendErr(err, "删除数据库实例失败: %s") instanceId := uint64(value) - if d.DbApp.Count(&entity.DbQuery{InstanceId: instanceId}) != 0 { - instance, err := d.InstanceApp.GetById(new(entity.DbInstance), instanceId, "name") - biz.ErrIsNil(err, "获取数据库实例错误,数据库实例ID为: %d", instance.Id) - biz.IsTrue(false, "不能删除数据库实例【%s】,请先删除其关联的数据库资源。", instance.Name) - } - // todo check if backup task has been disabled and backup histories have been deleted - - d.InstanceApp.Delete(rc.MetaCtx, instanceId) + err = d.InstanceApp.Delete(rc.MetaCtx, instanceId) + biz.ErrIsNilAppendErr(err, "删除数据库实例失败: %s") } } diff --git a/server/internal/db/api/form/db_backup.go b/server/internal/db/api/form/db_backup.go index dacd0571..9c5747a5 100644 --- a/server/internal/db/api/form/db_backup.go +++ b/server/internal/db/api/form/db_backup.go @@ -14,6 +14,7 @@ type DbBackupForm struct { Interval time.Duration `json:"-"` // 间隔时间: 为零表示单次执行,为正表示反复执行 IntervalDay uint64 `json:"intervalDay"` // 间隔天数: 为零表示单次执行,为正表示反复执行 Repeated bool `json:"repeated"` // 是否重复执行 + MaxSaveDays int `json:"maxSaveDays"` // 数据库备份历史保留天数,过期将自动删除 } func (restore *DbBackupForm) UnmarshalJSON(data []byte) error { diff --git a/server/internal/db/api/vo/db_backup.go b/server/internal/db/api/vo/db_backup.go index fdfe17e7..6579398a 100644 --- a/server/internal/db/api/vo/db_backup.go +++ b/server/internal/db/api/vo/db_backup.go @@ -15,6 +15,7 @@ type DbBackup struct { StartTime time.Time `json:"startTime"` // 开始时间 Interval time.Duration `json:"-"` // 间隔时间 IntervalDay uint64 `json:"intervalDay" gorm:"-"` // 间隔天数 + MaxSaveDays int `json:"maxSaveDays"` // 数据库备份历史保留天数,过期将自动删除 Enabled bool `json:"enabled"` // 是否启用 EnabledDesc string `json:"enabledDesc"` // 启用状态描述 LastTime timex.NullTime `json:"lastTime"` // 最近一次执行时间 @@ -29,9 +30,9 @@ func (backup *DbBackup) MarshalJSON() ([]byte, error) { backup.IntervalDay = uint64(backup.Interval / time.Hour / 24) if len(backup.EnabledDesc) == 0 { if backup.Enabled { - backup.EnabledDesc = "任务已启用" + backup.EnabledDesc = "已启用" } else { - backup.EnabledDesc = "任务已禁用" + backup.EnabledDesc = "已禁用" } } return json.Marshal((*dbBackup)(backup)) diff --git a/server/internal/db/api/vo/db_restore.go b/server/internal/db/api/vo/db_restore.go index 99885c45..ae188cf6 100644 --- a/server/internal/db/api/vo/db_restore.go +++ b/server/internal/db/api/vo/db_restore.go @@ -30,9 +30,9 @@ func (restore *DbRestore) MarshalJSON() ([]byte, error) { restore.IntervalDay = uint64(restore.Interval / time.Hour / 24) if len(restore.EnabledDesc) == 0 { if restore.Enabled { - restore.EnabledDesc = "任务已启用" + restore.EnabledDesc = "已启用" } else { - restore.EnabledDesc = "任务已禁用" + restore.EnabledDesc = "已禁用" } } return json.Marshal((*dbBackup)(restore)) diff --git a/server/internal/db/application/application.go b/server/internal/db/application/application.go index a57a335a..adb67ec4 100644 --- a/server/internal/db/application/application.go +++ b/server/internal/db/application/application.go @@ -25,10 +25,13 @@ func InitIoc() { func Init() { sync.OnceFunc(func() { if err := GetDbBackupApp().Init(); err != nil { - panic(fmt.Sprintf("初始化 dbBackupApp 失败: %v", err)) + panic(fmt.Sprintf("初始化 DbBackupApp 失败: %v", err)) } if err := GetDbRestoreApp().Init(); err != nil { - panic(fmt.Sprintf("初始化 dbRestoreApp 失败: %v", err)) + panic(fmt.Sprintf("初始化 DbRestoreApp 失败: %v", err)) + } + if err := GetDbBinlogApp().Init(); err != nil { + panic(fmt.Sprintf("初始化 DbBinlogApp 失败: %v", err)) } GetDataSyncTaskApp().InitCronJob() })() diff --git a/server/internal/db/application/db_backup.go b/server/internal/db/application/db_backup.go index 29a48208..25c0c267 100644 --- a/server/internal/db/application/db_backup.go +++ b/server/internal/db/application/db_backup.go @@ -6,15 +6,24 @@ import ( "errors" "fmt" "gorm.io/gorm" + "math" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" "mayfly-go/pkg/logx" "mayfly-go/pkg/model" + "mayfly-go/pkg/utils/timex" "sync" + "time" "github.com/google/uuid" ) +const maxBackupHistoryDays = 30 + +var ( + errRestoringBackupHistory = errors.New("正在从备份历史中恢复数据库") +) + type DbBackupApp struct { scheduler *dbScheduler `inject:"DbScheduler"` backupRepo repository.DbBackup `inject:"DbBackupRepo"` @@ -22,6 +31,10 @@ type DbBackupApp struct { restoreRepo repository.DbRestore `inject:"DbRestoreRepo"` dbApp Db `inject:"DbApp"` mutex sync.Mutex + closed chan struct{} + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc } func (app *DbBackupApp) Init() error { @@ -32,11 +45,68 @@ func (app *DbBackupApp) Init() error { if err := app.scheduler.AddJob(context.Background(), jobs); err != nil { return err } + app.ctx, app.cancel = context.WithCancel(context.Background()) + app.wg.Add(1) + go func() { + defer app.wg.Done() + for app.ctx.Err() == nil { + if err := app.prune(app.ctx); err != nil { + logx.Errorf("清理数据库备份历史失败: %s", err.Error()) + timex.SleepWithContext(app.ctx, time.Minute*15) + continue + } + timex.SleepWithContext(app.ctx, time.Hour*24) + } + }() + return nil +} + +func (app *DbBackupApp) prune(ctx context.Context) error { + var jobs []*entity.DbBackup + if err := app.backupRepo.ListByCond(map[string]any{}, &jobs); err != nil { + return err + } + for _, job := range jobs { + if ctx.Err() != nil { + return nil + } + var histories []*entity.DbBackupHistory + historyCond := map[string]any{ + "db_backup_id": job.Id, + } + if err := app.backupHistoryRepo.ListByCondOrder(historyCond, &histories, "id"); err != nil { + return err + } + expiringTime := time.Now().Add(-math.MaxInt64) + if job.MaxSaveDays > 0 { + expiringTime = time.Now().Add(-time.Hour * 24 * time.Duration(job.MaxSaveDays+1)) + } + for _, history := range histories { + if ctx.Err() != nil { + return nil + } + if history.CreateTime.After(expiringTime) { + break + } + err := app.DeleteHistory(ctx, history.Id) + if errors.Is(err, errRestoringBackupHistory) { + break + } + if err != nil { + return err + } + } + } return nil } func (app *DbBackupApp) Close() { app.scheduler.Close() + if app.cancel != nil { + app.cancel() + app.cancel = nil + } + app.wg.Wait() } func (app *DbBackupApp) Create(ctx context.Context, jobs []*entity.DbBackup) error { @@ -61,7 +131,6 @@ func (app *DbBackupApp) Update(ctx context.Context, job *entity.DbBackup) error } func (app *DbBackupApp) Delete(ctx context.Context, jobId uint64) error { - // todo: 删除数据库备份历史文件 app.mutex.Lock() defer app.mutex.Unlock() @@ -76,7 +145,7 @@ func (app *DbBackupApp) Delete(ctx context.Context, jobId uint64) error { default: return err case err == nil: - return fmt.Errorf("数据库备份存在历史记录【%s】,无法删除该任务", history.Name) + return fmt.Errorf("请先删除关联的数据库备份历史【%s】", history.Name) case errors.Is(err, gorm.ErrRecordNotFound): } if err := app.backupRepo.DeleteById(ctx, jobId); err != nil { @@ -184,27 +253,18 @@ func NewIncUUID() (uuid.UUID, error) { } func (app *DbBackupApp) DeleteHistory(ctx context.Context, historyId uint64) (retErr error) { - // todo: 删除数据库备份历史文件 app.mutex.Lock() defer app.mutex.Unlock() + if _, err := app.backupHistoryRepo.UpdateDeleting(false, historyId); err != nil { + return err + } ok, err := app.backupHistoryRepo.UpdateDeleting(true, historyId) if err != nil { return err } - defer func() { - _, err = app.backupHistoryRepo.UpdateDeleting(false, historyId) - if err == nil { - return - } - if retErr == nil { - retErr = err - return - } - retErr = fmt.Errorf("%w, %w", retErr, err) - }() if !ok { - return errors.New("正在从备份历史中恢复数据库") + return errRestoringBackupHistory } job := &entity.DbBackupHistory{} if err := app.backupHistoryRepo.GetById(job, historyId); err != nil { @@ -214,7 +274,10 @@ func (app *DbBackupApp) DeleteHistory(ctx context.Context, historyId uint64) (re if err != nil { return err } - dbProgram := conn.GetDialect().GetDbProgram() + dbProgram, err := conn.GetDialect().GetDbProgram() + if err != nil { + return err + } if err := dbProgram.RemoveBackupHistory(ctx, job.DbBackupId, job.Uuid); err != nil { return err } diff --git a/server/internal/db/application/db_binlog.go b/server/internal/db/application/db_binlog.go index 844a110d..121f0849 100644 --- a/server/internal/db/application/db_binlog.go +++ b/server/internal/db/application/db_binlog.go @@ -2,6 +2,7 @@ package application import ( "context" + "math" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" "mayfly-go/pkg/logx" @@ -11,9 +12,13 @@ import ( ) type DbBinlogApp struct { - scheduler *dbScheduler `inject:"DbScheduler"` - binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"` - backupRepo repository.DbBackup `inject:"DbBackupRepo"` + scheduler *dbScheduler `inject:"DbScheduler"` + binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"` + binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"` + backupRepo repository.DbBackup `inject:"DbBackupRepo"` + backupHistoryRepo repository.DbBackupHistory `inject:"DbBackupHistoryRepo"` + instanceRepo repository.Instance `inject:"DbInstanceRepo"` + dbApp Db `inject:"DbApp"` context context.Context cancel context.CancelFunc @@ -26,41 +31,113 @@ func newDbBinlogApp() *DbBinlogApp { context: ctx, cancel: cancel, } - svc.waitGroup.Add(1) - go svc.run() return svc } +func (app *DbBinlogApp) Init() error { + app.context, app.cancel = context.WithCancel(context.Background()) + app.waitGroup.Add(1) + go app.run() + return nil +} + func (app *DbBinlogApp) run() { defer app.waitGroup.Done() - // todo: 实现 binlog 并发下载 - timex.SleepWithContext(app.context, time.Minute) - for !app.closed() { - jobs, err := app.loadJobs() - if err != nil { - logx.Errorf("DbBinlogApp: 加载 BINLOG 同步任务失败: %s", err.Error()) + for app.context.Err() == nil { + if err := app.fetchBinlog(app.context); err != nil { timex.SleepWithContext(app.context, time.Minute) continue } - if app.closed() { - break - } - if err := app.scheduler.AddJob(app.context, jobs); err != nil { - logx.Error("DbBinlogApp: 添加 BINLOG 同步任务失败: ", err.Error()) + if err := app.pruneBinlog(app.context); err != nil { + timex.SleepWithContext(app.context, time.Minute) + continue } timex.SleepWithContext(app.context, entity.BinlogDownloadInterval) } } -func (app *DbBinlogApp) loadJobs() ([]*entity.DbBinlog, error) { +func (app *DbBinlogApp) fetchBinlog(ctx context.Context) error { + jobs, err := app.loadJobs(ctx) + if err != nil { + logx.Errorf("DbBinlogApp: 加载 BINLOG 同步任务失败: %s", err.Error()) + timex.SleepWithContext(app.context, time.Minute) + return err + } + if ctx.Err() != nil { + return ctx.Err() + } + if err := app.scheduler.AddJob(app.context, jobs); err != nil { + logx.Error("DbBinlogApp: 添加 BINLOG 同步任务失败: ", err.Error()) + return err + } + return nil +} + +func (app *DbBinlogApp) pruneBinlog(ctx context.Context) error { + var jobs []*entity.DbBinlog + if err := app.binlogRepo.ListByCond(map[string]any{}, &jobs); err != nil { + logx.Error("DbBinlogApp: 获取 BINLOG 同步任务失败: ", err.Error()) + return err + } + for _, instance := range jobs { + if ctx.Err() != nil { + return ctx.Err() + } + var histories []*entity.DbBinlogHistory + backupHistory, backupHistoryExists, err := app.backupHistoryRepo.GetEarliestHistoryForBinlog(instance.Id) + if err != nil { + logx.Errorf("DbBinlogApp: 获取数据库备份历史失败: %s", err.Error()) + return err + } + var binlogSeq int64 = math.MaxInt64 + if backupHistoryExists { + binlogSeq = backupHistory.BinlogSequence + } + if err := app.binlogHistoryRepo.GetHistoriesBeforeSequence(ctx, instance.Id, binlogSeq, &histories); err != nil { + logx.Errorf("DbBinlogApp: 获取数据库 BINLOG 历史失败: %s", err.Error()) + return err + } + conn, err := app.dbApp.GetDbConnByInstanceId(instance.Id) + if err != nil { + logx.Errorf("DbBinlogApp: 创建数据库连接失败: %s", err.Error()) + return err + } + dbProgram, err := conn.GetDialect().GetDbProgram() + if err != nil { + logx.Errorf("DbBinlogApp: 获取数据库备份与恢复程序失败: %s", err.Error()) + return err + } + for i, history := range histories { + // todo: 在避免并发访问的前提下删除本地最新的 BINLOG 文件 + if !backupHistoryExists && i == len(histories)-1 { + // 暂不删除本地最新的 BINLOG 文件 + break + } + if ctx.Err() != nil { + return ctx.Err() + } + if err := dbProgram.PruneBinlog(history); err != nil { + logx.Errorf("清理 BINLOG 文件失败: %v", err) + continue + } + if err := app.binlogHistoryRepo.DeleteById(ctx, history.Id); err != nil { + logx.Errorf("删除 BINLOG 历史失败: %v", err) + continue + } + } + } + return nil +} + +func (app *DbBinlogApp) loadJobs(ctx context.Context) ([]*entity.DbBinlog, error) { var instanceIds []uint64 if err := app.backupRepo.ListDbInstances(true, true, &instanceIds); err != nil { return nil, err } jobs := make([]*entity.DbBinlog, 0, len(instanceIds)) for _, id := range instanceIds { - if app.closed() { + if ctx.Err() != nil { break } binlog := entity.NewDbBinlog(id) @@ -73,14 +150,15 @@ func (app *DbBinlogApp) loadJobs() ([]*entity.DbBinlog, error) { } func (app *DbBinlogApp) Close() { - app.cancel() + cancel := app.cancel + if cancel == nil { + return + } + app.cancel = nil + cancel() app.waitGroup.Wait() } -func (app *DbBinlogApp) closed() bool { - return app.context.Err() != nil -} - func (app *DbBinlogApp) AddJobIfNotExists(ctx context.Context, job *entity.DbBinlog) error { if err := app.binlogRepo.AddJobIfNotExists(ctx, job); err != nil { return err @@ -90,11 +168,3 @@ func (app *DbBinlogApp) AddJobIfNotExists(ctx context.Context, job *entity.DbBin } return nil } - -func (app *DbBinlogApp) Delete(ctx context.Context, jobId uint64) error { - // todo: 删除 Binlog 历史文件 - if err := app.binlogRepo.DeleteById(ctx, jobId); err != nil { - return err - } - return nil -} diff --git a/server/internal/db/application/instance.go b/server/internal/db/application/db_instance.go similarity index 66% rename from server/internal/db/application/instance.go rename to server/internal/db/application/db_instance.go index 94571cc1..39574b7b 100644 --- a/server/internal/db/application/instance.go +++ b/server/internal/db/application/db_instance.go @@ -2,11 +2,14 @@ package application import ( "context" + "errors" + "gorm.io/gorm" "mayfly-go/internal/db/dbm" "mayfly-go/internal/db/dbm/dbi" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" "mayfly-go/pkg/base" + "mayfly-go/pkg/biz" "mayfly-go/pkg/errorx" "mayfly-go/pkg/model" ) @@ -32,6 +35,10 @@ type Instance interface { type instanceAppImpl struct { base.AppImpl[*entity.DbInstance, repository.Instance] + + dbApp Db `inject:"DbApp"` + backupApp *DbBackupApp `inject:"DbBackupApp"` + restoreApp *DbRestoreApp `inject:"DbRestoreApp"` } // 注入DbInstanceRepo @@ -96,8 +103,50 @@ func (app *instanceAppImpl) Save(ctx context.Context, instanceEntity *entity.DbI return app.UpdateById(ctx, instanceEntity) } -func (app *instanceAppImpl) Delete(ctx context.Context, id uint64) error { - return app.DeleteById(ctx, id) +func (app *instanceAppImpl) Delete(ctx context.Context, instanceId uint64) error { + instance, err := app.GetById(new(entity.DbInstance), instanceId, "name") + biz.ErrIsNil(err, "获取数据库实例错误,数据库实例ID为: %d", instance.Id) + + restore := &entity.DbRestore{ + DbInstanceId: instanceId, + } + err = app.restoreApp.restoreRepo.GetBy(restore) + switch { + case err == nil: + biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库恢复任务。", instance.Name) + case errors.Is(err, gorm.ErrRecordNotFound): + break + default: + biz.ErrIsNil(err, "删除数据库实例失败: %v", err) + } + + backup := &entity.DbBackup{ + DbInstanceId: instanceId, + } + err = app.backupApp.backupRepo.GetBy(backup) + switch { + case err == nil: + biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库备份任务。", instance.Name) + case errors.Is(err, gorm.ErrRecordNotFound): + break + default: + biz.ErrIsNil(err, "删除数据库实例失败: %v", err) + } + + db := &entity.Db{ + InstanceId: instanceId, + } + err = app.dbApp.GetBy(db) + switch { + case err == nil: + biz.ErrNotNil(err, "不能删除数据库实例【%s】,请先删除关联的数据库资源。", instance.Name) + case errors.Is(err, gorm.ErrRecordNotFound): + break + default: + biz.ErrIsNil(err, "删除数据库实例失败: %v", err) + } + + return app.DeleteById(ctx, instanceId) } func (app *instanceAppImpl) GetDatabases(ed *entity.DbInstance) ([]string, error) { diff --git a/server/internal/db/application/db_restore.go b/server/internal/db/application/db_restore.go index cf57d312..1a9288d5 100644 --- a/server/internal/db/application/db_restore.go +++ b/server/internal/db/application/db_restore.go @@ -55,7 +55,6 @@ func (app *DbRestoreApp) Update(ctx context.Context, job *entity.DbRestore) erro } func (app *DbRestoreApp) Delete(ctx context.Context, jobId uint64) error { - // todo: 删除数据库恢复历史文件 app.mutex.Lock() defer app.mutex.Unlock() diff --git a/server/internal/db/application/db_scheduler.go b/server/internal/db/application/db_scheduler.go index 930d7fd5..45700547 100644 --- a/server/internal/db/application/db_scheduler.go +++ b/server/internal/db/application/db_scheduler.go @@ -4,12 +4,14 @@ import ( "context" "errors" "fmt" + "golang.org/x/sync/singleflight" "gorm.io/gorm" "mayfly-go/internal/db/dbm/dbi" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" "mayfly-go/pkg/runner" "reflect" + "strconv" "sync" "time" ) @@ -28,6 +30,7 @@ type dbScheduler struct { restoreHistoryRepo repository.DbRestoreHistory `inject:"DbRestoreHistoryRepo"` binlogRepo repository.DbBinlog `inject:"DbBinlogRepo"` binlogHistoryRepo repository.DbBinlogHistory `inject:"DbBinlogHistoryRepo"` + sfGroup singleflight.Group } func newDbScheduler() *dbScheduler { @@ -76,7 +79,6 @@ func (s *dbScheduler) AddJob(ctx context.Context, jobs any) error { } func (s *dbScheduler) RemoveJob(ctx context.Context, jobType entity.DbJobType, jobId uint64) error { - // todo: 删除数据库备份历史文件 s.mutex.Lock() defer s.mutex.Unlock() @@ -110,12 +112,11 @@ func (s *dbScheduler) StartJobNow(ctx context.Context, job entity.DbJob) error { return nil } -func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error { +func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, backup *entity.DbBackup) error { id, err := NewIncUUID() if err != nil { return err } - backup := job.(*entity.DbBackup) history := &entity.DbBackupHistory{ Uuid: id.String(), DbBackupId: backup.Id, @@ -143,45 +144,29 @@ func (s *dbScheduler) backup(ctx context.Context, dbProgram dbi.DbProgram, job e return nil } -func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job entity.DbJob) error { - restore := job.(*entity.DbRestore) +func (s *dbScheduler) singleFlightFetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, targetTime time.Time) error { + key := strconv.FormatUint(instanceId, 10) + for ctx.Err() == nil { + c := s.sfGroup.DoChan(key, func() (interface{}, error) { + if err := s.fetchBinlog(ctx, dbProgram, instanceId, true, targetTime); err != nil { + return targetTime, err + } + return targetTime, nil + }) + select { + case res := <-c: + if targetTime.Compare(res.Val.(time.Time)) <= 0 { + return res.Err + } + case <-ctx.Done(): + } + } + return ctx.Err() +} + +func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, restore *entity.DbRestore) error { if restore.PointInTime.Valid { - //if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil { - // return err - //} else if !enabled { - // return errors.New("数据库未启用 BINLOG") - //} - //if enabled, err := dbProgram.CheckBinlogRowFormat(ctx); err != nil { - // return err - //} else if !enabled { - // return errors.New("数据库未启用 BINLOG 行模式") - //} - // - //latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1) - //binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(restore.DbInstanceId) - //if err != nil { - // return err - //} - //if ok { - // latestBinlogSequence = binlogHistory.Sequence - //} else { - // backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(restore.DbInstanceId) - // if err != nil { - // return err - // } - // if !ok { - // return nil - // } - // earliestBackupSequence = backupHistory.BinlogSequence - //} - //binlogFiles, err := dbProgram.FetchBinlogs(ctx, true, earliestBackupSequence, latestBinlogSequence) - //if err != nil { - // return err - //} - //if err := s.binlogHistoryRepo.InsertWithBinlogFiles(ctx, restore.DbInstanceId, binlogFiles); err != nil { - // return err - //} - if err := s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), true); err != nil { + if err := s.fetchBinlog(ctx, dbProgram, restore.DbInstanceId, true, restore.PointInTime.Time); err != nil { return err } if err := s.restorePointInTime(ctx, dbProgram, restore); err != nil { @@ -210,102 +195,68 @@ func (s *dbScheduler) restore(ctx context.Context, dbProgram dbi.DbProgram, job return nil } -//func (s *dbScheduler) updateLastStatus(ctx context.Context, job entity.DbJob) error { -// switch typ := job.GetJobType(); typ { -// case entity.DbJobTypeBackup: -// return s.backupRepo.UpdateLastStatus(ctx, job) -// case entity.DbJobTypeRestore: -// return s.restoreRepo.UpdateLastStatus(ctx, job) -// case entity.DbJobTypeBinlog: -// return s.binlogRepo.UpdateLastStatus(ctx, job) -// default: -// panic(fmt.Errorf("无效的数据库任务类型: %v", typ)) -// } -//} - func (s *dbScheduler) updateJob(ctx context.Context, job entity.DbJob) error { - switch typ := job.GetJobType(); typ { - case entity.DbJobTypeBackup: - return s.backupRepo.UpdateById(ctx, job) - case entity.DbJobTypeRestore: - return s.restoreRepo.UpdateById(ctx, job) - case entity.DbJobTypeBinlog: - return s.binlogRepo.UpdateById(ctx, job) + switch t := job.(type) { + case *entity.DbBackup: + return s.backupRepo.UpdateById(ctx, t) + case *entity.DbRestore: + return s.restoreRepo.UpdateById(ctx, t) + case *entity.DbBinlog: + return s.binlogRepo.UpdateById(ctx, t) default: - return fmt.Errorf("无效的数据库任务类型: %v", typ) + return fmt.Errorf("无效的数据库任务类型: %T", t) } } func (s *dbScheduler) runJob(ctx context.Context, job entity.DbJob) error { - //job.SetLastStatus(entity.DbJobRunning, nil) - //if err := s.updateLastStatus(ctx, job); err != nil { - // logx.Errorf("failed to update job status: %v", err) - // return - //} - - //var errRun error conn, err := s.dbApp.GetDbConnByInstanceId(job.GetInstanceId()) if err != nil { return err } - dbProgram := conn.GetDialect().GetDbProgram() - switch typ := job.GetJobType(); typ { - case entity.DbJobTypeBackup: - return s.backup(ctx, dbProgram, job) - case entity.DbJobTypeRestore: - return s.restore(ctx, dbProgram, job) - case entity.DbJobTypeBinlog: - return s.fetchBinlog(ctx, dbProgram, job.GetInstanceId(), false) - default: - return fmt.Errorf("无效的数据库任务类型: %v", typ) + dbProgram, err := conn.GetDialect().GetDbProgram() + if err != nil { + return err + } + switch t := job.(type) { + case *entity.DbBackup: + return s.backup(ctx, dbProgram, t) + case *entity.DbRestore: + return s.restore(ctx, dbProgram, t) + case *entity.DbBinlog: + return s.fetchBinlog(ctx, dbProgram, t.DbInstanceId, false, time.Now()) + default: + return fmt.Errorf("无效的数据库任务类型: %T", t) } - //status := entity.DbJobSuccess - //if errRun != nil { - // status = entity.DbJobFailed - //} - //job.SetLastStatus(status, errRun) - //if err := s.updateLastStatus(ctx, job); err != nil { - // logx.Errorf("failed to update job status: %v", err) - // return - //} } -func (s *dbScheduler) runnableJob(job entity.DbJob, next runner.NextJobFunc[entity.DbJob]) (bool, error) { +func (s *dbScheduler) runnableJob(job entity.DbJob, nextRunning runner.NextJobFunc[entity.DbJob]) (bool, error) { if job.IsExpired() { return false, runner.ErrJobExpired } const maxCountByInstanceId = 4 const maxCountByDbName = 1 var countByInstanceId, countByDbName int - for item, ok := next(); ok; item, ok = next() { + for item, ok := nextRunning(); ok; item, ok = nextRunning() { if job.GetInstanceId() == item.GetInstanceId() { countByInstanceId++ if countByInstanceId >= maxCountByInstanceId { return false, nil } - - if relatedToBinlog(job.GetJobType()) { - // todo: 恢复数据库前触发 BINLOG 同步,BINLOG 同步完成后才能恢复数据库 - if relatedToBinlog(item.GetJobType()) { - return false, nil - } - } - if job.GetDbName() == item.GetDbName() { countByDbName++ if countByDbName >= maxCountByDbName { return false, nil } } + if (job.GetJobType() == entity.DbJobTypeBinlog && item.GetJobType() == entity.DbJobTypeRestore) || + (job.GetJobType() == entity.DbJobTypeRestore && item.GetJobType() == entity.DbJobTypeBinlog) { + return false, nil + } } } return true, nil } -func relatedToBinlog(typ entity.DbJobType) bool { - return typ == entity.DbJobTypeRestore || typ == entity.DbJobTypeBinlog -} - func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbProgram, job *entity.DbRestore) error { binlogHistory, err := s.binlogHistoryRepo.GetHistoryByTime(job.DbInstanceId, job.PointInTime.Time) if err != nil { @@ -320,7 +271,7 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbPr Sequence: binlogHistory.Sequence, Position: position, } - backupHistory, err := s.backupHistoryRepo.GetLatestHistory(job.DbInstanceId, job.DbName, target) + backupHistory, err := s.backupHistoryRepo.GetLatestHistoryForBinlog(job.DbInstanceId, job.DbName, target) if err != nil { return err } @@ -364,6 +315,9 @@ func (s *dbScheduler) restorePointInTime(ctx context.Context, dbProgram dbi.DbPr } func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbProgram, backupHistory *entity.DbBackupHistory) (retErr error) { + if _, err := s.backupHistoryRepo.UpdateRestoring(false, backupHistory.Id); err != nil { + return err + } ok, err := s.backupHistoryRepo.UpdateRestoring(true, backupHistory.Id) if err != nil { return err @@ -385,7 +339,7 @@ func (s *dbScheduler) restoreBackupHistory(ctx context.Context, program dbi.DbPr return program.RestoreBackupHistory(ctx, backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid) } -func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool) error { +func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, instanceId uint64, downloadLatestBinlogFile bool, targetTime time.Time) error { if enabled, err := dbProgram.CheckBinlogEnabled(ctx); err != nil { return err } else if !enabled { @@ -397,15 +351,17 @@ func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, return errors.New("数据库未启用 BINLOG 行模式") } - latestBinlogSequence, earliestBackupSequence := int64(-1), int64(-1) + earliestBackupSequence := int64(-1) binlogHistory, ok, err := s.binlogHistoryRepo.GetLatestHistory(instanceId) if err != nil { return err } - if ok { - latestBinlogSequence = binlogHistory.Sequence - } else { - backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistory(instanceId) + if downloadLatestBinlogFile && targetTime.Before(binlogHistory.LastEventTime) { + return nil + } + + if !ok { + backupHistory, ok, err := s.backupHistoryRepo.GetEarliestHistoryForBinlog(instanceId) if err != nil { return err } @@ -414,7 +370,9 @@ func (s *dbScheduler) fetchBinlog(ctx context.Context, dbProgram dbi.DbProgram, } earliestBackupSequence = backupHistory.BinlogSequence } - binlogFiles, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, latestBinlogSequence) + + // todo: 将循环从 dbProgram.FetchBinlogs 中提取出来,实现 BINLOG 同步成功后逐一保存 binlogHistory + binlogFiles, err := dbProgram.FetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, binlogHistory) if err != nil { return err } diff --git a/server/internal/db/dbm/dbi/db_program.go b/server/internal/db/dbm/dbi/db_program.go index 9d76319c..30dfa293 100644 --- a/server/internal/db/dbm/dbi/db_program.go +++ b/server/internal/db/dbm/dbi/db_program.go @@ -13,7 +13,7 @@ type DbProgram interface { Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error) - FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error) + FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence int64, latestBinlogHistory *entity.DbBinlogHistory) ([]*entity.BinlogFile, error) ReplayBinlog(ctx context.Context, originalDatabase, targetDatabase string, restoreInfo *RestoreInfo) error @@ -22,6 +22,8 @@ type DbProgram interface { RemoveBackupHistory(ctx context.Context, dbBackupId uint64, dbBackupHistoryUuid string) error GetBinlogEventPositionAtOrAfterTime(ctx context.Context, binlogName string, targetTime time.Time) (position int64, parseErr error) + + PruneBinlog(history *entity.DbBinlogHistory) error } type RestoreInfo struct { diff --git a/server/internal/db/dbm/dbi/db_type.go b/server/internal/db/dbm/dbi/db_type.go index 1a020fe6..548c90b7 100644 --- a/server/internal/db/dbm/dbi/db_type.go +++ b/server/internal/db/dbm/dbi/db_type.go @@ -141,3 +141,12 @@ func (dbType DbType) StmtUseDatabase(dbName string) string { return "" } } + +func (dbType DbType) SupportingBackup() bool { + switch dbType { + case DbTypeMysql, DbTypeMariadb: + return true + default: + return false + } +} diff --git a/server/internal/db/dbm/dbi/dialect.go b/server/internal/db/dbm/dbi/dialect.go index 4410c510..db25cf45 100644 --- a/server/internal/db/dbm/dbi/dialect.go +++ b/server/internal/db/dbm/dbi/dialect.go @@ -108,7 +108,7 @@ type Dialect interface { GetSchemas() ([]string, error) // GetDbProgram 获取数据库程序模块,用于数据库备份与恢复 - GetDbProgram() DbProgram + GetDbProgram() (DbProgram, error) // 批量保存数据 BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) diff --git a/server/internal/db/dbm/dm/dialect.go b/server/internal/db/dbm/dm/dialect.go index 57448e1b..ecd0cf18 100644 --- a/server/internal/db/dbm/dm/dialect.go +++ b/server/internal/db/dbm/dm/dialect.go @@ -248,8 +248,8 @@ func (dd *DMDialect) GetSchemas() ([]string, error) { } // GetDbProgram 获取数据库程序模块,用于数据库备份与恢复 -func (dd *DMDialect) GetDbProgram() dbi.DbProgram { - panic("implement me") +func (dd *DMDialect) GetDbProgram() (dbi.DbProgram, error) { + return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", dd.dc.Info.Type) } var ( diff --git a/server/internal/db/dbm/mssql/dialect.go b/server/internal/db/dbm/mssql/dialect.go index c45a7be9..7cf3b725 100644 --- a/server/internal/db/dbm/mssql/dialect.go +++ b/server/internal/db/dbm/mssql/dialect.go @@ -285,8 +285,8 @@ func (md *MssqlDialect) GetSchemas() ([]string, error) { } // GetDbProgram 获取数据库程序模块,用于数据库备份与恢复 -func (md *MssqlDialect) GetDbProgram() dbi.DbProgram { - panic("implement me") +func (md *MssqlDialect) GetDbProgram() (dbi.DbProgram, error) { + return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", md.dc.Info.Type) } func (md *MssqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) { diff --git a/server/internal/db/dbm/mysql/dialect.go b/server/internal/db/dbm/mysql/dialect.go index e93a2bda..66c17388 100644 --- a/server/internal/db/dbm/mysql/dialect.go +++ b/server/internal/db/dbm/mysql/dialect.go @@ -169,8 +169,8 @@ func (md *MysqlDialect) GetSchemas() ([]string, error) { } // GetDbProgram 获取数据库程序模块,用于数据库备份与恢复 -func (md *MysqlDialect) GetDbProgram() dbi.DbProgram { - return NewDbProgramMysql(md.dc) +func (md *MysqlDialect) GetDbProgram() (dbi.DbProgram, error) { + return NewDbProgramMysql(md.dc), nil } func (md *MysqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) { diff --git a/server/internal/db/dbm/mysql/program.go b/server/internal/db/dbm/mysql/program.go index 4e9e4eb6..172781fa 100644 --- a/server/internal/db/dbm/mysql/program.go +++ b/server/internal/db/dbm/mysql/program.go @@ -2,6 +2,7 @@ package mysql import ( "bufio" + "compress/gzip" "context" "database/sql" "fmt" @@ -130,22 +131,46 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB if binlogEnabled && rowFormatEnabled { binlogInfo, err = readBinlogInfoFromBackup(reader) } - _ = reader.Close() if err != nil { + _ = reader.Close() return nil, errors.Wrapf(err, "从备份文件中读取 binlog 信息失败") } - fileName := filepath.Join(dir, fmt.Sprintf("%s.sql", backupHistory.Uuid)) - if err := os.Rename(tmpFile, fileName); err != nil { - return nil, errors.Wrap(err, "备份文件改名失败") - } + if _, err := reader.Seek(0, io.SeekStart); err != nil { + _ = reader.Close() + return nil, errors.Wrapf(err, "跳转到备份文件开始处失败") + } + gzipTmpFile := tmpFile + ".gz" + writer, err := os.Create(gzipTmpFile) + if err != nil { + _ = reader.Close() + return nil, errors.Wrapf(err, "创建备份压缩文件失败") + } + defer func() { + _ = os.Remove(gzipTmpFile) + }() + gzipWriter := gzip.NewWriter(writer) + gzipWriter.Name = backupHistory.Uuid + ".sql" + _, err = io.Copy(gzipWriter, reader) + _ = gzipWriter.Close() + _ = writer.Close() + _ = reader.Close() + if err != nil { + return nil, errors.Wrapf(err, "压缩备份文件失败") + } + destPath := filepath.Join(dir, backupHistory.Uuid+".sql") + if err := os.Rename(gzipTmpFile, destPath+".gz"); err != nil { + return nil, errors.Wrap(err, "备份文件更名失败") + } return binlogInfo, nil } func (svc *DbProgramMysql) RemoveBackupHistory(_ context.Context, dbBackupId uint64, dbBackupHistoryUuid string) error { fileName := filepath.Join(svc.getDbBackupDir(svc.dbInfo().InstanceId, dbBackupId), fmt.Sprintf("%v.sql", dbBackupHistoryUuid)) - return os.Remove(fileName) + _ = os.Remove(fileName) + _ = os.Remove(fileName + ".gz") + return nil } func (svc *DbProgramMysql) RestoreBackupHistory(ctx context.Context, dbName string, dbBackupId uint64, dbBackupHistoryUuid string) error { @@ -158,18 +183,33 @@ func (svc *DbProgramMysql) RestoreBackupHistory(ctx context.Context, dbName stri "--password=" + dbInfo.Password, } + compressed := false fileName := filepath.Join(svc.getDbBackupDir(svc.dbInfo().InstanceId, dbBackupId), fmt.Sprintf("%v.sql", dbBackupHistoryUuid)) + _, err := os.Stat(fileName) + if err != nil { + compressed = true + fileName += ".gz" + } file, err := os.Open(fileName) if err != nil { return errors.Wrap(err, "打开备份文件失败") } - defer func() { - _ = file.Close() - }() + defer func() { _ = file.Close() }() + + var reader io.ReadCloser + if compressed { + reader, err = gzip.NewReader(file) + if err != nil { + return errors.Wrap(err, "解压缩备份文件失败") + } + defer func() { _ = reader.Close() }() + } else { + reader = file + } cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlPath, args...) - cmd.Stdin = file + cmd.Stdin = reader logx.Debug("恢复数据库: ", cmd.String()) if err := runCmd(cmd); err != nil { logx.Errorf("运行 mysql 程序失败: %v", err) @@ -205,13 +245,17 @@ func (svc *DbProgramMysql) downloadBinlogFilesOnServer(ctx context.Context, binl } // Parse the first binlog eventTs of a local binlog file. -func (svc *DbProgramMysql) parseLocalBinlogLastEventTime(ctx context.Context, filePath string) (eventTime time.Time, parseErr error) { - // todo: implement me - return time.Now(), nil +func (svc *DbProgramMysql) parseLocalBinlogLastEventTime(ctx context.Context, filePath string, lastEventTime time.Time) (eventTime time.Time, parseErr error) { + return svc.parseLocalBinlogEventTime(ctx, filePath, false, lastEventTime) } // Parse the first binlog eventTs of a local binlog file. func (svc *DbProgramMysql) parseLocalBinlogFirstEventTime(ctx context.Context, filePath string) (eventTime time.Time, parseErr error) { + return svc.parseLocalBinlogEventTime(ctx, filePath, true, time.Time{}) +} + +// Parse the first binlog eventTs of a local binlog file. +func (svc *DbProgramMysql) parseLocalBinlogEventTime(ctx context.Context, filePath string, firstOrLast bool, startTime time.Time) (eventTime time.Time, parseErr error) { args := []string{ // Local binlog file path. filePath, @@ -220,6 +264,9 @@ func (svc *DbProgramMysql) parseLocalBinlogFirstEventTime(ctx context.Context, f // Tell mysqlbinlog to suppress the BINLOG statements for row events, which reduces the unneeded output. "--base64-output=DECODE-ROWS", } + if !startTime.IsZero() { + args = append(args, "--start-datetime", startTime.Local().Format(time.DateTime)) + } cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, args...) var stderr strings.Builder cmd.Stderr = &stderr @@ -237,22 +284,30 @@ func (svc *DbProgramMysql) parseLocalBinlogFirstEventTime(ctx context.Context, f parseErr = errors.Wrap(parseErr, stderr.String()) } }() - + lastEventTime := time.Time{} for s := bufio.NewScanner(pr); s.Scan(); { line := s.Text() eventTimeParsed, found, err := parseBinlogEventTimeInLine(line) if err != nil { return time.Time{}, errors.Wrap(err, "解析 binlog 文件失败") } - if found { - return eventTimeParsed, nil + if !found { + continue } + if !firstOrLast { + lastEventTime = eventTimeParsed + continue + } + return eventTimeParsed, nil } - return time.Time{}, errors.New("解析 binlog 文件失败") + if lastEventTime.IsZero() { + return time.Time{}, errors.New("解析 binlog 文件失败") + } + return lastEventTime, nil } // FetchBinlogs downloads binlog files from startingFileName on server to `binlogDir`. -func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error) { +func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence int64, latestBinlogHistory *entity.DbBinlogHistory) ([]*entity.BinlogFile, error) { // Read binlog files list on server. binlogFilesOnServerSorted, err := svc.GetSortedBinlogFilesOnServer(ctx) if err != nil { @@ -264,8 +319,11 @@ func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlo } indexHistory := -1 for i, file := range binlogFilesOnServerSorted { - if latestBinlogSequence == file.Sequence { + if latestBinlogHistory.Sequence == file.Sequence { indexHistory = i + 1 + file.FirstEventTime = latestBinlogHistory.FirstEventTime + file.LastEventTime = latestBinlogHistory.LastEventTime + file.LocalSize = latestBinlogHistory.FileSize break } if earliestBackupSequence == file.Sequence { @@ -274,10 +332,15 @@ func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlo } } if indexHistory < 0 { - return nil, errors.New(fmt.Sprintf("在数据库服务器上未找到 binlog 文件: %d, %d", earliestBackupSequence, latestBinlogSequence)) + // todo: 数据库服务器上的 binlog 序列已被删除, 导致 binlog 同步失败,如何处理? + return nil, errors.New(fmt.Sprintf("数据库服务器上的 binlog 序列已被删除: %d, %d", earliestBackupSequence, latestBinlogHistory.Sequence)) } - if indexHistory > len(binlogFilesOnServerSorted)-1 { + if indexHistory >= len(binlogFilesOnServerSorted)-1 { indexHistory = len(binlogFilesOnServerSorted) - 1 + if binlogFilesOnServerSorted[indexHistory].LocalSize == binlogFilesOnServerSorted[indexHistory].RemoteSize { + // 没有新的事件,不需要重新下载 + return nil, nil + } } binlogFilesOnServerSorted = binlogFilesOnServerSorted[indexHistory:] @@ -331,13 +394,14 @@ func (svc *DbProgramMysql) downloadBinlogFile(ctx context.Context, binlogFileToD logx.Error("未找到 binlog 文件", logx.String("path", binlogFilePathTemp), logx.String("error", err.Error())) return errors.Wrapf(err, "未找到 binlog 文件: %q", binlogFilePathTemp) } - if !isLast && binlogFileTempInfo.Size() != binlogFileToDownload.Size { + + if (isLast && binlogFileTempInfo.Size() < binlogFileToDownload.RemoteSize) || (!isLast && binlogFileTempInfo.Size() != binlogFileToDownload.RemoteSize) { logx.Error("Downloaded archived binlog file size is not equal to size queried on the MySQL server earlier.", logx.String("binlog", binlogFileToDownload.Name), - logx.Int64("sizeInfo", binlogFileToDownload.Size), + logx.Int64("sizeInfo", binlogFileToDownload.RemoteSize), logx.Int64("downloadedSize", binlogFileTempInfo.Size()), ) - return errors.Errorf("下载的 binlog 文件 %q 与服务上的文件大小不一致 %d != %d", binlogFilePathTemp, binlogFileTempInfo.Size(), binlogFileToDownload.Size) + return errors.Errorf("下载的 binlog 文件 %q 与服务上的文件大小不一致 %d != %d", binlogFilePathTemp, binlogFileTempInfo.Size(), binlogFileToDownload.RemoteSize) } binlogFilePath := svc.GetBinlogFilePath(binlogFileToDownload.Name) @@ -348,7 +412,7 @@ func (svc *DbProgramMysql) downloadBinlogFile(ctx context.Context, binlogFileToD if err != nil { return err } - lastEventTime, err := svc.parseLocalBinlogLastEventTime(ctx, binlogFilePath) + lastEventTime, err := svc.parseLocalBinlogLastEventTime(ctx, binlogFilePath, binlogFileToDownload.LastEventTime) if err != nil { return err } @@ -394,9 +458,9 @@ func (svc *DbProgramMysql) GetSortedBinlogFilesOnServer(_ context.Context) ([]*e return nil, errors.Wrapf(err, "SQL 语句 %q 执行结果解析失败", query) } binlogFile := &entity.BinlogFile{ - Name: name, - Size: int64(size), - Sequence: seq, + Name: name, + RemoteSize: int64(size), + Sequence: seq, } binlogFiles = append(binlogFiles, binlogFile) } @@ -781,3 +845,9 @@ func (svc *DbProgramMysql) getDbBackupDir(instanceId, backupId uint64) string { fmt.Sprintf("instance-%d", instanceId), fmt.Sprintf("backup-%d", backupId)) } + +func (svc *DbProgramMysql) PruneBinlog(history *entity.DbBinlogHistory) error { + binlogFilePath := filepath.Join(svc.getBinlogDir(history.DbInstanceId), history.FileName) + _ = os.Remove(binlogFilePath) + return nil +} diff --git a/server/internal/db/dbm/mysql/program_e2e_test.go b/server/internal/db/dbm/mysql/program_e2e_test.go index 8a99d46f..86780b80 100644 --- a/server/internal/db/dbm/mysql/program_e2e_test.go +++ b/server/internal/db/dbm/mysql/program_e2e_test.go @@ -47,11 +47,11 @@ func (s *DbInstanceSuite) SetupSuite() { Username: "test", Password: "test", } - dbConn, err := dbInfo.Conn(GetMeta()) + dbConn, err := dbInfo.Conn(dbi.GetMeta(dbi.DbTypeMysql)) s.Require().NoError(err) s.dbConn = dbConn s.repositories = &repository.Repositories{ - Instance: persistence.GetInstanceRepo(), + Instance: persistence.NewInstanceRepo(), Backup: persistence.NewDbBackupRepo(), BackupHistory: persistence.NewDbBackupHistoryRepo(), Restore: persistence.NewDbRestoreRepo(), @@ -111,7 +111,7 @@ func (s *DbInstanceSuite) testBackup(backupHistory *entity.DbBackupHistory) { binlogInfo, err := s.instanceSvc.Backup(context.Background(), backupHistory) require.NoError(err) - fileName := filepath.Join(s.instanceSvc.getDbBackupDir(s.dbConn.Info.InstanceId, backupHistory.Id), dbNameBackupTest+".sql") + fileName := filepath.Join(s.instanceSvc.getDbBackupDir(s.dbConn.Info.InstanceId, backupHistory.Id), dbNameBackupTest+".sql.gz") _, err = os.Stat(fileName) require.NoError(err) diff --git a/server/internal/db/dbm/oracle/dialect.go b/server/internal/db/dbm/oracle/dialect.go index 0e303d58..9814f4d2 100644 --- a/server/internal/db/dbm/oracle/dialect.go +++ b/server/internal/db/dbm/oracle/dialect.go @@ -248,8 +248,8 @@ func (od *OracleDialect) GetSchemas() ([]string, error) { } // GetDbProgram 获取数据库程序模块,用于数据库备份与恢复 -func (od *OracleDialect) GetDbProgram() dbi.DbProgram { - panic("implement me") +func (od *OracleDialect) GetDbProgram() (dbi.DbProgram, error) { + return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", od.dc.Info.Type) } func (od *OracleDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) { diff --git a/server/internal/db/dbm/postgres/dialect.go b/server/internal/db/dbm/postgres/dialect.go index 173c93a2..33d79ac3 100644 --- a/server/internal/db/dbm/postgres/dialect.go +++ b/server/internal/db/dbm/postgres/dialect.go @@ -188,8 +188,8 @@ func (md *PgsqlDialect) GetSchemas() ([]string, error) { } // GetDbProgram 获取数据库程序模块,用于数据库备份与恢复 -func (md *PgsqlDialect) GetDbProgram() dbi.DbProgram { - panic("implement me") +func (md *PgsqlDialect) GetDbProgram() (dbi.DbProgram, error) { + return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", md.dc.Info.Type) } func (md *PgsqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) { diff --git a/server/internal/db/dbm/sqlite/dialect.go b/server/internal/db/dbm/sqlite/dialect.go index 3b0887ff..ce26841e 100644 --- a/server/internal/db/dbm/sqlite/dialect.go +++ b/server/internal/db/dbm/sqlite/dialect.go @@ -180,8 +180,8 @@ func (sd *SqliteDialect) GetSchemas() ([]string, error) { } // GetDbProgram 获取数据库程序模块,用于数据库备份与恢复 -func (sd *SqliteDialect) GetDbProgram() dbi.DbProgram { - panic("implement me") +func (sd *SqliteDialect) GetDbProgram() (dbi.DbProgram, error) { + return nil, fmt.Errorf("该数据库类型不支持数据库备份与恢复: %v", sd.dc.Info.Type) } func (sd *SqliteDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) { diff --git a/server/internal/db/domain/entity/db_backup.go b/server/internal/db/domain/entity/db_backup.go index 2455a49e..d8f62f2e 100644 --- a/server/internal/db/domain/entity/db_backup.go +++ b/server/internal/db/domain/entity/db_backup.go @@ -18,6 +18,7 @@ type DbBackup struct { EnabledDesc string // 启用状态描述 StartTime time.Time // 开始时间 Interval time.Duration // 间隔时间 + MaxSaveDays int // 数据库备份历史保留天数,过期将自动删除 Repeated bool // 是否重复执行 } @@ -81,10 +82,6 @@ func (b *DbBackup) GetInterval() time.Duration { return b.Interval } -func (b *DbBackup) SetLastStatus(status DbJobStatus, err error) { - b.setLastStatus(b.GetJobType(), status, err) -} - func (b *DbBackup) GetKey() DbJobKey { return b.getKey(b.GetJobType()) } diff --git a/server/internal/db/domain/entity/db_binlog.go b/server/internal/db/domain/entity/db_binlog.go index c1d9d3b8..388f52d1 100644 --- a/server/internal/db/domain/entity/db_binlog.go +++ b/server/internal/db/domain/entity/db_binlog.go @@ -11,14 +11,16 @@ const ( // BinlogFile is the metadata of the MySQL binlog file. type BinlogFile struct { - Name string - Size int64 + Name string + RemoteSize int64 + LocalSize int64 // Sequence is parsed from Name and is for the sorting purpose. Sequence int64 FirstEventTime time.Time LastEventTime time.Time - Downloaded bool + + Downloaded bool } var _ DbJob = (*DbBinlog)(nil) @@ -76,10 +78,6 @@ func (b *DbBinlog) GetJobType() DbJobType { return DbJobTypeBinlog } -func (b *DbBinlog) SetLastStatus(status DbJobStatus, err error) { - b.setLastStatus(b.GetJobType(), status, err) -} - func (b *DbBinlog) GetKey() DbJobKey { return b.getKey(b.GetJobType()) } diff --git a/server/internal/db/domain/entity/db_binlog_history.go b/server/internal/db/domain/entity/db_binlog_history.go index 1a6a6fe6..43fcc46f 100644 --- a/server/internal/db/domain/entity/db_binlog_history.go +++ b/server/internal/db/domain/entity/db_binlog_history.go @@ -14,6 +14,7 @@ type DbBinlogHistory struct { FileSize int64 Sequence int64 FirstEventTime time.Time + LastEventTime time.Time DbInstanceId uint64 `json:"dbInstanceId"` } diff --git a/server/internal/db/domain/entity/db_job.go b/server/internal/db/domain/entity/db_job.go index bf318662..d7dbfc2e 100644 --- a/server/internal/db/domain/entity/db_job.go +++ b/server/internal/db/domain/entity/db_job.go @@ -45,7 +45,6 @@ var _ runner.Job = (DbJob)(nil) type DbJobBase interface { model.ModelI - GetLastStatus() DbJobStatus } type DbJob interface { @@ -62,7 +61,6 @@ type DbJob interface { SetEnabled(enabled bool, desc string) Update(job runner.Job) GetInterval() time.Duration - SetLastStatus(status DbJobStatus, err error) } var _ DbJobBase = (*DbJobBaseImpl)(nil) @@ -84,10 +82,6 @@ func (d *DbJobBaseImpl) getJobType() DbJobType { return job.GetJobType() } -func (d *DbJobBaseImpl) GetLastStatus() DbJobStatus { - return d.LastStatus -} - func (d *DbJobBaseImpl) setLastStatus(jobType DbJobType, status DbJobStatus, err error) { var statusName, jobName string switch status { diff --git a/server/internal/db/domain/entity/db_restore.go b/server/internal/db/domain/entity/db_restore.go index 5eaa35f8..5660a338 100644 --- a/server/internal/db/domain/entity/db_restore.go +++ b/server/internal/db/domain/entity/db_restore.go @@ -79,10 +79,6 @@ func (r *DbRestore) GetJobType() DbJobType { return DbJobTypeRestore } -func (r *DbRestore) SetLastStatus(status DbJobStatus, err error) { - r.setLastStatus(r.GetJobType(), status, err) -} - func (r *DbRestore) GetKey() DbJobKey { return r.getKey(r.GetJobType()) } diff --git a/server/internal/db/domain/repository/db_backup.go b/server/internal/db/domain/repository/db_backup.go index 8eaad758..666dcc77 100644 --- a/server/internal/db/domain/repository/db_backup.go +++ b/server/internal/db/domain/repository/db_backup.go @@ -6,7 +6,7 @@ import ( ) type DbBackup interface { - DbJob + DbJob[*entity.DbBackup] ListToDo(jobs any) error ListDbInstances(enabled bool, repeated bool, instanceIds *[]uint64) error @@ -14,4 +14,6 @@ type DbBackup interface { // GetPageList 分页获取数据库任务列表 GetPageList(condition *entity.DbBackupQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) + + ListByCond(cond any, listModels any, cols ...string) error } diff --git a/server/internal/db/domain/repository/db_backup_history.go b/server/internal/db/domain/repository/db_backup_history.go index d3d678bc..4ae5ce87 100644 --- a/server/internal/db/domain/repository/db_backup_history.go +++ b/server/internal/db/domain/repository/db_backup_history.go @@ -12,12 +12,13 @@ type DbBackupHistory interface { // GetPageList 分页获取数据备份历史 GetPageList(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) - GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) + GetLatestHistoryForBinlog(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) - GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, bool, error) + GetEarliestHistoryForBinlog(instanceId uint64) (*entity.DbBackupHistory, bool, error) GetHistories(backupHistoryIds []uint64, toEntity any) error UpdateDeleting(deleting bool, backupHistoryId ...uint64) (bool, error) UpdateRestoring(restoring bool, backupHistoryId ...uint64) (bool, error) + ZeroBinlogInfo(backupHistoryId uint64) error } diff --git a/server/internal/db/domain/repository/db_binlog.go b/server/internal/db/domain/repository/db_binlog.go index 76c1122a..f3229c6e 100644 --- a/server/internal/db/domain/repository/db_binlog.go +++ b/server/internal/db/domain/repository/db_binlog.go @@ -6,7 +6,7 @@ import ( ) type DbBinlog interface { - DbJob + DbJob[*entity.DbBinlog] AddJobIfNotExists(ctx context.Context, job *entity.DbBinlog) error } diff --git a/server/internal/db/domain/repository/db_binlog_history.go b/server/internal/db/domain/repository/db_binlog_history.go index 0085b52f..a93130d3 100644 --- a/server/internal/db/domain/repository/db_binlog_history.go +++ b/server/internal/db/domain/repository/db_binlog_history.go @@ -19,4 +19,6 @@ type DbBinlogHistory interface { InsertWithBinlogFiles(ctx context.Context, instanceId uint64, binlogFiles []*entity.BinlogFile) error Upsert(ctx context.Context, history *entity.DbBinlogHistory) error + + GetHistoriesBeforeSequence(ctx context.Context, instanceId uint64, binlogSeq int64, histories *[]*entity.DbBinlogHistory) error } diff --git a/server/internal/db/domain/repository/db_job.go b/server/internal/db/domain/repository/db_job.go index 76ced093..17063f58 100644 --- a/server/internal/db/domain/repository/db_job.go +++ b/server/internal/db/domain/repository/db_job.go @@ -3,24 +3,18 @@ package repository import ( "context" "mayfly-go/internal/db/domain/entity" + "mayfly-go/pkg/base" ) -type DbJobBase interface { - // GetById 根据实体id查询 - GetById(e entity.DbJob, id uint64, cols ...string) error - - // UpdateById 根据实体id更新实体信息 - UpdateById(ctx context.Context, e entity.DbJob, columns ...string) error - - // DeleteById 根据实体主键删除实体 - DeleteById(ctx context.Context, id uint64) error +type DbJobBase[T entity.DbJob] interface { + base.Repo[T] // UpdateLastStatus 更新任务执行状态 UpdateLastStatus(ctx context.Context, job entity.DbJob) error } -type DbJob interface { - DbJobBase +type DbJob[T entity.DbJob] interface { + DbJobBase[T] // AddJob 添加数据库任务 AddJob(ctx context.Context, jobs any) error diff --git a/server/internal/db/domain/repository/db_restore.go b/server/internal/db/domain/repository/db_restore.go index 44fec3a4..4f30e67e 100644 --- a/server/internal/db/domain/repository/db_restore.go +++ b/server/internal/db/domain/repository/db_restore.go @@ -6,7 +6,7 @@ import ( ) type DbRestore interface { - DbJob + DbJob[*entity.DbRestore] ListToDo(jobs any) error GetDbNamesWithoutRestore(instanceId uint64, dbNames []string) ([]string, error) diff --git a/server/internal/db/infrastructure/persistence/db_backup.go b/server/internal/db/infrastructure/persistence/db_backup.go index f7717b6d..632e7fda 100644 --- a/server/internal/db/infrastructure/persistence/db_backup.go +++ b/server/internal/db/infrastructure/persistence/db_backup.go @@ -64,7 +64,6 @@ func (d *dbBackupRepoImpl) ListToDo(jobs any) error { // GetPageList 分页获取数据库备份任务列表 func (d *dbBackupRepoImpl) GetPageList(condition *entity.DbBackupQuery, pageParam *model.PageParam, toEntity any, _ ...string) (*model.PageResult[any], error) { - d.GetModel() qd := gormx.NewQuery(d.GetModel()). Eq("id", condition.Id). Eq0("db_instance_id", condition.DbInstanceId). @@ -83,12 +82,16 @@ func (d *dbBackupRepoImpl) UpdateEnabled(_ context.Context, jobId uint64, enable cond := map[string]any{ "id": jobId, } - desc := "任务已禁用" + desc := "已禁用" if enabled { - desc = "任务已启用" + desc = "已启用" } return d.Updates(cond, map[string]any{ "enabled": enabled, "enabled_desc": desc, }) } + +func (d *dbBackupRepoImpl) ListByCond(cond any, listModels any, cols ...string) error { + return d.dbJobBaseImpl.ListByCond(cond, listModels, cols...) +} diff --git a/server/internal/db/infrastructure/persistence/db_backup_history.go b/server/internal/db/infrastructure/persistence/db_backup_history.go index 7ed915f9..ccc12b0a 100644 --- a/server/internal/db/infrastructure/persistence/db_backup_history.go +++ b/server/internal/db/infrastructure/persistence/db_backup_history.go @@ -34,12 +34,13 @@ func (repo *dbBackupHistoryRepoImpl) GetPageList(condition *entity.DbBackupHisto func (repo *dbBackupHistoryRepoImpl) GetHistories(backupHistoryIds []uint64, toEntity any) error { return global.Db.Model(repo.GetModel()). Where("id in ?", backupHistoryIds). + Where("deleting = false"). Scopes(gormx.UndeleteScope). Find(toEntity). Error } -func (repo *dbBackupHistoryRepoImpl) GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) { +func (repo *dbBackupHistoryRepoImpl) GetLatestHistoryForBinlog(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) { history := &entity.DbBackupHistory{} db := global.Db err := db.Model(repo.GetModel()). @@ -48,6 +49,8 @@ func (repo *dbBackupHistoryRepoImpl) GetLatestHistory(instanceId uint64, dbName Where(db.Where("binlog_sequence < ?", bi.Sequence). Or(db.Where("binlog_sequence = ?", bi.Sequence). Where("binlog_position <= ?", bi.Position))). + Where("binlog_sequence > 0"). + Where("deleting = false"). Scopes(gormx.UndeleteScope). Order("binlog_sequence desc, binlog_position desc"). First(history).Error @@ -57,10 +60,12 @@ func (repo *dbBackupHistoryRepoImpl) GetLatestHistory(instanceId uint64, dbName return history, err } -func (repo *dbBackupHistoryRepoImpl) GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, bool, error) { +func (repo *dbBackupHistoryRepoImpl) GetEarliestHistoryForBinlog(instanceId uint64) (*entity.DbBackupHistory, bool, error) { history := &entity.DbBackupHistory{} db := global.Db.Model(repo.GetModel()) err := db.Where("db_instance_id = ?", instanceId). + Where("binlog_sequence > 0"). + Where("deleting = false"). Scopes(gormx.UndeleteScope). Order("binlog_sequence"). First(history).Error @@ -79,7 +84,7 @@ func (repo *dbBackupHistoryRepoImpl) UpdateDeleting(deleting bool, backupHistory Where("id in ?", backupHistoryId). Where("restoring = false"). Scopes(gormx.UndeleteScope). - Update("restoring", deleting) + Update("deleting", deleting) if db.Error != nil { return false, db.Error } @@ -103,3 +108,15 @@ func (repo *dbBackupHistoryRepoImpl) UpdateRestoring(restoring bool, backupHisto } return true, nil } + +func (repo *dbBackupHistoryRepoImpl) ZeroBinlogInfo(backupHistoryId uint64) error { + return global.Db.Model(repo.GetModel()). + Where("id = ?", backupHistoryId). + Where("restoring = false"). + Scopes(gormx.UndeleteScope). + Updates(&map[string]any{ + "binlog_file_name": "", + "binlog_sequence": 0, + "binlog_position": 0, + }).Error +} diff --git a/server/internal/db/infrastructure/persistence/db_binlog_history.go b/server/internal/db/infrastructure/persistence/db_binlog_history.go index 42345257..6d6140cb 100644 --- a/server/internal/db/infrastructure/persistence/db_binlog_history.go +++ b/server/internal/db/infrastructure/persistence/db_binlog_history.go @@ -7,6 +7,7 @@ import ( "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" "mayfly-go/pkg/base" + "mayfly-go/pkg/global" "mayfly-go/pkg/gormx" "time" ) @@ -82,7 +83,7 @@ func (repo *dbBinlogHistoryRepoImpl) Upsert(_ context.Context, history *entity.D First(old).Error switch { case err == nil: - return db.Model(old).Select("create_time", "file_size", "first_event_time").Updates(history).Error + return db.Model(old).Select("create_time", "file_size", "first_event_time", "last_event_time").Updates(history).Error case errors.Is(err, gorm.ErrRecordNotFound): return db.Create(history).Error default: @@ -103,9 +104,10 @@ func (repo *dbBinlogHistoryRepoImpl) InsertWithBinlogFiles(ctx context.Context, history := &entity.DbBinlogHistory{ CreateTime: time.Now(), FileName: fileOnServer.Name, - FileSize: fileOnServer.Size, + FileSize: fileOnServer.RemoteSize, Sequence: fileOnServer.Sequence, FirstEventTime: fileOnServer.FirstEventTime, + LastEventTime: fileOnServer.LastEventTime, DbInstanceId: instanceId, } histories = append(histories, history) @@ -122,3 +124,13 @@ func (repo *dbBinlogHistoryRepoImpl) InsertWithBinlogFiles(ctx context.Context, } return nil } + +func (repo *dbBinlogHistoryRepoImpl) GetHistoriesBeforeSequence(ctx context.Context, instanceId uint64, binlogSeq int64, histories *[]*entity.DbBinlogHistory) error { + return global.Db.Model(repo.GetModel()). + Where("db_instance_id = ?", instanceId). + Where("sequence < ?", binlogSeq). + Scopes(gormx.UndeleteScope). + Order("id"). + Find(histories). + Error +} diff --git a/server/internal/db/infrastructure/persistence/db_job_base.go b/server/internal/db/infrastructure/persistence/db_job_base.go index 1e04d290..750ab377 100644 --- a/server/internal/db/infrastructure/persistence/db_job_base.go +++ b/server/internal/db/infrastructure/persistence/db_job_base.go @@ -12,20 +12,12 @@ import ( "reflect" ) -var _ repository.DbJobBase = (*dbJobBaseImpl[entity.DbJob])(nil) +var _ repository.DbJobBase[entity.DbJob] = (*dbJobBaseImpl[entity.DbJob])(nil) type dbJobBaseImpl[T entity.DbJob] struct { base.RepoImpl[T] } -func (d *dbJobBaseImpl[T]) GetById(e entity.DbJob, id uint64, cols ...string) error { - return d.RepoImpl.GetById(e.(T), id, cols...) -} - -func (d *dbJobBaseImpl[T]) UpdateById(ctx context.Context, e entity.DbJob, columns ...string) error { - return d.RepoImpl.UpdateById(ctx, e.(T), columns...) -} - func (d *dbJobBaseImpl[T]) UpdateLastStatus(ctx context.Context, job entity.DbJob) error { return d.UpdateById(ctx, job.(T), "last_status", "last_result", "last_time") } diff --git a/server/internal/db/infrastructure/persistence/db_restore.go b/server/internal/db/infrastructure/persistence/db_restore.go index d4e4ab07..c0ccb36e 100644 --- a/server/internal/db/infrastructure/persistence/db_restore.go +++ b/server/internal/db/infrastructure/persistence/db_restore.go @@ -84,9 +84,9 @@ func (d *dbRestoreRepoImpl) UpdateEnabled(_ context.Context, jobId uint64, enabl cond := map[string]any{ "id": jobId, } - desc := "任务已禁用" + desc := "已禁用" if enabled { - desc = "任务已启用" + desc = "已启用" } return d.Updates(cond, map[string]any{ "enabled": enabled, diff --git a/server/internal/db/infrastructure/persistence/instance.go b/server/internal/db/infrastructure/persistence/instance.go index cd9e1d67..f4652088 100644 --- a/server/internal/db/infrastructure/persistence/instance.go +++ b/server/internal/db/infrastructure/persistence/instance.go @@ -12,7 +12,7 @@ type instanceRepoImpl struct { base.RepoImpl[*entity.DbInstance] } -func newInstanceRepo() repository.Instance { +func NewInstanceRepo() repository.Instance { return &instanceRepoImpl{base.RepoImpl[*entity.DbInstance]{M: new(entity.DbInstance)}} } diff --git a/server/internal/db/infrastructure/persistence/persistence.go b/server/internal/db/infrastructure/persistence/persistence.go index e0f04fe8..abb77248 100644 --- a/server/internal/db/infrastructure/persistence/persistence.go +++ b/server/internal/db/infrastructure/persistence/persistence.go @@ -5,7 +5,7 @@ import ( ) func Init() { - ioc.Register(newInstanceRepo(), ioc.WithComponentName("DbInstanceRepo")) + ioc.Register(NewInstanceRepo(), ioc.WithComponentName("DbInstanceRepo")) ioc.Register(newDbRepo(), ioc.WithComponentName("DbRepo")) ioc.Register(newDbSqlRepo(), ioc.WithComponentName("DbSqlRepo")) ioc.Register(newDbSqlExecRepo(), ioc.WithComponentName("DbSqlExecRepo")) diff --git a/server/pkg/biz/assert.go b/server/pkg/biz/assert.go index 74a5e76f..94ecdf59 100644 --- a/server/pkg/biz/assert.go +++ b/server/pkg/biz/assert.go @@ -24,6 +24,12 @@ func ErrIsNil(err error, msgAndParams ...any) { } } +func ErrNotNil(err error, msg string, params ...any) { + if err == nil { + panic(errorx.NewBiz(fmt.Sprintf(msg, params...))) + } +} + func ErrIsNilAppendErr(err error, msg string) { if err != nil { panic(errorx.NewBiz(fmt.Sprintf(msg, err.Error()))) diff --git a/server/pkg/runner/runner.go b/server/pkg/runner/runner.go index f0fd4d1d..d7eb7e48 100644 --- a/server/pkg/runner/runner.go +++ b/server/pkg/runner/runner.go @@ -22,7 +22,7 @@ var ( type JobKey = string type RunJobFunc[T Job] func(ctx context.Context, job T) error type NextJobFunc[T Job] func() (T, bool) -type RunnableJobFunc[T Job] func(job T, next NextJobFunc[T]) (bool, error) +type RunnableJobFunc[T Job] func(job T, nextRunning NextJobFunc[T]) (bool, error) type ScheduleJobFunc[T Job] func(job T) (deadline time.Time, err error) type UpdateJobFunc[T Job] func(ctx context.Context, job T) error diff --git a/server/resources/data/mayfly-go.sqlite b/server/resources/data/mayfly-go.sqlite index 7ed16d6e3b0ed05e64a9fef5ef55fbd789f5a4dd..16c5a5dffed2a60080ed2053983da2498669571b 100644 GIT binary patch delta 1354 zcmbVLQEb~(6!m*{>cq8^w6x9AG@b3VtX0~PEE{Y?s7!?kjcHpbV?+Whw|=dnrD@tY zWt#-5HM%1BnQ_6;4bf%Qw2w_%77a@Sp|UUdLcrHP(BNkuNWl0*nufS`)|LU{W3t}W zd-vXV-#tF(&MgP$R)eQwEfPghyJ4}g5UgG1K*lFgTMxcb(-uNaJ>ZJR>%WIWhw5VZ z$B=niyec9w!Z;K|=P2PGzpp7x=k9-5)A)*E|G>va_>(I5KS z+^P(3P=?e>pz3QW26X$!uuKL`&5({-~=^OM2v(2r0&Z{V0=)!P77up(L>Eb4f z=XuC9x{!PAC$?PxaBsadz(z6|^!A7~ymScs4RDv1+~#go4oFbII^){=9Y4@Z&?_-R%KFqyzBplBVcR>! zXP(`==kWd)#Vx5V172Js1^5w-aPYzK8kG6xl58ZAK<^x}ehut|*BVfUI2Xz~0NAgy z)#{a=RF5fTb(Scadc16_-KhPa4?HBL8-O30A{r=i=8w={Ag6g6-$4wF(GR9^lRMd+ z_IR5gv%X}KXt(tX)3Xj4z;=4j{+p4bzF3zRE|cjB0kO*>sE$mKnZ9i)S3=9aj{Xhh zxsy>)%eq2l35c_y%#E;Zyi7+yYj!PydKz^-vdF(O*N8Hh6dM`LAY+GZmT(_>!oFU@ zFh=&x5{}6Yy4ObYZ*xc<^XN5N?EN`9W`CEXJCdH-{%@Wea&!mT`BR~Bsu~Xe1v6G{ A?f?J) delta 673 zcmZuuUr1A76#vfm-Q92ZdY#)UZ_}1{Gjl^5%CeB?LB^JADf>_(`b%=8fwUqF| zkv77Y*+$03CKj9+vQ zYSFO1C&a}$2S?>_7|Id$DM@0C^FqM5Z%@J*Wn7z>yfZ#h$SKzb2BTM^iB3h;^4jDP zq&EAc(Jw6%R6`xSUB4=W`;J+uC5k#XiKyCAKz=y?TWY z>R;_G`ntHm`Vz#WZ`nAON{}YKQqQrq1aY%Uf;bN|pFSb9=%H6nbV+J&{Fi9p!ujCA zK@Z}qW_%TlW%kA1FUFT1>fZ-LKzwtGI>Tklu;k`XaD)lTK^Rw%Wzt1;q-9yS^yzGMGs cWiMhx=EYzM&Qf!sh>O^)&fq+Gd7E_o0sVuzz5oCK diff --git a/server/resources/script/sql/mayfly-go-sqlite.sql b/server/resources/script/sql/mayfly-go-sqlite.sql index 13c36ece..2e4c3d27 100644 --- a/server/resources/script/sql/mayfly-go-sqlite.sql +++ b/server/resources/script/sql/mayfly-go-sqlite.sql @@ -50,6 +50,7 @@ CREATE TABLE IF NOT EXISTS "t_db_backup" ( "db_name" text(64) NOT NULL, "repeated" integer(1), "interval" integer(20), + "max_save_days" integer(8) NOT NULL DEFAULT '0', "start_time" datetime, "enabled" integer(1), "enabled_desc" text(64), @@ -81,8 +82,8 @@ CREATE TABLE IF NOT EXISTS "t_db_backup_history" ( "create_time" datetime, "is_deleted" integer(1) NOT NULL, "delete_time" datetime, - "restoring" integer(1), - "deleting" integer(1), + "restoring" integer(1) NOT NULL DEFAULT '0', + "deleting" integer(1) NOT NULL DEFAULT '0', PRIMARY KEY ("id") ); @@ -112,6 +113,7 @@ CREATE TABLE IF NOT EXISTS "t_db_binlog_history" ( "file_size" integer(20), "sequence" integer(20), "first_event_time" datetime, + "last_event_time" datetime, "create_time" datetime, "is_deleted" integer(4) NOT NULL, "delete_time" datetime, @@ -830,8 +832,8 @@ INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (153, 150, 'Jra0n7De/pLOA2UYz/', 2, 1, '删除', 'db:sync:del', 1703641342, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:42:22', '2023-12-27 09:42:22', 0, NULL); INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (154, 150, 'Jra0n7De/VBt68CDx/', 2, 1, '启停', 'db:sync:status', 1703641364, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:42:45', '2023-12-27 09:42:45', 0, NULL); INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (155, 150, 'Jra0n7De/PigmSGVg/', 2, 1, '日志', 'db:sync:log', 1704266866, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2024-01-03 15:27:47', '2024-01-03 15:27:47', 0, NULL); -INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (161, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL); -INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (160, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL); +INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (160, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL); +INSERT INTO t_sys_resource (id, pid, ui_path, type, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES (161, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL); -- Table: t_sys_role CREATE TABLE IF NOT EXISTS "t_sys_role" ( diff --git a/server/resources/script/sql/mayfly-go.sql b/server/resources/script/sql/mayfly-go.sql index b6522bde..a66bb70c 100644 --- a/server/resources/script/sql/mayfly-go.sql +++ b/server/resources/script/sql/mayfly-go.sql @@ -108,6 +108,7 @@ CREATE TABLE `t_db_backup` ( `db_name` varchar(64) NOT NULL COMMENT '数据库名称', `repeated` tinyint(1) DEFAULT NULL COMMENT '是否重复执行', `interval` bigint(20) DEFAULT NULL COMMENT '备份周期', + `max_save_days` int(8) NOT NULL DEFAULT '0' COMMENT '最大保留天数', `start_time` datetime DEFAULT NULL COMMENT '首次备份时间', `enabled` tinyint(1) DEFAULT NULL COMMENT '是否启用', `enabled_desc` varchar(64) NULL COMMENT '任务启用描述', @@ -144,8 +145,8 @@ CREATE TABLE `t_db_backup_history` ( `create_time` datetime DEFAULT NULL COMMENT '历史备份创建时间', `is_deleted` tinyint(1) NOT NULL DEFAULT 0, `delete_time` datetime DEFAULT NULL, - `restoring` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识', - `deleting` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识', + `restoring` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识', + `deleting` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识', PRIMARY KEY (`id`), KEY `idx_db_backup_id` (`db_backup_id`) USING BTREE, KEY `idx_db_instance_id` (`db_instance_id`) USING BTREE, @@ -232,6 +233,7 @@ CREATE TABLE `t_db_binlog_history` ( `file_size` bigint(20) DEFAULT NULL COMMENT 'BINLOG文件大小', `sequence` bigint(20) DEFAULT NULL COMMENT 'BINLOG序列号', `first_event_time` datetime DEFAULT NULL COMMENT '首次事件时间', + `last_event_time` datetime DEFAULT NULL COMMENT '最新事件时间', `create_time` datetime DEFAULT NULL, `is_deleted` tinyint(4) NOT NULL DEFAULT 0, `delete_time` datetime DEFAULT NULL, @@ -792,8 +794,8 @@ INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(152, 150, 'Jra0n7De/zvAMo2vk/', 2, 1, '编辑', 'db:sync:save', 1703641320, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:42:00', '2023-12-27 09:42:12', 0, NULL); INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(151, 150, 'Jra0n7De/uAnHZxEV/', 2, 1, '基本权限', 'db:sync', 1703641202, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-27 09:40:02', '2023-12-27 09:40:02', 0, NULL); INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(150, 36, 'Jra0n7De/', 1, 1, '数据同步', 'sync', 1693040707, '{"component":"ops/db/SyncTaskList","icon":"Coin","isKeepAlive":true,"routeName":"SyncTaskList"}', 12, 'liuzongyang', 12, 'liuzongyang', '2023-12-22 09:51:34', '2023-12-27 10:16:57', 0, NULL); -INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(161, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL); -INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(160, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL); +INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(160, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL); +INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(161, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL); COMMIT; -- ---------------------------- diff --git a/server/resources/script/sql/v1.7/v1.7.2.sql b/server/resources/script/sql/v1.7/v1.7.2.sql index 0af8c9ba..240c5b0a 100644 --- a/server/resources/script/sql/v1.7/v1.7.2.sql +++ b/server/resources/script/sql/v1.7/v1.7.2.sql @@ -1,6 +1,5 @@ -INSERT INTO `t_sys_resource` (`id`, `pid`, `ui_path`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `is_deleted`, `delete_time`) - VALUES (161, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL), - (160, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL); +INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(160, 49, 'dbms23ax/xleaiec2/3NUXQFIO/', 2, 1, '数据库备份', 'db:backup', 1705973876, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:37:56', '2024-01-23 09:37:56', 0, NULL); +INSERT INTO t_sys_resource (id, pid, ui_path, `type`, status, name, code, weight, meta, creator_id, creator, modifier_id, modifier, create_time, update_time, is_deleted, delete_time) VALUES(161, 49, 'dbms23ax/xleaiec2/ghErkTdb/', 2, 1, '数据库恢复', 'db:restore', 1705973909, 'null', 1, 'admin', 1, 'admin', '2024-01-23 09:38:29', '2024-01-23 09:38:29', 0, NULL); ALTER TABLE `t_db_backup` ADD COLUMN `enabled_desc` varchar(64) NULL COMMENT '任务启用描述' AFTER `enabled`; @@ -9,5 +8,5 @@ ALTER TABLE `t_db_restore` ADD COLUMN `enabled_desc` varchar(64) NULL COMMENT '任务启用描述' AFTER `enabled`; ALTER TABLE `t_db_backup_history` - ADD COLUMN `restoring` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识', - ADD COLUMN `deleting` int(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识'; + ADD COLUMN `restoring` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史恢复标识', + ADD COLUMN `deleting` tinyint(1) NOT NULL DEFAULT '0' COMMENT '备份历史删除标识'; diff --git a/server/resources/script/sql/v1.7/v1.7.3.sql b/server/resources/script/sql/v1.7/v1.7.3.sql new file mode 100644 index 00000000..9ad4b82f --- /dev/null +++ b/server/resources/script/sql/v1.7/v1.7.3.sql @@ -0,0 +1,5 @@ +ALTER TABLE `t_db_backup` + ADD COLUMN `max_save_days` int(8) NOT NULL DEFAULT '0' COMMENT '最大保存天数' AFTER `interval`; + +ALTER TABLE `t_db_binlog_history` + ADD COLUMN `last_event_time` datetime NULL DEFAULT NULL COMMENT '最新事件时间' AFTER `first_event_time`;