From a726927a2809d8911636492f1523fa773f6a9665 Mon Sep 17 00:00:00 2001 From: "meilin.huang" <954537473@qq.com> Date: Fri, 18 Oct 2024 12:32:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20sql=E8=84=9A=E6=9C=AC=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mayfly_go_web/src/views/ops/db/SqlExec.vue | 23 +-- .../ops/db/component/table/DbTableOp.vue | 14 +- .../ops/db/component/table/DbTablesOp.vue | 5 - server/internal/db/api/db.go | 90 +--------- server/internal/db/application/db_sql_exec.go | 10 +- server/internal/db/dbm/sqlparser/sqlparser.go | 166 +++++++++++------- .../db/dbm/sqlparser/sqlparser_test.go | 38 ++++ server/internal/tag/domain/entity/tag_tree.go | 2 +- 8 files changed, 161 insertions(+), 187 deletions(-) create mode 100644 server/internal/db/dbm/sqlparser/sqlparser_test.go diff --git a/mayfly_go_web/src/views/ops/db/SqlExec.vue b/mayfly_go_web/src/views/ops/db/SqlExec.vue index 682d85e6..5ec3d6cb 100644 --- a/mayfly_go_web/src/views/ops/db/SqlExec.vue +++ b/mayfly_go_web/src/views/ops/db/SqlExec.vue @@ -205,7 +205,6 @@ :db-id="dt.params.id" :db="dt.params.db" :db-type="dt.params.type" - :flow-procdef="dt.params.flowProcdef" :height="state.tablesOpHeight" /> @@ -220,7 +219,6 @@ :dbId="tableCreateDialog.dbId" :db="tableCreateDialog.db" :dbType="tableCreateDialog.dbType" - :flow-procdef="tableCreateDialog.flowProcdef" :data="tableCreateDialog.data" v-model:visible="tableCreateDialog.visible" @submit-sql="onSubmitEditTableSql" @@ -249,7 +247,6 @@ import { useEventListener, useStorage } from '@vueuse/core'; import SqlExecBox from '@/views/ops/db/component/sqleditor/SqlExecBox'; import { useAutoOpenResource } from '@/store/autoOpenResource'; import { storeToRefs } from 'pinia'; -import { procdefApi } from '@/views/flow/api'; const DbTableOp = defineAsyncComponent(() => import('./component/table/DbTableOp.vue')); const DbSqlEditor = defineAsyncComponent(() => import('./component/sqleditor/DbSqlEditor.vue')); @@ -303,7 +300,6 @@ const nodeClickChangeDb = (nodeData: TagTreeNode) => { type: params.type, tagPath: params.tagPath, databases: params.dbs, - flowProcdef: params.flowProcdef, }, params.db ); @@ -335,7 +331,6 @@ const NodeTypeDbInst = new NodeType(SqlExecNodeType.DbInst).withLoadNodesFunc(as const params = parentNode.params; const dbs = (await DbInst.getDbNames(params))?.sort(); - const flowProcdef = await procdefApi.getByResource.request({ resourceType: TagResourceTypeEnum.DbName.value, resourceCode: params.code }); return dbs.map((x: any) => { return new TagTreeNode(`${parentNode.key}.${x}`, x, NodeTypeDb) .withParams({ @@ -346,7 +341,6 @@ const NodeTypeDbInst = new NodeType(SqlExecNodeType.DbInst).withLoadNodesFunc(as host: `${params.host}:${params.port}`, dbs: dbs, db: x, - flowProcdef: flowProcdef, }) .withIcon(DbIcon); }); @@ -407,7 +401,7 @@ const NodeTypeTableMenu = new NodeType(SqlExecNodeType.TableMenu) ]) .withLoadNodesFunc(async (parentNode: TagTreeNode) => { const params = parentNode.params; - let { id, db, type, flowProcdef, schema } = params; + let { id, db, type, schema } = params; // 获取当前库的所有表信息 let tables = await DbInst.getInst(id).loadTables(db, state.reloadStatus); state.reloadStatus = !dbConfig.value.cacheTable; @@ -423,7 +417,6 @@ const NodeTypeTableMenu = new NodeType(SqlExecNodeType.TableMenu) db, type, schema, - flowProcdef: flowProcdef, key: key, parentKey: parentNode.key, tableName: x.tableName, @@ -530,7 +523,6 @@ const state = reactive({ dbId: 0, db: '', dbType: '', - flowProcdef: null as any, data: {}, parentKey: '', }, @@ -818,7 +810,7 @@ const reloadNode = (nodeKey: string) => { }; const onEditTable = async (data: any) => { - let { db, id, tableName, tableComment, type, parentKey, key, flowProcdef } = data.params; + let { db, id, tableName, tableComment, type, parentKey, key } = data.params; // data.label就是表名 if (tableName) { state.tableCreateDialog.title = '修改表'; @@ -837,12 +829,11 @@ const onEditTable = async (data: any) => { state.tableCreateDialog.dbId = id; state.tableCreateDialog.db = db; state.tableCreateDialog.dbType = type; - state.tableCreateDialog.flowProcdef = flowProcdef; state.tableCreateDialog.visible = true; }; const onDeleteTable = async (data: any) => { - let { db, id, tableName, parentKey, flowProcdef, schema } = data.params; + let { db, id, tableName, parentKey, schema } = data.params; await ElMessageBox.confirm(`此操作是永久性且无法撤销,确定删除【${tableName}】? `, '提示', { confirmButtonText: '确定', cancelButtonText: '取消', @@ -854,10 +845,6 @@ const onDeleteTable = async (data: any) => { let schemaStr = schema ? `${dialect.quoteIdentifier(schema)}.` : ''; dbApi.sqlExec.request({ id, db, sql: `drop table ${schemaStr + dialect.quoteIdentifier(tableName)}` }).then(() => { - if (flowProcdef) { - ElMessage.success('工单提交成功'); - return; - } ElMessage.success('删除成功'); setTimeout(() => { parentKey && reloadNode(parentKey); @@ -866,7 +853,7 @@ const onDeleteTable = async (data: any) => { }; const onRenameTable = async (data: any) => { - let { db, id, tableName, parentKey, flowProcdef } = data.params; + let { db, id, tableName, parentKey } = data.params; let tableData = { db, oldTableName: tableName, tableName }; let value = ref(tableName); @@ -889,7 +876,6 @@ const onRenameTable = async (data: any) => { dbId: id as any, db: db as any, dbType: nowDbInst.value.getDialect().getInfo().formatSqlDialect, - flowProcdef: flowProcdef, runSuccessCallback: () => { setTimeout(() => { parentKey && reloadNode(parentKey); @@ -949,7 +935,6 @@ const getNowDbInfo = () => { name: di.name, type: di.type, host: di.host, - flowProcdef: di.flowProcdef, dbName: state.db, }; }; diff --git a/mayfly_go_web/src/views/ops/db/component/table/DbTableOp.vue b/mayfly_go_web/src/views/ops/db/component/table/DbTableOp.vue index c3f8cbef..e2c30759 100644 --- a/mayfly_go_web/src/views/ops/db/component/table/DbTableOp.vue +++ b/mayfly_go_web/src/views/ops/db/component/table/DbTableOp.vue @@ -152,9 +152,6 @@ const props = defineProps({ dbType: { type: String, }, - flowProcdef: { - type: Object, - }, }); //定义事件 @@ -335,7 +332,6 @@ const submit = async () => { dbId: props.dbId as any, db: props.db as any, dbType: dbDialect.getInfo().formatSqlDialect, - flowProcdef: props.flowProcdef, runSuccessCallback: () => { emit('submit-sql', { tableName: state.tableData.tableName }); // cancel(); @@ -371,11 +367,11 @@ const filterChangedData = (oldArr: object[], nowArr: object[], key: string): { d return data; } - let oldMap = {}, - newMap = {}; - oldArr.forEach((a) => (oldMap[a[key]] = a)); + let oldMap: any = {}, + newMap: any = {}; + oldArr.forEach((a: any) => (oldMap[a[key]] = a)); - nowArr.forEach((a) => { + nowArr.forEach((a: any) => { let k = a[key]; newMap[k] = a; // 取oldName,因为修改了name,但是oldName不会变 @@ -388,7 +384,7 @@ const filterChangedData = (oldArr: object[], nowArr: object[], key: string): { d } }); - oldArr.forEach((a) => { + oldArr.forEach((a: any) => { let k = a[key]; let newData = newMap[k]; if (!newData) { diff --git a/mayfly_go_web/src/views/ops/db/component/table/DbTablesOp.vue b/mayfly_go_web/src/views/ops/db/component/table/DbTablesOp.vue index b3e60c19..9c6878f8 100644 --- a/mayfly_go_web/src/views/ops/db/component/table/DbTablesOp.vue +++ b/mayfly_go_web/src/views/ops/db/component/table/DbTablesOp.vue @@ -109,7 +109,6 @@ :dbId="dbId" :db="db" :dbType="dbType" - :flow-procdef="props.flowProcdef" :data="tableCreateDialog.data" v-model:visible="tableCreateDialog.visible" @submit-sql="onSubmitSql" @@ -152,9 +151,6 @@ const props = defineProps({ type: [String], required: true, }, - flowProcdef: { - type: [Object], - }, }); const state = reactive({ @@ -312,7 +308,6 @@ const dropTable = async (row: any) => { sql: `DROP TABLE ${tableName}`, dbId: props.dbId as any, db: props.db as any, - flowProcdef: props.flowProcdef, runSuccessCallback: async () => { await getTables(); }, diff --git a/server/internal/db/api/db.go b/server/internal/db/api/db.go index 7cf630e5..f481f2be 100644 --- a/server/internal/db/api/db.go +++ b/server/internal/db/api/db.go @@ -9,6 +9,7 @@ import ( "mayfly-go/internal/db/application/dto" "mayfly-go/internal/db/config" "mayfly-go/internal/db/dbm/dbi" + "mayfly-go/internal/db/dbm/sqlparser" "mayfly-go/internal/db/domain/entity" "mayfly-go/internal/event" msgapp "mayfly-go/internal/msg/application" @@ -28,7 +29,6 @@ import ( "strings" "time" - // "github.com/kanzihuang/vitess/go/vt/sqlparser" "github.com/may-fly/cast" ) @@ -110,8 +110,6 @@ func (d *Db) ExecSql(rc *req.Ctx) { global.EventBus.Publish(rc.MetaCtx, event.EventTopicResourceOp, dbConn.Info.CodePath[0]) sqlStr, err := cryptox.AesDecryptByLa(form.Sql, rc.GetLoginAccount()) biz.ErrIsNilAppendErr(err, "sql解码失败: %s") - // 去除前后空格及换行符 - // sql := stringx.TrimSpaceAndBr(sqlStr) rc.ReqParam = fmt.Sprintf("%s %s\n-> %s", dbConn.Info.GetLogDesc(), form.ExecId, sqlStr) biz.NotEmpty(form.Sql, "sql不能为空") @@ -131,38 +129,6 @@ func (d *Db) ExecSql(rc *req.Ctx) { execRes, err := d.DbSqlExecApp.Exec(ctx, execReq) biz.ErrIsNil(err) rc.ResData = execRes - - // sqls := strings.Split(sql, ";") - // sqls = collx.ArrayRemoveBlank(sqls) - // // sqls, err := sqlparser.SplitStatementToPieces(sql, sqlparser.WithDialect(dbConn.GetMetaData().GetSqlParserDialect())) - // biz.ErrIsNil(err, "SQL解析错误,请检查您的执行SQL") - // isMulti := len(sqls) > 1 - // var execResAll *application.DbSqlExecRes - - // for _, s := range sqls { - // s = stringx.TrimSpaceAndBr(s) - // // 多条执行,暂不支持查询语句 - // if isMulti && len(s) > 10 { - // biz.IsTrue(!strings.HasPrefix(strings.ToLower(s[:10]), "select"), "多条语句执行暂不不支持select语句") - // } - - // execReq.Sql = s - // execRes, err := d.DbSqlExecApp.Exec(ctx, execReq) - // biz.ErrIsNilAppendErr(err, fmt.Sprintf("[%s] -> 执行失败: ", s)+"%s") - - // if execResAll == nil { - // execResAll = execRes - // } else { - // execResAll.Merge(execRes) - // } - // } - - // colAndRes := make(map[string]any) - // if execResAll != nil { - // colAndRes["columns"] = execResAll.Columns - // colAndRes["res"] = execResAll.Res - // } - // rc.ResData = colAndRes } // progressCategory sql文件执行进度消息类型 @@ -203,19 +169,6 @@ func (d *Db) ExecSqlFile(rc *req.Ctx) { } }() - execReq := &application.DbSqlExecReq{ - DbId: dbId, - Db: dbName, - Remark: filename, - DbConn: dbConn, - CheckFlow: true, - } - - var sql string - - // tokenizer := sqlparser.NewReaderTokenizer(file, - // sqlparser.WithCacheInBuffer(), sqlparser.WithDialect(dbConn.GetMetaData().GetSqlParserDialect())) - executedStatements := 0 progressId := stringx.Rand(32) laId := rc.GetLoginAccount().Id @@ -227,7 +180,8 @@ func (d *Db) ExecSqlFile(rc *req.Ctx) { }).WithCategory(progressCategory)) ticker := time.NewTicker(time.Second * 1) defer ticker.Stop() - for { + + err = sqlparser.SQLSplit(file, func(sql string) error { select { case <-ticker.C: ws.SendJsonMsg(ws.UserId(laId), clientId, msgdto.InfoSysMsg("sql脚本执行进度", &progressMsg{ @@ -238,41 +192,13 @@ func (d *Db) ExecSqlFile(rc *req.Ctx) { }).WithCategory(progressCategory)) default: } + executedStatements++ - // sql, err = sqlparser.SplitNext(tokenizer) - // if err == io.EOF { - // break - // } - biz.ErrIsNilAppendErr(err, "%s") + _, err = dbConn.Exec(sql) + return err + }) - const prefixUse = "use " - const prefixUSE = "USE " - if strings.HasPrefix(sql, prefixUSE) || strings.HasPrefix(sql, prefixUse) { - // var stmt sqlparser.Statement - // stmt, err = sqlparser.Parse(sql) - biz.ErrIsNilAppendErr(err, "%s") - - // stmtUse, ok := stmt.(*sqlparser.Use) - // 最终执行结果以数据库直接结果为准 - // if !ok { - // logx.Warnf("sql解析失败: %s", sql) - // } - // dbConn, err = d.DbApp.GetDbConn(dbId, stmtUse.DBName.String()) - biz.ErrIsNil(err) - biz.ErrIsNilAppendErr(d.TagApp.CanAccess(laId, dbConn.Info.CodePath...), "%s") - execReq.DbConn = dbConn - } - // 需要记录执行记录 - const maxRecordStatements = 64 - if executedStatements < maxRecordStatements { - execReq.Sql = sql - _, err = d.DbSqlExecApp.Exec(rc.MetaCtx, execReq) - } else { - _, err = dbConn.Exec(sql) - } - - biz.ErrIsNilAppendErr(err, "%s") - } + biz.ErrIsNilAppendErr(err, "%s") d.MsgApp.CreateAndSend(rc.GetLoginAccount(), msgdto.SuccessSysMsg("sql脚本执行成功", fmt.Sprintf("sql脚本执行完成:%s", rc.ReqParam)).WithClientId(clientId)) } diff --git a/server/internal/db/application/db_sql_exec.go b/server/internal/db/application/db_sql_exec.go index e9019975..6097dc6e 100644 --- a/server/internal/db/application/db_sql_exec.go +++ b/server/internal/db/application/db_sql_exec.go @@ -89,14 +89,12 @@ func (d *dbSqlExecAppImpl) Exec(ctx context.Context, execSqlReq *DbSqlExecReq) ( allExecRes := make([]*DbSqlExecRes, 0) stmts, err := sp.Parse(execSql) - // sql解析失败,则直接使用;切割进行执行 + // sql解析失败,则使用默认方式切割 if err != nil { - sqlScan := sqlparser.SplitSqls(strings.NewReader(execSql)) - for sqlScan.Scan() { + sqlparser.SQLSplit(strings.NewReader(execSql), func(oneSql string) error { var execRes *DbSqlExecRes var err error - oneSql := sqlScan.Text() dbSqlExecRecord := createSqlExecRecord(ctx, execSqlReq, oneSql) dbSqlExecRecord.Type = entity.DbSqlExecTypeOther sqlExec := &sqlExecParam{DbConn: dbConn, Sql: oneSql, Procdef: flowProcdef, SqlExecRecord: dbSqlExecRecord} @@ -124,8 +122,8 @@ func (d *dbSqlExecAppImpl) Exec(ctx context.Context, execSqlReq *DbSqlExecReq) ( d.saveSqlExecLog(dbSqlExecRecord, dbSqlExecRecord.Res) } allExecRes = append(allExecRes, execRes) - } - + return nil + }) return allExecRes, nil } diff --git a/server/internal/db/dbm/sqlparser/sqlparser.go b/server/internal/db/dbm/sqlparser/sqlparser.go index 2041c8ad..90d7a16b 100644 --- a/server/internal/db/dbm/sqlparser/sqlparser.go +++ b/server/internal/db/dbm/sqlparser/sqlparser.go @@ -2,18 +2,15 @@ package sqlparser import ( "bufio" + "bytes" "io" "mayfly-go/internal/db/dbm/sqlparser/sqlstmt" - "regexp" + "strings" + "unicode/utf8" ) type DbDialect string -// const ( -// mysql DbDialect = "mysql" -// pgsql DbDialect = "pgsql" -// ) - type SqlParser interface { // sql解析 @@ -21,63 +18,102 @@ type SqlParser interface { Parse(stmt string) ([]sqlstmt.Stmt, error) } -// var ( -// parsers = make(map[string]SqlParser) -// ) - -// // 注册数据库类型与dbmeta -// func Register(dialect string, parser SqlParser) { -// parsers[dialect] = parser -// } - -// func getParser(dialect string) (SqlParser, error) { -// parser, ok := parsers[dialect] -// if !ok { -// return nil, errors.New("不存在该parser") -// } -// return parser, nil -// } - -// 解析sql -// @param dialect 方言 -// @param stmt sql语句 -// func Parse(dialect string, stmt string) ([]sqlstmt.Stmt, error) { -// if parser, err := getParser(dialect); err != nil { -// return nil, err -// } else { -// return parser.Parse(stmt) -// } -// } - -var sqlSplitRegexp = regexp.MustCompile(`\s*;\s*\n`) - -// SplitSqls 根据;\n切割sql -func SplitSqls(r io.Reader) *bufio.Scanner { - scanner := bufio.NewScanner(r) - - scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, io.EOF - } - - match := sqlSplitRegexp.FindIndex(data) - - if match != nil { - // 如果找到了";\n",判断是否为最后一行 - if match[1] == len(data) { - // 如果是最后一行,则返回完整的切片 - return len(data), data, nil - } - // 否则,返回到";\n"之后,并且包括";\n"本身 - return match[1], data[:match[1]], nil - } - - if atEOF { - return len(data), data, nil - } - - return 0, nil, nil - }) - - return scanner +// SQLSplit sql切割 +func SQLSplit(r io.Reader, callback SQLCallback) error { + return parseSQL(r, callback) +} + +// SQLCallback 是解析出一条 SQL 语句后的回调函数 +type SQLCallback func(sql string) error + +func parseSQL(r io.Reader, callback SQLCallback) error { + reader := bufio.NewReaderSize(r, 512*1024) + buffer := new(bytes.Buffer) // 使用 bytes.Buffer 来处理数据 + var currentStatement bytes.Buffer + var inString bool + var inMultiLineComment bool + var inSingleLineComment bool + var stringDelimiter rune + + for { + // 读取数据到缓冲区 + data, err := reader.ReadBytes('\n') // 按行读取 + if err == io.EOF && len(data) == 0 { + break + } + if err != nil && err != io.EOF { + return err + } + buffer.Write(data) + + // 处理缓冲区中的数据 + for buffer.Len() > 0 { + r, size := utf8.DecodeRune(buffer.Bytes()) + if r == utf8.RuneError && size == 1 { + // 如果解码出错,说明数据不完整,继续读取更多数据 + break + } + + switch { + case inMultiLineComment: + if r == '*' && buffer.Len() >= 2 && buffer.Bytes()[1] == '/' { + inMultiLineComment = false + buffer.Next(2) // 跳过 '*/' + } else { + buffer.Next(size) + } + case inSingleLineComment: + if r == '\n' { + inSingleLineComment = false + } + buffer.Next(size) + case inString: + if r == stringDelimiter { + inString = false + } + currentStatement.WriteRune(r) + buffer.Next(size) + case r == '/' && buffer.Len() >= 2 && buffer.Bytes()[1] == '*': + inMultiLineComment = true + buffer.Next(2) // 跳过 '/*' + case r == '-' && buffer.Len() >= 2 && buffer.Bytes()[1] == '-': + inSingleLineComment = true + buffer.Next(2) // 跳过 '--' + case r == '\'' || r == '"': + inString = true + stringDelimiter = r + currentStatement.WriteRune(r) + buffer.Next(size) + case r == ';' && !inString && !inMultiLineComment && !inSingleLineComment: + sql := strings.TrimSpace(currentStatement.String()) + if sql != "" { + if err := callback(sql); err != nil { + return err + } + } + currentStatement.Reset() + buffer.Next(size) + default: + currentStatement.WriteRune(r) + buffer.Next(size) + } + } + + // 如果读取到 EOF 并且缓冲区为空,退出循环 + if err == io.EOF && buffer.Len() == 0 { + break + } + } + + // 处理最后剩余的缓冲区 + if currentStatement.Len() > 0 { + sql := strings.TrimSpace(currentStatement.String()) + if sql != "" { + if err := callback(sql); err != nil { + return err + } + } + } + + return nil } diff --git a/server/internal/db/dbm/sqlparser/sqlparser_test.go b/server/internal/db/dbm/sqlparser/sqlparser_test.go new file mode 100644 index 00000000..993f8bca --- /dev/null +++ b/server/internal/db/dbm/sqlparser/sqlparser_test.go @@ -0,0 +1,38 @@ +package sqlparser + +import ( + "fmt" + "strings" + "testing" +) + +func TestSQLSplit(t *testing.T) { + + allsql := ` + /*删除*/ + DELETE FROM t_sys_log +WHERE + id IN (59) and name='哈哈哈';delete form tsyslog; + -- alter sql语句 + ALTER TABLE mayfly_go.t_sys_log +DROP COLUMN delete_time; +--插入sql语句 + INSERT INTO t_sys_log VALUES(15, 2, '用户登录', '{"ip":"::1 ",\\n"username":"test_user"}', 'errCode: 401, errMsg: --您的密码安全等级较低,请修改后重新登录', '-', 0, '2024-04-23 20:00:35', 0, NULL, ''); + --插入sql语句 + INSERT INTO t_sys_log VALUES(15, 2, '用户登录', NULL, '⑴ 成孔;⑵ 砼浇筑;⑶ 桩头掏渣;⑷ 桩基检测配合; + ⑸ 其他工作;⑹ 甲方现场要求乙方完成的其它临时工作。', '{"ip":"::1 ","username":"test_user"}', 'errCode: 401, errMsg: 您的密码安全等级较低,请修改后重新登录;\n信息嘻嘻嘻', '-', 0, '2024-04-23 20:00:35', 0, NULL, ''); + ` + // fmt.Println(allsql) + // allsql2 := "select * from t_sys_log" + err := SQLSplit(strings.NewReader(allsql), func(sql string) error { + // if strings.Contains(sql, "INSERT") { + // return fmt.Errorf("不能存在INSERT语句") + // } + fmt.Println(sql) + fmt.Println() + return nil + }) + if err != nil { + t.Fatal(err) + } +} diff --git a/server/internal/tag/domain/entity/tag_tree.go b/server/internal/tag/domain/entity/tag_tree.go index a4fa74fd..7c283c79 100644 --- a/server/internal/tag/domain/entity/tag_tree.go +++ b/server/internal/tag/domain/entity/tag_tree.go @@ -140,7 +140,7 @@ func GetParentPath(codePath string, index int) string { return parentPath + CodePathSeparator } -// GetAllCodePath 根据表情路径获取所有相关的标签codePath +// GetAllCodePath 根据标签路径获取所有相关的标签codePath,如 test1/test2/ -> test1/ test1/test2/ func GetAllCodePath(codePath string) []string { // 去除末尾的斜杠 codePath = strings.TrimSuffix(codePath, CodePathSeparator)