diff --git a/internal/db/models/dns/dnsutils/dns_utils.go b/internal/db/models/dns/dnsutils/dns_utils.go index 18daaf93..ddc34edd 100644 --- a/internal/db/models/dns/dnsutils/dns_utils.go +++ b/internal/db/models/dns/dnsutils/dns_utils.go @@ -153,7 +153,7 @@ func CheckClusterDNS(tx *dbs.Tx, cluster *models.NodeCluster) (issues []*pb.DNSI } // 检查IP地址 - ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, nodeconfigs.NodeRoleNode) + ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } diff --git a/internal/db/models/node_ip_address_dao.go b/internal/db/models/node_ip_address_dao.go index 98bfb1d0..bc2fde80 100644 --- a/internal/db/models/node_ip_address_dao.go +++ b/internal/db/models/node_ip_address_dao.go @@ -102,6 +102,22 @@ func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, err FindStringCol("") } +// FindAddressIsHealthy 判断IP地址是否健康 +func (this *NodeIPAddressDAO) FindAddressIsHealthy(tx *dbs.Tx, addressId int64) (isHealthy bool, err error) { + if addressId <= 0 { + return false, nil + } + one, err := this.Query(tx). + Pk(addressId). + Result("isHealthy"). + Find() + if err != nil || one == nil { + return false, err + } + var addr = one.(*NodeIPAddress) + return addr.IsHealthy == 1, nil +} + // CreateAddress 创建IP地址 func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, isUp bool) (addressId int64, err error) { if len(role) == 0 { @@ -216,11 +232,11 @@ func (this *NodeIPAddressDAO) FindAllEnabledAddressesWithNode(tx *dbs.Tx, nodeId } // FindFirstNodeAccessIPAddress 查找节点的第一个可访问的IP地址 -func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (string, error) { +func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (ip string, addrId int64, err error) { if len(role) == 0 { role = nodeconfigs.NodeRoleNode } - return this.Query(tx). + one, err := this.Query(tx). Attr("nodeId", nodeId). Attr("role", role). State(NodeIPAddressStateEnabled). @@ -229,8 +245,17 @@ func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId in Attr("isUp", true). Desc("order"). AscPk(). - Result("ip"). - FindStringCol("") + Result("id", "ip"). + Find() + if err != nil { + return "", 0, err + } + if one == nil { + return + } + + var addr = one.(*NodeIPAddress) + return addr.Ip, int64(addr.Id), nil } // FindFirstNodeAccessIPAddressId 查找节点的第一个可访问的IP地址ID @@ -400,6 +425,10 @@ func (this *NodeIPAddressDAO) UpdateAddressIsUp(tx *dbs.Tx, addressId int64, isU var err = this.Query(tx). Pk(addressId). Set("isUp", isUp). + Set("countUp", 0). + Set("countDown", 0). + Set("backupIP", ""). + Set("backupThresholdId", 0). UpdateQuickly() if err != nil { return err @@ -423,6 +452,71 @@ func (this *NodeIPAddressDAO) UpdateAddressBackupIP(tx *dbs.Tx, addressId int64, return this.NotifyUpdate(tx, addressId) } +// UpdateAddressHealthCount 计算IP健康状态 +func (this *NodeIPAddressDAO) UpdateAddressHealthCount(tx *dbs.Tx, addrId int64, isUp bool, maxUp int, maxDown int) (changed bool, err error) { + if addrId <= 0 { + return false, errors.New("invalid address id") + } + one, err := this.Query(tx). + Pk(addrId). + Result("isHealthy", "countUp", "countDown"). + Find() + if err != nil { + return false, err + } + if one == nil { + return false, nil + } + oldIsHealthy := one.(*NodeIPAddress).IsHealthy == 1 + + // 如果新老状态一致,则不做任何事情 + if oldIsHealthy == isUp { + return false, nil + } + + countUp := int(one.(*NodeIPAddress).CountUp) + countDown := int(one.(*NodeIPAddress).CountDown) + + op := NewNodeIPAddressOperator() + op.Id = addrId + + if isUp { + countUp++ + countDown = 0 + + if countUp >= maxUp { + changed = true + //op.IsUp = true + op.IsHealthy = true + } + } else { + countDown++ + countUp = 0 + + if countDown >= maxDown { + changed = true + //op.IsUp = false + op.IsHealthy = false + } + } + + op.CountUp = countUp + op.CountDown = countDown + err = this.Save(tx, op) + if err != nil { + return false, err + } + + if changed { + err = this.NotifyUpdate(tx, addrId) + if err != nil { + return true, err + } + } + + return +} + // NotifyUpdate 通知更新 func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error { address, err := this.Query(tx). diff --git a/internal/db/models/node_ip_address_dao_test.go b/internal/db/models/node_ip_address_dao_test.go index b37b4c9b..727819d6 100644 --- a/internal/db/models/node_ip_address_dao_test.go +++ b/internal/db/models/node_ip_address_dao_test.go @@ -40,3 +40,25 @@ func TestNodeIPAddressDAO_LoopTasks(t *testing.T) { } t.Log("ok") } + +func TestNodeIPAddressDAO_FindAddressIsHealthy(t *testing.T) { + dbs.NotifyReady() + + var tx *dbs.Tx + isHealthy, err := SharedNodeIPAddressDAO.FindAddressIsHealthy(tx, 1) + if err != nil { + t.Fatal(err) + } + t.Log("isHealthy:", isHealthy) +} + +func TestNodeIPAddressDAO_UpdateAddressHealthCount(t *testing.T) { + dbs.NotifyReady() + + var tx *dbs.Tx + isChanged, err := SharedNodeIPAddressDAO.UpdateAddressHealthCount(tx, 1, true, 3, 3) + if err != nil { + t.Fatal(err) + } + t.Log("isChanged:", isChanged) +} diff --git a/internal/db/models/node_ip_address_model.go b/internal/db/models/node_ip_address_model.go index c3929331..71339753 100644 --- a/internal/db/models/node_ip_address_model.go +++ b/internal/db/models/node_ip_address_model.go @@ -13,10 +13,13 @@ type NodeIPAddress struct { CanAccess uint8 `field:"canAccess"` // 是否可以访问 IsOn uint8 `field:"isOn"` // 是否启用 IsUp uint8 `field:"isUp"` // 是否上线 + IsHealthy uint8 `field:"isHealthy"` // 是否健康 Thresholds string `field:"thresholds"` // 上线阈值 Connectivity string `field:"connectivity"` // 连通性状态 BackupIP string `field:"backupIP"` // 备用IP BackupThresholdId uint32 `field:"backupThresholdId"` // 触发备用IP的阈值 + CountUp uint32 `field:"countUp"` // UP状态次数 + CountDown uint32 `field:"countDown"` // DOWN状态次数 } type NodeIPAddressOperator struct { @@ -31,10 +34,13 @@ type NodeIPAddressOperator struct { CanAccess interface{} // 是否可以访问 IsOn interface{} // 是否启用 IsUp interface{} // 是否上线 + IsHealthy interface{} // 是否健康 Thresholds interface{} // 上线阈值 Connectivity interface{} // 连通性状态 BackupIP interface{} // 备用IP BackupThresholdId interface{} // 触发备用IP的阈值 + CountUp interface{} // UP状态次数 + CountDown interface{} // DOWN状态次数 } func NewNodeIPAddressOperator() *NodeIPAddressOperator { diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index d6603b70..db829471 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -1271,7 +1271,7 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil } - ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), nodeconfigs.NodeRoleNode) + ipAddr, _, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), nodeconfigs.NodeRoleNode) if err != nil { return nil, err } diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go index 6fdeed55..d66b9271 100644 --- a/internal/tasks/health_check_executor.go +++ b/internal/tasks/health_check_executor.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "encoding/json" + teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" @@ -62,7 +63,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { Node: node, } - ipAddr, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), nodeconfigs.NodeRoleNode) + ipAddr, ipAddrId, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id), nodeconfigs.NodeRoleNode) if err != nil { return nil, err } @@ -70,6 +71,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { result.Error = "no ip address can be used" } else { result.NodeAddr = ipAddr + result.NodeAddrId = ipAddrId } results = append(results, result) @@ -118,37 +120,57 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { for { select { case result := <-queue: - for i := 1; i <= countTries; i++ { - before := time.Now() - err := this.checkNode(healthCheckConfig, result) - result.CostMs = time.Since(before).Seconds() * 1000 - if err != nil { - result.Error = err.Error() - } - if result.IsOk { - break - } - if tryDelay > 0 { - time.Sleep(tryDelay) - } - } - - // 修改节点状态 - if healthCheckConfig.AutoDown { - isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) - if err != nil { - remotelogs.Error("HEALTH_CHECK", err.Error()) - } else if isChanged { - // 通知恢复或下线 + func() { + for i := 1; i <= countTries; i++ { + before := time.Now() + err := this.checkNode(healthCheckConfig, result) + result.CostMs = time.Since(before).Seconds() * 1000 + if err != nil { + result.Error = err.Error() + } if result.IsOk { - message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线" - err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false) - } else { - message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线" - err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false) + break + } + if tryDelay > 0 { + time.Sleep(tryDelay) } } - } + + // 修改节点IP状态 + if teaconst.IsPlus { + isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) + if err != nil { + remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) + return + } + + if isChanged { + // 触发阈值 + err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id)) + if err != nil { + remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) + return + } + } + } + + // 修改节点状态 + if healthCheckConfig.AutoDown { + isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) + if err != nil { + remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) + } else if isChanged { + // 通知恢复或下线 + if result.IsOk { + message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线" + err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false) + } else { + message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线" + err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false) + } + } + } + }() wg.Done() default: diff --git a/internal/tasks/health_check_executor_test.go b/internal/tasks/health_check_executor_test.go index 9999a28d..dce5ddac 100644 --- a/internal/tasks/health_check_executor_test.go +++ b/internal/tasks/health_check_executor_test.go @@ -1,14 +1,19 @@ +//go:build plus +// +build plus + package tasks import ( + teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/iwind/TeaGo/dbs" "testing" ) func TestHealthCheckExecutor_Run(t *testing.T) { + teaconst.IsPlus = true dbs.NotifyReady() - executor := NewHealthCheckExecutor(10) + executor := NewHealthCheckExecutor(35) results, err := executor.Run() if err != nil { t.Fatal(err) diff --git a/internal/tasks/health_check_result.go b/internal/tasks/health_check_result.go index a8c8c4e0..db9ea643 100644 --- a/internal/tasks/health_check_result.go +++ b/internal/tasks/health_check_result.go @@ -3,9 +3,10 @@ package tasks import "github.com/TeaOSLab/EdgeAPI/internal/db/models" type HealthCheckResult struct { - Node *models.Node - NodeAddr string - IsOk bool - Error string - CostMs float64 + Node *models.Node + NodeAddr string + NodeAddrId int64 + IsOk bool + Error string + CostMs float64 }