Files
EdgeAdmin/internal/web/actions/default/nodes/nodeutils/utils.go
2020-10-04 14:27:05 +08:00

156 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package nodeutils
import (
"context"
"encoding/json"
"github.com/TeaOSLab/EdgeAdmin/internal/configs"
"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"strconv"
"sync"
)
type MessageResult struct {
NodeId int64 `json:"nodeId"`
NodeName string `json:"nodeName"`
IsOK bool `json:"isOk"`
Message string `json:"message"`
}
// 向集群发送命令消息
func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
results = []*MessageResult{}
msgJSON, err := json.Marshal(msg)
if err != nil {
return results, err
}
defaultRPCClient, err := rpc.SharedRPC()
if err != nil {
return results, err
}
// 获取所有节点
nodesResp, err := defaultRPCClient.NodeRPC().FindAllEnabledNodesWithClusterId(ctx, &pb.FindAllEnabledNodesWithClusterIdRequest{ClusterId: clusterId})
if err != nil {
return results, err
}
nodes := nodesResp.Nodes
if len(nodes) == 0 {
return results, nil
}
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
locker := &sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点尚未连接到API",
})
locker.Unlock()
wg.Done()
continue
}
// 获取API节点信息
apiNodeId := node.ConnectedAPINodeIds[0]
rpcClient, ok := rpcMap[apiNodeId]
if !ok {
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{NodeId: apiNodeId})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
if apiNodeResp.Node == nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息API节点ID" + strconv.FormatInt(apiNodeId, 10),
})
locker.Unlock()
wg.Done()
continue
}
apiNode := apiNodeResp.Node
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: apiNode.AccessAddrs,
},
NodeId: apiNode.UniqueId,
Secret: apiNode.Secret,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "初始化API节点错误API节点ID" + strconv.FormatInt(apiNodeId, 10) + "" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
rpcMap[apiNodeId] = apiRPCClient
rpcClient = apiRPCClient
}
// 发送消息
go func(node *pb.Node) {
defer wg.Done()
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
NodeId: node.Id,
TimeoutSeconds: timeoutSeconds,
Code: code,
DataJSON: msgJSON,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "API返回错误" + err.Error(),
})
locker.Unlock()
return
}
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: result.IsOk,
Message: result.Message,
})
locker.Unlock()
}(node)
}
wg.Wait()
return
}