mirror of
https://github.com/TeaOSLab/EdgeAdmin.git
synced 2025-11-06 06:40:27 +08:00
增加节点同步状态提示和任务列表
This commit is contained in:
@@ -47,6 +47,176 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(nodes))
|
||||
for _, node := range nodes {
|
||||
// TODO 检查是否在线
|
||||
|
||||
if len(node.ConnectedAPINodeIds) == 0 {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: false,
|
||||
Message: "节点尚未连接到API",
|
||||
})
|
||||
locker.Unlock()
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
// 获取API节点信息
|
||||
apiNodeId := node.ConnectedAPINodeIds[0]
|
||||
rpcClient, ok := rpcMap[apiNodeId]
|
||||
if !ok {
|
||||
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{NodeId: apiNodeId})
|
||||
if err != nil {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: false,
|
||||
Message: "无法读取对应的API节点信息:" + err.Error(),
|
||||
})
|
||||
locker.Unlock()
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
if apiNodeResp.Node == nil {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: false,
|
||||
Message: "无法读取对应的API节点信息:API节点ID:" + strconv.FormatInt(apiNodeId, 10),
|
||||
})
|
||||
locker.Unlock()
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
apiNode := apiNodeResp.Node
|
||||
|
||||
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
|
||||
RPC: struct {
|
||||
Endpoints []string `yaml:"endpoints"`
|
||||
}{
|
||||
Endpoints: apiNode.AccessAddrs,
|
||||
},
|
||||
NodeId: apiNode.UniqueId,
|
||||
Secret: apiNode.Secret,
|
||||
})
|
||||
if err != nil {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: false,
|
||||
Message: "初始化API节点错误:API节点ID:" + strconv.FormatInt(apiNodeId, 10) + ":" + err.Error(),
|
||||
})
|
||||
locker.Unlock()
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
rpcMap[apiNodeId] = apiRPCClient
|
||||
rpcClient = apiRPCClient
|
||||
}
|
||||
|
||||
// 发送消息
|
||||
go func(node *pb.Node) {
|
||||
defer wg.Done()
|
||||
|
||||
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
|
||||
NodeId: node.Id,
|
||||
TimeoutSeconds: timeoutSeconds,
|
||||
Code: code,
|
||||
DataJSON: msgJSON,
|
||||
})
|
||||
if err != nil {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: false,
|
||||
Message: "API返回错误:" + err.Error(),
|
||||
})
|
||||
locker.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: result.IsOk,
|
||||
Message: result.Message,
|
||||
})
|
||||
locker.Unlock()
|
||||
}(node)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// 向一组节点发送命令消息
|
||||
func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
|
||||
results = []*MessageResult{}
|
||||
if len(nodeIds) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
msgJSON, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
defaultRPCClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
// 获取所有节点
|
||||
nodesResp, err := defaultRPCClient.NodeRPC().FindEnabledNodesWithIds(ctx, &pb.FindEnabledNodesWithIdsRequest{NodeIds: nodeIds})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodes := nodesResp.Nodes
|
||||
if len(nodes) == 0 {
|
||||
return results, nil
|
||||
}
|
||||
|
||||
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
|
||||
locker := &sync.Mutex{}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(nodes))
|
||||
for _, node := range nodes {
|
||||
if !node.IsActive {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: false,
|
||||
Message: "节点不在线",
|
||||
})
|
||||
locker.Unlock()
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
if !node.IsOn {
|
||||
if !node.IsActive {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
IsOK: false,
|
||||
Message: "节点未启用",
|
||||
})
|
||||
locker.Unlock()
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if len(node.ConnectedAPINodeIds) == 0 {
|
||||
locker.Lock()
|
||||
results = append(results, &MessageResult{
|
||||
|
||||
Reference in New Issue
Block a user