refactor: dbm包重构

This commit is contained in:
meilin.huang
2024-03-15 13:31:53 +08:00
parent eb55f93864
commit 263dfa6be7
34 changed files with 725 additions and 789 deletions

View File

@@ -218,7 +218,7 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
}
}()
srcDialect := srcConn.GetDialect()
srcMetaData := srcConn.GetMetaData()
// task.FieldMap为json数组字符串 [{"src":"id","target":"id"}]转为map
var fieldMap []map[string]string
@@ -248,7 +248,7 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
updFieldType = dbi.DataTypeString
for _, column := range columns {
if strings.EqualFold(column.Name, updFieldName) {
updFieldType = srcDialect.GetDataConverter().GetDataType(column.Type)
updFieldType = srcMetaData.GetDataConverter().GetDataType(column.Type)
break
}
}
@@ -257,7 +257,7 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
total++
result = append(result, row)
if total%batchSize == 0 {
if err := app.srcData2TargetDb(result, fieldMap, columns, updFieldType, updFieldName, task, srcDialect, targetConn, targetDbTx); err != nil {
if err := app.srcData2TargetDb(result, fieldMap, columns, updFieldType, updFieldName, task, srcMetaData, targetConn, targetDbTx); err != nil {
return err
}
@@ -279,7 +279,7 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
// 处理剩余的数据
if len(result) > 0 {
if err := app.srcData2TargetDb(result, fieldMap, queryColumns, updFieldType, updFieldName, task, srcDialect, targetConn, targetDbTx); err != nil {
if err := app.srcData2TargetDb(result, fieldMap, queryColumns, updFieldType, updFieldName, task, srcMetaData, targetConn, targetDbTx); err != nil {
targetDbTx.Rollback()
return syncLog, err
}
@@ -303,7 +303,7 @@ func (app *dataSyncAppImpl) doDataSync(sql string, task *entity.DataSyncTask) (*
return syncLog, nil
}
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, columns []*dbi.QueryColumn, updFieldType dbi.DataType, updFieldName string, task *entity.DataSyncTask, srcDialect dbi.Dialect, targetDbConn *dbi.DbConn, targetDbTx *sql.Tx) error {
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, columns []*dbi.QueryColumn, updFieldType dbi.DataType, updFieldName string, task *entity.DataSyncTask, srcMetaData *dbi.MetaDataX, targetDbConn *dbi.DbConn, targetDbTx *sql.Tx) error {
// 遍历src字段列表取出字段对应的类型
var srcColumnTypes = make(map[string]string)
@@ -331,18 +331,19 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
updFieldVal = srcRes[len(srcRes)-1][strings.ToLower(updFieldName)]
}
task.UpdFieldVal = srcDialect.GetDataConverter().FormatData(updFieldVal, updFieldType)
task.UpdFieldVal = srcMetaData.GetDataConverter().FormatData(updFieldVal, updFieldType)
// 获取目标库字段数组
targetWrapColumns := make([]string, 0)
// 获取源库字段数组
srcColumns := make([]string, 0)
srcFieldTypes := make(map[string]dbi.DataType)
targetMetaData := targetDbConn.GetMetaData()
for _, item := range fieldMap {
targetField := item["target"]
srcField := item["target"]
srcFieldTypes[srcField] = srcDialect.GetDataConverter().GetDataType(srcColumnTypes[item["src"]])
targetWrapColumns = append(targetWrapColumns, targetDbConn.Info.Type.QuoteIdentifier(targetField))
srcFieldTypes[srcField] = srcMetaData.GetDataConverter().GetDataType(srcColumnTypes[item["src"]])
targetWrapColumns = append(targetWrapColumns, targetMetaData.QuoteIdentifier(targetField))
srcColumns = append(srcColumns, srcField)
}
@@ -352,7 +353,7 @@ func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap [
rawValue := make([]any, 0)
for _, column := range srcColumns {
// 某些情况如oracle需要转换时间类型的字符串为time类型
res := srcDialect.GetDataConverter().ParseData(record[column], srcFieldTypes[column])
res := srcMetaData.GetDataConverter().ParseData(record[column], srcFieldTypes[column])
rawValue = append(rawValue, res)
}
values = append(values, rawValue)