!109 refactor:ddl生成方式重构,数据类型和长度重构,所有数据库迁移调试

* feat:同步sqlite全量sql
* refactor:ddl生成方式重构,数据类型和长度重构,所有数据库迁移调试
This commit is contained in:
zongyangleo
2024-03-21 03:35:18 +00:00
committed by Coder慌
parent 68e0088016
commit b13d27ccd6
27 changed files with 1309 additions and 1405 deletions

View File

@@ -169,6 +169,8 @@ func (app *dbTransferAppImpl) transferTables(task *entity.DbTransferTask, srcCon
srcDialect.ToCommonColumn(colPtr)
// 公共列转为目标库列
targetDialect.ToColumn(colPtr)
// 初始化列显示类型
colPtr.InitShowNum()
targetCols = append(targetCols, *colPtr)
}
@@ -222,6 +224,8 @@ func (app *dbTransferAppImpl) transferData(ctx context.Context, tableName string
batchSize := 1000 // 每次查询并迁移1000条数据
var queryColumns []*dbi.QueryColumn
var err error
srcMeta := srcConn.GetMetaData()
srcConverter := srcMeta.GetDataConverter()
// 游标查询源表数据,并批量插入目标表
err = srcConn.WalkTableRows(ctx, tableName, func(row map[string]any, columns []*dbi.QueryColumn) error {
@@ -236,7 +240,13 @@ func (app *dbTransferAppImpl) transferData(ctx context.Context, tableName string
}
total++
result = append(result, row)
rawValue := map[string]any{}
for _, column := range columns {
// 某些情况如oracle需要转换时间类型的字符串为time类型
res := srcConverter.ParseData(row[column.Name], srcConverter.GetDataType(column.Type))
rawValue[column.Name] = res
}
result = append(result, rawValue)
if total%batchSize == 0 {
err = app.transfer2Target(targetConn, queryColumns, result, targetDialect, tableName)
if err != nil {