只有发送过离线通知的节点才会发送恢复在线通知

This commit is contained in:
GoEdgeLab
2022-08-07 17:28:54 +08:00
parent 2aa93e826d
commit d00061d3c4
4 changed files with 66 additions and 20 deletions

View File

@@ -1668,10 +1668,31 @@ func (this *NodeDAO) UpdateNodeActive(tx *dbs.Tx, nodeId int64, isActive bool) e
_, err := this.Query(tx). _, err := this.Query(tx).
Pk(nodeId). Pk(nodeId).
Set("isActive", isActive). Set("isActive", isActive).
Set("inactiveNotifiedAt", 0).
Update() Update()
return err return err
} }
// UpdateNodeInactiveNotifiedAt 修改节点的离线通知时间
func (this *NodeDAO) UpdateNodeInactiveNotifiedAt(tx *dbs.Tx, nodeId int64, inactiveAt int64) error {
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
_, err := this.Query(tx).
Pk(nodeId).
Set("inactiveNotifiedAt", inactiveAt).
Update()
return err
}
// FindNodeInactiveNotifiedAt 读取上次的节点离线通知时间
func (this *NodeDAO) FindNodeInactiveNotifiedAt(tx *dbs.Tx, nodeId int64) (int64, error) {
return this.Query(tx).
Pk(nodeId).
Result("inactiveNotifiedAt").
FindInt64Col(0)
}
// FindNodeActive 检查节点活跃状态 // FindNodeActive 检查节点活跃状态
func (this *NodeDAO) FindNodeActive(tx *dbs.Tx, nodeId int64) (bool, error) { func (this *NodeDAO) FindNodeActive(tx *dbs.Tx, nodeId int64) (bool, error) {
isActive, err := this.Query(tx). isActive, err := this.Query(tx).
@@ -1827,6 +1848,19 @@ func (this *NodeDAO) FindParentNodeConfigs(tx *dbs.Tx, nodeId int64, groupId int
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else if nodeId > 0 {
// 当前节点所属分组
groupId, err = this.Query(tx).Result("groupId").FindInt64Col(0)
if err != nil {
return nil, err
}
if groupId > 0 {
parentNodes, err = this.FindEnabledNodesWithGroupIdAndLevel(tx, groupId, level+1)
if err != nil {
return nil, err
}
}
} }
// 当前集群的L2 // 当前集群的L2

View File

@@ -13,6 +13,7 @@ type Node struct {
CountUp uint32 `field:"countUp"` // 连续在线次数 CountUp uint32 `field:"countUp"` // 连续在线次数
CountDown uint32 `field:"countDown"` // 连续下线次数 CountDown uint32 `field:"countDown"` // 连续下线次数
IsActive bool `field:"isActive"` // 是否活跃 IsActive bool `field:"isActive"` // 是否活跃
InactiveNotifiedAt uint64 `field:"inactiveNotifiedAt"` // 离线通知时间
UniqueId string `field:"uniqueId"` // 节点ID UniqueId string `field:"uniqueId"` // 节点ID
Secret string `field:"secret"` // 密钥 Secret string `field:"secret"` // 密钥
Name string `field:"name"` // 节点名 Name string `field:"name"` // 节点名
@@ -50,6 +51,7 @@ type NodeOperator struct {
CountUp interface{} // 连续在线次数 CountUp interface{} // 连续在线次数
CountDown interface{} // 连续下线次数 CountDown interface{} // 连续下线次数
IsActive interface{} // 是否活跃 IsActive interface{} // 是否活跃
InactiveNotifiedAt interface{} // 离线通知时间
UniqueId interface{} // 节点ID UniqueId interface{} // 节点ID
Secret interface{} // 密钥 Secret interface{} // 密钥
Name interface{} // 节点名 Name interface{} // 节点名

View File

@@ -52,7 +52,7 @@ func NextCommandRequestId() int64 {
func init() { func init() {
// 清理WaitingChannelMap // 清理WaitingChannelMap
ticker := time.NewTicker(30 * time.Second) var ticker = time.NewTicker(30 * time.Second)
goman.New(func() { goman.New(func() {
for range ticker.C { for range ticker.C {
nodeLocker.Lock() nodeLocker.Lock()
@@ -110,8 +110,6 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
return err return err
} }
//logs.Println("[RPC]accepted node '" + numberutils.FormatInt64(nodeId) + "' connection")
var tx = this.NullTx() var tx = this.NullTx()
// 标记为活跃状态 // 标记为活跃状态
@@ -121,25 +119,31 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
} }
if !oldIsActive { if !oldIsActive {
err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true) inactiveNotifiedAt, err := models.SharedNodeDAO.FindNodeInactiveNotifiedAt(tx, nodeId)
if err != nil { if err != nil {
return err return err
} }
if inactiveNotifiedAt > 0 {
err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true)
if err != nil {
return err
}
// 发送恢复消息 // 发送恢复消息
clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil { if err != nil {
return err return err
} }
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, nodeId) nodeName, err := models.SharedNodeDAO.FindNodeName(tx, nodeId)
if err != nil { if err != nil {
return err return err
} }
subject := "节点\"" + nodeName + "\"已经恢复在线" var subject = "节点\"" + nodeName + "\"已经恢复在线"
msg := "节点\"" + nodeName + "\"已经恢复在线" var msg = "节点\"" + nodeName + "\"已经恢复在线"
err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil, false) err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil, false)
if err != nil { if err != nil {
return err return err
}
} }
} }

View File

@@ -98,12 +98,18 @@ func (this *NodeMonitorTask) MonitorCluster(cluster *models.NodeCluster) error {
this.inactiveMap[key] = 0 this.inactiveMap[key] = 0
this.notifiedMap[nodeId] = time.Now().Unix() this.notifiedMap[nodeId] = time.Now().Unix()
subject := "节点\"" + node.Name + "\"已处于离线状态" var subject = "节点\"" + node.Name + "\"已处于离线状态"
msg := "集群'" + cluster.Name + "'节点\"" + node.Name + "\"已处于离线状态,请检查节点是否异常" var msg = "集群'" + cluster.Name + "'节点\"" + node.Name + "\"已处于离线状态,请检查节点是否异常"
err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false) err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false)
if err != nil { if err != nil {
return err return err
} }
// 设置通知时间
err = models.SharedNodeDAO.UpdateNodeInactiveNotifiedAt(nil, nodeId, time.Now().Unix())
if err != nil {
return err
}
} }
} else { } else {
delete(this.inactiveMap, key) delete(this.inactiveMap, key)