refactor: dbm

This commit is contained in:
meilin.huang
2024-12-08 13:04:23 +08:00
parent ebc89e056f
commit e56788af3e
152 changed files with 4273 additions and 3715 deletions

View File

@@ -1,6 +1,7 @@
package application
import (
"cmp"
"context"
"database/sql"
"encoding/json"
@@ -14,8 +15,8 @@ import (
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
"mayfly-go/pkg/utils/collx"
"regexp"
"strconv"
"strings"
"time"
@@ -145,28 +146,11 @@ func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
updSql := ""
orderSql := ""
if task.UpdFieldVal != "0" && task.UpdFieldVal != "" && task.UpdField != "" {
srcConn, err := app.dbApp.GetDbConn(uint64(task.SrcDbId), task.SrcDbName)
if err != nil {
logx.ErrorfContext(ctx, "data source connection unavailable: %s", err.Error())
return
}
task.UpdFieldVal = strings.Trim(task.UpdFieldVal, " ")
// 判断UpdFieldVal数据类型
var updFieldValType dbi.DataType
if _, err = strconv.Atoi(task.UpdFieldVal); err != nil {
if dateTimeReg.MatchString(task.UpdFieldVal) || dateTimeIsoReg.MatchString(task.UpdFieldVal) {
updFieldValType = dbi.DataTypeDateTime
} else {
updFieldValType = dbi.DataTypeString
}
} else {
updFieldValType = dbi.DataTypeNumber
}
wrapUpdFieldVal := srcConn.GetDialect().GetDataHelper().WrapValue(task.UpdFieldVal, updFieldValType)
updSql = fmt.Sprintf("and %s > %s", task.UpdField, wrapUpdFieldVal)
updSql = fmt.Sprintf("and %s > %s", task.UpdField, strings.Trim(task.UpdFieldVal, " "))
orderSql = "order by " + task.UpdField + " asc "
}
// 正则判断DataSql是否以where .*结尾如果是则不添加where 1 = 1
@@ -221,15 +205,13 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
}
}()
srcDialect := srcConn.GetDialect()
// task.FieldMap为json数组字符串 [{"src":"id","target":"id"}]转为map
var fieldMap []map[string]string
err = json.Unmarshal([]byte(task.FieldMap), &fieldMap)
if err != nil {
return syncLog, errorx.NewBiz("there was an error parsing the field map json: %s", err.Error())
}
var updFieldType dbi.DataType
var updFieldType *dbi.DbDataType
// 记录本次同步数据总数
total := 0
@@ -243,15 +225,28 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
updFieldName = strings.Split(task.UpdField, ".")[1]
}
targetTableColumns, err := targetConn.GetMetadata().GetColumns(task.TargetTableName)
if err != nil {
return syncLog, errorx.NewBiz("failed to get target table columns: %s", err.Error())
}
targetColumnName2Column := collx.ArrayToMap(targetTableColumns, func(column dbi.Column) string {
return column.ColumnName
})
// 目标库对应的insert columns
targetInsertColumns := collx.ArrayMap[map[string]string, dbi.Column](fieldMap, func(val map[string]string) dbi.Column {
return targetColumnName2Column[val["target"]]
})
_, err = srcConn.WalkQueryRows(context.Background(), sql, func(row map[string]any, columns []*dbi.QueryColumn) error {
if len(queryColumns) == 0 {
queryColumns = columns
// 遍历columns 取task.UpdField的字段类型
updFieldType = dbi.DataTypeString
updFieldType = dbi.DefaultDbDataType
for _, column := range columns {
if strings.EqualFold(column.Name, updFieldName) {
updFieldType = srcDialect.GetDataHelper().GetDataType(column.Type)
updFieldType = column.DbDataType
break
}
}
@@ -260,7 +255,7 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
total++
result = append(result, row)
if total%batchSize == 0 {
if err := app.srcData2TargetDb(result, fieldMap, columns, updFieldType, updFieldName, task, srcDialect, targetConn, targetDbTx); err != nil {
if err := app.srcData2TargetDb(result, fieldMap, updFieldType, updFieldName, task, targetConn, targetDbTx, targetInsertColumns); err != nil {
return err
}
@@ -283,7 +278,7 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
// 处理剩余的数据
if len(result) > 0 {
if err := app.srcData2TargetDb(result, fieldMap, queryColumns, updFieldType, updFieldName, task, srcDialect, targetConn, targetDbTx); err != nil {
if err := app.srcData2TargetDb(result, fieldMap, updFieldType, updFieldName, task, targetConn, targetDbTx, targetInsertColumns); err != nil {
targetDbTx.Rollback()
return syncLog, err
}
@@ -291,7 +286,7 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
// 如果是mssql暂不手动提交事务否则报错 mssql: The COMMIT TRANSACTION request has no corresponding BEGIN TRANSACTION.
if err := targetDbTx.Commit(); err != nil {
if targetConn.Info.Type != dbi.DbTypeMssql {
if targetConn.Info.Type != dbi.ToDbType("mssql") {
return syncLog, errorx.NewBiz("data synchronization - The target database transaction failed to commit: %s", err.Error())
}
}
@@ -307,36 +302,38 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
return syncLog, nil
}
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, columns []*dbi.QueryColumn, updFieldType dbi.DataType, updFieldName string, task *entity.DataSyncTask, srcDialect dbi.Dialect, targetDbConn *dbi.DbConn, targetDbTx *sql.Tx) error {
// 遍历src字段列表取出字段对应的类型
var srcColumnTypes = make(map[string]string)
for _, column := range columns {
srcColumnTypes[column.Name] = column.Type
}
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, updFieldType *dbi.DbDataType, updFieldName string, task *entity.DataSyncTask, targetDbConn *dbi.DbConn, targetDbTx *sql.Tx, targetInsertColumns []dbi.Column) error {
// 遍历res组装数据
var data = make([]map[string]any, 0)
for _, record := range srcRes {
var rowData = make(map[string]any)
var targetData = make([]map[string]any, 0)
for _, srcData := range srcRes {
var data = make(map[string]any)
// 遍历字段映射, target字段的值为src字段取值
for _, item := range fieldMap {
srcField := item["src"]
targetField := item["target"]
// target字段的值为src字段取值
rowData[targetField] = record[srcField]
data[item["target"]] = srcData[item["src"]]
}
data = append(data, rowData)
targetData = append(targetData, data)
}
tragetValues := make([][]any, 0)
for _, item := range targetData {
var values = make([]any, 0)
for _, column := range targetInsertColumns {
values = append(values, item[column.ColumnName])
}
tragetValues = append(tragetValues, values)
}
// 执行插入
setUpdateFieldVal := func(field string) {
// 解决字段大小写问题
updFieldVal := srcRes[len(srcRes)-1][strings.ToUpper(field)]
if updFieldVal == "" || updFieldVal == nil {
updFieldVal = srcRes[len(srcRes)-1][strings.ToLower(field)]
}
task.UpdFieldVal = srcDialect.GetDataHelper().FormatData(updFieldVal, updFieldType)
task.UpdFieldVal = updFieldType.DataType.SQLValue(updFieldVal)
}
// 如果指定了更新字段,则以更新字段取值
@@ -346,36 +343,15 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
setUpdateFieldVal(updFieldName)
}
// 获取目标库字段数组
targetWrapColumns := make([]string, 0)
// 获取源库字段数组
srcColumns := make([]string, 0)
srcFieldTypes := make(map[string]dbi.DataType)
targetDialect := targetDbConn.GetDialect()
for _, item := range fieldMap {
targetField := item["target"]
srcField := item["target"]
srcFieldTypes[srcField] = srcDialect.GetDataHelper().GetDataType(srcColumnTypes[item["src"]])
targetWrapColumns = append(targetWrapColumns, targetDialect.QuoteIdentifier(targetField))
srcColumns = append(srcColumns, srcField)
}
// 目标数据中取出源库字段对应的值
values := make([][]any, 0)
for _, record := range data {
rawValue := make([]any, 0)
for _, column := range srcColumns {
// 某些情况如oracle需要转换时间类型的字符串为time类型
res := srcDialect.GetDataHelper().ParseData(record[column], srcFieldTypes[column])
rawValue = append(rawValue, res)
// 生成目标数据库批量插入sql并执行
sqls := targetDialect.GetSQLGenerator().GenInsert(task.TargetTableName, targetInsertColumns, tragetValues, cmp.Or(task.DuplicateStrategy, dbi.DuplicateStrategyNone))
for _, sql := range sqls {
_, err := targetDbTx.Exec(sql)
if err != nil {
return err
}
values = append(values, rawValue)
}
// 目标数据库执行sql批量插入
_, err := targetDialect.BatchInsert(targetDbTx, task.TargetTableName, targetWrapColumns, values, task.DuplicateStrategy)
if err != nil {
return err
}
// 运行过程中,判断状态是否为已关闭,是则结束运行,否则继续运行