!91 fix: oracle数据同步 bug

* fix: oracle数据同步 bug
This commit is contained in:
zongyangleo
2024-01-24 08:29:16 +00:00
committed by Coder慌
parent e4d13f3377
commit bed95254d0
13 changed files with 335 additions and 182 deletions

View File

@@ -14,6 +14,9 @@ import (
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
"regexp"
"strconv"
"strings"
"time"
)
@@ -44,6 +47,10 @@ type dataSyncAppImpl struct {
dbDataSyncLogRepo repository.DataSyncLog `inject:"DbDataSyncLogRepo"`
}
var (
dateTimeReg = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$`)
)
func (d *dataSyncAppImpl) InjectDbDataSyncTaskRepo(repo repository.DataSyncTask) {
d.Repo = repo
}
@@ -123,7 +130,23 @@ func (app *dataSyncAppImpl) RunCronJob(id uint64) error {
updSql := ""
orderSql := ""
if task.UpdFieldVal != "0" && task.UpdFieldVal != "" && task.UpdField != "" {
updSql = fmt.Sprintf("and %s > '%s'", task.UpdField, task.UpdFieldVal)
srcConn, _ := GetDbApp().GetDbConn(uint64(task.SrcDbId), task.SrcDbName)
task.UpdFieldVal = strings.Trim(task.UpdFieldVal, " ")
// 把UpdFieldVal尝试转为int如果可以转为int则不添加引号否则添加引号
if _, err := strconv.Atoi(task.UpdFieldVal); err != nil {
updSql = fmt.Sprintf("and %s > '%s'", task.UpdField, task.UpdFieldVal)
} else {
updSql = fmt.Sprintf("and %s > %s", task.UpdField, task.UpdFieldVal)
}
// 如果是oracle且数据类型是时间类型则需要加上to_date函数
if srcConn.Info.Type == dbi.DbTypeOracle {
// 用正则判断数据类型是时间
if dateTimeReg.MatchString(task.UpdFieldVal) {
updSql = fmt.Sprintf("and %s > to_date('%s','yyyy-mm-dd hh24:mi:ss')", task.UpdField, task.UpdFieldVal)
}
}
orderSql = "order by " + task.UpdField + " asc "
}
// 组装查询sql
@@ -194,8 +217,8 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
// 遍历columns 取task.UpdField的字段类型
updFieldType = dbi.DataTypeString
for _, column := range columns {
if column.Name == task.UpdField {
updFieldType = srcDialect.GetDataType(column.Type)
if strings.ToLower(column.Name) == strings.ToLower(task.UpdField) {
updFieldType = srcDialect.GetDataConverter().GetDataType(column.Type)
break
}
}
@@ -204,7 +227,7 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
total++
result = append(result, row)
if total%batchSize == 0 {
if err := app.srcData2TargetDb(result, fieldMap, updFieldType, task, srcDialect, targetConn, targetDbTx); err != nil {
if err := app.srcData2TargetDb(result, fieldMap, columns, updFieldType, task, srcDialect, targetConn, targetDbTx); err != nil {
return err
}
@@ -226,7 +249,7 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
// 处理剩余的数据
if len(result) > 0 {
if err := app.srcData2TargetDb(result, fieldMap, updFieldType, task, srcDialect, targetConn, targetDbTx); err != nil {
if err := app.srcData2TargetDb(result, fieldMap, queryColumns, updFieldType, task, srcDialect, targetConn, targetDbTx); err != nil {
targetDbTx.Rollback()
return syncLog, err
}
@@ -246,10 +269,16 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
return syncLog, nil
}
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, updFieldType dbi.DataType, task *entity.DataSyncTask, srcDialect dbi.Dialect, targetDbConn *dbi.DbConn, targetDbTx *sql.Tx) error {
var data = make([]map[string]any, 0)
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, columns []*dbi.QueryColumn, updFieldType dbi.DataType, task *entity.DataSyncTask, srcDialect dbi.Dialect, targetDbConn *dbi.DbConn, targetDbTx *sql.Tx) error {
// 遍历res组装插入sql
// 遍历src字段列表取出字段对应的类型
var srcColumnTypes = make(map[string]string)
for _, column := range columns {
srcColumnTypes[column.Name] = column.Type
}
// 遍历res组装数据
var data = make([]map[string]any, 0)
for _, record := range srcRes {
var rowData = make(map[string]any)
// 遍历字段映射, target字段的值为src字段取值
@@ -262,18 +291,23 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
data = append(data, rowData)
}
// 解决字段大小写问题
updFieldVal := srcRes[len(srcRes)-1][strings.ToUpper(task.UpdField)]
if updFieldVal == "" {
updFieldVal = srcRes[len(srcRes)-1][strings.ToLower(task.UpdField)]
}
updFieldVal := fmt.Sprintf("%v", srcRes[len(srcRes)-1][task.UpdField])
updFieldVal = srcDialect.FormatStrData(updFieldVal, updFieldType)
task.UpdFieldVal = updFieldVal
task.UpdFieldVal = srcDialect.GetDataConverter().FormatData(updFieldVal, updFieldType)
// 获取目标库字段数组
targetWrapColumns := make([]string, 0)
// 获取源库字段数组
srcColumns := make([]string, 0)
srcFieldTypes := make(map[string]dbi.DataType)
for _, item := range fieldMap {
targetField := item["target"]
srcField := item["target"]
srcFieldTypes[srcField] = srcDialect.GetDataConverter().GetDataType(srcColumnTypes[item["src"]])
targetWrapColumns = append(targetWrapColumns, targetDbConn.Info.Type.QuoteIdentifier(targetField))
srcColumns = append(srcColumns, srcField)
}
@@ -283,7 +317,9 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
for _, record := range data {
rawValue := make([]any, 0)
for _, column := range srcColumns {
rawValue = append(rawValue, record[column])
// 某些情况如oracle需要转换时间类型的字符串为time类型
res := srcDialect.GetDataConverter().ParseData(record[column], srcFieldTypes[column])
rawValue = append(rawValue, res)
}
values = append(values, rawValue)
}
@@ -294,6 +330,12 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
return err
}
// 运行完成一轮就记录一下修改字段最大值
taskParam1 := new(entity.DataSyncTask)
taskParam1.Id = task.Id
taskParam1.UpdFieldVal = task.UpdFieldVal
_ = app.UpdateById(context.Background(), taskParam1)
// 运行过程中,判断状态是否为已关闭,是则结束运行,否则继续运行
taskParam, _ := app.GetById(new(entity.DataSyncTask), task.Id)
if taskParam.RunningState == entity.DataSyncTaskRunStateStop {