From a5118536e718e459babdc54525d792d2bcd92cbc Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 8 Aug 2021 16:17:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=8A=82=E7=82=B9=E5=88=B0AP?= =?UTF-8?q?I=E8=8A=82=E7=82=B9=E8=BF=9E=E6=8E=A5=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/node_task_dao.go | 12 +++++++----- .../services/nameservers/service_ns_node_stream.go | 6 ++++++ internal/rpc/services/service_node_stream.go | 6 ++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/internal/db/models/node_task_dao.go b/internal/db/models/node_task_dao.go index 22e837e9..2a5f0f9f 100644 --- a/internal/db/models/node_task_dao.go +++ b/internal/db/models/node_task_dao.go @@ -67,11 +67,12 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64 "isOk": 0, "error": "", }, maps.Map{ - "clusterId": clusterId, - "updatedAt": updatedAt, - "isDone": 0, - "isOk": 0, - "error": "", + "clusterId": clusterId, + "updatedAt": updatedAt, + "isDone": 0, + "isOk": 0, + "error": "", + "isNotified": 0, }) return err } @@ -291,6 +292,7 @@ func (this *NodeTaskDAO) FindAllDoingNodeIds(tx *dbs.Tx, role string) ([]int64, Attr("role", role). Gt("nodeId", 0). Attr("isDone", false). + Attr("isNotified", 0). FindAll() if err != nil { return nil, err diff --git a/internal/rpc/services/nameservers/service_ns_node_stream.go b/internal/rpc/services/nameservers/service_ns_node_stream.go index 07a4e0f8..51ffe605 100644 --- a/internal/rpc/services/nameservers/service_ns_node_stream.go +++ b/internal/rpc/services/nameservers/service_ns_node_stream.go @@ -170,6 +170,12 @@ func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServ } nodeLocker.Unlock() + defer func() { + nodeLocker.Lock() + delete(requestChanMap, nodeId) + nodeLocker.Unlock() + }() + // 发送请求 go func() { for { diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index b51ddd88..b86b6e0f 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -134,6 +134,12 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } nodeLocker.Unlock() + defer func() { + nodeLocker.Lock() + delete(requestChanMap, nodeId) + nodeLocker.Unlock() + }() + // 发送请求 go func() { for {