2024-01-05 05:31:32 +00:00
|
|
|
|
package application
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
2024-01-06 22:36:50 +08:00
|
|
|
|
"database/sql"
|
2024-01-05 05:31:32 +00:00
|
|
|
|
"encoding/json"
|
|
|
|
|
|
"fmt"
|
2024-01-12 13:15:30 +08:00
|
|
|
|
"mayfly-go/internal/db/dbm/dbi"
|
2024-01-05 05:31:32 +00:00
|
|
|
|
"mayfly-go/internal/db/domain/entity"
|
|
|
|
|
|
"mayfly-go/internal/db/domain/repository"
|
|
|
|
|
|
"mayfly-go/pkg/base"
|
2024-01-06 22:36:50 +08:00
|
|
|
|
"mayfly-go/pkg/errorx"
|
2024-01-05 05:31:32 +00:00
|
|
|
|
"mayfly-go/pkg/gormx"
|
|
|
|
|
|
"mayfly-go/pkg/logx"
|
|
|
|
|
|
"mayfly-go/pkg/model"
|
|
|
|
|
|
"mayfly-go/pkg/scheduler"
|
2024-01-24 08:29:16 +00:00
|
|
|
|
"regexp"
|
|
|
|
|
|
"strconv"
|
|
|
|
|
|
"strings"
|
2024-01-05 05:31:32 +00:00
|
|
|
|
"time"
|
2024-01-30 21:56:49 +08:00
|
|
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
2024-01-05 05:31:32 +00:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
type DataSyncTask interface {
|
|
|
|
|
|
base.App[*entity.DataSyncTask]
|
|
|
|
|
|
|
|
|
|
|
|
// GetPageList 分页获取数据库实例
|
|
|
|
|
|
GetPageList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
|
|
|
|
|
|
|
|
|
|
|
|
Save(ctx context.Context, instanceEntity *entity.DataSyncTask) error
|
|
|
|
|
|
|
|
|
|
|
|
Delete(ctx context.Context, id uint64) error
|
|
|
|
|
|
|
|
|
|
|
|
InitCronJob()
|
|
|
|
|
|
|
|
|
|
|
|
AddCronJob(taskEntity *entity.DataSyncTask)
|
2024-01-05 22:16:38 +08:00
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
RemoveCronJobById(taskId uint64)
|
2024-01-05 22:16:38 +08:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
RunCronJob(id uint64) error
|
2024-01-05 22:16:38 +08:00
|
|
|
|
|
|
|
|
|
|
GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type dataSyncAppImpl struct {
|
|
|
|
|
|
base.AppImpl[*entity.DataSyncTask, repository.DataSyncTask]
|
2024-01-05 22:16:38 +08:00
|
|
|
|
|
2024-01-23 19:30:28 +08:00
|
|
|
|
dbDataSyncLogRepo repository.DataSyncLog `inject:"DbDataSyncLogRepo"`
|
2024-01-29 04:20:23 +00:00
|
|
|
|
|
|
|
|
|
|
dbApp Db `inject:"DbApp"`
|
2024-01-21 22:52:20 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-24 08:29:16 +00:00
|
|
|
|
var (
	// whereReg matches the keyword "where" in any letter case, anywhere in
	// the text it is applied to.
	whereReg = regexp.MustCompile(`(?i)where`)

	// dateTimeReg matches a whole string of the form "yyyy-mm-dd hh:mm:ss"
	// (digits only, no surrounding whitespace).
	dateTimeReg = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$`)
)
|
|
|
|
|
|
|
2024-01-30 13:09:26 +00:00
|
|
|
|
func (app *dataSyncAppImpl) InjectDbDataSyncTaskRepo(repo repository.DataSyncTask) {
|
|
|
|
|
|
app.Repo = repo
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (app *dataSyncAppImpl) GetPageList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
|
|
|
|
|
|
return app.GetRepo().GetTaskList(condition, pageParam, toEntity, orderBy...)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (app *dataSyncAppImpl) Save(ctx context.Context, taskEntity *entity.DataSyncTask) error {
|
2024-01-07 21:46:25 +08:00
|
|
|
|
var err error
|
2024-01-05 05:31:32 +00:00
|
|
|
|
if taskEntity.Id == 0 {
|
2024-01-30 13:09:26 +00:00
|
|
|
|
// 新建时生成key
|
|
|
|
|
|
taskEntity.TaskKey = uuid.New().String()
|
2024-01-07 21:46:25 +08:00
|
|
|
|
err = app.Insert(ctx, taskEntity)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
err = app.UpdateById(ctx, taskEntity)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
2024-01-30 13:09:26 +00:00
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-30 13:09:26 +00:00
|
|
|
|
task, err := app.GetById(new(entity.DataSyncTask), taskEntity.Id)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
app.AddCronJob(task)
|
2024-01-07 21:46:25 +08:00
|
|
|
|
return nil
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (app *dataSyncAppImpl) Delete(ctx context.Context, id uint64) error {
|
2024-01-07 21:46:25 +08:00
|
|
|
|
if err := app.DeleteById(ctx, id); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
app.RemoveCronJobById(id)
|
|
|
|
|
|
return nil
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (app *dataSyncAppImpl) AddCronJob(taskEntity *entity.DataSyncTask) {
|
|
|
|
|
|
key := taskEntity.TaskKey
|
|
|
|
|
|
// 先移除旧的任务
|
|
|
|
|
|
scheduler.RemoveByKey(key)
|
|
|
|
|
|
|
|
|
|
|
|
// 根据状态添加新的任务
|
|
|
|
|
|
if taskEntity.Status == entity.DataSyncTaskStatusEnable {
|
2024-01-30 21:56:49 +08:00
|
|
|
|
taskId := taskEntity.Id
|
2024-01-05 05:31:32 +00:00
|
|
|
|
scheduler.AddFunByKey(key, taskEntity.TaskCron, func() {
|
2024-01-30 21:56:49 +08:00
|
|
|
|
logx.Infof("开始执行同步任务: %d", taskId)
|
|
|
|
|
|
if err := app.RunCronJob(taskId); err != nil {
|
|
|
|
|
|
logx.Errorf("定时执行数据同步任务失败: %s", err.Error())
|
|
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
func (app *dataSyncAppImpl) RemoveCronJobById(taskId uint64) {
|
|
|
|
|
|
task, err := app.GetById(new(entity.DataSyncTask), taskId)
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
scheduler.RemoveByKey(task.TaskKey)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (app *dataSyncAppImpl) changeRunningState(id uint64, state int8) {
|
|
|
|
|
|
task := new(entity.DataSyncTask)
|
|
|
|
|
|
task.Id = id
|
|
|
|
|
|
task.RunningState = state
|
|
|
|
|
|
_ = app.UpdateById(context.Background(), task)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
func (app *dataSyncAppImpl) RunCronJob(id uint64) error {
|
2024-01-05 05:31:32 +00:00
|
|
|
|
// 查询最新的任务信息
|
|
|
|
|
|
task, err := app.GetById(new(entity.DataSyncTask), id)
|
2024-01-05 22:16:38 +08:00
|
|
|
|
if err != nil {
|
2024-01-06 22:36:50 +08:00
|
|
|
|
return errorx.NewBiz("任务不存在")
|
2024-01-05 22:16:38 +08:00
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
if task.RunningState == entity.DataSyncTaskRunStateRunning {
|
2024-01-06 22:36:50 +08:00
|
|
|
|
return errorx.NewBiz("该任务正在执行中")
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
// 开始运行时,修改状态为运行中
|
|
|
|
|
|
app.changeRunningState(id, entity.DataSyncTaskRunStateRunning)
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
logx.Infof("开始执行数据同步任务:%s => %s", task.TaskName, task.TaskKey)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
go func() {
|
|
|
|
|
|
// 通过占位符格式化sql
|
|
|
|
|
|
updSql := ""
|
|
|
|
|
|
orderSql := ""
|
|
|
|
|
|
if task.UpdFieldVal != "0" && task.UpdFieldVal != "" && task.UpdField != "" {
|
2024-03-01 04:03:03 +00:00
|
|
|
|
srcConn, err := app.dbApp.GetDbConn(uint64(task.SrcDbId), task.SrcDbName)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logx.Errorf("数据源连接不可用, %s", err.Error())
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
2024-01-24 08:29:16 +00:00
|
|
|
|
|
|
|
|
|
|
task.UpdFieldVal = strings.Trim(task.UpdFieldVal, " ")
|
|
|
|
|
|
// 把UpdFieldVal尝试转为int,如果可以转为int,则不添加引号,否则添加引号
|
2024-03-01 04:03:03 +00:00
|
|
|
|
if _, err = strconv.Atoi(task.UpdFieldVal); err != nil {
|
2024-01-24 08:29:16 +00:00
|
|
|
|
updSql = fmt.Sprintf("and %s > '%s'", task.UpdField, task.UpdFieldVal)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
updSql = fmt.Sprintf("and %s > %s", task.UpdField, task.UpdFieldVal)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 如果是oracle且数据类型是时间类型,则需要加上to_date函数
|
|
|
|
|
|
if srcConn.Info.Type == dbi.DbTypeOracle {
|
|
|
|
|
|
// 用正则判断数据类型是时间
|
|
|
|
|
|
if dateTimeReg.MatchString(task.UpdFieldVal) {
|
|
|
|
|
|
updSql = fmt.Sprintf("and %s > to_date('%s','yyyy-mm-dd hh24:mi:ss')", task.UpdField, task.UpdFieldVal)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2024-01-06 22:36:50 +08:00
|
|
|
|
orderSql = "order by " + task.UpdField + " asc "
|
|
|
|
|
|
}
|
2024-03-01 04:03:03 +00:00
|
|
|
|
// 正则判断DataSql是否以where .*结尾,如果是则不添加where 1 = 1
|
|
|
|
|
|
var where = "where 1=1"
|
|
|
|
|
|
if whereReg.MatchString(task.DataSql) {
|
|
|
|
|
|
where = ""
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 组装查询sql
|
2024-03-01 04:03:03 +00:00
|
|
|
|
sql := fmt.Sprintf("%s %s %s %s", task.DataSql, where, updSql, orderSql)
|
2024-01-06 22:36:50 +08:00
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
log, err := app.doDataSync(sql, task)
|
2024-01-06 22:36:50 +08:00
|
|
|
|
if err != nil {
|
2024-01-07 21:46:25 +08:00
|
|
|
|
log.ErrText = fmt.Sprintf("执行失败: %s", err.Error())
|
|
|
|
|
|
log.Status = entity.DataSyncTaskStateFail
|
|
|
|
|
|
app.endRunning(task, log)
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*entity.DataSyncLog, error) {
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
|
syncLog := &entity.DataSyncLog{
|
|
|
|
|
|
TaskId: task.Id,
|
|
|
|
|
|
CreateTime: &now,
|
|
|
|
|
|
DataSqlFull: sql,
|
|
|
|
|
|
Status: entity.DataSyncTaskStateRunning,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-05 05:31:32 +00:00
|
|
|
|
// 获取源数据库连接
|
2024-01-29 04:20:23 +00:00
|
|
|
|
srcConn, err := app.dbApp.GetDbConn(uint64(task.SrcDbId), task.SrcDbName)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
if err != nil {
|
2024-01-07 21:46:25 +08:00
|
|
|
|
return syncLog, errorx.NewBiz("连接源数据库失败: %s", err.Error())
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 获取目标数据库连接
|
2024-01-29 04:20:23 +00:00
|
|
|
|
targetConn, err := app.dbApp.GetDbConn(uint64(task.TargetDbId), task.TargetDbName)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
if err != nil {
|
2024-01-07 21:46:25 +08:00
|
|
|
|
return syncLog, errorx.NewBiz("连接目标数据库失败: %s", err.Error())
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
|
|
|
|
|
targetDbTx, err := targetConn.Begin()
|
|
|
|
|
|
if err != nil {
|
2024-01-07 21:46:25 +08:00
|
|
|
|
return syncLog, errorx.NewBiz("开启目标数据库事务失败: %s", err.Error())
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
2024-01-06 22:36:50 +08:00
|
|
|
|
defer func() {
|
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
|
targetDbTx.Rollback()
|
|
|
|
|
|
err = fmt.Errorf("%v", r)
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-03-15 13:31:53 +08:00
|
|
|
|
srcMetaData := srcConn.GetMetaData()
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// task.FieldMap为json数组字符串 [{"src":"id","target":"id"}],转为map
|
|
|
|
|
|
var fieldMap []map[string]string
|
|
|
|
|
|
err = json.Unmarshal([]byte(task.FieldMap), &fieldMap)
|
|
|
|
|
|
if err != nil {
|
2024-01-07 21:46:25 +08:00
|
|
|
|
return syncLog, errorx.NewBiz("解析字段映射json出错: %s", err.Error())
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
2024-01-12 13:15:30 +08:00
|
|
|
|
var updFieldType dbi.DataType
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 记录本次同步数据总数
|
|
|
|
|
|
total := 0
|
|
|
|
|
|
batchSize := task.PageSize
|
|
|
|
|
|
result := make([]map[string]any, 0)
|
2024-01-12 13:15:30 +08:00
|
|
|
|
var queryColumns []*dbi.QueryColumn
|
2024-01-06 22:36:50 +08:00
|
|
|
|
|
2024-03-01 04:03:03 +00:00
|
|
|
|
// 如果有数据库别名,则从UpdField中去掉数据库别名, 如:a.id => id,用于获取字段具体名称
|
|
|
|
|
|
updFieldName := task.UpdField
|
|
|
|
|
|
if task.UpdField != "" && strings.Contains(task.UpdField, ".") {
|
|
|
|
|
|
updFieldName = strings.Split(task.UpdField, ".")[1]
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-12 13:15:30 +08:00
|
|
|
|
err = srcConn.WalkQueryRows(context.Background(), sql, func(row map[string]any, columns []*dbi.QueryColumn) error {
|
2024-01-06 22:36:50 +08:00
|
|
|
|
if len(queryColumns) == 0 {
|
|
|
|
|
|
queryColumns = columns
|
|
|
|
|
|
|
|
|
|
|
|
// 遍历columns 取task.UpdField的字段类型
|
2024-01-12 13:15:30 +08:00
|
|
|
|
updFieldType = dbi.DataTypeString
|
2024-01-06 22:36:50 +08:00
|
|
|
|
for _, column := range columns {
|
2024-03-01 04:03:03 +00:00
|
|
|
|
if strings.EqualFold(column.Name, updFieldName) {
|
2024-03-15 13:31:53 +08:00
|
|
|
|
updFieldType = srcMetaData.GetDataConverter().GetDataType(column.Type)
|
2024-01-06 22:36:50 +08:00
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
total++
|
|
|
|
|
|
result = append(result, row)
|
|
|
|
|
|
if total%batchSize == 0 {
|
2024-03-15 13:31:53 +08:00
|
|
|
|
if err := app.srcData2TargetDb(result, fieldMap, columns, updFieldType, updFieldName, task, srcMetaData, targetConn, targetDbTx); err != nil {
|
2024-01-06 22:36:50 +08:00
|
|
|
|
return err
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
2024-01-07 21:46:25 +08:00
|
|
|
|
|
|
|
|
|
|
// 记录当前已同步的数据量
|
|
|
|
|
|
syncLog.ErrText = fmt.Sprintf("本次任务执行中,已同步:%d条", total)
|
|
|
|
|
|
syncLog.ResNum = total
|
|
|
|
|
|
app.saveLog(syncLog)
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
result = result[:0]
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
return nil
|
|
|
|
|
|
})
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
targetDbTx.Rollback()
|
2024-01-07 21:46:25 +08:00
|
|
|
|
return syncLog, err
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 处理剩余的数据
|
|
|
|
|
|
if len(result) > 0 {
|
2024-03-15 13:31:53 +08:00
|
|
|
|
if err := app.srcData2TargetDb(result, fieldMap, queryColumns, updFieldType, updFieldName, task, srcMetaData, targetConn, targetDbTx); err != nil {
|
2024-01-06 22:36:50 +08:00
|
|
|
|
targetDbTx.Rollback()
|
2024-01-07 21:46:25 +08:00
|
|
|
|
return syncLog, err
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-03-01 04:03:03 +00:00
|
|
|
|
// 如果是mssql,暂不手动提交事务,否则报错 mssql: The COMMIT TRANSACTION request has no corresponding BEGIN TRANSACTION.
|
2024-01-06 22:36:50 +08:00
|
|
|
|
if err := targetDbTx.Commit(); err != nil {
|
2024-03-01 04:03:03 +00:00
|
|
|
|
if targetConn.Info.Type != dbi.DbTypeMssql {
|
|
|
|
|
|
return syncLog, errorx.NewBiz("数据同步-目标数据库事务提交失败: %s", err.Error())
|
|
|
|
|
|
}
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
2024-03-01 04:03:03 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
logx.Infof("同步任务:[%s],执行完毕,保存记录成功:[%d]条", task.TaskName, total)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 保存执行成功日志
|
2024-01-07 21:46:25 +08:00
|
|
|
|
syncLog.ErrText = fmt.Sprintf("本次任务执行成功,新数据:%d 条", total)
|
|
|
|
|
|
syncLog.Status = entity.DataSyncTaskStateSuccess
|
|
|
|
|
|
syncLog.ResNum = total
|
|
|
|
|
|
app.endRunning(task, syncLog)
|
|
|
|
|
|
|
|
|
|
|
|
return syncLog, nil
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-03-15 13:31:53 +08:00
|
|
|
|
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, columns []*dbi.QueryColumn, updFieldType dbi.DataType, updFieldName string, task *entity.DataSyncTask, srcMetaData *dbi.MetaDataX, targetDbConn *dbi.DbConn, targetDbTx *sql.Tx) error {
|
2024-01-24 08:29:16 +00:00
|
|
|
|
|
|
|
|
|
|
// 遍历src字段列表,取出字段对应的类型
|
|
|
|
|
|
var srcColumnTypes = make(map[string]string)
|
|
|
|
|
|
for _, column := range columns {
|
|
|
|
|
|
srcColumnTypes[column.Name] = column.Type
|
|
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-24 08:29:16 +00:00
|
|
|
|
// 遍历res,组装数据
|
|
|
|
|
|
var data = make([]map[string]any, 0)
|
2024-01-06 22:36:50 +08:00
|
|
|
|
for _, record := range srcRes {
|
|
|
|
|
|
var rowData = make(map[string]any)
|
|
|
|
|
|
// 遍历字段映射, target字段的值为src字段取值
|
|
|
|
|
|
for _, item := range fieldMap {
|
|
|
|
|
|
srcField := item["src"]
|
|
|
|
|
|
targetField := item["target"]
|
|
|
|
|
|
// target字段的值为src字段取值
|
|
|
|
|
|
rowData[targetField] = record[srcField]
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
data = append(data, rowData)
|
|
|
|
|
|
}
|
2024-01-24 08:29:16 +00:00
|
|
|
|
// 解决字段大小写问题
|
2024-03-01 04:03:03 +00:00
|
|
|
|
updFieldVal := srcRes[len(srcRes)-1][strings.ToUpper(updFieldName)]
|
2024-01-29 04:20:23 +00:00
|
|
|
|
if updFieldVal == "" || updFieldVal == nil {
|
2024-03-01 04:03:03 +00:00
|
|
|
|
updFieldVal = srcRes[len(srcRes)-1][strings.ToLower(updFieldName)]
|
2024-01-24 08:29:16 +00:00
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-03-15 13:31:53 +08:00
|
|
|
|
task.UpdFieldVal = srcMetaData.GetDataConverter().FormatData(updFieldVal, updFieldType)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 获取目标库字段数组
|
|
|
|
|
|
targetWrapColumns := make([]string, 0)
|
|
|
|
|
|
// 获取源库字段数组
|
|
|
|
|
|
srcColumns := make([]string, 0)
|
2024-01-24 08:29:16 +00:00
|
|
|
|
srcFieldTypes := make(map[string]dbi.DataType)
|
2024-03-15 13:31:53 +08:00
|
|
|
|
targetMetaData := targetDbConn.GetMetaData()
|
2024-01-06 22:36:50 +08:00
|
|
|
|
for _, item := range fieldMap {
|
|
|
|
|
|
targetField := item["target"]
|
|
|
|
|
|
srcField := item["target"]
|
2024-03-15 13:31:53 +08:00
|
|
|
|
srcFieldTypes[srcField] = srcMetaData.GetDataConverter().GetDataType(srcColumnTypes[item["src"]])
|
|
|
|
|
|
targetWrapColumns = append(targetWrapColumns, targetMetaData.QuoteIdentifier(targetField))
|
2024-01-06 22:36:50 +08:00
|
|
|
|
srcColumns = append(srcColumns, srcField)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 从目标库数据中取出源库字段对应的值
|
|
|
|
|
|
values := make([][]any, 0)
|
|
|
|
|
|
for _, record := range data {
|
|
|
|
|
|
rawValue := make([]any, 0)
|
|
|
|
|
|
for _, column := range srcColumns {
|
2024-01-24 08:29:16 +00:00
|
|
|
|
// 某些情况,如oracle,需要转换时间类型的字符串为time类型
|
2024-03-15 13:31:53 +08:00
|
|
|
|
res := srcMetaData.GetDataConverter().ParseData(record[column], srcFieldTypes[column])
|
2024-01-24 08:29:16 +00:00
|
|
|
|
rawValue = append(rawValue, res)
|
2024-01-06 22:36:50 +08:00
|
|
|
|
}
|
|
|
|
|
|
values = append(values, rawValue)
|
|
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 目标数据库执行sql批量插入
|
2024-03-01 04:03:03 +00:00
|
|
|
|
_, err := targetDbConn.GetDialect().BatchInsert(targetDbTx, task.TargetTableName, targetWrapColumns, values, task.DuplicateStrategy)
|
2024-01-06 22:36:50 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
// 运行过程中,判断状态是否为已关闭,是则结束运行,否则继续运行
|
|
|
|
|
|
taskParam, _ := app.GetById(new(entity.DataSyncTask), task.Id)
|
|
|
|
|
|
if taskParam.RunningState == entity.DataSyncTaskRunStateStop {
|
|
|
|
|
|
return errorx.NewBiz("该任务已被手动终止")
|
|
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-06 22:36:50 +08:00
|
|
|
|
return nil
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
func (app *dataSyncAppImpl) endRunning(taskEntity *entity.DataSyncTask, log *entity.DataSyncLog) {
|
|
|
|
|
|
logx.Info(log.ErrText)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
state := log.Status
|
2024-01-05 05:31:32 +00:00
|
|
|
|
task := new(entity.DataSyncTask)
|
|
|
|
|
|
task.Id = taskEntity.Id
|
|
|
|
|
|
task.RecentState = state
|
2024-01-06 22:36:50 +08:00
|
|
|
|
if state == entity.DataSyncTaskStateSuccess {
|
|
|
|
|
|
task.UpdFieldVal = taskEntity.UpdFieldVal
|
|
|
|
|
|
}
|
2024-01-05 05:31:32 +00:00
|
|
|
|
task.RunningState = entity.DataSyncTaskRunStateReady
|
|
|
|
|
|
// 运行失败之后设置任务状态为禁用
|
|
|
|
|
|
//if state == entity.DataSyncTaskStateFail {
|
|
|
|
|
|
// taskEntity.Status = entity.DataSyncTaskStatusDisable
|
|
|
|
|
|
// app.RemoveCronJob(taskEntity)
|
|
|
|
|
|
//}
|
|
|
|
|
|
_ = app.UpdateById(context.Background(), task)
|
|
|
|
|
|
// 保存执行日志
|
2024-01-07 21:46:25 +08:00
|
|
|
|
app.saveLog(log)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
2024-01-05 22:16:38 +08:00
|
|
|
|
|
2024-01-07 21:46:25 +08:00
|
|
|
|
func (app *dataSyncAppImpl) saveLog(log *entity.DataSyncLog) {
|
2024-01-23 19:30:28 +08:00
|
|
|
|
app.dbDataSyncLogRepo.Save(context.Background(), log)
|
2024-01-05 05:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (app *dataSyncAppImpl) InitCronJob() {
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
|
|
logx.ErrorTrace("数据同步任务初始化失败: %s", err.(error))
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
// 修改执行状态为待执行
|
|
|
|
|
|
updateMap := map[string]interface{}{
|
|
|
|
|
|
"running_state": entity.DataSyncTaskRunStateReady,
|
|
|
|
|
|
}
|
|
|
|
|
|
taskParam := new(entity.DataSyncTask)
|
|
|
|
|
|
taskParam.RunningState = 1
|
|
|
|
|
|
_ = gormx.Updates(taskParam, taskParam, updateMap)
|
|
|
|
|
|
|
|
|
|
|
|
// 把所有正常任务添加到定时任务中
|
|
|
|
|
|
pageParam := &model.PageParam{
|
|
|
|
|
|
PageSize: 100,
|
|
|
|
|
|
PageNum: 1,
|
|
|
|
|
|
}
|
|
|
|
|
|
cond := new(entity.DataSyncTaskQuery)
|
|
|
|
|
|
cond.Status = entity.DataSyncTaskStatusEnable
|
|
|
|
|
|
jobs := new([]entity.DataSyncTask)
|
|
|
|
|
|
|
|
|
|
|
|
pr, _ := app.GetPageList(cond, pageParam, jobs)
|
|
|
|
|
|
total := pr.Total
|
|
|
|
|
|
add := 0
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
for _, job := range *jobs {
|
|
|
|
|
|
app.AddCronJob(&job)
|
|
|
|
|
|
add++
|
|
|
|
|
|
}
|
|
|
|
|
|
if add >= int(total) {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pageParam.PageNum++
|
|
|
|
|
|
_, _ = app.GetPageList(cond, pageParam, jobs)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2024-01-05 22:16:38 +08:00
|
|
|
|
|
|
|
|
|
|
func (app *dataSyncAppImpl) GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
|
2024-01-23 19:30:28 +08:00
|
|
|
|
return app.dbDataSyncLogRepo.GetTaskLogList(condition, pageParam, toEntity, orderBy...)
|
2024-01-05 22:16:38 +08:00
|
|
|
|
}
|