feat: redis支持设置多库操作

This commit is contained in:
meilin.huang
2022-09-29 13:14:50 +08:00
parent ac62767a18
commit e8f3671ffb
15 changed files with 181 additions and 84 deletions

View File

@@ -14,6 +14,7 @@ import (
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils"
"net"
"strconv"
"strings"
"time"
@@ -38,7 +39,9 @@ type Redis interface {
Delete(id uint64)
// 获取数据库连接实例
GetRedisInstance(id uint64) *RedisInstance
// id: 数据库实例id
// db: 库号
GetRedisInstance(id uint64, db int) *RedisInstance
}
func newRedisApp(redisRepo repository.Redis) Redis {
@@ -77,20 +80,25 @@ func (r *redisAppImpl) Save(re *entity.Redis) {
}
// 查找是否存在该库
oldRedis := &entity.Redis{Host: re.Host, Db: re.Db}
oldRedis := &entity.Redis{Host: re.Host}
err := r.GetRedisBy(oldRedis)
if re.Id == 0 {
biz.IsTrue(err != nil, "该已存在")
biz.IsTrue(err != nil, "该实例已存在")
re.PwdEncrypt()
r.redisRepo.Insert(re)
} else {
// 如果存在该库,则校验修改的库是否为该库
if err == nil {
biz.IsTrue(oldRedis.Id == re.Id, "该已存在")
biz.IsTrue(oldRedis.Id == re.Id, "该实例已存在")
}
// 如果修改了redis实例的库信息则关闭旧库的连接
if oldRedis.Db != re.Db {
for _, dbStr := range strings.Split(oldRedis.Db, ",") {
db, _ := strconv.Atoi(dbStr)
CloseRedis(re.Id, db)
}
}
// 先关闭数据库连接
CloseRedis(re.Id)
re.PwdEncrypt()
r.redisRepo.Update(re)
}
@@ -98,16 +106,22 @@ func (r *redisAppImpl) Save(re *entity.Redis) {
// 删除Redis信息
func (r *redisAppImpl) Delete(id uint64) {
CloseRedis(id)
re := r.GetById(id)
biz.NotNil(re, "该redis信息不存在")
// 如果存在连接,则关闭所有库连接信息
for _, dbStr := range strings.Split(re.Db, ",") {
db, _ := strconv.Atoi(dbStr)
CloseRedis(re.Id, db)
}
r.redisRepo.Delete(id)
}
// 获取数据库连接实例
func (r *redisAppImpl) GetRedisInstance(id uint64) *RedisInstance {
func (r *redisAppImpl) GetRedisInstance(id uint64, db int) *RedisInstance {
// Id不为0则为需要缓存
needCache := id != 0
if needCache {
load, ok := redisCache.Get(id)
load, ok := redisCache.Get(getRedisCacheKey(id, db))
if ok {
return load.(*RedisInstance)
}
@@ -120,7 +134,7 @@ func (r *redisAppImpl) GetRedisInstance(id uint64) *RedisInstance {
redisMode := re.Mode
var ri *RedisInstance
if redisMode == "" || redisMode == entity.RedisModeStandalone {
ri = getRedisCient(re)
ri = getRedisCient(re, db)
// 测试连接
_, e := ri.Cli.Ping(context.Background()).Result()
if e != nil {
@@ -136,7 +150,7 @@ func (r *redisAppImpl) GetRedisInstance(id uint64) *RedisInstance {
panic(biz.NewBizErr(fmt.Sprintf("redis集群连接失败: %s", e.Error())))
}
} else if redisMode == entity.RedisModeSentinel {
ri = getRedisSentinelCient(re)
ri = getRedisSentinelCient(re, db)
// 测试连接
_, e := ri.Cli.Ping(context.Background()).Result()
if e != nil {
@@ -147,18 +161,23 @@ func (r *redisAppImpl) GetRedisInstance(id uint64) *RedisInstance {
global.Log.Infof("连接redis: %s", re.Host)
if needCache {
redisCache.Put(re.Id, ri)
redisCache.Put(getRedisCacheKey(id, db), ri)
}
return ri
}
func getRedisCient(re *entity.Redis) *RedisInstance {
ri := &RedisInstance{Id: re.Id, ProjectId: re.ProjectId, Mode: re.Mode}
// 生成redis连接缓存key
func getRedisCacheKey(id uint64, db int) string {
return fmt.Sprintf("%d/%d", id, db)
}
func getRedisCient(re *entity.Redis, db int) *RedisInstance {
ri := &RedisInstance{Id: getRedisCacheKey(re.Id, db), ProjectId: re.ProjectId, Mode: re.Mode}
redisOptions := &redis.Options{
Addr: re.Host,
Password: re.Password, // no password set
DB: re.Db, // use default DB
DB: db, // use default DB
DialTimeout: 8 * time.Second,
ReadTimeout: -1, // Disable timeouts, because SSH does not support deadlines.
WriteTimeout: -1,
@@ -172,7 +191,7 @@ func getRedisCient(re *entity.Redis) *RedisInstance {
}
func getRedisClusterClient(re *entity.Redis) *RedisInstance {
ri := &RedisInstance{Id: re.Id, ProjectId: re.ProjectId, Mode: re.Mode}
ri := &RedisInstance{Id: getRedisCacheKey(re.Id, 0), ProjectId: re.ProjectId, Mode: re.Mode}
redisClusterOptions := &redis.ClusterOptions{
Addrs: strings.Split(re.Host, ","),
@@ -187,15 +206,15 @@ func getRedisClusterClient(re *entity.Redis) *RedisInstance {
return ri
}
func getRedisSentinelCient(re *entity.Redis) *RedisInstance {
ri := &RedisInstance{Id: re.Id, ProjectId: re.ProjectId, Mode: re.Mode}
func getRedisSentinelCient(re *entity.Redis, db int) *RedisInstance {
ri := &RedisInstance{Id: getRedisCacheKey(re.Id, db), ProjectId: re.ProjectId, Mode: re.Mode}
// sentinel模式host为 masterName=host:port,host:port
masterNameAndHosts := strings.Split(re.Host, "=")
sentinelOptions := &redis.FailoverOptions{
MasterName: masterNameAndHosts[0],
SentinelAddrs: strings.Split(masterNameAndHosts[1], ","),
Password: re.Password, // no password set
DB: re.Db, // use default DB
DB: db, // use default DB
DialTimeout: 8 * time.Second,
ReadTimeout: -1, // Disable timeouts, because SSH does not support deadlines.
WriteTimeout: -1,
@@ -231,8 +250,8 @@ var redisCache = cache.NewTimedCache(constant.RedisConnExpireTime, 5*time.Second
})
// 移除redis连接缓存并关闭redis连接
func CloseRedis(id uint64) {
redisCache.Delete(id)
func CloseRedis(id uint64, db int) {
redisCache.Delete(getRedisCacheKey(id, db))
}
func init() {
@@ -250,8 +269,11 @@ func init() {
func TestRedisConnection(re *entity.Redis) {
var cmd redis.Cmdable
// 取第一个库测试连接即可
dbStr := strings.Split(re.Db, ",")[0]
db, _ := strconv.Atoi(dbStr)
if re.Mode == "" || re.Mode == entity.RedisModeStandalone {
rcli := getRedisCient(re)
rcli := getRedisCient(re, db)
defer rcli.Close()
cmd = rcli.Cli
} else if re.Mode == entity.RedisModeCluster {
@@ -259,7 +281,7 @@ func TestRedisConnection(re *entity.Redis) {
defer ccli.Close()
cmd = ccli.ClusterCli
} else if re.Mode == entity.RedisModeSentinel {
rcli := getRedisSentinelCient(re)
rcli := getRedisSentinelCient(re, db)
defer rcli.Close()
cmd = rcli.Cli
}
@@ -271,7 +293,7 @@ func TestRedisConnection(re *entity.Redis) {
// redis实例
type RedisInstance struct {
Id uint64
Id string
ProjectId uint64
Mode string
Cli *redis.Client