diff --git a/mayfly_go_web/src/views/ops/db/DbList.vue b/mayfly_go_web/src/views/ops/db/DbList.vue index 1140f9d1..ce74d6c7 100644 --- a/mayfly_go_web/src/views/ops/db/DbList.vue +++ b/mayfly_go_web/src/views/ops/db/DbList.vue @@ -194,7 +194,7 @@ - + @@ -353,7 +353,7 @@ const columns = ref([ TableColumn.new('name', '名称'), TableColumn.new('database', '数据库').isSlot().setMinWidth(70), TableColumn.new('remark', '备注'), - TableColumn.new('more', '更多').isSlot().setMinWidth(220).fixedRight(), + TableColumn.new('more', '更多').isSlot().setMinWidth(180).fixedRight(), ]); // 该用户拥有的的操作列按钮权限 @@ -414,7 +414,7 @@ const state = reactive({ TableColumn.new('oldValue', '原值').canBeautify(), TableColumn.new('createTime', '执行时间').isTime(), TableColumn.new('remark', '备注'), - TableColumn.new('action', '操作').isSlot().setMinWidth(100).fixedRight().alignCenter(), + TableColumn.new('action', '操作').isSlot().setMinWidth(80).fixedRight().alignCenter(), ], title: '', visible: false, diff --git a/server/internal/db/api/db.go b/server/internal/db/api/db.go index 28f545db..8b6dae4d 100644 --- a/server/internal/db/api/db.go +++ b/server/internal/db/api/db.go @@ -1,7 +1,6 @@ package api import ( - "compress/gzip" "fmt" "io" "mayfly-go/internal/db/api/form" @@ -33,20 +32,6 @@ type Db struct { TagApp tagapp.TagTree } -type gzipResponseWriter struct { - writer *gzip.Writer -} - -func (g gzipResponseWriter) WriteString(data string) { - g.writer.Write([]byte(data)) -} - -func (g gzipResponseWriter) Close() { - g.writer.Close() -} - -const DEFAULT_ROW_SIZE = 5000 - // @router /api/dbs [get] func (d *Db) Dbs(rc *req.Ctx) { queryCond, page := ginx.BindQueryAndPage[*entity.DbQuery](rc.GinCtx, new(entity.DbQuery)) @@ -270,8 +255,7 @@ func (d *Db) DumpSql(rc *req.Ctx) { if len(dbNames) == 1 && len(tablesStr) > 0 { tables = strings.Split(tablesStr, ",") } - - writer := gzipResponseWriter{writer: gzip.NewWriter(g.Writer)} + writer := newGzipWriter(g.Writer) defer func() { var msg string if err := recover(); err != nil { @@ -296,13 +280,14 @@ func (d *Db) DumpSql(rc *req.Ctx) { rc.ReqParam = fmt.Sprintf("DB[id=%d, tag=%s, name=%s, databases=%s, tables=%s, dumpType=%s]", db.Id, db.TagPath, db.Name, dbNamesStr, tablesStr, dumpType) } -func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) { +func (d *Db) dumpDb(writer *gzipWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) { dbConn := d.DbApp.GetDbConnection(dbId, dbName) writer.WriteString("-- ----------------------------") writer.WriteString("\n-- 导出平台: mayfly-go") writer.WriteString(fmt.Sprintf("\n-- 导出时间: %s ", time.Now().Format("2006-01-02 15:04:05"))) writer.WriteString(fmt.Sprintf("\n-- 导出数据库: %s ", dbName)) writer.WriteString("\n-- ----------------------------\n") + writer.TryFlush() if switchDb { switch dbConn.Info.Type { @@ -335,38 +320,29 @@ func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, table writer.WriteString(fmt.Sprintf("\n-- ----------------------------\n-- 表记录: %s \n-- ----------------------------\n", table)) writer.WriteString("BEGIN;\n") - pageNum := 1 - for { - columns, result, _ := dbMeta.GetTableRecord(table, pageNum, DEFAULT_ROW_SIZE) - resultLen := len(result) - if resultLen == 0 { - break - } - insertSql := "INSERT INTO `%s` VALUES (%s);\n" - for _, res := range result { - var values []string - for _, column := range columns { - value := res[column] - if value == nil { - values = append(values, "NULL") - continue - } - strValue, ok := value.(string) - if ok { - values = append(values, fmt.Sprintf("%#v", strValue)) - } else { - values = append(values, stringx.AnyToStr(value)) - } + insertSql := "INSERT INTO `%s` VALUES (%s);\n" + + dbMeta.WalkTableRecord(table, func(record map[string]any, columns []string) { + var values []string + for _, column := range columns { + value := record[column] + if value == nil { + values = append(values, "NULL") + continue + } + strValue, ok := value.(string) + if ok { + values = append(values, fmt.Sprintf("%#v", strValue)) + } else { + values = append(values, stringx.AnyToStr(value)) } - writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", "))) } - if resultLen < DEFAULT_ROW_SIZE { - break - } - pageNum++ - } + writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", "))) + writer.TryFlush() + }) writer.WriteString("COMMIT;\n") + writer.TryFlush() } } diff --git a/server/internal/db/api/gzip_writer.go b/server/internal/db/api/gzip_writer.go new file mode 100644 index 00000000..dde21ad1 --- /dev/null +++ b/server/internal/db/api/gzip_writer.go @@ -0,0 +1,38 @@ +package api + +import ( + "compress/gzip" + "io" + "mayfly-go/pkg/biz" +) + +type gzipWriter struct { + tryFlushCount int + writer *gzip.Writer + aborted bool +} + +func newGzipWriter(writer io.Writer) *gzipWriter { + return &gzipWriter{writer: gzip.NewWriter(writer)} +} + +func (g *gzipWriter) WriteString(data string) { + if g.aborted { + return + } + if _, err := g.writer.Write([]byte(data)); err != nil { + g.aborted = true + biz.IsTrue(false, "数据库导出失败:%s", err) + } +} + +func (g *gzipWriter) Close() { + g.writer.Close() +} + +func (g *gzipWriter) TryFlush() { + if g.tryFlushCount%1000 == 0 { + g.writer.Flush() + } + g.tryFlushCount += 1 +} diff --git a/server/internal/db/application/db.go b/server/internal/db/application/db.go index 74af778e..e4cce146 100644 --- a/server/internal/db/application/db.go +++ b/server/internal/db/application/db.go @@ -223,12 +223,17 @@ type DbConnection struct { // 执行查询语句 // 依次返回 列名数组,结果map,错误 func (d *DbConnection) SelectData(execSql string) ([]string, []map[string]any, error) { - return SelectDataByDb(d.db, execSql) + return selectDataByDb(d.db, execSql) } // 将查询结果映射至struct,可具体参考sqlx库 func (d *DbConnection) SelectData2Struct(execSql string, dest any) error { - return Select2StructByDb(d.db, execSql, dest) + return select2StructByDb(d.db, execSql, dest) +} + +// WalkTableRecord 遍历表记录 +func (d *DbConnection) WalkTableRecord(selectSql string, walk func(record map[string]any, columns []string)) error { + return walkTableRecord(d.db, selectSql, walk) } // 执行 update, insert, delete,建表等sql @@ -242,13 +247,13 @@ func (d *DbConnection) Exec(sql string) (int64, error) { } // 获取数据库元信息实现接口 -func (di *DbConnection) GetMeta() DbMetadata { - dbType := di.Info.Type +func (d *DbConnection) GetMeta() DbMetadata { + dbType := d.Info.Type if dbType == entity.DbTypeMysql { - return &MysqlMetadata{di: di} + return &MysqlMetadata{di: d} } if dbType == entity.DbTypePostgres { - return &PgsqlMetadata{di: di} + return &PgsqlMetadata{di: d} } return nil } @@ -290,11 +295,34 @@ func GetDbCacheKey(dbId uint64, db string) string { return fmt.Sprintf("%d:%s", dbId, db) } -func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) { - rows, err := db.Query(selectSql) +func selectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) { + // 列名用于前端表头名称按照数据库与查询字段顺序显示 + var colNames []string + result := make([]map[string]any, 0, 16) + err := walkTableRecord(db, selectSql, func(record map[string]any, columns []string) { + result = append(result, record) + if colNames == nil { + colNames = make([]string, 0, len(columns)) + copy(colNames, columns) + } + }) if err != nil { return nil, nil, err } + return colNames, result, nil +} + +func walkTableRecord(db *sql.DB, selectSql string, walk func(record map[string]any, columns []string)) error { + tx, err := db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + rows, err := tx.Query(selectSql) + if err != nil { + return err + } // rows对象一定要close掉,如果出错,不关掉则会很迅速的达到设置最大连接数, // 后面的链接过来直接报错或拒绝,实际上也没有起效果 defer func() { @@ -302,44 +330,42 @@ func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, e rows.Close() } }() - colTypes, _ := rows.ColumnTypes() + + colTypes, err := rows.ColumnTypes() + if err != nil { + return err + } + lenCols := len(colTypes) + // 列名用于前端表头名称按照数据库与查询字段顺序显示 + colNames := make([]string, lenCols) // 这里表示一行填充数据 - scans := make([]any, len(colTypes)) + scans := make([]any, lenCols) // 这里表示一行所有列的值,用[]byte表示 - vals := make([][]byte, len(colTypes)) - // 这里scans引用vals,把数据填充到[]byte里 - for k := range vals { - scans[k] = &vals[k] + values := make([][]byte, lenCols) + for k, colType := range colTypes { + colNames[k] = colType.Name() + // 这里scans引用values,把数据填充到[]byte里 + scans[k] = &values[k] } - result := make([]map[string]any, 0) - // 列名用于前端表头名称按照数据库与查询字段顺序显示 - colNames := make([]string, 0) - // 是否第一次遍历,列名数组只需第一次遍历时加入 - isFirst := true for rows.Next() { // 不Scan也会导致等待,该链接实际处于未工作的状态,然后也会导致连接数迅速达到最大 - err := rows.Scan(scans...) - if err != nil { - return nil, nil, err + if err := rows.Scan(scans...); err != nil { + return err } // 每行数据 - rowData := make(map[string]any) - // 把vals中的数据复制到row中 - for i, v := range vals { - colType := colTypes[i] - colName := colType.Name() - // 如果是第一行,则将列名加入到列信息中,由于map是无序的,所有需要返回列名的有序数组 - if isFirst { - colNames = append(colNames, colName) - } - rowData[colName] = valueConvert(v, colType) + rowData := make(map[string]any, lenCols) + // 把values中的数据复制到row中 + for i, v := range values { + rowData[colTypes[i].Name()] = valueConvert(v, colTypes[i]) } - // 放入结果集 - result = append(result, rowData) - isFirst = false + walk(rowData, colNames) } - return colNames, result, nil + + if err := tx.Commit(); err != nil { + return err + } + return nil } // 将查询的值转为对应列类型的实际值,不全部转为字符串 @@ -392,7 +418,7 @@ func valueConvert(data []byte, colType *sql.ColumnType) any { } // 查询数据结果映射至struct。可参考sqlx库 -func Select2StructByDb(db *sql.DB, selectSql string, dest any) error { +func select2StructByDb(db *sql.DB, selectSql string, dest any) error { rows, err := db.Query(selectSql) if err != nil { return err diff --git a/server/internal/db/application/instance.go b/server/internal/db/application/instance.go index 2a3442b4..a3c8c548 100644 --- a/server/internal/db/application/instance.go +++ b/server/internal/db/application/instance.go @@ -140,10 +140,11 @@ func (app *instanceAppImpl) GetDatabases(ed *entity.Instance) []string { biz.ErrIsNilAppendErr(err, "数据库连接失败: %s") defer dbConn.Close() - _, res, err := SelectDataByDb(dbConn, getDatabasesSql) + _, res, err := selectDataByDb(dbConn, getDatabasesSql) biz.ErrIsNilAppendErr(err, "获取数据库列表失败") for _, re := range res { databases = append(databases, re["dbname"].(string)) } + return databases } diff --git a/server/internal/db/application/meta.go b/server/internal/db/application/meta.go index 989af443..8094cc39 100644 --- a/server/internal/db/application/meta.go +++ b/server/internal/db/application/meta.go @@ -64,6 +64,9 @@ type DbMetadata interface { // 获取指定表的数据-分页查询 // @return columns: 列字段名;result: 结果集;error: 错误 GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) + + // WalkTableRecord 遍历指定表的数据 + WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error } // ------------------------- 元数据sql操作 ------------------------- diff --git a/server/internal/db/application/mysql_meta.go b/server/internal/db/application/mysql_meta.go index 8749e02a..22c18693 100644 --- a/server/internal/db/application/mysql_meta.go +++ b/server/internal/db/application/mysql_meta.go @@ -160,3 +160,7 @@ func (mm *MysqlMetadata) GetCreateTableDdl(tableName string) string { func (mm *MysqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) { return mm.di.SelectData(fmt.Sprintf("SELECT * FROM %s LIMIT %d, %d", tableName, (pageNum-1)*pageSize, pageSize)) } + +func (mm *MysqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error { + return mm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk) +} diff --git a/server/internal/db/application/pgsql_meta.go b/server/internal/db/application/pgsql_meta.go index 224b58f8..50056465 100644 --- a/server/internal/db/application/pgsql_meta.go +++ b/server/internal/db/application/pgsql_meta.go @@ -183,3 +183,7 @@ func (pm *PgsqlMetadata) GetCreateTableDdl(tableName string) string { func (pm *PgsqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) { return pm.di.SelectData(fmt.Sprintf("SELECT * FROM %s OFFSET %d LIMIT %d", tableName, (pageNum-1)*pageSize, pageSize)) } + +func (pm *PgsqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error { + return pm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk) +}