修复集群健康检查无法自动上下线IP地址的Bug

This commit is contained in:
GoEdgeLab
2022-04-22 21:53:38 +08:00
parent fd6a02dfe4
commit 9aa43090d3
5 changed files with 159 additions and 114 deletions

View File

@@ -153,7 +153,7 @@ func CheckClusterDNS(tx *dbs.Tx, cluster *models.NodeCluster) (issues []*pb.DNSI
} }
// 检查IP地址 // 检查IP地址
ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, nodeconfigs.NodeRoleNode) ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, true, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -125,7 +125,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in
role = nodeconfigs.NodeRoleNode role = nodeconfigs.NodeRoleNode
} }
op := NewNodeIPAddressOperator() var op = NewNodeIPAddressOperator()
op.NodeId = nodeId op.NodeId = nodeId
op.Role = role op.Role = role
op.Name = name op.Name = name
@@ -234,17 +234,20 @@ func (this *NodeIPAddressDAO) FindAllEnabledAddressesWithNode(tx *dbs.Tx, nodeId
} }
// FindFirstNodeAccessIPAddress 查找节点的第一个可访问的IP地址 // FindFirstNodeAccessIPAddress 查找节点的第一个可访问的IP地址
func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (ip string, addrId int64, err error) { func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, mustUp bool, role nodeconfigs.NodeRole) (ip string, addrId int64, err error) {
if len(role) == 0 { if len(role) == 0 {
role = nodeconfigs.NodeRoleNode role = nodeconfigs.NodeRoleNode
} }
one, err := this.Query(tx). var query = this.Query(tx).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Attr("role", role). Attr("role", role).
State(NodeIPAddressStateEnabled). State(NodeIPAddressStateEnabled).
Attr("canAccess", true). Attr("canAccess", true).
Attr("isOn", true). Attr("isOn", true)
Attr("isUp", true). if mustUp {
query.Attr("isUp", true)
}
one, err := query.
Desc("order"). Desc("order").
AscPk(). AscPk().
Result("id", "ip"). Result("id", "ip").
@@ -456,13 +459,13 @@ func (this *NodeIPAddressDAO) UpdateAddressBackupIP(tx *dbs.Tx, addressId int64,
} }
// UpdateAddressHealthCount 计算IP健康状态 // UpdateAddressHealthCount 计算IP健康状态
func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, isUp bool, maxUp int, maxDown int) (changed bool, err error) { func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, newIsUp bool, maxUp int, maxDown int, autoUpDown bool) (changed bool, err error) {
if addrId <= 0 { if addrId <= 0 {
return false, errors.New("invalid address id") return false, errors.New("invalid address id")
} }
one, err := this.Query(tx). one, err := this.Query(tx).
Pk(addrId). Pk(addrId).
Result("isHealthy", "countUp", "countDown"). Result("isHealthy", "isUp", "countUp", "countDown").
Find() Find()
if err != nil { if err != nil {
return false, err return false, err
@@ -470,26 +473,57 @@ func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64,
if one == nil { if one == nil {
return false, nil return false, nil
} }
oldIsHealthy := one.(*NodeIPAddress).IsHealthy var oldIsHealthy = one.(*NodeIPAddress).IsHealthy
var oldIsUp = one.(*NodeIPAddress).IsUp
// 如果新老状态一致,则不做任何事情 // 如果新老状态一致,则不做任何事情
if oldIsHealthy == isUp { if oldIsHealthy == newIsUp {
// 如果自动上下线,则健康状况和是否在线保持一致
if autoUpDown {
if oldIsUp != oldIsHealthy {
err = this.Query(tx).
Pk(addrId).
Set("isUp", newIsUp).
UpdateQuickly()
if err != nil {
return false, err
}
err = this.NotifyUpdate(tx, addrId)
if err != nil {
return false, err
}
// 创建日志
if newIsUp {
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查上线")
} else {
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查下线")
}
if err != nil {
return true, err
}
return true, nil
}
}
return false, nil return false, nil
} }
countUp := int(one.(*NodeIPAddress).CountUp) var countUp = int(one.(*NodeIPAddress).CountUp)
countDown := int(one.(*NodeIPAddress).CountDown) var countDown = int(one.(*NodeIPAddress).CountDown)
op := NewNodeIPAddressOperator() var op = NewNodeIPAddressOperator()
op.Id = addrId op.Id = addrId
if isUp { if newIsUp {
countUp++ countUp++
countDown = 0 countDown = 0
if countUp >= maxUp { if countUp >= maxUp {
changed = true changed = true
//op.IsUp = true if autoUpDown {
op.IsUp = true
}
op.IsHealthy = true op.IsHealthy = true
} }
} else { } else {
@@ -498,7 +532,9 @@ func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64,
if countDown >= maxDown { if countDown >= maxDown {
changed = true changed = true
//op.IsUp = false if autoUpDown {
op.IsUp = false
}
op.IsHealthy = false op.IsHealthy = false
} }
} }
@@ -515,6 +551,15 @@ func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64,
if err != nil { if err != nil {
return true, err return true, err
} }
// 创建日志
if autoUpDown {
if newIsUp {
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查上线")
} else {
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, addrId, "健康检查下线")
}
}
} }
return return

View File

@@ -1368,7 +1368,7 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna
return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil
} }
ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), nodeconfigs.NodeRoleNode) ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), true, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -158,6 +158,7 @@ func (this *NodeIPAddressService) FindEnabledNodeIPAddress(ctx context.Context,
CanAccess: address.CanAccess, CanAccess: address.CanAccess,
IsOn: address.IsOn, IsOn: address.IsOn,
IsUp: address.IsUp, IsUp: address.IsUp,
IsHealthy: address.IsHealthy,
BackupIP: address.DecodeBackupIP(), BackupIP: address.DecodeBackupIP(),
} }
} }
@@ -194,6 +195,7 @@ func (this *NodeIPAddressService) FindAllEnabledNodeIPAddressesWithNodeId(ctx co
CanAccess: address.CanAccess, CanAccess: address.CanAccess,
IsOn: address.IsOn, IsOn: address.IsOn,
IsUp: address.IsUp, IsUp: address.IsUp,
IsHealthy: address.IsHealthy,
BackupIP: address.DecodeBackupIP(), BackupIP: address.DecodeBackupIP(),
}) })
} }
@@ -218,7 +220,7 @@ func (this *NodeIPAddressService) CountAllEnabledNodeIPAddresses(ctx context.Con
return this.SuccessCount(count) return this.SuccessCount(count)
} }
// ListEnabledIPAddresses 列出单页IP地址 // ListEnabledNodeIPAddresses 列出单页IP地址
func (this *NodeIPAddressService) ListEnabledNodeIPAddresses(ctx context.Context, req *pb.ListEnabledNodeIPAddressesRequest) (*pb.ListEnabledNodeIPAddressesResponse, error) { func (this *NodeIPAddressService) ListEnabledNodeIPAddresses(ctx context.Context, req *pb.ListEnabledNodeIPAddressesRequest) (*pb.ListEnabledNodeIPAddressesResponse, error) {
// 校验请求 // 校验请求
_, err := this.ValidateAdmin(ctx, 0) _, err := this.ValidateAdmin(ctx, 0)
@@ -244,6 +246,7 @@ func (this *NodeIPAddressService) ListEnabledNodeIPAddresses(ctx context.Context
CanAccess: addr.CanAccess, CanAccess: addr.CanAccess,
IsOn: addr.IsOn, IsOn: addr.IsOn,
IsUp: addr.IsUp, IsUp: addr.IsUp,
IsHealthy: addr.IsHealthy,
BackupIP: addr.DecodeBackupIP(), BackupIP: addr.DecodeBackupIP(),
}) })
} }

View File

@@ -7,7 +7,6 @@ import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
@@ -45,13 +44,13 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
return nil, errors.New("health check config is not found") return nil, errors.New("health check config is not found")
} }
healthCheckConfig := &serverconfigs.HealthCheckConfig{} var healthCheckConfig = &serverconfigs.HealthCheckConfig{}
err = json.Unmarshal(cluster.HealthCheck, healthCheckConfig) err = json.Unmarshal(cluster.HealthCheck, healthCheckConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
results := []*HealthCheckResult{} var results = []*HealthCheckResult{}
nodes, err := models.NewNodeDAO().FindAllEnabledNodesWithClusterId(nil, this.clusterId) nodes, err := models.NewNodeDAO().FindAllEnabledNodesWithClusterId(nil, this.clusterId)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -64,7 +63,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
Node: node, Node: node,
} }
ipAddr, ipAddrId, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), nodeconfigs.NodeRoleNode) ipAddr, ipAddrId, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), false, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -79,7 +78,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
} }
// 并行检查 // 并行检查
preparedResults := []*HealthCheckResult{} var preparedResults = []*HealthCheckResult{}
for _, result := range results { for _, result := range results {
if len(result.NodeAddr) > 0 { if len(result.NodeAddr) > 0 {
preparedResults = append(preparedResults, result) preparedResults = append(preparedResults, result)
@@ -90,13 +89,13 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
return results, nil return results, nil
} }
countResults := len(preparedResults) var countResults = len(preparedResults)
queue := make(chan *HealthCheckResult, countResults) var queue = make(chan *HealthCheckResult, countResults)
for _, result := range preparedResults { for _, result := range preparedResults {
queue <- result queue <- result
} }
countTries := types.Int(healthCheckConfig.CountTries) var countTries = types.Int(healthCheckConfig.CountTries)
if countTries > 10 { // 限定最多尝试10次 TODO 应该在管理界面提示用户 if countTries > 10 { // 限定最多尝试10次 TODO 应该在管理界面提示用户
countTries = 10 countTries = 10
} }
@@ -104,7 +103,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
countTries = 3 countTries = 3
} }
tryDelay := 1 * time.Second var tryDelay = 1 * time.Second
if healthCheckConfig.TryDelay != nil { if healthCheckConfig.TryDelay != nil {
tryDelay = healthCheckConfig.TryDelay.Duration() tryDelay = healthCheckConfig.TryDelay.Duration()
@@ -113,103 +112,105 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
} }
} }
countRoutines := 10 var concurrent = 128
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(countResults) wg.Add(countResults)
for i := 0; i < countRoutines; i++ { for i := 0; i < concurrent; i++ {
goman.New(func() { go func() {
for { for {
select { select {
case result := <-queue: case result := <-queue:
func() { this.runNode(healthCheckConfig, result, countTries, tryDelay)
for i := 1; i <= countTries; i++ {
before := time.Now()
err := this.checkNode(healthCheckConfig, result)
result.CostMs = time.Since(before).Seconds() * 1000
if err != nil {
result.Error = err.Error()
}
if result.IsOk {
break
}
if tryDelay > 0 {
time.Sleep(tryDelay)
}
}
// 修改节点IP状态
if teaconst.IsPlus {
isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
if isChanged {
// 发送消息
var message = ""
var messageType string
var messageLevel string
if result.IsOk {
message = "健康检查成功,节点\"" + result.Node.Name + "\"IP\"" + result.NodeAddr + "\"已恢复上线"
messageType = models.MessageTypeHealthCheckNodeUp
messageLevel = models.MessageLevelSuccess
} else {
message = "健康检查失败,节点\"" + result.Node.Name + "\"IP\"" + result.NodeAddr + "\"已自动下线"
messageType = models.MessageTypeHealthCheckNodeDown
messageLevel = models.MessageLevelError
}
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), messageType, messageLevel, message, message, nil, false)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
// 触发阈值
err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id))
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
}
// 我们只处理IP的上下线不修改节点的状态
return
}
// 修改节点状态
if healthCheckConfig.AutoDown {
isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
} else if isChanged {
// 通知恢复或下线
if result.IsOk {
message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线"
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false)
} else {
message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线"
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false)
}
}
}
}()
wg.Done() wg.Done()
default: default:
return return
} }
} }
}) }()
} }
wg.Wait() wg.Wait()
return results, nil return results, nil
} }
func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult, countTries int, tryDelay time.Duration) {
for i := 1; i <= countTries; i++ {
var before = time.Now()
err := this.runNodeOnce(healthCheckConfig, result)
result.CostMs = time.Since(before).Seconds() * 1000
if err != nil {
result.Error = err.Error()
}
if result.IsOk {
break
}
if tryDelay > 0 {
time.Sleep(tryDelay)
}
}
// 修改节点IP状态
if teaconst.IsPlus {
isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown, healthCheckConfig.AutoDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
if isChanged {
// 发送消息
var message = ""
var messageType string
var messageLevel string
if result.IsOk {
message = "健康检查成功,节点\"" + result.Node.Name + "\"IP\"" + result.NodeAddr + "\"已恢复上线"
messageType = models.MessageTypeHealthCheckNodeUp
messageLevel = models.MessageLevelSuccess
} else {
message = "健康检查失败,节点\"" + result.Node.Name + "\"IP\"" + result.NodeAddr + "\"已自动下线"
messageType = models.MessageTypeHealthCheckNodeDown
messageLevel = models.MessageLevelError
}
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), messageType, messageLevel, message, message, nil, false)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
// 触发阈值
err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id))
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
}
// 我们只处理IP的上下线不修改节点的状态
return
}
// 修改节点状态
if healthCheckConfig.AutoDown {
isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
} else if isChanged {
// 通知恢复或下线
if result.IsOk {
message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线"
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false)
} else {
message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线"
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false)
}
}
}
}
// 检查单个节点 // 检查单个节点
func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) error { func (this *HealthCheckExecutor) runNodeOnce(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) error {
// 支持IPv6 // 支持IPv6
if utils.IsIPv6(result.NodeAddr) { if utils.IsIPv6(result.NodeAddr) {
result.NodeAddr = "[" + result.NodeAddr + "]" result.NodeAddr = "[" + result.NodeAddr + "]"
@@ -219,7 +220,7 @@ func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.Heal
healthCheckConfig.URL = "http://${host}/" healthCheckConfig.URL = "http://${host}/"
} }
url := strings.ReplaceAll(healthCheckConfig.URL, "${host}", result.NodeAddr) var url = strings.ReplaceAll(healthCheckConfig.URL, "${host}", result.NodeAddr)
req, err := http.NewRequest(http.MethodGet, url, nil) req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil { if err != nil {
return err return err
@@ -238,12 +239,12 @@ func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.Heal
} }
req.Header.Set(serverconfigs.HealthCheckHeaderName, key) req.Header.Set(serverconfigs.HealthCheckHeaderName, key)
timeout := 5 * time.Second var timeout = 5 * time.Second
if healthCheckConfig.Timeout != nil { if healthCheckConfig.Timeout != nil {
timeout = healthCheckConfig.Timeout.Duration() timeout = healthCheckConfig.Timeout.Duration()
} }
client := &http.Client{ var client = &http.Client{
Timeout: timeout, Timeout: timeout,
Transport: &http.Transport{ Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
@@ -251,10 +252,6 @@ func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.Heal
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn, err := net.Dial(network, configutils.QuoteIP(result.NodeAddr)+":"+port)
if err == nil {
return conn, nil
}
return net.DialTimeout(network, configutils.QuoteIP(result.NodeAddr)+":"+port, timeout) return net.DialTimeout(network, configutils.QuoteIP(result.NodeAddr)+":"+port, timeout)
}, },
MaxIdleConns: 1, MaxIdleConns: 1,