fix: 数据同步相关bug修复

This commit is contained in:
刘宗洋
2024-01-08 11:24:37 +08:00
parent af454f7d5d
commit a764c4f974
7 changed files with 37 additions and 40 deletions

View File

@@ -278,7 +278,6 @@ watch(tabActiveName, async (newValue: string) => {
await handleGetTargetFields(); await handleGetTargetFields();
break; break;
case 'targetDb': case 'targetDb':
await handleGetSrcFields();
await handleGetTargetFields(); await handleGetTargetFields();
if (state.form.targetDbId && state.form.targetDbName) { if (state.form.targetDbId && state.form.targetDbName) {
await loadDbTables(state.form.targetDbId, state.form.targetDbName); await loadDbTables(state.form.targetDbId, state.form.targetDbName);

View File

@@ -6,7 +6,7 @@ import * as monaco from 'monaco-editor/esm/vs/editor/editor.api';
import { editor, languages, Position } from 'monaco-editor'; import { editor, languages, Position } from 'monaco-editor';
import { registerCompletionItemProvider } from '@/components/monaco/completionItemProvider'; import { registerCompletionItemProvider } from '@/components/monaco/completionItemProvider';
import { EditorCompletionItem, getDbDialect } from './dialect'; import {DbDialect, EditorCompletionItem, getDbDialect} from './dialect'
const dbInstCache: Map<number, DbInst> = new Map(); const dbInstCache: Map<number, DbInst> = new Map();
@@ -91,7 +91,7 @@ export class DbInst {
return tables; return tables;
} }
async loadTableSuggestions(dbName: string, range: any, reload?: boolean) { async loadTableSuggestions(dbDialect: DbDialect, dbName: string, range: any, reload?: boolean) {
const tables = await this.loadTables(dbName, reload); const tables = await this.loadTables(dbName, reload);
// 表名联想 // 表名联想
let suggestions: languages.CompletionItem[] = []; let suggestions: languages.CompletionItem[] = [];
@@ -104,7 +104,7 @@ export class DbInst {
}, },
kind: monaco.languages.CompletionItemKind.File, kind: monaco.languages.CompletionItemKind.File,
detail: tableComment, detail: tableComment,
insertText: tableName + ' ', insertText: dbDialect.wrapName(tableName) + ' ',
range, range,
sortText: 300 + index + '', sortText: 300 + index + '',
}); });
@@ -113,7 +113,7 @@ export class DbInst {
} }
/** 加载列信息提示 */ /** 加载列信息提示 */
async loadTableColumnSuggestions(db: string, tableName: string, range: any) { async loadTableColumnSuggestions(dbDialect: DbDialect,db: string, tableName: string, range: any) {
let dbHits = await this.loadDbHints(db); let dbHits = await this.loadDbHints(db);
let columns = dbHits[tableName]; let columns = dbHits[tableName];
let suggestions: languages.CompletionItem[] = []; let suggestions: languages.CompletionItem[] = [];
@@ -128,7 +128,7 @@ export class DbInst {
}, },
kind: monaco.languages.CompletionItemKind.Property, kind: monaco.languages.CompletionItemKind.Property,
detail: '', // 不显示detail, 否则选中时备注等会被遮挡 detail: '', // 不显示detail, 否则选中时备注等会被遮挡
insertText: fieldName, // create_time insertText: dbDialect.wrapName(fieldName)+ ' ', // create_time
range, range,
sortText: 100 + index + '', // 使用表字段声明顺序排序,排序需为字符串类型 sortText: 100 + index + '', // 使用表字段声明顺序排序,排序需为字符串类型
}); });
@@ -651,20 +651,20 @@ export function registerDbCompletionItemProvider(dbId: number, db: string, dbs:
if (db.indexOf('/') > 0) { if (db.indexOf('/') > 0) {
dbName = db.substring(0, db.indexOf('/') + 1) + alias; dbName = db.substring(0, db.indexOf('/') + 1) + alias;
} }
return await dbInst.loadTableSuggestions(dbName, range); return await dbInst.loadTableSuggestions(dbDialect, dbName, range);
} }
// 表下列名联想 .前的字符串是表名或表别名 // 表下列名联想 .前的字符串是表名或表别名
const sqlInfo = getTableName4SqlCtx(sqlStatement, alias, db); const sqlInfo = getTableName4SqlCtx(sqlStatement, alias, db);
// 提出到表名,则将表对应的字段也添加进提示建议 // 提出到表名,则将表对应的字段也添加进提示建议
if (sqlInfo) { if (sqlInfo) {
return await dbInst.loadTableColumnSuggestions(sqlInfo.db, sqlInfo.tableName, range); return await dbInst.loadTableColumnSuggestions(dbDialect, sqlInfo.db, sqlInfo.tableName, range);
} }
} }
// 空格触发也会提示字段信息 // 空格触发也会提示字段信息
const sqlInfo = getTableName4SqlCtx(sqlStatement, alias, db); const sqlInfo = getTableName4SqlCtx(sqlStatement, alias, db);
if (sqlInfo) { if (sqlInfo) {
const columnSuggestions = await dbInst.loadTableColumnSuggestions(sqlInfo.db, sqlInfo.tableName, range); const columnSuggestions = await dbInst.loadTableColumnSuggestions(dbDialect, sqlInfo.db, sqlInfo.tableName, range);
suggestions.push(...columnSuggestions.suggestions); suggestions.push(...columnSuggestions.suggestions);
} }

View File

@@ -203,7 +203,7 @@ class PostgresqlDialect implements DbDialect {
} }
wrapName = (name: string) => { wrapName = (name: string) => {
return name; return `"${name}"`;
}; };
matchType(text: string, arr: string[]): boolean { matchType(text: string, arr: string[]): boolean {

View File

@@ -278,11 +278,11 @@ func (dd *DMDialect) GetDbProgram() DbProgram {
panic("implement me") panic("implement me")
} }
func (pd *DMDialect) WrapName(name string) string { func (dd *DMDialect) WrapName(name string) string {
return "\"" + name + "\"" return "\"" + name + "\""
} }
func (pd *DMDialect) GetDataType(dbColumnType string) DataType { func (dd *DMDialect) GetDataType(dbColumnType string) DataType {
if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) { if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) {
return DataTypeNumber return DataTypeNumber
} }
@@ -301,7 +301,7 @@ func (pd *DMDialect) GetDataType(dbColumnType string) DataType {
return DataTypeString return DataTypeString
} }
func (pd *DMDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) { func (dd *DMDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
// 执行批量insert sql // 执行批量insert sql
// insert into "table_name" ("column1", "column2", ...) values (value1, value2, ...) // insert into "table_name" ("column1", "column2", ...) values (value1, value2, ...)
@@ -311,11 +311,11 @@ func (pd *DMDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string,
// 去除最后一个逗号,占位符由括号包裹 // 去除最后一个逗号,占位符由括号包裹
placeholder := fmt.Sprintf("(%s)", strings.TrimSuffix(repeated, ",")) placeholder := fmt.Sprintf("(%s)", strings.TrimSuffix(repeated, ","))
sqlTemp := fmt.Sprintf("insert into %s (%s) values %s", pd.WrapName(tableName), strings.Join(columns, ","), placeholder) sqlTemp := fmt.Sprintf("insert into %s (%s) values %s", dd.WrapName(tableName), strings.Join(columns, ","), placeholder)
effRows := 0 effRows := 0
for _, value := range values { for _, value := range values {
// 达梦数据库只能一条条的执行insert // 达梦数据库只能一条条的执行insert
er, err := pd.dc.TxExec(tx, sqlTemp, value...) er, err := dd.dc.TxExec(tx, sqlTemp, value...)
if err != nil { if err != nil {
logx.Errorf("执行sql失败%s", err.Error()) logx.Errorf("执行sql失败%s", err.Error())
return int64(effRows), err return int64(effRows), err
@@ -326,7 +326,7 @@ func (pd *DMDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string,
return int64(effRows), nil return int64(effRows), nil
} }
func (pd *DMDialect) FormatStrData(dbColumnValue string, dataType DataType) string { func (dd *DMDialect) FormatStrData(dbColumnValue string, dataType DataType) string {
switch dataType { switch dataType {
case DataTypeDateTime: // "2024-01-02T22:08:22.275697+08:00" case DataTypeDateTime: // "2024-01-02T22:08:22.275697+08:00"
res, _ := time.Parse(time.RFC3339, dbColumnValue) res, _ := time.Parse(time.RFC3339, dbColumnValue)

View File

@@ -202,11 +202,11 @@ func (md *MysqlDialect) GetDbProgram() DbProgram {
return NewDbProgramMysql(md.dc) return NewDbProgramMysql(md.dc)
} }
func (pd *MysqlDialect) WrapName(name string) string { func (md *MysqlDialect) WrapName(name string) string {
return "`" + name + "`" return "`" + name + "`"
} }
func (pd *MysqlDialect) GetDataType(dbColumnType string) DataType { func (md *MysqlDialect) GetDataType(dbColumnType string) DataType {
if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) { if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) {
return DataTypeNumber return DataTypeNumber
} }
@@ -225,7 +225,7 @@ func (pd *MysqlDialect) GetDataType(dbColumnType string) DataType {
return DataTypeString return DataTypeString
} }
func (pd *MysqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) { func (md *MysqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
// 生成占位符字符串:如:(?,?) // 生成占位符字符串:如:(?,?)
// 重复字符串并用逗号连接 // 重复字符串并用逗号连接
repeated := strings.Repeat("?,", len(columns)) repeated := strings.Repeat("?,", len(columns))
@@ -240,17 +240,17 @@ func (pd *MysqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []stri
// 去除最后一个逗号 // 去除最后一个逗号
placeholder = strings.TrimSuffix(repeated, ",") placeholder = strings.TrimSuffix(repeated, ",")
sqlStr := fmt.Sprintf("insert into %s (%s) values %s", pd.WrapName(tableName), strings.Join(columns, ","), placeholder) sqlStr := fmt.Sprintf("insert into %s (%s) values %s", md.WrapName(tableName), strings.Join(columns, ","), placeholder)
// 执行批量insert sql // 执行批量insert sql
// 把二维数组转为一维数组 // 把二维数组转为一维数组
var args []any var args []any
for _, v := range values { for _, v := range values {
args = append(args, v...) args = append(args, v...)
} }
return pd.dc.TxExec(tx, sqlStr, args...) return md.dc.TxExec(tx, sqlStr, args...)
} }
func (pd *MysqlDialect) FormatStrData(dbColumnValue string, dataType DataType) string { func (md *MysqlDialect) FormatStrData(dbColumnValue string, dataType DataType) string {
// mysql不需要格式化时间日期等 // mysql不需要格式化时间日期等
return dbColumnValue return dbColumnValue
} }

View File

@@ -281,7 +281,7 @@ func (pd *PgsqlDialect) GetDbProgram() DbProgram {
} }
func (pd *PgsqlDialect) WrapName(name string) string { func (pd *PgsqlDialect) WrapName(name string) string {
return name return fmt.Sprintf(`"%s"`, name)
} }
func (pd *PgsqlDialect) GetDataType(dbColumnType string) DataType { func (pd *PgsqlDialect) GetDataType(dbColumnType string) DataType {
@@ -307,28 +307,25 @@ func (pd *PgsqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []stri
// 执行批量insert sql跟mysql一样 pg或高斯支持批量insert语法 // 执行批量insert sql跟mysql一样 pg或高斯支持批量insert语法
// insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ... // insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ...
// 生成占位符字符串:如:(?,?)
// 重复字符串并用逗号连接
repeated := strings.Repeat("?,", len(columns))
// 去除最后一个逗号,占位符由括号包裹
placeholder := fmt.Sprintf("(%s)", strings.TrimSuffix(repeated, ","))
// 执行批量insert sqlmysql支持批量insert语法
// insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ...
// 重复占位符字符串n遍
repeated = strings.Repeat(placeholder+",", len(values))
// 去除最后一个逗号
placeholder = strings.TrimSuffix(repeated, ",")
sqlStr := fmt.Sprintf("insert into %s (%s) values %s", pd.WrapName(tableName), strings.Join(columns, ","), placeholder)
// 执行批量insert sql
// 把二维数组转为一维数组 // 把二维数组转为一维数组
var args []any var args []any
for _, v := range values { for _, v := range values {
args = append(args, v...) args = append(args, v...)
} }
// 构建占位符字符串 "($1, $2, $3), ($4, $5, $6), ..." 用于指定参数
var placeholders []string
for i := 0; i < len(args); i += len(columns) {
var placeholder []string
for j := 0; j < len(columns); j++ {
placeholder = append(placeholder, fmt.Sprintf("$%d", i+j+1))
}
placeholders = append(placeholders, "("+strings.Join(placeholder, ", ")+")")
}
sqlStr := fmt.Sprintf("insert into %s (%s) values %s", pd.WrapName(tableName), strings.Join(columns, ","), strings.Join(placeholders, ", "))
// 执行批量insert sql
return pd.dc.TxExec(tx, sqlStr, args...) return pd.dc.TxExec(tx, sqlStr, args...)
} }

View File

@@ -19,7 +19,8 @@ func newDataSyncTaskRepo() repository.DataSyncTask {
// 分页获取数据库信息列表 // 分页获取数据库信息列表
func (d *dataSyncTaskRepoImpl) GetTaskList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) { func (d *dataSyncTaskRepoImpl) GetTaskList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
qd := gormx.NewQuery(new(entity.DataSyncTask)). qd := gormx.NewQuery(new(entity.DataSyncTask)).
Like("task_name", condition.Name) Like("task_name", condition.Name).
Eq("status", condition.Status)
return gormx.PageQuery(qd, pageParam, toEntity) return gormx.PageQuery(qd, pageParam, toEntity)
} }