Files
EdgeAdmin/internal/web/actions/default/nodes/nodeutils/utils.go
2021-05-23 22:36:52 +08:00

342 lines
7.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package nodeutils
import (
"context"
"encoding/json"
"github.com/TeaOSLab/EdgeAdmin/internal/configs"
"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"sort"
"strconv"
"sync"
)
// MessageResult 和节点消息通讯结果定义
type MessageResult struct {
NodeId int64 `json:"nodeId"`
NodeName string `json:"nodeName"`
IsOK bool `json:"isOk"`
Message string `json:"message"`
}
// SendMessageToCluster 向集群发送命令消息
func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
results = []*MessageResult{}
msgJSON, err := json.Marshal(msg)
if err != nil {
return results, err
}
defaultRPCClient, err := rpc.SharedRPC()
if err != nil {
return results, err
}
// 获取所有节点
nodesResp, err := defaultRPCClient.NodeRPC().FindAllEnabledNodesWithClusterId(ctx, &pb.FindAllEnabledNodesWithClusterIdRequest{NodeClusterId: clusterId})
if err != nil {
return results, err
}
nodes := nodesResp.Nodes
if len(nodes) == 0 {
return results, nil
}
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
locker := &sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
// TODO 检查是否在线
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点尚未连接到API",
})
locker.Unlock()
wg.Done()
continue
}
// 获取API节点信息
apiNodeId := node.ConnectedAPINodeIds[0]
rpcClient, ok := rpcMap[apiNodeId]
if !ok {
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{NodeId: apiNodeId})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
if apiNodeResp.Node == nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息API节点ID" + strconv.FormatInt(apiNodeId, 10),
})
locker.Unlock()
wg.Done()
continue
}
apiNode := apiNodeResp.Node
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: apiNode.AccessAddrs,
},
NodeId: apiNode.UniqueId,
Secret: apiNode.Secret,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "初始化API节点错误API节点ID" + strconv.FormatInt(apiNodeId, 10) + "" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
rpcMap[apiNodeId] = apiRPCClient
rpcClient = apiRPCClient
}
// 发送消息
go func(node *pb.Node) {
defer wg.Done()
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
NodeId: node.Id,
TimeoutSeconds: timeoutSeconds,
Code: code,
DataJSON: msgJSON,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "API返回错误" + err.Error(),
})
locker.Unlock()
return
}
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: result.IsOk,
Message: result.Message,
})
locker.Unlock()
}(node)
}
wg.Wait()
// 对结果进行排序
if len(results) > 0 {
sort.Slice(results, func(i, j int) bool {
return results[i].NodeId < results[j].NodeId
})
}
return
}
// SendMessageToNodeIds 向一组节点发送命令消息
func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
results = []*MessageResult{}
if len(nodeIds) == 0 {
return
}
msgJSON, err := json.Marshal(msg)
if err != nil {
return results, err
}
defaultRPCClient, err := rpc.SharedRPC()
if err != nil {
return results, err
}
// 获取所有节点
nodesResp, err := defaultRPCClient.NodeRPC().FindEnabledNodesWithIds(ctx, &pb.FindEnabledNodesWithIdsRequest{NodeIds: nodeIds})
if err != nil {
return nil, err
}
nodes := nodesResp.Nodes
if len(nodes) == 0 {
return results, nil
}
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
locker := &sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
if !node.IsActive {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点不在线",
})
locker.Unlock()
wg.Done()
continue
}
if !node.IsOn {
if !node.IsActive {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点未启用",
})
locker.Unlock()
wg.Done()
continue
}
}
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点尚未连接到API",
})
locker.Unlock()
wg.Done()
continue
}
// 获取API节点信息
apiNodeId := node.ConnectedAPINodeIds[0]
rpcClient, ok := rpcMap[apiNodeId]
if !ok {
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{NodeId: apiNodeId})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
if apiNodeResp.Node == nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息API节点ID" + strconv.FormatInt(apiNodeId, 10),
})
locker.Unlock()
wg.Done()
continue
}
apiNode := apiNodeResp.Node
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: apiNode.AccessAddrs,
},
NodeId: apiNode.UniqueId,
Secret: apiNode.Secret,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "初始化API节点错误API节点ID" + strconv.FormatInt(apiNodeId, 10) + "" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
rpcMap[apiNodeId] = apiRPCClient
rpcClient = apiRPCClient
}
// 发送消息
go func(node *pb.Node) {
defer wg.Done()
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
NodeId: node.Id,
TimeoutSeconds: timeoutSeconds,
Code: code,
DataJSON: msgJSON,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "API返回错误" + err.Error(),
})
locker.Unlock()
return
}
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: result.IsOk,
Message: result.Message,
})
locker.Unlock()
}(node)
}
wg.Wait()
// 对结果进行排序
if len(results) > 0 {
sort.Slice(results, func(i, j int) bool {
return results[i].NodeId < results[j].NodeId
})
}
return
}