DNS节点增加在线状态通知

This commit is contained in:
GoEdgeLab
2021-08-08 10:29:48 +08:00
parent 11f7e5010e
commit 07aa0990f8
14 changed files with 623 additions and 86 deletions

View File

@@ -119,6 +119,7 @@ func (this *NSNodeService) ListEnabledNSNodesMatch(ctx context.Context, req *pb.
IsOn: node.IsOn == 1,
UniqueId: node.UniqueId,
Secret: node.Secret,
IsActive: node.IsActive == 1,
IsInstalled: node.IsInstalled == 1,
InstallDir: node.InstallDir,
IsUp: node.IsUp == 1,
@@ -420,3 +421,21 @@ func (this *NSNodeService) DownloadNSNodeInstallationFile(ctx context.Context, r
Filename: filepath.Base(file.Path),
}, nil
}
// UpdateNSNodeConnectedAPINodes 更改节点连接的API节点信息
func (this *NSNodeService) UpdateNSNodeConnectedAPINodes(ctx context.Context, req *pb.UpdateNSNodeConnectedAPINodesRequest) (*pb.RPCSuccess, error) {
// 校验节点
_, _, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeDNS)
if err != nil {
return nil, err
}
tx := this.NullTx()
err = nameservers.SharedNSNodeDAO.UpdateNodeConnectedAPINodes(tx, nodeId, req.ApiNodeIds)
if err != nil {
return nil, errors.Wrap(err)
}
return this.Success()
}

View File

@@ -0,0 +1,290 @@
package nameservers
import (
"context"
"encoding/json"
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/configs"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/logs"
"strconv"
"sync"
"sync/atomic"
"time"
)
// CommandRequest 命令请求相关
type CommandRequest struct {
Id int64
Code string
CommandJSON []byte
}
type CommandRequestWaiting struct {
Timestamp int64
Chan chan *pb.NSNodeStreamMessage
}
func (this *CommandRequestWaiting) Close() {
defer func() {
recover()
}()
close(this.Chan)
}
var responseChanMap = map[int64]*CommandRequestWaiting{} // request id => response
var commandRequestId = int64(0)
var nodeLocker = &sync.Mutex{}
var requestChanMap = map[int64]chan *CommandRequest{} // node id => chan
func NextCommandRequestId() int64 {
return atomic.AddInt64(&commandRequestId, 1)
}
func init() {
// 清理WaitingChannelMap
ticker := time.NewTicker(30 * time.Second)
go func() {
for range ticker.C {
nodeLocker.Lock()
for requestId, request := range responseChanMap {
if time.Now().Unix()-request.Timestamp > 3600 {
responseChanMap[requestId].Close()
delete(responseChanMap, requestId)
}
}
nodeLocker.Unlock()
}
}()
}
// NsNodeStream 节点stream
func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServer) error {
// TODO 使用此stream快速通知NS节点更新
// 校验节点
_, _, nodeId, err := rpcutils.ValidateRequest(server.Context(), rpcutils.UserTypeDNS)
if err != nil {
return err
}
// 返回连接成功
{
apiConfig, err := configs.SharedAPIConfig()
if err != nil {
return err
}
connectedMessage := &messageconfigs.NSConnectedAPINodeMessage{APINodeId: apiConfig.NumberId()}
connectedMessageJSON, err := json.Marshal(connectedMessage)
if err != nil {
return errors.Wrap(err)
}
err = server.Send(&pb.NSNodeStreamMessage{
Code: messageconfigs.NSMessageCodeConnectedAPINode,
DataJSON: connectedMessageJSON,
})
if err != nil {
return err
}
}
//logs.Println("[RPC]accepted ns node '" + types.String(nodeId) + "' connection")
tx := this.NullTx()
// 标记为活跃状态
oldIsActive, err := nameservers.SharedNSNodeDAO.FindNodeActive(tx, nodeId)
if err != nil {
return err
}
if !oldIsActive {
err = nameservers.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, true)
if err != nil {
return err
}
// 发送恢复消息
clusterId, err := nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
nodeName, err := nameservers.SharedNSNodeDAO.FindEnabledNSNodeName(tx, nodeId)
if err != nil {
return err
}
subject := "DNS节点\"" + nodeName + "\"已经恢复在线"
msg := "DNS节点\"" + nodeName + "\"已经恢复在线"
err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, models.MessageTypeNSNodeActive, models.MessageLevelSuccess, subject, msg, nil)
if err != nil {
return err
}
}
nodeLocker.Lock()
requestChan, ok := requestChanMap[nodeId]
if !ok {
requestChan = make(chan *CommandRequest, 1024)
requestChanMap[nodeId] = requestChan
}
nodeLocker.Unlock()
// 发送请求
go func() {
for {
select {
case <-server.Context().Done():
return
case commandRequest := <-requestChan:
// logs.Println("[RPC]sending command '" + commandRequest.Code + "' to node '" + strconv.FormatInt(nodeId, 10) + "'")
retries := 3 // 错误重试次数
for i := 0; i < retries; i++ {
err := server.Send(&pb.NSNodeStreamMessage{
RequestId: commandRequest.Id,
Code: commandRequest.Code,
DataJSON: commandRequest.CommandJSON,
})
if err != nil {
if i == retries-1 {
logs.Println("[RPC]send command '" + commandRequest.Code + "' failed: " + err.Error())
} else {
time.Sleep(1 * time.Second)
}
} else {
break
}
}
}
}
}()
// 接受请求
for {
req, err := server.Recv()
if err != nil {
// 修改节点状态
err1 := nameservers.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, false)
if err1 != nil {
logs.Println(err1.Error())
}
return err
}
func(req *pb.NSNodeStreamMessage) {
// 因为 responseChan.Chan 有被关闭的风险所以我们使用recover防止panic
defer func() {
recover()
}()
nodeLocker.Lock()
responseChan, ok := responseChanMap[req.RequestId]
if ok {
select {
case responseChan.Chan <- req:
default:
}
}
nodeLocker.Unlock()
}(req)
}
}
// SendCommandToNSNode 向节点发送命令
func (this *NSNodeService) SendCommandToNSNode(ctx context.Context, req *pb.NSNodeStreamMessage) (*pb.NSNodeStreamMessage, error) {
// 校验请求
_, _, err := this.ValidateAdminAndUser(ctx, 0, 0)
if err != nil {
return nil, err
}
nodeId := req.NsNodeId
if nodeId <= 0 {
return nil, errors.New("node id should not be less than 0")
}
nodeLocker.Lock()
requestChan, ok := requestChanMap[nodeId]
nodeLocker.Unlock()
if !ok {
return &pb.NSNodeStreamMessage{
RequestId: req.RequestId,
IsOk: false,
Message: "node '" + strconv.FormatInt(nodeId, 10) + "' not connected yet",
}, nil
}
req.RequestId = NextCommandRequestId()
select {
case requestChan <- &CommandRequest{
Id: req.RequestId,
Code: req.Code,
CommandJSON: req.DataJSON,
}:
// 加入到等待队列中
respChan := make(chan *pb.NSNodeStreamMessage, 1)
waiting := &CommandRequestWaiting{
Timestamp: time.Now().Unix(),
Chan: respChan,
}
nodeLocker.Lock()
responseChanMap[req.RequestId] = waiting
nodeLocker.Unlock()
// 等待响应
timeoutSeconds := req.TimeoutSeconds
if timeoutSeconds <= 0 {
timeoutSeconds = 10
}
timeout := time.NewTimer(time.Duration(timeoutSeconds) * time.Second)
select {
case resp := <-respChan:
// 从队列中删除
nodeLocker.Lock()
delete(responseChanMap, req.RequestId)
waiting.Close()
nodeLocker.Unlock()
if resp == nil {
return &pb.NSNodeStreamMessage{
RequestId: req.RequestId,
Code: req.Code,
Message: "response timeout",
IsOk: false,
}, nil
}
return resp, nil
case <-timeout.C:
// 从队列中删除
nodeLocker.Lock()
delete(responseChanMap, req.RequestId)
waiting.Close()
nodeLocker.Unlock()
return &pb.NSNodeStreamMessage{
RequestId: req.RequestId,
Code: req.Code,
Message: "response timeout over " + fmt.Sprintf("%d", timeoutSeconds) + " seconds",
IsOk: false,
}, nil
}
default:
return &pb.NSNodeStreamMessage{
RequestId: req.RequestId,
Code: req.Code,
Message: "command queue is full over " + strconv.Itoa(len(requestChan)),
IsOk: false,
}, nil
}
}

View File

@@ -3,15 +3,17 @@ package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 消息相关服务
// MessageService 消息相关服务
type MessageService struct {
BaseService
}
// 计算未读消息数
// CountUnreadMessages 计算未读消息数
func (this *MessageService) CountUnreadMessages(ctx context.Context, req *pb.CountUnreadMessagesRequest) (*pb.RPCCountResponse, error) {
// 校验请求
adminId, userId, err := this.ValidateAdminAndUser(ctx, 0, 0)
@@ -28,7 +30,7 @@ func (this *MessageService) CountUnreadMessages(ctx context.Context, req *pb.Cou
return this.SuccessCount(count)
}
// 列出单页未读消息
// ListUnreadMessages 列出单页未读消息
func (this *MessageService) ListUnreadMessages(ctx context.Context, req *pb.ListUnreadMessagesRequest) (*pb.ListUnreadMessagesResponse, error) {
// 校验请求
adminId, userId, err := this.ValidateAdminAndUser(ctx, 0, 0)
@@ -48,33 +50,62 @@ func (this *MessageService) ListUnreadMessages(ctx context.Context, req *pb.List
var pbNode *pb.Node = nil
if message.ClusterId > 0 {
cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, int64(message.ClusterId))
if err != nil {
return nil, err
}
if cluster != nil {
pbCluster = &pb.NodeCluster{
Id: int64(cluster.Id),
Name: cluster.Name,
switch message.Role {
case nodeconfigs.NodeRoleNode:
cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, int64(message.ClusterId))
if err != nil {
return nil, err
}
if cluster != nil {
pbCluster = &pb.NodeCluster{
Id: int64(cluster.Id),
Name: cluster.Name,
}
}
case nodeconfigs.NodeRoleDNS:
cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(message.ClusterId))
if err != nil {
return nil, err
}
if cluster != nil {
pbCluster = &pb.NodeCluster{
Id: int64(cluster.Id),
Name: cluster.Name,
}
}
}
}
if message.NodeId > 0 {
node, err := models.SharedNodeDAO.FindEnabledNode(tx, int64(message.NodeId))
if err != nil {
return nil, err
}
if node != nil {
pbNode = &pb.Node{
Id: int64(node.Id),
Name: node.Name,
switch message.Role {
case nodeconfigs.NodeRoleNode:
node, err := models.SharedNodeDAO.FindEnabledNode(tx, int64(message.NodeId))
if err != nil {
return nil, err
}
if node != nil {
pbNode = &pb.Node{
Id: int64(node.Id),
Name: node.Name,
}
}
case nodeconfigs.NodeRoleDNS:
node, err := nameservers.SharedNSNodeDAO.FindEnabledNSNode(tx, int64(message.NodeId))
if err != nil {
return nil, err
}
if node != nil {
pbNode = &pb.Node{
Id: int64(node.Id),
Name: node.Name,
}
}
}
}
result = append(result, &pb.Message{
Id: int64(message.Id),
Role: message.Role,
Type: message.Type,
Body: message.Body,
Level: message.Level,
@@ -89,7 +120,7 @@ func (this *MessageService) ListUnreadMessages(ctx context.Context, req *pb.List
return &pb.ListUnreadMessagesResponse{Messages: result}, nil
}
// 设置消息已读状态
// UpdateMessageRead 设置消息已读状态
func (this *MessageService) UpdateMessageRead(ctx context.Context, req *pb.UpdateMessageReadRequest) (*pb.RPCSuccess, error) {
// 校验请求
adminId, userId, err := this.ValidateAdminAndUser(ctx, 0, 0)
@@ -115,7 +146,7 @@ func (this *MessageService) UpdateMessageRead(ctx context.Context, req *pb.Updat
return this.Success()
}
// 设置一组消息已读状态
// UpdateMessagesRead 设置一组消息已读状态
func (this *MessageService) UpdateMessagesRead(ctx context.Context, req *pb.UpdateMessagesReadRequest) (*pb.RPCSuccess, error) {
// 校验请求
adminId, userId, err := this.ValidateAdminAndUser(ctx, 0, 0)
@@ -143,7 +174,7 @@ func (this *MessageService) UpdateMessagesRead(ctx context.Context, req *pb.Upda
return this.Success()
}
// 设置所有消息为已读
// UpdateAllMessagesRead 设置所有消息为已读
func (this *MessageService) UpdateAllMessagesRead(ctx context.Context, req *pb.UpdateAllMessagesReadRequest) (*pb.RPCSuccess, error) {
// 校验请求
// 校验请求

View File

@@ -9,6 +9,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/logs"
"strconv"
@@ -119,7 +120,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
}
subject := "节点\"" + nodeName + "\"已经恢复在线"
msg := "节点\"" + nodeName + "\"已经恢复在线"
err = models.SharedMessageDAO.CreateNodeMessage(tx, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil)
err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil)
if err != nil {
return err
}