diff --git a/mayfly_go_web/src/hooks/usePageTable.ts b/mayfly_go_web/src/hooks/usePageTable.ts index e3b08fd2..6aea4ccb 100644 --- a/mayfly_go_web/src/hooks/usePageTable.ts +++ b/mayfly_go_web/src/hooks/usePageTable.ts @@ -50,7 +50,7 @@ export const usePageTable = ( } let res = await api.request(sp); - dataCallBack && (res = dataCallBack(res)); + dataCallBack && (res = await dataCallBack(res)); if (pageable) { state.tableData = res.list; diff --git a/mayfly_go_web/src/views/ops/db/DbBackupList.vue b/mayfly_go_web/src/views/ops/db/DbBackupList.vue index 9054a423..100f0dd1 100644 --- a/mayfly_go_web/src/views/ops/db/DbBackupList.vue +++ b/mayfly_go_web/src/views/ops/db/DbBackupList.vue @@ -75,7 +75,7 @@ const columns = [ TableColumn.new('enabled', '是否启用'), TableColumn.new('lastResult', '执行结果'), TableColumn.new('lastTime', '执行时间').isTime(), - TableColumn.new('action', '操作').isSlot().setMinWidth(160).fixedRight(), + TableColumn.new('action', '操作').isSlot().setMinWidth(180).fixedRight(), ]; const emptyQuery = { diff --git a/mayfly_go_web/src/views/ops/db/SyncTaskEdit.vue b/mayfly_go_web/src/views/ops/db/SyncTaskEdit.vue index dd42e42e..915a1597 100644 --- a/mayfly_go_web/src/views/ops/db/SyncTaskEdit.vue +++ b/mayfly_go_web/src/views/ops/db/SyncTaskEdit.vue @@ -10,7 +10,7 @@ width="700px" > - + @@ -41,7 +41,7 @@ /> - + @@ -116,9 +116,6 @@ import { DbInst, registerDbCompletionItemProvider } from '@/views/ops/db/db'; import { getDbDialect } from '@/views/ops/db/dialect'; const props = defineProps({ - visible: { - type: Boolean, - }, data: { type: [Boolean, Object], }, @@ -130,6 +127,8 @@ const props = defineProps({ //定义事件 const emit = defineEmits(['update:visible', 'cancel', 'val-change']); +const dialogVisible = defineModel('visible', { default: false }); + const rules = { taskName: [ { @@ -180,7 +179,6 @@ const basicFormData = { } as FormData; const state = reactive({ - dialogVisible: false, tabActiveName: 'basic', form: basicFormData, submitForm: {} as any, @@ -218,18 +216,18 @@ const loadDbTables = async (dbId: number, db: string) => { } }; -const { dialogVisible, tabActiveName, form, submitForm } = toRefs(state); +const { tabActiveName, form, submitForm } = toRefs(state); const { isFetching: saveBtnLoading, execute: saveExec } = dbApi.saveDatasyncTask.useApi(submitForm); -watch(props, async (newValue: any) => { - state.dialogVisible = newValue.visible; - if (!state.dialogVisible) { +watch(dialogVisible, async (newValue: boolean) => { + if (!newValue) { return; } state.tabActiveName = 'basic'; - if (newValue.data?.id) { - let data = await dbApi.getDatasyncTask.request({ taskId: newValue.data?.id }); + const propsData = props.data as any; + if (propsData?.id) { + let data = await dbApi.getDatasyncTask.request({ taskId: propsData?.id }); state.form = data; try { state.form.fieldMap = JSON.parse(data.fieldMap); @@ -316,6 +314,7 @@ watch(tabActiveName, async (newValue: string) => { break; } }); + const handleGetSrcFields = async () => { // 执行sql,获取字段信息 if (!state.form.dataSql || !state.form.dataSql.trim()) { @@ -362,6 +361,7 @@ const handleGetSrcFields = async () => { state.previewRes = res; }; + const handleGetTargetFields = async () => { // 查询目标表下的字段信息 if (state.form.targetDbName && state.form.targetTableName) { @@ -412,7 +412,7 @@ const btnOk = async () => { }; const cancel = () => { - emit('update:visible', false); + dialogVisible.value = false; emit('cancel'); }; diff --git a/mayfly_go_web/src/views/ops/db/component/DbSelectTree.vue b/mayfly_go_web/src/views/ops/db/component/DbSelectTree.vue index 7a982f3a..b2aff0f7 100644 --- a/mayfly_go_web/src/views/ops/db/component/DbSelectTree.vue +++ b/mayfly_go_web/src/views/ops/db/component/DbSelectTree.vue @@ -150,7 +150,7 @@ const NodeTypePostgresSchema = new NodeType(SqlExecNodeType.PgSchema).withNodeCl overflow-x: hidden; width: 560px; .el-tree { - height: 200px; + height: 150px; overflow-y: auto; overflow-x: hidden; } diff --git a/mayfly_go_web/src/views/ops/machine/cronjob/CronJobExecList.vue b/mayfly_go_web/src/views/ops/machine/cronjob/CronJobExecList.vue index 36cd53d2..4e5fdd75 100644 --- a/mayfly_go_web/src/views/ops/machine/cronjob/CronJobExecList.vue +++ b/mayfly_go_web/src/views/ops/machine/cronjob/CronJobExecList.vue @@ -114,7 +114,8 @@ const search = async () => { pageTableRef.value.search(); }; -const parseData = async (dataList: any) => { +const parseData = async (res: any) => { + const dataList = res.list; // 填充机器信息 for (let x of dataList) { const machineId = x.machineId; @@ -137,7 +138,7 @@ const parseData = async (dataList: any) => { x.machineIp = machine?.ip; x.machineName = machine?.name; } - return dataList; + return res; }; const cancel = () => { diff --git a/server/internal/auth/api/ldap_login.go b/server/internal/auth/api/ldap_login.go index 2942c54b..5be96c01 100644 --- a/server/internal/auth/api/ldap_login.go +++ b/server/internal/auth/api/ldap_login.go @@ -14,6 +14,7 @@ import ( "mayfly-go/pkg/captcha" "mayfly-go/pkg/errorx" "mayfly-go/pkg/ginx" + "mayfly-go/pkg/model" "mayfly-go/pkg/req" "mayfly-go/pkg/utils/collx" "mayfly-go/pkg/utils/cryptox" @@ -87,7 +88,7 @@ func (a *LdapLogin) getUser(userName string, cols ...string) (*sysentity.Account func (a *LdapLogin) createUser(userName, displayName string) { account := &sysentity.Account{Username: userName} - account.SetBaseInfo(nil) + account.SetBaseInfo(model.IdGenTypeNone, nil) account.Name = displayName biz.ErrIsNil(a.AccountApp.Create(context.TODO(), account)) // 将 LADP 用户本地密码设置为空,不允许本地登录 diff --git a/server/internal/auth/api/oauth2_login.go b/server/internal/auth/api/oauth2_login.go index ae886455..375a59c8 100644 --- a/server/internal/auth/api/oauth2_login.go +++ b/server/internal/auth/api/oauth2_login.go @@ -169,10 +169,7 @@ func (a *Oauth2Login) doLoginAction(rc *req.Ctx, userId string, oauth *config.Oa } // 进行登录 - account := &sysentity.Account{ - Model: model.Model{DeletedModel: model.DeletedModel{Id: accountId}}, - } - err = a.AccountApp.GetBy(account, "Id", "Name", "Username", "Password", "Status", "LastLoginTime", "LastLoginIp", "OtpSecret") + account, err := a.AccountApp.GetById(new(sysentity.Account), accountId, "Id", "Name", "Username", "Password", "Status", "LastLoginTime", "LastLoginIp", "OtpSecret") biz.ErrIsNilAppendErr(err, "获取用户信息失败: %s") clientIp := getIpAndRegion(rc) diff --git a/server/internal/db/api/db_backup.go b/server/internal/db/api/db_backup.go index f5883cdf..0218d48d 100644 --- a/server/internal/db/api/db_backup.go +++ b/server/internal/db/api/db_backup.go @@ -137,3 +137,19 @@ func (d *DbBackup) GetDbNamesWithoutBackup(rc *req.Ctx) { biz.ErrIsNilAppendErr(err, "获取未配置定时备份的数据库名称失败: %v") rc.ResData = dbNamesWithoutBackup } + +// GetPageList 获取数据库备份历史 +// @router /api/dbs/:dbId/backups/:backupId/histories [GET] +func (d *DbBackup) GetHistoryPageList(rc *req.Ctx) { + dbId := uint64(ginx.PathParamInt(rc.GinCtx, "dbId")) + biz.IsTrue(dbId > 0, "无效的 dbId: %v", dbId) + db, err := d.DbApp.GetById(new(entity.Db), dbId, "db_instance_id", "database") + biz.ErrIsNilAppendErr(err, "获取数据库信息失败: %v") + + queryCond, page := ginx.BindQueryAndPage[*entity.DbBackupHistoryQuery](rc.GinCtx, new(entity.DbBackupHistoryQuery)) + queryCond.DbInstanceId = db.InstanceId + queryCond.InDbNames = strings.Fields(db.Database) + res, err := d.DbBackupApp.GetHistoryPageList(queryCond, page, new([]vo.DbBackupHistory)) + biz.ErrIsNilAppendErr(err, "获取数据库备份历史失败: %v") + rc.ResData = res +} diff --git a/server/internal/db/api/db_backup_history.go b/server/internal/db/api/db_backup_history.go deleted file mode 100644 index ff6c12fc..00000000 --- a/server/internal/db/api/db_backup_history.go +++ /dev/null @@ -1,39 +0,0 @@ -package api - -import ( - "mayfly-go/internal/db/api/vo" - "mayfly-go/internal/db/application" - "mayfly-go/internal/db/domain/entity" - "mayfly-go/pkg/biz" - "mayfly-go/pkg/ginx" - "mayfly-go/pkg/req" - "strings" -) - -type DbBackupHistory struct { - DbBackupHistoryApp *application.DbBackupHistoryApp - DbApp application.Db -} - -// GetPageList 获取数据库备份历史 -// @router /api/dbs/:dbId/backups/:backupId/histories [GET] -func (d *DbBackupHistory) GetPageList(rc *req.Ctx) { - dbId := uint64(ginx.PathParamInt(rc.GinCtx, "dbId")) - biz.IsTrue(dbId > 0, "无效的 dbId: %v", dbId) - db, err := d.DbApp.GetById(new(entity.Db), dbId, "db_instance_id", "database") - biz.ErrIsNilAppendErr(err, "获取数据库信息失败: %v") - - queryCond, page := ginx.BindQueryAndPage[*entity.DbBackupHistoryQuery](rc.GinCtx, new(entity.DbBackupHistoryQuery)) - queryCond.DbInstanceId = db.InstanceId - queryCond.InDbNames = strings.Fields(db.Database) - res, err := d.DbBackupHistoryApp.GetPageList(queryCond, page, new([]vo.DbBackupHistory)) - biz.ErrIsNilAppendErr(err, "获取数据库备份历史失败: %v") - rc.ResData = res -} - -// Delete 删除数据库备份历史 -// @router /api/dbs/:dbId/backups/:backupId/histories/:historyId [DELETE] -func (d *DbBackupHistory) Delete(rc *req.Ctx) { - // todo delete backup histories - panic("implement me") -} diff --git a/server/internal/db/api/db_data_sync.go b/server/internal/db/api/db_data_sync.go index 5b719d11..80219287 100644 --- a/server/internal/db/api/db_data_sync.go +++ b/server/internal/db/api/db_data_sync.go @@ -3,8 +3,6 @@ package api import ( "context" "encoding/base64" - "github.com/gin-gonic/gin" - "github.com/google/uuid" "mayfly-go/internal/db/api/form" "mayfly-go/internal/db/api/vo" "mayfly-go/internal/db/application" @@ -15,11 +13,13 @@ import ( "mayfly-go/pkg/utils/stringx" "strconv" "strings" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" ) type DataSyncTask struct { DataSyncTaskApp application.DataSyncTask - DataSyncLogApp application.DataSyncLog } func (d *DataSyncTask) Tasks(rc *req.Ctx) { @@ -31,7 +31,7 @@ func (d *DataSyncTask) Tasks(rc *req.Ctx) { func (d *DataSyncTask) Logs(rc *req.Ctx) { queryCond, page := ginx.BindQueryAndPage[*entity.DataSyncLogQuery](rc.GinCtx, new(entity.DataSyncLogQuery)) - res, err := d.DataSyncLogApp.GetTaskLogList(queryCond, page, new([]vo.DataSyncLogListVO)) + res, err := d.DataSyncTaskApp.GetTaskLogList(queryCond, page, new([]vo.DataSyncLogListVO)) biz.ErrIsNil(err) rc.ResData = res } diff --git a/server/internal/db/api/db_restore.go b/server/internal/db/api/db_restore.go index 4146aafb..b8e453a9 100644 --- a/server/internal/db/api/db_restore.go +++ b/server/internal/db/api/db_restore.go @@ -92,21 +92,18 @@ func (d *DbRestore) walk(rc *req.Ctx, fn func(ctx context.Context, taskId uint64 return nil } -// Delete 删除数据库恢复任务 // @router /api/dbs/:dbId/restores/:taskId [DELETE] func (d *DbRestore) Delete(rc *req.Ctx) { err := d.walk(rc, d.DbRestoreApp.Delete) biz.ErrIsNilAppendErr(err, "删除数据库恢复任务失败: %v") } -// Enable 删除数据库恢复任务 // @router /api/dbs/:dbId/restores/:taskId/enable [PUT] func (d *DbRestore) Enable(rc *req.Ctx) { err := d.walk(rc, d.DbRestoreApp.Enable) biz.ErrIsNilAppendErr(err, "启用数据库恢复任务失败: %v") } -// Disable 删除数据库恢复任务 // @router /api/dbs/:dbId/restores/:taskId/disable [PUT] func (d *DbRestore) Disable(rc *req.Ctx) { err := d.walk(rc, d.DbRestoreApp.Disable) @@ -124,3 +121,14 @@ func (d *DbRestore) GetDbNamesWithoutRestore(rc *req.Ctx) { biz.ErrIsNilAppendErr(err, "获取未配置定时备份的数据库名称失败: %v") rc.ResData = dbNamesWithoutRestore } + +// 获取数据库备份历史 +// @router /api/dbs/:dbId/restores/:restoreId/histories [GET] +func (d *DbRestore) GetHistoryPageList(rc *req.Ctx) { + queryCond := &entity.DbRestoreHistoryQuery{ + DbRestoreId: uint64(ginx.PathParamInt(rc.GinCtx, "restoreId")), + } + res, err := d.DbRestoreApp.GetHistoryPageList(queryCond, ginx.GetPageParam(rc.GinCtx), new([]vo.DbRestoreHistory)) + biz.ErrIsNilAppendErr(err, "获取数据库备份历史失败: %v") + rc.ResData = res +} diff --git a/server/internal/db/api/db_restore_history.go b/server/internal/db/api/db_restore_history.go deleted file mode 100644 index ab672c01..00000000 --- a/server/internal/db/api/db_restore_history.go +++ /dev/null @@ -1,33 +0,0 @@ -package api - -import ( - "mayfly-go/internal/db/api/vo" - "mayfly-go/internal/db/application" - "mayfly-go/internal/db/domain/entity" - "mayfly-go/pkg/biz" - "mayfly-go/pkg/ginx" - "mayfly-go/pkg/req" -) - -type DbRestoreHistory struct { - InstanceApp application.Instance - DbRestoreHistoryApp *application.DbRestoreHistoryApp -} - -// GetPageList 获取数据库备份历史 -// @router /api/dbs/:dbId/restores/:restoreId/histories [GET] -func (d *DbRestoreHistory) GetPageList(rc *req.Ctx) { - queryCond := &entity.DbRestoreHistoryQuery{ - DbRestoreId: uint64(ginx.PathParamInt(rc.GinCtx, "restoreId")), - } - res, err := d.DbRestoreHistoryApp.GetPageList(queryCond, ginx.GetPageParam(rc.GinCtx), new([]vo.DbRestoreHistory)) - biz.ErrIsNilAppendErr(err, "获取数据库备份历史失败: %v") - rc.ResData = res -} - -// Delete 删除数据库备份历史 -// @router /api/dbs/:dbId/restores/:restoreId/histories/:historyId [DELETE] -func (d *DbRestoreHistory) Delete(rc *req.Ctx) { - // todo delete restore histories - panic("implement me") -} diff --git a/server/internal/db/api/vo/db_backup.go b/server/internal/db/api/vo/db_backup.go index 4c66d6cc..601c9181 100644 --- a/server/internal/db/api/vo/db_backup.go +++ b/server/internal/db/api/vo/db_backup.go @@ -27,3 +27,12 @@ func (backup *DbBackup) MarshalJSON() ([]byte, error) { backup.IntervalDay = uint64(backup.Interval / time.Hour / 24) return json.Marshal((*dbBackup)(backup)) } + +// DbBackupHistory 数据库备份历史 +type DbBackupHistory struct { + Id uint64 `json:"id"` + DbBackupId uint64 `json:"dbBackupId"` + CreateTime time.Time `json:"createTime"` + DbName string `json:"dbName"` // 数据库名称 + Name string `json:"name"` // 备份历史名称 +} diff --git a/server/internal/db/api/vo/db_backup_history.go b/server/internal/db/api/vo/db_backup_history.go deleted file mode 100644 index 14eba084..00000000 --- a/server/internal/db/api/vo/db_backup_history.go +++ /dev/null @@ -1,12 +0,0 @@ -package vo - -import "time" - -// DbBackupHistory 数据库备份历史 -type DbBackupHistory struct { - Id uint64 `json:"id"` - DbBackupId uint64 `json:"dbBackupId"` - CreateTime time.Time `json:"createTime"` - DbName string `json:"dbName"` // 数据库名称 - Name string `json:"name"` // 备份历史名称 -} diff --git a/server/internal/db/api/vo/db_restore.go b/server/internal/db/api/vo/db_restore.go index c57bf2fe..89ca4205 100644 --- a/server/internal/db/api/vo/db_restore.go +++ b/server/internal/db/api/vo/db_restore.go @@ -29,3 +29,9 @@ func (restore *DbRestore) MarshalJSON() ([]byte, error) { restore.IntervalDay = uint64(restore.Interval / time.Hour / 24) return json.Marshal((*dbBackup)(restore)) } + +// DbRestoreHistory 数据库备份历史 +type DbRestoreHistory struct { + Id uint64 `json:"id"` + DbRestoreId uint64 `json:"dbRestoreId"` +} diff --git a/server/internal/db/api/vo/db_restore_history.go b/server/internal/db/api/vo/db_restore_history.go deleted file mode 100644 index 45820fcb..00000000 --- a/server/internal/db/api/vo/db_restore_history.go +++ /dev/null @@ -1,7 +0,0 @@ -package vo - -// DbRestoreHistory 数据库备份历史 -type DbRestoreHistory struct { - Id uint64 `json:"id"` - DbRestoreId uint64 `json:"dbRestoreId"` -} diff --git a/server/internal/db/application/application.go b/server/internal/db/application/application.go index fe18438b..eab9066c 100644 --- a/server/internal/db/application/application.go +++ b/server/internal/db/application/application.go @@ -9,17 +9,14 @@ import ( ) var ( - instanceApp Instance - dbApp Db - dbSqlExecApp DbSqlExec - dbSqlApp DbSql - dbBackupApp *DbBackupApp - dbBackupHistoryApp *DbBackupHistoryApp - dbRestoreApp *DbRestoreApp - dbRestoreHistoryApp *DbRestoreHistoryApp - dbBinlogApp *DbBinlogApp - dataSyncApp DataSyncTask - dataSyncLogApp DataSyncLog + instanceApp Instance + dbApp Db + dbSqlExecApp DbSqlExec + dbSqlApp DbSql + dbBackupApp *DbBackupApp + dbRestoreApp *DbRestoreApp + dbBinlogApp *DbBinlogApp + dataSyncApp DataSyncTask ) var repositories *repository.Repositories @@ -41,8 +38,7 @@ func Init() { dbApp = newDbApp(persistence.GetDbRepo(), persistence.GetDbSqlRepo(), instanceApp, tagapp.GetTagTreeApp()) dbSqlExecApp = newDbSqlExecApp(persistence.GetDbSqlExecRepo()) dbSqlApp = newDbSqlApp(persistence.GetDbSqlRepo()) - dataSyncApp = newDataSyncApp(persistence.GetDataSyncTaskRepo()) - dataSyncLogApp = newDataSyncLogApp(persistence.GetDataSyncLogRepo()) + dataSyncApp = newDataSyncApp(persistence.GetDataSyncTaskRepo(), persistence.GetDataSyncLogRepo()) dbBackupApp, err = newDbBackupApp(repositories, dbApp) if err != nil { @@ -52,14 +48,7 @@ func Init() { if err != nil { panic(fmt.Sprintf("初始化 dbRestoreApp 失败: %v", err)) } - dbBackupHistoryApp, err = newDbBackupHistoryApp(repositories) - if err != nil { - panic(fmt.Sprintf("初始化 dbBackupHistoryApp 失败: %v", err)) - } - dbRestoreHistoryApp, err = newDbRestoreHistoryApp(repositories) - if err != nil { - panic(fmt.Sprintf("初始化 dbRestoreHistoryApp 失败: %v", err)) - } + dbBinlogApp, err = newDbBinlogApp(repositories, dbApp) if err != nil { panic(fmt.Sprintf("初始化 dbBinlogApp 失败: %v", err)) @@ -89,18 +78,10 @@ func GetDbBackupApp() *DbBackupApp { return dbBackupApp } -func GetDbBackupHistoryApp() *DbBackupHistoryApp { - return dbBackupHistoryApp -} - func GetDbRestoreApp() *DbRestoreApp { return dbRestoreApp } -func GetDbRestoreHistoryApp() *DbRestoreHistoryApp { - return dbRestoreHistoryApp -} - func GetDbBinlogApp() *DbBinlogApp { return dbBinlogApp } @@ -108,7 +89,3 @@ func GetDbBinlogApp() *DbBinlogApp { func GetDataSyncTaskApp() DataSyncTask { return dataSyncApp } - -func GetDataSyncLogApp() DataSyncLog { - return dataSyncLogApp -} diff --git a/server/internal/db/application/db.go b/server/internal/db/application/db.go index fce28600..cd4074bd 100644 --- a/server/internal/db/application/db.go +++ b/server/internal/db/application/db.go @@ -182,7 +182,7 @@ func (d *dbAppImpl) GetDbConnByInstanceId(instanceId uint64) (*dbm.DbConn, error var dbs []*entity.Db if err := d.ListByCond(&entity.Db{InstanceId: instanceId}, &dbs, "id", "database"); err != nil { - return nil, errorx.NewBiz("获取数据库列表失败: ", err) + return nil, errorx.NewBiz("获取数据库列表失败") } if len(dbs) == 0 { return nil, errorx.NewBiz("该实例未配置数据库, 请先进行配置") diff --git a/server/internal/db/application/db_backup.go b/server/internal/db/application/db_backup.go index 85b2b701..871a6c17 100644 --- a/server/internal/db/application/db_backup.go +++ b/server/internal/db/application/db_backup.go @@ -4,11 +4,12 @@ import ( "context" "encoding/binary" "fmt" - "github.com/google/uuid" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" "mayfly-go/pkg/model" "time" + + "github.com/google/uuid" ) func newDbBackupApp(repositories *repository.Repositories, dbApp Db) (*DbBackupApp, error) { @@ -75,6 +76,11 @@ func (app *DbBackupApp) GetDbNamesWithoutBackup(instanceId uint64, dbNames []str return app.backupRepo.GetDbNamesWithoutBackup(instanceId, dbNames) } +// GetPageList 分页获取数据库备份历史 +func (app *DbBackupApp) GetHistoryPageList(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { + return app.backupHistoryRepo.GetHistories(condition, pageParam, toEntity, orderBy...) +} + func withRunBackupTask(app *DbBackupApp) dbSchedulerOption[*entity.DbBackup] { return func(scheduler *dbScheduler[*entity.DbBackup]) { scheduler.RunTask = app.runTask @@ -139,18 +145,18 @@ func NewIncUUID() (uuid.UUID, error) { return uid, nil } -func newDbBackupHistoryApp(repositories *repository.Repositories) (*DbBackupHistoryApp, error) { - app := &DbBackupHistoryApp{ - repo: repositories.BackupHistory, - } - return app, nil -} +// func newDbBackupHistoryApp(repositories *repository.Repositories) (*DbBackupHistoryApp, error) { +// app := &DbBackupHistoryApp{ +// repo: repositories.BackupHistory, +// } +// return app, nil +// } -type DbBackupHistoryApp struct { - repo repository.DbBackupHistory -} +// type DbBackupHistoryApp struct { +// repo repository.DbBackupHistory +// } -// GetPageList 分页获取数据库备份历史 -func (app *DbBackupHistoryApp) GetPageList(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { - return app.repo.GetHistories(condition, pageParam, toEntity, orderBy...) -} +// // GetPageList 分页获取数据库备份历史 +// func (app *DbBackupHistoryApp) GetPageList(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { +// return app.repo.GetHistories(condition, pageParam, toEntity, orderBy...) +// } diff --git a/server/internal/db/application/db_data_sync.go b/server/internal/db/application/db_data_sync.go index dac48e34..b34592ef 100644 --- a/server/internal/db/application/db_data_sync.go +++ b/server/internal/db/application/db_data_sync.go @@ -24,27 +24,36 @@ type DataSyncTask interface { Save(ctx context.Context, instanceEntity *entity.DataSyncTask) error - // Delete 删除数据库信息 Delete(ctx context.Context, id uint64) error InitCronJob() AddCronJob(taskEntity *entity.DataSyncTask) + AddCronJobById(id uint64) error + RemoveCronJob(taskEntity *entity.DataSyncTask) + RemoveCronJobById(id uint64) error + RemoveCronJobByKey(taskKey string) + RunCronJob(id uint64) + + GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) } -func newDataSyncApp(dataSyncRepo repository.DataSyncTask) DataSyncTask { +func newDataSyncApp(dataSyncRepo repository.DataSyncTask, dataSyncLogRepo repository.DataSyncLog) DataSyncTask { app := new(dataSyncAppImpl) app.Repo = dataSyncRepo + app.dataSyncLogRepo = dataSyncLogRepo return app } type dataSyncAppImpl struct { base.AppImpl[*entity.DataSyncTask, repository.DataSyncTask] + + dataSyncLogRepo repository.DataSyncLog } func (app *dataSyncAppImpl) GetPageList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { @@ -114,6 +123,10 @@ func (app *dataSyncAppImpl) changeRunningState(id uint64, state int8) { func (app *dataSyncAppImpl) RunCronJob(id uint64) { // 查询最新的任务信息 task, err := app.GetById(new(entity.DataSyncTask), id) + if err != nil { + logx.Warnf("[%d]任务不存在", id) + return + } if task.RunningState == entity.DataSyncTaskRunStateRunning { logx.Warnf("数据同步任务正在执行中:%s => %s", task.TaskName, task.TaskKey) return @@ -184,7 +197,7 @@ func (app *dataSyncAppImpl) RunCronJob(id uint64) { var fieldMap []map[string]string err = json.Unmarshal([]byte(task.FieldMap), &fieldMap) if err != nil { - app.endRunning(task, entity.DataSyncTaskStateFail, fmt.Sprintf("解析字段映射json出错"), sql, resSize) + app.endRunning(task, entity.DataSyncTaskStateFail, "解析字段映射json出错", sql, resSize) return } @@ -304,9 +317,10 @@ func (app *dataSyncAppImpl) endRunning(taskEntity *entity.DataSyncTask, state in app.saveLog(taskEntity.Id, state, msg, sql, resNum) } + func (app *dataSyncAppImpl) saveLog(taskId uint64, state int8, msg string, sql string, resNum int) { now := time.Now() - _ = GetDataSyncLogApp().Insert(context.Background(), &entity.DataSyncLog{ + _ = app.dataSyncLogRepo.Insert(context.Background(), &entity.DataSyncLog{ TaskId: taskId, CreateTime: &now, DataSqlFull: sql, @@ -357,3 +371,7 @@ func (app *dataSyncAppImpl) InitCronJob() { _, _ = app.GetPageList(cond, pageParam, jobs) } } + +func (app *dataSyncAppImpl) GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { + return app.dataSyncLogRepo.GetTaskLogList(condition, pageParam, toEntity, orderBy...) +} diff --git a/server/internal/db/application/db_data_sync_log.go b/server/internal/db/application/db_data_sync_log.go deleted file mode 100644 index 80859a7c..00000000 --- a/server/internal/db/application/db_data_sync_log.go +++ /dev/null @@ -1,29 +0,0 @@ -package application - -import ( - "mayfly-go/internal/db/domain/entity" - "mayfly-go/internal/db/domain/repository" - "mayfly-go/pkg/base" - "mayfly-go/pkg/model" -) - -type DataSyncLog interface { - base.App[*entity.DataSyncLog] - - // GetTaskLogList 分页获取数据库实例 - GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) -} - -func newDataSyncLogApp(dataSyncRepo repository.DataSyncLog) DataSyncLog { - app := new(dataSyncLogAppImpl) - app.Repo = dataSyncRepo - return app -} - -type dataSyncLogAppImpl struct { - base.AppImpl[*entity.DataSyncLog, repository.DataSyncLog] -} - -func (app *dataSyncLogAppImpl) GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { - return app.GetRepo().GetTaskLogList(condition, pageParam, toEntity, orderBy...) -} diff --git a/server/internal/db/application/db_restore.go b/server/internal/db/application/db_restore.go index cc8b0357..2c48006f 100644 --- a/server/internal/db/application/db_restore.go +++ b/server/internal/db/application/db_restore.go @@ -73,6 +73,11 @@ func (app *DbRestoreApp) GetDbNamesWithoutRestore(instanceId uint64, dbNames []s return app.restoreRepo.GetDbNamesWithoutRestore(instanceId, dbNames) } +// 分页获取数据库备份历史 +func (app *DbRestoreApp) GetHistoryPageList(condition *entity.DbRestoreHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { + return app.restoreHistoryRepo.GetDbRestoreHistories(condition, pageParam, toEntity, orderBy...) +} + func (app *DbRestoreApp) runTask(ctx context.Context, task *entity.DbRestore) error { conn, err := app.dbApp.GetDbConnByInstanceId(task.DbInstanceId) if err != nil { @@ -173,19 +178,3 @@ func withRunRestoreTask(app *DbRestoreApp) dbSchedulerOption[*entity.DbRestore] scheduler.RunTask = app.runTask } } - -func newDbRestoreHistoryApp(repositories *repository.Repositories) (*DbRestoreHistoryApp, error) { - app := &DbRestoreHistoryApp{ - repo: repositories.RestoreHistory, - } - return app, nil -} - -type DbRestoreHistoryApp struct { - repo repository.DbRestoreHistory -} - -// GetPageList 分页获取数据库备份历史 -func (app *DbRestoreHistoryApp) GetPageList(condition *entity.DbRestoreHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { - return app.repo.GetDbRestoreHistories(condition, pageParam, toEntity, orderBy...) -} diff --git a/server/internal/db/application/db_sql_exec.go b/server/internal/db/application/db_sql_exec.go index 97a7e291..6522881e 100644 --- a/server/internal/db/application/db_sql_exec.go +++ b/server/internal/db/application/db_sql_exec.go @@ -72,7 +72,7 @@ func createSqlExecRecord(ctx context.Context, execSqlReq *DbSqlExecReq) *entity. dbSqlExecRecord.Db = execSqlReq.Db dbSqlExecRecord.Sql = execSqlReq.Sql dbSqlExecRecord.Remark = execSqlReq.Remark - dbSqlExecRecord.SetBaseInfo(contextx.GetLoginAccount(ctx)) + dbSqlExecRecord.SetBaseInfo(model.IdGenTypeNone, contextx.GetLoginAccount(ctx)) return dbSqlExecRecord } diff --git a/server/internal/db/dbm/db_program.go b/server/internal/db/dbm/db_program.go index ce06736a..f8c7d213 100644 --- a/server/internal/db/dbm/db_program.go +++ b/server/internal/db/dbm/db_program.go @@ -9,9 +9,13 @@ import ( type DbProgram interface { Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error) + FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error) + ReplayBinlog(ctx context.Context, originalDatabase, targetDatabase string, restoreInfo *RestoreInfo) error + RestoreBackupHistory(ctx context.Context, dbName string, dbBackupId uint64, dbBackupHistoryUuid string) error + GetBinlogEventPositionAtOrAfterTime(ctx context.Context, binlogName string, targetTime time.Time) (position int64, parseErr error) } diff --git a/server/internal/db/dbm/db_program_mysql.go b/server/internal/db/dbm/db_program_mysql.go index 827cf0f5..c95b64ea 100644 --- a/server/internal/db/dbm/db_program_mysql.go +++ b/server/internal/db/dbm/db_program_mysql.go @@ -40,21 +40,27 @@ func NewDbProgramMysql(dbConn *DbConn) *DbProgramMysql { } func (svc *DbProgramMysql) dbInfo() *DbInfo { - return svc.dbConn.Info + dbInfo := svc.dbConn.Info + err := dbInfo.IfUseSshTunnelChangeIpPort() + if err != nil { + logx.Errorf("通过ssh隧道连接db失败: %s", err.Error()) + } + return dbInfo } func (svc *DbProgramMysql) getMysqlBin() *config.MysqlBin { if svc.mysqlBin != nil { return svc.mysqlBin } + dbInfo := svc.dbInfo() var mysqlBin *config.MysqlBin - switch svc.dbInfo().Type { + switch dbInfo.Type { case DbTypeMariadb: mysqlBin = config.GetMysqlBin(config.ConfigKeyDbMariadbBin) case DbTypeMysql: mysqlBin = config.GetMysqlBin(config.ConfigKeyDbMysqlBin) default: - panic(fmt.Sprintf("不兼容 MySQL 的数据库类型: %v", svc.dbInfo().Type)) + panic(fmt.Sprintf("不兼容 MySQL 的数据库类型: %v", dbInfo.Type)) } svc.mysqlBin = mysqlBin return svc.mysqlBin @@ -81,11 +87,12 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB _ = os.Remove(tmpFile) }() + dbInfo := svc.dbInfo() args := []string{ - "--host", svc.dbInfo().Host, - "--port", strconv.Itoa(svc.dbInfo().Port), - "--user", svc.dbInfo().Username, - "--password=" + svc.dbInfo().Password, + "--host", dbInfo.Host, + "--port", strconv.Itoa(dbInfo.Port), + "--user", dbInfo.Username, + "--password=" + dbInfo.Password, "--add-drop-database", "--result-file", tmpFile, "--single-transaction", @@ -123,12 +130,13 @@ func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbB } func (svc *DbProgramMysql) RestoreBackupHistory(ctx context.Context, dbName string, dbBackupId uint64, dbBackupHistoryUuid string) error { + dbInfo := svc.dbInfo() args := []string{ - "--host", svc.dbInfo().Host, - "--port", strconv.Itoa(svc.dbInfo().Port), + "--host", dbInfo.Host, + "--port", strconv.Itoa(dbInfo.Port), "--database", dbName, - "--user", svc.dbInfo().Username, - "--password=" + svc.dbInfo().Password, + "--user", dbInfo.Username, + "--password=" + dbInfo.Password, } fileName := filepath.Join(svc.getDbBackupDir(svc.dbInfo().InstanceId, dbBackupId), @@ -157,7 +165,8 @@ func (svc *DbProgramMysql) downloadBinlogFilesOnServer(ctx context.Context, binl logx.Debug("No binlog file found on server to download") return nil } - if err := os.MkdirAll(svc.getBinlogDir(svc.dbInfo().InstanceId), os.ModePerm); err != nil { + dbInfo := svc.dbInfo() + if err := os.MkdirAll(svc.getBinlogDir(dbInfo.InstanceId), os.ModePerm); err != nil { return errors.Wrapf(err, "创建 binlog 目录失败: %q", svc.getBinlogDir(svc.dbInfo().InstanceId)) } latestBinlogFileOnServer := binlogFilesOnServerSorted[len(binlogFilesOnServerSorted)-1] @@ -166,7 +175,7 @@ func (svc *DbProgramMysql) downloadBinlogFilesOnServer(ctx context.Context, binl if isLatest && !downloadLatestBinlogFile { continue } - binlogFilePath := filepath.Join(svc.getBinlogDir(svc.dbInfo().InstanceId), fileOnServer.Name) + binlogFilePath := filepath.Join(svc.getBinlogDir(dbInfo.InstanceId), fileOnServer.Name) logx.Debug("Downloading binlog file from MySQL server.", logx.String("path", binlogFilePath), logx.Bool("isLatest", isLatest)) if err := svc.downloadBinlogFile(ctx, fileOnServer, isLatest); err != nil { logx.Error("下载 binlog 文件失败", logx.String("path", binlogFilePath), logx.String("error", err.Error())) @@ -287,15 +296,16 @@ func (svc *DbProgramMysql) fetchBinlogs(ctx context.Context, downloadLatestBinlo // It may keep growing as there are ongoing writes to the database. So we just need to check that // the file size is larger or equal to the binlog file size we queried from the MySQL server earlier. func (svc *DbProgramMysql) downloadBinlogFile(ctx context.Context, binlogFileToDownload *entity.BinlogFile, isLast bool) error { - tempBinlogPrefix := filepath.Join(svc.getBinlogDir(svc.dbInfo().InstanceId), "tmp-") + dbInfo := svc.dbInfo() + tempBinlogPrefix := filepath.Join(svc.getBinlogDir(dbInfo.InstanceId), "tmp-") args := []string{ binlogFileToDownload.Name, "--read-from-remote-server", // Verify checksum binlog events. "--verify-binlog-checksum", - "--host", svc.dbInfo().Host, - "--port", strconv.Itoa(svc.dbInfo().Port), - "--user", svc.dbInfo().Username, + "--host", dbInfo.Host, + "--port", strconv.Itoa(dbInfo.Port), + "--user", dbInfo.Username, "--raw", // With --raw this is a prefix for the file names. "--result-file", tempBinlogPrefix, @@ -304,8 +314,8 @@ func (svc *DbProgramMysql) downloadBinlogFile(ctx context.Context, binlogFileToD cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, args...) // We cannot set password as a flag. Otherwise, there is warning message // "mysqlbinlog: [Warning] Using a password on the command line interface can be insecure." - if svc.dbInfo().Password != "" { - cmd.Env = append(cmd.Env, fmt.Sprintf("MYSQL_PWD=%s", svc.dbInfo().Password)) + if dbInfo.Password != "" { + cmd.Env = append(cmd.Env, fmt.Sprintf("MYSQL_PWD=%s", dbInfo.Password)) } logx.Debug("Downloading binlog files using mysqlbinlog:", cmd.String()) @@ -531,18 +541,19 @@ func (svc *DbProgramMysql) ReplayBinlog(ctx context.Context, originalDatabase, t "--stop-position", fmt.Sprintf("%d", restoreInfo.TargetPosition), } - mysqlbinlogArgs = append(mysqlbinlogArgs, restoreInfo.GetBinlogPaths(svc.getBinlogDir(svc.dbInfo().InstanceId))...) + dbInfo := svc.dbInfo() + mysqlbinlogArgs = append(mysqlbinlogArgs, restoreInfo.GetBinlogPaths(svc.getBinlogDir(dbInfo.InstanceId))...) mysqlArgs := []string{ - "--host", svc.dbInfo().Host, - "--port", strconv.Itoa(svc.dbInfo().Port), - "--user", svc.dbInfo().Username, + "--host", dbInfo.Host, + "--port", strconv.Itoa(dbInfo.Port), + "--user", dbInfo.Username, } - if svc.dbInfo().Password != "" { + if dbInfo.Password != "" { // The --password parameter of mysql/mysqlbinlog does not support the "--password PASSWORD" format (split by space). // If provided like that, the program will hang. - mysqlArgs = append(mysqlArgs, fmt.Sprintf("--password=%s", svc.dbInfo().Password)) + mysqlArgs = append(mysqlArgs, fmt.Sprintf("--password=%s", dbInfo.Password)) } mysqlbinlogCmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, mysqlbinlogArgs...) @@ -649,11 +660,12 @@ func runCmd(cmd *exec.Cmd) error { } func (svc *DbProgramMysql) execute(database string, sql string) error { + dbInfo := svc.dbInfo() args := []string{ - "--host", svc.dbInfo().Host, - "--port", strconv.Itoa(svc.dbInfo().Port), - "--user", svc.dbInfo().Username, - "--password=" + svc.dbInfo().Password, + "--host", dbInfo.Host, + "--port", strconv.Itoa(dbInfo.Port), + "--user", dbInfo.Username, + "--password=" + dbInfo.Password, "--execute", sql, } if len(database) > 0 { diff --git a/server/internal/db/dbm/dialect_dm.go b/server/internal/db/dbm/dialect_dm.go index 0a049410..88d679ec 100644 --- a/server/internal/db/dbm/dialect_dm.go +++ b/server/internal/db/dbm/dialect_dm.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - machineapp "mayfly-go/internal/machine/application" "mayfly-go/pkg/errorx" "mayfly-go/pkg/logx" "mayfly-go/pkg/utils/anyx" @@ -29,18 +28,9 @@ func getDmDB(d *DbInfo) (*sql.DB, error) { } } - // 开启ssh隧道 - if d.SshTunnelMachineId > 0 { - sshTunnelMachine, err := machineapp.GetMachineApp().GetSshTunnelMachine(d.SshTunnelMachineId) - if err != nil { - return nil, err - } - exposedIp, exposedPort, err := sshTunnelMachine.OpenSshTunnel(fmt.Sprintf("db:%d", d.Id), d.Host, d.Port) - if err != nil { - return nil, err - } - d.Host = exposedIp - d.Port = exposedPort + err := d.IfUseSshTunnelChangeIpPort() + if err != nil { + return nil, err } dsn := fmt.Sprintf("dm://%s:%s@%s:%d/%s", d.Username, d.Password, d.Host, d.Port, dbParam) @@ -299,6 +289,7 @@ func (pd *DMDialect) WrapName(name string) string { func (pd *DMDialect) PageSql(pageNum int, pageSize int) string { return fmt.Sprintf("LIMIT %d OFFSET %d", pageSize, (pageNum-1)*pageSize) } + func (pd *DMDialect) GetDataType(dbColumnType string) DataType { if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) { return DataTypeNumber diff --git a/server/internal/db/dbm/dialect_mysql.go b/server/internal/db/dbm/dialect_mysql.go index 3ed164df..f9141c2a 100644 --- a/server/internal/db/dbm/dialect_mysql.go +++ b/server/internal/db/dbm/dialect_mysql.go @@ -4,13 +4,14 @@ import ( "context" "database/sql" "fmt" - "github.com/go-sql-driver/mysql" machineapp "mayfly-go/internal/machine/application" "mayfly-go/pkg/errorx" "mayfly-go/pkg/utils/anyx" "net" "regexp" "strings" + + "github.com/go-sql-driver/mysql" ) func getMysqlDB(d *DbInfo) (*sql.DB, error) { @@ -212,6 +213,7 @@ func (pd *MysqlDialect) WrapName(name string) string { func (pd *MysqlDialect) PageSql(pageNum int, pageSize int) string { return fmt.Sprintf("limit %d, %d", (pageNum-1)*pageSize, pageSize) } + func (pd *MysqlDialect) GetDataType(dbColumnType string) DataType { if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) { return DataTypeNumber @@ -230,6 +232,7 @@ func (pd *MysqlDialect) GetDataType(dbColumnType string) DataType { } return DataTypeString } + func (pd *MysqlDialect) SaveBatch(conn *DbConn, tableName string, columns string, placeholder string, values [][]any) error { // 执行批量insert sql,mysql支持批量insert语法 // insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ... diff --git a/server/internal/db/dbm/dialect_pgsql.go b/server/internal/db/dbm/dialect_pgsql.go index 214bfffe..3f79334d 100644 --- a/server/internal/db/dbm/dialect_pgsql.go +++ b/server/internal/db/dbm/dialect_pgsql.go @@ -291,6 +291,7 @@ func (pd *PgsqlDialect) WrapName(name string) string { func (pd *PgsqlDialect) PageSql(pageNum int, pageSize int) string { return fmt.Sprintf("LIMIT %d OFFSET %d", pageSize, (pageNum-1)*pageSize) } + func (pd *PgsqlDialect) GetDataType(dbColumnType string) DataType { if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) { return DataTypeNumber @@ -309,6 +310,7 @@ func (pd *PgsqlDialect) GetDataType(dbColumnType string) DataType { } return DataTypeString } + func (pd *PgsqlDialect) SaveBatch(conn *DbConn, tableName string, columns string, placeholder string, values [][]any) error { // 执行批量insert sql,跟mysql一样 pg或高斯支持批量insert语法 // insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ... diff --git a/server/internal/db/dbm/info.go b/server/internal/db/dbm/info.go index 536835ec..99c393dd 100644 --- a/server/internal/db/dbm/info.go +++ b/server/internal/db/dbm/info.go @@ -3,6 +3,7 @@ package dbm import ( "database/sql" "fmt" + machineapp "mayfly-go/internal/machine/application" "mayfly-go/pkg/errorx" "mayfly-go/pkg/logx" ) @@ -72,6 +73,24 @@ func (dbInfo *DbInfo) Conn() (*DbConn, error) { return dbc, nil } +// 如果使用了ssh隧道,将其host port改变其本地映射host port +func (di *DbInfo) IfUseSshTunnelChangeIpPort() error { + // 开启ssh隧道 + if di.SshTunnelMachineId > 0 { + sshTunnelMachine, err := machineapp.GetMachineApp().GetSshTunnelMachine(di.SshTunnelMachineId) + if err != nil { + return err + } + exposedIp, exposedPort, err := sshTunnelMachine.OpenSshTunnel(fmt.Sprintf("db:%d", di.Id), di.Host, di.Port) + if err != nil { + return err + } + di.Host = exposedIp + di.Port = exposedPort + } + return nil +} + // 获取连接id func GetDbConnId(dbId uint64, db string) string { if dbId == 0 { diff --git a/server/internal/db/domain/entity/db_data_sync.go b/server/internal/db/domain/entity/db_data_sync.go index 56342c51..3324f731 100644 --- a/server/internal/db/domain/entity/db_data_sync.go +++ b/server/internal/db/domain/entity/db_data_sync.go @@ -38,7 +38,7 @@ func (d *DataSyncTask) TableName() string { } type DataSyncLog struct { - Id uint64 `json:"id"` // 自增主键 + model.IdModel TaskId uint64 `orm:"column(task_id)" json:"taskId"` // 任务表id CreateTime *time.Time `orm:"column(create_time)" json:"createTime"` DataSqlFull string `orm:"column(data_sql_full)" json:"dataSqlFull"` // 执行的完整sql @@ -47,10 +47,6 @@ type DataSyncLog struct { Status int8 `orm:"column(status)" json:"status"` // 状态:1.成功 -1.失败 } -func (d *DataSyncLog) SetBaseInfo(account *model.LoginAccount) { - //TODO implement me -} - func (d *DataSyncLog) TableName() string { return "t_db_data_sync_log" } diff --git a/server/internal/db/domain/repository/db_backup_history.go b/server/internal/db/domain/repository/db_backup_history.go index 94f92a98..46967dae 100644 --- a/server/internal/db/domain/repository/db_backup_history.go +++ b/server/internal/db/domain/repository/db_backup_history.go @@ -11,6 +11,8 @@ type DbBackupHistory interface { // GetDbBackupHistories 分页获取数据备份历史 GetHistories(condition *entity.DbBackupHistoryQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) + GetLatestHistory(instanceId uint64, dbName string, bi *entity.BinlogInfo) (*entity.DbBackupHistory, error) + GetEarliestHistory(instanceId uint64) (*entity.DbBackupHistory, error) } diff --git a/server/internal/db/domain/repository/db_binlog_history.go b/server/internal/db/domain/repository/db_binlog_history.go index b2aa9b71..0085b52f 100644 --- a/server/internal/db/domain/repository/db_binlog_history.go +++ b/server/internal/db/domain/repository/db_binlog_history.go @@ -9,9 +9,14 @@ import ( type DbBinlogHistory interface { base.Repo[*entity.DbBinlogHistory] + GetHistories(instanceId uint64, start, target *entity.BinlogInfo) ([]*entity.DbBinlogHistory, error) + GetHistoryByTime(instanceId uint64, targetTime time.Time) (*entity.DbBinlogHistory, error) + GetLatestHistory(instanceId uint64) (*entity.DbBinlogHistory, bool, error) + InsertWithBinlogFiles(ctx context.Context, instanceId uint64, binlogFiles []*entity.BinlogFile) error + Upsert(ctx context.Context, history *entity.DbBinlogHistory) error } diff --git a/server/internal/db/router/db_backup.go b/server/internal/db/router/db_backup.go index 66c67dfc..28b125d5 100644 --- a/server/internal/db/router/db_backup.go +++ b/server/internal/db/router/db_backup.go @@ -1,10 +1,11 @@ package router import ( - "github.com/gin-gonic/gin" "mayfly-go/internal/db/api" "mayfly-go/internal/db/application" "mayfly-go/pkg/req" + + "github.com/gin-gonic/gin" ) func InitDbBackupRouter(router *gin.RouterGroup) { @@ -32,6 +33,9 @@ func InitDbBackupRouter(router *gin.RouterGroup) { req.NewDelete(":dbId/backups/:backupId", d.Delete), // 获取未配置定时备份的数据库名称 req.NewGet(":dbId/db-names-without-backup", d.GetDbNamesWithoutBackup), + + // 获取数据库备份历史 + req.NewGet(":dbId/backup-histories/", d.GetHistoryPageList), } req.BatchSetGroup(dbs, reqs) diff --git a/server/internal/db/router/db_backup_hisotry.go b/server/internal/db/router/db_backup_hisotry.go deleted file mode 100644 index 7f6c14be..00000000 --- a/server/internal/db/router/db_backup_hisotry.go +++ /dev/null @@ -1,26 +0,0 @@ -package router - -import ( - "github.com/gin-gonic/gin" - "mayfly-go/internal/db/api" - "mayfly-go/internal/db/application" - "mayfly-go/pkg/req" -) - -func InitDbBackupHistoryRouter(router *gin.RouterGroup) { - dbs := router.Group("/dbs") - - d := &api.DbBackupHistory{ - DbBackupHistoryApp: application.GetDbBackupHistoryApp(), - DbApp: application.GetDbApp(), - } - - reqs := []*req.Conf{ - // 获取数据库备份历史 - req.NewGet(":dbId/backup-histories/", d.GetPageList), - // 删除数据库备份历史 - req.NewDelete(":dbId/backups/:backupId/histories/:historyId", d.Delete), - } - - req.BatchSetGroup(dbs, reqs) -} diff --git a/server/internal/db/router/db_data_sync.go b/server/internal/db/router/db_data_sync.go index 2c49361a..f2bd780f 100644 --- a/server/internal/db/router/db_data_sync.go +++ b/server/internal/db/router/db_data_sync.go @@ -13,7 +13,6 @@ func InitDbDataSyncRouter(router *gin.RouterGroup) { d := &api.DataSyncTask{ DataSyncTaskApp: application.GetDataSyncTaskApp(), - DataSyncLogApp: application.GetDataSyncLogApp(), } reqs := [...]*req.Conf{ diff --git a/server/internal/db/router/db_restore.go b/server/internal/db/router/db_restore.go index 0ee62b7b..a4117cd8 100644 --- a/server/internal/db/router/db_restore.go +++ b/server/internal/db/router/db_restore.go @@ -1,10 +1,11 @@ package router import ( - "github.com/gin-gonic/gin" "mayfly-go/internal/db/api" "mayfly-go/internal/db/application" "mayfly-go/pkg/req" + + "github.com/gin-gonic/gin" ) func InitDbRestoreRouter(router *gin.RouterGroup) { @@ -30,6 +31,9 @@ func InitDbRestoreRouter(router *gin.RouterGroup) { req.NewDelete(":dbId/restores/:restoreId", d.Delete), // 获取未配置定时恢复的数据库名称 req.NewGet(":dbId/db-names-without-restore", d.GetDbNamesWithoutRestore), + + // 获取数据库备份历史 + req.NewGet(":dbId/restores/:restoreId/histories", d.GetHistoryPageList), } req.BatchSetGroup(dbs, reqs) diff --git a/server/internal/db/router/db_restore_hisotry.go b/server/internal/db/router/db_restore_hisotry.go deleted file mode 100644 index 6ba496dd..00000000 --- a/server/internal/db/router/db_restore_hisotry.go +++ /dev/null @@ -1,25 +0,0 @@ -package router - -import ( - "github.com/gin-gonic/gin" - "mayfly-go/internal/db/api" - "mayfly-go/internal/db/application" - "mayfly-go/pkg/req" -) - -func InitDbRestoreHistoryRouter(router *gin.RouterGroup) { - dbs := router.Group("/dbs") - - d := &api.DbRestoreHistory{ - DbRestoreHistoryApp: application.GetDbRestoreHistoryApp(), - } - - reqs := []*req.Conf{ - // 获取数据库备份历史 - req.NewGet(":dbId/restores/:restoreId/histories", d.GetPageList), - // 删除数据库备份历史 - req.NewDelete(":dbId/restores/:restoreId/histories/:historyId", d.Delete), - } - - req.BatchSetGroup(dbs, reqs) -} diff --git a/server/internal/db/router/router.go b/server/internal/db/router/router.go index 0eeb5f6c..ee2713ff 100644 --- a/server/internal/db/router/router.go +++ b/server/internal/db/router/router.go @@ -8,8 +8,6 @@ func Init(router *gin.RouterGroup) { InitDbSqlRouter(router) InitDbSqlExecRouter(router) InitDbBackupRouter(router) - InitDbBackupHistoryRouter(router) InitDbRestoreRouter(router) - InitDbRestoreHistoryRouter(router) InitDbDataSyncRouter(router) } diff --git a/server/internal/machine/domain/entity/machine_cronjob.go b/server/internal/machine/domain/entity/machine_cronjob.go index a25d3ead..c14cd49d 100644 --- a/server/internal/machine/domain/entity/machine_cronjob.go +++ b/server/internal/machine/domain/entity/machine_cronjob.go @@ -30,7 +30,7 @@ type MachineCronJobRelate struct { CreateTime *time.Time } -func (m *MachineCronJobRelate) SetBaseInfo(la *model.LoginAccount) { +func (m *MachineCronJobRelate) SetBaseInfo(gt model.IdGenType, la *model.LoginAccount) { now := time.Now() m.CreateTime = &now m.Creator = la.Username diff --git a/server/internal/machine/mcm/sshtunnel.go b/server/internal/machine/mcm/sshtunnel.go index b8519623..8f8f048b 100644 --- a/server/internal/machine/mcm/sshtunnel.go +++ b/server/internal/machine/mcm/sshtunnel.go @@ -75,6 +75,12 @@ func (stm *SshTunnelMachine) OpenSshTunnel(id string, ip string, port int) (expo stm.mutex.Lock() defer stm.mutex.Unlock() + tunnel := stm.tunnels[id] + // 已存在该id隧道,则直接返回 + if tunnel != nil { + return tunnel.localHost, tunnel.localPort, nil + } + localPort, err := netx.GetAvailablePort() if err != nil { return "", 0, err @@ -93,7 +99,7 @@ func (stm *SshTunnelMachine) OpenSshTunnel(id string, ip string, port int) (expo return "", 0, err } - tunnel := &Tunnel{ + tunnel = &Tunnel{ id: id, machineId: stm.machineId, localHost: hostname, diff --git a/server/internal/sys/domain/entity/resource.go b/server/internal/sys/domain/entity/resource.go index 095264b2..e1babe2e 100644 --- a/server/internal/sys/domain/entity/resource.go +++ b/server/internal/sys/domain/entity/resource.go @@ -18,6 +18,11 @@ func (a *Resource) TableName() string { return "t_sys_resource" } +func (m *Resource) SetBaseInfo(idGenType model.IdGenType, la *model.LoginAccount) { + // id使用时间戳,减少id冲突概率 + m.Model.SetBaseInfo(model.IdGenTypeTimestamp, la) +} + const ( ResourceStatusEnable int8 = 1 // 启用状态 ResourceStatusDisable int8 = -1 // 禁用状态 diff --git a/server/pkg/base/repo.go b/server/pkg/base/repo.go index a8867ac2..81c956f5 100644 --- a/server/pkg/base/repo.go +++ b/server/pkg/base/repo.go @@ -2,10 +2,11 @@ package base import ( "context" - "gorm.io/gorm" "mayfly-go/pkg/contextx" "mayfly-go/pkg/gormx" "mayfly-go/pkg/model" + + "gorm.io/gorm" ) // 基础repo接口 @@ -182,7 +183,7 @@ func (br *RepoImpl[T]) GetModel() T { // 从上下文获取登录账号信息,并赋值至实体 func (br *RepoImpl[T]) setBaseInfo(ctx context.Context, e T) T { if la := contextx.GetLoginAccount(ctx); la != nil { - e.SetBaseInfo(la) + e.SetBaseInfo(model.IdGenTypeNone, la) } return e } diff --git a/server/pkg/model/model.go b/server/pkg/model/model.go index cb2150bb..4cbfdfdf 100644 --- a/server/pkg/model/model.go +++ b/server/pkg/model/model.go @@ -4,6 +4,8 @@ import ( "time" ) +type IdGenType int + const ( IdColumn = "id" DeletedColumn = "is_deleted" // 删除字段 @@ -11,32 +13,77 @@ const ( ModelDeleted int8 = 1 ModelUndeleted int8 = 0 + + IdGenTypeNone IdGenType = 0 // 数据库处理 + IdGenTypeTimestamp IdGenType = 1 // 当前时间戳 ) // 实体接口 type ModelI interface { + // id生成策略 + // IdGenType() IdGenType + // 使用当前登录账号信息设置实体结构体的基础信息 // // 如创建时间,修改时间,创建者,修改者信息 - SetBaseInfo(account *LoginAccount) + SetBaseInfo(idGenType IdGenType, account *LoginAccount) +} + +type IdModel struct { + Id uint64 `json:"id"` +} + +// func (m *IdModel) IdGenType() IdGenType { +// // 默认由数据库自行生成 +// return IdGenTypeNone +// } + +func (m *IdModel) SetBaseInfo(idGenType IdGenType, account *LoginAccount) { + // 存在id,则赋值 + if m.Id != 0 { + return + } + m.Id = GetIdByGenType(idGenType) } // 含有删除字段模型 type DeletedModel struct { - Id uint64 `json:"id"` + IdModel IsDeleted int8 `json:"-" gorm:"column:is_deleted;default:0"` DeleteTime *time.Time `json:"-"` } -func (m *DeletedModel) SetBaseInfo(account *LoginAccount) { - isCreate := m.Id == 0 - if isCreate { +func (m *DeletedModel) SetBaseInfo(idGenType IdGenType, account *LoginAccount) { + if m.Id == 0 { + m.IdModel.SetBaseInfo(idGenType, account) m.IsDeleted = ModelUndeleted } } -// 基础实体模型,数据表最基础字段,每张表必备字段 +// 含有删除、创建字段模型 +type CreateModel struct { + DeletedModel + CreateTime *time.Time `json:"createTime"` + CreatorId uint64 `json:"creatorId"` + Creator string `json:"creator"` +} + +func (m *CreateModel) SetBaseInfo(idGenType IdGenType, account *LoginAccount) { + if m.Id != 0 { + return + } + + m.DeletedModel.SetBaseInfo(idGenType, account) + nowTime := time.Now() + m.CreateTime = &nowTime + if account != nil { + m.CreatorId = account.Id + m.Creator = account.Username + } +} + +// 基础实体模型,数据表最基础字段,尽量每张表都包含这些字段 type Model struct { DeletedModel @@ -49,12 +96,13 @@ type Model struct { } // 设置基础信息. 如创建时间,修改时间,创建者,修改者信息 -func (m *Model) SetBaseInfo(account *LoginAccount) { +func (m *Model) SetBaseInfo(idGenType IdGenType, account *LoginAccount) { nowTime := time.Now() isCreate := m.Id == 0 if isCreate { m.IsDeleted = ModelUndeleted m.CreateTime = &nowTime + m.IdModel.SetBaseInfo(idGenType, account) } m.UpdateTime = &nowTime @@ -70,3 +118,11 @@ func (m *Model) SetBaseInfo(account *LoginAccount) { m.Modifier = name m.ModifierId = id } + +// 根据id生成类型,生成id +func GetIdByGenType(genType IdGenType) uint64 { + if genType == IdGenTypeTimestamp { + return uint64(time.Now().Unix()) + } + return 0 +}