diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index bfff7c42..247b6341 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -25,6 +25,7 @@ type MessageType = string const ( MessageTypeHealthCheckFail MessageType = "HealthCheckFail" + MessageTypeNodeInactive MessageType = "NodeInactive" ) type MessageDAO dbs.DAO @@ -84,6 +85,12 @@ func (this *MessageDAO) CreateClusterMessage(clusterId int64, messageType Messag return err } +// 创建节点消息 +func (this *MessageDAO) CreateNodeMessage(clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) error { + _, err := this.createMessage(clusterId, nodeId, messageType, level, body, paramsJSON) + return err +} + // 删除某天之前的消息 func (this *MessageDAO) DeleteMessagesBeforeDay(dayTime time.Time) error { day := timeutil.Format("Ymd", dayTime) diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 22ef0eb1..79ff5db4 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -294,6 +294,18 @@ func (this *NodeDAO) FindAllEnabledNodesWithClusterId(clusterId int64) (result [ return } +// 取得一个集群离线的节点 +func (this *NodeDAO) FindAllInactiveNodesWithClusterId(clusterId int64) (result []*Node, err error) { + _, err = this.Query(). + State(NodeStateEnabled). + Attr("clusterId", clusterId). + Attr("isOn", true). // 只监控启用的节点 + Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)"). + Slice(&result). + FindAll() + return +} + // 计算节点数量 func (this *NodeDAO) CountAllEnabledNodesMatch(clusterId int64, installState configutils.BoolState, activeState configutils.BoolState) (int64, error) { query := this.Query() @@ -336,6 +348,20 @@ func (this *NodeDAO) UpdateNodeStatus(nodeId int64, statusJSON []byte) error { return err } +// 更改节点在线状态 +func (this *NodeDAO) UpdateNodeIsActive(nodeId int64, isActive bool) error { + b := "true" + if !isActive { + b = "false" + } + _, err := this.Query(). + Pk(nodeId). + Where("status IS NOT NULL"). + Set("status", dbs.SQL("JSON_SET(status, '$.isActive', "+b+")")). + Update() + return err +} + // 设置节点安装状态 func (this *NodeDAO) UpdateNodeIsInstalled(nodeId int64, isInstalled bool) error { _, err := this.Query(). diff --git a/internal/db/models/node_model_ext.go b/internal/db/models/node_model_ext.go index 97f7120f..4a26d0e2 100644 --- a/internal/db/models/node_model_ext.go +++ b/internal/db/models/node_model_ext.go @@ -2,6 +2,7 @@ package models import ( "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "time" ) @@ -26,3 +27,15 @@ func (this *Node) DecodeInstallStatus() (*NodeInstallStatus, error) { return status, nil } +// 节点状态 +func (this *Node) DecodeStatus() (*nodeconfigs.NodeStatus, error) { + if len(this.Status) == 0 || this.Status == "null" { + return nil, nil + } + status := &nodeconfigs.NodeStatus{} + err := json.Unmarshal([]byte(this.Status), status) + if err != nil { + return nil, err + } + return status, nil +} diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index 8c3a9fdc..c660d69a 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/TeaOSLab/EdgeAPI/internal/configs" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" @@ -135,6 +136,12 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro for { req, err := server.Recv() if err != nil { + // 修改节点状态 + err1 := models.SharedNodeDAO.UpdateNodeIsActive(nodeId, false) + if err1 != nil { + logs.Println(err1.Error()) + } + return err } diff --git a/internal/tasks/health_check_task.go b/internal/tasks/health_check_task.go index 96dae9d2..526c3aa1 100644 --- a/internal/tasks/health_check_task.go +++ b/internal/tasks/health_check_task.go @@ -19,6 +19,7 @@ func init() { }) } +// 节点健康检查任务 type HealthCheckTask struct { tasksMap map[int64]*HealthCheckClusterTask // taskId => task } diff --git a/internal/tasks/node_monitor_task.go b/internal/tasks/node_monitor_task.go new file mode 100644 index 00000000..883c8a8e --- /dev/null +++ b/internal/tasks/node_monitor_task.go @@ -0,0 +1,92 @@ +package tasks + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/logs" + "time" +) + +func init() { + dbs.OnReady(func() { + task := NewNodeMonitorTask(60) + ticker := time.NewTicker(60 * time.Second) + go func() { + for range ticker.C { + err := task.loop() + if err != nil { + logs.Println("[TASK][NODE_MONITOR]" + err.Error()) + } + } + }() + }) +} + +// 健康节点任务 +type NodeMonitorTask struct { + intervalSeconds int +} + +func NewNodeMonitorTask(intervalSeconds int) *NodeMonitorTask { + return &NodeMonitorTask{ + intervalSeconds: intervalSeconds, + } +} + +func (this *NodeMonitorTask) Run() { + +} + +func (this *NodeMonitorTask) loop() error { + // 检查上次运行时间,防止重复运行 + settingKey := "node_monitor" + timestamp := time.Now().Unix() + c, err := models.SharedSysSettingDAO.CompareInt64Setting(settingKey, timestamp-int64(this.intervalSeconds)) + if err != nil { + return err + } + if c > 0 { + return nil + } + + // 记录时间 + err = models.SharedSysSettingDAO.UpdateSetting(settingKey, []byte(numberutils.FormatInt64(timestamp))) + if err != nil { + return err + } + + clusters, err := models.SharedNodeClusterDAO.FindAllEnableClusters() + if err != nil { + return err + } + for _, cluster := range clusters { + err := this.monitorCluster(cluster) + if err != nil { + return err + } + } + + return nil +} + +func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error { + clusterId := int64(cluster.Id) + + // 检查离线节点 + inactiveNodes, err := models.SharedNodeDAO.FindAllInactiveNodesWithClusterId(clusterId) + if err != nil { + return err + } + for _, node := range inactiveNodes { + err = models.SharedMessageDAO.CreateNodeMessage(clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, "节点已处于离线状态", nil) + if err != nil { + return err + } + } + + // 检查CPU、内存、磁盘不足节点,而且离线的节点不再重复提示 + // TODO 需要实现 + + return nil +} diff --git a/internal/tasks/node_monitor_task_test.go b/internal/tasks/node_monitor_task_test.go new file mode 100644 index 00000000..ba501c48 --- /dev/null +++ b/internal/tasks/node_monitor_task_test.go @@ -0,0 +1,17 @@ +package tasks + +import ( + "github.com/iwind/TeaGo/dbs" + "testing" +) + +func TestNodeMonitorTask_loop(t *testing.T) { + dbs.NotifyReady() + + task := NewNodeMonitorTask(60) + err := task.loop() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +}