mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-22 14:30:25 +08:00
实现健康检查配置、立即执行健康检查
This commit is contained in:
190
internal/tasks/health_check_executor.go
Normal file
190
internal/tasks/health_check_executor.go
Normal file
@@ -0,0 +1,190 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type HealthCheckExecutor struct {
|
||||
clusterId int64
|
||||
}
|
||||
|
||||
func NewHealthCheckExecutor(clusterId int64) *HealthCheckExecutor {
|
||||
return &HealthCheckExecutor{clusterId: clusterId}
|
||||
}
|
||||
|
||||
func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
|
||||
cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(this.clusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cluster == nil {
|
||||
return nil, errors.New("can not find cluster with id '" + strconv.FormatInt(this.clusterId, 10) + "'")
|
||||
}
|
||||
if len(cluster.HealthCheck) == 0 || cluster.HealthCheck == "null" {
|
||||
return nil, errors.New("health check config is not found")
|
||||
}
|
||||
|
||||
healthCheckConfig := &serverconfigs.HealthCheckConfig{}
|
||||
err = json.Unmarshal([]byte(cluster.HealthCheck), healthCheckConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
results := []*HealthCheckResult{}
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(this.clusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, node := range nodes {
|
||||
if node.IsOn != 1 {
|
||||
continue
|
||||
}
|
||||
result := &HealthCheckResult{
|
||||
Node: node,
|
||||
}
|
||||
|
||||
addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(int64(node.Id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
accessAddresses := []string{}
|
||||
for _, addr := range addresses {
|
||||
if addr.CanAccess == 1 {
|
||||
accessAddresses = append(accessAddresses, addr.Ip)
|
||||
}
|
||||
}
|
||||
if len(accessAddresses) == 0 {
|
||||
result.Error = "no ip address can be used"
|
||||
} else {
|
||||
result.NodeAddr = accessAddresses[0]
|
||||
}
|
||||
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
// 并行检查
|
||||
preparedResults := []*HealthCheckResult{}
|
||||
for _, result := range results {
|
||||
if len(result.NodeAddr) > 0 {
|
||||
preparedResults = append(preparedResults, result)
|
||||
}
|
||||
}
|
||||
|
||||
if len(preparedResults) == 0 {
|
||||
return results, nil
|
||||
}
|
||||
|
||||
countResults := len(preparedResults)
|
||||
queue := make(chan *HealthCheckResult, countResults)
|
||||
for _, result := range preparedResults {
|
||||
queue <- result
|
||||
}
|
||||
|
||||
countTries := types.Int(healthCheckConfig.CountTries)
|
||||
if countTries > 10 { // 限定最多尝试10次 TODO 应该在管理界面提示用户
|
||||
countTries = 10
|
||||
}
|
||||
if countTries < 1 {
|
||||
countTries = 1
|
||||
}
|
||||
|
||||
tryDelay := 1 * time.Second
|
||||
if healthCheckConfig.TryDelay != nil {
|
||||
tryDelay = healthCheckConfig.TryDelay.Duration()
|
||||
|
||||
if tryDelay > 1*time.Minute { // 最多不能超过1分钟 TODO 应该在管理界面提示用户
|
||||
tryDelay = 1 * time.Minute
|
||||
}
|
||||
}
|
||||
|
||||
countRoutines := 10
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(countResults)
|
||||
for i := 0; i < countRoutines; i++ {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case result := <-queue:
|
||||
for i := 1; i <= countTries; i++ {
|
||||
before := time.Now()
|
||||
err := this.checkNode(healthCheckConfig, result)
|
||||
result.CostMs = time.Since(before).Seconds() * 1000
|
||||
if err != nil {
|
||||
result.Error = err.Error()
|
||||
}
|
||||
if result.IsOk {
|
||||
break
|
||||
}
|
||||
if tryDelay > 0 {
|
||||
time.Sleep(tryDelay)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// 检查单个节点
|
||||
func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) error {
|
||||
url := strings.ReplaceAll(healthCheckConfig.URL, "${host}", result.NodeAddr)
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
timeout := 5 * time.Second
|
||||
if healthCheckConfig.Timeout != nil {
|
||||
timeout = healthCheckConfig.Timeout.Duration()
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: timeout,
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 1,
|
||||
MaxIdleConnsPerHost: 1,
|
||||
MaxConnsPerHost: 1,
|
||||
IdleConnTimeout: 2 * time.Minute,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
TLSHandshakeTimeout: 0,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
defer func() {
|
||||
client.CloseIdleConnections()
|
||||
}()
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
|
||||
if len(healthCheckConfig.StatusCodes) > 0 && !lists.ContainsInt(healthCheckConfig.StatusCodes, resp.StatusCode) {
|
||||
result.Error = "invalid response status code '" + strconv.Itoa(resp.StatusCode) + "'"
|
||||
return nil
|
||||
}
|
||||
|
||||
result.IsOk = true
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user