节点IP阈值增加节点健康检查失败

This commit is contained in:
GoEdgeLab
2021-11-18 14:30:53 +08:00
parent fd3ccfe0d1
commit ead3718c57
8 changed files with 191 additions and 41 deletions

View File

@@ -153,7 +153,7 @@ func CheckClusterDNS(tx *dbs.Tx, cluster *models.NodeCluster) (issues []*pb.DNSI
} }
// 检查IP地址 // 检查IP地址
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, nodeconfigs.NodeRoleNode) ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -102,6 +102,22 @@ func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, err
FindStringCol("") FindStringCol("")
} }
// FindAddressIsHealthy 判断IP地址是否健康
func (this *NodeIPAddressDAO) FindAddressIsHealthy(tx *dbs.Tx, addressId int64) (isHealthy bool, err error) {
if addressId <= 0 {
return false, nil
}
one, err := this.Query(tx).
Pk(addressId).
Result("isHealthy").
Find()
if err != nil || one == nil {
return false, err
}
var addr = one.(*NodeIPAddress)
return addr.IsHealthy == 1, nil
}
// CreateAddress 创建IP地址 // CreateAddress 创建IP地址
func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, isUp bool) (addressId int64, err error) { func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, isUp bool) (addressId int64, err error) {
if len(role) == 0 { if len(role) == 0 {
@@ -216,11 +232,11 @@ func (this *NodeIPAddressDAO) FindAllEnabledAddressesWithNode(tx *dbs.Tx, nodeId
} }
// FindFirstNodeAccessIPAddress 查找节点的第一个可访问的IP地址 // FindFirstNodeAccessIPAddress 查找节点的第一个可访问的IP地址
func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (string, error) { func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (ip string, addrId int64, err error) {
if len(role) == 0 { if len(role) == 0 {
role = nodeconfigs.NodeRoleNode role = nodeconfigs.NodeRoleNode
} }
return this.Query(tx). one, err := this.Query(tx).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Attr("role", role). Attr("role", role).
State(NodeIPAddressStateEnabled). State(NodeIPAddressStateEnabled).
@@ -229,8 +245,17 @@ func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId in
Attr("isUp", true). Attr("isUp", true).
Desc("order"). Desc("order").
AscPk(). AscPk().
Result("ip"). Result("id", "ip").
FindStringCol("") Find()
if err != nil {
return "", 0, err
}
if one == nil {
return
}
var addr = one.(*NodeIPAddress)
return addr.Ip, int64(addr.Id), nil
} }
// FindFirstNodeAccessIPAddressId 查找节点的第一个可访问的IP地址ID // FindFirstNodeAccessIPAddressId 查找节点的第一个可访问的IP地址ID
@@ -400,6 +425,10 @@ func (this *NodeIPAddressDAO) UpdateAddressIsUp(tx *dbs.Tx, addressId int64, isU
var err = this.Query(tx). var err = this.Query(tx).
Pk(addressId). Pk(addressId).
Set("isUp", isUp). Set("isUp", isUp).
Set("countUp", 0).
Set("countDown", 0).
Set("backupIP", "").
Set("backupThresholdId", 0).
UpdateQuickly() UpdateQuickly()
if err != nil { if err != nil {
return err return err
@@ -423,6 +452,71 @@ func (this *NodeIPAddressDAO) UpdateAddressBackupIP(tx *dbs.Tx, addressId int64,
return this.NotifyUpdate(tx, addressId) return this.NotifyUpdate(tx, addressId)
} }
// UpdateAddressHealthCount 计算IP健康状态
func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, isUp bool, maxUp int, maxDown int) (changed bool, err error) {
if addrId <= 0 {
return false, errors.New("invalid address id")
}
one, err := this.Query(tx).
Pk(addrId).
Result("isHealthy", "countUp", "countDown").
Find()
if err != nil {
return false, err
}
if one == nil {
return false, nil
}
oldIsHealthy := one.(*NodeIPAddress).IsHealthy == 1
// 如果新老状态一致,则不做任何事情
if oldIsHealthy == isUp {
return false, nil
}
countUp := int(one.(*NodeIPAddress).CountUp)
countDown := int(one.(*NodeIPAddress).CountDown)
op := NewNodeIPAddressOperator()
op.Id = addrId
if isUp {
countUp++
countDown = 0
if countUp >= maxUp {
changed = true
//op.IsUp = true
op.IsHealthy = true
}
} else {
countDown++
countUp = 0
if countDown >= maxDown {
changed = true
//op.IsUp = false
op.IsHealthy = false
}
}
op.CountUp = countUp
op.CountDown = countDown
err = this.Save(tx, op)
if err != nil {
return false, err
}
if changed {
err = this.NotifyUpdate(tx, addrId)
if err != nil {
return true, err
}
}
return
}
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error { func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error {
address, err := this.Query(tx). address, err := this.Query(tx).

View File

@@ -40,3 +40,25 @@ func TestNodeIPAddressDAO_LoopTasks(t *testing.T) {
} }
t.Log("ok") t.Log("ok")
} }
func TestNodeIPAddressDAO_FindAddressIsHealthy(t *testing.T) {
dbs.NotifyReady()
var tx *dbs.Tx
isHealthy, err := SharedNodeIPAddressDAO.FindAddressIsHealthy(tx, 1)
if err != nil {
t.Fatal(err)
}
t.Log("isHealthy:", isHealthy)
}
func TestNodeIPAddressDAO_UpdateAddressHealthCount(t *testing.T) {
dbs.NotifyReady()
var tx *dbs.Tx
isChanged, err := SharedNodeIPAddressDAO.UpdateAddressHealthCount(tx, 1, true, 3, 3)
if err != nil {
t.Fatal(err)
}
t.Log("isChanged:", isChanged)
}

View File

@@ -13,10 +13,13 @@ type NodeIPAddress struct {
CanAccess uint8 `field:"canAccess"` // 是否可以访问 CanAccess uint8 `field:"canAccess"` // 是否可以访问
IsOn uint8 `field:"isOn"` // 是否启用 IsOn uint8 `field:"isOn"` // 是否启用
IsUp uint8 `field:"isUp"` // 是否上线 IsUp uint8 `field:"isUp"` // 是否上线
IsHealthy uint8 `field:"isHealthy"` // 是否健康
Thresholds string `field:"thresholds"` // 上线阈值 Thresholds string `field:"thresholds"` // 上线阈值
Connectivity string `field:"connectivity"` // 连通性状态 Connectivity string `field:"connectivity"` // 连通性状态
BackupIP string `field:"backupIP"` // 备用IP BackupIP string `field:"backupIP"` // 备用IP
BackupThresholdId uint32 `field:"backupThresholdId"` // 触发备用IP的阈值 BackupThresholdId uint32 `field:"backupThresholdId"` // 触发备用IP的阈值
CountUp uint32 `field:"countUp"` // UP状态次数
CountDown uint32 `field:"countDown"` // DOWN状态次数
} }
type NodeIPAddressOperator struct { type NodeIPAddressOperator struct {
@@ -31,10 +34,13 @@ type NodeIPAddressOperator struct {
CanAccess interface{} // 是否可以访问 CanAccess interface{} // 是否可以访问
IsOn interface{} // 是否启用 IsOn interface{} // 是否启用
IsUp interface{} // 是否上线 IsUp interface{} // 是否上线
IsHealthy interface{} // 是否健康
Thresholds interface{} // 上线阈值 Thresholds interface{} // 上线阈值
Connectivity interface{} // 连通性状态 Connectivity interface{} // 连通性状态
BackupIP interface{} // 备用IP BackupIP interface{} // 备用IP
BackupThresholdId interface{} // 触发备用IP的阈值 BackupThresholdId interface{} // 触发备用IP的阈值
CountUp interface{} // UP状态次数
CountDown interface{} // DOWN状态次数
} }
func NewNodeIPAddressOperator() *NodeIPAddressOperator { func NewNodeIPAddressOperator() *NodeIPAddressOperator {

View File

@@ -1271,7 +1271,7 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna
return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil
} }
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), nodeconfigs.NodeRoleNode) ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
@@ -62,7 +63,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
Node: node, Node: node,
} }
ipAddr, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), nodeconfigs.NodeRoleNode) ipAddr, ipAddrId, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -70,6 +71,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
result.Error = "no ip address can be used" result.Error = "no ip address can be used"
} else { } else {
result.NodeAddr = ipAddr result.NodeAddr = ipAddr
result.NodeAddrId = ipAddrId
} }
results = append(results, result) results = append(results, result)
@@ -118,37 +120,57 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
for { for {
select { select {
case result := <-queue: case result := <-queue:
for i := 1; i <= countTries; i++ { func() {
before := time.Now() for i := 1; i <= countTries; i++ {
err := this.checkNode(healthCheckConfig, result) before := time.Now()
result.CostMs = time.Since(before).Seconds() * 1000 err := this.checkNode(healthCheckConfig, result)
if err != nil { result.CostMs = time.Since(before).Seconds() * 1000
result.Error = err.Error() if err != nil {
} result.Error = err.Error()
if result.IsOk { }
break
}
if tryDelay > 0 {
time.Sleep(tryDelay)
}
}
// 修改节点状态
if healthCheckConfig.AutoDown {
isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK", err.Error())
} else if isChanged {
// 通知恢复或下线
if result.IsOk { if result.IsOk {
message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线" break
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false) }
} else { if tryDelay > 0 {
message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线" time.Sleep(tryDelay)
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false)
} }
} }
}
// 修改节点IP状态
if teaconst.IsPlus {
isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
if isChanged {
// 触发阈值
err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id))
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
return
}
}
}
// 修改节点状态
if healthCheckConfig.AutoDown {
isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
} else if isChanged {
// 通知恢复或下线
if result.IsOk {
message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线"
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false)
} else {
message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线"
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false)
}
}
}
}()
wg.Done() wg.Done()
default: default:

View File

@@ -1,14 +1,19 @@
//go:build plus
// +build plus
package tasks package tasks
import ( import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
) )
func TestHealthCheckExecutor_Run(t *testing.T) { func TestHealthCheckExecutor_Run(t *testing.T) {
teaconst.IsPlus = true
dbs.NotifyReady() dbs.NotifyReady()
executor := NewHealthCheckExecutor(10) executor := NewHealthCheckExecutor(35)
results, err := executor.Run() results, err := executor.Run()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@@ -3,9 +3,10 @@ package tasks
import "github.com/TeaOSLab/EdgeAPI/internal/db/models" import "github.com/TeaOSLab/EdgeAPI/internal/db/models"
type HealthCheckResult struct { type HealthCheckResult struct {
Node *models.Node Node *models.Node
NodeAddr string NodeAddr string
IsOk bool NodeAddrId int64
Error string IsOk bool
CostMs float64 Error string
CostMs float64
} }