diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 73a8b236..84d3f2cf 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -1668,10 +1668,31 @@ func (this *NodeDAO) UpdateNodeActive(tx *dbs.Tx, nodeId int64, isActive bool) e _, err := this.Query(tx). Pk(nodeId). Set("isActive", isActive). + Set("inactiveNotifiedAt", 0). Update() return err } +// UpdateNodeInactiveNotifiedAt 修改节点的离线通知时间 +func (this *NodeDAO) UpdateNodeInactiveNotifiedAt(tx *dbs.Tx, nodeId int64, inactiveAt int64) error { + if nodeId <= 0 { + return errors.New("invalid nodeId") + } + _, err := this.Query(tx). + Pk(nodeId). + Set("inactiveNotifiedAt", inactiveAt). + Update() + return err +} + +// FindNodeInactiveNotifiedAt 读取上次的节点离线通知时间 +func (this *NodeDAO) FindNodeInactiveNotifiedAt(tx *dbs.Tx, nodeId int64) (int64, error) { + return this.Query(tx). + Pk(nodeId). + Result("inactiveNotifiedAt"). + FindInt64Col(0) +} + // FindNodeActive 检查节点活跃状态 func (this *NodeDAO) FindNodeActive(tx *dbs.Tx, nodeId int64) (bool, error) { isActive, err := this.Query(tx). @@ -1827,6 +1848,19 @@ func (this *NodeDAO) FindParentNodeConfigs(tx *dbs.Tx, nodeId int64, groupId int if err != nil { return nil, err } + } else if nodeId > 0 { + // 当前节点所属分组 + groupId, err = this.Query(tx).Result("groupId").FindInt64Col(0) + if err != nil { + return nil, err + } + + if groupId > 0 { + parentNodes, err = this.FindEnabledNodesWithGroupIdAndLevel(tx, groupId, level+1) + if err != nil { + return nil, err + } + } } // 当前集群的L2 diff --git a/internal/db/models/node_model.go b/internal/db/models/node_model.go index e6d4b8b3..1b42d618 100644 --- a/internal/db/models/node_model.go +++ b/internal/db/models/node_model.go @@ -13,6 +13,7 @@ type Node struct { CountUp uint32 `field:"countUp"` // 连续在线次数 CountDown uint32 `field:"countDown"` // 连续下线次数 IsActive bool `field:"isActive"` // 是否活跃 + InactiveNotifiedAt uint64 `field:"inactiveNotifiedAt"` // 离线通知时间 UniqueId string `field:"uniqueId"` // 节点ID Secret string `field:"secret"` // 密钥 Name string `field:"name"` // 节点名 @@ -50,6 +51,7 @@ type NodeOperator struct { CountUp interface{} // 连续在线次数 CountDown interface{} // 连续下线次数 IsActive interface{} // 是否活跃 + InactiveNotifiedAt interface{} // 离线通知时间 UniqueId interface{} // 节点ID Secret interface{} // 密钥 Name interface{} // 节点名 diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index 24518c04..9133b91b 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -52,7 +52,7 @@ func NextCommandRequestId() int64 { func init() { // 清理WaitingChannelMap - ticker := time.NewTicker(30 * time.Second) + var ticker = time.NewTicker(30 * time.Second) goman.New(func() { for range ticker.C { nodeLocker.Lock() @@ -110,8 +110,6 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro return err } - //logs.Println("[RPC]accepted node '" + numberutils.FormatInt64(nodeId) + "' connection") - var tx = this.NullTx() // 标记为活跃状态 @@ -121,25 +119,31 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } if !oldIsActive { - err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true) + inactiveNotifiedAt, err := models.SharedNodeDAO.FindNodeInactiveNotifiedAt(tx, nodeId) if err != nil { return err } + if inactiveNotifiedAt > 0 { + err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true) + if err != nil { + return err + } - // 发送恢复消息 - clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) - if err != nil { - return err - } - nodeName, err := models.SharedNodeDAO.FindNodeName(tx, nodeId) - if err != nil { - return err - } - subject := "节点\"" + nodeName + "\"已经恢复在线" - msg := "节点\"" + nodeName + "\"已经恢复在线" - err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil, false) - if err != nil { - return err + // 发送恢复消息 + clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) + if err != nil { + return err + } + nodeName, err := models.SharedNodeDAO.FindNodeName(tx, nodeId) + if err != nil { + return err + } + var subject = "节点\"" + nodeName + "\"已经恢复在线" + var msg = "节点\"" + nodeName + "\"已经恢复在线" + err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil, false) + if err != nil { + return err + } } } diff --git a/internal/tasks/node_monitor_task.go b/internal/tasks/node_monitor_task.go index 63cc56d7..5ebbe458 100644 --- a/internal/tasks/node_monitor_task.go +++ b/internal/tasks/node_monitor_task.go @@ -98,12 +98,18 @@ func (this *NodeMonitorTask) MonitorCluster(cluster *models.NodeCluster) error { this.inactiveMap[key] = 0 this.notifiedMap[nodeId] = time.Now().Unix() - subject := "节点\"" + node.Name + "\"已处于离线状态" - msg := "集群'" + cluster.Name + "'节点\"" + node.Name + "\"已处于离线状态,请检查节点是否异常" + var subject = "节点\"" + node.Name + "\"已处于离线状态" + var msg = "集群'" + cluster.Name + "'节点\"" + node.Name + "\"已处于离线状态,请检查节点是否异常" err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false) if err != nil { return err } + + // 设置通知时间 + err = models.SharedNodeDAO.UpdateNodeInactiveNotifiedAt(nil, nodeId, time.Now().Unix()) + if err != nil { + return err + } } } else { delete(this.inactiveMap, key)