Files
EdgeAdmin/internal/web/actions/default/nodes/nodeutils/utils.go

342 lines
8.0 KiB
Go
Raw Normal View History

2020-10-04 14:27:05 +08:00
package nodeutils
import (
"context"
"encoding/json"
"github.com/TeaOSLab/EdgeAdmin/internal/configs"
"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2021-05-23 22:36:52 +08:00
"sort"
2020-10-04 14:27:05 +08:00
"strconv"
"sync"
)
2021-05-23 22:36:52 +08:00
// MessageResult 和节点消息通讯结果定义
2020-10-04 14:27:05 +08:00
type MessageResult struct {
NodeId int64 `json:"nodeId"`
NodeName string `json:"nodeName"`
IsOK bool `json:"isOk"`
Message string `json:"message"`
}
2021-05-23 22:36:52 +08:00
// SendMessageToCluster 向集群发送命令消息
2020-10-04 14:27:05 +08:00
func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
results = []*MessageResult{}
msgJSON, err := json.Marshal(msg)
if err != nil {
return results, err
}
defaultRPCClient, err := rpc.SharedRPC()
if err != nil {
return results, err
}
// 获取所有节点
2021-05-25 17:48:51 +08:00
nodesResp, err := defaultRPCClient.NodeRPC().FindAllEnabledNodesWithNodeClusterId(ctx, &pb.FindAllEnabledNodesWithNodeClusterIdRequest{NodeClusterId: clusterId})
2020-10-04 14:27:05 +08:00
if err != nil {
return results, err
}
nodes := nodesResp.Nodes
if len(nodes) == 0 {
return results, nil
}
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
locker := &sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
// TODO 检查是否在线
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点尚未连接到API",
})
locker.Unlock()
wg.Done()
continue
}
// 获取API节点信息
apiNodeId := node.ConnectedAPINodeIds[0]
rpcClient, ok := rpcMap[apiNodeId]
if !ok {
2021-11-05 15:34:48 +08:00
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{ApiNodeId: apiNodeId})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
2021-11-05 15:34:48 +08:00
if apiNodeResp.ApiNode == nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息API节点ID" + strconv.FormatInt(apiNodeId, 10),
})
locker.Unlock()
wg.Done()
continue
}
2021-11-05 15:34:48 +08:00
apiNode := apiNodeResp.ApiNode
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: apiNode.AccessAddrs,
},
NodeId: apiNode.UniqueId,
Secret: apiNode.Secret,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "初始化API节点错误API节点ID" + strconv.FormatInt(apiNodeId, 10) + "" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
rpcMap[apiNodeId] = apiRPCClient
rpcClient = apiRPCClient
}
// 发送消息
go func(node *pb.Node) {
defer wg.Done()
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
NodeId: node.Id,
TimeoutSeconds: timeoutSeconds,
Code: code,
DataJSON: msgJSON,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "API返回错误" + err.Error(),
})
locker.Unlock()
return
}
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: result.IsOk,
Message: result.Message,
})
locker.Unlock()
}(node)
}
wg.Wait()
2021-05-23 22:36:52 +08:00
// 对结果进行排序
if len(results) > 0 {
sort.Slice(results, func(i, j int) bool {
return results[i].NodeId < results[j].NodeId
})
}
return
}
2021-05-23 22:36:52 +08:00
// SendMessageToNodeIds 向一组节点发送命令消息
func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
results = []*MessageResult{}
if len(nodeIds) == 0 {
return
}
msgJSON, err := json.Marshal(msg)
if err != nil {
return results, err
}
defaultRPCClient, err := rpc.SharedRPC()
if err != nil {
return results, err
}
// 获取所有节点
nodesResp, err := defaultRPCClient.NodeRPC().FindEnabledNodesWithIds(ctx, &pb.FindEnabledNodesWithIdsRequest{NodeIds: nodeIds})
if err != nil {
return nil, err
}
nodes := nodesResp.Nodes
if len(nodes) == 0 {
return results, nil
}
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
locker := &sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
if !node.IsActive {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点不在线",
})
locker.Unlock()
wg.Done()
continue
}
if !node.IsOn {
if !node.IsActive {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点未启用",
})
locker.Unlock()
wg.Done()
continue
}
}
2020-10-04 14:27:05 +08:00
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点尚未连接到API",
})
locker.Unlock()
wg.Done()
continue
}
// 获取API节点信息
apiNodeId := node.ConnectedAPINodeIds[0]
rpcClient, ok := rpcMap[apiNodeId]
if !ok {
2021-11-05 15:34:48 +08:00
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{ApiNodeId: apiNodeId})
2020-10-04 14:27:05 +08:00
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
2021-11-05 15:34:48 +08:00
if apiNodeResp.ApiNode == nil {
2020-10-04 14:27:05 +08:00
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息API节点ID" + strconv.FormatInt(apiNodeId, 10),
})
locker.Unlock()
wg.Done()
continue
}
2021-11-05 15:34:48 +08:00
apiNode := apiNodeResp.ApiNode
2020-10-04 14:27:05 +08:00
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: apiNode.AccessAddrs,
},
NodeId: apiNode.UniqueId,
Secret: apiNode.Secret,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "初始化API节点错误API节点ID" + strconv.FormatInt(apiNodeId, 10) + "" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
rpcMap[apiNodeId] = apiRPCClient
rpcClient = apiRPCClient
}
// 发送消息
go func(node *pb.Node) {
defer wg.Done()
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
NodeId: node.Id,
TimeoutSeconds: timeoutSeconds,
Code: code,
DataJSON: msgJSON,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "API返回错误" + err.Error(),
})
locker.Unlock()
return
}
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: result.IsOk,
Message: result.Message,
})
locker.Unlock()
}(node)
}
wg.Wait()
2021-05-23 22:36:52 +08:00
// 对结果进行排序
if len(results) > 0 {
sort.Slice(results, func(i, j int) bool {
return results[i].NodeId < results[j].NodeId
})
}
2020-10-04 14:27:05 +08:00
return
}