消息中增加恢复成功的消息

This commit is contained in:
刘祥超
2020-11-16 09:20:24 +08:00
parent 58cae9a5e5
commit 5ef3e4acaf
7 changed files with 70 additions and 4 deletions

View File

@@ -19,13 +19,17 @@ const (
MessageLevelInfo = "info" MessageLevelInfo = "info"
MessageLevelWarning = "warning" MessageLevelWarning = "warning"
MessageLevelError = "error" MessageLevelError = "error"
MessageLevelSuccess = "success"
) )
type MessageType = string type MessageType = string
const ( const (
MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed" MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed"
MessageTypeHealthCheckNodeUp MessageType = "HealthCheckNodeUp"
MessageTypeHealthCheckNodeDown MessageType = "HealthCheckNodeDown"
MessageTypeNodeInactive MessageType = "NodeInactive" MessageTypeNodeInactive MessageType = "NodeInactive"
MessageTypeNodeActive MessageType = "NodeActive"
MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed" MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed"
) )

View File

@@ -315,7 +315,9 @@ func (this *NodeDAO) FindAllInactiveNodesWithClusterId(clusterId int64) (result
Attr("clusterId", clusterId). Attr("clusterId", clusterId).
Attr("isOn", true). // 只监控启用的节点 Attr("isOn", true). // 只监控启用的节点
Attr("isInstalled", true). // 只监控已经安装的节点 Attr("isInstalled", true). // 只监控已经安装的节点
Attr("isActive", true). // 当前已经在线的
Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)"). Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)").
Result("id").
Slice(&result). Slice(&result).
FindAll() FindAll()
return return
@@ -664,7 +666,7 @@ func (this *NodeDAO) UpdateNodeUp(nodeId int64, isUp bool, maxUp int, maxDown in
// 如果新老状态一致,则不做任何事情 // 如果新老状态一致,则不做任何事情
if oldIsUp == isUp { if oldIsUp == isUp {
return isUp, nil return false, nil
} }
countUp := int(one.(*Node).CountUp) countUp := int(one.(*Node).CountUp)
@@ -700,6 +702,30 @@ func (this *NodeDAO) UpdateNodeUp(nodeId int64, isUp bool, maxUp int, maxDown in
return return
} }
// 修改节点活跃状态
func (this *NodeDAO) UpdateNodeActive(nodeId int64, isActive bool) error {
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
_, err := this.Query().
Pk(nodeId).
Set("isActive", isActive).
Update()
return err
}
// 检查节点活跃状态
func (this *NodeDAO) FindNodeActive(nodeId int64) (bool, error) {
isActive, err := this.Query().
Pk(nodeId).
Result("isActive").
FindIntCol(0)
if err != nil {
return false, err
}
return isActive == 1, nil
}
// 生成唯一ID // 生成唯一ID
func (this *NodeDAO) genUniqueId() (string, error) { func (this *NodeDAO) genUniqueId() (string, error) {
for { for {

View File

@@ -9,6 +9,7 @@ type Node struct {
IsUp uint8 `field:"isUp"` // 是否在线 IsUp uint8 `field:"isUp"` // 是否在线
CountUp uint32 `field:"countUp"` // 连续在线次数 CountUp uint32 `field:"countUp"` // 连续在线次数
CountDown uint32 `field:"countDown"` // 连续下线次数 CountDown uint32 `field:"countDown"` // 连续下线次数
IsActive uint8 `field:"isActive"` // 是否活跃
UniqueId string `field:"uniqueId"` // 节点ID UniqueId string `field:"uniqueId"` // 节点ID
Secret string `field:"secret"` // 密钥 Secret string `field:"secret"` // 密钥
Name string `field:"name"` // 节点名 Name string `field:"name"` // 节点名
@@ -37,6 +38,7 @@ type NodeOperator struct {
IsUp interface{} // 是否在线 IsUp interface{} // 是否在线
CountUp interface{} // 连续在线次数 CountUp interface{} // 连续在线次数
CountDown interface{} // 连续下线次数 CountDown interface{} // 连续下线次数
IsActive interface{} // 是否活跃
UniqueId interface{} // 节点ID UniqueId interface{} // 节点ID
Secret interface{} // 密钥 Secret interface{} // 密钥
Name interface{} // 节点名 Name interface{} // 节点名

View File

@@ -21,7 +21,6 @@ func init() {
go func() { go func() {
service := &NodeService{} service := &NodeService{}
for nodeId := range events.NodeDNSChanges { for nodeId := range events.NodeDNSChanges {
logs.Println("change dns: ", nodeId)
err := service.notifyNodeDNSChanged(nodeId) err := service.notifyNodeDNSChanged(nodeId)
if err != nil { if err != nil {
logs.Println("[ERROR]change node dns: " + err.Error()) logs.Println("[ERROR]change node dns: " + err.Error())
@@ -450,7 +449,6 @@ func (this *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNod
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &pb.RPCSuccess{}, nil return &pb.RPCSuccess{}, nil
} }

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
@@ -93,7 +94,29 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
} }
} }
logs.Println("[RPC]accepted node '" + strconv.FormatInt(nodeId, 10) + "' connection") logs.Println("[RPC]accepted node '" + numberutils.FormatInt64(nodeId) + "' connection")
// 标记为活跃状态
oldIsActive, err := models.SharedNodeDAO.FindNodeActive(nodeId)
if err != nil {
return err
}
if !oldIsActive {
err = models.SharedNodeDAO.UpdateNodeActive(nodeId, true)
if err != nil {
return err
}
// 发送恢复消息
clusterId, err := models.SharedNodeDAO.FindNodeClusterId(nodeId)
if err != nil {
return err
}
err = models.SharedMessageDAO.CreateNodeMessage(clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, "节点已经恢复在线", nil)
if err != nil {
return err
}
}
nodeLocker.Lock() nodeLocker.Lock()
requestChan, ok := requestChanMap[nodeId] requestChan, ok := requestChanMap[nodeId]

View File

@@ -139,6 +139,13 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
default: default:
} }
// 通知恢复或下线
if result.IsOk {
err = models.NewMessageDAO().CreateNodeMessage(this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, "健康检查成功,节点\""+result.Node.Name+"\"已恢复上线", nil)
} else {
err = models.NewMessageDAO().CreateNodeMessage(this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, "健康检查失败,节点\""+result.Node.Name+"\"已自动下线", nil)
}
} }
} }

View File

@@ -83,6 +83,12 @@ func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error {
if err != nil { if err != nil {
return err return err
} }
// 修改在线状态
err = models.SharedNodeDAO.UpdateNodeActive(int64(node.Id), false)
if err != nil {
return err
}
} }
// TODO 检查恢复连接 // TODO 检查恢复连接