mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-20 04:50:24 +08:00
增加清除服务缓存API
This commit is contained in:
@@ -38,11 +38,11 @@ func (this *CommandRequestWaiting) Close() {
|
||||
close(this.Chan)
|
||||
}
|
||||
|
||||
var responseChanMap = map[int64]*CommandRequestWaiting{} // request id => response
|
||||
var nodeResponseChanMap = map[int64]*CommandRequestWaiting{} // request id => response
|
||||
var commandRequestId = int64(0)
|
||||
|
||||
var nodeLocker = &sync.Mutex{}
|
||||
var requestChanMap = map[int64]chan *CommandRequest{} // node id => chan
|
||||
var nodeRequestChanMap = map[int64]chan *CommandRequest{} // node id => chan
|
||||
|
||||
func NextCommandRequestId() int64 {
|
||||
return atomic.AddInt64(&commandRequestId, 1)
|
||||
@@ -54,10 +54,10 @@ func init() {
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
nodeLocker.Lock()
|
||||
for requestId, request := range responseChanMap {
|
||||
for requestId, request := range nodeResponseChanMap {
|
||||
if time.Now().Unix()-request.Timestamp > 3600 {
|
||||
responseChanMap[requestId].Close()
|
||||
delete(responseChanMap, requestId)
|
||||
nodeResponseChanMap[requestId].Close()
|
||||
delete(nodeResponseChanMap, requestId)
|
||||
}
|
||||
}
|
||||
nodeLocker.Unlock()
|
||||
@@ -127,16 +127,16 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
|
||||
}
|
||||
|
||||
nodeLocker.Lock()
|
||||
requestChan, ok := requestChanMap[nodeId]
|
||||
requestChan, ok := nodeRequestChanMap[nodeId]
|
||||
if !ok {
|
||||
requestChan = make(chan *CommandRequest, 1024)
|
||||
requestChanMap[nodeId] = requestChan
|
||||
nodeRequestChanMap[nodeId] = requestChan
|
||||
}
|
||||
nodeLocker.Unlock()
|
||||
|
||||
defer func() {
|
||||
nodeLocker.Lock()
|
||||
delete(requestChanMap, nodeId)
|
||||
delete(nodeRequestChanMap, nodeId)
|
||||
nodeLocker.Unlock()
|
||||
}()
|
||||
|
||||
@@ -189,7 +189,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
|
||||
}()
|
||||
|
||||
nodeLocker.Lock()
|
||||
responseChan, ok := responseChanMap[req.RequestId]
|
||||
responseChan, ok := nodeResponseChanMap[req.RequestId]
|
||||
if ok {
|
||||
select {
|
||||
case responseChan.Chan <- req:
|
||||
@@ -215,25 +215,37 @@ func (this *NodeService) SendCommandToNode(ctx context.Context, req *pb.NodeStre
|
||||
return nil, errors.New("node id should not be less than 0")
|
||||
}
|
||||
|
||||
return SendCommandToNode(req.NodeId, req.RequestId, req.Code, req.DataJSON, req.TimeoutSeconds, true)
|
||||
}
|
||||
|
||||
// SendCommandToNode 向节点发送命令
|
||||
func SendCommandToNode(nodeId int64, requestId int64, messageCode string, dataJSON []byte, timeoutSeconds int32, forceConnecting bool) (result *pb.NodeStreamMessage, err error) {
|
||||
nodeLocker.Lock()
|
||||
requestChan, ok := requestChanMap[nodeId]
|
||||
requestChan, ok := nodeRequestChanMap[nodeId]
|
||||
nodeLocker.Unlock()
|
||||
|
||||
if !ok {
|
||||
return &pb.NodeStreamMessage{
|
||||
RequestId: req.RequestId,
|
||||
IsOk: false,
|
||||
Message: "node '" + strconv.FormatInt(nodeId, 10) + "' not connected yet",
|
||||
}, nil
|
||||
if forceConnecting {
|
||||
return &pb.NodeStreamMessage{
|
||||
RequestId: requestId,
|
||||
IsOk: false,
|
||||
Message: "node '" + strconv.FormatInt(nodeId, 10) + "' not connected yet",
|
||||
}, nil
|
||||
} else {
|
||||
return &pb.NodeStreamMessage{
|
||||
RequestId: requestId,
|
||||
IsOk: true,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
req.RequestId = NextCommandRequestId()
|
||||
requestId = NextCommandRequestId()
|
||||
|
||||
select {
|
||||
case requestChan <- &CommandRequest{
|
||||
Id: req.RequestId,
|
||||
Code: req.Code,
|
||||
CommandJSON: req.DataJSON,
|
||||
Id: requestId,
|
||||
Code: messageCode,
|
||||
CommandJSON: dataJSON,
|
||||
}:
|
||||
// 加入到等待队列中
|
||||
respChan := make(chan *pb.NodeStreamMessage, 1)
|
||||
@@ -243,11 +255,10 @@ func (this *NodeService) SendCommandToNode(ctx context.Context, req *pb.NodeStre
|
||||
}
|
||||
|
||||
nodeLocker.Lock()
|
||||
responseChanMap[req.RequestId] = waiting
|
||||
nodeResponseChanMap[requestId] = waiting
|
||||
nodeLocker.Unlock()
|
||||
|
||||
// 等待响应
|
||||
timeoutSeconds := req.TimeoutSeconds
|
||||
if timeoutSeconds <= 0 {
|
||||
timeoutSeconds = 10
|
||||
}
|
||||
@@ -256,14 +267,14 @@ func (this *NodeService) SendCommandToNode(ctx context.Context, req *pb.NodeStre
|
||||
case resp := <-respChan:
|
||||
// 从队列中删除
|
||||
nodeLocker.Lock()
|
||||
delete(responseChanMap, req.RequestId)
|
||||
delete(nodeResponseChanMap, requestId)
|
||||
waiting.Close()
|
||||
nodeLocker.Unlock()
|
||||
|
||||
if resp == nil {
|
||||
return &pb.NodeStreamMessage{
|
||||
RequestId: req.RequestId,
|
||||
Code: req.Code,
|
||||
RequestId: requestId,
|
||||
Code: messageCode,
|
||||
Message: "response timeout",
|
||||
IsOk: false,
|
||||
}, nil
|
||||
@@ -273,21 +284,21 @@ func (this *NodeService) SendCommandToNode(ctx context.Context, req *pb.NodeStre
|
||||
case <-timeout.C:
|
||||
// 从队列中删除
|
||||
nodeLocker.Lock()
|
||||
delete(responseChanMap, req.RequestId)
|
||||
delete(nodeResponseChanMap, requestId)
|
||||
waiting.Close()
|
||||
nodeLocker.Unlock()
|
||||
|
||||
return &pb.NodeStreamMessage{
|
||||
RequestId: req.RequestId,
|
||||
Code: req.Code,
|
||||
RequestId: requestId,
|
||||
Code: messageCode,
|
||||
Message: "response timeout over " + fmt.Sprintf("%d", timeoutSeconds) + " seconds",
|
||||
IsOk: false,
|
||||
}, nil
|
||||
}
|
||||
default:
|
||||
return &pb.NodeStreamMessage{
|
||||
RequestId: req.RequestId,
|
||||
Code: req.Code,
|
||||
RequestId: requestId,
|
||||
Code: messageCode,
|
||||
Message: "command queue is full over " + strconv.Itoa(len(requestChan)),
|
||||
IsOk: false,
|
||||
}, nil
|
||||
|
||||
Reference in New Issue
Block a user