mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-23 23:10:26 +08:00
只有连续N分钟离线的节点才会发送通知
This commit is contained in:
@@ -518,11 +518,11 @@ func (this *NodeDAO) FindAllEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId in
|
|||||||
func (this *NodeDAO) FindAllInactiveNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
|
func (this *NodeDAO) FindAllInactiveNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
|
||||||
_, err = this.Query(tx).
|
_, err = this.Query(tx).
|
||||||
State(NodeStateEnabled).
|
State(NodeStateEnabled).
|
||||||
|
Result("id", "name", "status").
|
||||||
Attr("clusterId", clusterId).
|
Attr("clusterId", clusterId).
|
||||||
Attr("isOn", true). // 只监控启用的节点
|
Attr("isOn", true). // 只监控启用的节点
|
||||||
Attr("isInstalled", true). // 只监控已经安装的节点
|
Attr("isInstalled", true). // 只监控已经安装的节点
|
||||||
Attr("isActive", true). // 当前已经在线的
|
Attr("isActive", false).
|
||||||
Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)").
|
|
||||||
Result("id", "name").
|
Result("id", "name").
|
||||||
Slice(&result).
|
Slice(&result).
|
||||||
FindAll()
|
FindAll()
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||||
//go:build community
|
//go:build !plus
|
||||||
// +build community
|
// +build !plus
|
||||||
|
|
||||||
package models
|
package models
|
||||||
|
|
||||||
|
|||||||
@@ -13,8 +13,8 @@ import (
|
|||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
dbs.OnReadyDone(func() {
|
dbs.OnReadyDone(func() {
|
||||||
task := NewNodeMonitorTask(60)
|
var task = NewNodeMonitorTask(60)
|
||||||
ticker := time.NewTicker(60 * time.Second)
|
var ticker = time.NewTicker(60 * time.Second)
|
||||||
goman.New(func() {
|
goman.New(func() {
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
err := task.loop()
|
err := task.loop()
|
||||||
@@ -29,11 +29,14 @@ func init() {
|
|||||||
// NodeMonitorTask 边缘节点监控任务
|
// NodeMonitorTask 边缘节点监控任务
|
||||||
type NodeMonitorTask struct {
|
type NodeMonitorTask struct {
|
||||||
intervalSeconds int
|
intervalSeconds int
|
||||||
|
|
||||||
|
inactiveMap map[int64]int // nodeId => count
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNodeMonitorTask(intervalSeconds int) *NodeMonitorTask {
|
func NewNodeMonitorTask(intervalSeconds int) *NodeMonitorTask {
|
||||||
return &NodeMonitorTask{
|
return &NodeMonitorTask{
|
||||||
intervalSeconds: intervalSeconds,
|
intervalSeconds: intervalSeconds,
|
||||||
|
inactiveMap: map[int64]int{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -81,24 +84,37 @@ func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, node := range inactiveNodes {
|
|
||||||
subject := "节点\"" + node.Name + "\"已处于离线状态"
|
|
||||||
msg := "节点\"" + node.Name + "\"已处于离线状态"
|
|
||||||
err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 修改在线状态
|
var nodeMap = map[int64]*models.Node{}
|
||||||
err = models.SharedNodeDAO.UpdateNodeActive(nil, int64(node.Id), false)
|
for _, node := range inactiveNodes {
|
||||||
if err != nil {
|
var nodeId = int64(node.Id)
|
||||||
return err
|
nodeMap[nodeId] = node
|
||||||
|
this.inactiveMap[nodeId]++
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxInactiveTries = 5
|
||||||
|
|
||||||
|
// 处理现有的离线状态
|
||||||
|
for nodeId, count := range this.inactiveMap {
|
||||||
|
node, ok := nodeMap[nodeId]
|
||||||
|
if ok {
|
||||||
|
// 连续两次
|
||||||
|
if count >= maxInactiveTries {
|
||||||
|
this.inactiveMap[nodeId] = 0
|
||||||
|
|
||||||
|
subject := "节点\"" + node.Name + "\"已处于离线状态"
|
||||||
|
msg := "集群'" + cluster.Name + "'节点\"" + node.Name + "\"已处于离线状态,请检查节点是否异常"
|
||||||
|
err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.inactiveMap[nodeId] = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO 检查恢复连接
|
// 检查CPU、内存、磁盘不足节点
|
||||||
|
|
||||||
// 检查CPU、内存、磁盘不足节点,而且离线的节点不再重复提示
|
|
||||||
// TODO 需要实现
|
// TODO 需要实现
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package tasks
|
package tasks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||||
"github.com/iwind/TeaGo/dbs"
|
"github.com/iwind/TeaGo/dbs"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
@@ -8,10 +9,23 @@ import (
|
|||||||
func TestNodeMonitorTask_loop(t *testing.T) {
|
func TestNodeMonitorTask_loop(t *testing.T) {
|
||||||
dbs.NotifyReady()
|
dbs.NotifyReady()
|
||||||
|
|
||||||
task := NewNodeMonitorTask(60)
|
var task = NewNodeMonitorTask(60)
|
||||||
err := task.loop()
|
err := task.loop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
t.Log("ok")
|
t.Log("ok")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNodeMonitorTask_Monitor(t *testing.T) {
|
||||||
|
dbs.NotifyReady()
|
||||||
|
var task = NewNodeMonitorTask(60)
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
err := task.monitorCluster(&models.NodeCluster{
|
||||||
|
Id: 42,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user