feat: 支持达梦数据库查询

This commit is contained in:
刘宗洋
2023-12-06 14:50:02 +08:00
parent a376a82240
commit 84fd14c129
17 changed files with 571 additions and 12 deletions

View File

@@ -11,6 +11,7 @@
<el-select style="width: 100%" v-model="form.type" placeholder="请选择数据库类型">
<el-option key="item.id" label="mysql" value="mysql"> </el-option>
<el-option key="item.id" label="postgres" value="postgres"> </el-option>
<el-option key="item.id" label="达梦(暂不支持ssh)" value="dm"> </el-option>
</el-select>
</el-form-item>
<el-form-item prop="host" label="host" required>
@@ -79,7 +80,7 @@
</template>
<script lang="ts" setup>
import { toRefs, reactive, watch, ref } from 'vue';
import { reactive, ref, toRefs, watch } from 'vue';
import { dbApi } from './api';
import { ElMessage } from 'element-plus';
import { notBlank } from '@/common/assert';

View File

@@ -252,7 +252,7 @@ const NodeTypeDbInst = new NodeType(SqlExecNodeType.DbInst).withLoadNodesFunc((p
const NodeTypeDb = new NodeType(SqlExecNodeType.Db)
.withLoadNodesFunc(async (parentNode: TagTreeNode) => {
const params = parentNode.params;
if (params.type == 'postgres') {
if (params.type == 'postgres' || params.type === 'dm') {
return [new TagTreeNode(`${params.id}.${params.db}.schema-menu`, 'schema', NodeTypePostgresScheamMenu).withParams(params).withIcon(SchemaIcon)];
}

View File

@@ -197,7 +197,7 @@
</template>
<script lang="ts" setup>
import { onMounted, computed, watch, reactive, toRefs, ref, Ref, onUnmounted } from 'vue';
import { computed, onMounted, onUnmounted, reactive, Ref, ref, toRefs, watch } from 'vue';
import { notEmpty } from '@/common/assert';
import { ElMessage } from 'element-plus';
@@ -332,7 +332,7 @@ const selectData = async () => {
const table = props.tableName;
try {
const countRes = await dbInst.runSql(db, dbInst.getDefaultCountSql(table, state.condition));
state.count = countRes.res[0].count;
state.count = countRes.res[0].count || countRes.res[0].COUNT;
let sql = dbInst.getDefaultSelectSql(table, state.condition, state.orderBy, state.pageNum, state.pageSize);
state.sql = sql;
if (state.count > 0) {

View File

@@ -0,0 +1,281 @@
import { DbDialect, sqlColumnType } from './index';
import { SqlLanguage } from 'sql-formatter/lib/src/sqlFormatter';
export { DMDialect, GAUSS_TYPE_LIST };
const GAUSS_TYPE_LIST: sqlColumnType[] = [
// 数值 - 整数型
{ udtName: 'int1', dataType: 'tinyint', desc: '微整数别名为INT1', space: '1字节', range: '0 ~ +255' },
{ udtName: 'int2', dataType: 'smallint', desc: '小范围整数别名为INT2。', space: '2字节', range: '-32,768 ~ +32,767' },
{ udtName: 'int4', dataType: 'integer', desc: '常用的整数别名为INT4。', space: '4字节', range: '-2,147,483,648 ~ +2,147,483,647' },
{ udtName: 'int8', dataType: 'bigint', desc: '大范围的整数别名为INT8。', space: '8字节', range: '很大' },
// 数值 - 任意精度型
{
udtName: 'numeric',
dataType: 'numeric',
desc: '精度(总位数)取值范围为[1,1000],标度(小数位数)取值范围为[0,精度]。',
space: '每四位(十进制位)占用两个字节,然后在整个数据上加上八个字节的额外开销',
range: '未指定精度的情况下小数点前最大131,072位小数点后最大16,383位',
},
// 数值 - 任意精度型
{ udtName: 'decimal', dataType: 'decimal', desc: '等同于number类型', space: '等同于number类型' },
// 数值 - 序列整型
{ udtName: 'smallserial', dataType: 'smallserial', desc: '二字节序列整型。', space: '2字节', range: '-32,768 ~ +32,767' },
{ udtName: 'serial', dataType: 'serial', desc: '四字节序列整型。', space: '4字节', range: '-2,147,483,648 ~ +2,147,483,647' },
{ udtName: 'bigserial', dataType: 'bigserial', desc: '八字节序列整型', space: '8字节', range: '-9,223,372,036,854,775,808 ~ +9,223,372,036,854,775,807' },
{
udtName: 'largeserial',
dataType: 'largeserial',
desc: '默认插入十六字节序列整型实际数值类型和numeric相同',
space: '变长类型,每四位(十进制位)占用两个字节,然后在整个数据上加上八个字节的额外开销。',
range: '小数点前最大131,072位小数点后最大16,383位。',
},
// 数值 - 浮点类型(不常用 就不列出来了)
// 货币类型
{ udtName: 'money', dataType: 'money', desc: '货币金额', space: '8字节', range: '-92233720368547758.08 ~ +92233720368547758.07' },
// 布尔类型
{ udtName: 'bool', dataType: 'bool', desc: '布尔类型', space: '1字节', range: 'true真 , false假 , null未知unknown' },
// 字符类型
{ udtName: 'char', dataType: 'char', desc: '定长字符串不足补空格。n是指字节长度如不带精度n默认精度为1。', space: '最大为10MB' },
{ udtName: 'character', dataType: 'character', desc: '定长字符串不足补空格。n是指字节长度如不带精度n默认精度为1。', space: '最大为10MB' },
{ udtName: 'nchar', dataType: 'nchar', desc: '定长字符串不足补空格。n是指字节长度如不带精度n默认精度为1。', space: '最大为10MB' },
{ udtName: 'varchar', dataType: 'varchar', desc: '变长字符串。PG兼容模式下n是字符长度。其他兼容模式下n是指字节长度。', space: '最大为10MB。' },
{ udtName: 'text', dataType: 'text', desc: '变长字符串。', space: '最大稍微小于1GB-1。' },
{ udtName: 'clob', dataType: 'clob', desc: '文本大对象。是TEXT类型的别名。', space: '最大稍微小于32TB-1。' },
//特殊字符类型 用的很少,先屏蔽了
// { udtName: 'name', dataType: 'name', desc: '用于对象名的内部类型。', space: '64字节。' },
// { udtName: '"char"', dataType: '"char"', desc: '单字节内部类型。', space: '1字节。' },
// 二进制类型
{ udtName: 'bytea', dataType: 'bytea', desc: '变长的二进制字符串', space: '4字节加上实际的二进制字符串。最大为1GB减去8203字节即1073733621字节。' },
// 日期/时间类型
{ udtName: 'date', dataType: 'date', desc: '日期', space: '4字节' },
{ udtName: 'time', dataType: 'time', desc: 'TIME [(p)] 只用于一日内时间,p表示小数点后的精度取值范围为0~6。', space: '8-12字节' },
{ udtName: 'timestamp', dataType: 'timestamp', desc: 'TIMESTAMP[(p)]日期和时间,p表示小数点后的精度取值范围为0~6', space: '8字节' },
// 带时区的时间戳用的少,先屏蔽了
//{ udtName: 'TIMESTAMPTZ', dataType: 'TIMESTAMP WITH TIME ZONE', desc: '带时区的时间戳', space: '8字节' },
{
udtName: 'interval',
dataType: 'interval',
desc: '时间间隔', // 可以跟参数YEARMONTHDAYHOURMINUTESECONDDAY TO HOURDAY TO MINUTEDAY TO SECONDHOUR TO MINUTEHOUR TO SECONDMINUTE TO SECOND
space: '精度取值范围为0~6且参数为SECONDDAY TO SECONDHOUR TO SECOND或MINUTE TO SECOND时参数p才有效',
},
// 几何类型
{ udtName: 'point', dataType: 'point', desc: '平面中的点, 如:(x,y)', space: '16字节' },
{ udtName: 'lseg', dataType: 'lseg', desc: '(有限)线段, 如:((x1,y1),(x2,y2))', space: '32字节' },
{ udtName: 'box', dataType: 'box', desc: '矩形, 如:((x1,y1),(x2,y2))', space: '32字节' },
{ udtName: 'path', dataType: 'path', desc: '闭合路径(与多边形类似), 如:((x1,y1),...)', space: '16+16n字节' },
{ udtName: 'path', dataType: 'path', desc: '开放路径(与多边形类似), 如:[(x1,y1),...]', space: '16+16n字节' },
{ udtName: 'polygon', dataType: 'polygon', desc: '多边形(与闭合路径相似), 如:((x1,y1),...)', space: '40+16n字节' },
{ udtName: 'circle', dataType: 'polygon', desc: '圆,如:<(x,y),r> (圆心和半径)', space: '24 字节' },
// 网络地址类型
{ udtName: 'cidr', dataType: 'cidr', desc: 'IPv4网络', space: '7字节' },
{ udtName: 'inet', dataType: 'inet', desc: 'IPv4主机和网络', space: '7字节' },
{ udtName: 'macaddr', dataType: 'macaddr', desc: 'MAC地址', space: '6字节' },
];
class DMDialect implements DbDialect {
getFormatDialect(): SqlLanguage {
return 'postgresql';
}
getIcon() {
return 'iconfont icon-op-postgres';
}
getDefaultSelectSql(table: string, condition: string, orderBy: string, pageNum: number, limit: number) {
return `SELECT * FROM ${this.wrapName(table)} ${condition ? 'WHERE ' + condition : ''} ${orderBy ? orderBy : ''} OFFSET ${
(pageNum - 1) * limit
} LIMIT ${limit};`;
}
wrapName = (name: string) => {
return name;
};
getColumnTypes(): sqlColumnType[] {
return GAUSS_TYPE_LIST.sort((a, b) => a.udtName.localeCompare(b.udtName));
}
matchType(text: string, arr: string[]): boolean {
if (!text || !arr || arr.length === 0) {
return false;
}
for (let i = 0; i < arr.length; i++) {
if (text.indexOf(arr[i]) > -1) {
return true;
}
}
return false;
}
getDefaultValueSql(cl: any): string {
if (cl.value && cl.value.length > 0) {
// 哪些字段默认值需要加引号
let marks = false;
if (this.matchType(cl.type, ['char', 'time', 'date', 'text'])) {
// 默认值是now()的time或date不需要加引号
if (cl.value.toLowerCase().replace(' ', '') === 'CURRENT_TIMESTAMP' && this.matchType(cl.type, ['time', 'date'])) {
marks = false;
} else {
marks = true;
}
}
// 哪些函数不需要加引号
if (this.matchType(cl.value, ['nextval'])) {
marks = false;
}
return ` DEFAULT ${marks ? "'" : ''}${cl.value}${marks ? "'" : ''}`;
}
return '';
}
getTypeLengthSql(cl: any) {
// 哪些字段可以指定长度
if (cl.length && this.matchType(cl.type, ['char', 'time', 'bit', 'num', 'decimal'])) {
// 哪些字段类型可以指定小数点
if (cl.numScale && this.matchType(cl.type, ['num', 'decimal'])) {
return `(${cl.length}, ${cl.numScale})`;
} else {
return `(${cl.length})`;
}
}
return '';
}
genColumnBasicSql(cl: any): string {
let length = this.getTypeLengthSql(cl);
// 默认值
let defVal = this.getDefaultValueSql(cl);
return ` ${cl.name} ${cl.type}${length} ${cl.notNull ? 'NOT NULL' : ''} ${defVal} `;
}
getCreateTableSql(data: any): string {
let createSql = '';
let tableCommentSql = '';
let columCommentSql = '';
// 创建表结构
let pks = [] as string[];
let fields: string[] = [];
data.fields.res.forEach((item: any) => {
item.name && fields.push(this.genColumnBasicSql(item));
if (item.pri) {
pks.push(item.name);
}
// 列注释
if (item.remark) {
columCommentSql += ` comment on column ${data.tableName}.${item.name} is '${item.remark}'; `;
}
});
// 建表
createSql = `CREATE TABLE ${data.tableName}
(
${fields.join(',')}
${pks ? `, PRIMARY KEY (${pks.join(',')})` : ''}
);`;
// 表注释
if (data.tableComment) {
tableCommentSql = ` comment on table ${data.tableName} is '${data.tableComment}'; `;
}
return createSql + tableCommentSql + columCommentSql;
}
getCreateIndexSql(tableData: any): string {
// CREATE UNIQUE INDEX idx_column_name ON your_table (column1, column2);
// COMMENT ON INDEX idx_column_name IS 'Your index comment here';
// 创建索引
let sql: string[] = [];
tableData.indexs.res.forEach((a: any) => {
sql.push(` CREATE ${a.unique ? 'UNIQUE' : ''} INDEX ${a.indexName} USING btree ("${a.columnNames.join('","')})"`);
if (a.indexComment) {
sql.push(`COMMENT ON INDEX ${a.indexName} IS '${a.indexComment}'`);
}
});
return sql.join(';');
}
getModifyColumnSql(tableName: string, changeData: { del: any[]; add: any[]; upd: any[] }): string {
let sql: string[] = [];
if (changeData.add.length > 0) {
changeData.add.forEach((a) => {
let typeLength = this.getTypeLengthSql(a);
let defaultSql = this.getDefaultValueSql(a);
sql.push(`ALTER TABLE ${tableName} add ${a.name} ${a.type}${typeLength} ${defaultSql}`);
});
}
if (changeData.upd.length > 0) {
changeData.upd.forEach((a) => {
let typeLength = this.getTypeLengthSql(a);
sql.push(`ALTER TABLE ${tableName} alter column ${a.name} type ${a.type}${typeLength}`);
let defaultSql = this.getDefaultValueSql(a);
if (defaultSql) {
sql.push(`alter table ${tableName} alter column ${a.name} set ${defaultSql}`);
}
});
}
if (changeData.del.length > 0) {
changeData.del.forEach((a) => {
sql.push(`ALTER TABLE ${tableName} DROP COLUMN ${a.name}`);
});
}
return sql.join(';');
}
getModifyIndexSql(tableName: string, changeData: { del: any[]; add: any[]; upd: any[] }): string {
// 不能直接修改索引名或字段、需要先删后加
let dropIndexNames: string[] = [];
let addIndexs: any[] = [];
if (changeData.upd.length > 0) {
changeData.upd.forEach((a) => {
dropIndexNames.push(a.indexName);
addIndexs.push(a);
});
}
if (changeData.del.length > 0) {
changeData.del.forEach((a) => {
dropIndexNames.push(a.indexName);
});
}
if (changeData.add.length > 0) {
changeData.add.forEach((a) => {
addIndexs.push(a);
});
}
if (dropIndexNames.length > 0 || addIndexs.length > 0) {
let sql: string[] = [];
if (dropIndexNames.length > 0) {
dropIndexNames.forEach((a) => {
sql.push(`DROP INDEX ${a}`);
});
}
if (addIndexs.length > 0) {
addIndexs.forEach((a) => {
sql.push(`CREATE ${a.unique ? 'UNIQUE' : ''} INDEX ${a.indexName}(${a.columnNames.join(',')})`);
if (a.indexComment) {
sql.push(`COMMENT ON INDEX ${a.indexName} IS '${a.indexComment}'`);
}
});
}
return sql.join(';');
}
return '';
}
}

View File

@@ -1,5 +1,7 @@
import { MysqlDialect } from './mysql_dialect';
import { PostgresqlDialect } from './postgres_dialect';
import { DMDialect } from '@/views/ops/db/dialect/dm_dialect';
import { SqlLanguage } from 'sql-formatter/lib/src/sqlFormatter';
export interface sqlColumnType {
udtName: string;
@@ -12,13 +14,14 @@ export interface sqlColumnType {
export const DbType = {
mysql: 'mysql',
postgresql: 'postgres',
dm: 'dm', // 达梦
};
export interface DbDialect {
/**
* 获取格式化sql对应的dialect名称
*/
getFormatDialect(): string;
getFormatDialect(): SqlLanguage;
/**
* 获取图标信息
@@ -75,6 +78,7 @@ export interface DbDialect {
let mysqlDialect = new MysqlDialect();
let postgresDialect = new PostgresqlDialect();
let dmDialect = new DMDialect();
export const getDbDialect = (dbType: string | undefined): DbDialect => {
if (dbType === DbType.mysql) {
@@ -83,5 +87,8 @@ export const getDbDialect = (dbType: string | undefined): DbDialect => {
if (dbType === DbType.postgresql) {
return postgresDialect;
}
if (dbType === DbType.dm) {
return dmDialect;
}
throw new Error('不支持的数据库');
};

View File

@@ -1,4 +1,5 @@
import { DbDialect, sqlColumnType } from './index';
import { SqlLanguage } from 'sql-formatter/lib/src/sqlFormatter';
export { MYSQL_TYPE_LIST, MysqlDialect, language };
@@ -30,7 +31,7 @@ const MYSQL_TYPE_LIST = [
];
class MysqlDialect implements DbDialect {
getFormatDialect() {
getFormatDialect(): SqlLanguage {
return 'mysql';
}

View File

@@ -1,4 +1,5 @@
import { DbDialect, sqlColumnType } from './index';
import { SqlLanguage } from 'sql-formatter/lib/src/sqlFormatter';
export { PostgresqlDialect, GAUSS_TYPE_LIST };
@@ -83,7 +84,7 @@ const GAUSS_TYPE_LIST: sqlColumnType[] = [
];
class PostgresqlDialect implements DbDialect {
getFormatDialect() {
getFormatDialect(): SqlLanguage {
return 'postgresql';
}

View File

@@ -3,6 +3,7 @@ module mayfly-go
go 1.21
require (
gitee.com/chunanyong/dm v1.8.13
gitee.com/liuzongyang/libpq v1.0.9
github.com/buger/jsonparser v1.1.1
github.com/gin-gonic/gin v1.9.1
@@ -33,6 +34,8 @@ require (
gorm.io/gorm v1.25.5
)
require golang.org/x/exp v0.0.0-20230519143937-03e91628a987
require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect
@@ -76,7 +79,6 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect
golang.org/x/image v0.0.0-20220302094943-723b81ca9867 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sync v0.1.0 // indirect

View File

@@ -468,8 +468,8 @@ func (d *Db) GetCreateTableDdl(rc *req.Ctx) {
func (d *Db) GetPgsqlSchemas(rc *req.Ctx) {
conn := d.getDbConn(rc.GinCtx)
biz.IsTrue(conn.Info.Type == dbm.DbTypePostgres, "非postgres无法获取该schemas")
res, err := d.getDbConn(rc.GinCtx).GetDialect().(*dbm.PgsqlDialect).GetSchemas()
biz.IsTrue(conn.Info.Type == dbm.DbTypePostgres || conn.Info.Type == dbm.DM, "非postgres无法获取该schemas")
res, err := d.getDbConn(rc.GinCtx).GetDialect().GetSchemas()
biz.ErrIsNilAppendErr(err, "获取schemas失败: %s")
rc.ResData = res
}

View File

@@ -142,8 +142,8 @@ func (d *dbAppImpl) GetDbConn(dbId uint64, dbName string) (*dbm.DbConn, error) {
}
checkDb := dbName
// 兼容pgsql db/schema模式
if instance.Type == dbm.DbTypePostgres {
// 兼容pgsql/dm db/schema模式
if instance.Type == dbm.DbTypePostgres || instance.Type == dbm.DM {
ss := strings.Split(dbName, "/")
if len(ss) > 1 {
checkDb = ss[0]

View File

@@ -89,6 +89,8 @@ func (d *DbConn) GetDialect() DbDialect {
return &MysqlDialect{dc: d}
case DbTypePostgres:
return &PgsqlDialect{dc: d}
case DM:
return &DMDialect{dc: d}
default:
panic(fmt.Sprintf("invalid database type: %s", d.Info.Type))
}

View File

@@ -13,6 +13,7 @@ type DbType string
const (
DbTypeMysql DbType = "mysql"
DbTypePostgres DbType = "postgres"
DM DbType = "dm"
)
func (dbType DbType) MetaDbName() string {
@@ -21,6 +22,8 @@ func (dbType DbType) MetaDbName() string {
return "information_schema"
case DbTypePostgres:
return "postgres"
case DM:
return ""
default:
panic(fmt.Sprintf("invalid database type: %s", dbType))
}
@@ -56,6 +59,8 @@ func (dbType DbType) Dialect() sqlparser.Dialect {
return sqlparser.MysqlDialect{}
case DbTypePostgres:
return sqlparser.PostgresDialect{}
case DM:
return sqlparser.PostgresDialect{}
default:
panic(fmt.Sprintf("invalid database type: %s", dbType))
}

View File

@@ -67,6 +67,8 @@ type DbDialect interface {
// WalkTableRecord 遍历指定表的数据
WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error
GetSchemas() ([]string, error)
}
// ------------------------- 元数据sql操作 -------------------------

View File

@@ -0,0 +1,195 @@
package dbm
import (
"context"
"database/sql"
"fmt"
_ "gitee.com/chunanyong/dm"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/utils/anyx"
"strings"
)
func getDmDB(d *DbInfo) (*sql.DB, error) {
driverName := "dm"
// SSH Conect 暂时不支持隧道连接
db := d.Database
var dbParam string
if db != "" {
// postgres database可以使用db/schema表示方便连接指定schema, 若不存在schema则使用默认schema
ss := strings.Split(db, "/")
if len(ss) > 1 {
dbParam = fmt.Sprintf("%s?schema=%s", ss[0], ss[len(ss)-1])
} else {
dbParam = db
}
}
dsn := fmt.Sprintf("dm://%s:%s@%s:%d/%s", d.Username, d.Password, d.Host, d.Port, dbParam)
return sql.Open(driverName, dsn)
}
// ---------------------------------- DM元数据 -----------------------------------
const (
DM_META_FILE = "metasql/dm_meta.sql"
DM_DB_SCHEMAS = "DM_DB_SCHEMAS"
DM_TABLE_INFO_KEY = "DM_TABLE_INFO"
DM_INDEX_INFO_KEY = "DM_INDEX_INFO"
DM_COLUMN_MA_KEY = "DM_COLUMN_MA"
)
type DMDialect struct {
dc *DbConn
}
func (pd *DMDialect) GetDbNames() ([]string, error) {
_, res, err := pd.dc.Query("SELECT name AS dbname FROM v$database")
if err != nil {
return nil, err
}
databases := make([]string, 0)
for _, re := range res {
databases = append(databases, anyx.ConvString(re["dbname"]))
}
return databases, nil
}
// 获取表基础元信息, 如表名等
func (pd *DMDialect) GetTables() ([]Table, error) {
_, res, err := pd.dc.Query(GetLocalSql(DM_META_FILE, DM_TABLE_INFO_KEY))
if err != nil {
return nil, err
}
tables := make([]Table, 0)
for _, re := range res {
tables = append(tables, Table{
TableName: re["tableName"].(string),
TableComment: anyx.ConvString(re["tableComment"]),
CreateTime: anyx.ConvString(re["createTime"]),
TableRows: anyx.ConvInt(re["tableRows"]),
DataLength: anyx.ConvInt64(re["dataLength"]),
IndexLength: anyx.ConvInt64(re["indexLength"]),
})
}
return tables, nil
}
// 获取列元信息, 如列名等
func (pd *DMDialect) GetColumns(tableNames ...string) ([]Column, error) {
tableName := ""
for i := 0; i < len(tableNames); i++ {
if i != 0 {
tableName = tableName + ", "
}
tableName = tableName + "'" + tableNames[i] + "'"
}
_, res, err := pd.dc.Query(fmt.Sprintf(GetLocalSql(DM_META_FILE, DM_COLUMN_MA_KEY), tableName))
if err != nil {
return nil, err
}
columns := make([]Column, 0)
for _, re := range res {
columns = append(columns, Column{
TableName: re["tableName"].(string),
ColumnName: re["columnName"].(string),
ColumnType: anyx.ConvString(re["columnType"]),
ColumnComment: anyx.ConvString(re["columnComment"]),
Nullable: anyx.ConvString(re["nullable"]),
ColumnKey: anyx.ConvString(re["columnKey"]),
ColumnDefault: anyx.ConvString(re["columnDefault"]),
NumScale: anyx.ConvString(re["numScale"]),
})
}
return columns, nil
}
func (pd *DMDialect) GetPrimaryKey(tablename string) (string, error) {
columns, err := pd.GetColumns(tablename)
if err != nil {
return "", err
}
if len(columns) == 0 {
return "", errorx.NewBiz("[%s] 表不存在", tablename)
}
for _, v := range columns {
if v.ColumnKey == "PRI" {
return v.ColumnName, nil
}
}
return columns[0].ColumnName, nil
}
// 获取表索引信息
func (pd *DMDialect) GetTableIndex(tableName string) ([]Index, error) {
_, res, err := pd.dc.Query(fmt.Sprintf(GetLocalSql(DM_META_FILE, DM_INDEX_INFO_KEY), tableName))
if err != nil {
return nil, err
}
indexs := make([]Index, 0)
for _, re := range res {
indexs = append(indexs, Index{
IndexName: re["indexName"].(string),
ColumnName: anyx.ConvString(re["columnName"]),
IndexType: anyx.ConvString(re["IndexType"]),
IndexComment: anyx.ConvString(re["indexComment"]),
NonUnique: anyx.ConvInt(re["nonUnique"]),
SeqInIndex: anyx.ConvInt(re["seqInIndex"]),
})
}
// 把查询结果以索引名分组,索引字段以逗号连接
result := make([]Index, 0)
key := ""
for _, v := range indexs {
// 当前的索引名
in := v.IndexName
if key == in {
// 索引字段已根据名称和顺序排序,故取最后一个即可
i := len(result) - 1
// 同索引字段以逗号连接
result[i].ColumnName = result[i].ColumnName + "," + v.ColumnName
} else {
key = in
result = append(result, v)
}
}
return result, nil
}
// 获取建表ddl
func (pd *DMDialect) GetCreateTableDdl(tableName string) (string, error) {
ddlSql := fmt.Sprintf("CALL SP_TABLEDEF((SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID)), '%s')", tableName)
_, res, err := pd.dc.Query(ddlSql)
if err != nil {
return "", err
}
return res[0]["COLUMN_VALUE"].(string), nil
}
func (pd *DMDialect) GetTableRecord(tableName string, pageNum, pageSize int) ([]string, []map[string]any, error) {
return pd.dc.Query(fmt.Sprintf("SELECT * FROM %s OFFSET %d LIMIT %d", tableName, (pageNum-1)*pageSize, pageSize))
}
func (pd *DMDialect) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
return pd.dc.WalkTableRecord(context.Background(), fmt.Sprintf("SELECT * FROM %s", tableName), walk)
}
// 获取DM当前连接的库可访问的schemaNames
func (pd *DMDialect) GetSchemas() ([]string, error) {
sql := GetLocalSql(DM_META_FILE, DM_DB_SCHEMAS)
_, res, err := pd.dc.Query(sql)
if err != nil {
return nil, err
}
schemaNames := make([]string, 0)
for _, re := range res {
schemaNames = append(schemaNames, anyx.ConvString(re["schemaName"]))
}
return schemaNames, nil
}

View File

@@ -181,3 +181,7 @@ func (md *MysqlDialect) GetTableRecord(tableName string, pageNum, pageSize int)
func (md *MysqlDialect) WalkTableRecord(tableName string, walk func(record map[string]any, columns []string)) error {
return md.dc.WalkTableRecord(context.Background(), fmt.Sprintf("SELECT * FROM %s", tableName), walk)
}
func (pd *MysqlDialect) GetSchemas() ([]string, error) {
return nil, nil
}

View File

@@ -40,6 +40,8 @@ func (dbInfo *DbInfo) Conn() (*DbConn, error) {
conn, err = getMysqlDB(dbInfo)
case DbTypePostgres:
conn, err = getPgsqlDB(dbInfo)
case DM:
conn, err = getDmDB(dbInfo)
default:
return nil, errorx.NewBiz("invalid database type: %s", dbInfo.Type)
}

View File

@@ -0,0 +1,56 @@
--DM_DB_SCHEMAS schemas
select
distinct owner as schemaName
from dba_objects
---------------------------------------
--DM_TABLE_INFO 表详细信息
select
a.object_name as tableName,
b.comments as tableComment,
a.created as createTime,
TABLE_USED_SPACE((SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID)), a.object_name)*page() as dataLength
from dba_objects a
JOIN USER_TAB_COMMENTS b ON b.TABLE_TYPE='TABLE' and a.object_name = b.TABLE_NAME
where a.owner = (SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID))
and a.object_type = 'TABLE'
---------------------------------------
--DM_INDEX_INFO 表索引信息
SELECT
indexname AS "indexName",
'BTREE' AS "IndexType",
case when indexdef like 'CREATE UNIQUE INDEX%%' then 0 else 1 end as "nonUnique",
obj_description(b.oid, 'pg_class') AS "indexComment",
indexdef AS "indexDef",
c.attname AS "columnName",
c.attnum AS "seqInIndex"
FROM pg_indexes a
join pg_class b on a.indexname = b.relname
join pg_attribute c on b.oid = c.attrelid
WHERE a.schemaname = (select current_schema())
AND a.tablename = '%s';
---------------------------------------
--DM_COLUMN_MA 表列信息
select a.table_name as tableName,
a.column_name as columnName,
case when a.NULLABLE = 'Y' then 'YES' when a.NULLABLE = 'N' then 'NO' else 'NO' end as nullable,
case
when a.char_col_decl_length > 0 then concat(a.data_type, '(', a.char_col_decl_length, ')')
when a.data_precision > 0 and a.data_scale > 0
then concat(a.data_type, '(', a.data_precision, ',', a.data_scale, ')')
else a.data_type end
as columnType,
b.comments as columnComment,
a.data_default as columnDefault,
a.data_scale as numScale,
case when t.COL_NAME = a.column_name then 'PRI' else '' end as columnKey
from dba_tab_columns a
join user_col_comments b on b.owner = a.owner and b.table_name = a.table_name and a.column_name = b.column_name
join (select b.owner, b.table_name, a.name COL_NAME
from SYS.SYSCOLUMNS a,
dba_tables b,
sys.sysobjects c
where a.INFO2 & 0x01 = 0x01
and a.id=c.id and c.name = b.table_name) t on t.owner = a.owner and t.table_name = a.table_name
where a.owner = (SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID))
and a.table_name in (%s)
order by a.table_name, a.column_id