feature: 将数据库实例管理集成到数据库管理模块中

This commit is contained in:
kanzihuang
2023-08-30 22:41:42 +08:00
parent f7b685cfad
commit 86ad183c41
22 changed files with 446 additions and 490 deletions

View File

@@ -12,7 +12,6 @@ import (
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils/collx"
"mayfly-go/pkg/utils/structx"
"reflect"
"strconv"
"strings"
@@ -40,10 +39,7 @@ type Db interface {
// 获取数据库连接实例
// @param id 数据库实例id
// @param db 数据库
GetDbInstance(id uint64, db string) *DbInstance
// 获取数据库实例的所有数据库列表
GetDatabases(entity *entity.Db) []string
GetDbInstance(db *entity.Db, instance *entity.Instance, dbName string) *DbInstance
}
func newDbApp(dbRepo repository.Db, dbSqlRepo repository.DbSql) Db {
@@ -78,32 +74,19 @@ func (d *dbAppImpl) GetById(id uint64, cols ...string) *entity.Db {
}
func (d *dbAppImpl) Save(dbEntity *entity.Db) {
// 默认tcp连接
dbEntity.Network = dbEntity.GetNetwork()
// 测试连接
if dbEntity.Password != "" {
TestConnection(dbEntity)
}
// 查找是否存在该库
oldDb := &entity.Db{Host: dbEntity.Host, Port: dbEntity.Port, Username: dbEntity.Username}
if dbEntity.SshTunnelMachineId > 0 {
oldDb.SshTunnelMachineId = dbEntity.SshTunnelMachineId
}
oldDb := &entity.Db{Name: dbEntity.Name}
err := d.GetDbBy(oldDb)
if dbEntity.Id == 0 {
biz.NotEmpty(dbEntity.Password, "密码不能为空")
biz.IsTrue(err != nil, "该数据库实例已存在")
dbEntity.PwdEncrypt()
biz.IsTrue(err != nil, "该数据库资源已存在")
d.dbRepo.Insert(dbEntity)
return
}
// 如果存在该库,则校验修改的库是否为该库
if err == nil {
biz.IsTrue(oldDb.Id == dbEntity.Id, "该数据库实例已存在")
biz.IsTrue(oldDb.Id == dbEntity.Id, "该数据库资源已存在")
}
dbId := dbEntity.Id
@@ -129,7 +112,6 @@ func (d *dbAppImpl) Save(dbEntity *entity.Db) {
d.dbSqlRepo.DeleteBy(&entity.DbSql{DbId: dbId, Db: v.(string)})
}
dbEntity.PwdEncrypt()
d.dbRepo.Update(dbEntity)
}
@@ -145,39 +127,13 @@ func (d *dbAppImpl) Delete(id uint64) {
d.dbSqlRepo.DeleteBy(&entity.DbSql{DbId: id})
}
func (d *dbAppImpl) GetDatabases(ed *entity.Db) []string {
ed.Network = ed.GetNetwork()
databases := make([]string, 0)
var dbConn *sql.DB
var metaDb string
var getDatabasesSql string
if ed.Type == entity.DbTypeMysql {
metaDb = "information_schema"
getDatabasesSql = "SELECT SCHEMA_NAME AS dbname FROM SCHEMATA"
} else {
metaDb = "postgres"
getDatabasesSql = "SELECT datname AS dbname FROM pg_database"
}
dbConn, err := GetDbConn(ed, metaDb)
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
defer dbConn.Close()
_, res, err := SelectDataByDb(dbConn, getDatabasesSql)
biz.ErrIsNilAppendErr(err, "获取数据库列表失败")
for _, re := range res {
databases = append(databases, re["dbname"].(string))
}
return databases
}
var mutex sync.Mutex
func (da *dbAppImpl) GetDbInstance(id uint64, db string) *DbInstance {
cacheKey := GetDbCacheKey(id, db)
func (d *dbAppImpl) GetDbInstance(db *entity.Db, instance *entity.Instance, dbName string) *DbInstance {
cacheKey := GetDbCacheKey(db.Id, dbName)
// Id不为0则为需要缓存
needCache := id != 0
needCache := db.Id != 0
if needCache {
load, ok := dbCache.Get(cacheKey)
if ok {
@@ -187,33 +143,32 @@ func (da *dbAppImpl) GetDbInstance(id uint64, db string) *DbInstance {
mutex.Lock()
defer mutex.Unlock()
d := da.GetById(id)
biz.NotNil(d, "数据库信息不存在")
biz.IsTrue(strings.Contains(d.Database, db), "未配置该库的操作权限")
// 密码解密
d.PwdDecrypt()
biz.NotNil(db, "数据库信息不存在")
biz.IsTrue(strings.Contains(" "+db.Database+" ", " "+dbName+" "), "未配置该库的操作权限")
dbInfo := new(DbInfo)
structx.Copy(dbInfo, d)
dbInfo.Database = db
// 密码解密
instance.PwdDecrypt()
dbInfo := NewDbInfo(db, instance)
dbInfo.Database = dbName
dbi := &DbInstance{Id: cacheKey, Info: dbInfo}
DB, err := GetDbConn(d, db)
conn, err := getInstanceConn(instance, dbName)
if err != nil {
dbi.Close()
logx.Errorf("连接db失败: %s:%d/%s", d.Host, d.Port, db)
logx.Errorf("连接db失败: %s:%d/%s", dbInfo.Host, dbInfo.Port, dbName)
panic(biz.NewBizErr(fmt.Sprintf("数据库连接失败: %s", err.Error())))
}
// 最大连接周期超过时间的连接就close
// DB.SetConnMaxLifetime(100 * time.Second)
// conn.SetConnMaxLifetime(100 * time.Second)
// 设置最大连接数
DB.SetMaxOpenConns(5)
conn.SetMaxOpenConns(5)
// 设置闲置连接数
DB.SetMaxIdleConns(1)
conn.SetMaxIdleConns(1)
dbi.db = DB
logx.Infof("连接db: %s:%d/%s", d.Host, d.Port, db)
dbi.db = conn
logx.Infof("连接db: %s:%d/%s", dbInfo.Host, dbInfo.Port, dbName)
if needCache {
dbCache.Put(cacheKey, dbi)
}
@@ -235,6 +190,19 @@ type DbInfo struct {
SshTunnelMachineId int
}
func NewDbInfo(db *entity.Db, instance *entity.Instance) *DbInfo {
return &DbInfo{
Id: db.Id,
Name: db.Name,
Type: instance.Type,
Host: instance.Host,
Port: instance.Port,
Username: instance.Username,
TagPath: db.TagPath,
SshTunnelMachineId: instance.SshTunnelMachineId,
}
}
// 获取记录日志的描述
func (d *DbInfo) GetLogDesc() string {
return fmt.Sprintf("DB[id=%d, tag=%s, name=%s, ip=%s:%d, database=%s]", d.Id, d.TagPath, d.Name, d.Host, d.Port, d.Database)
@@ -325,35 +293,6 @@ func GetDbInstanceByCache(id string) *DbInstance {
return nil
}
func TestConnection(d *entity.Db) {
// 验证第一个库是否可以连接即可
DB, err := GetDbConn(d, strings.Split(d.Database, " ")[0])
biz.ErrIsNilAppendErr(err, "数据库连接失败: %s")
defer DB.Close()
}
// 获取数据库连接
func GetDbConn(d *entity.Db, db string) (*sql.DB, error) {
var DB *sql.DB
var err error
if d.Type == entity.DbTypeMysql {
DB, err = getMysqlDB(d, db)
} else if d.Type == entity.DbTypePostgres {
DB, err = getPgsqlDB(d, db)
}
if err != nil {
return nil, err
}
err = DB.Ping()
if err != nil {
DB.Close()
return nil, err
}
return DB, nil
}
func SelectDataByDb(db *sql.DB, selectSql string) ([]string, []map[string]any, error) {
rows, err := db.Query(selectSql)
if err != nil {