mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-04 00:10:25 +08:00
refactor: db代码review
This commit is contained in:
@@ -165,7 +165,7 @@ func (d *dbAppImpl) GetDatabases(ed *entity.Db) []string {
|
||||
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
|
||||
defer dbConn.Close()
|
||||
|
||||
_, res, err := SelectDataByDb(dbConn, getDatabasesSql)
|
||||
_, res, err := SelectDataByDb(dbConn, getDatabasesSql, true)
|
||||
biz.ErrIsNilAppendErr(err, "获取数据库列表失败")
|
||||
for _, re := range res {
|
||||
databases = append(databases, re["dbname"].(string))
|
||||
@@ -218,6 +218,67 @@ func (da *dbAppImpl) GetDbInstance(id uint64, db string) *DbInstance {
|
||||
return dbi
|
||||
}
|
||||
|
||||
//---------------------------------------- db instance ------------------------------------
|
||||
|
||||
// db实例
|
||||
type DbInstance struct {
|
||||
Id string
|
||||
Type string
|
||||
ProjectId uint64
|
||||
db *sql.DB
|
||||
sshTunnelMachineId uint64
|
||||
}
|
||||
|
||||
// 执行查询语句
|
||||
// 依次返回 列名数组,结果map,错误
|
||||
func (d *DbInstance) SelectData(execSql string) ([]string, []map[string]interface{}, error) {
|
||||
return SelectDataByDb(d.db, execSql, false)
|
||||
}
|
||||
|
||||
// 将查询结果映射至struct,可具体参考sqlx库
|
||||
func (d *DbInstance) SelectData2Struct(execSql string, dest interface{}) error {
|
||||
return Select2StructByDb(d.db, execSql, dest)
|
||||
}
|
||||
|
||||
// 执行内部查询语句,不返回列名以及不限制行数
|
||||
// 依次返回 结果map,错误
|
||||
func (d *DbInstance) innerSelect(execSql string) ([]map[string]interface{}, error) {
|
||||
_, res, err := SelectDataByDb(d.db, execSql, true)
|
||||
return res, err
|
||||
}
|
||||
|
||||
// 执行 update, insert, delete,建表等sql
|
||||
// 返回影响条数和错误
|
||||
func (d *DbInstance) Exec(sql string) (int64, error) {
|
||||
res, err := d.db.Exec(sql)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
// 获取数据库元信息实现接口
|
||||
func (di *DbInstance) GetMeta() DbMetadata {
|
||||
dbType := di.Type
|
||||
if dbType == entity.DbTypeMysql {
|
||||
return &MysqlMetadata{di: di}
|
||||
}
|
||||
if dbType == entity.DbTypePostgres {
|
||||
return &PgsqlMetadata{di: di}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 关闭连接
|
||||
func (d *DbInstance) Close() {
|
||||
if d.db != nil {
|
||||
if err := d.db.Close(); err != nil {
|
||||
global.Log.Errorf("关闭数据库实例[%s]连接失败: %s", d.Id, err.Error())
|
||||
}
|
||||
d.db = nil
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// 单次最大查询数据集
|
||||
@@ -292,7 +353,28 @@ func GetDbConn(d *entity.Db, db string) (*sql.DB, error) {
|
||||
return DB, nil
|
||||
}
|
||||
|
||||
func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]interface{}, error) {
|
||||
// 获取dataSourceName
|
||||
func getDsn(d *entity.Db, db string) string {
|
||||
var dsn string
|
||||
if d.Type == entity.DbTypeMysql {
|
||||
dsn = fmt.Sprintf("%s:%s@%s(%s:%d)/%s?timeout=8s", d.Username, d.Password, d.Network, d.Host, d.Port, db)
|
||||
if d.Params != "" {
|
||||
dsn = fmt.Sprintf("%s&%s", dsn, d.Params)
|
||||
}
|
||||
return dsn
|
||||
}
|
||||
|
||||
if d.Type == entity.DbTypePostgres {
|
||||
dsn = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", d.Host, d.Port, d.Username, d.Password, db)
|
||||
if d.Params != "" {
|
||||
dsn = fmt.Sprintf("%s %s", dsn, strings.Join(strings.Split(d.Params, "&"), " "))
|
||||
}
|
||||
return dsn
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func SelectDataByDb(db *sql.DB, selectSql string, isInner bool) ([]string, []map[string]interface{}, error) {
|
||||
rows, err := db.Query(selectSql)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@@ -322,7 +404,10 @@ func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]interf
|
||||
rowNum := 0
|
||||
for rows.Next() {
|
||||
rowNum++
|
||||
biz.IsTrue(rowNum <= Max_Rows, "结果集 > 2000, 请完善条件或分页信息")
|
||||
// 非内部sql,则校验返回结果数量
|
||||
if !isInner {
|
||||
biz.IsTrue(rowNum <= Max_Rows, "结果集 > 2000, 请完善条件或分页信息")
|
||||
}
|
||||
|
||||
// 不Scan也会导致等待,该链接实际处于未工作的状态,然后也会导致连接数迅速达到最大
|
||||
err := rows.Scan(scans...)
|
||||
@@ -397,10 +482,11 @@ func valueConvert(data []byte, colType *sql.ColumnType) interface{} {
|
||||
return stringV
|
||||
}
|
||||
|
||||
func innerSelectByDb(db *sql.DB, selectSql string) ([]map[string]interface{}, error) {
|
||||
// 查询数据结果映射至struct。可参考sqlx库
|
||||
func Select2StructByDb(db *sql.DB, selectSql string, dest interface{}) error {
|
||||
rows, err := db.Query(selectSql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
// rows对象一定要close掉,如果出错,不关掉则会很迅速的达到设置最大连接数,
|
||||
// 后面的链接过来直接报错或拒绝,实际上也没有起效果
|
||||
@@ -409,35 +495,12 @@ func innerSelectByDb(db *sql.DB, selectSql string) ([]map[string]interface{}, er
|
||||
rows.Close()
|
||||
}
|
||||
}()
|
||||
colTypes, _ := rows.ColumnTypes()
|
||||
// 这里表示一行填充数据
|
||||
scans := make([]interface{}, len(colTypes))
|
||||
// 这里表示一行所有列的值,用[]byte表示
|
||||
vals := make([][]byte, len(colTypes))
|
||||
// 这里scans引用vals,把数据填充到[]byte里
|
||||
for k := range vals {
|
||||
scans[k] = &vals[k]
|
||||
}
|
||||
return scanAll(rows, dest, false)
|
||||
}
|
||||
|
||||
result := make([]map[string]interface{}, 0)
|
||||
for rows.Next() {
|
||||
// 不Scan也会导致等待,该链接实际处于未工作的状态,然后也会导致连接数迅速达到最大
|
||||
err := rows.Scan(scans...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// 每行数据
|
||||
rowData := make(map[string]interface{})
|
||||
// 把vals中的数据复制到row中
|
||||
for i, v := range vals {
|
||||
colType := colTypes[i]
|
||||
colName := colType.Name()
|
||||
rowData[colName] = valueConvert(v, colType)
|
||||
}
|
||||
// 放入结果集
|
||||
result = append(result, rowData)
|
||||
}
|
||||
return result, nil
|
||||
// 删除db缓存并关闭该数据库所有连接
|
||||
func CloseDb(dbId uint64, db string) {
|
||||
dbCache.Delete(GetDbCacheKey(dbId, db))
|
||||
}
|
||||
|
||||
type PqSqlDialer struct {
|
||||
@@ -452,85 +515,7 @@ func (pd *PqSqlDialer) Dial(network, address string) (net.Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (pd *PqSqlDialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
|
||||
return pd.Dial(network, address)
|
||||
}
|
||||
|
||||
// db实例
|
||||
type DbInstance struct {
|
||||
Id string
|
||||
Type string
|
||||
ProjectId uint64
|
||||
db *sql.DB
|
||||
sshTunnelMachineId uint64
|
||||
}
|
||||
|
||||
// 执行查询语句
|
||||
// 依次返回 列名数组,结果map,错误
|
||||
func (d *DbInstance) SelectData(execSql string) ([]string, []map[string]interface{}, error) {
|
||||
return SelectDataByDb(d.db, execSql)
|
||||
}
|
||||
|
||||
// 执行内部查询语句,不返回列名以及不限制行数
|
||||
// 依次返回 结果map,错误
|
||||
func (d *DbInstance) innerSelect(execSql string) ([]map[string]interface{}, error) {
|
||||
return innerSelectByDb(d.db, execSql)
|
||||
}
|
||||
|
||||
// 执行 update, insert, delete,建表等sql
|
||||
// 返回影响条数和错误
|
||||
func (d *DbInstance) Exec(sql string) (int64, error) {
|
||||
res, err := d.db.Exec(sql)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
// 获取数据库元信息实现接口
|
||||
func (di *DbInstance) GetMeta() DbMetadata {
|
||||
dbType := di.Type
|
||||
if dbType == entity.DbTypeMysql {
|
||||
return &MysqlMetadata{di: di}
|
||||
}
|
||||
if dbType == entity.DbTypePostgres {
|
||||
return &PgsqlMetadata{di: di}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 关闭连接
|
||||
func (d *DbInstance) Close() {
|
||||
if d.db != nil {
|
||||
if err := d.db.Close(); err != nil {
|
||||
global.Log.Errorf("关闭数据库实例[%s]连接失败: %s", d.Id, err.Error())
|
||||
}
|
||||
d.db = nil
|
||||
}
|
||||
}
|
||||
|
||||
// 获取dataSourceName
|
||||
func getDsn(d *entity.Db, db string) string {
|
||||
var dsn string
|
||||
if d.Type == entity.DbTypeMysql {
|
||||
dsn = fmt.Sprintf("%s:%s@%s(%s:%d)/%s?timeout=8s", d.Username, d.Password, d.Network, d.Host, d.Port, db)
|
||||
if d.Params != "" {
|
||||
dsn = fmt.Sprintf("%s&%s", dsn, d.Params)
|
||||
}
|
||||
return dsn
|
||||
}
|
||||
|
||||
if d.Type == entity.DbTypePostgres {
|
||||
dsn = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", d.Host, d.Port, d.Username, d.Password, db)
|
||||
if d.Params != "" {
|
||||
dsn = fmt.Sprintf("%s %s", dsn, strings.Join(strings.Split(d.Params, "&"), " "))
|
||||
}
|
||||
return dsn
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// 删除db缓存并关闭该数据库所有连接
|
||||
func CloseDb(dbId uint64, db string) {
|
||||
dbCache.Delete(GetDbCacheKey(dbId, db))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user