diff --git a/mayfly_go_web/src/views/ops/db/DbList.vue b/mayfly_go_web/src/views/ops/db/DbList.vue
index e0317c19..ce74d6c7 100644
--- a/mayfly_go_web/src/views/ops/db/DbList.vue
+++ b/mayfly_go_web/src/views/ops/db/DbList.vue
@@ -194,7 +194,7 @@
-
+
@@ -202,14 +202,14 @@
-
+ :titles="['全部数据库', '导出数据库']"
+ :data="exportDialog.data"
+ max-height="300"
+ size="small"
+ />
@@ -353,7 +353,7 @@ const columns = ref([
TableColumn.new('name', '名称'),
TableColumn.new('database', '数据库').isSlot().setMinWidth(70),
TableColumn.new('remark', '备注'),
- TableColumn.new('more', '更多').isSlot().setMinWidth(220).fixedRight(),
+ TableColumn.new('more', '更多').isSlot().setMinWidth(180).fixedRight(),
]);
// 该用户拥有的的操作列按钮权限
@@ -414,7 +414,7 @@ const state = reactive({
TableColumn.new('oldValue', '原值').canBeautify(),
TableColumn.new('createTime', '执行时间').isTime(),
TableColumn.new('remark', '备注'),
- TableColumn.new('action', '操作').isSlot().setMinWidth(100).fixedRight().alignCenter(),
+ TableColumn.new('action', '操作').isSlot().setMinWidth(80).fixedRight().alignCenter(),
],
title: '',
visible: false,
diff --git a/server/internal/db/api/db.go b/server/internal/db/api/db.go
index 00f2e794..8b6dae4d 100644
--- a/server/internal/db/api/db.go
+++ b/server/internal/db/api/db.go
@@ -1,7 +1,6 @@
package api
import (
- "compress/gzip"
"fmt"
"io"
"mayfly-go/internal/db/api/form"
@@ -33,20 +32,6 @@ type Db struct {
TagApp tagapp.TagTree
}
-type gzipResponseWriter struct {
- writer *gzip.Writer
-}
-
-func (g gzipResponseWriter) WriteString(data string) {
- g.writer.Write([]byte(data))
-}
-
-func (g gzipResponseWriter) Close() {
- g.writer.Close()
-}
-
-const DEFAULT_ROW_SIZE = 5000
-
// @router /api/dbs [get]
func (d *Db) Dbs(rc *req.Ctx) {
queryCond, page := ginx.BindQueryAndPage[*entity.DbQuery](rc.GinCtx, new(entity.DbQuery))
@@ -270,9 +255,24 @@ func (d *Db) DumpSql(rc *req.Ctx) {
if len(dbNames) == 1 && len(tablesStr) > 0 {
tables = strings.Split(tablesStr, ",")
}
-
- writer := gzipResponseWriter{writer: gzip.NewWriter(g.Writer)}
- defer writer.Close()
+ writer := newGzipWriter(g.Writer)
+ defer func() {
+ var msg string
+ if err := recover(); err != nil {
+ switch t := err.(type) {
+ case biz.BizError:
+ msg = t.Error()
+ case *biz.BizError:
+ msg = t.Error()
+ }
+ }
+ if len(msg) > 0 {
+ msg = "数据库导出失败: " + msg
+ writer.WriteString(msg)
+ d.MsgApp.CreateAndSend(rc.LoginAccount, ws.ErrMsg("数据库导出失败", msg))
+ }
+ writer.Close()
+ }()
for _, dbName := range dbNames {
d.dumpDb(writer, dbId, dbName, tables, needStruct, needData, len(dbNames) > 1)
}
@@ -280,13 +280,14 @@ func (d *Db) DumpSql(rc *req.Ctx) {
rc.ReqParam = fmt.Sprintf("DB[id=%d, tag=%s, name=%s, databases=%s, tables=%s, dumpType=%s]", db.Id, db.TagPath, db.Name, dbNamesStr, tablesStr, dumpType)
}
-func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) {
+func (d *Db) dumpDb(writer *gzipWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) {
dbConn := d.DbApp.GetDbConnection(dbId, dbName)
writer.WriteString("-- ----------------------------")
writer.WriteString("\n-- 导出平台: mayfly-go")
writer.WriteString(fmt.Sprintf("\n-- 导出时间: %s ", time.Now().Format("2006-01-02 15:04:05")))
writer.WriteString(fmt.Sprintf("\n-- 导出数据库: %s ", dbName))
writer.WriteString("\n-- ----------------------------\n")
+ writer.TryFlush()
if switchDb {
switch dbConn.Info.Type {
@@ -319,38 +320,29 @@ func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, table
writer.WriteString(fmt.Sprintf("\n-- ----------------------------\n-- 表记录: %s \n-- ----------------------------\n", table))
writer.WriteString("BEGIN;\n")
- pageNum := 1
- for {
- columns, result, _ := dbMeta.GetTableRecord(table, pageNum, DEFAULT_ROW_SIZE)
- resultLen := len(result)
- if resultLen == 0 {
- break
- }
- insertSql := "INSERT INTO `%s` VALUES (%s);\n"
- for _, res := range result {
- var values []string
- for _, column := range columns {
- value := res[column]
- if value == nil {
- values = append(values, "NULL")
- continue
- }
- strValue, ok := value.(string)
- if ok {
- values = append(values, fmt.Sprintf("%#v", strValue))
- } else {
- values = append(values, stringx.AnyToStr(value))
- }
+ insertSql := "INSERT INTO `%s` VALUES (%s);\n"
+
+ dbMeta.WalkTableRecord(table, func(record map[string]any, columns []string) {
+ var values []string
+ for _, column := range columns {
+ value := record[column]
+ if value == nil {
+ values = append(values, "NULL")
+ continue
+ }
+ strValue, ok := value.(string)
+ if ok {
+ values = append(values, fmt.Sprintf("%#v", strValue))
+ } else {
+ values = append(values, stringx.AnyToStr(value))
}
- writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", ")))
}
- if resultLen < DEFAULT_ROW_SIZE {
- break
- }
- pageNum++
- }
+ writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", ")))
+ writer.TryFlush()
+ })
writer.WriteString("COMMIT;\n")
+ writer.TryFlush()
}
}
diff --git a/server/internal/db/api/gzip_writer.go b/server/internal/db/api/gzip_writer.go
new file mode 100644
index 00000000..dde21ad1
--- /dev/null
+++ b/server/internal/db/api/gzip_writer.go
@@ -0,0 +1,38 @@
+package api
+
+import (
+ "compress/gzip"
+ "io"
+ "mayfly-go/pkg/biz"
+)
+
+type gzipWriter struct {
+ tryFlushCount int
+ writer *gzip.Writer
+ aborted bool
+}
+
+func newGzipWriter(writer io.Writer) *gzipWriter {
+ return &gzipWriter{writer: gzip.NewWriter(writer)}
+}
+
+func (g *gzipWriter) WriteString(data string) {
+ if g.aborted {
+ return
+ }
+ if _, err := g.writer.Write([]byte(data)); err != nil {
+ g.aborted = true
+ biz.IsTrue(false, "数据库导出失败:%s", err)
+ }
+}
+
+func (g *gzipWriter) Close() {
+ g.writer.Close()
+}
+
+func (g *gzipWriter) TryFlush() {
+ if g.tryFlushCount%1000 == 0 {
+ g.writer.Flush()
+ }
+ g.tryFlushCount += 1
+}
diff --git a/server/internal/db/application/db.go b/server/internal/db/application/db.go
index 74af778e..e4cce146 100644
--- a/server/internal/db/application/db.go
+++ b/server/internal/db/application/db.go
@@ -223,12 +223,17 @@ type DbConnection struct {
// 执行查询语句
// 依次返回 列名数组,结果map,错误
func (d *DbConnection) SelectData(execSql string) ([]string, []map[string]any, error) {
- return SelectDataByDb(d.db, execSql)
+ return selectDataByDb(d.db, execSql)
}
// 将查询结果映射至struct,可具体参考sqlx库
func (d *DbConnection) SelectData2Struct(execSql string, dest any) error {
- return Select2StructByDb(d.db, execSql, dest)
+ return select2StructByDb(d.db, execSql, dest)
+}
+
+// WalkTableRecord 遍历表记录
+func (d *DbConnection) WalkTableRecord(selectSql string, walk func(record map[string]any, columns []string)) error {
+ return walkTableRecord(d.db, selectSql, walk)
}
// 执行 update, insert, delete,建表等sql
@@ -242,13 +247,13 @@ func (d *DbConnection) Exec(sql string) (int64, error) {
}
// 获取数据库元信息实现接口
-func (di *DbConnection) GetMeta() DbMetadata {
- dbType := di.Info.Type
+func (d *DbConnection) GetMeta() DbMetadata {
+ dbType := d.Info.Type
if dbType == entity.DbTypeMysql {
- return &MysqlMetadata{di: di}
+ return &MysqlMetadata{di: d}
}
if dbType == entity.DbTypePostgres {
- return &PgsqlMetadata{di: di}
+ return &PgsqlMetadata{di: d}
}
return nil
}
@@ -290,11 +295,34 @@ func GetDbCacheKey(dbId uint64, db string) string {
return fmt.Sprintf("%d:%s", dbId, db)
}
-func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) {
- rows, err := db.Query(selectSql)
+func selectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) {
+ // 列名用于前端表头名称按照数据库与查询字段顺序显示
+ var colNames []string
+ result := make([]map[string]any, 0, 16)
+ err := walkTableRecord(db, selectSql, func(record map[string]any, columns []string) {
+ result = append(result, record)
+ if colNames == nil {
+ colNames = make([]string, 0, len(columns))
+ copy(colNames, columns)
+ }
+ })
if err != nil {
return nil, nil, err
}
+ return colNames, result, nil
+}
+
+func walkTableRecord(db *sql.DB, selectSql string, walk func(record map[string]any, columns []string)) error {
+ tx, err := db.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ rows, err := tx.Query(selectSql)
+ if err != nil {
+ return err
+ }
// rows对象一定要close掉,如果出错,不关掉则会很迅速的达到设置最大连接数,
// 后面的链接过来直接报错或拒绝,实际上也没有起效果
defer func() {
@@ -302,44 +330,42 @@ func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, e
rows.Close()
}
}()
- colTypes, _ := rows.ColumnTypes()
+
+ colTypes, err := rows.ColumnTypes()
+ if err != nil {
+ return err
+ }
+ lenCols := len(colTypes)
+ // 列名用于前端表头名称按照数据库与查询字段顺序显示
+ colNames := make([]string, lenCols)
// 这里表示一行填充数据
- scans := make([]any, len(colTypes))
+ scans := make([]any, lenCols)
// 这里表示一行所有列的值,用[]byte表示
- vals := make([][]byte, len(colTypes))
- // 这里scans引用vals,把数据填充到[]byte里
- for k := range vals {
- scans[k] = &vals[k]
+ values := make([][]byte, lenCols)
+ for k, colType := range colTypes {
+ colNames[k] = colType.Name()
+ // 这里scans引用values,把数据填充到[]byte里
+ scans[k] = &values[k]
}
- result := make([]map[string]any, 0)
- // 列名用于前端表头名称按照数据库与查询字段顺序显示
- colNames := make([]string, 0)
- // 是否第一次遍历,列名数组只需第一次遍历时加入
- isFirst := true
for rows.Next() {
// 不Scan也会导致等待,该链接实际处于未工作的状态,然后也会导致连接数迅速达到最大
- err := rows.Scan(scans...)
- if err != nil {
- return nil, nil, err
+ if err := rows.Scan(scans...); err != nil {
+ return err
}
// 每行数据
- rowData := make(map[string]any)
- // 把vals中的数据复制到row中
- for i, v := range vals {
- colType := colTypes[i]
- colName := colType.Name()
- // 如果是第一行,则将列名加入到列信息中,由于map是无序的,所有需要返回列名的有序数组
- if isFirst {
- colNames = append(colNames, colName)
- }
- rowData[colName] = valueConvert(v, colType)
+ rowData := make(map[string]any, lenCols)
+ // 把values中的数据复制到row中
+ for i, v := range values {
+ rowData[colTypes[i].Name()] = valueConvert(v, colTypes[i])
}
- // 放入结果集
- result = append(result, rowData)
- isFirst = false
+ walk(rowData, colNames)
}
- return colNames, result, nil
+
+ if err := tx.Commit(); err != nil {
+ return err
+ }
+ return nil
}
// 将查询的值转为对应列类型的实际值,不全部转为字符串
@@ -392,7 +418,7 @@ func valueConvert(data []byte, colType *sql.ColumnType) any {
}
// 查询数据结果映射至struct。可参考sqlx库
-func Select2StructByDb(db *sql.DB, selectSql string, dest any) error {
+func select2StructByDb(db *sql.DB, selectSql string, dest any) error {
rows, err := db.Query(selectSql)
if err != nil {
return err
diff --git a/server/internal/db/application/instance.go b/server/internal/db/application/instance.go
index 2a3442b4..a3c8c548 100644
--- a/server/internal/db/application/instance.go
+++ b/server/internal/db/application/instance.go
@@ -140,10 +140,11 @@ func (app *instanceAppImpl) GetDatabases(ed *entity.Instance) []string {
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
defer dbConn.Close()
- _, res, err := SelectDataByDb(dbConn, getDatabasesSql)
+ _, res, err := selectDataByDb(dbConn, getDatabasesSql)
biz.ErrIsNilAppendErr(err, "获取数据库列表失败")
for _, re := range res {
databases = append(databases, re["dbname"].(string))
}
+
return databases
}
diff --git a/server/internal/db/application/meta.go b/server/internal/db/application/meta.go
index 989af443..8094cc39 100644
--- a/server/internal/db/application/meta.go
+++ b/server/internal/db/application/meta.go
@@ -64,6 +64,9 @@ type DbMetadata interface {
// 获取指定表的数据-分页查询
// @return columns: 列字段名;result: 结果集;error: 错误
GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error)
+
+ // WalkTableRecord 遍历指定表的数据
+ WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error
}
// ------------------------- 元数据sql操作 -------------------------
diff --git a/server/internal/db/application/mysql_meta.go b/server/internal/db/application/mysql_meta.go
index 8749e02a..22c18693 100644
--- a/server/internal/db/application/mysql_meta.go
+++ b/server/internal/db/application/mysql_meta.go
@@ -160,3 +160,7 @@ func (mm *MysqlMetadata) GetCreateTableDdl(tableName string) string {
func (mm *MysqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
return mm.di.SelectData(fmt.Sprintf("SELECT * FROM %s LIMIT %d, %d", tableName, (pageNum-1)*pageSize, pageSize))
}
+
+func (mm *MysqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
+ return mm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk)
+}
diff --git a/server/internal/db/application/pgsql_meta.go b/server/internal/db/application/pgsql_meta.go
index 224b58f8..50056465 100644
--- a/server/internal/db/application/pgsql_meta.go
+++ b/server/internal/db/application/pgsql_meta.go
@@ -183,3 +183,7 @@ func (pm *PgsqlMetadata) GetCreateTableDdl(tableName string) string {
func (pm *PgsqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
return pm.di.SelectData(fmt.Sprintf("SELECT * FROM %s OFFSET %d LIMIT %d", tableName, (pageNum-1)*pageSize, pageSize))
}
+
+func (pm *PgsqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
+ return pm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk)
+}