From d1d372e1bf2f2b40e6d05db3158f31ebd9d1996c Mon Sep 17 00:00:00 2001 From: "meilin.huang" <954537473@qq.com> Date: Thu, 28 Mar 2024 22:20:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=95=B0=E6=8D=AE=E8=BF=81=E7=A7=BB?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=AE=9E=E6=97=B6=E6=97=A5=E5=BF=97&?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E6=B8=B8=E6=A0=87=E9=81=8D=E5=8E=86?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mayfly_go_web/src/common/config.ts | 2 +- .../src/components/terminal/TerminalBody.vue | 30 ++- .../src/components/terminal/TerminalLog.vue | 102 ++++++++++ .../src/views/ops/component/TagTree.vue | 8 +- .../src/views/ops/db/DbTransferList.vue | 40 ++-- .../src/views/ops/db/DbTransferLog.vue | 117 ----------- .../src/views/ops/db/SyncTaskList.vue | 12 +- mayfly_go_web/src/views/ops/db/enums.ts | 7 + mayfly_go_web/src/views/system/api.ts | 1 + mayfly_go_web/src/views/system/enums.ts | 1 + .../src/views/system/syslog/SyslogList.vue | 2 +- server/go.mod | 6 +- server/internal/db/api/db_transfer.go | 40 +--- server/internal/db/api/form/db_transfer.go | 5 - server/internal/db/api/vo/db_data_sync.go | 12 +- server/internal/db/api/vo/db_transfer.go | 16 +- server/internal/db/application/db_transfer.go | 184 +++++++++++------- server/internal/db/dbm/dbi/conn.go | 13 +- .../internal/db/domain/entity/db_transfer.go | 28 +-- .../db/domain/repository/db_transfer.go | 7 - .../infrastructure/persistence/db_transfer.go | 15 -- .../infrastructure/persistence/persistence.go | 1 - server/internal/db/router/db_transfer.go | 6 +- server/internal/sys/api/syslog.go | 4 + server/internal/sys/application/syslog.go | 110 ++++++++++- server/internal/sys/domain/entity/syslog.go | 15 +- server/internal/sys/router/syslog.go | 2 + server/pkg/config/app.go | 2 +- server/pkg/logx/logx.go | 16 ++ server/pkg/utils/collx/map.go | 13 ++ server/resources/script/sql/v1.7/v1.7.5.sql | 4 + 31 files changed, 477 insertions(+), 344 deletions(-) create mode 100644 mayfly_go_web/src/components/terminal/TerminalLog.vue delete mode 100644 mayfly_go_web/src/views/ops/db/DbTransferLog.vue diff --git a/mayfly_go_web/src/common/config.ts b/mayfly_go_web/src/common/config.ts index f8cc6ecd..00525b16 100644 --- a/mayfly_go_web/src/common/config.ts +++ b/mayfly_go_web/src/common/config.ts @@ -15,7 +15,7 @@ const config = { baseWsUrl: `${(window as any).globalConfig.BaseWsUrl || `${location.protocol == 'https:' ? 'wss:' : 'ws:'}//${getBaseApiUrl()}`}/api`, // 系统版本 - version: 'v1.7.4', + version: 'v1.7.5', }; export default config; diff --git a/mayfly_go_web/src/components/terminal/TerminalBody.vue b/mayfly_go_web/src/components/terminal/TerminalBody.vue index ee95bc67..ff7c7d47 100644 --- a/mayfly_go_web/src/components/terminal/TerminalBody.vue +++ b/mayfly_go_web/src/components/terminal/TerminalBody.vue @@ -124,6 +124,8 @@ function initTerm() { state.addon.fit = fitAddon; term.loadAddon(fitAddon); fitTerminal(); + // 注册窗口大小监听器 + useEventListener('resize', debounce(fitTerminal, 400)); // 注册搜索组件 const searchAddon = new SearchAddon(); @@ -148,10 +150,11 @@ function initTerm() { } function initSocket() { - if (props.socketUrl) { - socket = new WebSocket(`${props.socketUrl}&rows=${term?.rows}&cols=${term?.cols}`); + if (!props.socketUrl) { + return; } + socket = new WebSocket(`${props.socketUrl}&rows=${term?.rows}&cols=${term?.cols}`); // 监听socket连接 socket.onopen = () => { // 注册心跳 @@ -162,8 +165,6 @@ function initSocket() { term.onResize((event) => sendResize(event.cols, event.rows)); term.onData((event) => sendCmd(event)); - // // 注册窗口大小监听器 - useEventListener('resize', debounce(fitTerminal, 400)); focus(); // 如果有初始要执行的命令,则发送执行命令 @@ -187,10 +188,19 @@ function initSocket() { // 监听socket消息 socket.onmessage = (msg: any) => { // msg.data是真正后端返回的数据 - term.write(msg.data); + write2Term(msg.data); }; } +// 写入内容至终端 +const write2Term = (data: any) => { + term.write(data); +}; + +const writeln2Term = (data: any) => { + term.writeln(data); +}; + const getTerminalTheme = () => { const terminalTheme = themeConfig.value.terminalTheme; // 如果不是自定义主题,则返回内置主题 @@ -229,7 +239,7 @@ enum MsgType { } const send = (msg: any) => { - state.status == TerminalStatus.Connected && socket.send(msg); + state.status == TerminalStatus.Connected && socket?.send(msg); }; const sendResize = (cols: number, rows: number) => { @@ -266,7 +276,7 @@ const getStatus = (): TerminalStatus => { return state.status; }; -defineExpose({ init, fitTerminal, focus, clear, close, getStatus, sendResize }); +defineExpose({ init, fitTerminal, focus, clear, close, getStatus, sendResize, write2Term, writeln2Term }); diff --git a/mayfly_go_web/src/components/terminal/TerminalLog.vue b/mayfly_go_web/src/components/terminal/TerminalLog.vue new file mode 100644 index 00000000..fa3c646d --- /dev/null +++ b/mayfly_go_web/src/components/terminal/TerminalLog.vue @@ -0,0 +1,102 @@ + + + + + diff --git a/mayfly_go_web/src/views/ops/component/TagTree.vue b/mayfly_go_web/src/views/ops/component/TagTree.vue index 1dd82915..5412fa7e 100644 --- a/mayfly_go_web/src/views/ops/component/TagTree.vue +++ b/mayfly_go_web/src/views/ops/component/TagTree.vue @@ -1,7 +1,7 @@ @@ -55,9 +57,10 @@ import { TableColumn } from '@/components/pagetable'; import { hasPerms } from '@/components/auth/auth'; import { SearchItem } from '@/components/SearchForm'; import { getDbDialect } from '@/views/ops/db/dialect'; +import { DbTransferRunningStateEnum } from './enums'; +import TerminalLog from '@/components/terminal/TerminalLog.vue'; const DbTransferEdit = defineAsyncComponent(() => import('./DbTransferEdit.vue')); -const DbTransferLog = defineAsyncComponent(() => import('./DbTransferLog.vue')); const perms = { save: 'db:transfer:save', @@ -72,8 +75,11 @@ const searchItems = [SearchItem.input('name', '名称')]; const columns = ref([ TableColumn.new('srcDb', '源库').setMinWidth(250).isSlot(), TableColumn.new('targetDb', '目标库').setMinWidth(250).isSlot(), - TableColumn.new('modifier', '修改人').alignCenter(), - TableColumn.new('updateTime', '修改时间').alignCenter().isTime(), + TableColumn.new('runningState', '执行状态').typeTag(DbTransferRunningStateEnum), + TableColumn.new('creator', '创建人'), + TableColumn.new('createTime', '创建时间').isTime(), + TableColumn.new('modifier', '修改人'), + TableColumn.new('updateTime', '修改时间').isTime(), ]); // 该用户拥有的的操作列按钮权限 @@ -104,11 +110,13 @@ const state = reactive({ title: '新增数据数据迁移任务', }, logsDialog: { - taskId: 0, + logId: 0, + title: '数据库迁移日志', visible: false, data: null as any, running: false, }, + runBtnDisabled: false, }); const { selectionData, query, editDialog, logsDialog } = toRefs(state); @@ -146,8 +154,9 @@ const stop = async (id: any) => { }; const log = async (data: any) => { - state.logsDialog.taskId = data.id; + state.logsDialog.logId = data.logId; state.logsDialog.visible = true; + state.logsDialog.title = '数据库迁移日志'; state.logsDialog.running = data.state === 1; }; @@ -157,9 +166,18 @@ const reRun = async (data: any) => { cancelButtonText: '取消', type: 'warning', }); - await dbApi.runDbTransferTask.request({ taskId: data.id }); - ElMessage.success('运行成功'); - search(); + try { + state.runBtnDisabled = true; + await dbApi.runDbTransferTask.request({ taskId: data.id }); + ElMessage.success('运行成功'); + } catch (e) { + state.runBtnDisabled = false; + } + // 延迟2秒执行,后端异步执行 + setTimeout(() => { + search(); + state.runBtnDisabled = false; + }, 2000); }; const del = async () => { diff --git a/mayfly_go_web/src/views/ops/db/DbTransferLog.vue b/mayfly_go_web/src/views/ops/db/DbTransferLog.vue deleted file mode 100644 index 833b271a..00000000 --- a/mayfly_go_web/src/views/ops/db/DbTransferLog.vue +++ /dev/null @@ -1,117 +0,0 @@ - - - diff --git a/mayfly_go_web/src/views/ops/db/SyncTaskList.vue b/mayfly_go_web/src/views/ops/db/SyncTaskList.vue index ae13c7d9..3897f9fd 100644 --- a/mayfly_go_web/src/views/ops/db/SyncTaskList.vue +++ b/mayfly_go_web/src/views/ops/db/SyncTaskList.vue @@ -71,11 +71,13 @@ const searchItems = [SearchItem.input('name', '名称')]; // 任务名、修改人、修改时间、最近一次任务执行状态、状态(停用启用)、操作 const columns = ref([ TableColumn.new('taskName', '任务名'), - TableColumn.new('runningState', '运行状态').alignCenter().typeTag(DbDataSyncRunningStateEnum), - TableColumn.new('recentState', '最近任务状态').alignCenter().typeTag(DbDataSyncRecentStateEnum), - TableColumn.new('status', '状态').alignCenter().isSlot(), - TableColumn.new('modifier', '修改人').alignCenter(), - TableColumn.new('updateTime', '修改时间').alignCenter().isTime(), + TableColumn.new('runningState', '运行状态').typeTag(DbDataSyncRunningStateEnum), + TableColumn.new('recentState', '最近任务状态').typeTag(DbDataSyncRecentStateEnum), + TableColumn.new('status', '状态').isSlot(), + TableColumn.new('creator', '创建人'), + TableColumn.new('createTime', '创建时间').isTime(), + TableColumn.new('modifier', '修改人'), + TableColumn.new('updateTime', '修改时间').isTime(), ]); // 该用户拥有的的操作列按钮权限 diff --git a/mayfly_go_web/src/views/ops/db/enums.ts b/mayfly_go_web/src/views/ops/db/enums.ts index 3f8c78d3..11615fe1 100644 --- a/mayfly_go_web/src/views/ops/db/enums.ts +++ b/mayfly_go_web/src/views/ops/db/enums.ts @@ -30,3 +30,10 @@ export const DbDataSyncRunningStateEnum = { Wait: EnumValue.of(2, '待运行').setTagType('primary'), Fail: EnumValue.of(3, '已停止').setTagType('danger'), }; + +export const DbTransferRunningStateEnum = { + Success: EnumValue.of(2, '成功').setTagType('success'), + Wait: EnumValue.of(1, '执行中').setTagType('primary'), + Fail: EnumValue.of(-1, '失败').setTagType('danger'), + Stop: EnumValue.of(-2, '手动终止').setTagType('warning'), +}; diff --git a/mayfly_go_web/src/views/system/api.ts b/mayfly_go_web/src/views/system/api.ts index 9e1472af..b6fd4b07 100644 --- a/mayfly_go_web/src/views/system/api.ts +++ b/mayfly_go_web/src/views/system/api.ts @@ -44,6 +44,7 @@ export const configApi = { export const logApi = { list: Api.newGet('/syslogs'), + detail: Api.newGet('/syslogs/{id}'), }; export const authApi = { diff --git a/mayfly_go_web/src/views/system/enums.ts b/mayfly_go_web/src/views/system/enums.ts index c3835121..c412f821 100644 --- a/mayfly_go_web/src/views/system/enums.ts +++ b/mayfly_go_web/src/views/system/enums.ts @@ -18,4 +18,5 @@ export const RoleStatusEnum = { export const LogTypeEnum = { Success: EnumValue.of(1, '成功').tagTypeSuccess(), Error: EnumValue.of(2, '失败').tagTypeDanger(), + Running: EnumValue.of(-1, '执行中'), }; diff --git a/mayfly_go_web/src/views/system/syslog/SyslogList.vue b/mayfly_go_web/src/views/system/syslog/SyslogList.vue index 8caf995f..a1554c4f 100755 --- a/mayfly_go_web/src/views/system/syslog/SyslogList.vue +++ b/mayfly_go_web/src/views/system/syslog/SyslogList.vue @@ -43,7 +43,7 @@ const columns = [ TableColumn.new('type', '结果').typeTag(LogTypeEnum), TableColumn.new('description', '描述'), TableColumn.new('reqParam', '操作信息').canBeautify(), - TableColumn.new('resp', '响应信息'), + TableColumn.new('resp', '响应信息').canBeautify(), ]; const state = reactive({ diff --git a/server/go.mod b/server/go.mod index a5a76cc3..0ea6265d 100644 --- a/server/go.mod +++ b/server/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-playground/locales v0.14.1 github.com/go-playground/universal-translator v0.18.1 github.com/go-playground/validator/v10 v10.14.0 - github.com/go-sql-driver/mysql v1.8.0 + github.com/go-sql-driver/mysql v1.8.1 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 @@ -37,8 +37,8 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 // gorm - gorm.io/driver/mysql v1.5.5 - gorm.io/gorm v1.25.8 + gorm.io/driver/mysql v1.5.6 + gorm.io/gorm v1.25.9 ) require ( diff --git a/server/internal/db/api/db_transfer.go b/server/internal/db/api/db_transfer.go index 073443e8..11122706 100644 --- a/server/internal/db/api/db_transfer.go +++ b/server/internal/db/api/db_transfer.go @@ -1,18 +1,14 @@ package api import ( - "context" - "fmt" "mayfly-go/internal/db/api/form" "mayfly-go/internal/db/api/vo" "mayfly-go/internal/db/application" "mayfly-go/internal/db/domain/entity" "mayfly-go/pkg/biz" - "mayfly-go/pkg/logx" "mayfly-go/pkg/req" "strconv" "strings" - "time" ) type DbTransferTask struct { @@ -26,13 +22,6 @@ func (d *DbTransferTask) Tasks(rc *req.Ctx) { rc.ResData = res } -func (d *DbTransferTask) Logs(rc *req.Ctx) { - queryCond, page := req.BindQueryAndPage[*entity.DbTransferLogQuery](rc, new(entity.DbTransferLogQuery)) - res, err := d.DbTransferTask.GetTaskLogList(queryCond, page, new([]vo.DbTransferLogListVO)) - biz.ErrIsNil(err) - rc.ResData = res -} - func (d *DbTransferTask) SaveTask(rc *req.Ctx) { reqForm := &form.DbTransferTaskForm{} task := req.BindJsonAndCopyTo[*entity.DbTransferTask](rc, reqForm, new(entity.DbTransferTask)) @@ -54,34 +43,9 @@ func (d *DbTransferTask) DeleteTask(rc *req.Ctx) { } func (d *DbTransferTask) Run(rc *req.Ctx) { - start := time.Now() - taskId := d.changeState(rc, entity.DbTransferTaskRunStateRunning) - go d.DbTransferTask.Run(taskId, func(msg string, err error) { - // 修改状态为停止 - if err != nil { - logx.Error(msg, err) - } else { - logx.Info(fmt.Sprintf("执行迁移完成,%s, 耗时:%v", msg, time.Since(start))) - } - // 修改任务状态 - task := new(entity.DbTransferTask) - task.Id = taskId - task.RunningState = entity.DbTransferTaskRunStateStop - biz.ErrIsNil(d.DbTransferTask.UpdateById(context.Background(), task)) - }) - + go d.DbTransferTask.Run(rc.MetaCtx, uint64(rc.PathParamInt("taskId"))) } func (d *DbTransferTask) Stop(rc *req.Ctx) { - taskId := d.changeState(rc, entity.DbTransferTaskRunStateStop) - d.DbTransferTask.Stop(taskId) -} - -func (d *DbTransferTask) changeState(rc *req.Ctx, RunningState int) uint64 { - reqForm := &form.DbTransferTaskStatusForm{RunningState: RunningState} - task := req.BindJsonAndCopyTo[*entity.DbTransferTask](rc, reqForm, new(entity.DbTransferTask)) - biz.ErrIsNil(d.DbTransferTask.UpdateById(rc.MetaCtx, task)) - // 记录请求日志 - rc.ReqParam = reqForm - return task.Id + biz.ErrIsNil(d.DbTransferTask.Stop(rc.MetaCtx, uint64(rc.PathParamInt("taskId")))) } diff --git a/server/internal/db/api/form/db_transfer.go b/server/internal/db/api/form/db_transfer.go index f17d42c1..1b559846 100644 --- a/server/internal/db/api/form/db_transfer.go +++ b/server/internal/db/api/form/db_transfer.go @@ -17,8 +17,3 @@ type DbTransferTaskForm struct { TargetInstName string `binding:"required" json:"targetInstName"` // 目标库实例名 TargetTagPath string `binding:"required" json:"targetTagPath"` // 目标库tagPath } - -type DbTransferTaskStatusForm struct { - Id uint64 `binding:"required" json:"taskId"` - RunningState int `json:"status"` -} diff --git a/server/internal/db/api/vo/db_data_sync.go b/server/internal/db/api/vo/db_data_sync.go index 415e276d..376dd6c8 100644 --- a/server/internal/db/api/vo/db_data_sync.go +++ b/server/internal/db/api/vo/db_data_sync.go @@ -3,14 +3,16 @@ package vo import "time" type DataSyncTaskListVO struct { - Id *int64 `json:"id"` - TaskName *string `json:"taskName"` + Id int64 `json:"id"` + TaskName string `json:"taskName"` + CreateTime *time.Time `json:"createTime"` + Creator string `json:"creator"` UpdateTime *time.Time `json:"updateTime"` ModifierId uint64 `json:"modifierId"` Modifier string `json:"modifier"` - RecentState *int `json:"recentState"` - RunningState *int `json:"runningState"` - Status *int `json:"status"` + RecentState int `json:"recentState"` + RunningState int `json:"runningState"` + Status int `json:"status"` } type DataSyncLogListVO struct { diff --git a/server/internal/db/api/vo/db_transfer.go b/server/internal/db/api/vo/db_transfer.go index e64767d0..5dab98dc 100644 --- a/server/internal/db/api/vo/db_transfer.go +++ b/server/internal/db/api/vo/db_transfer.go @@ -3,12 +3,14 @@ package vo import "time" type DbTransferTaskListVO struct { - Id *int64 `json:"id"` - + Id *int64 `json:"id"` + CreateTime *time.Time `json:"createTime"` + Creator string `json:"creator"` UpdateTime *time.Time `json:"updateTime"` Modifier string `json:"modifier"` - RunningState int `json:"runningState"` + RunningState int `json:"runningState"` + LogId uint64 `json:"logId"` CheckedKeys string `json:"checkedKeys"` // 选中需要迁移的表 DeleteTable int `json:"deleteTable"` // 创建表前是否删除表 @@ -27,11 +29,3 @@ type DbTransferTaskListVO struct { TargetInstName string `json:"targetInstName"` // 目标库实例名 TargetTagPath string `json:"targetTagPath"` // 目标库tagPath } - -type DbTransferLogListVO struct { - CreateTime *time.Time `json:"createTime"` - DataSqlFull string `json:"dataSqlFull"` - ResNum string `json:"resNum"` - ErrText string `json:"errText"` - Status *int `json:"status"` -} diff --git a/server/internal/db/application/db_transfer.go b/server/internal/db/application/db_transfer.go index 692302a5..8f0fe57e 100644 --- a/server/internal/db/application/db_transfer.go +++ b/server/internal/db/application/db_transfer.go @@ -6,13 +6,17 @@ import ( "mayfly-go/internal/db/dbm/dbi" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/db/domain/repository" + sysapp "mayfly-go/internal/sys/application" + sysentity "mayfly-go/internal/sys/domain/entity" "mayfly-go/pkg/base" + "mayfly-go/pkg/errorx" "mayfly-go/pkg/gormx" "mayfly-go/pkg/logx" "mayfly-go/pkg/model" "mayfly-go/pkg/utils/collx" "sort" "strings" + "time" ) type DbTransferTask interface { @@ -27,19 +31,16 @@ type DbTransferTask interface { InitJob() - GetTaskLogList(condition *entity.DbTransferLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) + Run(ctx context.Context, taskId uint64) - Run(taskId uint64, end func(msg string, err error)) - - Stop(taskId uint64) + Stop(ctx context.Context, taskId uint64) error } type dbTransferAppImpl struct { base.AppImpl[*entity.DbTransferTask, repository.DbTransferTask] - dbTransferLogRepo repository.DbTransferLog `inject:"DbTransferLogRepo"` - - dbApp Db `inject:"DbApp"` + dbApp Db `inject:"DbApp"` + logApp sysapp.Syslog `inject:"SyslogApp"` } func (app *dbTransferAppImpl) InjectDbTransferTaskRepo(repo repository.DbTransferTask) { @@ -73,70 +74,123 @@ func (app *dbTransferAppImpl) InitJob() { "running_state": entity.DbTransferTaskRunStateStop, } taskParam := new(entity.DbTransferTask) - taskParam.RunningState = 1 + taskParam.RunningState = entity.DbTransferTaskRunStateRunning _ = gormx.Updates(taskParam, taskParam, updateMap) } -func (app *dbTransferAppImpl) GetTaskLogList(condition *entity.DbTransferLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { - return app.dbTransferLogRepo.GetTaskLogList(condition, pageParam, toEntity, orderBy...) -} - -func (app *dbTransferAppImpl) Run(taskId uint64, end func(msg string, err error)) { +func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) { task, err := app.GetById(new(entity.DbTransferTask), taskId) if err != nil { return } + + start := time.Now() + logId, err := app.logApp.CreateLog(ctx, &sysapp.CreateLogReq{ + Description: "DBMS-执行数据迁移", + ReqParam: collx.Kvs("taskId", task.Id), + Type: sysentity.SyslogTypeRunning, + Resp: "开始执行数据迁移...", + }) + if err != nil { + logx.Errorf("创建DBMS-执行数据迁移日志失败:%v", err) + return + } + defer app.logApp.Flush(logId) + + // 修改状态与关联日志id + task.LogId = logId + task.RunningState = entity.DbTransferTaskRunStateRunning + app.UpdateById(ctx, task) + // 获取源库连接、目标库连接,判断连接可用性,否则记录日志:xx连接不可用 // 获取源库表信息 srcConn, err := app.dbApp.GetDbConn(uint64(task.SrcDbId), task.SrcDbName) if err != nil { - end("获取源库连接失败", err) + app.EndTransfer(ctx, logId, taskId, "获取源库连接失败", err, nil) return } // 获取目标库表信息 targetConn, err := app.dbApp.GetDbConn(uint64(task.TargetDbId), task.TargetDbName) if err != nil { - end("获取目标库连接失败", err) + app.EndTransfer(ctx, logId, taskId, "获取目标库连接失败", err, nil) return } - // 查询出源库表信息 - srcDialect := srcConn.GetDialect() - targetDialect := targetConn.GetDialect() var tables []dbi.Table if task.CheckedKeys == "all" { tables, err = srcConn.GetMetaData().GetTables() if err != nil { - end("获取源表信息失败", err) + app.EndTransfer(ctx, logId, taskId, "获取源表信息失败", err, nil) return } } else { tableNames := strings.Split(task.CheckedKeys, ",") tables, err = srcConn.GetMetaData().GetTables(tableNames...) if err != nil { - end("获取源表信息失败", err) + app.EndTransfer(ctx, logId, taskId, "获取源表信息失败", err, nil) return } } // 迁移表 - app.transferTables(task, srcConn, srcDialect, targetConn, targetDialect, tables, end) + if err = app.transferTables(ctx, logId, task, srcConn, targetConn, tables); err != nil { + app.EndTransfer(ctx, logId, taskId, "迁移表失败", err, nil) + return + } - end(fmt.Sprintf("执行迁移任务完成:[%d]", task.Id), nil) + app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("执行迁移完成,执行迁移任务[taskId = %d]完成, 耗时:%v", taskId, time.Since(start)), nil, nil) } -func (app *dbTransferAppImpl) Stop(taskId uint64) { - +func (app *dbTransferAppImpl) Log(ctx context.Context, logId uint64, msg string) { + logType := sysentity.SyslogTypeRunning + logx.InfoContext(ctx, msg) + app.logApp.AppendLog(logId, &sysapp.AppendLogReq{ + AppendResp: msg, + Type: logType, + }) } -func (app *dbTransferAppImpl) recLog(taskId uint64) { +func (app *dbTransferAppImpl) EndTransfer(ctx context.Context, logId uint64, taskId uint64, msg string, err error, extra map[string]any) { + logType := sysentity.SyslogTypeSuccess + transferState := entity.DbTransferTaskRunStateSuccess + if err != nil { + msg = fmt.Sprintf("%s: %s", msg, err.Error()) + logx.ErrorContext(ctx, msg) + logType = sysentity.SyslogTypeError + transferState = entity.DbTransferTaskRunStateFail + } else { + logx.InfoContext(ctx, msg) + } + app.logApp.AppendLog(logId, &sysapp.AppendLogReq{ + AppendResp: msg, + Extra: extra, + Type: logType, + }) + + // 修改任务状态 + task := new(entity.DbTransferTask) + task.Id = taskId + task.RunningState = transferState + app.UpdateById(context.Background(), task) +} + +func (app *dbTransferAppImpl) Stop(ctx context.Context, taskId uint64) error { + task, err := app.GetById(new(entity.DbTransferTask), taskId) + if err != nil { + return errorx.NewBiz("任务不存在") + } + + if task.RunningState != entity.DbTransferTaskRunStateRunning { + return errorx.NewBiz("该任务未在执行") + } + task.RunningState = entity.DbTransferTaskRunStateStop + return app.UpdateById(ctx, task) } // 迁移表 -func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcConn *dbi.DbConn, srcDialect dbi.Dialect, targetConn *dbi.DbConn, targetDialect dbi.Dialect, tables []dbi.Table, end func(msg string, err error)) { - +func (app *dbTransferAppImpl) transferTables(ctx context.Context, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, targetConn *dbi.DbConn, tables []dbi.Table) error { tableNames := make([]string, 0) tableMap := make(map[string]dbi.Table) // 以表名分组,存放表信息 for _, table := range tables { @@ -145,15 +199,13 @@ func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcCon } if len(tableNames) == 0 { - end("没有需要迁移的表", nil) - return + return errorx.NewBiz("没有需要迁移的表") } srcMeta := srcConn.GetMetaData() // 查询源表列信息 columns, err := srcMeta.GetColumns(tableNames...) if err != nil { - end("获取源表列信息失败", err) - return + return errorx.NewBiz("获取源表列信息失败") } // 以表名分组,存放每个表的列信息 @@ -166,10 +218,10 @@ func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcCon sortTableNames := collx.MapKeys(columnMap) sort.Strings(sortTableNames) - ctx := context.Background() - + targetDialect := targetConn.GetDialect() srcColumnHelper := srcMeta.GetColumnHelper() targetColumnHelper := targetConn.GetMetaData().GetColumnHelper() + for _, tbName := range sortTableNames { cols := columnMap[tbName] targetCols := make([]dbi.Column, 0) @@ -183,59 +235,47 @@ func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcCon } // 通过公共列信息生成目标库的建表语句,并执行目标库建表 - logx.Infof("开始创建目标表: 表名:%s", tbName) + app.Log(ctx, logId, fmt.Sprintf("开始创建目标表: 表名:%s", tbName)) _, err := targetDialect.CreateTable(targetCols, tableMap[tbName], true) if err != nil { - end(fmt.Sprintf("创建目标表失败: 表名:%s, error: %s", tbName, err.Error()), err) - return + return errorx.NewBiz(fmt.Sprintf("创建目标表失败: 表名:%s, error: %s", tbName, err.Error())) } - logx.Infof("创建目标表成功: 表名:%s", tbName) + app.Log(ctx, logId, fmt.Sprintf("创建目标表成功: 表名:%s", tbName)) // 迁移数据 - logx.Infof("开始迁移数据: 表名:%s", tbName) - total, err := app.transferData(ctx, tbName, targetCols, srcConn, srcDialect, targetConn, targetDialect) + app.Log(ctx, logId, fmt.Sprintf("开始迁移数据: 表名:%s", tbName)) + total, err := app.transferData(ctx, task.Id, tbName, targetCols, srcConn, targetConn) if err != nil { - end(fmt.Sprintf("迁移数据失败: 表名:%s, error: %s", tbName, err.Error()), err) - return + return errorx.NewBiz(fmt.Sprintf("迁移数据失败: 表名:%s, error: %s", tbName, err.Error())) } - logx.Infof("迁移数据成功: 表名:%s, 数据:%d 条", tbName, total) + app.Log(ctx, logId, fmt.Sprintf("迁移数据成功: 表名:%s, 数据:%d 条", tbName, total)) // 有些数据库迁移完数据之后,需要更新表自增序列为当前表最大值 targetDialect.UpdateSequence(tbName, targetCols) // 迁移索引信息 - logx.Infof("开始迁移索引: 表名:%s", tbName) + app.Log(ctx, logId, fmt.Sprintf("开始迁移索引: 表名:%s", tbName)) err = app.transferIndex(ctx, tableMap[tbName], srcConn, targetDialect) if err != nil { - end(fmt.Sprintf("迁移索引失败: 表名:%s, error: %s", tbName, err.Error()), err) - return + return errorx.NewBiz(fmt.Sprintf("迁移索引失败: 表名:%s, error: %s", tbName, err.Error())) } - logx.Infof("迁移索引成功: 表名:%s", tbName) - - // 记录任务执行日志 + app.Log(ctx, logId, fmt.Sprintf("迁移索引成功: 表名:%s", tbName)) } - // 修改任务状态 - taskParam := &entity.DbTransferTask{} - taskParam.Id = task.Id - taskParam.RunningState = entity.DbTransferTaskRunStateStop - - if err := app.UpdateById(ctx, task); err != nil { - end("修改任务状态失败", err) - return - } + return nil } -func (app *dbTransferAppImpl) transferData(ctx context.Context, tableName string, targetColumns []dbi.Column, srcConn *dbi.DbConn, srcDialect dbi.Dialect, targetConn *dbi.DbConn, targetDialect dbi.Dialect) (int, error) { +func (app *dbTransferAppImpl) transferData(ctx context.Context, taskId uint64, tableName string, targetColumns []dbi.Column, srcConn *dbi.DbConn, targetConn *dbi.DbConn) (int, error) { result := make([]map[string]any, 0) total := 0 // 总条数 batchSize := 1000 // 每次查询并迁移1000条数据 var err error srcMeta := srcConn.GetMetaData() srcConverter := srcMeta.GetDataHelper() + targetDialect := targetConn.GetDialect() // 游标查询源表数据,并批量插入目标表 - err = srcConn.WalkTableRows(ctx, tableName, func(row map[string]any, columns []*dbi.QueryColumn) error { + err = srcConn.WalkTableRows(context.Background(), tableName, func(row map[string]any, columns []*dbi.QueryColumn) error { total++ rawValue := map[string]any{} for _, column := range columns { @@ -245,27 +285,40 @@ func (app *dbTransferAppImpl) transferData(ctx context.Context, tableName string } result = append(result, rawValue) if total%batchSize == 0 { - err = app.transfer2Target(targetConn, targetColumns, result, targetDialect, tableName) + err = app.transfer2Target(taskId, targetConn, targetColumns, result, targetDialect, tableName) if err != nil { - logx.Error("批量插入目标表数据失败", err) + logx.ErrorfContext(ctx, "批量插入目标表数据失败: %v", err) return err } result = result[:0] } return nil }) + + if err != nil { + return total, err + } + // 处理剩余的数据 if len(result) > 0 { - err = app.transfer2Target(targetConn, targetColumns, result, targetDialect, tableName) + err = app.transfer2Target(taskId, targetConn, targetColumns, result, targetDialect, tableName) if err != nil { - logx.Error(fmt.Sprintf("批量插入目标表数据失败,表名:%s", tableName), err) + logx.ErrorfContext(ctx, "批量插入目标表数据失败,表名:%s error: %v", tableName, err) return 0, err } } return total, err } -func (app *dbTransferAppImpl) transfer2Target(targetConn *dbi.DbConn, targetColumns []dbi.Column, result []map[string]any, targetDialect dbi.Dialect, tbName string) error { +func (app *dbTransferAppImpl) transfer2Target(taskId uint64, targetConn *dbi.DbConn, targetColumns []dbi.Column, result []map[string]any, targetDialect dbi.Dialect, tbName string) error { + task, err := app.GetById(new(entity.DbTransferTask), taskId) + if err != nil { + return errorx.NewBiz("任务不存在") + } + if task.RunningState == entity.DbTransferTaskRunStateStop { + return errorx.NewBiz("迁移终止") + } + tx, err := targetConn.Begin() if err != nil { return err @@ -304,7 +357,7 @@ func (app *dbTransferAppImpl) transfer2Target(targetConn *dbi.DbConn, targetColu defer func() { if r := recover(); r != nil { tx.Rollback() - logx.Error("批量插入目标表数据失败", r) + logx.Errorf("批量插入目标表数据失败: %v", r) } }() @@ -313,7 +366,6 @@ func (app *dbTransferAppImpl) transfer2Target(targetConn *dbi.DbConn, targetColu } func (app *dbTransferAppImpl) transferIndex(_ context.Context, tableInfo dbi.Table, srcConn *dbi.DbConn, targetDialect dbi.Dialect) error { - // 查询源表索引信息 indexs, err := srcConn.GetMetaData().GetTableIndex(tableInfo.TableName) if err != nil { diff --git a/server/internal/db/dbm/dbi/conn.go b/server/internal/db/dbm/dbi/conn.go index a0948e36..9221440f 100644 --- a/server/internal/db/dbm/dbi/conn.go +++ b/server/internal/db/dbm/dbi/conn.go @@ -153,18 +153,16 @@ func (d *DbConn) Close() { // 游标方式遍历查询rows, walkFn error不为nil, 则跳出遍历 func walkQueryRows(ctx context.Context, db *sql.DB, selectSql string, walkFn WalkQueryRowsFunc, args ...any) error { - rows, err := db.QueryContext(ctx, selectSql, args...) + cancelCtx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + rows, err := db.QueryContext(cancelCtx, selectSql, args...) if err != nil { return err } // rows对象一定要close掉,如果出错,不关掉则会很迅速的达到设置最大连接数, // 后面的链接过来直接报错或拒绝,实际上也没有起效果 - defer func() { - if rows != nil { - rows.Close() - } - }() + defer rows.Close() colTypes, err := rows.ColumnTypes() if err != nil { @@ -200,7 +198,8 @@ func walkQueryRows(ctx context.Context, db *sql.DB, selectSql string, walkFn Wal rowData[cols[i].Name] = valueConvert(v, colTypes[i]) } if err = walkFn(rowData, cols); err != nil { - logx.Errorf("游标遍历查询结果集出错,退出遍历: %s", err.Error()) + logx.ErrorfContext(ctx, "游标遍历查询结果集出错, 退出遍历: %s", err.Error()) + cancelFunc() return err } } diff --git a/server/internal/db/domain/entity/db_transfer.go b/server/internal/db/domain/entity/db_transfer.go index 01c9209b..3bb015b0 100644 --- a/server/internal/db/domain/entity/db_transfer.go +++ b/server/internal/db/domain/entity/db_transfer.go @@ -2,13 +2,13 @@ package entity import ( "mayfly-go/pkg/model" - "time" ) type DbTransferTask struct { model.Model - RunningState int `orm:"column(running_state)" json:"runningState"` // 运行状态 1运行中 2待运行 + RunningState DbTransferRunningState `orm:"column(running_state)" json:"runningState"` // 运行状态 + LogId uint64 `json:"logId"` CheckedKeys string `orm:"column(checked_keys)" json:"checkedKeys"` // 选中需要迁移的表 DeleteTable int `orm:"column(delete_table)" json:"deleteTable"` // 创建表前是否删除表 @@ -33,28 +33,14 @@ func (d *DbTransferTask) TableName() string { return "t_db_transfer_task" } -type DbTransferLog struct { - model.IdModel - TaskId uint64 `orm:"column(task_id)" json:"taskId"` // 任务表id - CreateTime *time.Time `orm:"column(create_time)" json:"createTime"` - DataSqlFull string `orm:"column(data_sql_full)" json:"dataSqlFull"` // 执行的完整sql - ResNum int `orm:"column(res_num)" json:"resNum"` // 收到数据条数 - ErrText string `orm:"column(err_text)" json:"errText"` // 错误日志 - Status int8 `orm:"column(status)" json:"status"` // 状态:1.成功 -1.失败 -} - -func (d *DbTransferLog) TableName() string { - return "t_db_transfer_log" -} +type DbTransferRunningState int8 const ( DbTransferTaskStatusEnable int = 1 // 启用状态 DbTransferTaskStatusDisable int = -1 // 禁用状态 - DbTransferTaskStateSuccess int = 1 // 执行成功状态 - DbTransferTaskStateRunning int = 2 // 执行成功状态 - DbTransferTaskStateFail int = -1 // 执行失败状态 - - DbTransferTaskRunStateRunning int = 1 // 运行中状态 - DbTransferTaskRunStateStop int = 2 // 手动停止状态 + DbTransferTaskRunStateSuccess DbTransferRunningState = 2 // 执行成功 + DbTransferTaskRunStateRunning DbTransferRunningState = 1 // 运行中状态 + DbTransferTaskRunStateFail DbTransferRunningState = -1 // 执行失败 + DbTransferTaskRunStateStop DbTransferRunningState = -2 // 手动终止 ) diff --git a/server/internal/db/domain/repository/db_transfer.go b/server/internal/db/domain/repository/db_transfer.go index 9db65313..2afb4b76 100644 --- a/server/internal/db/domain/repository/db_transfer.go +++ b/server/internal/db/domain/repository/db_transfer.go @@ -12,10 +12,3 @@ type DbTransferTask interface { // 分页获取数据库实例信息列表 GetTaskList(condition *entity.DbTransferTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) } - -type DbTransferLog interface { - base.Repo[*entity.DbTransferLog] - - // 分页获取数据库实例信息列表 - GetTaskLogList(condition *entity.DbTransferLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) -} diff --git a/server/internal/db/infrastructure/persistence/db_transfer.go b/server/internal/db/infrastructure/persistence/db_transfer.go index 2839f602..453739d1 100644 --- a/server/internal/db/infrastructure/persistence/db_transfer.go +++ b/server/internal/db/infrastructure/persistence/db_transfer.go @@ -23,18 +23,3 @@ func (d *dbTransferTaskRepoImpl) GetTaskList(condition *entity.DbTransferTaskQue //Eq("status", condition.Status) return gormx.PageQuery(qd, pageParam, toEntity) } - -type dbTransferLogRepoImpl struct { - base.RepoImpl[*entity.DbTransferLog] -} - -// 分页获取数据库信息列表 -func (d *dbTransferLogRepoImpl) GetTaskLogList(condition *entity.DbTransferLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { - qd := gormx.NewQuery(new(entity.DbTransferLog)). - Eq("task_id", condition.TaskId) - return gormx.PageQuery(qd, pageParam, toEntity) -} - -func newDbTransferLogRepo() repository.DbTransferLog { - return &dbTransferLogRepoImpl{base.RepoImpl[*entity.DbTransferLog]{M: new(entity.DbTransferLog)}} -} diff --git a/server/internal/db/infrastructure/persistence/persistence.go b/server/internal/db/infrastructure/persistence/persistence.go index 6e16f148..7ca7f1f7 100644 --- a/server/internal/db/infrastructure/persistence/persistence.go +++ b/server/internal/db/infrastructure/persistence/persistence.go @@ -12,7 +12,6 @@ func InitIoc() { ioc.Register(newDataSyncTaskRepo(), ioc.WithComponentName("DbDataSyncTaskRepo")) ioc.Register(newDataSyncLogRepo(), ioc.WithComponentName("DbDataSyncLogRepo")) ioc.Register(newDbTransferTaskRepo(), ioc.WithComponentName("DbTransferTaskRepo")) - ioc.Register(newDbTransferLogRepo(), ioc.WithComponentName("DbTransferLogRepo")) ioc.Register(NewDbBackupRepo(), ioc.WithComponentName("DbBackupRepo")) ioc.Register(NewDbBackupHistoryRepo(), ioc.WithComponentName("DbBackupHistoryRepo")) diff --git a/server/internal/db/router/db_transfer.go b/server/internal/db/router/db_transfer.go index 8e87437b..bcbc6b47 100644 --- a/server/internal/db/router/db_transfer.go +++ b/server/internal/db/router/db_transfer.go @@ -19,8 +19,6 @@ func InitDbTransferRouter(router *gin.RouterGroup) { // 获取任务列表 /datasync req.NewGet("", d.Tasks), - req.NewGet(":taskId/logs", d.Logs).RequiredPermissionCode("db:transfer:log"), - // 保存任务 /datasync/save req.NewPost("save", d.SaveTask).Log(req.NewLogSave("datasync-保存数据迁移任务信息")).RequiredPermissionCode("db:transfer:save"), @@ -28,10 +26,10 @@ func InitDbTransferRouter(router *gin.RouterGroup) { req.NewDelete(":taskId/del", d.DeleteTask).Log(req.NewLogSave("datasync-删除数据迁移任务信息")).RequiredPermissionCode("db:transfer:del"), // 立即执行任务 /datasync/run - req.NewPost(":taskId/run", d.Run).Log(req.NewLogSave("datasync-运行数据迁移任务")).RequiredPermissionCode("db:transfer:run"), + req.NewPost(":taskId/run", d.Run).Log(req.NewLog("DBMS-执行数据迁移任务")).RequiredPermissionCode("db:transfer:run"), // 停止正在执行中的任务 - req.NewPost(":taskId/stop", d.Stop), + req.NewPost(":taskId/stop", d.Stop).Log(req.NewLogSave("DBMS-终止数据迁移任务")), } req.BatchSetGroup(instances, reqs[:]) diff --git a/server/internal/sys/api/syslog.go b/server/internal/sys/api/syslog.go index 69c62cbf..1380794c 100644 --- a/server/internal/sys/api/syslog.go +++ b/server/internal/sys/api/syslog.go @@ -17,3 +17,7 @@ func (r *Syslog) Syslogs(rc *req.Ctx) { biz.ErrIsNil(err) rc.ResData = res } + +func (r *Syslog) SyslogDetail(rc *req.Ctx) { + rc.ResData = r.SyslogApp.GetLogDetail(uint64(rc.PathParamInt("id"))) +} diff --git a/server/internal/sys/application/syslog.go b/server/internal/sys/application/syslog.go index a62ca513..1549f495 100644 --- a/server/internal/sys/application/syslog.go +++ b/server/internal/sys/application/syslog.go @@ -1,27 +1,62 @@ package application import ( + "context" "encoding/json" "fmt" "mayfly-go/internal/sys/domain/entity" "mayfly-go/internal/sys/domain/repository" "mayfly-go/pkg/contextx" "mayfly-go/pkg/errorx" + "mayfly-go/pkg/logx" "mayfly-go/pkg/model" "mayfly-go/pkg/req" "mayfly-go/pkg/utils/anyx" + "mayfly-go/pkg/utils/collx" + "mayfly-go/pkg/utils/jsonx" + "mayfly-go/pkg/utils/structx" + "mayfly-go/pkg/utils/timex" + "sync" "time" ) +type CreateLogReq struct { + Type int8 `json:"type"` + Description string `json:"description"` + ReqParam any `json:"reqParam" ` // 请求参数 + Resp string `json:"resp" ` // 响应结构 + Extra map[string]any // 额外日志信息 +} + +type AppendLogReq struct { + Type int8 `json:"type"` + AppendResp string `json:"appendResp" ` // 追加日志信息 + Extra map[string]any // 额外日志信息 +} + type Syslog interface { GetPageList(condition *entity.SysLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) // 从请求上下文的参数保存系统日志 SaveFromReq(req *req.Ctx) + + GetLogDetail(logId uint64) *entity.SysLog + + // CreateLog 创建日志信息 + CreateLog(ctx context.Context, log *CreateLogReq) (uint64, error) + + // AppendLog 追加日志信息 + AppendLog(logId uint64, appendLog *AppendLogReq) + + // Flush 实时追加的日志到库里 + Flush(logId uint64) } type syslogAppImpl struct { SyslogRepo repository.Syslog `inject:""` + + appendLogs map[uint64]*entity.SysLog + rwLock sync.RWMutex } func (m *syslogAppImpl) GetPageList(condition *entity.SysLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { @@ -34,7 +69,8 @@ func (m *syslogAppImpl) SaveFromReq(req *req.Ctx) { lg = &model.LoginAccount{Id: 0, Username: "-"} } syslog := new(entity.SysLog) - syslog.CreateTime = time.Now() + now := time.Now() + syslog.CreateTime = &now syslog.Creator = lg.Username syslog.CreatorId = lg.Id @@ -67,8 +103,78 @@ func (m *syslogAppImpl) SaveFromReq(req *req.Ctx) { } syslog.Resp = errMsg } else { - syslog.Type = entity.SyslogTypeNorman + syslog.Type = entity.SyslogTypeSuccess } m.SyslogRepo.Insert(req.MetaCtx, syslog) } + +func (m *syslogAppImpl) GetLogDetail(logId uint64) *entity.SysLog { + syslog := new(entity.SysLog) + if err := m.SyslogRepo.GetById(syslog, logId); err != nil { + return nil + } + + if syslog.Type == entity.SyslogTypeRunning { + m.rwLock.RLock() + defer m.rwLock.RUnlock() + return m.appendLogs[logId] + } + + return syslog +} + +func (m *syslogAppImpl) CreateLog(ctx context.Context, log *CreateLogReq) (uint64, error) { + syslog := new(entity.SysLog) + structx.Copy(syslog, log) + syslog.ReqParam = anyx.ToString(log.ReqParam) + if log.Extra != nil { + syslog.Extra = jsonx.ToStr(log.Extra) + } + if err := m.SyslogRepo.Insert(ctx, syslog); err != nil { + return 0, err + } + return syslog.Id, nil +} + +func (m *syslogAppImpl) AppendLog(logId uint64, appendLog *AppendLogReq) { + m.rwLock.Lock() + defer m.rwLock.Unlock() + + if m.appendLogs == nil { + m.appendLogs = make(map[uint64]*entity.SysLog) + } + + syslog := m.appendLogs[logId] + if syslog == nil { + syslog = new(entity.SysLog) + if err := m.SyslogRepo.GetById(syslog, logId); err != nil { + logx.Warnf("追加日志不存在: %d", logId) + return + } + m.appendLogs[logId] = syslog + } + + appendLogMsg := fmt.Sprintf("%s %s", timex.DefaultFormat(time.Now()), appendLog.AppendResp) + syslog.Resp = fmt.Sprintf("%s\n%s", syslog.Resp, appendLogMsg) + syslog.Type = appendLog.Type + if appendLog.Extra != nil { + existExtra := jsonx.ToMap(syslog.Extra) + syslog.Extra = jsonx.ToStr(collx.MapMerge(existExtra, appendLog.Extra)) + } +} + +func (m *syslogAppImpl) Flush(logId uint64) { + syslog := m.appendLogs[logId] + if syslog == nil { + return + } + + // 如果刷入库的的时候还是执行中状态,则默认改为成功状态 + if syslog.Type == entity.SyslogTypeRunning { + syslog.Type = entity.SyslogTypeSuccess + } + + m.SyslogRepo.UpdateById(context.Background(), syslog) + delete(m.appendLogs, logId) +} diff --git a/server/internal/sys/domain/entity/syslog.go b/server/internal/sys/domain/entity/syslog.go index ebda3fcf..778f04e0 100644 --- a/server/internal/sys/domain/entity/syslog.go +++ b/server/internal/sys/domain/entity/syslog.go @@ -2,21 +2,17 @@ package entity import ( "mayfly-go/pkg/model" - "time" ) // 系统操作日志 type SysLog struct { - model.DeletedModel - - CreateTime time.Time `json:"createTime"` - CreatorId uint64 `json:"creatorId"` - Creator string `json:"creator"` + model.CreateModel Type int8 `json:"type"` Description string `json:"description"` ReqParam string `json:"reqParam" gorm:"column:req_param;type:varchar(1000)"` // 请求参数 - Resp string `json:"resp" gorm:"column:resp;type:varchar(1000)"` // 响应结构 + Resp string `json:"resp" gorm:"column:resp;type:varchar(10000)"` // 响应结构 + Extra string `json:"extra"` // 日志额外信息 } func (a *SysLog) TableName() string { @@ -24,6 +20,7 @@ func (a *SysLog) TableName() string { } const ( - SyslogTypeNorman int8 = 1 // 正常状态 - SyslogTypeError int8 = 2 // 错误状态 + SyslogTypeRunning int8 = -1 // 执行中 + SyslogTypeSuccess int8 = 1 // 正常状态 + SyslogTypeError int8 = 2 // 错误状态 ) diff --git a/server/internal/sys/router/syslog.go b/server/internal/sys/router/syslog.go index fe784eee..b8e59dca 100644 --- a/server/internal/sys/router/syslog.go +++ b/server/internal/sys/router/syslog.go @@ -15,4 +15,6 @@ func InitSyslogRouter(router *gin.RouterGroup) { biz.ErrIsNil(ioc.Inject(s)) req.NewGet("", s.Syslogs).Group(sysG) + + req.NewGet("/:id", s.SyslogDetail).Group(sysG) } diff --git a/server/pkg/config/app.go b/server/pkg/config/app.go index e86d21b3..d978a714 100644 --- a/server/pkg/config/app.go +++ b/server/pkg/config/app.go @@ -4,7 +4,7 @@ import "fmt" const ( AppName = "mayfly-go" - Version = "v1.7.4" + Version = "v1.7.5" ) func GetAppInfo() string { diff --git a/server/pkg/logx/logx.go b/server/pkg/logx/logx.go index 44208a98..a135465f 100644 --- a/server/pkg/logx/logx.go +++ b/server/pkg/logx/logx.go @@ -48,6 +48,10 @@ func DebugContext(ctx context.Context, msg string, args ...any) { Log(ctx, slog.LevelDebug, msg, args...) } +func DebugfContext(ctx context.Context, format string, args ...any) { + Log(ctx, slog.LevelDebug, fmt.Sprintf(format, args...)) +} + func Debugf(format string, args ...any) { Log(context.Background(), slog.LevelDebug, fmt.Sprintf(format, args...)) } @@ -69,6 +73,10 @@ func InfoContext(ctx context.Context, msg string, args ...any) { Log(ctx, slog.LevelInfo, msg, args...) } +func InfofContext(ctx context.Context, format string, args ...any) { + Log(ctx, slog.LevelInfo, fmt.Sprintf(format, args...)) +} + func Infof(format string, args ...any) { Log(context.Background(), slog.LevelInfo, fmt.Sprintf(format, args...)) } @@ -85,6 +93,10 @@ func WarnContext(ctx context.Context, msg string, args ...any) { Log(ctx, slog.LevelWarn, msg, args...) } +func WarnfContext(ctx context.Context, format string, args ...any) { + Log(ctx, slog.LevelWarn, fmt.Sprintf(format, args...)) +} + func Warnf(format string, args ...any) { Log(context.Background(), slog.LevelWarn, fmt.Sprintf(format, args...)) } @@ -101,6 +113,10 @@ func ErrorContext(ctx context.Context, msg string, args ...any) { Log(ctx, slog.LevelError, msg, args...) } +func ErrorfContext(ctx context.Context, format string, args ...any) { + Log(ctx, slog.LevelError, fmt.Sprintf(format, args...)) +} + func Errorf(format string, args ...any) { Log(context.Background(), slog.LevelError, fmt.Sprintf(format, args...)) } diff --git a/server/pkg/utils/collx/map.go b/server/pkg/utils/collx/map.go index 2814b3dc..2fc8349d 100644 --- a/server/pkg/utils/collx/map.go +++ b/server/pkg/utils/collx/map.go @@ -42,3 +42,16 @@ func MapValues[M ~map[K]V, K comparable, V any](m M) []V { } return r } + +// MapMerge maps merge, 若存在重复的key,则以最后的map值为准 +func MapMerge[M ~map[K]V, K comparable, V any](maps ...M) M { + mergedMap := make(M) + + for _, m := range maps { + for k, v := range m { + mergedMap[k] = v + } + } + + return mergedMap +} diff --git a/server/resources/script/sql/v1.7/v1.7.5.sql b/server/resources/script/sql/v1.7/v1.7.5.sql index 987ea0e0..8681a9e2 100644 --- a/server/resources/script/sql/v1.7/v1.7.5.sql +++ b/server/resources/script/sql/v1.7/v1.7.5.sql @@ -34,6 +34,7 @@ CREATE TABLE `t_db_transfer_task` ( `target_tag_path` varchar(200) NOT NULL COMMENT '目标库类型', `target_db_type` varchar(200) NOT NULL COMMENT '目标库实例名', `target_inst_name` varchar(200) NOT NULL COMMENT '目标库tagPath', + `log_id` bigint(20) NOT NULL COMMENT '日志id', PRIMARY KEY (`id`) ) COMMENT='数据库迁移任务表'; @@ -45,6 +46,9 @@ INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `we INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES(1709196737, 1709194669, 2, 1, '日志', 'db:transfer:log', 1709196737, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2024-02-29 16:52:17', '2024-02-29 16:52:17', 'SmLcpu6c/CZhNIbWg/', 0, NULL); INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES(1709196755, 1709194669, 2, 1, '运行', 'db:transfer:run', 1709196755, 'null', 12, 'liuzongyang', 12, 'liuzongyang', '2024-02-29 16:52:36', '2024-02-29 16:52:36', 'SmLcpu6c/b6yHt6V2/', 0, NULL); +ALTER TABLE t_sys_log ADD extra varchar(5000) NULL; +ALTER TABLE t_sys_log MODIFY COLUMN resp text NULL; +