diff --git a/internal/db/models/dns/dnsutils/dns_utils.go b/internal/db/models/dns/dnsutils/dns_utils.go index ddc34edd..800506a0 100644 --- a/internal/db/models/dns/dnsutils/dns_utils.go +++ b/internal/db/models/dns/dnsutils/dns_utils.go @@ -153,7 +153,7 @@ func CheckClusterDNS(tx *dbs.Tx, cluster *models.NodeCluster) (issues []*pb.DNSI } // 检查IP地址 - ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, nodeconfigs.NodeRoleNode) + ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, true, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } diff --git a/internal/db/models/node_ip_address_dao.go b/internal/db/models/node_ip_address_dao.go index dffb7d76..d9ebfd07 100644 --- a/internal/db/models/node_ip_address_dao.go +++ b/internal/db/models/node_ip_address_dao.go @@ -125,7 +125,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in role = nodeconfigs.NodeRoleNode } - op := NewNodeIPAddressOperator() + var op = NewNodeIPAddressOperator() op.NodeId = nodeId op.Role = role op.Name = name @@ -234,17 +234,20 @@ func (this *NodeIPAddressDAO) FindAllEnabledAddressesWithNode(tx *dbs.Tx, nodeId } // FindFirstNodeAccessIPAddress 查找节点的第一个可访问的IP地址 -func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (ip string, addrId int64, err error) { +func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, mustUp bool, role nodeconfigs.NodeRole) (ip string, addrId int64, err error) { if len(role) == 0 { role = nodeconfigs.NodeRoleNode } - one, err := this.Query(tx). + var query = this.Query(tx). Attr("nodeId", nodeId). Attr("role", role). State(NodeIPAddressStateEnabled). Attr("canAccess", true). - Attr("isOn", true). - Attr("isUp", true). + Attr("isOn", true) + if mustUp { + query.Attr("isUp", true) + } + one, err := query. Desc("order"). AscPk(). Result("id", "ip"). @@ -456,13 +459,13 @@ func (this *NodeIPAddressDAO) UpdateAddressBackupIP(tx *dbs.Tx, addressId int64, } // UpdateAddressHealthCount 计算IP健康状态 -func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, isUp bool, maxUp int, maxDown int) (changed bool, err error) { +func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, newIsUp bool, maxUp int, maxDown int, autoUpDown bool) (changed bool, err error) { if addrId <= 0 { return false, errors.New("invalid address id") } one, err := this.Query(tx). Pk(addrId). - Result("isHealthy", "countUp", "countDown"). + Result("isHealthy", "isUp", "countUp", "countDown"). Find() if err != nil { return false, err @@ -470,26 +473,57 @@ func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, if one == nil { return false, nil } - oldIsHealthy := one.(*NodeIPAddress).IsHealthy + var oldIsHealthy = one.(*NodeIPAddress).IsHealthy + var oldIsUp = one.(*NodeIPAddress).IsUp // 如果新老状态一致,则不做任何事情 - if oldIsHealthy == isUp { + if oldIsHealthy == newIsUp { + // 如果自动上下线,则健康状况和是否在线保持一致 + if autoUpDown { + if oldIsUp != oldIsHealthy { + err = this.Query(tx). + Pk(addrId). + Set("isUp", newIsUp). + UpdateQuickly() + if err != nil { + return false, err + } + err = this.NotifyUpdate(tx, addrId) + if err != nil { + return false, err + } + + // 创建日志 + if newIsUp { + err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查上线") + } else { + err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查下线") + } + if err != nil { + return true, err + } + + return true, nil + } + } return false, nil } - countUp := int(one.(*NodeIPAddress).CountUp) - countDown := int(one.(*NodeIPAddress).CountDown) + var countUp = int(one.(*NodeIPAddress).CountUp) + var countDown = int(one.(*NodeIPAddress).CountDown) - op := NewNodeIPAddressOperator() + var op = NewNodeIPAddressOperator() op.Id = addrId - if isUp { + if newIsUp { countUp++ countDown = 0 if countUp >= maxUp { changed = true - //op.IsUp = true + if autoUpDown { + op.IsUp = true + } op.IsHealthy = true } } else { @@ -498,7 +532,9 @@ func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, if countDown >= maxDown { changed = true - //op.IsUp = false + if autoUpDown { + op.IsUp = false + } op.IsHealthy = false } } @@ -515,6 +551,15 @@ func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, if err != nil { return true, err } + + // 创建日志 + if autoUpDown { + if newIsUp { + err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查上线") + } else { + err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查下线") + } + } } return diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index e74700d8..7f5244c7 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -1368,7 +1368,7 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil } - ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), nodeconfigs.NodeRoleNode) + ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), true, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_ip_address.go b/internal/rpc/services/service_node_ip_address.go index d4423a48..6f5e4176 100644 --- a/internal/rpc/services/service_node_ip_address.go +++ b/internal/rpc/services/service_node_ip_address.go @@ -158,6 +158,7 @@ func (this *NodeIPAddressService) FindEnabledNodeIPAddress(ctx context.Context, CanAccess: address.CanAccess, IsOn: address.IsOn, IsUp: address.IsUp, + IsHealthy: address.IsHealthy, BackupIP: address.DecodeBackupIP(), } } @@ -194,6 +195,7 @@ func (this *NodeIPAddressService) FindAllEnabledNodeIPAddressesWithNodeId(ctx co CanAccess: address.CanAccess, IsOn: address.IsOn, IsUp: address.IsUp, + IsHealthy: address.IsHealthy, BackupIP: address.DecodeBackupIP(), }) } @@ -218,7 +220,7 @@ func (this *NodeIPAddressService) CountAllEnabledNodeIPAddresses(ctx context.Con return this.SuccessCount(count) } -// ListEnabledIPAddresses 列出单页IP地址 +// ListEnabledNodeIPAddresses 列出单页IP地址 func (this *NodeIPAddressService) ListEnabledNodeIPAddresses(ctx context.Context, req *pb.ListEnabledNodeIPAddressesRequest) (*pb.ListEnabledNodeIPAddressesResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx, 0) @@ -244,6 +246,7 @@ func (this *NodeIPAddressService) ListEnabledNodeIPAddresses(ctx context.Context CanAccess: addr.CanAccess, IsOn: addr.IsOn, IsUp: addr.IsUp, + IsHealthy: addr.IsHealthy, BackupIP: addr.DecodeBackupIP(), }) } diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go index dae77354..f8585e90 100644 --- a/internal/tasks/health_check_executor.go +++ b/internal/tasks/health_check_executor.go @@ -7,7 +7,6 @@ import ( teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" - "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" @@ -45,13 +44,13 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { return nil, errors.New("health check config is not found") } - healthCheckConfig := &serverconfigs.HealthCheckConfig{} + var healthCheckConfig = &serverconfigs.HealthCheckConfig{} err = json.Unmarshal(cluster.HealthCheck, healthCheckConfig) if err != nil { return nil, err } - results := []*HealthCheckResult{} + var results = []*HealthCheckResult{} nodes, err := models.NewNodeDAO().FindAllEnabledNodesWithClusterId(nil, this.clusterId) if err != nil { return nil, err @@ -64,7 +63,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { Node: node, } - ipAddr, ipAddrId, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), nodeconfigs.NodeRoleNode) + ipAddr, ipAddrId, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), false, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } @@ -79,7 +78,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { } // 并行检查 - preparedResults := []*HealthCheckResult{} + var preparedResults = []*HealthCheckResult{} for _, result := range results { if len(result.NodeAddr) > 0 { preparedResults = append(preparedResults, result) @@ -90,13 +89,13 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { return results, nil } - countResults := len(preparedResults) - queue := make(chan *HealthCheckResult, countResults) + var countResults = len(preparedResults) + var queue = make(chan *HealthCheckResult, countResults) for _, result := range preparedResults { queue <- result } - countTries := types.Int(healthCheckConfig.CountTries) + var countTries = types.Int(healthCheckConfig.CountTries) if countTries > 10 { // 限定最多尝试10次 TODO 应该在管理界面提示用户 countTries = 10 } @@ -104,7 +103,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { countTries = 3 } - tryDelay := 1 * time.Second + var tryDelay = 1 * time.Second if healthCheckConfig.TryDelay != nil { tryDelay = healthCheckConfig.TryDelay.Duration() @@ -113,103 +112,105 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { } } - countRoutines := 10 + var concurrent = 128 + wg := sync.WaitGroup{} wg.Add(countResults) - for i := 0; i < countRoutines; i++ { - goman.New(func() { + for i := 0; i < concurrent; i++ { + go func() { for { select { case result := <-queue: - func() { - for i := 1; i <= countTries; i++ { - before := time.Now() - err := this.checkNode(healthCheckConfig, result) - result.CostMs = time.Since(before).Seconds() * 1000 - if err != nil { - result.Error = err.Error() - } - if result.IsOk { - break - } - if tryDelay > 0 { - time.Sleep(tryDelay) - } - } - - // 修改节点IP状态 - if teaconst.IsPlus { - isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) - if err != nil { - remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) - return - } - - if isChanged { - // 发送消息 - var message = "" - var messageType string - var messageLevel string - if result.IsOk { - message = "健康检查成功,节点\"" + result.Node.Name + "\",IP\"" + result.NodeAddr + "\"已恢复上线" - messageType = models.MessageTypeHealthCheckNodeUp - messageLevel = models.MessageLevelSuccess - } else { - message = "健康检查失败,节点\"" + result.Node.Name + "\",IP\"" + result.NodeAddr + "\"已自动下线" - messageType = models.MessageTypeHealthCheckNodeDown - messageLevel = models.MessageLevelError - } - - err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), messageType, messageLevel, message, message, nil, false) - if err != nil { - remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) - return - } - - // 触发阈值 - err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id)) - if err != nil { - remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) - return - } - } - - // 我们只处理IP的上下线,不修改节点的状态 - return - } - - // 修改节点状态 - if healthCheckConfig.AutoDown { - isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) - if err != nil { - remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) - } else if isChanged { - // 通知恢复或下线 - if result.IsOk { - message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线" - err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false) - } else { - message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线" - err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false) - } - } - } - }() - + this.runNode(healthCheckConfig, result, countTries, tryDelay) wg.Done() default: return } } - }) + }() } wg.Wait() return results, nil } +func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult, countTries int, tryDelay time.Duration) { + for i := 1; i <= countTries; i++ { + var before = time.Now() + err := this.runNodeOnce(healthCheckConfig, result) + result.CostMs = time.Since(before).Seconds() * 1000 + if err != nil { + result.Error = err.Error() + } + if result.IsOk { + break + } + if tryDelay > 0 { + time.Sleep(tryDelay) + } + } + + // 修改节点IP状态 + if teaconst.IsPlus { + isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown, healthCheckConfig.AutoDown) + if err != nil { + remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) + return + } + + if isChanged { + // 发送消息 + var message = "" + var messageType string + var messageLevel string + if result.IsOk { + message = "健康检查成功,节点\"" + result.Node.Name + "\",IP\"" + result.NodeAddr + "\"已恢复上线" + messageType = models.MessageTypeHealthCheckNodeUp + messageLevel = models.MessageLevelSuccess + } else { + message = "健康检查失败,节点\"" + result.Node.Name + "\",IP\"" + result.NodeAddr + "\"已自动下线" + messageType = models.MessageTypeHealthCheckNodeDown + messageLevel = models.MessageLevelError + } + + err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), messageType, messageLevel, message, message, nil, false) + if err != nil { + remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) + return + } + + // 触发阈值 + err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id)) + if err != nil { + remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) + return + } + } + + // 我们只处理IP的上下线,不修改节点的状态 + return + } + + // 修改节点状态 + if healthCheckConfig.AutoDown { + isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) + if err != nil { + remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) + } else if isChanged { + // 通知恢复或下线 + if result.IsOk { + message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线" + err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false) + } else { + message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线" + err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false) + } + } + } +} + // 检查单个节点 -func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) error { +func (this *HealthCheckExecutor) runNodeOnce(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) error { // 支持IPv6 if utils.IsIPv6(result.NodeAddr) { result.NodeAddr = "[" + result.NodeAddr + "]" @@ -219,7 +220,7 @@ func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.Heal healthCheckConfig.URL = "http://${host}/" } - url := strings.ReplaceAll(healthCheckConfig.URL, "${host}", result.NodeAddr) + var url = strings.ReplaceAll(healthCheckConfig.URL, "${host}", result.NodeAddr) req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { return err @@ -238,12 +239,12 @@ func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.Heal } req.Header.Set(serverconfigs.HealthCheckHeaderName, key) - timeout := 5 * time.Second + var timeout = 5 * time.Second if healthCheckConfig.Timeout != nil { timeout = healthCheckConfig.Timeout.Duration() } - client := &http.Client{ + var client = &http.Client{ Timeout: timeout, Transport: &http.Transport{ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { @@ -251,10 +252,6 @@ func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.Heal if err != nil { return nil, err } - conn, err := net.Dial(network, configutils.QuoteIP(result.NodeAddr)+":"+port) - if err == nil { - return conn, nil - } return net.DialTimeout(network, configutils.QuoteIP(result.NodeAddr)+":"+port, timeout) }, MaxIdleConns: 1,