Files
EdgeAPI/internal/rpc/services/service_node_stream.go

338 lines
8.0 KiB
Go
Raw Normal View History

2020-10-04 14:27:14 +08:00
package services
import (
"context"
"encoding/json"
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/configs"
2020-10-25 18:26:46 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-11-20 18:59:35 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2020-10-04 14:27:14 +08:00
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
2021-08-08 10:29:48 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/logs"
"strconv"
"sync"
"sync/atomic"
"time"
)
2021-11-11 14:16:42 +08:00
// primaryNodeId is the id of the edge node currently chosen as the primary
// node for this API node; 0 means no primary node is selected.
var primaryNodeId int64
// CommandRequest 命令请求相关
2020-10-04 14:27:14 +08:00
// CommandRequest is a command queued for delivery to a connected node.
// NodeStream copies its fields into the pb.NodeStreamMessage sent on the stream.
type CommandRequest struct {
// Id is sent as the stream message's RequestId and is used to match the response.
Id int64
// Code is the message code sent to the node.
Code string
// CommandJSON is the payload, sent as the stream message's DataJSON.
CommandJSON []byte
}
// CommandRequestWaiting represents a caller waiting for a node's response to a command.
type CommandRequestWaiting struct {
// Timestamp is the Unix time (seconds) at which the waiter was created;
// the periodic cleanup drops waiters older than 3600 seconds.
Timestamp int64
// Chan receives the node's response message for the request.
Chan chan *pb.NodeStreamMessage
}
// Close closes the response channel.
//
// Closing an already-closed (or nil) channel panics; that panic is swallowed
// here so Close can be called more than once.
func (this *CommandRequestWaiting) Close() {
	defer func() { _ = recover() }()

	close(this.Chan)
}
2021-10-17 17:12:30 +08:00
var nodeResponseChanMap = map[int64]*CommandRequestWaiting{} // request id => response
2020-10-04 14:27:14 +08:00
var commandRequestId = int64(0)
var nodeLocker = &sync.Mutex{}
2021-10-17 17:12:30 +08:00
var nodeRequestChanMap = map[int64]chan *CommandRequest{} // node id => chan
2020-10-04 14:27:14 +08:00
// NextCommandRequestId atomically increments the global request counter and
// returns the new value.
func NextCommandRequestId() int64 {
	nextId := atomic.AddInt64(&commandRequestId, 1)
	return nextId
}
func init() {
// 清理WaitingChannelMap
ticker := time.NewTicker(30 * time.Second)
go func() {
for range ticker.C {
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
for requestId, request := range nodeResponseChanMap {
2020-10-04 14:27:14 +08:00
if time.Now().Unix()-request.Timestamp > 3600 {
2021-10-17 17:12:30 +08:00
nodeResponseChanMap[requestId].Close()
delete(nodeResponseChanMap, requestId)
2020-10-04 14:27:14 +08:00
}
}
nodeLocker.Unlock()
}
}()
}
2021-04-12 19:19:15 +08:00
// NodeStream 节点stream
2020-10-04 14:27:14 +08:00
func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) error {
// TODO 使用此stream快速通知边缘节点更新
// 校验节点
2021-07-11 18:05:57 +08:00
_, _, nodeId, err := rpcutils.ValidateRequest(server.Context(), rpcutils.UserTypeNode)
2020-10-04 14:27:14 +08:00
if err != nil {
return err
}
2021-11-11 14:16:42 +08:00
// 选择一个作为主节点
if primaryNodeId == 0 {
primaryNodeId = nodeId
}
defer func() {
2021-11-20 18:59:35 +08:00
// 修改当前API节点的主边缘节点
2021-11-11 14:16:42 +08:00
if primaryNodeId == nodeId {
primaryNodeId = 0
nodeLocker.Lock()
if len(nodeRequestChanMap) > 0 {
for anotherNodeId := range nodeRequestChanMap {
primaryNodeId = anotherNodeId
break
}
}
nodeLocker.Unlock()
}
2021-11-20 18:59:35 +08:00
// 修改在线状态
err = models.SharedNodeDAO.UpdateNodeActive(nil, nodeId, false)
if err != nil {
remotelogs.Error("NODE_SERVICE", "change node active failed: "+err.Error())
}
2021-11-11 14:16:42 +08:00
}()
2020-10-04 14:27:14 +08:00
// 返回连接成功
{
apiConfig, err := configs.SharedAPIConfig()
if err != nil {
return err
}
connectedMessage := &messageconfigs.ConnectedAPINodeMessage{APINodeId: apiConfig.NumberId()}
connectedMessageJSON, err := json.Marshal(connectedMessage)
if err != nil {
return errors.Wrap(err)
}
err = server.Send(&pb.NodeStreamMessage{
Code: messageconfigs.MessageCodeConnectedAPINode,
DataJSON: connectedMessageJSON,
})
if err != nil {
return err
}
}
2021-05-12 16:10:41 +08:00
//logs.Println("[RPC]accepted node '" + numberutils.FormatInt64(nodeId) + "' connection")
2020-11-16 09:20:24 +08:00
tx := this.NullTx()
2020-11-16 09:20:24 +08:00
// 标记为活跃状态
oldIsActive, err := models.SharedNodeDAO.FindNodeActive(tx, nodeId)
2020-11-16 09:20:24 +08:00
if err != nil {
return err
}
2021-11-20 18:59:35 +08:00
2020-11-16 09:20:24 +08:00
if !oldIsActive {
err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true)
2020-11-16 09:20:24 +08:00
if err != nil {
return err
}
// 发送恢复消息
clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
2020-11-16 09:20:24 +08:00
if err != nil {
return err
}
2021-04-12 19:19:15 +08:00
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, nodeId)
if err != nil {
return err
}
subject := "节点\"" + nodeName + "\"已经恢复在线"
msg := "节点\"" + nodeName + "\"已经恢复在线"
err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil, false)
2020-11-16 09:20:24 +08:00
if err != nil {
return err
}
}
2020-10-04 14:27:14 +08:00
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
requestChan, ok := nodeRequestChanMap[nodeId]
2020-10-04 14:27:14 +08:00
if !ok {
requestChan = make(chan *CommandRequest, 1024)
2021-10-17 17:12:30 +08:00
nodeRequestChanMap[nodeId] = requestChan
2020-10-04 14:27:14 +08:00
}
nodeLocker.Unlock()
2021-08-08 16:17:25 +08:00
defer func() {
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
delete(nodeRequestChanMap, nodeId)
2021-08-08 16:17:25 +08:00
nodeLocker.Unlock()
}()
2020-10-04 14:27:14 +08:00
// 发送请求
go func() {
for {
select {
case <-server.Context().Done():
return
case commandRequest := <-requestChan:
2021-01-18 20:40:57 +08:00
// logs.Println("[RPC]sending command '" + commandRequest.Code + "' to node '" + strconv.FormatInt(nodeId, 10) + "'")
2020-10-04 14:27:14 +08:00
retries := 3 // 错误重试次数
for i := 0; i < retries; i++ {
err := server.Send(&pb.NodeStreamMessage{
RequestId: commandRequest.Id,
Code: commandRequest.Code,
DataJSON: commandRequest.CommandJSON,
})
if err != nil {
if i == retries-1 {
logs.Println("[RPC]send command '" + commandRequest.Code + "' failed: " + err.Error())
} else {
time.Sleep(1 * time.Second)
}
} else {
break
}
}
}
}
}()
// 接受请求
for {
req, err := server.Recv()
if err != nil {
2020-10-25 18:26:46 +08:00
// 修改节点状态
err1 := models.SharedNodeDAO.UpdateNodeIsActive(tx, nodeId, false)
2020-10-25 18:26:46 +08:00
if err1 != nil {
logs.Println(err1.Error())
}
2020-10-04 14:27:14 +08:00
return err
}
func(req *pb.NodeStreamMessage) {
// 因为 responseChan.Chan 有被关闭的风险所以我们使用recover防止panic
defer func() {
recover()
}()
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
responseChan, ok := nodeResponseChanMap[req.RequestId]
2020-10-04 14:27:14 +08:00
if ok {
select {
case responseChan.Chan <- req:
default:
}
}
nodeLocker.Unlock()
}(req)
}
}
2021-04-12 19:19:15 +08:00
// SendCommandToNode 向节点发送命令
2020-10-04 14:27:14 +08:00
func (this *NodeService) SendCommandToNode(ctx context.Context, req *pb.NodeStreamMessage) (*pb.NodeStreamMessage, error) {
// 校验请求
_, _, err := this.ValidateAdminAndUser(ctx, 0, 0)
2020-10-04 14:27:14 +08:00
if err != nil {
return nil, err
}
nodeId := req.NodeId
if nodeId <= 0 {
return nil, errors.New("node id should not be less than 0")
}
2021-10-17 17:12:30 +08:00
return SendCommandToNode(req.NodeId, req.RequestId, req.Code, req.DataJSON, req.TimeoutSeconds, true)
}
// SendCommandToNode 向节点发送命令
func SendCommandToNode(nodeId int64, requestId int64, messageCode string, dataJSON []byte, timeoutSeconds int32, forceConnecting bool) (result *pb.NodeStreamMessage, err error) {
2020-10-04 14:27:14 +08:00
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
requestChan, ok := nodeRequestChanMap[nodeId]
2020-10-04 14:27:14 +08:00
nodeLocker.Unlock()
if !ok {
2021-10-17 17:12:30 +08:00
if forceConnecting {
return &pb.NodeStreamMessage{
RequestId: requestId,
IsOk: false,
Message: "node '" + strconv.FormatInt(nodeId, 10) + "' not connected yet",
}, nil
} else {
return &pb.NodeStreamMessage{
RequestId: requestId,
IsOk: true,
}, nil
}
2020-10-04 14:27:14 +08:00
}
2021-10-17 17:12:30 +08:00
requestId = NextCommandRequestId()
2020-10-04 14:27:14 +08:00
select {
case requestChan <- &CommandRequest{
2021-10-17 17:12:30 +08:00
Id: requestId,
Code: messageCode,
CommandJSON: dataJSON,
2020-10-04 14:27:14 +08:00
}:
// 加入到等待队列中
respChan := make(chan *pb.NodeStreamMessage, 1)
waiting := &CommandRequestWaiting{
Timestamp: time.Now().Unix(),
Chan: respChan,
}
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
nodeResponseChanMap[requestId] = waiting
2020-10-04 14:27:14 +08:00
nodeLocker.Unlock()
// 等待响应
if timeoutSeconds <= 0 {
timeoutSeconds = 10
}
timeout := time.NewTimer(time.Duration(timeoutSeconds) * time.Second)
select {
case resp := <-respChan:
// 从队列中删除
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
delete(nodeResponseChanMap, requestId)
2020-10-04 14:27:14 +08:00
waiting.Close()
nodeLocker.Unlock()
if resp == nil {
return &pb.NodeStreamMessage{
2021-10-17 17:12:30 +08:00
RequestId: requestId,
Code: messageCode,
2020-10-04 14:27:14 +08:00
Message: "response timeout",
IsOk: false,
}, nil
}
return resp, nil
case <-timeout.C:
// 从队列中删除
nodeLocker.Lock()
2021-10-17 17:12:30 +08:00
delete(nodeResponseChanMap, requestId)
2020-10-04 14:27:14 +08:00
waiting.Close()
nodeLocker.Unlock()
return &pb.NodeStreamMessage{
2021-10-17 17:12:30 +08:00
RequestId: requestId,
Code: messageCode,
2020-10-04 14:27:14 +08:00
Message: "response timeout over " + fmt.Sprintf("%d", timeoutSeconds) + " seconds",
IsOk: false,
}, nil
}
default:
return &pb.NodeStreamMessage{
2021-10-17 17:12:30 +08:00
RequestId: requestId,
Code: messageCode,
2020-10-04 14:27:14 +08:00
Message: "command queue is full over " + strconv.Itoa(len(requestChan)),
IsOk: false,
}, nil
}
}