feat: 数据迁移新增实时日志&数据库游标遍历查询问题修复

This commit is contained in:
meilin.huang
2024-03-28 22:20:39 +08:00
parent 5e4793433b
commit d1d372e1bf
31 changed files with 477 additions and 344 deletions

View File

@@ -6,13 +6,17 @@ import (
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
sysapp "mayfly-go/internal/sys/application"
sysentity "mayfly-go/internal/sys/domain/entity"
"mayfly-go/pkg/base"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gormx"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils/collx"
"sort"
"strings"
"time"
)
type DbTransferTask interface {
@@ -27,19 +31,16 @@ type DbTransferTask interface {
InitJob()
GetTaskLogList(condition *entity.DbTransferLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
Run(ctx context.Context, taskId uint64)
Run(taskId uint64, end func(msg string, err error))
Stop(taskId uint64)
Stop(ctx context.Context, taskId uint64) error
}
type dbTransferAppImpl struct {
base.AppImpl[*entity.DbTransferTask, repository.DbTransferTask]
dbTransferLogRepo repository.DbTransferLog `inject:"DbTransferLogRepo"`
dbApp Db `inject:"DbApp"`
dbApp Db `inject:"DbApp"`
logApp sysapp.Syslog `inject:"SyslogApp"`
}
func (app *dbTransferAppImpl) InjectDbTransferTaskRepo(repo repository.DbTransferTask) {
@@ -73,70 +74,123 @@ func (app *dbTransferAppImpl) InitJob() {
"running_state": entity.DbTransferTaskRunStateStop,
}
taskParam := new(entity.DbTransferTask)
taskParam.RunningState = 1
taskParam.RunningState = entity.DbTransferTaskRunStateRunning
_ = gormx.Updates(taskParam, taskParam, updateMap)
}
func (app *dbTransferAppImpl) GetTaskLogList(condition *entity.DbTransferLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
return app.dbTransferLogRepo.GetTaskLogList(condition, pageParam, toEntity, orderBy...)
}
func (app *dbTransferAppImpl) Run(taskId uint64, end func(msg string, err error)) {
func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) {
task, err := app.GetById(new(entity.DbTransferTask), taskId)
if err != nil {
return
}
start := time.Now()
logId, err := app.logApp.CreateLog(ctx, &sysapp.CreateLogReq{
Description: "DBMS-执行数据迁移",
ReqParam: collx.Kvs("taskId", task.Id),
Type: sysentity.SyslogTypeRunning,
Resp: "开始执行数据迁移...",
})
if err != nil {
logx.Errorf("创建DBMS-执行数据迁移日志失败:%v", err)
return
}
defer app.logApp.Flush(logId)
// 修改状态与关联日志id
task.LogId = logId
task.RunningState = entity.DbTransferTaskRunStateRunning
app.UpdateById(ctx, task)
// 获取源库连接、目标库连接判断连接可用性否则记录日志xx连接不可用
// 获取源库表信息
srcConn, err := app.dbApp.GetDbConn(uint64(task.SrcDbId), task.SrcDbName)
if err != nil {
end("获取源库连接失败", err)
app.EndTransfer(ctx, logId, taskId, "获取源库连接失败", err, nil)
return
}
// 获取目标库表信息
targetConn, err := app.dbApp.GetDbConn(uint64(task.TargetDbId), task.TargetDbName)
if err != nil {
end("获取目标库连接失败", err)
app.EndTransfer(ctx, logId, taskId, "获取目标库连接失败", err, nil)
return
}
// 查询出源库表信息
srcDialect := srcConn.GetDialect()
targetDialect := targetConn.GetDialect()
var tables []dbi.Table
if task.CheckedKeys == "all" {
tables, err = srcConn.GetMetaData().GetTables()
if err != nil {
end("获取源表信息失败", err)
app.EndTransfer(ctx, logId, taskId, "获取源表信息失败", err, nil)
return
}
} else {
tableNames := strings.Split(task.CheckedKeys, ",")
tables, err = srcConn.GetMetaData().GetTables(tableNames...)
if err != nil {
end("获取源表信息失败", err)
app.EndTransfer(ctx, logId, taskId, "获取源表信息失败", err, nil)
return
}
}
// 迁移表
app.transferTables(task, srcConn, srcDialect, targetConn, targetDialect, tables, end)
if err = app.transferTables(ctx, logId, task, srcConn, targetConn, tables); err != nil {
app.EndTransfer(ctx, logId, taskId, "迁移表失败", err, nil)
return
}
end(fmt.Sprintf("执行迁移任务完成:[%d]", task.Id), nil)
app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("执行迁移完成,执行迁移任务[taskId = %d]完成, 耗时:%v", taskId, time.Since(start)), nil, nil)
}
func (app *dbTransferAppImpl) Stop(taskId uint64) {
func (app *dbTransferAppImpl) Log(ctx context.Context, logId uint64, msg string) {
logType := sysentity.SyslogTypeRunning
logx.InfoContext(ctx, msg)
app.logApp.AppendLog(logId, &sysapp.AppendLogReq{
AppendResp: msg,
Type: logType,
})
}
func (app *dbTransferAppImpl) recLog(taskId uint64) {
func (app *dbTransferAppImpl) EndTransfer(ctx context.Context, logId uint64, taskId uint64, msg string, err error, extra map[string]any) {
logType := sysentity.SyslogTypeSuccess
transferState := entity.DbTransferTaskRunStateSuccess
if err != nil {
msg = fmt.Sprintf("%s: %s", msg, err.Error())
logx.ErrorContext(ctx, msg)
logType = sysentity.SyslogTypeError
transferState = entity.DbTransferTaskRunStateFail
} else {
logx.InfoContext(ctx, msg)
}
app.logApp.AppendLog(logId, &sysapp.AppendLogReq{
AppendResp: msg,
Extra: extra,
Type: logType,
})
// 修改任务状态
task := new(entity.DbTransferTask)
task.Id = taskId
task.RunningState = transferState
app.UpdateById(context.Background(), task)
}
func (app *dbTransferAppImpl) Stop(ctx context.Context, taskId uint64) error {
task, err := app.GetById(new(entity.DbTransferTask), taskId)
if err != nil {
return errorx.NewBiz("任务不存在")
}
if task.RunningState != entity.DbTransferTaskRunStateRunning {
return errorx.NewBiz("该任务未在执行")
}
task.RunningState = entity.DbTransferTaskRunStateStop
return app.UpdateById(ctx, task)
}
// 迁移表
func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcConn *dbi.DbConn, srcDialect dbi.Dialect, targetConn *dbi.DbConn, targetDialect dbi.Dialect, tables []dbi.Table, end func(msg string, err error)) {
func (app *dbTransferAppImpl) transferTables(ctx context.Context, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, targetConn *dbi.DbConn, tables []dbi.Table) error {
tableNames := make([]string, 0)
tableMap := make(map[string]dbi.Table) // 以表名分组,存放表信息
for _, table := range tables {
@@ -145,15 +199,13 @@ func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcCon
}
if len(tableNames) == 0 {
end("没有需要迁移的表", nil)
return
return errorx.NewBiz("没有需要迁移的表")
}
srcMeta := srcConn.GetMetaData()
// 查询源表列信息
columns, err := srcMeta.GetColumns(tableNames...)
if err != nil {
end("获取源表列信息失败", err)
return
return errorx.NewBiz("获取源表列信息失败")
}
// 以表名分组,存放每个表的列信息
@@ -166,10 +218,10 @@ func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcCon
sortTableNames := collx.MapKeys(columnMap)
sort.Strings(sortTableNames)
ctx := context.Background()
targetDialect := targetConn.GetDialect()
srcColumnHelper := srcMeta.GetColumnHelper()
targetColumnHelper := targetConn.GetMetaData().GetColumnHelper()
for _, tbName := range sortTableNames {
cols := columnMap[tbName]
targetCols := make([]dbi.Column, 0)
@@ -183,59 +235,47 @@ func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcCon
}
// 通过公共列信息生成目标库的建表语句,并执行目标库建表
logx.Infof("开始创建目标表: 表名:%s", tbName)
app.Log(ctx, logId, fmt.Sprintf("开始创建目标表: 表名:%s", tbName))
_, err := targetDialect.CreateTable(targetCols, tableMap[tbName], true)
if err != nil {
end(fmt.Sprintf("创建目标表失败: 表名:%s, error: %s", tbName, err.Error()), err)
return
return errorx.NewBiz(fmt.Sprintf("创建目标表失败: 表名:%s, error: %s", tbName, err.Error()))
}
logx.Infof("创建目标表成功: 表名:%s", tbName)
app.Log(ctx, logId, fmt.Sprintf("创建目标表成功: 表名:%s", tbName))
// 迁移数据
logx.Infof("开始迁移数据: 表名:%s", tbName)
total, err := app.transferData(ctx, tbName, targetCols, srcConn, srcDialect, targetConn, targetDialect)
app.Log(ctx, logId, fmt.Sprintf("开始迁移数据: 表名:%s", tbName))
total, err := app.transferData(ctx, task.Id, tbName, targetCols, srcConn, targetConn)
if err != nil {
end(fmt.Sprintf("迁移数据失败: 表名:%s, error: %s", tbName, err.Error()), err)
return
return errorx.NewBiz(fmt.Sprintf("迁移数据失败: 表名:%s, error: %s", tbName, err.Error()))
}
logx.Infof("迁移数据成功: 表名:%s, 数据:%d 条", tbName, total)
app.Log(ctx, logId, fmt.Sprintf("迁移数据成功: 表名:%s, 数据:%d 条", tbName, total))
// 有些数据库迁移完数据之后,需要更新表自增序列为当前表最大值
targetDialect.UpdateSequence(tbName, targetCols)
// 迁移索引信息
logx.Infof("开始迁移索引: 表名:%s", tbName)
app.Log(ctx, logId, fmt.Sprintf("开始迁移索引: 表名:%s", tbName))
err = app.transferIndex(ctx, tableMap[tbName], srcConn, targetDialect)
if err != nil {
end(fmt.Sprintf("迁移索引失败: 表名:%s, error: %s", tbName, err.Error()), err)
return
return errorx.NewBiz(fmt.Sprintf("迁移索引失败: 表名:%s, error: %s", tbName, err.Error()))
}
logx.Infof("迁移索引成功: 表名:%s", tbName)
// 记录任务执行日志
app.Log(ctx, logId, fmt.Sprintf("迁移索引成功: 表名:%s", tbName))
}
// 修改任务状态
taskParam := &entity.DbTransferTask{}
taskParam.Id = task.Id
taskParam.RunningState = entity.DbTransferTaskRunStateStop
if err := app.UpdateById(ctx, task); err != nil {
end("修改任务状态失败", err)
return
}
return nil
}
func (app *dbTransferAppImpl) transferData(ctx context.Context, tableName string, targetColumns []dbi.Column, srcConn *dbi.DbConn, srcDialect dbi.Dialect, targetConn *dbi.DbConn, targetDialect dbi.Dialect) (int, error) {
func (app *dbTransferAppImpl) transferData(ctx context.Context, taskId uint64, tableName string, targetColumns []dbi.Column, srcConn *dbi.DbConn, targetConn *dbi.DbConn) (int, error) {
result := make([]map[string]any, 0)
total := 0 // 总条数
batchSize := 1000 // 每次查询并迁移1000条数据
var err error
srcMeta := srcConn.GetMetaData()
srcConverter := srcMeta.GetDataHelper()
targetDialect := targetConn.GetDialect()
// 游标查询源表数据,并批量插入目标表
err = srcConn.WalkTableRows(ctx, tableName, func(row map[string]any, columns []*dbi.QueryColumn) error {
err = srcConn.WalkTableRows(context.Background(), tableName, func(row map[string]any, columns []*dbi.QueryColumn) error {
total++
rawValue := map[string]any{}
for _, column := range columns {
@@ -245,27 +285,40 @@ func (app *dbTransferAppImpl) transferData(ctx context.Context, tableName string
}
result = append(result, rawValue)
if total%batchSize == 0 {
err = app.transfer2Target(targetConn, targetColumns, result, targetDialect, tableName)
err = app.transfer2Target(taskId, targetConn, targetColumns, result, targetDialect, tableName)
if err != nil {
logx.Error("批量插入目标表数据失败", err)
logx.ErrorfContext(ctx, "批量插入目标表数据失败: %v", err)
return err
}
result = result[:0]
}
return nil
})
if err != nil {
return total, err
}
// 处理剩余的数据
if len(result) > 0 {
err = app.transfer2Target(targetConn, targetColumns, result, targetDialect, tableName)
err = app.transfer2Target(taskId, targetConn, targetColumns, result, targetDialect, tableName)
if err != nil {
logx.Error(fmt.Sprintf("批量插入目标表数据失败,表名:%s", tableName), err)
logx.ErrorfContext(ctx, "批量插入目标表数据失败,表名:%s error: %v", tableName, err)
return 0, err
}
}
return total, err
}
func (app *dbTransferAppImpl) transfer2Target(targetConn *dbi.DbConn, targetColumns []dbi.Column, result []map[string]any, targetDialect dbi.Dialect, tbName string) error {
func (app *dbTransferAppImpl) transfer2Target(taskId uint64, targetConn *dbi.DbConn, targetColumns []dbi.Column, result []map[string]any, targetDialect dbi.Dialect, tbName string) error {
task, err := app.GetById(new(entity.DbTransferTask), taskId)
if err != nil {
return errorx.NewBiz("任务不存在")
}
if task.RunningState == entity.DbTransferTaskRunStateStop {
return errorx.NewBiz("迁移终止")
}
tx, err := targetConn.Begin()
if err != nil {
return err
@@ -304,7 +357,7 @@ func (app *dbTransferAppImpl) transfer2Target(targetConn *dbi.DbConn, targetColu
defer func() {
if r := recover(); r != nil {
tx.Rollback()
logx.Error("批量插入目标表数据失败", r)
logx.Errorf("批量插入目标表数据失败: %v", r)
}
}()
@@ -313,7 +366,6 @@ func (app *dbTransferAppImpl) transfer2Target(targetConn *dbi.DbConn, targetColu
}
func (app *dbTransferAppImpl) transferIndex(_ context.Context, tableInfo dbi.Table, srcConn *dbi.DbConn, targetDialect dbi.Dialect) error {
// 查询源表索引信息
indexs, err := srcConn.GetMetaData().GetTableIndex(tableInfo.TableName)
if err != nil {