diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index 2a721fd4..901e3900 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -19,13 +19,17 @@ const ( MessageLevelInfo = "info" MessageLevelWarning = "warning" MessageLevelError = "error" + MessageLevelSuccess = "success" ) type MessageType = string const ( MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed" + MessageTypeHealthCheckNodeUp MessageType = "HealthCheckNodeUp" + MessageTypeHealthCheckNodeDown MessageType = "HealthCheckNodeDown" MessageTypeNodeInactive MessageType = "NodeInactive" + MessageTypeNodeActive MessageType = "NodeActive" MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed" ) diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 536a5eb0..696accec 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -315,7 +315,9 @@ func (this *NodeDAO) FindAllInactiveNodesWithClusterId(clusterId int64) (result Attr("clusterId", clusterId). Attr("isOn", true). // 只监控启用的节点 Attr("isInstalled", true). // 只监控已经安装的节点 + Attr("isActive", true). // 当前已经在线的 Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)"). + Result("id"). Slice(&result). FindAll() return @@ -664,7 +666,7 @@ func (this *NodeDAO) UpdateNodeUp(nodeId int64, isUp bool, maxUp int, maxDown in // 如果新老状态一致,则不做任何事情 if oldIsUp == isUp { - return isUp, nil + return false, nil } countUp := int(one.(*Node).CountUp) @@ -700,6 +702,30 @@ func (this *NodeDAO) UpdateNodeUp(nodeId int64, isUp bool, maxUp int, maxDown in return } +// 修改节点活跃状态 +func (this *NodeDAO) UpdateNodeActive(nodeId int64, isActive bool) error { + if nodeId <= 0 { + return errors.New("invalid nodeId") + } + _, err := this.Query(). + Pk(nodeId). + Set("isActive", isActive). + Update() + return err +} + +// 检查节点活跃状态 +func (this *NodeDAO) FindNodeActive(nodeId int64) (bool, error) { + isActive, err := this.Query(). + Pk(nodeId). + Result("isActive"). + FindIntCol(0) + if err != nil { + return false, err + } + return isActive == 1, nil +} + // 生成唯一ID func (this *NodeDAO) genUniqueId() (string, error) { for { diff --git a/internal/db/models/node_model.go b/internal/db/models/node_model.go index 4eb80997..35cb3316 100644 --- a/internal/db/models/node_model.go +++ b/internal/db/models/node_model.go @@ -9,6 +9,7 @@ type Node struct { IsUp uint8 `field:"isUp"` // 是否在线 CountUp uint32 `field:"countUp"` // 连续在线次数 CountDown uint32 `field:"countDown"` // 连续下线次数 + IsActive uint8 `field:"isActive"` // 是否活跃 UniqueId string `field:"uniqueId"` // 节点ID Secret string `field:"secret"` // 密钥 Name string `field:"name"` // 节点名 @@ -37,6 +38,7 @@ type NodeOperator struct { IsUp interface{} // 是否在线 CountUp interface{} // 连续在线次数 CountDown interface{} // 连续下线次数 + IsActive interface{} // 是否活跃 UniqueId interface{} // 节点ID Secret interface{} // 密钥 Name interface{} // 节点名 diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 69ccaa4d..fd286cbe 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -21,7 +21,6 @@ func init() { go func() { service := &NodeService{} for nodeId := range events.NodeDNSChanges { - logs.Println("change dns: ", nodeId) err := service.notifyNodeDNSChanged(nodeId) if err != nil { logs.Println("[ERROR]change node dns: " + err.Error()) @@ -450,7 +449,6 @@ func (this *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNod if err != nil { return nil, err } - return &pb.RPCSuccess{}, nil } diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index c660d69a..198fe2cf 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" + "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/logs" @@ -93,7 +94,29 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } } - logs.Println("[RPC]accepted node '" + strconv.FormatInt(nodeId, 10) + "' connection") + logs.Println("[RPC]accepted node '" + numberutils.FormatInt64(nodeId) + "' connection") + + // 标记为活跃状态 + oldIsActive, err := models.SharedNodeDAO.FindNodeActive(nodeId) + if err != nil { + return err + } + if !oldIsActive { + err = models.SharedNodeDAO.UpdateNodeActive(nodeId, true) + if err != nil { + return err + } + + // 发送恢复消息 + clusterId, err := models.SharedNodeDAO.FindNodeClusterId(nodeId) + if err != nil { + return err + } + err = models.SharedMessageDAO.CreateNodeMessage(clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, "节点已经恢复在线", nil) + if err != nil { + return err + } + } nodeLocker.Lock() requestChan, ok := requestChanMap[nodeId] diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go index d5ebebb0..faeeb492 100644 --- a/internal/tasks/health_check_executor.go +++ b/internal/tasks/health_check_executor.go @@ -139,6 +139,13 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { default: } + + // 通知恢复或下线 + if result.IsOk { + err = models.NewMessageDAO().CreateNodeMessage(this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, "健康检查成功,节点\""+result.Node.Name+"\"已恢复上线", nil) + } else { + err = models.NewMessageDAO().CreateNodeMessage(this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, "健康检查失败,节点\""+result.Node.Name+"\"已自动下线", nil) + } } } diff --git a/internal/tasks/node_monitor_task.go b/internal/tasks/node_monitor_task.go index 75632c55..e012f95f 100644 --- a/internal/tasks/node_monitor_task.go +++ b/internal/tasks/node_monitor_task.go @@ -83,6 +83,12 @@ func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error { if err != nil { return err } + + // 修改在线状态 + err = models.SharedNodeDAO.UpdateNodeActive(int64(node.Id), false) + if err != nil { + return err + } } // TODO 检查恢复连接