mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-03 16:00:25 +08:00
!124 一些更新和bug
* fix: 代码合并 * feat:支持数据库版本兼容,目前兼容了oracle11g部分特性 * fix: 修改数据同步bug,数据sql里指定修改字段别,导致未正确记录修改字段值 * feat: 数据库迁移支持定时迁移和迁移到sql文件
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
package application
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"mayfly-go/internal/db/application/dto"
|
||||
"mayfly-go/internal/db/dbm/dbi"
|
||||
"mayfly-go/internal/db/domain/entity"
|
||||
"mayfly-go/internal/db/domain/repository"
|
||||
@@ -11,10 +14,14 @@ import (
|
||||
sysentity "mayfly-go/internal/sys/domain/entity"
|
||||
"mayfly-go/pkg/base"
|
||||
"mayfly-go/pkg/cache"
|
||||
"mayfly-go/pkg/contextx"
|
||||
"mayfly-go/pkg/errorx"
|
||||
"mayfly-go/pkg/logx"
|
||||
"mayfly-go/pkg/model"
|
||||
"mayfly-go/pkg/scheduler"
|
||||
"mayfly-go/pkg/utils/collx"
|
||||
"mayfly-go/pkg/utils/writer"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -32,20 +39,27 @@ type DbTransferTask interface {
|
||||
|
||||
Delete(ctx context.Context, id uint64) error
|
||||
|
||||
InitJob()
|
||||
InitCronJob()
|
||||
|
||||
AddCronJob(ctx context.Context, taskEntity *entity.DbTransferTask)
|
||||
|
||||
RemoveCronJobById(taskId uint64)
|
||||
|
||||
CreateLog(ctx context.Context, taskId uint64) (uint64, error)
|
||||
|
||||
Run(ctx context.Context, taskId uint64, logId uint64)
|
||||
|
||||
IsRunning(taskId uint64) bool
|
||||
|
||||
Stop(ctx context.Context, taskId uint64) error
|
||||
}
|
||||
|
||||
type dbTransferAppImpl struct {
|
||||
base.AppImpl[*entity.DbTransferTask, repository.DbTransferTask]
|
||||
|
||||
dbApp Db `inject:"DbApp"`
|
||||
logApp sysapp.Syslog `inject:"SyslogApp"`
|
||||
dbApp Db `inject:"DbApp"`
|
||||
logApp sysapp.Syslog `inject:"SyslogApp"`
|
||||
fileApp DbTransferFile `inject:"DbTransferFileApp"`
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) InjectDbTransferTaskRepo(repo repository.DbTransferTask) {
|
||||
@@ -58,23 +72,97 @@ func (app *dbTransferAppImpl) GetPageList(condition *entity.DbTransferTaskQuery,
|
||||
|
||||
func (app *dbTransferAppImpl) Save(ctx context.Context, taskEntity *entity.DbTransferTask) error {
|
||||
var err error
|
||||
if taskEntity.Id == 0 {
|
||||
if taskEntity.Id == 0 { // 新建时生成key
|
||||
taskEntity.TaskKey = uuid.New().String()
|
||||
err = app.Insert(ctx, taskEntity)
|
||||
} else {
|
||||
err = app.UpdateById(ctx, taskEntity)
|
||||
}
|
||||
return err
|
||||
// 视情况添加或删除任务
|
||||
task, err := app.GetById(taskEntity.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
app.AddCronJob(ctx, task)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) Delete(ctx context.Context, id uint64) error {
|
||||
if err := app.DeleteById(ctx, id); err != nil {
|
||||
return err
|
||||
}
|
||||
app.RemoveCronJobById(id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) InitJob() {
|
||||
app.UpdateByCond(context.TODO(), &entity.DbTransferTask{RunningState: entity.DbTransferTaskRunStateStop}, &entity.DbTransferTask{RunningState: entity.DbTransferTaskRunStateRunning})
|
||||
func (app *dbTransferAppImpl) AddCronJob(ctx context.Context, taskEntity *entity.DbTransferTask) {
|
||||
key := taskEntity.TaskKey
|
||||
// 先移除旧的任务
|
||||
scheduler.RemoveByKey(key)
|
||||
|
||||
// 根据状态添加新的任务
|
||||
if taskEntity.Status == entity.DbTransferTaskStatusEnable && taskEntity.CronAble == entity.DbTransferTaskCronAbleEnable {
|
||||
if key == "" {
|
||||
taskEntity.TaskKey = uuid.New().String()
|
||||
key = taskEntity.TaskKey
|
||||
_ = app.UpdateById(ctx, taskEntity)
|
||||
}
|
||||
|
||||
taskId := taskEntity.Id
|
||||
scheduler.AddFunByKey(key, taskEntity.Cron, func() {
|
||||
logx.Infof("开始执行同步任务: %d", taskId)
|
||||
logId, _ := app.CreateLog(ctx, taskId)
|
||||
app.Run(ctx, taskId, logId)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) InitCronJob() {
|
||||
// 重启后,把正在运行的状态设置为停止
|
||||
_ = app.UpdateByCond(context.TODO(), &entity.DbTransferTask{RunningState: entity.DbTransferTaskRunStateStop}, &entity.DbTransferTask{RunningState: entity.DbTransferTaskRunStateRunning})
|
||||
ent := &entity.DbTransferTask{}
|
||||
list, err := app.ListByCond(model.NewModelCond(ent).Columns("id"))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(list) > 0 {
|
||||
// 移除所有正在运行的任务
|
||||
for _, task := range list {
|
||||
app.MarkStop(task.Id)
|
||||
}
|
||||
}
|
||||
// 把所有运行中的文件状态设置为失败
|
||||
_ = app.fileApp.UpdateByCond(context.TODO(), &entity.DbTransferFile{Status: entity.DbTransferFileStatusFail}, &entity.DbTransferFile{Status: entity.DbTransferFileStatusRunning})
|
||||
|
||||
// 把所有需要定时执行的任务添加到定时任务中
|
||||
pageParam := &model.PageParam{
|
||||
PageSize: 100,
|
||||
PageNum: 1,
|
||||
}
|
||||
cond := new(entity.DbTransferTaskQuery)
|
||||
cond.Status = entity.DbTransferTaskStatusEnable
|
||||
cond.CronAble = entity.DbTransferTaskCronAbleEnable
|
||||
jobs := new([]entity.DbTransferTask)
|
||||
|
||||
pr, _ := app.GetPageList(cond, pageParam, jobs)
|
||||
if nil == pr || pr.Total == 0 {
|
||||
return
|
||||
}
|
||||
total := pr.Total
|
||||
add := 0
|
||||
|
||||
for {
|
||||
for _, job := range *jobs {
|
||||
app.AddCronJob(contextx.NewTraceId(), &job)
|
||||
add++
|
||||
}
|
||||
if add >= int(total) {
|
||||
return
|
||||
}
|
||||
pageParam.PageNum++
|
||||
_, _ = app.GetPageList(cond, pageParam, jobs)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) CreateLog(ctx context.Context, taskId uint64) (uint64, error) {
|
||||
@@ -88,7 +176,6 @@ func (app *dbTransferAppImpl) CreateLog(ctx context.Context, taskId uint64) (uin
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64, logId uint64) {
|
||||
defer app.logApp.Flush(logId, true)
|
||||
|
||||
task, err := app.GetById(taskId)
|
||||
if err != nil {
|
||||
@@ -97,7 +184,7 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64, logId uint
|
||||
}
|
||||
|
||||
if app.IsRunning(taskId) {
|
||||
logx.Warnf("[%d]该任务正在运行中...", taskId)
|
||||
logx.Panicf("[%d]该任务正在运行中...", taskId)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -111,8 +198,7 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64, logId uint
|
||||
}
|
||||
|
||||
// 标记该任务开始执行
|
||||
app.MarkRuning(taskId)
|
||||
defer app.MarkStop(taskId)
|
||||
app.MarkRunning(taskId)
|
||||
|
||||
// 获取源库连接、目标库连接,判断连接可用性,否则记录日志:xx连接不可用
|
||||
// 获取源库表信息
|
||||
@@ -121,15 +207,9 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64, logId uint
|
||||
app.EndTransfer(ctx, logId, taskId, "获取源库连接失败", err, nil)
|
||||
return
|
||||
}
|
||||
// 获取目标库表信息
|
||||
targetConn, err := app.dbApp.GetDbConn(uint64(task.TargetDbId), task.TargetDbName)
|
||||
if err != nil {
|
||||
app.EndTransfer(ctx, logId, taskId, "获取目标库连接失败", err, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 获取迁移表信息
|
||||
var tables []dbi.Table
|
||||
|
||||
if task.CheckedKeys == "all" {
|
||||
tables, err = srcConn.GetMetaData().GetTables()
|
||||
if err != nil {
|
||||
@@ -145,8 +225,28 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64, logId uint
|
||||
}
|
||||
}
|
||||
|
||||
// 迁移到文件或数据库
|
||||
if task.Mode == entity.DbTransferTaskModeFile {
|
||||
app.transfer2File(ctx, taskId, logId, task, srcConn, start, tables)
|
||||
} else if task.Mode == entity.DbTransferTaskModeDb {
|
||||
defer app.MarkStop(taskId)
|
||||
defer app.logApp.Flush(logId, true)
|
||||
app.transfer2Db(ctx, taskId, logId, task, srcConn, start, tables)
|
||||
} else {
|
||||
app.EndTransfer(ctx, logId, taskId, "迁移模式出错,目前仅支持迁移到文件或数据库", err, nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, start time.Time, tables []dbi.Table) {
|
||||
// 获取目标库表信息
|
||||
targetConn, err := app.dbApp.GetDbConn(uint64(task.TargetDbId), task.TargetDbName)
|
||||
if err != nil {
|
||||
app.EndTransfer(ctx, logId, taskId, "获取目标库连接失败", err, nil)
|
||||
return
|
||||
}
|
||||
// 迁移表
|
||||
if err = app.transferTables(ctx, logId, task, srcConn, targetConn, tables); err != nil {
|
||||
if err = app.transferDbTables(ctx, logId, task, srcConn, targetConn, tables); err != nil {
|
||||
app.EndTransfer(ctx, logId, taskId, "迁移表失败", err, nil)
|
||||
return
|
||||
}
|
||||
@@ -154,6 +254,67 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64, logId uint
|
||||
app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("执行迁移完成,执行迁移任务[taskId = %d]完成, 耗时:%v", taskId, time.Since(start)), nil, nil)
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) transfer2File(ctx context.Context, taskId uint64, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, start time.Time, tables []dbi.Table) {
|
||||
|
||||
// 1、新增迁移文件数据
|
||||
nowTime := time.Now()
|
||||
tFile := &entity.DbTransferFile{
|
||||
TaskId: taskId,
|
||||
CreateTime: &nowTime,
|
||||
Status: entity.DbTransferFileStatusRunning,
|
||||
FileDbType: cmp.Or(task.TargetFileDbType, task.TargetDbType),
|
||||
FileName: fmt.Sprintf("%s.sql", task.TaskName), // 用于下载和展示
|
||||
FileUuid: uuid.New().String(), // 用于存放到磁盘
|
||||
LogId: logId,
|
||||
}
|
||||
_ = app.fileApp.Save(ctx, tFile)
|
||||
|
||||
// 新建一个文件,文件位置为 {transferPath}/{taskId}/{uuid}.sql
|
||||
filePath := app.fileApp.GetFilePath(tFile)
|
||||
|
||||
// 从tables提取表名
|
||||
tableNames := make([]string, 0)
|
||||
for _, table := range tables {
|
||||
tableNames = append(tableNames, table.TableName)
|
||||
}
|
||||
// 2、把源库数据迁移到文件
|
||||
app.Log(ctx, logId, fmt.Sprintf("开始迁移表数据到文件: %s", filePath))
|
||||
|
||||
app.Log(ctx, logId, fmt.Sprintf("目标库文件语言类型: %s", task.TargetFileDbType))
|
||||
|
||||
go func() {
|
||||
defer app.MarkStop(taskId)
|
||||
defer app.logApp.Flush(logId, true)
|
||||
ctx = context.Background()
|
||||
err := app.dbApp.DumpDb(ctx, &dto.DumpDb{
|
||||
LogId: logId,
|
||||
DbId: uint64(task.SrcDbId),
|
||||
DbName: task.SrcDbName,
|
||||
TargetDbType: dbi.DbType(task.TargetFileDbType),
|
||||
Tables: tableNames,
|
||||
DumpDDL: true,
|
||||
DumpData: true,
|
||||
Writer: writer.NewFileWriter(filePath),
|
||||
Log: func(msg string) { // 记录日志
|
||||
app.Log(ctx, logId, msg)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
app.EndTransfer(ctx, logId, taskId, "数据库迁移失败", err, nil)
|
||||
tFile.Status = entity.DbTransferFileStatusFail
|
||||
_ = app.fileApp.UpdateById(ctx, tFile)
|
||||
// 删除文件
|
||||
_ = os.Remove(filePath)
|
||||
return
|
||||
}
|
||||
app.EndTransfer(ctx, logId, taskId, "数据库迁移完成", err, nil)
|
||||
|
||||
tFile.Status = entity.DbTransferFileStatusSuccess
|
||||
_ = app.fileApp.UpdateById(ctx, tFile)
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) Stop(ctx context.Context, taskId uint64) error {
|
||||
task, err := app.GetById(taskId)
|
||||
if err != nil {
|
||||
@@ -173,7 +334,7 @@ func (app *dbTransferAppImpl) Stop(ctx context.Context, taskId uint64) error {
|
||||
}
|
||||
|
||||
// 迁移表
|
||||
func (app *dbTransferAppImpl) transferTables(ctx context.Context, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, targetConn *dbi.DbConn, tables []dbi.Table) error {
|
||||
func (app *dbTransferAppImpl) transferDbTables(ctx context.Context, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, targetConn *dbi.DbConn, tables []dbi.Table) error {
|
||||
tableNames := make([]string, 0)
|
||||
tableMap := make(map[string]dbi.Table) // 以表名分组,存放表信息
|
||||
for _, table := range tables {
|
||||
@@ -255,10 +416,7 @@ func (app *dbTransferAppImpl) transferTables(ctx context.Context, logId uint64,
|
||||
})
|
||||
}
|
||||
|
||||
if err := errGroup.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return errGroup.Wait()
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) transferData(ctx context.Context, logId uint64, taskId uint64, tableName string, targetColumns []dbi.Column, srcConn *dbi.DbConn, targetConn *dbi.DbConn) (int, error) {
|
||||
@@ -386,8 +544,8 @@ func (app *dbTransferAppImpl) transferIndex(_ context.Context, tableInfo dbi.Tab
|
||||
return targetDialect.CreateIndex(tableInfo, indexs)
|
||||
}
|
||||
|
||||
// MarkRuning 标记任务执行中
|
||||
func (app *dbTransferAppImpl) MarkRuning(taskId uint64) {
|
||||
// MarkRunning 标记任务执行中
|
||||
func (app *dbTransferAppImpl) MarkRunning(taskId uint64) {
|
||||
cache.Set(fmt.Sprintf("mayfly:db:transfer:%d", taskId), 1, -1)
|
||||
}
|
||||
|
||||
@@ -434,3 +592,10 @@ func (app *dbTransferAppImpl) EndTransfer(ctx context.Context, logId uint64, tas
|
||||
task.RunningState = transferState
|
||||
app.UpdateById(context.Background(), task)
|
||||
}
|
||||
|
||||
func (app *dbTransferAppImpl) RemoveCronJobById(taskId uint64) {
|
||||
task, err := app.GetById(taskId)
|
||||
if err == nil {
|
||||
scheduler.RemoveByKey(task.TaskKey)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user