diff --git a/internal/db/models/node_task_dao.go b/internal/db/models/node_task_dao.go index 22e837e9..2a5f0f9f 100644 --- a/internal/db/models/node_task_dao.go +++ b/internal/db/models/node_task_dao.go @@ -67,11 +67,12 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64 "isOk": 0, "error": "", }, maps.Map{ - "clusterId": clusterId, - "updatedAt": updatedAt, - "isDone": 0, - "isOk": 0, - "error": "", + "clusterId": clusterId, + "updatedAt": updatedAt, + "isDone": 0, + "isOk": 0, + "error": "", + "isNotified": 0, }) return err } @@ -291,6 +292,7 @@ func (this *NodeTaskDAO) FindAllDoingNodeIds(tx *dbs.Tx, role string) ([]int64, Attr("role", role). Gt("nodeId", 0). Attr("isDone", false). + Attr("isNotified", 0). FindAll() if err != nil { return nil, err diff --git a/internal/rpc/services/nameservers/service_ns_node_stream.go b/internal/rpc/services/nameservers/service_ns_node_stream.go index 07a4e0f8..51ffe605 100644 --- a/internal/rpc/services/nameservers/service_ns_node_stream.go +++ b/internal/rpc/services/nameservers/service_ns_node_stream.go @@ -170,6 +170,12 @@ func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServ } nodeLocker.Unlock() + defer func() { + nodeLocker.Lock() + delete(requestChanMap, nodeId) + nodeLocker.Unlock() + }() + // 发送请求 go func() { for { diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index b51ddd88..b86b6e0f 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -134,6 +134,12 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } nodeLocker.Unlock() + defer func() { + nodeLocker.Lock() + delete(requestChanMap, nodeId) + nodeLocker.Unlock() + }() + // 发送请求 go func() { for {