feat: 调整单个数据库资源可配置多个数据库

This commit is contained in:
meilin.huang
2022-05-08 14:10:57 +08:00
parent 483f5b7604
commit 9db3db31be
26 changed files with 329 additions and 143 deletions

View File

@@ -8,6 +8,7 @@ import (
"mayfly-go/base/cache"
"mayfly-go/base/global"
"mayfly-go/base/model"
"mayfly-go/base/utils"
"mayfly-go/server/devops/domain/entity"
"mayfly-go/server/devops/domain/repository"
"mayfly-go/server/devops/infrastructure/persistence"
@@ -35,7 +36,9 @@ type Db interface {
Delete(id uint64)
// 获取数据库连接实例
GetDbInstance(id uint64) *DbInstance
// @param id 数据库实例id
// @param db 数据库
GetDbInstance(id uint64, db string) *DbInstance
}
type dbAppImpl struct {
@@ -71,29 +74,60 @@ func (d *dbAppImpl) Save(dbEntity *entity.Db) {
// 默认tcp连接
dbEntity.Network = "tcp"
// 测试连接
TestConnection(dbEntity)
if dbEntity.Password != "" {
TestConnection(*dbEntity)
}
// 查找是否存在该库
oldDb := &entity.Db{Host: dbEntity.Host, Port: dbEntity.Port, Database: dbEntity.Database}
oldDb := &entity.Db{Host: dbEntity.Host, Port: dbEntity.Port, EnvId: dbEntity.EnvId}
err := d.GetDbBy(oldDb)
if dbEntity.Id == 0 {
biz.IsTrue(err != nil, "该库已存在")
biz.NotEmpty(dbEntity.Password, "密码不能为空")
biz.IsTrue(err != nil, "该数据库实例已存在")
d.dbRepo.Insert(dbEntity)
} else {
// 如果存在该库,则校验修改的库是否为该库
if err == nil {
biz.IsTrue(oldDb.Id == dbEntity.Id, "该库已存在")
}
// 先关闭数据库连接
CloseDb(dbEntity.Id)
d.dbRepo.Update(dbEntity)
return
}
// 如果存在该库,则校验修改的库是否为该库
if err == nil {
biz.IsTrue(oldDb.Id == dbEntity.Id, "该数据库实例已存在")
}
dbId := dbEntity.Id
old := d.GetById(dbId)
var oldDbs []interface{}
for _, v := range strings.Split(old.Database, " ") {
oldDbs = append(oldDbs, v)
}
var newDbs []interface{}
for _, v := range strings.Split(dbEntity.Database, " ") {
newDbs = append(newDbs, v)
}
// 比较新旧数据库列表,需要将移除的数据库相关联的信息删除
_, delDb, _ := utils.ArrayCompare(newDbs, oldDbs, func(i1, i2 interface{}) bool {
return i1.(string) == i2.(string)
})
for _, v := range delDb {
// 先关闭数据库连接
CloseDb(dbEntity.Id, v.(string))
// 删除该库关联的所有sql记录
d.dbSqlRepo.DeleteBy(&entity.DbSql{DbId: dbId, Db: v.(string)})
}
d.dbRepo.Update(dbEntity)
}
func (d *dbAppImpl) Delete(id uint64) {
// 关闭连接
CloseDb(id)
db := d.GetById(id)
dbs := strings.Split(db.Database, " ")
for _, v := range dbs {
// 关闭连接
CloseDb(id, v)
}
d.dbRepo.Delete(id)
// 删除该库下用户保存的所有sql信息
d.dbSqlRepo.DeleteBy(&entity.DbSql{DbId: id})
@@ -101,13 +135,13 @@ func (d *dbAppImpl) Delete(id uint64) {
var mutex sync.Mutex
func (da *dbAppImpl) GetDbInstance(id uint64) *DbInstance {
func (da *dbAppImpl) GetDbInstance(id uint64, db string) *DbInstance {
mutex.Lock()
defer mutex.Unlock()
// Id不为0则为需要缓存
needCache := id != 0
if needCache {
load, ok := dbCache.Get(id)
load, ok := dbCache.Get(GetDbCacheKey(id, db))
if ok {
return load.(*DbInstance)
}
@@ -115,8 +149,11 @@ func (da *dbAppImpl) GetDbInstance(id uint64) *DbInstance {
d := da.GetById(id)
biz.NotNil(d, "数据库信息不存在")
global.Log.Infof("连接db: %s:%d/%s", d.Host, d.Port, d.Database)
biz.IsTrue(strings.Contains(d.Database, db), "未配置该库的操作权限")
global.Log.Infof("连接db: %s:%d/%s", d.Host, d.Port, db)
// 将数据库替换为要访问的数据库,原本数据库为空格拼接的所有库
d.Database = db
DB, err := sql.Open(d.Type, getDsn(d))
biz.ErrIsNil(err, fmt.Sprintf("Open %s failed, err:%v\n", d.Type, err))
perr := DB.Ping()
@@ -131,33 +168,39 @@ func (da *dbAppImpl) GetDbInstance(id uint64) *DbInstance {
// 设置闲置连接数
DB.SetMaxIdleConns(1)
dbi := &DbInstance{Id: id, Type: d.Type, ProjectId: d.ProjectId, db: DB}
cacheKey := GetDbCacheKey(id, db)
dbi := &DbInstance{Id: cacheKey, Type: d.Type, ProjectId: d.ProjectId, db: DB}
if needCache {
dbCache.Put(id, dbi)
dbCache.Put(cacheKey, dbi)
}
return dbi
}
//------------------------------------------------------------------------------
// 客户端连接缓存30分钟内没有访问则会被关闭
// 客户端连接缓存30分钟内没有访问则会被关闭, key为数据库实例id:数据库
var dbCache = cache.NewTimedCache(30*time.Minute, 5*time.Second).
WithUpdateAccessTime(true).
OnEvicted(func(key interface{}, value interface{}) {
global.Log.Info(fmt.Sprintf("删除db连接缓存 id: %d", key))
global.Log.Info(fmt.Sprintf("删除db连接缓存 id: %s", key))
value.(*DbInstance).Close()
})
func GetDbInstanceByCache(id uint64) *DbInstance {
if load, ok := dbCache.Get(fmt.Sprint(id)); ok {
func GetDbCacheKey(dbId uint64, db string) string {
return fmt.Sprintf("%d:%s", dbId, db)
}
func GetDbInstanceByCache(id string) *DbInstance {
if load, ok := dbCache.Get(id); ok {
return load.(*DbInstance)
}
return nil
}
func TestConnection(d *entity.Db) {
biz.NotNil(d, "数据库信息不存在")
DB, err := sql.Open(d.Type, getDsn(d))
func TestConnection(d entity.Db) {
// 验证第一个库是否可以连接即可
d.Database = strings.Split(d.Database, " ")[0]
DB, err := sql.Open(d.Type, getDsn(&d))
biz.ErrIsNil(err, "Open %s failed, err:%v\n", d.Type, err)
defer DB.Close()
perr := DB.Ping()
@@ -166,7 +209,7 @@ func TestConnection(d *entity.Db) {
// db实例
type DbInstance struct {
Id uint64
Id string
Type string
ProjectId uint64
db *sql.DB
@@ -264,7 +307,9 @@ func getDsn(d *entity.Db) string {
return ""
}
func CloseDb(id uint64) {
// 关闭该数据库所有连接
func CloseDb(dbId uint64, db string) {
id := GetDbCacheKey(dbId, db)
if di := GetDbInstanceByCache(id); di != nil {
di.Close()
dbCache.Delete(id)