Files
mayfly-go/server/internal/db/dbm/postgres/dialect.go
zongyangleo 2acc295259 !110 feat: 支持各源数据库导出sql,数据库迁移部分bug修复
* feat: 各源数据库导出
* fix: 数据库迁移 bug修复
2024-03-26 09:05:28 +00:00

225 lines
7.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package postgres
import (
"database/sql"
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/utils/anyx"
"mayfly-go/pkg/utils/collx"
"strings"
"time"
"github.com/may-fly/cast"
)
type PgsqlDialect struct {
dbi.DefaultDialect
dc *dbi.DbConn
}
func (pd *PgsqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any, duplicateStrategy int) (int64, error) {
// 执行批量insert sql跟mysql一样 pg或高斯支持批量insert语法
// insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ...
// 把二维数组转为一维数组
var args []any
for _, v := range values {
args = append(args, v...)
}
// 构建占位符字符串 "($1, $2, $3), ($4, $5, $6), ..." 用于指定参数
var placeholders []string
for i := 0; i < len(args); i += len(columns) {
var placeholder []string
for j := 0; j < len(columns); j++ {
placeholder = append(placeholder, fmt.Sprintf("$%d", i+j+1))
}
placeholders = append(placeholders, "("+strings.Join(placeholder, ", ")+")")
}
// 根据冲突策略生成后缀
suffix := ""
if pd.dc.Info.Type == dbi.DbTypeGauss {
// 高斯db使用ON DUPLICATE KEY UPDATE 语法参考 https://support.huaweicloud.com/distributed-devg-v3-gaussdb/gaussdb-12-0607.html#ZH-CN_TOPIC_0000001633948138
suffix = pd.gaussOnDuplicateStrategySql(duplicateStrategy, tableName, columns)
} else {
// pgsql 默认使用 on conflict 语法参考 http://www.postgres.cn/docs/12/sql-insert.html
// vastbase语法参考 https://docs.vastdata.com.cn/zh/docs/VastbaseE100Ver3.0.0/doc/SQL%E8%AF%AD%E6%B3%95/INSERT.html
// kingbase语法参考 https://help.kingbase.com.cn/v8/development/sql-plsql/sql/SQL_Statements_9.html#insert
suffix = pd.pgsqlOnDuplicateStrategySql(duplicateStrategy, tableName, columns)
}
sqlStr := fmt.Sprintf("insert into %s (%s) values %s %s", pd.dc.GetMetaData().QuoteIdentifier(tableName), strings.Join(columns, ","), strings.Join(placeholders, ", "), suffix)
// 执行批量insert sql
return pd.dc.TxExec(tx, sqlStr, args...)
}
// pgsql默认唯一键冲突策略
func (pd *PgsqlDialect) pgsqlOnDuplicateStrategySql(duplicateStrategy int, tableName string, columns []string) string {
suffix := ""
if duplicateStrategy == dbi.DuplicateStrategyIgnore {
suffix = " \n on conflict do nothing"
} else if duplicateStrategy == dbi.DuplicateStrategyUpdate {
// 生成 on conflict () do update set column1 = excluded.column1, column2 = excluded.column2, ...
var updateColumns []string
for _, col := range columns {
updateColumns = append(updateColumns, fmt.Sprintf("%s = excluded.%s", col, col))
}
// 查询唯一键名,拼接冲突sql
_, keyRes, _ := pd.dc.Query("SELECT constraint_name FROM information_schema.table_constraints WHERE constraint_schema = $1 AND table_name = $2 AND constraint_type in ('PRIMARY KEY', 'UNIQUE') ", pd.dc.Info.CurrentSchema(), tableName)
if len(keyRes) > 0 {
for _, re := range keyRes {
key := anyx.ToString(re["constraint_name"])
if key != "" {
suffix += fmt.Sprintf(" \n on conflict on constraint %s do update set %s \n", key, strings.Join(updateColumns, ", "))
}
}
}
}
return suffix
}
// 高斯db唯一键冲突策略,使用ON DUPLICATE KEY UPDATE 参考https://support.huaweicloud.com/distributed-devg-v3-gaussdb/gaussdb-12-0607.html#ZH-CN_TOPIC_0000001633948138
func (pd *PgsqlDialect) gaussOnDuplicateStrategySql(duplicateStrategy int, tableName string, columns []string) string {
suffix := ""
metadata := pd.dc.GetMetaData()
if duplicateStrategy == dbi.DuplicateStrategyIgnore {
suffix = " \n ON DUPLICATE KEY UPDATE NOTHING"
} else if duplicateStrategy == dbi.DuplicateStrategyUpdate {
// 查出表里的唯一键涉及的字段
var uniqueColumns []string
indexs, err := metadata.GetTableIndex(tableName)
if err == nil {
for _, index := range indexs {
if index.IsUnique {
cols := strings.Split(index.ColumnName, ",")
for _, col := range cols {
if !collx.ArrayContains(uniqueColumns, strings.ToLower(col)) {
uniqueColumns = append(uniqueColumns, strings.ToLower(col))
}
}
}
}
}
suffix = " \n ON DUPLICATE KEY UPDATE "
for i, col := range columns {
// ON DUPLICATE KEY UPDATE语句不支持更新唯一键字段所以得去掉
if !collx.ArrayContains(uniqueColumns, metadata.RemoveQuote(strings.ToLower(col))) {
suffix += fmt.Sprintf("%s = excluded.%s", col, col)
if i < len(columns)-1 {
suffix += ", "
}
}
}
}
return suffix
}
func (pd *PgsqlDialect) CopyTable(copy *dbi.DbCopyTable) error {
tableName := copy.TableName
// 生成新表名,为老表明+_copy_时间戳
newTableName := tableName + "_copy_" + time.Now().Format("20060102150405")
// 执行根据旧表创建新表
_, err := pd.dc.Exec(fmt.Sprintf("create table %s (like %s)", newTableName, tableName))
if err != nil {
return err
}
// 复制数据
if copy.CopyData {
go func() {
_, _ = pd.dc.Exec(fmt.Sprintf("insert into %s select * from %s", newTableName, tableName))
}()
}
// 查询旧表的自增字段名 重新设置新表的序列序列器
_, res, err := pd.dc.Query(fmt.Sprintf("select column_name from information_schema.columns where table_name = '%s' and column_default like 'nextval%%'", tableName))
if err != nil {
return err
}
for _, re := range res {
colName := cast.ToString(re["column_name"])
if colName != "" {
// 查询自增列当前最大值
_, maxRes, err := pd.dc.Query(fmt.Sprintf("select max(%s) max_val from %s", colName, tableName))
if err != nil {
return err
}
maxVal := cast.ToInt(maxRes[0]["max_val"])
// 序列起始值为1或当前最大值+1
if maxVal <= 0 {
maxVal = 1
} else {
maxVal += 1
}
// 之所以不用tableName_colName_seq是因为gauss会自动创建同名的序列且无法修改序列起始值所以直接使用新序列值
newSeqName := fmt.Sprintf("%s_%s_copy_seq", newTableName, colName)
// 创建自增序列,当前最大值为旧表最大值
_, err = pd.dc.Exec(fmt.Sprintf("CREATE SEQUENCE %s START %d INCREMENT 1", newSeqName, maxVal))
if err != nil {
return err
}
// 将新表的自增主键序列与主键列相关联
_, err = pd.dc.Exec(fmt.Sprintf("alter table %s alter column %s set default nextval('%s')", newTableName, colName, newSeqName))
if err != nil {
return err
}
}
}
return err
}
func (pd *PgsqlDialect) ToCommonColumn(column *dbi.Column) {
// 翻译为通用数据库类型
dataType := column.DataType
t1 := commonColumnTypeMap[string(dataType)]
if t1 == "" {
column.DataType = dbi.CommonTypeVarchar
column.CharMaxLength = 2000
} else {
column.DataType = t1
}
}
func (pd *PgsqlDialect) ToColumn(commonColumn *dbi.Column) {
ctype := pgsqlColumnTypeMap[commonColumn.DataType]
if ctype == "" {
commonColumn.DataType = "varchar"
commonColumn.CharMaxLength = 2000
} else {
commonColumn.DataType = dbi.ColumnDataType(ctype)
}
}
func (pd *PgsqlDialect) CreateTable(commonColumns []dbi.Column, tableInfo dbi.Table, dropOldTable bool) (int, error) {
meta := pd.dc.GetMetaData()
sqlArr := meta.GenerateTableDDL(commonColumns, tableInfo, dropOldTable)
_, err := pd.dc.Exec(strings.Join(sqlArr, ";"))
return len(sqlArr), err
}
func (pd *PgsqlDialect) CreateIndex(tableInfo dbi.Table, indexs []dbi.Index) error {
sqlArr := pd.dc.GetMetaData().GenerateIndexDDL(indexs, tableInfo)
_, err := pd.dc.Exec(strings.Join(sqlArr, ";"))
return err
}
func (pd *PgsqlDialect) UpdateSequence(tableName string, columns []dbi.Column) {
for _, column := range columns {
if column.IsIdentity {
_, _ = pd.dc.Exec(fmt.Sprintf("select setval('%s_%s_seq', (SELECT max(%s) from %s))", tableName, column.ColumnName, column.ColumnName, tableName))
}
}
}