Files
mayfly-go/server/internal/redis/application/redis.go

363 lines
10 KiB
Go
Raw Normal View History

package application
import (
"context"
"fmt"
"mayfly-go/internal/constant"
2022-09-09 18:26:08 +08:00
machineapp "mayfly-go/internal/machine/application"
"mayfly-go/internal/machine/infrastructure/machine"
"mayfly-go/internal/redis/domain/entity"
"mayfly-go/internal/redis/domain/repository"
"mayfly-go/pkg/biz"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/global"
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils"
"net"
2022-09-29 13:14:50 +08:00
"strconv"
"strings"
"time"
2023-03-17 09:46:41 +08:00
"github.com/redis/go-redis/v9"
)
type Redis interface {
// 分页获取机器脚本信息列表
GetPageList(condition *entity.RedisQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) *model.PageResult[any]
2022-10-26 20:49:29 +08:00
Count(condition *entity.RedisQuery) int64
2021-09-11 14:04:09 +08:00
// 根据id获取
GetById(id uint64, cols ...string) *entity.Redis
// 根据条件获取
GetRedisBy(condition *entity.Redis, cols ...string) error
Save(entity *entity.Redis)
// 删除数据库信息
Delete(id uint64)
// 获取数据库连接实例
2022-09-29 13:14:50 +08:00
// id: 数据库实例id
// db: 库号
GetRedisInstance(id uint64, db int) *RedisInstance
}
2022-09-09 18:26:08 +08:00
func newRedisApp(redisRepo repository.Redis) Redis {
return &redisAppImpl{
redisRepo: redisRepo,
}
}
2022-09-09 18:26:08 +08:00
type redisAppImpl struct {
redisRepo repository.Redis
}
// 分页获取机器脚本信息列表
func (r *redisAppImpl) GetPageList(condition *entity.RedisQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) *model.PageResult[any] {
return r.redisRepo.GetRedisList(condition, pageParam, toEntity, orderBy...)
}
2022-10-26 20:49:29 +08:00
func (r *redisAppImpl) Count(condition *entity.RedisQuery) int64 {
2021-09-11 14:04:09 +08:00
return r.redisRepo.Count(condition)
}
// 根据id获取
func (r *redisAppImpl) GetById(id uint64, cols ...string) *entity.Redis {
return r.redisRepo.GetById(id, cols...)
}
// 根据条件获取
func (r *redisAppImpl) GetRedisBy(condition *entity.Redis, cols ...string) error {
return r.redisRepo.GetRedis(condition, cols...)
}
func (r *redisAppImpl) Save(re *entity.Redis) {
// ’修改信息且密码不为空‘ or ‘新增’需要测试是否可连接
if (re.Id != 0 && re.Password != "") || re.Id == 0 {
TestRedisConnection(re)
}
// 查找是否存在该库
2022-09-29 13:14:50 +08:00
oldRedis := &entity.Redis{Host: re.Host}
2023-03-17 09:46:41 +08:00
if re.SshTunnelMachineId > 0 {
oldRedis.SshTunnelMachineId = re.SshTunnelMachineId
}
err := r.GetRedisBy(oldRedis)
if re.Id == 0 {
2022-09-29 13:14:50 +08:00
biz.IsTrue(err != nil, "该实例已存在")
re.PwdEncrypt()
r.redisRepo.Insert(re)
} else {
// 如果存在该库,则校验修改的库是否为该库
if err == nil {
2022-09-29 13:14:50 +08:00
biz.IsTrue(oldRedis.Id == re.Id, "该实例已存在")
}
// 如果修改了redis实例的库信息则关闭旧库的连接
2022-11-18 17:52:30 +08:00
if oldRedis.Db != re.Db || oldRedis.SshTunnelMachineId != re.SshTunnelMachineId {
2022-09-29 13:14:50 +08:00
for _, dbStr := range strings.Split(oldRedis.Db, ",") {
db, _ := strconv.Atoi(dbStr)
CloseRedis(re.Id, db)
}
}
re.PwdEncrypt()
r.redisRepo.Update(re)
}
}
// 删除Redis信息
func (r *redisAppImpl) Delete(id uint64) {
2022-09-29 13:14:50 +08:00
re := r.GetById(id)
biz.NotNil(re, "该redis信息不存在")
// 如果存在连接,则关闭所有库连接信息
for _, dbStr := range strings.Split(re.Db, ",") {
db, _ := strconv.Atoi(dbStr)
CloseRedis(re.Id, db)
}
r.redisRepo.Delete(id)
}
// 获取数据库连接实例
2022-09-29 13:14:50 +08:00
func (r *redisAppImpl) GetRedisInstance(id uint64, db int) *RedisInstance {
// Id不为0则为需要缓存
needCache := id != 0
if needCache {
2022-09-29 13:14:50 +08:00
load, ok := redisCache.Get(getRedisCacheKey(id, db))
if ok {
return load.(*RedisInstance)
}
}
// 缓存不存在则回调获取redis信息
re := r.GetById(id)
re.PwdDecrypt()
biz.NotNil(re, "redis信息不存在")
redisMode := re.Mode
var ri *RedisInstance
if redisMode == "" || redisMode == entity.RedisModeStandalone {
2022-09-29 13:14:50 +08:00
ri = getRedisCient(re, db)
// 测试连接
_, e := ri.Cli.Ping(context.Background()).Result()
if e != nil {
ri.Close()
panic(biz.NewBizErr(fmt.Sprintf("redis连接失败: %s", e.Error())))
}
} else if redisMode == entity.RedisModeCluster {
ri = getRedisClusterClient(re)
// 测试连接
_, e := ri.ClusterCli.Ping(context.Background()).Result()
if e != nil {
ri.Close()
panic(biz.NewBizErr(fmt.Sprintf("redis集群连接失败: %s", e.Error())))
}
2022-08-23 18:50:07 +08:00
} else if redisMode == entity.RedisModeSentinel {
2022-09-29 13:14:50 +08:00
ri = getRedisSentinelCient(re, db)
2022-08-23 18:50:07 +08:00
// 测试连接
_, e := ri.Cli.Ping(context.Background()).Result()
if e != nil {
ri.Close()
panic(biz.NewBizErr(fmt.Sprintf("redis sentinel连接失败: %s", e.Error())))
}
}
global.Log.Infof("连接redis: %s", re.Host)
if needCache {
2022-09-29 13:14:50 +08:00
redisCache.Put(getRedisCacheKey(id, db), ri)
}
return ri
}
2022-09-29 13:14:50 +08:00
// 生成redis连接缓存key
func getRedisCacheKey(id uint64, db int) string {
return fmt.Sprintf("%d/%d", id, db)
}
2022-11-18 17:52:30 +08:00
func toRedisInfo(re *entity.Redis, db int) *RedisInfo {
redisInfo := new(RedisInfo)
utils.Copy(redisInfo, re)
redisInfo.Db = db
return redisInfo
}
2022-09-29 13:14:50 +08:00
func getRedisCient(re *entity.Redis, db int) *RedisInstance {
2022-11-18 17:52:30 +08:00
ri := &RedisInstance{Id: getRedisCacheKey(re.Id, db), Info: toRedisInfo(re, db)}
redisOptions := &redis.Options{
Addr: re.Host,
Username: re.Username,
Password: re.Password, // no password set
2022-09-29 13:14:50 +08:00
DB: db, // use default DB
DialTimeout: 8 * time.Second,
ReadTimeout: -1, // Disable timeouts, because SSH does not support deadlines.
WriteTimeout: -1,
}
if re.SshTunnelMachineId > 0 {
redisOptions.Dialer = getRedisDialer(re.SshTunnelMachineId)
}
ri.Cli = redis.NewClient(redisOptions)
return ri
}
func getRedisClusterClient(re *entity.Redis) *RedisInstance {
2022-11-18 17:52:30 +08:00
ri := &RedisInstance{Id: getRedisCacheKey(re.Id, 0), Info: toRedisInfo(re, 0)}
redisClusterOptions := &redis.ClusterOptions{
Addrs: strings.Split(re.Host, ","),
Username: re.Username,
Password: re.Password,
DialTimeout: 8 * time.Second,
}
if re.SshTunnelMachineId > 0 {
redisClusterOptions.Dialer = getRedisDialer(re.SshTunnelMachineId)
}
ri.ClusterCli = redis.NewClusterClient(redisClusterOptions)
return ri
}
2022-09-29 13:14:50 +08:00
func getRedisSentinelCient(re *entity.Redis, db int) *RedisInstance {
2022-11-18 17:52:30 +08:00
ri := &RedisInstance{Id: getRedisCacheKey(re.Id, db), Info: toRedisInfo(re, db)}
2022-08-23 18:50:07 +08:00
// sentinel模式host为 masterName=host:port,host:port
masterNameAndHosts := strings.Split(re.Host, "=")
sentinelOptions := &redis.FailoverOptions{
2023-06-06 20:51:54 +08:00
MasterName: masterNameAndHosts[0],
SentinelAddrs: strings.Split(masterNameAndHosts[1], ","),
Username: re.Username,
2023-06-06 20:51:54 +08:00
Password: re.Password, // no password set
SentinelPassword: re.Password, // 哨兵节点密码需与redis节点密码一致
DB: db, // use default DB
DialTimeout: 8 * time.Second,
ReadTimeout: -1, // Disable timeouts, because SSH does not support deadlines.
WriteTimeout: -1,
2022-08-23 18:50:07 +08:00
}
if re.SshTunnelMachineId > 0 {
2022-08-23 18:50:07 +08:00
sentinelOptions.Dialer = getRedisDialer(re.SshTunnelMachineId)
}
ri.Cli = redis.NewFailoverClient(sentinelOptions)
return ri
}
func getRedisDialer(machineId int) func(ctx context.Context, network, addr string) (net.Conn, error) {
2022-09-09 18:26:08 +08:00
sshTunnel := machineapp.GetMachineApp().GetSshTunnelMachine(machineId)
return func(_ context.Context, network, addr string) (net.Conn, error) {
if sshConn, err := sshTunnel.GetDialConn(network, addr); err == nil {
// 将ssh conn包装否则redis内部设置超时会报错,ssh conn不支持设置超时会返回错误: ssh: tcpChan: deadline not supported
return &utils.WrapSshConn{Conn: sshConn}, nil
} else {
return nil, err
}
}
}
//------------------------------------------------------------------------------
// redis客户端连接缓存指定时间内没有访问则会被关闭
var redisCache = cache.NewTimedCache(constant.RedisConnExpireTime, 5*time.Second).
WithUpdateAccessTime(true).
OnEvicted(func(key any, value any) {
2022-10-11 08:25:20 +08:00
global.Log.Info(fmt.Sprintf("删除redis连接缓存 id = %s", key))
value.(*RedisInstance).Close()
})
// 移除redis连接缓存并关闭redis连接
2022-09-29 13:14:50 +08:00
func CloseRedis(id uint64, db int) {
redisCache.Delete(getRedisCacheKey(id, db))
}
func init() {
machine.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
// 遍历所有redis连接实例若存在redis实例使用该ssh隧道机器则返回true表示还在使用中...
items := redisCache.Items()
for _, v := range items {
2022-11-18 17:52:30 +08:00
if v.Value.(*RedisInstance).Info.SshTunnelMachineId == machineId {
return true
}
}
return false
})
}
func TestRedisConnection(re *entity.Redis) {
var cmd redis.Cmdable
2022-09-29 13:14:50 +08:00
// 取第一个库测试连接即可
dbStr := strings.Split(re.Db, ",")[0]
db, _ := strconv.Atoi(dbStr)
if re.Mode == "" || re.Mode == entity.RedisModeStandalone {
2022-09-29 13:14:50 +08:00
rcli := getRedisCient(re, db)
defer rcli.Close()
cmd = rcli.Cli
} else if re.Mode == entity.RedisModeCluster {
ccli := getRedisClusterClient(re)
defer ccli.Close()
cmd = ccli.ClusterCli
2022-08-23 18:50:07 +08:00
} else if re.Mode == entity.RedisModeSentinel {
2022-09-29 13:14:50 +08:00
rcli := getRedisSentinelCient(re, db)
2022-08-23 18:50:07 +08:00
defer rcli.Close()
cmd = rcli.Cli
}
// 测试连接
_, e := cmd.Ping(context.Background()).Result()
biz.ErrIsNilAppendErr(e, "Redis连接失败: %s")
}
2022-11-18 17:52:30 +08:00
type RedisInfo struct {
Id uint64
Host string
Db int // 库号
TagPath string
Mode string
Name string
SshTunnelMachineId int
2022-11-18 17:52:30 +08:00
}
// 获取记录日志的描述
func (r *RedisInfo) GetLogDesc() string {
return fmt.Sprintf("Redis[id=%d, tag=%s, host=%s, db=%d]", r.Id, r.TagPath, r.Host, r.Db)
}
// redis实例
type RedisInstance struct {
2022-11-18 17:52:30 +08:00
Id string
Info *RedisInfo
Cli *redis.Client
ClusterCli *redis.ClusterClient
}
// 获取命令执行接口的具体实现
func (r *RedisInstance) GetCmdable() redis.Cmdable {
2022-11-18 17:52:30 +08:00
redisMode := r.Info.Mode
if redisMode == "" || redisMode == entity.RedisModeStandalone || r.Info.Mode == entity.RedisModeSentinel {
return r.Cli
}
2022-08-23 18:50:07 +08:00
if redisMode == entity.RedisModeCluster {
return r.ClusterCli
}
return nil
}
func (r *RedisInstance) Scan(cursor uint64, match string, count int64) ([]string, uint64) {
keys, newcursor, err := r.GetCmdable().Scan(context.Background(), cursor, match, count).Result()
biz.ErrIsNilAppendErr(err, "scan失败: %s")
return keys, newcursor
}
func (r *RedisInstance) Close() {
2022-11-18 17:52:30 +08:00
mode := r.Info.Mode
if mode == entity.RedisModeStandalone || mode == entity.RedisModeSentinel {
if err := r.Cli.Close(); err != nil {
global.Log.Errorf("关闭redis单机实例[%s]连接失败: %s", r.Id, err.Error())
}
r.Cli = nil
}
2022-11-18 17:52:30 +08:00
if mode == entity.RedisModeCluster {
if err := r.ClusterCli.Close(); err != nil {
global.Log.Errorf("关闭redis集群实例[%s]连接失败: %s", r.Id, err.Error())
}
r.ClusterCli = nil
}
}