diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index df1fc2db..799b99f9 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -518,11 +518,11 @@ func (this *NodeDAO) FindAllEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId in func (this *NodeDAO) FindAllInactiveNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) { _, err = this.Query(tx). State(NodeStateEnabled). + Result("id", "name", "status"). Attr("clusterId", clusterId). Attr("isOn", true). // 只监控启用的节点 Attr("isInstalled", true). // 只监控已经安装的节点 - Attr("isActive", true). // 当前已经在线的 - Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)"). + Attr("isActive", false). Result("id", "name"). Slice(&result). FindAll() diff --git a/internal/db/models/node_ip_address_dao_community.go b/internal/db/models/node_ip_address_dao_community.go index 25e07790..41d32fb1 100644 --- a/internal/db/models/node_ip_address_dao_community.go +++ b/internal/db/models/node_ip_address_dao_community.go @@ -1,6 +1,6 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -//go:build community -// +build community +//go:build !plus +// +build !plus package models diff --git a/internal/tasks/node_monitor_task.go b/internal/tasks/node_monitor_task.go index 7f2e897d..e35070d1 100644 --- a/internal/tasks/node_monitor_task.go +++ b/internal/tasks/node_monitor_task.go @@ -13,8 +13,8 @@ import ( func init() { dbs.OnReadyDone(func() { - task := NewNodeMonitorTask(60) - ticker := time.NewTicker(60 * time.Second) + var task = NewNodeMonitorTask(60) + var ticker = time.NewTicker(60 * time.Second) goman.New(func() { for range ticker.C { err := task.loop() @@ -29,11 +29,14 @@ func init() { // NodeMonitorTask 边缘节点监控任务 type NodeMonitorTask struct { intervalSeconds int + + inactiveMap map[int64]int // nodeId => count } func NewNodeMonitorTask(intervalSeconds int) *NodeMonitorTask { return &NodeMonitorTask{ intervalSeconds: intervalSeconds, + inactiveMap: map[int64]int{}, } } @@ -81,24 +84,37 @@ func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error { if err != nil { return err } - for _, node := range inactiveNodes { - subject := "节点\"" + node.Name + "\"已处于离线状态" - msg := "节点\"" + node.Name + "\"已处于离线状态" - err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false) - if err != nil { - return err - } - // 修改在线状态 - err = models.SharedNodeDAO.UpdateNodeActive(nil, int64(node.Id), false) - if err != nil { - return err + var nodeMap = map[int64]*models.Node{} + for _, node := range inactiveNodes { + var nodeId = int64(node.Id) + nodeMap[nodeId] = node + this.inactiveMap[nodeId]++ + } + + const maxInactiveTries = 5 + + // 处理现有的离线状态 + for nodeId, count := range this.inactiveMap { + node, ok := nodeMap[nodeId] + if ok { + // 连续两次 + if count >= maxInactiveTries { + this.inactiveMap[nodeId] = 0 + + subject := "节点\"" + node.Name + "\"已处于离线状态" + msg := "集群'" + cluster.Name + "'节点\"" + node.Name + "\"已处于离线状态,请检查节点是否异常" + err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false) + if err != nil { + return err + } + } + } else { + this.inactiveMap[nodeId] = 0 } } - // TODO 检查恢复连接 - - // 检查CPU、内存、磁盘不足节点,而且离线的节点不再重复提示 + // 检查CPU、内存、磁盘不足节点 // TODO 需要实现 return nil diff --git a/internal/tasks/node_monitor_task_test.go b/internal/tasks/node_monitor_task_test.go index ba501c48..e522b0da 100644 --- a/internal/tasks/node_monitor_task_test.go +++ b/internal/tasks/node_monitor_task_test.go @@ -1,6 +1,7 @@ package tasks import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/iwind/TeaGo/dbs" "testing" ) @@ -8,10 +9,23 @@ import ( func TestNodeMonitorTask_loop(t *testing.T) { dbs.NotifyReady() - task := NewNodeMonitorTask(60) + var task = NewNodeMonitorTask(60) err := task.loop() if err != nil { t.Fatal(err) } t.Log("ok") } + +func TestNodeMonitorTask_Monitor(t *testing.T) { + dbs.NotifyReady() + var task = NewNodeMonitorTask(60) + for i := 0; i < 5; i++ { + err := task.monitorCluster(&models.NodeCluster{ + Id: 42, + }) + if err != nil { + t.Fatal(err) + } + } +}