mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-03 16:00:25 +08:00
feat: 新增pgsql数据操作&redis集群操作
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"mayfly-go/internal/devops/api/form"
|
||||
"mayfly-go/internal/devops/api/vo"
|
||||
"mayfly-go/internal/devops/application"
|
||||
@@ -11,7 +12,10 @@ import (
|
||||
"mayfly-go/pkg/utils"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
type Redis struct {
|
||||
@@ -45,7 +49,40 @@ func (r *Redis) DeleteRedis(rc *ctx.ReqCtx) {
|
||||
}
|
||||
|
||||
func (r *Redis) RedisInfo(rc *ctx.ReqCtx) {
|
||||
res, _ := r.RedisApp.GetRedisInstance(uint64(ginx.PathParamInt(rc.GinCtx, "id"))).Cli.Info().Result()
|
||||
ri := r.RedisApp.GetRedisInstance(uint64(ginx.PathParamInt(rc.GinCtx, "id")))
|
||||
|
||||
var res string
|
||||
var err error
|
||||
|
||||
ctx := context.Background()
|
||||
if ri.Mode == "" || ri.Mode == entity.RedisModeStandalone {
|
||||
res, err = ri.Cli.Info(ctx).Result()
|
||||
} else if ri.Mode == entity.RedisModeCluster {
|
||||
host := rc.GinCtx.Query("host")
|
||||
biz.NotEmpty(host, "集群模式host信息不能为空")
|
||||
clusterClient := ri.ClusterCli
|
||||
var redisClient *redis.Client
|
||||
// 遍历集群的master节点找到该redis client
|
||||
clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
|
||||
if host == client.Options().Addr {
|
||||
redisClient = client
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if redisClient == nil {
|
||||
// 遍历集群的slave节点找到该redis client
|
||||
clusterClient.ForEachSlave(ctx, func(ctx context.Context, client *redis.Client) error {
|
||||
if host == client.Options().Addr {
|
||||
redisClient = client
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
biz.NotNil(redisClient, "该实例不在该集群中")
|
||||
res, err = redisClient.Info(ctx).Result()
|
||||
}
|
||||
|
||||
biz.ErrIsNilAppendErr(err, "获取redis info失败: %s")
|
||||
|
||||
datas := strings.Split(res, "\r\n")
|
||||
i := 0
|
||||
@@ -81,43 +118,125 @@ func (r *Redis) RedisInfo(rc *ctx.ReqCtx) {
|
||||
rc.ResData = parseMap
|
||||
}
|
||||
|
||||
func (r *Redis) ClusterInfo(rc *ctx.ReqCtx) {
|
||||
g := rc.GinCtx
|
||||
ri := r.RedisApp.GetRedisInstance(uint64(ginx.PathParamInt(g, "id")))
|
||||
biz.IsEquals(ri.Mode, entity.RedisModeCluster, "非集群模式")
|
||||
info, _ := ri.ClusterCli.ClusterInfo(context.Background()).Result()
|
||||
nodesStr, _ := ri.ClusterCli.ClusterNodes(context.Background()).Result()
|
||||
|
||||
nodesRes := make([]map[string]string, 0)
|
||||
nodes := strings.Split(nodesStr, "\n")
|
||||
for _, node := range nodes {
|
||||
if node == "" {
|
||||
continue
|
||||
}
|
||||
nodeInfos := strings.Split(node, " ")
|
||||
node := make(map[string]string)
|
||||
node["nodeId"] = nodeInfos[0]
|
||||
// ip:port1@port2:port1指redis服务器与客户端通信的端口,port2则是集群内部节点间通信的端口
|
||||
node["ip"] = nodeInfos[1]
|
||||
node["flags"] = nodeInfos[2]
|
||||
// 如果节点是slave,并且已知master节点,则为master节点ID;否则为符号"-"
|
||||
node["masterSlaveRelation"] = nodeInfos[3]
|
||||
// 最近一次发送ping的时间,这个时间是一个unix毫秒时间戳,0代表没有发送过
|
||||
node["pingSent"] = nodeInfos[4]
|
||||
// 最近一次收到pong的时间,使用unix时间戳表示
|
||||
node["pongRecv"] = nodeInfos[5]
|
||||
// 节点的epoch值(如果该节点是从节点,则为其主节点的epoch值)。每当节点发生失败切换时,都会创建一个新的,独特的,递增的epoch。
|
||||
// 如果多个节点竞争同一个哈希槽时,epoch值更高的节点会抢夺到
|
||||
node["configEpoch"] = nodeInfos[6]
|
||||
// node-to-node集群总线使用的链接的状态,我们使用这个链接与集群中其他节点进行通信.值可以是 connected 和 disconnected
|
||||
node["linkState"] = nodeInfos[7]
|
||||
// slave节点没有插槽信息
|
||||
if len(nodeInfos) > 8 {
|
||||
// slot:master节点第9位为哈希槽值或者一个哈希槽范围,代表当前节点可以提供服务的所有哈希槽值。如果只是一个值,那就是只有一个槽会被使用。
|
||||
// 如果是一个范围,这个值表示为起始槽-结束槽,节点将处理包括起始槽和结束槽在内的所有哈希槽。
|
||||
node["slot"] = nodeInfos[8]
|
||||
}
|
||||
nodesRes = append(nodesRes, node)
|
||||
}
|
||||
rc.ResData = map[string]interface{}{
|
||||
"clusterInfo": info,
|
||||
"clusterNodes": nodesRes,
|
||||
}
|
||||
}
|
||||
|
||||
// scan获取redis的key列表信息
|
||||
func (r *Redis) Scan(rc *ctx.ReqCtx) {
|
||||
g := rc.GinCtx
|
||||
|
||||
ri := r.RedisApp.GetRedisInstance(uint64(ginx.PathParamInt(g, "id")))
|
||||
biz.ErrIsNilAppendErr(r.ProjectApp.CanAccess(rc.LoginAccount.Id, ri.ProjectId), "%s")
|
||||
|
||||
keys, cursor := ri.Scan(uint64(ginx.PathParamInt(g, "cursor")), g.Query("match"), int64(ginx.PathParamInt(g, "count")))
|
||||
form := &form.RedisScanForm{}
|
||||
ginx.BindJsonAndValid(rc.GinCtx, form)
|
||||
|
||||
var keyInfoSplit []string
|
||||
if len(keys) > 0 {
|
||||
keyInfoLua := `
|
||||
local result = {}
|
||||
-- KEYS[1]为第1个参数,lua数组下标从1开始
|
||||
local ttl = redis.call('ttl', KEYS[1]);
|
||||
local keyType = redis.call('type', KEYS[1]);
|
||||
for i = 1, #KEYS do
|
||||
local ttl = redis.call('ttl', KEYS[i]);
|
||||
local keyType = redis.call('type', KEYS[i]);
|
||||
table.insert(result, string.format("%d,%s", ttl, keyType['ok']));
|
||||
end;
|
||||
return table.concat(result, ".");`
|
||||
// 通过lua获取 ttl,type.ttl2,type2格式,以便下面切割获取ttl和type。避免多次调用ttl和type函数
|
||||
keyInfos, _ := ri.Cli.Eval(keyInfoLua, keys).Result()
|
||||
keyInfoSplit = strings.Split(keyInfos.(string), ".")
|
||||
}
|
||||
cmd := ri.GetCmdable()
|
||||
ctx := context.Background()
|
||||
|
||||
kis := make([]*vo.KeyInfo, 0)
|
||||
for i, k := range keys {
|
||||
ttlType := strings.Split(keyInfoSplit[i], ",")
|
||||
ttl, _ := strconv.Atoi(ttlType[0])
|
||||
ki := &vo.KeyInfo{Key: k, Type: ttlType[1], Ttl: int64(ttl)}
|
||||
kis = append(kis, ki)
|
||||
var cursorRes map[string]uint64 = make(map[string]uint64)
|
||||
|
||||
if ri.Mode == "" || ri.Mode == entity.RedisModeStandalone {
|
||||
redisAddr := ri.Cli.Options().Addr
|
||||
keys, cursor := ri.Scan(form.Cursor[redisAddr], form.Match, form.Count)
|
||||
cursorRes[redisAddr] = cursor
|
||||
|
||||
var keyInfoSplit []string
|
||||
if len(keys) > 0 {
|
||||
keyInfosLua := `local result = {}
|
||||
-- KEYS[1]为第1个参数,lua数组下标从1开始
|
||||
for i = 1, #KEYS do
|
||||
local ttl = redis.call('ttl', KEYS[i]);
|
||||
local keyType = redis.call('type', KEYS[i]);
|
||||
table.insert(result, string.format("%d,%s", ttl, keyType['ok']));
|
||||
end;
|
||||
return table.concat(result, ".");`
|
||||
// 通过lua获取 ttl,type.ttl2,type2格式,以便下面切割获取ttl和type。避免多次调用ttl和type函数
|
||||
keyInfos, err := cmd.Eval(ctx, keyInfosLua, keys).Result()
|
||||
biz.ErrIsNilAppendErr(err, "执行lua脚本获取key信息失败: %s")
|
||||
keyInfoSplit = strings.Split(keyInfos.(string), ".")
|
||||
}
|
||||
|
||||
for i, k := range keys {
|
||||
ttlType := strings.Split(keyInfoSplit[i], ",")
|
||||
ttl, _ := strconv.Atoi(ttlType[0])
|
||||
ki := &vo.KeyInfo{Key: k, Type: ttlType[1], Ttl: int64(ttl)}
|
||||
kis = append(kis, ki)
|
||||
}
|
||||
} else if ri.Mode == entity.RedisModeCluster {
|
||||
var keys []string
|
||||
|
||||
mu := &sync.Mutex{}
|
||||
// 遍历所有master节点,并执行scan命令,合并keys
|
||||
ri.ClusterCli.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
|
||||
redisAddr := client.Options().Addr
|
||||
ks, cursor, _ := client.Scan(ctx, form.Cursor[redisAddr], form.Match, form.Count).Result()
|
||||
// 遍历节点的内部回调函数使用异步调用,如不加锁会导致集合并发错误
|
||||
mu.Lock()
|
||||
cursorRes[redisAddr] = cursor
|
||||
keys = append(keys, ks...)
|
||||
mu.Unlock()
|
||||
return nil
|
||||
})
|
||||
|
||||
// 因为redis集群模式执行lua脚本key必须位于同一slot中,故单机获取的方式不适合
|
||||
// 使用lua获取key的ttl以及类型,减少网络调用
|
||||
keyInfoLua := `local ttl = redis.call('ttl', KEYS[1]);
|
||||
local keyType = redis.call('type', KEYS[1]);
|
||||
return string.format("%d,%s", ttl, keyType['ok'])`
|
||||
for _, k := range keys {
|
||||
keyInfo, err := cmd.Eval(ctx, keyInfoLua, []string{k}).Result()
|
||||
biz.ErrIsNilAppendErr(err, "执行lua脚本获取key信息失败: %s")
|
||||
ttlType := strings.Split(keyInfo.(string), ",")
|
||||
ttl, _ := strconv.Atoi(ttlType[0])
|
||||
ki := &vo.KeyInfo{Key: k, Type: ttlType[1], Ttl: int64(ttl)}
|
||||
kis = append(kis, ki)
|
||||
}
|
||||
}
|
||||
|
||||
size, _ := ri.Cli.DBSize().Result()
|
||||
rc.ResData = &vo.Keys{Cursor: cursor, Keys: kis, DbSize: size}
|
||||
size, _ := cmd.DBSize(context.TODO()).Result()
|
||||
rc.ResData = &vo.Keys{Cursor: cursorRes, Keys: kis, DbSize: size}
|
||||
}
|
||||
|
||||
func (r *Redis) DeleteKey(rc *ctx.ReqCtx) {
|
||||
@@ -129,7 +248,7 @@ func (r *Redis) DeleteKey(rc *ctx.ReqCtx) {
|
||||
biz.ErrIsNilAppendErr(r.ProjectApp.CanAccess(rc.LoginAccount.Id, ri.ProjectId), "%s")
|
||||
|
||||
rc.ReqParam = key
|
||||
ri.Cli.Del(key)
|
||||
ri.GetCmdable().Del(context.Background(), key)
|
||||
}
|
||||
|
||||
func (r *Redis) checkKey(rc *ctx.ReqCtx) (*application.RedisInstance, string) {
|
||||
@@ -145,14 +264,14 @@ func (r *Redis) checkKey(rc *ctx.ReqCtx) (*application.RedisInstance, string) {
|
||||
|
||||
func (r *Redis) GetStringValue(rc *ctx.ReqCtx) {
|
||||
ri, key := r.checkKey(rc)
|
||||
str, err := ri.Cli.Get(key).Result()
|
||||
str, err := ri.GetCmdable().Get(context.TODO(), key).Result()
|
||||
biz.ErrIsNilAppendErr(err, "获取字符串值失败: %s")
|
||||
rc.ResData = str
|
||||
}
|
||||
|
||||
func (r *Redis) GetHashValue(rc *ctx.ReqCtx) {
|
||||
ri, key := r.checkKey(rc)
|
||||
res, err := ri.Cli.HGetAll(key).Result()
|
||||
res, err := ri.GetCmdable().HGetAll(context.TODO(), key).Result()
|
||||
biz.ErrIsNilAppendErr(err, "获取hash值失败: %s")
|
||||
rc.ResData = res
|
||||
}
|
||||
@@ -165,7 +284,7 @@ func (r *Redis) SetStringValue(rc *ctx.ReqCtx) {
|
||||
ri := r.RedisApp.GetRedisInstance(uint64(ginx.PathParamInt(g, "id")))
|
||||
biz.ErrIsNilAppendErr(r.ProjectApp.CanAccess(rc.LoginAccount.Id, ri.ProjectId), "%s")
|
||||
|
||||
str, err := ri.Cli.Set(keyValue.Key, keyValue.Value, time.Second*time.Duration(keyValue.Timed)).Result()
|
||||
str, err := ri.GetCmdable().Set(context.TODO(), keyValue.Key, keyValue.Value, time.Second*time.Duration(keyValue.Timed)).Result()
|
||||
biz.ErrIsNilAppendErr(err, "保存字符串值失败: %s")
|
||||
rc.ResData = str
|
||||
}
|
||||
@@ -178,21 +297,22 @@ func (r *Redis) SetHashValue(rc *ctx.ReqCtx) {
|
||||
ri := r.RedisApp.GetRedisInstance(uint64(ginx.PathParamInt(g, "id")))
|
||||
biz.ErrIsNilAppendErr(r.ProjectApp.CanAccess(rc.LoginAccount.Id, ri.ProjectId), "%s")
|
||||
|
||||
cmd := ri.GetCmdable()
|
||||
key := hashValue.Key
|
||||
// 简单处理->先删除,后新增
|
||||
ri.Cli.Del(key)
|
||||
cmd.Del(context.TODO(), key)
|
||||
for _, v := range hashValue.Value {
|
||||
res := ri.Cli.HSet(key, v["key"].(string), v["value"])
|
||||
res := cmd.HSet(context.TODO(), key, v["key"].(string), v["value"])
|
||||
biz.ErrIsNilAppendErr(res.Err(), "保存hash值失败: %s")
|
||||
}
|
||||
if hashValue.Timed != -1 {
|
||||
ri.Cli.Expire(key, time.Second*time.Duration(hashValue.Timed))
|
||||
cmd.Expire(context.TODO(), key, time.Second*time.Duration(hashValue.Timed))
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Redis) GetSetValue(rc *ctx.ReqCtx) {
|
||||
ri, key := r.checkKey(rc)
|
||||
res, err := ri.Cli.SMembers(key).Result()
|
||||
res, err := ri.GetCmdable().SMembers(context.TODO(), key).Result()
|
||||
biz.ErrIsNilAppendErr(err, "获取set值失败: %s")
|
||||
rc.ResData = res
|
||||
}
|
||||
@@ -204,13 +324,14 @@ func (r *Redis) SetSetValue(rc *ctx.ReqCtx) {
|
||||
|
||||
ri := r.RedisApp.GetRedisInstance(uint64(ginx.PathParamInt(g, "id")))
|
||||
biz.ErrIsNilAppendErr(r.ProjectApp.CanAccess(rc.LoginAccount.Id, ri.ProjectId), "%s")
|
||||
cmd := ri.GetCmdable()
|
||||
|
||||
key := keyvalue.Key
|
||||
// 简单处理->先删除,后新增
|
||||
ri.Cli.Del(key)
|
||||
ri.Cli.SAdd(key, keyvalue.Value...)
|
||||
cmd.Del(context.TODO(), key)
|
||||
cmd.SAdd(context.TODO(), key, keyvalue.Value...)
|
||||
|
||||
if keyvalue.Timed != -1 {
|
||||
ri.Cli.Expire(key, time.Second*time.Duration(keyvalue.Timed))
|
||||
cmd.Expire(context.TODO(), key, time.Second*time.Duration(keyvalue.Timed))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user