mirror of
				https://github.com/TeaOSLab/EdgeAdmin.git
				synced 2025-11-04 05:00:25 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			156 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			156 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package nodeutils
 | 
						||
 | 
						||
import (
 | 
						||
	"context"
 | 
						||
	"encoding/json"
 | 
						||
	"github.com/TeaOSLab/EdgeAdmin/internal/configs"
 | 
						||
	"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
 | 
						||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
						||
	"strconv"
 | 
						||
	"sync"
 | 
						||
)
 | 
						||
 | 
						||
type MessageResult struct {
 | 
						||
	NodeId   int64  `json:"nodeId"`
 | 
						||
	NodeName string `json:"nodeName"`
 | 
						||
	IsOK     bool   `json:"isOk"`
 | 
						||
	Message  string `json:"message"`
 | 
						||
}
 | 
						||
 | 
						||
// 向集群发送命令消息
 | 
						||
func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
 | 
						||
	results = []*MessageResult{}
 | 
						||
 | 
						||
	msgJSON, err := json.Marshal(msg)
 | 
						||
	if err != nil {
 | 
						||
		return results, err
 | 
						||
	}
 | 
						||
 | 
						||
	defaultRPCClient, err := rpc.SharedRPC()
 | 
						||
	if err != nil {
 | 
						||
		return results, err
 | 
						||
	}
 | 
						||
 | 
						||
	// 获取所有节点
 | 
						||
	nodesResp, err := defaultRPCClient.NodeRPC().FindAllEnabledNodesWithClusterId(ctx, &pb.FindAllEnabledNodesWithClusterIdRequest{NodeClusterId: clusterId})
 | 
						||
	if err != nil {
 | 
						||
		return results, err
 | 
						||
	}
 | 
						||
	nodes := nodesResp.Nodes
 | 
						||
	if len(nodes) == 0 {
 | 
						||
		return results, nil
 | 
						||
	}
 | 
						||
 | 
						||
	rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
 | 
						||
	locker := &sync.Mutex{}
 | 
						||
 | 
						||
	wg := &sync.WaitGroup{}
 | 
						||
	wg.Add(len(nodes))
 | 
						||
	for _, node := range nodes {
 | 
						||
		if len(node.ConnectedAPINodeIds) == 0 {
 | 
						||
			locker.Lock()
 | 
						||
			results = append(results, &MessageResult{
 | 
						||
				NodeId:   node.Id,
 | 
						||
				NodeName: node.Name,
 | 
						||
				IsOK:     false,
 | 
						||
				Message:  "节点尚未连接到API",
 | 
						||
			})
 | 
						||
			locker.Unlock()
 | 
						||
			wg.Done()
 | 
						||
			continue
 | 
						||
		}
 | 
						||
 | 
						||
		// 获取API节点信息
 | 
						||
		apiNodeId := node.ConnectedAPINodeIds[0]
 | 
						||
		rpcClient, ok := rpcMap[apiNodeId]
 | 
						||
		if !ok {
 | 
						||
			apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{NodeId: apiNodeId})
 | 
						||
			if err != nil {
 | 
						||
				locker.Lock()
 | 
						||
				results = append(results, &MessageResult{
 | 
						||
					NodeId:   node.Id,
 | 
						||
					NodeName: node.Name,
 | 
						||
					IsOK:     false,
 | 
						||
					Message:  "无法读取对应的API节点信息:" + err.Error(),
 | 
						||
				})
 | 
						||
				locker.Unlock()
 | 
						||
				wg.Done()
 | 
						||
				continue
 | 
						||
			}
 | 
						||
 | 
						||
			if apiNodeResp.Node == nil {
 | 
						||
				locker.Lock()
 | 
						||
				results = append(results, &MessageResult{
 | 
						||
					NodeId:   node.Id,
 | 
						||
					NodeName: node.Name,
 | 
						||
					IsOK:     false,
 | 
						||
					Message:  "无法读取对应的API节点信息:API节点ID:" + strconv.FormatInt(apiNodeId, 10),
 | 
						||
				})
 | 
						||
				locker.Unlock()
 | 
						||
				wg.Done()
 | 
						||
				continue
 | 
						||
			}
 | 
						||
			apiNode := apiNodeResp.Node
 | 
						||
 | 
						||
			apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
 | 
						||
				RPC: struct {
 | 
						||
					Endpoints []string `yaml:"endpoints"`
 | 
						||
				}{
 | 
						||
					Endpoints: apiNode.AccessAddrs,
 | 
						||
				},
 | 
						||
				NodeId: apiNode.UniqueId,
 | 
						||
				Secret: apiNode.Secret,
 | 
						||
			})
 | 
						||
			if err != nil {
 | 
						||
				locker.Lock()
 | 
						||
				results = append(results, &MessageResult{
 | 
						||
					NodeId:   node.Id,
 | 
						||
					NodeName: node.Name,
 | 
						||
					IsOK:     false,
 | 
						||
					Message:  "初始化API节点错误:API节点ID:" + strconv.FormatInt(apiNodeId, 10) + ":" + err.Error(),
 | 
						||
				})
 | 
						||
				locker.Unlock()
 | 
						||
				wg.Done()
 | 
						||
				continue
 | 
						||
			}
 | 
						||
			rpcMap[apiNodeId] = apiRPCClient
 | 
						||
			rpcClient = apiRPCClient
 | 
						||
		}
 | 
						||
 | 
						||
		// 发送消息
 | 
						||
		go func(node *pb.Node) {
 | 
						||
			defer wg.Done()
 | 
						||
 | 
						||
			result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
 | 
						||
				NodeId:         node.Id,
 | 
						||
				TimeoutSeconds: timeoutSeconds,
 | 
						||
				Code:           code,
 | 
						||
				DataJSON:       msgJSON,
 | 
						||
			})
 | 
						||
			if err != nil {
 | 
						||
				locker.Lock()
 | 
						||
				results = append(results, &MessageResult{
 | 
						||
					NodeId:   node.Id,
 | 
						||
					NodeName: node.Name,
 | 
						||
					IsOK:     false,
 | 
						||
					Message:  "API返回错误:" + err.Error(),
 | 
						||
				})
 | 
						||
				locker.Unlock()
 | 
						||
				return
 | 
						||
			}
 | 
						||
 | 
						||
			locker.Lock()
 | 
						||
			results = append(results, &MessageResult{
 | 
						||
				NodeId:   node.Id,
 | 
						||
				NodeName: node.Name,
 | 
						||
				IsOK:     result.IsOk,
 | 
						||
				Message:  result.Message,
 | 
						||
			})
 | 
						||
			locker.Unlock()
 | 
						||
		}(node)
 | 
						||
	}
 | 
						||
	wg.Wait()
 | 
						||
 | 
						||
	return
 | 
						||
}
 |