2024-01-12 13:15:30 +08:00
package dm
2023-12-06 14:50:02 +08:00
import (
"context"
"database/sql"
"fmt"
2024-01-12 13:15:30 +08:00
"mayfly-go/internal/db/dbm/dbi"
2023-12-06 14:50:02 +08:00
"mayfly-go/pkg/errorx"
2024-01-05 05:31:32 +00:00
"mayfly-go/pkg/logx"
2023-12-06 14:50:02 +08:00
"mayfly-go/pkg/utils/anyx"
2024-01-18 17:18:17 +08:00
"mayfly-go/pkg/utils/collx"
2024-01-23 04:08:02 +00:00
"mayfly-go/pkg/utils/stringx"
2024-01-05 05:31:32 +00:00
"regexp"
2023-12-06 14:50:02 +08:00
"strings"
2024-01-05 05:31:32 +00:00
"time"
2023-12-07 11:48:17 +08:00
2024-01-24 17:01:17 +08:00
"github.com/kanzihuang/vitess/go/vt/sqlparser"
2023-12-07 11:48:17 +08:00
_ "gitee.com/chunanyong/dm"
2023-12-06 14:50:02 +08:00
)
// Location of the DM metadata SQL resource and the keys of the statements
// this dialect loads from it through dbi.GetLocalSql.
const (
// DM_META_FILE is the resource path passed as the first argument of dbi.GetLocalSql.
DM_META_FILE = "metasql/dm_meta.sql"
// DM_DB_SCHEMAS is the key of the statement used by GetSchemas.
DM_DB_SCHEMAS = "DM_DB_SCHEMAS"
// DM_TABLE_INFO_KEY is the key of the statement used by GetTables.
DM_TABLE_INFO_KEY = "DM_TABLE_INFO"
// DM_INDEX_INFO_KEY is the key of the statement used by GetTableIndex.
DM_INDEX_INFO_KEY = "DM_INDEX_INFO"
// DM_COLUMN_MA_KEY is the key of the statement used by GetColumns.
DM_COLUMN_MA_KEY = "DM_COLUMN_MA"
)
type DMDialect struct {
2024-01-12 13:15:30 +08:00
dc * dbi . DbConn
2023-12-06 14:50:02 +08:00
}
2024-01-12 13:15:30 +08:00
func ( dd * DMDialect ) GetDbServer ( ) ( * dbi . DbServer , error ) {
2023-12-20 17:29:16 +08:00
_ , res , err := dd . dc . Query ( "select * from v$instance" )
if err != nil {
return nil , err
}
2024-01-12 13:15:30 +08:00
ds := & dbi . DbServer {
2023-12-20 17:29:16 +08:00
Version : anyx . ConvString ( res [ 0 ] [ "SVR_VERSION" ] ) ,
}
return ds , nil
}
2024-01-05 08:55:34 +08:00
func ( dd * DMDialect ) GetDbNames ( ) ( [ ] string , error ) {
_ , res , err := dd . dc . Query ( "SELECT name AS DBNAME FROM v$database" )
2023-12-06 14:50:02 +08:00
if err != nil {
return nil , err
}
databases := make ( [ ] string , 0 )
for _ , re := range res {
2023-12-13 15:11:26 +08:00
databases = append ( databases , anyx . ConvString ( re [ "DBNAME" ] ) )
2023-12-06 14:50:02 +08:00
}
return databases , nil
}
// 获取表基础元信息, 如表名等
2024-01-12 13:15:30 +08:00
func ( dd * DMDialect ) GetTables ( ) ( [ ] dbi . Table , error ) {
2023-12-13 21:42:06 +08:00
// 首先执行更新统计信息sql 这个统计信息在数据量比较大的时候就比较耗时,所以最好定时执行
// _, _, err := pd.dc.Query("dbms_stats.GATHER_SCHEMA_stats(SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID))")
// 查询表信息
2024-01-12 13:15:30 +08:00
_ , res , err := dd . dc . Query ( dbi . GetLocalSql ( DM_META_FILE , DM_TABLE_INFO_KEY ) )
2023-12-06 14:50:02 +08:00
if err != nil {
return nil , err
}
2024-01-12 13:15:30 +08:00
tables := make ( [ ] dbi . Table , 0 )
2023-12-06 14:50:02 +08:00
for _ , re := range res {
2024-01-12 13:15:30 +08:00
tables = append ( tables , dbi . Table {
2024-01-18 17:18:17 +08:00
TableName : anyx . ConvString ( re [ "TABLE_NAME" ] ) ,
2023-12-13 15:11:26 +08:00
TableComment : anyx . ConvString ( re [ "TABLE_COMMENT" ] ) ,
CreateTime : anyx . ConvString ( re [ "CREATE_TIME" ] ) ,
2023-12-13 21:42:06 +08:00
TableRows : anyx . ConvInt ( re [ "TABLE_ROWS" ] ) ,
2023-12-13 15:11:26 +08:00
DataLength : anyx . ConvInt64 ( re [ "DATA_LENGTH" ] ) ,
2023-12-13 21:42:06 +08:00
IndexLength : anyx . ConvInt64 ( re [ "INDEX_LENGTH" ] ) ,
2023-12-06 14:50:02 +08:00
} )
}
return tables , nil
}
// 获取列元信息, 如列名等
2024-01-12 13:15:30 +08:00
func ( dd * DMDialect ) GetColumns ( tableNames ... string ) ( [ ] dbi . Column , error ) {
2024-01-18 17:18:17 +08:00
dbType := dd . dc . Info . Type
tableName := strings . Join ( collx . ArrayMap [ string , string ] ( tableNames , func ( val string ) string {
return fmt . Sprintf ( "'%s'" , dbType . RemoveQuote ( val ) )
} ) , "," )
2023-12-06 14:50:02 +08:00
2024-01-12 13:15:30 +08:00
_ , res , err := dd . dc . Query ( fmt . Sprintf ( dbi . GetLocalSql ( DM_META_FILE , DM_COLUMN_MA_KEY ) , tableName ) )
2023-12-06 14:50:02 +08:00
if err != nil {
return nil , err
}
2024-01-12 13:15:30 +08:00
columns := make ( [ ] dbi . Column , 0 )
2023-12-06 14:50:02 +08:00
for _ , re := range res {
2024-01-12 13:15:30 +08:00
columns = append ( columns , dbi . Column {
2024-01-18 17:18:17 +08:00
TableName : anyx . ConvString ( re [ "TABLE_NAME" ] ) ,
ColumnName : anyx . ConvString ( re [ "COLUMN_NAME" ] ) ,
2023-12-13 15:11:26 +08:00
ColumnType : anyx . ConvString ( re [ "COLUMN_TYPE" ] ) ,
ColumnComment : anyx . ConvString ( re [ "COLUMN_COMMENT" ] ) ,
Nullable : anyx . ConvString ( re [ "NULLABLE" ] ) ,
ColumnKey : anyx . ConvString ( re [ "COLUMN_KEY" ] ) ,
ColumnDefault : anyx . ConvString ( re [ "COLUMN_DEFAULT" ] ) ,
NumScale : anyx . ConvString ( re [ "NUM_SCALE" ] ) ,
2023-12-06 14:50:02 +08:00
} )
}
return columns , nil
}
2024-01-05 08:55:34 +08:00
func ( dd * DMDialect ) GetPrimaryKey ( tablename string ) ( string , error ) {
columns , err := dd . GetColumns ( tablename )
2023-12-06 14:50:02 +08:00
if err != nil {
return "" , err
}
if len ( columns ) == 0 {
return "" , errorx . NewBiz ( "[%s] 表不存在" , tablename )
}
for _ , v := range columns {
if v . ColumnKey == "PRI" {
return v . ColumnName , nil
}
}
return columns [ 0 ] . ColumnName , nil
}
// 获取表索引信息
2024-01-12 13:15:30 +08:00
func ( dd * DMDialect ) GetTableIndex ( tableName string ) ( [ ] dbi . Index , error ) {
_ , res , err := dd . dc . Query ( fmt . Sprintf ( dbi . GetLocalSql ( DM_META_FILE , DM_INDEX_INFO_KEY ) , tableName ) )
2023-12-06 14:50:02 +08:00
if err != nil {
return nil , err
}
2024-01-12 13:15:30 +08:00
indexs := make ( [ ] dbi . Index , 0 )
2023-12-06 14:50:02 +08:00
for _ , re := range res {
2024-01-12 13:15:30 +08:00
indexs = append ( indexs , dbi . Index {
2024-01-18 17:18:17 +08:00
IndexName : anyx . ConvString ( re [ "INDEX_NAME" ] ) ,
2023-12-13 15:11:26 +08:00
ColumnName : anyx . ConvString ( re [ "COLUMN_NAME" ] ) ,
IndexType : anyx . ConvString ( re [ "INDEX_TYPE" ] ) ,
IndexComment : anyx . ConvString ( re [ "INDEX_COMMENT" ] ) ,
NonUnique : anyx . ConvInt ( re [ "NON_UNIQUE" ] ) ,
SeqInIndex : anyx . ConvInt ( re [ "SEQ_IN_INDEX" ] ) ,
2023-12-06 14:50:02 +08:00
} )
}
// 把查询结果以索引名分组,索引字段以逗号连接
2024-01-12 13:15:30 +08:00
result := make ( [ ] dbi . Index , 0 )
2023-12-06 14:50:02 +08:00
key := ""
for _ , v := range indexs {
// 当前的索引名
in := v . IndexName
if key == in {
// 索引字段已根据名称和顺序排序,故取最后一个即可
i := len ( result ) - 1
// 同索引字段以逗号连接
result [ i ] . ColumnName = result [ i ] . ColumnName + "," + v . ColumnName
} else {
key = in
result = append ( result , v )
}
}
return result , nil
}
// 获取建表ddl
2024-01-05 08:55:34 +08:00
func ( dd * DMDialect ) GetTableDDL ( tableName string ) ( string , error ) {
2023-12-06 14:50:02 +08:00
ddlSql := fmt . Sprintf ( "CALL SP_TABLEDEF((SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID)), '%s')" , tableName )
2024-01-05 08:55:34 +08:00
_ , res , err := dd . dc . Query ( ddlSql )
2023-12-06 14:50:02 +08:00
if err != nil {
return "" , err
}
2023-12-13 15:11:26 +08:00
// 建表ddl
var builder strings . Builder
for _ , re := range res {
builder . WriteString ( re [ "COLUMN_VALUE" ] . ( string ) )
}
2023-12-06 14:50:02 +08:00
2023-12-13 15:11:26 +08:00
// 表注释
2024-01-05 08:55:34 +08:00
_ , res , err = dd . dc . Query ( fmt . Sprintf ( `
2024-01-05 05:31:32 +00:00
select OWNER , COMMENTS from ALL_TAB_COMMENTS where TABLE_TYPE = ' TABLE ' and TABLE_NAME = ' % s '
2023-12-13 21:42:06 +08:00
and owner = ( SELECT SF_GET_SCHEMA_NAME_BY_ID ( CURRENT_SCHID ) )
` , tableName ) )
2023-12-20 17:29:16 +08:00
if err != nil {
return "" , err
}
for _ , re := range res {
// COMMENT ON TABLE "SYS_MENU" IS '菜单表';
if re [ "COMMENTS" ] != nil {
tableComment := fmt . Sprintf ( "\n\nCOMMENT ON TABLE \"%s\".\"%s\" IS '%s';" , re [ "OWNER" ] . ( string ) , tableName , re [ "COMMENTS" ] . ( string ) )
builder . WriteString ( tableComment )
2023-12-13 15:11:26 +08:00
}
}
// 字段注释
fieldSql := fmt . Sprintf ( `
2023-12-13 21:42:06 +08:00
SELECT OWNER , COLUMN_NAME , COMMENTS
2023-12-13 15:11:26 +08:00
FROM USER_COL_COMMENTS
WHERE OWNER = ( SELECT SF_GET_SCHEMA_NAME_BY_ID ( CURRENT_SCHID ) )
AND TABLE_NAME = ' % s '
` , tableName )
2024-01-05 08:55:34 +08:00
_ , res , err = dd . dc . Query ( fieldSql )
2023-12-20 17:29:16 +08:00
if err != nil {
return "" , err
}
builder . WriteString ( "\n" )
for _ , re := range res {
// COMMENT ON COLUMN "SYS_MENU"."BIZ_CODE" IS '业务编码, 应用编码1';
if re [ "COMMENTS" ] != nil {
fieldComment := fmt . Sprintf ( "\nCOMMENT ON COLUMN \"%s\".\"%s\".\"%s\" IS '%s';" , re [ "OWNER" ] . ( string ) , tableName , re [ "COLUMN_NAME" ] . ( string ) , re [ "COMMENTS" ] . ( string ) )
builder . WriteString ( fieldComment )
2023-12-13 21:42:06 +08:00
}
}
// 索引信息
indexSql := fmt . Sprintf ( `
2024-01-05 05:31:32 +00:00
select indexdef ( b . object_id , 1 ) as INDEX_DEF from ALL_INDEXES a
join ALL_objects b on a . owner = b . owner and b . object_name = a . index_name and b . object_type = ' INDEX '
2023-12-13 21:42:06 +08:00
where a . owner = ( SELECT SF_GET_SCHEMA_NAME_BY_ID ( CURRENT_SCHID ) )
and a . table_name = ' % s '
and indexdef ( b . object_id , 1 ) != ' 禁止查看系统定义的索引信息 '
` , tableName )
2024-01-05 08:55:34 +08:00
_ , res , err = dd . dc . Query ( indexSql )
2023-12-20 17:29:16 +08:00
if err != nil {
return "" , err
}
for _ , re := range res {
builder . WriteString ( "\n\n" + re [ "INDEX_DEF" ] . ( string ) )
2023-12-13 15:11:26 +08:00
}
2023-12-13 21:42:06 +08:00
2023-12-13 15:11:26 +08:00
return builder . String ( ) , nil
2023-12-06 14:50:02 +08:00
}
2024-01-12 13:15:30 +08:00
func ( dd * DMDialect ) WalkTableRecord ( tableName string , walkFn dbi . WalkQueryRowsFunc ) error {
2024-01-06 22:36:50 +08:00
return dd . dc . WalkQueryRows ( context . Background ( ) , fmt . Sprintf ( "SELECT * FROM %s" , tableName ) , walkFn )
2023-12-06 14:50:02 +08:00
}
// 获取DM当前连接的库可访问的schemaNames
2024-01-05 08:55:34 +08:00
func ( dd * DMDialect ) GetSchemas ( ) ( [ ] string , error ) {
2024-01-12 13:15:30 +08:00
sql := dbi . GetLocalSql ( DM_META_FILE , DM_DB_SCHEMAS )
2024-01-05 08:55:34 +08:00
_ , res , err := dd . dc . Query ( sql )
2023-12-06 14:50:02 +08:00
if err != nil {
return nil , err
}
schemaNames := make ( [ ] string , 0 )
for _ , re := range res {
2023-12-13 15:11:26 +08:00
schemaNames = append ( schemaNames , anyx . ConvString ( re [ "SCHEMA_NAME" ] ) )
2023-12-06 14:50:02 +08:00
}
return schemaNames , nil
}
2024-01-05 08:55:34 +08:00
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
2024-01-12 13:15:30 +08:00
func ( dd * DMDialect ) GetDbProgram ( ) dbi . DbProgram {
2024-01-05 08:55:34 +08:00
panic ( "implement me" )
}
2024-01-05 05:31:32 +00:00
2024-01-24 08:29:16 +00:00
// Case-insensitive, unanchored patterns used to classify a column type name.
//
// The patterns overlap ("datetime" also contains "date" and "time", and
// "timestamp" contains "time"), so callers must test datetimeRegexp before
// dateRegexp and timeRegexp.
var (
	// numberRegexp matches numeric column types.
	numberRegexp = regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`)
	// datetimeRegexp matches date-and-time column types.
	datetimeRegexp = regexp.MustCompile(`(?i)datetime|timestamp`)
	// dateRegexp matches date column types.
	dateRegexp = regexp.MustCompile(`(?i)date`)
	// timeRegexp matches time column types.
	timeRegexp = regexp.MustCompile(`(?i)time`)
)
2024-01-05 05:31:32 +00:00
2024-01-08 11:24:37 +08:00
func ( dd * DMDialect ) BatchInsert ( tx * sql . Tx , tableName string , columns [ ] string , values [ ] [ ] any ) ( int64 , error ) {
2024-01-05 05:31:32 +00:00
// 执行批量insert sql
// insert into "table_name" ("column1", "column2", ...) values (value1, value2, ...)
2024-01-06 22:36:50 +08:00
// 生成占位符字符串:如:(?,?)
// 重复字符串并用逗号连接
repeated := strings . Repeat ( "?," , len ( columns ) )
// 去除最后一个逗号,占位符由括号包裹
placeholder := fmt . Sprintf ( "(%s)" , strings . TrimSuffix ( repeated , "," ) )
2024-01-11 12:35:44 +08:00
sqlTemp := fmt . Sprintf ( "insert into %s (%s) values %s" , dd . dc . Info . Type . QuoteIdentifier ( tableName ) , strings . Join ( columns , "," ) , placeholder )
2024-01-06 22:36:50 +08:00
effRows := 0
2024-01-05 05:31:32 +00:00
for _ , value := range values {
// 达梦数据库只能一条条的执行insert
2024-01-08 11:24:37 +08:00
er , err := dd . dc . TxExec ( tx , sqlTemp , value ... )
2024-01-05 05:31:32 +00:00
if err != nil {
logx . Errorf ( "执行sql失败: %s" , err . Error ( ) )
2024-01-06 22:36:50 +08:00
return int64 ( effRows ) , err
2024-01-05 05:31:32 +00:00
}
2024-01-06 22:36:50 +08:00
effRows += int ( er )
2024-01-05 05:31:32 +00:00
}
// 执行批量insert sql
2024-01-06 22:36:50 +08:00
return int64 ( effRows ) , nil
2024-01-05 05:31:32 +00:00
}
2024-01-24 08:29:16 +00:00
// GetDataConverter returns a freshly allocated DataConverter for this dialect.
func (dd *DMDialect) GetDataConverter() dbi.DataConverter {
	return &DataConverter{}
}
2024-01-24 17:01:17 +08:00
// DataConverter is the field-less value returned by DMDialect.GetDataConverter;
// its methods classify column types and format or pass through column values.
type DataConverter struct {
}
2024-01-24 08:29:16 +00:00
// GetDataType maps a column type name to a dbi.DataType.
//
// The patterns are tried in a fixed order - number, datetime, date, time -
// and the first match wins; anything that matches none of them is treated as
// a string.
func (dd *DataConverter) GetDataType(dbColumnType string) dbi.DataType {
	switch {
	case numberRegexp.MatchString(dbColumnType):
		return dbi.DataTypeNumber
	case datetimeRegexp.MatchString(dbColumnType):
		return dbi.DataTypeDateTime
	case dateRegexp.MatchString(dbColumnType):
		return dbi.DataTypeDate
	case timeRegexp.MatchString(dbColumnType):
		return dbi.DataTypeTime
	default:
		return dbi.DataTypeString
	}
}
func ( dd * DataConverter ) FormatData ( dbColumnValue any , dataType dbi . DataType ) string {
str := anyx . ToString ( dbColumnValue )
2024-01-05 05:31:32 +00:00
switch dataType {
2024-01-12 13:15:30 +08:00
case dbi . DataTypeDateTime : // "2024-01-02T22:08:22.275697+08:00"
2024-01-24 08:29:16 +00:00
res , _ := time . Parse ( time . RFC3339 , str )
2024-01-05 05:31:32 +00:00
return res . Format ( time . DateTime )
2024-01-12 13:15:30 +08:00
case dbi . DataTypeDate : // "2024-01-02T00:00:00+08:00"
2024-01-24 08:29:16 +00:00
res , _ := time . Parse ( time . RFC3339 , str )
2024-01-05 05:31:32 +00:00
return res . Format ( time . DateOnly )
2024-01-12 13:15:30 +08:00
case dbi . DataTypeTime : // "0000-01-01T22:08:22.275688+08:00"
2024-01-24 08:29:16 +00:00
res , _ := time . Parse ( time . RFC3339 , str )
2024-01-05 05:31:32 +00:00
return res . Format ( time . TimeOnly )
}
2024-01-24 08:29:16 +00:00
return str
}
func ( dd * DataConverter ) ParseData ( dbColumnValue any , dataType dbi . DataType ) any {
2024-01-05 05:31:32 +00:00
return dbColumnValue
}
2024-01-23 04:08:02 +00:00
// CopyTable creates a copy of copy.TableName named
// "<TABLE>_COPY_<yyyyMMddHHmmss>" and, when copy.CopyData is set, copies the
// rows into it.
//
// The DDL of the source table is fetched with GetTableDDL, every quoted
// occurrence of the upper-cased source name is replaced by the new name, and
// the script is split into statements that are executed one by one. A failing
// statement is logged and skipped rather than aborting the copy.
// NOTE(review): index definitions keep their original index names, so they
// presumably fail in the same schema - confirm.
//
// Row copying runs in a background goroutine: CopyTable returns as soon as
// the DDL has been executed, and failures of the copy are only logged.
//
// An error is returned when the DDL cannot be fetched or split.
func (dd *DMDialect) CopyTable(copy *dbi.DbCopyTable) error {
	tableName := copy.TableName
	ddl, err := dd.GetTableDDL(tableName)
	if err != nil {
		return err
	}

	// New name: old name + "_copy_" + timestamp. It is upper-cased once here
	// because the DDL creates the table under the upper-cased name and every
	// later statement refers to it as a quoted identifier, which has to match
	// that spelling exactly.
	newTableName := strings.ToUpper(tableName + "_copy_" + time.Now().Format("20060102150405"))

	// Point the DDL at the new table.
	ddl = strings.ReplaceAll(ddl, fmt.Sprintf("\"%s\"", strings.ToUpper(tableName)), fmt.Sprintf("\"%s\"", newTableName))
	// Strip blanks and line breaks before splitting.
	ddl = stringx.TrimSpaceAndBr(ddl)

	sqls, err := sqlparser.SplitStatementToPieces(ddl, sqlparser.WithDialect(dd.dc.Info.Type.Dialect()))
	if err != nil {
		// Without statements there is no new table to copy into.
		return err
	}
	for _, stmt := range sqls {
		if _, err := dd.dc.Exec(stmt); err != nil {
			logx.Errorf("copy table %s: executing ddl statement failed: %s", tableName, err.Error())
		}
	}

	if !copy.CopyData {
		return nil
	}

	go func() {
		// With identity_insert switched on, an insert that lists its columns
		// explicitly may fill the auto-increment column. The result is ignored
		// on purpose. NOTE(review): presumably this fails harmlessly for tables
		// without an identity column - confirm.
		_, _ = dd.dc.Exec(fmt.Sprintf("set identity_insert \"%s\" on", newTableName))
		// Switch it off again however the copy ends.
		defer func() {
			_, _ = dd.dc.Exec(fmt.Sprintf("set identity_insert \"%s\" off", newTableName))
		}()

		columns, err := dd.GetColumns(tableName)
		if err != nil {
			logx.Errorf("copy table %s: loading columns failed: %s", tableName, err.Error())
			return
		}
		columnArr := make([]string, 0, len(columns))
		for _, column := range columns {
			columnArr = append(columnArr, fmt.Sprintf("\"%s\"", column.ColumnName))
		}
		columnStr := strings.Join(columnArr, ",")

		// Copy the rows, naming every column explicitly.
		if _, err := dd.dc.Exec(fmt.Sprintf("insert into \"%s\" (%s) select %s from \"%s\"", newTableName, columnStr, columnStr, tableName)); err != nil {
			logx.Errorf("copy table %s: copying rows into %s failed: %s", tableName, newTableName, err.Error())
		}
	}()
	return nil
}