mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-04 00:10:25 +08:00
refactor: 优化数据库导出速度
This commit is contained in:
@@ -194,7 +194,7 @@
|
|||||||
<el-form-item label="扩展名: ">
|
<el-form-item label="扩展名: ">
|
||||||
<el-radio-group v-model="exportDialog.extName">
|
<el-radio-group v-model="exportDialog.extName">
|
||||||
<el-radio label="sql" />
|
<el-radio label="sql" />
|
||||||
<el-radio label="gz" />
|
<el-radio label="gzip" />
|
||||||
</el-radio-group>
|
</el-radio-group>
|
||||||
</el-form-item>
|
</el-form-item>
|
||||||
</el-col>
|
</el-col>
|
||||||
@@ -353,7 +353,7 @@ const columns = ref([
|
|||||||
TableColumn.new('name', '名称'),
|
TableColumn.new('name', '名称'),
|
||||||
TableColumn.new('database', '数据库').isSlot().setMinWidth(70),
|
TableColumn.new('database', '数据库').isSlot().setMinWidth(70),
|
||||||
TableColumn.new('remark', '备注'),
|
TableColumn.new('remark', '备注'),
|
||||||
TableColumn.new('more', '更多').isSlot().setMinWidth(220).fixedRight(),
|
TableColumn.new('more', '更多').isSlot().setMinWidth(180).fixedRight(),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
// 该用户拥有的的操作列按钮权限
|
// 该用户拥有的的操作列按钮权限
|
||||||
@@ -414,7 +414,7 @@ const state = reactive({
|
|||||||
TableColumn.new('oldValue', '原值').canBeautify(),
|
TableColumn.new('oldValue', '原值').canBeautify(),
|
||||||
TableColumn.new('createTime', '执行时间').isTime(),
|
TableColumn.new('createTime', '执行时间').isTime(),
|
||||||
TableColumn.new('remark', '备注'),
|
TableColumn.new('remark', '备注'),
|
||||||
TableColumn.new('action', '操作').isSlot().setMinWidth(100).fixedRight().alignCenter(),
|
TableColumn.new('action', '操作').isSlot().setMinWidth(80).fixedRight().alignCenter(),
|
||||||
],
|
],
|
||||||
title: '',
|
title: '',
|
||||||
visible: false,
|
visible: false,
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"compress/gzip"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"mayfly-go/internal/db/api/form"
|
"mayfly-go/internal/db/api/form"
|
||||||
@@ -33,20 +32,6 @@ type Db struct {
|
|||||||
TagApp tagapp.TagTree
|
TagApp tagapp.TagTree
|
||||||
}
|
}
|
||||||
|
|
||||||
type gzipResponseWriter struct {
|
|
||||||
writer *gzip.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g gzipResponseWriter) WriteString(data string) {
|
|
||||||
g.writer.Write([]byte(data))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g gzipResponseWriter) Close() {
|
|
||||||
g.writer.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
const DEFAULT_ROW_SIZE = 5000
|
|
||||||
|
|
||||||
// @router /api/dbs [get]
|
// @router /api/dbs [get]
|
||||||
func (d *Db) Dbs(rc *req.Ctx) {
|
func (d *Db) Dbs(rc *req.Ctx) {
|
||||||
queryCond, page := ginx.BindQueryAndPage[*entity.DbQuery](rc.GinCtx, new(entity.DbQuery))
|
queryCond, page := ginx.BindQueryAndPage[*entity.DbQuery](rc.GinCtx, new(entity.DbQuery))
|
||||||
@@ -270,8 +255,7 @@ func (d *Db) DumpSql(rc *req.Ctx) {
|
|||||||
if len(dbNames) == 1 && len(tablesStr) > 0 {
|
if len(dbNames) == 1 && len(tablesStr) > 0 {
|
||||||
tables = strings.Split(tablesStr, ",")
|
tables = strings.Split(tablesStr, ",")
|
||||||
}
|
}
|
||||||
|
writer := newGzipWriter(g.Writer)
|
||||||
writer := gzipResponseWriter{writer: gzip.NewWriter(g.Writer)}
|
|
||||||
defer func() {
|
defer func() {
|
||||||
var msg string
|
var msg string
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
@@ -296,13 +280,14 @@ func (d *Db) DumpSql(rc *req.Ctx) {
|
|||||||
rc.ReqParam = fmt.Sprintf("DB[id=%d, tag=%s, name=%s, databases=%s, tables=%s, dumpType=%s]", db.Id, db.TagPath, db.Name, dbNamesStr, tablesStr, dumpType)
|
rc.ReqParam = fmt.Sprintf("DB[id=%d, tag=%s, name=%s, databases=%s, tables=%s, dumpType=%s]", db.Id, db.TagPath, db.Name, dbNamesStr, tablesStr, dumpType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) {
|
func (d *Db) dumpDb(writer *gzipWriter, dbId uint64, dbName string, tables []string, needStruct bool, needData bool, switchDb bool) {
|
||||||
dbConn := d.DbApp.GetDbConnection(dbId, dbName)
|
dbConn := d.DbApp.GetDbConnection(dbId, dbName)
|
||||||
writer.WriteString("-- ----------------------------")
|
writer.WriteString("-- ----------------------------")
|
||||||
writer.WriteString("\n-- 导出平台: mayfly-go")
|
writer.WriteString("\n-- 导出平台: mayfly-go")
|
||||||
writer.WriteString(fmt.Sprintf("\n-- 导出时间: %s ", time.Now().Format("2006-01-02 15:04:05")))
|
writer.WriteString(fmt.Sprintf("\n-- 导出时间: %s ", time.Now().Format("2006-01-02 15:04:05")))
|
||||||
writer.WriteString(fmt.Sprintf("\n-- 导出数据库: %s ", dbName))
|
writer.WriteString(fmt.Sprintf("\n-- 导出数据库: %s ", dbName))
|
||||||
writer.WriteString("\n-- ----------------------------\n")
|
writer.WriteString("\n-- ----------------------------\n")
|
||||||
|
writer.TryFlush()
|
||||||
|
|
||||||
if switchDb {
|
if switchDb {
|
||||||
switch dbConn.Info.Type {
|
switch dbConn.Info.Type {
|
||||||
@@ -335,38 +320,29 @@ func (d *Db) dumpDb(writer gzipResponseWriter, dbId uint64, dbName string, table
|
|||||||
writer.WriteString(fmt.Sprintf("\n-- ----------------------------\n-- 表记录: %s \n-- ----------------------------\n", table))
|
writer.WriteString(fmt.Sprintf("\n-- ----------------------------\n-- 表记录: %s \n-- ----------------------------\n", table))
|
||||||
writer.WriteString("BEGIN;\n")
|
writer.WriteString("BEGIN;\n")
|
||||||
|
|
||||||
pageNum := 1
|
insertSql := "INSERT INTO `%s` VALUES (%s);\n"
|
||||||
for {
|
|
||||||
columns, result, _ := dbMeta.GetTableRecord(table, pageNum, DEFAULT_ROW_SIZE)
|
dbMeta.WalkTableRecord(table, func(record map[string]any, columns []string) {
|
||||||
resultLen := len(result)
|
var values []string
|
||||||
if resultLen == 0 {
|
for _, column := range columns {
|
||||||
break
|
value := record[column]
|
||||||
}
|
if value == nil {
|
||||||
insertSql := "INSERT INTO `%s` VALUES (%s);\n"
|
values = append(values, "NULL")
|
||||||
for _, res := range result {
|
continue
|
||||||
var values []string
|
}
|
||||||
for _, column := range columns {
|
strValue, ok := value.(string)
|
||||||
value := res[column]
|
if ok {
|
||||||
if value == nil {
|
values = append(values, fmt.Sprintf("%#v", strValue))
|
||||||
values = append(values, "NULL")
|
} else {
|
||||||
continue
|
values = append(values, stringx.AnyToStr(value))
|
||||||
}
|
|
||||||
strValue, ok := value.(string)
|
|
||||||
if ok {
|
|
||||||
values = append(values, fmt.Sprintf("%#v", strValue))
|
|
||||||
} else {
|
|
||||||
values = append(values, stringx.AnyToStr(value))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", ")))
|
|
||||||
}
|
}
|
||||||
if resultLen < DEFAULT_ROW_SIZE {
|
writer.WriteString(fmt.Sprintf(insertSql, table, strings.Join(values, ", ")))
|
||||||
break
|
writer.TryFlush()
|
||||||
}
|
})
|
||||||
pageNum++
|
|
||||||
}
|
|
||||||
|
|
||||||
writer.WriteString("COMMIT;\n")
|
writer.WriteString("COMMIT;\n")
|
||||||
|
writer.TryFlush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
38
server/internal/db/api/gzip_writer.go
Normal file
38
server/internal/db/api/gzip_writer.go
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"compress/gzip"
|
||||||
|
"io"
|
||||||
|
"mayfly-go/pkg/biz"
|
||||||
|
)
|
||||||
|
|
||||||
|
type gzipWriter struct {
|
||||||
|
tryFlushCount int
|
||||||
|
writer *gzip.Writer
|
||||||
|
aborted bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGzipWriter(writer io.Writer) *gzipWriter {
|
||||||
|
return &gzipWriter{writer: gzip.NewWriter(writer)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *gzipWriter) WriteString(data string) {
|
||||||
|
if g.aborted {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err := g.writer.Write([]byte(data)); err != nil {
|
||||||
|
g.aborted = true
|
||||||
|
biz.IsTrue(false, "数据库导出失败:%s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *gzipWriter) Close() {
|
||||||
|
g.writer.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *gzipWriter) TryFlush() {
|
||||||
|
if g.tryFlushCount%1000 == 0 {
|
||||||
|
g.writer.Flush()
|
||||||
|
}
|
||||||
|
g.tryFlushCount += 1
|
||||||
|
}
|
||||||
@@ -223,12 +223,17 @@ type DbConnection struct {
|
|||||||
// 执行查询语句
|
// 执行查询语句
|
||||||
// 依次返回 列名数组,结果map,错误
|
// 依次返回 列名数组,结果map,错误
|
||||||
func (d *DbConnection) SelectData(execSql string) ([]string, []map[string]any, error) {
|
func (d *DbConnection) SelectData(execSql string) ([]string, []map[string]any, error) {
|
||||||
return SelectDataByDb(d.db, execSql)
|
return selectDataByDb(d.db, execSql)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将查询结果映射至struct,可具体参考sqlx库
|
// 将查询结果映射至struct,可具体参考sqlx库
|
||||||
func (d *DbConnection) SelectData2Struct(execSql string, dest any) error {
|
func (d *DbConnection) SelectData2Struct(execSql string, dest any) error {
|
||||||
return Select2StructByDb(d.db, execSql, dest)
|
return select2StructByDb(d.db, execSql, dest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WalkTableRecord 遍历表记录
|
||||||
|
func (d *DbConnection) WalkTableRecord(selectSql string, walk func(record map[string]any, columns []string)) error {
|
||||||
|
return walkTableRecord(d.db, selectSql, walk)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 执行 update, insert, delete,建表等sql
|
// 执行 update, insert, delete,建表等sql
|
||||||
@@ -242,13 +247,13 @@ func (d *DbConnection) Exec(sql string) (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 获取数据库元信息实现接口
|
// 获取数据库元信息实现接口
|
||||||
func (di *DbConnection) GetMeta() DbMetadata {
|
func (d *DbConnection) GetMeta() DbMetadata {
|
||||||
dbType := di.Info.Type
|
dbType := d.Info.Type
|
||||||
if dbType == entity.DbTypeMysql {
|
if dbType == entity.DbTypeMysql {
|
||||||
return &MysqlMetadata{di: di}
|
return &MysqlMetadata{di: d}
|
||||||
}
|
}
|
||||||
if dbType == entity.DbTypePostgres {
|
if dbType == entity.DbTypePostgres {
|
||||||
return &PgsqlMetadata{di: di}
|
return &PgsqlMetadata{di: d}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -290,11 +295,34 @@ func GetDbCacheKey(dbId uint64, db string) string {
|
|||||||
return fmt.Sprintf("%d:%s", dbId, db)
|
return fmt.Sprintf("%d:%s", dbId, db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) {
|
func selectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) {
|
||||||
rows, err := db.Query(selectSql)
|
// 列名用于前端表头名称按照数据库与查询字段顺序显示
|
||||||
|
var colNames []string
|
||||||
|
result := make([]map[string]any, 0, 16)
|
||||||
|
err := walkTableRecord(db, selectSql, func(record map[string]any, columns []string) {
|
||||||
|
result = append(result, record)
|
||||||
|
if colNames == nil {
|
||||||
|
colNames = make([]string, 0, len(columns))
|
||||||
|
copy(colNames, columns)
|
||||||
|
}
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
return colNames, result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func walkTableRecord(db *sql.DB, selectSql string, walk func(record map[string]any, columns []string)) error {
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
rows, err := tx.Query(selectSql)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
// rows对象一定要close掉,如果出错,不关掉则会很迅速的达到设置最大连接数,
|
// rows对象一定要close掉,如果出错,不关掉则会很迅速的达到设置最大连接数,
|
||||||
// 后面的链接过来直接报错或拒绝,实际上也没有起效果
|
// 后面的链接过来直接报错或拒绝,实际上也没有起效果
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -302,44 +330,42 @@ func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, e
|
|||||||
rows.Close()
|
rows.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
colTypes, _ := rows.ColumnTypes()
|
|
||||||
|
colTypes, err := rows.ColumnTypes()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lenCols := len(colTypes)
|
||||||
|
// 列名用于前端表头名称按照数据库与查询字段顺序显示
|
||||||
|
colNames := make([]string, lenCols)
|
||||||
// 这里表示一行填充数据
|
// 这里表示一行填充数据
|
||||||
scans := make([]any, len(colTypes))
|
scans := make([]any, lenCols)
|
||||||
// 这里表示一行所有列的值,用[]byte表示
|
// 这里表示一行所有列的值,用[]byte表示
|
||||||
vals := make([][]byte, len(colTypes))
|
values := make([][]byte, lenCols)
|
||||||
// 这里scans引用vals,把数据填充到[]byte里
|
for k, colType := range colTypes {
|
||||||
for k := range vals {
|
colNames[k] = colType.Name()
|
||||||
scans[k] = &vals[k]
|
// 这里scans引用values,把数据填充到[]byte里
|
||||||
|
scans[k] = &values[k]
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]map[string]any, 0)
|
|
||||||
// 列名用于前端表头名称按照数据库与查询字段顺序显示
|
|
||||||
colNames := make([]string, 0)
|
|
||||||
// 是否第一次遍历,列名数组只需第一次遍历时加入
|
|
||||||
isFirst := true
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
// 不Scan也会导致等待,该链接实际处于未工作的状态,然后也会导致连接数迅速达到最大
|
// 不Scan也会导致等待,该链接实际处于未工作的状态,然后也会导致连接数迅速达到最大
|
||||||
err := rows.Scan(scans...)
|
if err := rows.Scan(scans...); err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return nil, nil, err
|
|
||||||
}
|
}
|
||||||
// 每行数据
|
// 每行数据
|
||||||
rowData := make(map[string]any)
|
rowData := make(map[string]any, lenCols)
|
||||||
// 把vals中的数据复制到row中
|
// 把values中的数据复制到row中
|
||||||
for i, v := range vals {
|
for i, v := range values {
|
||||||
colType := colTypes[i]
|
rowData[colTypes[i].Name()] = valueConvert(v, colTypes[i])
|
||||||
colName := colType.Name()
|
|
||||||
// 如果是第一行,则将列名加入到列信息中,由于map是无序的,所有需要返回列名的有序数组
|
|
||||||
if isFirst {
|
|
||||||
colNames = append(colNames, colName)
|
|
||||||
}
|
|
||||||
rowData[colName] = valueConvert(v, colType)
|
|
||||||
}
|
}
|
||||||
// 放入结果集
|
walk(rowData, colNames)
|
||||||
result = append(result, rowData)
|
|
||||||
isFirst = false
|
|
||||||
}
|
}
|
||||||
return colNames, result, nil
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将查询的值转为对应列类型的实际值,不全部转为字符串
|
// 将查询的值转为对应列类型的实际值,不全部转为字符串
|
||||||
@@ -392,7 +418,7 @@ func valueConvert(data []byte, colType *sql.ColumnType) any {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 查询数据结果映射至struct。可参考sqlx库
|
// 查询数据结果映射至struct。可参考sqlx库
|
||||||
func Select2StructByDb(db *sql.DB, selectSql string, dest any) error {
|
func select2StructByDb(db *sql.DB, selectSql string, dest any) error {
|
||||||
rows, err := db.Query(selectSql)
|
rows, err := db.Query(selectSql)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -140,10 +140,11 @@ func (app *instanceAppImpl) GetDatabases(ed *entity.Instance) []string {
|
|||||||
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
|
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
|
||||||
defer dbConn.Close()
|
defer dbConn.Close()
|
||||||
|
|
||||||
_, res, err := SelectDataByDb(dbConn, getDatabasesSql)
|
_, res, err := selectDataByDb(dbConn, getDatabasesSql)
|
||||||
biz.ErrIsNilAppendErr(err, "获取数据库列表失败")
|
biz.ErrIsNilAppendErr(err, "获取数据库列表失败")
|
||||||
for _, re := range res {
|
for _, re := range res {
|
||||||
databases = append(databases, re["dbname"].(string))
|
databases = append(databases, re["dbname"].(string))
|
||||||
}
|
}
|
||||||
|
|
||||||
return databases
|
return databases
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,6 +64,9 @@ type DbMetadata interface {
|
|||||||
// 获取指定表的数据-分页查询
|
// 获取指定表的数据-分页查询
|
||||||
// @return columns: 列字段名;result: 结果集;error: 错误
|
// @return columns: 列字段名;result: 结果集;error: 错误
|
||||||
GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error)
|
GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error)
|
||||||
|
|
||||||
|
// WalkTableRecord 遍历指定表的数据
|
||||||
|
WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------- 元数据sql操作 -------------------------
|
// ------------------------- 元数据sql操作 -------------------------
|
||||||
|
|||||||
@@ -160,3 +160,7 @@ func (mm *MysqlMetadata) GetCreateTableDdl(tableName string) string {
|
|||||||
func (mm *MysqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
|
func (mm *MysqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
|
||||||
return mm.di.SelectData(fmt.Sprintf("SELECT * FROM %s LIMIT %d, %d", tableName, (pageNum-1)*pageSize, pageSize))
|
return mm.di.SelectData(fmt.Sprintf("SELECT * FROM %s LIMIT %d, %d", tableName, (pageNum-1)*pageSize, pageSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mm *MysqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
|
||||||
|
return mm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk)
|
||||||
|
}
|
||||||
|
|||||||
@@ -183,3 +183,7 @@ func (pm *PgsqlMetadata) GetCreateTableDdl(tableName string) string {
|
|||||||
func (pm *PgsqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
|
func (pm *PgsqlMetadata) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
|
||||||
return pm.di.SelectData(fmt.Sprintf("SELECT * FROM %s OFFSET %d LIMIT %d", tableName, (pageNum-1)*pageSize, pageSize))
|
return pm.di.SelectData(fmt.Sprintf("SELECT * FROM %s OFFSET %d LIMIT %d", tableName, (pageNum-1)*pageSize, pageSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pm *PgsqlMetadata) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
|
||||||
|
return pm.di.WalkTableRecord(fmt.Sprintf("SELECT * FROM %s", tableName), walk)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user