mirror of
https://gitee.com/dromara/mayfly-go
synced 2026-01-04 05:36:35 +08:00
refactor: dbms优化
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
sysapp "mayfly-go/internal/sys/application"
|
||||
sysentity "mayfly-go/internal/sys/domain/entity"
|
||||
"mayfly-go/pkg/base"
|
||||
"mayfly-go/pkg/cache"
|
||||
"mayfly-go/pkg/errorx"
|
||||
"mayfly-go/pkg/gormx"
|
||||
"mayfly-go/pkg/logx"
|
||||
@@ -17,6 +18,8 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type DbTransferTask interface {
|
||||
@@ -95,12 +98,19 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) {
|
||||
logx.Errorf("创建DBMS-执行数据迁移日志失败:%v", err)
|
||||
return
|
||||
}
|
||||
defer app.logApp.Flush(logId)
|
||||
defer app.logApp.Flush(logId, true)
|
||||
|
||||
// 修改状态与关联日志id
|
||||
task.LogId = logId
|
||||
task.RunningState = entity.DbTransferTaskRunStateRunning
|
||||
app.UpdateById(ctx, task)
|
||||
if err = app.UpdateById(ctx, task); err != nil {
|
||||
logx.Errorf("更新任务执行状态失败")
|
||||
return
|
||||
}
|
||||
|
||||
// 标记该任务开始执行
|
||||
app.MarkRuning(taskId)
|
||||
defer app.MarkStop(taskId)
|
||||
|
||||
// 获取源库连接、目标库连接,判断连接可用性,否则记录日志:xx连接不可用
|
||||
// 获取源库表信息
|
||||
@@ -142,40 +152,6 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) {
|
||||
app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("执行迁移完成,执行迁移任务[taskId = %d]完成, 耗时:%v", taskId, time.Since(start)), nil, nil)
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) Log(ctx context.Context, logId uint64, msg string) {
|
||||
logType := sysentity.SyslogTypeRunning
|
||||
logx.InfoContext(ctx, msg)
|
||||
app.logApp.AppendLog(logId, &sysapp.AppendLogReq{
|
||||
AppendResp: msg,
|
||||
Type: logType,
|
||||
})
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) EndTransfer(ctx context.Context, logId uint64, taskId uint64, msg string, err error, extra map[string]any) {
|
||||
logType := sysentity.SyslogTypeSuccess
|
||||
transferState := entity.DbTransferTaskRunStateSuccess
|
||||
if err != nil {
|
||||
msg = fmt.Sprintf("%s: %s", msg, err.Error())
|
||||
logx.ErrorContext(ctx, msg)
|
||||
logType = sysentity.SyslogTypeError
|
||||
transferState = entity.DbTransferTaskRunStateFail
|
||||
} else {
|
||||
logx.InfoContext(ctx, msg)
|
||||
}
|
||||
|
||||
app.logApp.AppendLog(logId, &sysapp.AppendLogReq{
|
||||
AppendResp: msg,
|
||||
Extra: extra,
|
||||
Type: logType,
|
||||
})
|
||||
|
||||
// 修改任务状态
|
||||
task := new(entity.DbTransferTask)
|
||||
task.Id = taskId
|
||||
task.RunningState = transferState
|
||||
app.UpdateById(context.Background(), task)
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) Stop(ctx context.Context, taskId uint64) error {
|
||||
task, err := app.GetById(new(entity.DbTransferTask), taskId)
|
||||
if err != nil {
|
||||
@@ -186,7 +162,12 @@ func (app *dbTransferAppImpl) Stop(ctx context.Context, taskId uint64) error {
|
||||
return errorx.NewBiz("该任务未在执行")
|
||||
}
|
||||
task.RunningState = entity.DbTransferTaskRunStateStop
|
||||
return app.UpdateById(ctx, task)
|
||||
if err = app.UpdateById(ctx, task); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
app.MarkStop(taskId)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 迁移表
|
||||
@@ -222,50 +203,63 @@ func (app *dbTransferAppImpl) transferTables(ctx context.Context, logId uint64,
|
||||
srcColumnHelper := srcMeta.GetColumnHelper()
|
||||
targetColumnHelper := targetConn.GetMetaData().GetColumnHelper()
|
||||
|
||||
for _, tbName := range sortTableNames {
|
||||
cols := columnMap[tbName]
|
||||
targetCols := make([]dbi.Column, 0)
|
||||
for _, col := range cols {
|
||||
colPtr := &col
|
||||
// 源库列转为公共列
|
||||
srcColumnHelper.ToCommonColumn(colPtr)
|
||||
// 公共列转为目标库列
|
||||
targetColumnHelper.ToColumn(colPtr)
|
||||
targetCols = append(targetCols, *colPtr)
|
||||
}
|
||||
// 分组迁移
|
||||
tableGroups := collx.ArraySplit[string](sortTableNames, 2)
|
||||
errGroup, _ := errgroup.WithContext(ctx)
|
||||
|
||||
// 通过公共列信息生成目标库的建表语句,并执行目标库建表
|
||||
app.Log(ctx, logId, fmt.Sprintf("开始创建目标表: 表名:%s", tbName))
|
||||
_, err := targetDialect.CreateTable(targetCols, tableMap[tbName], true)
|
||||
if err != nil {
|
||||
return errorx.NewBiz(fmt.Sprintf("创建目标表失败: 表名:%s, error: %s", tbName, err.Error()))
|
||||
}
|
||||
app.Log(ctx, logId, fmt.Sprintf("创建目标表成功: 表名:%s", tbName))
|
||||
for _, tables := range tableGroups {
|
||||
errGroup.Go(func() error {
|
||||
for _, tbName := range tables {
|
||||
cols := columnMap[tbName]
|
||||
targetCols := make([]dbi.Column, 0)
|
||||
for _, col := range cols {
|
||||
colPtr := &col
|
||||
// 源库列转为公共列
|
||||
srcColumnHelper.ToCommonColumn(colPtr)
|
||||
// 公共列转为目标库列
|
||||
targetColumnHelper.ToColumn(colPtr)
|
||||
targetCols = append(targetCols, *colPtr)
|
||||
}
|
||||
|
||||
// 迁移数据
|
||||
app.Log(ctx, logId, fmt.Sprintf("开始迁移数据: 表名:%s", tbName))
|
||||
total, err := app.transferData(ctx, task.Id, tbName, targetCols, srcConn, targetConn)
|
||||
if err != nil {
|
||||
return errorx.NewBiz(fmt.Sprintf("迁移数据失败: 表名:%s, error: %s", tbName, err.Error()))
|
||||
}
|
||||
app.Log(ctx, logId, fmt.Sprintf("迁移数据成功: 表名:%s, 数据:%d 条", tbName, total))
|
||||
// 通过公共列信息生成目标库的建表语句,并执行目标库建表
|
||||
app.Log(ctx, logId, fmt.Sprintf("开始创建目标表: 表名:%s", tbName))
|
||||
_, err := targetDialect.CreateTable(targetCols, tableMap[tbName], true)
|
||||
if err != nil {
|
||||
return errorx.NewBiz(fmt.Sprintf("创建目标表失败: 表名:%s, error: %s", tbName, err.Error()))
|
||||
}
|
||||
app.Log(ctx, logId, fmt.Sprintf("创建目标表成功: 表名:%s", tbName))
|
||||
|
||||
// 有些数据库迁移完数据之后,需要更新表自增序列为当前表最大值
|
||||
targetDialect.UpdateSequence(tbName, targetCols)
|
||||
// 迁移数据
|
||||
app.Log(ctx, logId, fmt.Sprintf("开始迁移数据: 表名:%s", tbName))
|
||||
total, err := app.transferData(ctx, logId, task.Id, tbName, targetCols, srcConn, targetConn)
|
||||
if err != nil {
|
||||
return errorx.NewBiz(fmt.Sprintf("迁移数据失败: 表名:%s, error: %s", tbName, err.Error()))
|
||||
}
|
||||
app.Log(ctx, logId, fmt.Sprintf("迁移数据成功: 表名:%s, 数据:%d 条", tbName, total))
|
||||
|
||||
// 迁移索引信息
|
||||
app.Log(ctx, logId, fmt.Sprintf("开始迁移索引: 表名:%s", tbName))
|
||||
err = app.transferIndex(ctx, tableMap[tbName], srcConn, targetDialect)
|
||||
if err != nil {
|
||||
return errorx.NewBiz(fmt.Sprintf("迁移索引失败: 表名:%s, error: %s", tbName, err.Error()))
|
||||
}
|
||||
app.Log(ctx, logId, fmt.Sprintf("迁移索引成功: 表名:%s", tbName))
|
||||
// 有些数据库迁移完数据之后,需要更新表自增序列为当前表最大值
|
||||
targetDialect.UpdateSequence(tbName, targetCols)
|
||||
|
||||
// 迁移索引信息
|
||||
app.Log(ctx, logId, fmt.Sprintf("开始迁移索引: 表名:%s", tbName))
|
||||
err = app.transferIndex(ctx, tableMap[tbName], srcConn, targetDialect)
|
||||
if err != nil {
|
||||
return errorx.NewBiz(fmt.Sprintf("迁移索引失败: 表名:%s, error: %s", tbName, err.Error()))
|
||||
}
|
||||
app.Log(ctx, logId, fmt.Sprintf("迁移索引成功: 表名:%s", tbName))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := errGroup.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) transferData(ctx context.Context, taskId uint64, tableName string, targetColumns []dbi.Column, srcConn *dbi.DbConn, targetConn *dbi.DbConn) (int, error) {
|
||||
func (app *dbTransferAppImpl) transferData(ctx context.Context, logId uint64, taskId uint64, tableName string, targetColumns []dbi.Column, srcConn *dbi.DbConn, targetConn *dbi.DbConn) (int, error) {
|
||||
result := make([]map[string]any, 0)
|
||||
total := 0 // 总条数
|
||||
batchSize := 1000 // 每次查询并迁移1000条数据
|
||||
@@ -273,6 +267,7 @@ func (app *dbTransferAppImpl) transferData(ctx context.Context, taskId uint64, t
|
||||
srcMeta := srcConn.GetMetaData()
|
||||
srcConverter := srcMeta.GetDataHelper()
|
||||
targetDialect := targetConn.GetDialect()
|
||||
logExtraKey := fmt.Sprintf("`%s` 当前已迁移数据量: ", tableName)
|
||||
|
||||
// 游标查询源表数据,并批量插入目标表
|
||||
err = srcConn.WalkTableRows(context.Background(), tableName, func(row map[string]any, columns []*dbi.QueryColumn) error {
|
||||
@@ -291,6 +286,7 @@ func (app *dbTransferAppImpl) transferData(ctx context.Context, taskId uint64, t
|
||||
return err
|
||||
}
|
||||
result = result[:0]
|
||||
app.logApp.SetExtra(logId, logExtraKey, total)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@@ -307,15 +303,13 @@ func (app *dbTransferAppImpl) transferData(ctx context.Context, taskId uint64, t
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
// 置空当前表数据迁移量进度
|
||||
app.logApp.SetExtra(logId, logExtraKey, nil)
|
||||
return total, err
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) transfer2Target(taskId uint64, targetConn *dbi.DbConn, targetColumns []dbi.Column, result []map[string]any, targetDialect dbi.Dialect, tbName string) error {
|
||||
task, err := app.GetById(new(entity.DbTransferTask), taskId)
|
||||
if err != nil {
|
||||
return errorx.NewBiz("任务不存在")
|
||||
}
|
||||
if task.RunningState == entity.DbTransferTaskRunStateStop {
|
||||
if !app.IsRunning(taskId) {
|
||||
return errorx.NewBiz("迁移终止")
|
||||
}
|
||||
|
||||
@@ -376,10 +370,55 @@ func (app *dbTransferAppImpl) transferIndex(_ context.Context, tableInfo dbi.Tab
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(indexs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 通过表名、索引信息生成建索引语句,并执行到目标表
|
||||
return targetDialect.CreateIndex(tableInfo, indexs)
|
||||
}
|
||||
|
||||
// MarkRuning 标记任务执行中
|
||||
func (app *dbTransferAppImpl) MarkRuning(taskId uint64) {
|
||||
cache.Set(fmt.Sprintf("mayfly:db:transfer:%d", taskId), 1, -1)
|
||||
}
|
||||
|
||||
// MarkStop 标记任务结束
|
||||
func (app *dbTransferAppImpl) MarkStop(taskId uint64) {
|
||||
cache.Del(fmt.Sprintf("mayfly:db:transfer:%d", taskId))
|
||||
}
|
||||
|
||||
// IsRunning 判断任务是否执行中
|
||||
func (app *dbTransferAppImpl) IsRunning(taskId uint64) bool {
|
||||
return cache.GetStr(fmt.Sprintf("mayfly:db:transfer:%d", taskId)) != ""
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) Log(ctx context.Context, logId uint64, msg string, extra ...any) {
|
||||
logType := sysentity.SyslogTypeRunning
|
||||
logx.InfoContext(ctx, msg)
|
||||
app.logApp.AppendLog(logId, &sysapp.AppendLogReq{
|
||||
AppendResp: msg,
|
||||
Type: logType,
|
||||
})
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) EndTransfer(ctx context.Context, logId uint64, taskId uint64, msg string, err error, extra map[string]any) {
|
||||
logType := sysentity.SyslogTypeSuccess
|
||||
transferState := entity.DbTransferTaskRunStateSuccess
|
||||
if err != nil {
|
||||
msg = fmt.Sprintf("%s: %s", msg, err.Error())
|
||||
logx.ErrorContext(ctx, msg)
|
||||
logType = sysentity.SyslogTypeError
|
||||
transferState = entity.DbTransferTaskRunStateFail
|
||||
} else {
|
||||
logx.InfoContext(ctx, msg)
|
||||
}
|
||||
|
||||
app.logApp.AppendLog(logId, &sysapp.AppendLogReq{
|
||||
AppendResp: msg,
|
||||
Extra: extra,
|
||||
Type: logType,
|
||||
})
|
||||
|
||||
// 修改任务状态
|
||||
task := new(entity.DbTransferTask)
|
||||
task.Id = taskId
|
||||
task.RunningState = transferState
|
||||
app.UpdateById(context.Background(), task)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user