@@ -3,13 +3,13 @@ package application
import (
"cmp"
"context"
"database/sql"
"encoding/json"
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/base"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/contextx"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
@@ -42,6 +42,8 @@ type DataSyncTask interface {
RunCronJob ( ctx context . Context , id uint64 ) error
StopTask ( ctx context . Context , id uint64 ) error
GetTaskLogList ( condition * entity . DataSyncLogQuery , pageParam * model . PageParam , toEntity any , orderBy ... string ) ( * model . PageResult [ any ] , error )
}
@@ -117,17 +119,10 @@ func (app *dataSyncAppImpl) RemoveCronJobById(taskId uint64) {
if err == nil {
scheduler . RemoveByKey ( task . TaskKey )
}
}
// changeRunningState writes the given running state to the task record
// identified by id and then removes that task's running marker via MarkStop.
//
// The record update is best-effort: the error returned by UpdateById is
// discarded because this helper has no error return.
func (app *dataSyncAppImpl) changeRunningState(id uint64, state int8) {
	task := new(entity.DataSyncTask)
	task.Id = id
	task.RunningState = state
	_ = app.UpdateById(context.Background(), task)

	// Use the id parameter: it is the only task identifier in scope here.
	app.MarkStop(id)
}
func ( app * dataSyncAppImpl ) RunCronJob ( ctx context . Context , id uint64 ) error {
// 查询最新的任务信息
task , err := app . GetById ( id )
if err != nil {
return errorx . NewBiz ( "task not found" )
@@ -135,8 +130,9 @@ func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
if task . RunningState == entity . DataSyncTaskRunStateRunning {
return errorx . NewBiz ( "the task is in progress" )
}
// 开始运行时,修改状态为运行中
app . changeRunningState ( id , entity . DataSyncTaskRunStateRunning )
// 标记该任务运行中
app . MarkRunning ( id )
logx . InfofContext ( ctx , "start the data synchronization task: %s => %s" , task . TaskName , task . TaskKey )
@@ -183,8 +179,11 @@ func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
log . ErrText = fmt . Sprintf ( "execution failure: %s" , err . Error ( ) )
logx . ErrorContext ( ctx , log . ErrText )
log . Status = entity . DataSyncTaskStateFail
app . endRunning ( task , log )
} else {
log . Status = entity . DataSyncTaskStateSuccess
}
app . endRunning ( task , log )
} ( )
return nil
@@ -210,16 +209,6 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
if err != nil {
return syncLog , errorx . NewBiz ( "failed to connect to the target database: %s" , err . Error ( ) )
}
targetDbTx , err := targetConn . Begin ( )
if err != nil {
return syncLog , errorx . NewBiz ( "failed to start the target database transaction: %s" , err . Error ( ) )
}
defer func ( ) {
if r := recover ( ) ; r != nil {
targetDbTx . Rollback ( )
err = fmt . Errorf ( "%v" , r )
}
} ( )
// task.FieldMap为json数组字符串 [{"src":"id","target":"id"}], 转为map
var fieldMap [ ] map [ string ] string
@@ -227,13 +216,11 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
if err != nil {
return syncLog , errorx . NewBiz ( "there was an error parsing the field map json: %s" , err . Error ( ) )
}
var updFieldType * dbi . DbDataType
// 记录本次同步数据总数
total := 0
batchSize := task . PageSize
result := make ( [ ] map [ string ] any , 0 )
var queryColumns [ ] * dbi . QueryColumn
// 如果有数据库别名, 则从UpdField中去掉数据库别名, 如: a.id => id, 用于获取字段具体名称
updFieldName := task . UpdField
@@ -255,23 +242,10 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
} )
_ , err = srcConn . WalkQueryRows ( context . Background ( ) , sql , func ( row map [ string ] any , columns [ ] * dbi . QueryColumn ) error {
if len ( queryColumns ) == 0 {
queryColumns = columns
// 遍历columns 取task.UpdField的字段类型
updFieldType = dbi . DefaultDbDataType
for _ , column := range columns {
if strings . EqualFold ( column . Name , updFieldName ) {
updFieldType = column . DbDataType
break
}
}
}
total ++
result = append ( result , row )
if total % batchSize == 0 {
if err := app . srcData2TargetDb ( result , fieldMap , updFieldType , updFieldName , task , targetConn , targetDbTx , targetInsertColumns ) ; err != nil {
if err := app . srcData2TargetDb ( result , fieldMap , updFieldName , task , targetConn , targetInsertColumns ) ; err != nil {
return err
}
@@ -282,43 +256,37 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
app . saveLog ( syncLog )
result = result [ : 0 ]
// 运行过程中,判断状态是否为已关闭,是则结束运行,否则继续运行
if ! app . IsRunning ( task . Id ) {
return errorx . NewBiz ( "the task has been terminated manually" )
}
}
return nil
} )
if err != nil {
targetDbTx . Rollback ( )
return syncLog , err
}
// 处理剩余的数据
if len ( result ) > 0 {
if err := app . srcData2TargetDb ( result , fieldMap , updFieldType , updFieldName , task , targetConn , targetDbTx , targetInsertColumns ) ; err != nil {
targetDbTx . Rollback ( )
if err := app . srcData2TargetDb ( result , fieldMap , updFieldName , task , targetConn , targetInsertColumns ) ; err != nil {
return syncLog , err
}
}
// 如果是mssql, 暂不手动提交事务, 否则报错 mssql: The COMMIT TRANSACTION request has no corresponding BEGIN TRANSACTION.
if err := targetDbTx . Commit ( ) ; err != nil {
if targetConn . Info . Type != dbi . ToDbType ( "mssql" ) {
return syncLog , errorx . NewBiz ( "data synchronization - The target database transaction failed to commit: %s" , err . Error ( ) )
}
}
logx . InfofContext ( ctx , "synchronous task: [%s], finished execution, save records successfully: [%d]" , task . TaskName , total )
// 保存 执行成功日志
// 执行成功日志
syncLog . ErrText = fmt . Sprintf ( "the synchronous task was executed successfully. New data: %d" , total )
syncLog . Status = entity . DataSyncTaskStateSuccess
syncLog . ResNum = total
app . endRunning ( task , syncLog )
return syncLog , nil
}
func ( app * dataSyncAppImpl ) srcData2TargetDb ( srcRes [ ] map [ string ] any , fieldMap [ ] map [ string ] string , updFieldType * dbi . DbDataType , updFieldName string , task * entity . DataSyncTask , targetDbConn * dbi . DbConn , targetDbTx * sql . Tx , targetInsertColumns [ ] dbi . Column ) error {
func ( app * dataSyncAppImpl ) srcData2TargetDb ( srcRes [ ] map [ string ] any , fieldMap [ ] map [ string ] string , updFieldName string , task * entity . DataSyncTask , targetDbConn * dbi . DbConn , targetInsertColumns [ ] dbi . Column ) ( err error ) {
// 遍历res, 组装数据
var targetData = make ( [ ] map [ string ] any , 0 )
for _ , srcData := range srcRes {
@@ -341,6 +309,37 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
}
// 执行插入
targetDialect := targetDbConn . GetDialect ( )
// 生成目标数据库批量插入sql, 并执行
sqls := targetDialect . GetSQLGenerator ( ) . GenInsert ( task . TargetTableName , targetInsertColumns , tragetValues , cmp . Or ( task . DuplicateStrategy , dbi . DuplicateStrategyNone ) )
// 开启本批次执行事务
targetDbTx , err := targetDbConn . Begin ( )
if err != nil {
return errorx . NewBiz ( "failed to start the target database transaction: %s" , err . Error ( ) )
}
defer func ( ) {
if r := recover ( ) ; r != nil {
targetDbTx . Rollback ( )
err = fmt . Errorf ( "%v" , r )
}
} ( )
for _ , sql := range sqls {
_ , err := targetDbTx . Exec ( sql )
if err != nil {
targetDbTx . Rollback ( )
return err
}
}
// 如果是mssql, 暂不手动提交事务, 否则报错 mssql: The COMMIT TRANSACTION request has no corresponding BEGIN TRANSACTION.
if err := targetDbTx . Commit ( ) ; err != nil {
if targetDbConn . Info . Type != dbi . ToDbType ( "mssql" ) {
return errorx . NewBiz ( "data synchronization - The target database transaction failed to commit: %s" , err . Error ( ) )
}
}
setUpdateFieldVal := func ( field string ) {
// 解决字段大小写问题
@@ -351,27 +350,20 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
task . UpdFieldVal = cast . ToString ( updFieldVal )
}
// 如果指定了更新字段,则以更新字段取值
setUpdateFieldVal ( cmp . Or ( task . UpdFieldSrc , updFieldName ) )
targetDialect := targetDbConn . GetDialect ( )
return nil
}
// 生成目标数据库批量插入sql, 并执行
sqls := targetDialect . GetSQLGenerator ( ) . GenInsert ( task . TargetTableName , targetInsertColumns , tragetValues , cmp . Or ( task . DuplicateStrategy , dbi . DuplicateStrategyNone ) )
for _ , sql : = range sqls {
_ , err : = targetDbTx . Exec ( sql )
if err != nil {
return err
}
// StopTask marks the task identified by taskId as stopped.
//
// It writes entity.DataSyncTaskRunStateStop to the task record and, once that
// update succeeds, removes the task's running marker through MarkStop. If the
// record update fails, its error is returned unchanged and the marker is left
// untouched.
func (app *dataSyncAppImpl) StopTask(ctx context.Context, taskId uint64) error {
	task := new(entity.DataSyncTask)
	task.Id = taskId
	task.RunningState = entity.DataSyncTaskRunStateStop
	if err := app.UpdateById(ctx, task); err != nil {
		return err
	}

	// Do not read the record back and compare it with the Stop state: that
	// state was just written, so such a check would reject every successful
	// stop and the running marker would never be cleared.
	app.MarkStop(taskId)
	return nil
}
@@ -382,9 +374,7 @@ func (app *dataSyncAppImpl) endRunning(taskEntity *entity.DataSyncTask, log *ent
task := new ( entity . DataSyncTask )
task . Id = taskEntity . Id
task . RecentState = state
if state = = e ntity. DataSyncTaskStateSuccess {
task . UpdFieldVal = taskEntity . UpdFieldVal
}
task . UpdFieldVal = taskE ntity. UpdFieldVal
task . RunningState = entity . DataSyncTaskRunStateReady
// 运行失败之后设置任务状态为禁用
//if state == entity.DataSyncTaskStateFail {
@@ -394,6 +384,7 @@ func (app *dataSyncAppImpl) endRunning(taskEntity *entity.DataSyncTask, log *ent
_ = app . UpdateById ( context . Background ( ) , task )
// 保存执行日志
app . saveLog ( log )
app . MarkStop ( task . Id )
}
func ( app * dataSyncAppImpl ) saveLog ( log * entity . DataSyncLog ) {
@@ -440,3 +431,23 @@ func (app *dataSyncAppImpl) InitCronJob() {
// GetTaskLogList returns a page of data-sync execution logs.
//
// All arguments are forwarded unchanged to the log repository's
// GetTaskLogList, and its result and error are returned as is.
func (app *dataSyncAppImpl) GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
	page, err := app.dbDataSyncLogRepo.GetTaskLogList(condition, pageParam, toEntity, orderBy...)
	return page, err
}
// MarkRunning flags the task identified by taskId as running.
//
// It writes entity.DataSyncTaskRunStateRunning to the task record and sets the
// "mayfly:db:syncdata:<taskId>" cache entry, which is the entry IsRunning reads.
// A failed record update is logged instead of being silently dropped; the
// cache entry is set regardless of that outcome.
func (app *dataSyncAppImpl) MarkRunning(taskId uint64) {
	ctx := context.Background()

	task := new(entity.DataSyncTask)
	task.Id = taskId
	task.RunningState = entity.DataSyncTaskRunStateRunning
	if err := app.UpdateById(ctx, task); err != nil {
		logx.ErrorContext(ctx, fmt.Sprintf("failed to mark data sync task %d as running: %s", taskId, err.Error()))
	}

	// NOTE(review): -1 is presumably "no expiry" — confirm against cache.Set.
	cache.Set(fmt.Sprintf("mayfly:db:syncdata:%d", taskId), 1, -1)
}
// MarkStop flags the task identified by taskId as finished by deleting its
// "mayfly:db:syncdata:<taskId>" entry from the cache.
func (app *dataSyncAppImpl) MarkStop(taskId uint64) {
	runningKey := fmt.Sprintf("mayfly:db:syncdata:%d", taskId)
	cache.Del(runningKey)
}
// IsRunning reports whether the task identified by taskId is flagged as
// running, i.e. whether the cache returns a non-empty string for its
// "mayfly:db:syncdata:<taskId>" entry.
func (app *dataSyncAppImpl) IsRunning(taskId uint64) bool {
	runningKey := fmt.Sprintf("mayfly:db:syncdata:%d", taskId)
	marker := cache.GetStr(runningKey)
	return marker != ""
}