Merge pull request #57 from kanzihuang/feature-export-databases

feat: 批量导出数据库时可按名称筛选数据库
This commit is contained in:
may-fly
2023-09-07 17:20:42 +08:00
committed by GitHub
8 changed files with 161 additions and 93 deletions

View File

@@ -194,7 +194,7 @@
<el-form-item label="扩展名: ">
<el-radio-group v-model="exportDialog.extName">
<el-radio label="sql" />
<el-radio label="gz" />
<el-radio label="gzip" />
</el-radio-group>
</el-form-item>
</el-col>
@@ -202,14 +202,14 @@
<el-form-item>
<el-transfer
:titles="['全部数据库', '导出数据库']"
max-height="300"
v-model="exportDialog.value"
:data="exportDialog.data"
filterable
filter-placeholder="按数据库名称筛选"
>
</el-transfer>
:titles="['全部数据库', '导出数据库']"
:data="exportDialog.data"
max-height="300"
size="small"
/>
</el-form-item>
<template #footer>
@@ -353,7 +353,7 @@ const columns = ref([
TableColumn.new('name', '名称'),
TableColumn.new('database', '数据库').isSlot().setMinWidth(70),
TableColumn.new('remark', '备注'),
TableColumn.new('more', '更多').isSlot().setMinWidth(220).fixedRight(),
TableColumn.new('more', '更多').isSlot().setMinWidth(180).fixedRight(),
]);
// 该用户拥有的的操作列按钮权限
@@ -414,7 +414,7 @@ const state = reactive({
TableColumn.new('oldValue', '原值').canBeautify(),
TableColumn.new('createTime', '执行时间').isTime(),
TableColumn.new('remark', '备注'),
TableColumn.new('action', '操作').isSlot().setMinWidth(100).fixedRight().alignCenter(),
TableColumn.new('action', '操作').isSlot().setMinWidth(80).fixedRight().alignCenter(),
],
title: '',
visible: false,

View File

@@ -1,7 +1,6 @@
package api
import (
"compress/gzip"
"fmt"
"io"
"mayfly-go/internal/db/api/form"
@@ -33,20 +32,6 @@ type Db struct {
TagApp tagapp.TagTree
}
type gzipResponseWriter struct {
writer *gzip.Writer
}
func (g gzipResponseWriter) WriteString(data string) {
g.writer.Write([]byte(data))
}
func (g gzipResponseWriter) Close() {
g.writer.Close()
}
const DEFAULT_ROW_SIZE = 5000
// @router /api/dbs [get]
func (d *Db) Dbs(rc *req.Ctx) {
queryCond, page := ginx.BindQueryAndPage[*entity.DbQuery](rc.GinCtx, new(entity.DbQuery))
@@ -270,9 +255,24 @@ func (d *Db) DumpSql(rc *req.Ctx) {
if len(dbNames) == 1 && len(tablesStr) > 0 {
tables = strings.Split(tablesStr, ",")
}
writer := gzipResponseWriter{writer: gzip.NewWriter(g.Writer)}
defer writer.Close()
writer := newGzipWriter(g.Writer)
defer func() {
var msg string
if err := recover(); err != nil {
switch t := err.(type) {
case biz.BizError:
msg = t.Error()
case *biz.BizError:
msg = t.Error()
}
}
if len(msg) > 0 {
msg = "数据库导出失败: " + msg
writer.WriteString(msg)
d.MsgApp.CreateAndSend(rc.LoginAccount, ws.ErrMsg("数据库导出失败", msg))
}
writer.Close()
}()
for _, dbName := range dbNames {
d.dumpDb(writer, dbId, dbName, tables, needStruct, needData, len(dbNames) > 1)
}
@@ -280,13 +280,14 @@ func (d *Db) DumpSql(rc *req.Ctx) {
rc.ReqParam = fmt.Sprintf("DB[id=%d, tag=%s, name=%s, databases=%s, tables=%s, dumpType=%s]", db.Id, db.TagPath, db.Name, dbNamesStr, tablesStr, dumpType)
}
func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) {
func (d *Db) dumpDb(writer *gzipWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) {
dbConn := d.DbApp.GetDbConnection(dbId, dbName)
writer.WriteString("-- ----------------------------")
writer.WriteString("\n-- 导出平台: mayfly-go")
writer.WriteString(fmt.Sprintf("\n-- 导出时间: %s ", time.Now().Format("2006-01-02 15:04:05")))
writer.WriteString(fmt.Sprintf("\n-- 导出数据库: %s ", dbName))
writer.WriteString("\n-- ----------------------------\n")
writer.TryFlush()
if switchDb {
switch dbConn.Info.Type {
@@ -319,38 +320,29 @@ func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, table
writer.WriteString(fmt.Sprintf("\n-- ----------------------------\n-- 表记录: %s \n-- ----------------------------\n", table))
writer.WriteString("BEGIN;\n")
pageNum := 1
for {
columns, result, _ := dbMeta.GetTableRecord(table, pageNum, DEFAULT_ROW_SIZE)
resultLen := len(result)
if resultLen == 0 {
break
}
insertSql := "INSERT INTO `%s` VALUES (%s);\n"
for _, res := range result {
var values []string
for _, column := range columns {
value := res[column]
if value == nil {
values = append(values, "NULL")
continue
}
strValue, ok := value.(string)
if ok {
values = append(values, fmt.Sprintf("%#v", strValue))
} else {
values = append(values, stringx.AnyToStr(value))
}
insertSql := "INSERT INTO `%s` VALUES (%s);\n"
dbMeta.WalkTableRecord(table, func(record map[string]any, columns []string) {
var values []string
for _, column := range columns {
value := record[column]
if value == nil {
values = append(values, "NULL")
continue
}
strValue, ok := value.(string)
if ok {
values = append(values, fmt.Sprintf("%#v", strValue))
} else {
values = append(values, stringx.AnyToStr(value))
}
writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", ")))
}
if resultLen < DEFAULT_ROW_SIZE {
break
}
pageNum++
}
writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", ")))
writer.TryFlush()
})
writer.WriteString("COMMIT;\n")
writer.TryFlush()
}
}

View File

@@ -0,0 +1,38 @@
package api
import (
"compress/gzip"
"io"
"mayfly-go/pkg/biz"
)
type gzipWriter struct {
tryFlushCount int
writer *gzip.Writer
aborted bool
}
func newGzipWriter(writer io.Writer) *gzipWriter {
return &gzipWriter{writer: gzip.NewWriter(writer)}
}
func (g *gzipWriter) WriteString(data string) {
if g.aborted {
return
}
if _, err := g.writer.Write([]byte(data)); err != nil {
g.aborted = true
biz.IsTrue(false, "数据库导出失败:%s", err)
}
}
func (g *gzipWriter) Close() {
g.writer.Close()
}
func (g *gzipWriter) TryFlush() {
if g.tryFlushCount%1000 == 0 {
g.writer.Flush()
}
g.tryFlushCount += 1
}

View File

@@ -223,12 +223,17 @@ type DbConnection struct {
// 执行查询语句
// 依次返回 列名数组结果map错误
func (d *DbConnection) SelectData(execSql string) ([]string, []map[string]any, error) {
return SelectDataByDb(d.db, execSql)
return selectDataByDb(d.db, execSql)
}
// 将查询结果映射至struct可具体参考sqlx库
func (d *DbConnection) SelectData2Struct(execSql string, dest any) error {
return Select2StructByDb(d.db, execSql, dest)
return select2StructByDb(d.db, execSql, dest)
}
// WalkTableRecord 遍历表记录
func (d *DbConnection) WalkTableRecord(selectSql string, walk func(record map[string]any, columns []string)) error {
return walkTableRecord(d.db, selectSql, walk)
}
// 执行 update, insert, delete建表等sql
@@ -242,13 +247,13 @@ func (d *DbConnection) Exec(sql string) (int64, error) {
}
// 获取数据库元信息实现接口
func (di *DbConnection) GetMeta() DbMetadata {
dbType := di.Info.Type
func (d *DbConnection) GetMeta() DbMetadata {
dbType := d.Info.Type
if dbType == entity.DbTypeMysql {
return &MysqlMetadata{di: di}
return &MysqlMetadata{di: d}
}
if dbType == entity.DbTypePostgres {
return &PgsqlMetadata{di: di}
return &PgsqlMetadata{di: d}
}
return nil
}
@@ -290,11 +295,34 @@ func GetDbCacheKey(dbId uint64, db string) string {
return fmt.Sprintf("%d:%s", dbId, db)
}
func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) {
rows, err := db.Query(selectSql)
func selectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) {
// 列名用于前端表头名称按照数据库与查询字段顺序显示
var colNames []string
result := make([]map[string]any, 0, 16)
err := walkTableRecord(db, selectSql, func(record map[string]any, columns []string) {
result = append(result, record)
if colNames == nil {
colNames = make([]string, 0, len(columns))
copy(colNames, columns)
}
})
if err != nil {
return nil, nil, err
}
return colNames, result, nil
}
func walkTableRecord(db *sql.DB, selectSql string, walk func(record map[string]any, columns []string)) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
rows, err := tx.Query(selectSql)
if err != nil {
return err
}
// rows对象一定要close掉如果出错不关掉则会很迅速的达到设置最大连接数
// 后面的链接过来直接报错或拒绝,实际上也没有起效果
defer func() {
@@ -302,44 +330,42 @@ func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, e
rows.Close()
}
}()
colTypes, _ := rows.ColumnTypes()
colTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
lenCols := len(colTypes)
// 列名用于前端表头名称按照数据库与查询字段顺序显示
colNames := make([]string, lenCols)
// 这里表示一行填充数据
scans := make([]any, len(colTypes))
scans := make([]any, lenCols)
// 这里表示一行所有列的值,用[]byte表示
vals := make([][]byte, len(colTypes))
// 这里scans引用vals把数据填充到[]byte里
for k := range vals {
scans[k] = &vals[k]
values := make([][]byte, lenCols)
for k, colType := range colTypes {
colNames[k] = colType.Name()
// 这里scans引用values把数据填充到[]byte里
scans[k] = &values[k]
}
result := make([]map[string]any, 0)
// 列名用于前端表头名称按照数据库与查询字段顺序显示
colNames := make([]string, 0)
// 是否第一次遍历,列名数组只需第一次遍历时加入
isFirst := true
for rows.Next() {
// 不Scan也会导致等待该链接实际处于未工作的状态然后也会导致连接数迅速达到最大
err := rows.Scan(scans...)
if err != nil {
return nil, nil, err
if err := rows.Scan(scans...); err != nil {
return err
}
// 每行数据
rowData := make(map[string]any)
// 把vals中的数据复制到row中
for i, v := range vals {
colType := colTypes[i]
colName := colType.Name()
// 如果是第一行则将列名加入到列信息中由于map是无序的所有需要返回列名的有序数组
if isFirst {
colNames = append(colNames, colName)
}
rowData[colName] = valueConvert(v, colType)
rowData := make(map[string]any, lenCols)
// 把values中的数据复制到row中
for i, v := range values {
rowData[colTypes[i].Name()] = valueConvert(v, colTypes[i])
}
// 放入结果集
result = append(result, rowData)
isFirst = false
walk(rowData, colNames)
}
return colNames, result, nil
if err := tx.Commit(); err != nil {
return err
}
return nil
}
// 将查询的值转为对应列类型的实际值,不全部转为字符串
@@ -392,7 +418,7 @@ func valueConvert(data []byte, colType *sql.ColumnType) any {
}
// 查询数据结果映射至struct。可参考sqlx库
func Select2StructByDb(db *sql.DB, selectSql string, dest any) error {
func select2StructByDb(db *sql.DB, selectSql string, dest any) error {
rows, err := db.Query(selectSql)
if err != nil {
return err

View File

@@ -140,10 +140,11 @@ func (app *instanceAppImpl) GetDatabases(ed *entity.Instance) []string {
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
defer dbConn.Close()
_, res, err := SelectDataByDb(dbConn, getDatabasesSql)
_, res, err := selectDataByDb(dbConn, getDatabasesSql)
biz.ErrIsNilAppendErr(err, "获取数据库列表失败")
for _, re := range res {
databases = append(databases, re["dbname"].(string))
}
return databases
}

View File

@@ -64,6 +64,9 @@ type DbMetadata interface {
// 获取指定表的数据-分页查询
// @return columns: 列字段名result: 结果集error: 错误
GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error)
// WalkTableRecord 遍历指定表的数据
WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error
}
// ------------------------- 元数据sql操作 -------------------------

View File

@@ -160,3 +160,7 @@ func (mm *MysqlMetadata) GetCreateTableDdl(tableName string) string {
func (mm *MysqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
return mm.di.SelectData(fmt.Sprintf("SELECT * FROM %s LIMIT %d, %d", tableName, (pageNum-1)*pageSize, pageSize))
}
func (mm *MysqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
return mm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk)
}

View File

@@ -183,3 +183,7 @@ func (pm *PgsqlMetadata) GetCreateTableDdl(tableName string) string {
func (pm *PgsqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
return pm.di.SelectData(fmt.Sprintf("SELECT * FROM %s OFFSET %d LIMIT %d", tableName, (pageNum-1)*pageSize, pageSize))
}
func (pm *PgsqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
return pm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk)
}