package tasks

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"net"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
	"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
	"github.com/TeaOSLab/EdgeAPI/internal/errors"
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
	"github.com/iwind/TeaGo/lists"
	"github.com/iwind/TeaGo/logs"
	"github.com/iwind/TeaGo/types"
)

// HealthCheckExecutor runs the health check configured on one node cluster
// against the nodes of that cluster.
type HealthCheckExecutor struct {
	clusterId int64
}

// NewHealthCheckExecutor returns an executor bound to the cluster with the given id.
func NewHealthCheckExecutor(clusterId int64) *HealthCheckExecutor {
	return &HealthCheckExecutor{clusterId: clusterId}
}

// Run loads the cluster's health check config, checks every node of the
// cluster that is switched on, and returns one result per such node.
//
// Nodes for which no access IP address is found are reported with an error
// and are not requested. The remaining nodes are checked concurrently by at
// most 10 workers; each node is tried up to CountTries times (clamped to
// 1..10) with TryDelay (default 1s, capped at 1 minute) between tries.
// When AutoDown is set in the config, the node's up/down state is updated and,
// if it changed, a DNS task and a node message are created.
//
// An error is returned when the cluster or its health check config cannot be
// loaded, or when a node/IP lookup fails. Failures of individual checks are
// reported through the Error field of the corresponding result instead.
func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
	cluster, err := models.NewNodeClusterDAO().FindEnabledNodeCluster(nil, this.clusterId)
	if err != nil {
		return nil, err
	}
	if cluster == nil {
		return nil, errors.New("can not find cluster with id '" + strconv.FormatInt(this.clusterId, 10) + "'")
	}
	if len(cluster.HealthCheck) == 0 || cluster.HealthCheck == "null" {
		return nil, errors.New("health check config is not found")
	}

	healthCheckConfig := &serverconfigs.HealthCheckConfig{}
	err = json.Unmarshal([]byte(cluster.HealthCheck), healthCheckConfig)
	if err != nil {
		return nil, err
	}

	results := []*HealthCheckResult{}
	nodes, err := models.NewNodeDAO().FindAllEnabledNodesWithClusterId(nil, this.clusterId)
	if err != nil {
		return nil, err
	}
	for _, node := range nodes {
		if node.IsOn != 1 {
			continue
		}
		result := &HealthCheckResult{
			Node: node,
		}

		ipAddr, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id))
		if err != nil {
			return nil, err
		}
		if len(ipAddr) == 0 {
			result.Error = "no ip address can be used"
		} else {
			result.NodeAddr = ipAddr
		}

		results = append(results, result)
	}

	// Check in parallel: only results that have an address can be requested.
	preparedResults := []*HealthCheckResult{}
	for _, result := range results {
		if len(result.NodeAddr) > 0 {
			preparedResults = append(preparedResults, result)
		}
	}

	if len(preparedResults) == 0 {
		return results, nil
	}

	// The queue is filled completely before any worker starts, so it can be
	// closed right away and the workers simply drain it.
	countResults := len(preparedResults)
	queue := make(chan *HealthCheckResult, countResults)
	for _, result := range preparedResults {
		queue <- result
	}
	close(queue)

	countTries := types.Int(healthCheckConfig.CountTries)
	if countTries > 10 { // at most 10 tries. TODO: the admin UI should tell the user about this limit
		countTries = 10
	}
	if countTries < 1 {
		countTries = 1
	}

	tryDelay := 1 * time.Second
	if healthCheckConfig.TryDelay != nil {
		tryDelay = healthCheckConfig.TryDelay.Duration()

		if tryDelay > 1*time.Minute { // never more than 1 minute. TODO: the admin UI should tell the user about this limit
			tryDelay = 1 * time.Minute
		}
	}

	// No point in starting more workers than there are queued results.
	countRoutines := 10
	if countRoutines > countResults {
		countRoutines = countResults
	}

	wg := sync.WaitGroup{}
	wg.Add(countRoutines)
	for i := 0; i < countRoutines; i++ {
		go func() {
			defer wg.Done()

			for result := range queue {
				this.checkWithRetries(healthCheckConfig, result, countTries, tryDelay)

				// Update the node state.
				if healthCheckConfig.AutoDown {
					this.updateNodeStatus(healthCheckConfig, result)
				}
			}
		}()
	}
	wg.Wait()

	return results, nil
}

// checkWithRetries calls checkNode up to countTries times, stopping at the
// first successful attempt and waiting tryDelay between attempts. The result
// reflects the last attempt that was made.
func (this *HealthCheckExecutor) checkWithRetries(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult, countTries int, tryDelay time.Duration) {
	for try := 1; try <= countTries; try++ {
		// Drop the error of the previous attempt, otherwise a node that
		// recovers on a retry would be reported as ok with a stale error.
		result.Error = ""

		before := time.Now()
		err := this.checkNode(healthCheckConfig, result)
		result.CostMs = time.Since(before).Seconds() * 1000
		if err != nil {
			result.Error = err.Error()
		}
		if result.IsOk {
			break
		}

		// Waiting after the final attempt would only delay the result.
		if try < countTries && tryDelay > 0 {
			time.Sleep(tryDelay)
		}
	}
}

// updateNodeStatus stores the check outcome for the node and, when the node's
// state changed, creates a DNS task and an up/down message. Errors are logged
// rather than returned so one node cannot abort the whole run.
func (this *HealthCheckExecutor) updateNodeStatus(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) {
	nodeId := int64(result.Node.Id)

	isChanged, err := models.SharedNodeDAO.UpdateNodeUp(nil, nodeId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
	if err != nil {
		logs.Println("[HEALTH_CHECK]" + err.Error())
		return
	}
	if !isChanged {
		return
	}

	// Notify DNS about the change.
	err = dns.SharedDNSTaskDAO.CreateNodeTask(nil, nodeId, dns.DNSTaskTypeNodeChange)
	if err != nil {
		logs.Println("[HEALTH_CHECK]" + err.Error())
	}

	// Notify about the node coming back up or going down.
	if result.IsOk {
		err = models.NewMessageDAO().CreateNodeMessage(nil, this.clusterId, nodeId, models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, "健康检查成功,节点\""+result.Node.Name+"\"已恢复上线", nil)
	} else {
		err = models.NewMessageDAO().CreateNodeMessage(nil, this.clusterId, nodeId, models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, "健康检查失败,节点\""+result.Node.Name+"\"已自动下线", nil)
	}
	if err != nil {
		logs.Println("[HEALTH_CHECK]" + err.Error())
	}
}

// checkNode performs a single health check request against one node.
//
// The "${host}" placeholder of the configured URL is replaced with the node
// address, and every connection is dialed to the node address regardless of
// the host in the URL (only the port is taken from it). TLS certificates are
// not verified.
//
// On success result.IsOk is set to true. When the response status code is not
// in the configured list, result.Error is set and nil is returned. Request
// errors are returned to the caller.
func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) error {
	// Bare address for dialing; tolerate an address that is already bracketed.
	dialHost := strings.TrimSuffix(strings.TrimPrefix(result.NodeAddr, "["), "]")

	// An IPv6 literal has to be wrapped in brackets to be valid inside a URL.
	urlHost := dialHost
	if strings.Contains(urlHost, ":") {
		urlHost = "[" + urlHost + "]"
	}

	checkURL := strings.ReplaceAll(healthCheckConfig.URL, "${host}", urlHost)
	req, err := http.NewRequest(http.MethodGet, checkURL, nil)
	if err != nil {
		return err
	}

	timeout := 5 * time.Second
	if healthCheckConfig.Timeout != nil {
		timeout = healthCheckConfig.Timeout.Duration()
	}

	// Dial through a Dialer so that the connection attempt honours both the
	// timeout and the cancellation of the request context.
	dialer := &net.Dialer{Timeout: timeout}

	client := &http.Client{
		Timeout: timeout,
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
				_, port, err := net.SplitHostPort(addr)
				if err != nil {
					return nil, err
				}
				// JoinHostPort adds the brackets an IPv6 address needs.
				return dialer.DialContext(ctx, network, net.JoinHostPort(dialHost, port))
			},
			MaxIdleConns:          1,
			MaxIdleConnsPerHost:   1,
			MaxConnsPerHost:       1,
			IdleConnTimeout:       2 * time.Minute,
			ExpectContinueTimeout: 1 * time.Second,
			TLSHandshakeTimeout:   0,
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
		},
	}
	// The client is used for this one request only, so release its connection.
	defer client.CloseIdleConnections()

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	// Only the status code is inspected; a close error would not change the outcome.
	_ = resp.Body.Close()

	if len(healthCheckConfig.StatusCodes) > 0 && !lists.ContainsInt(healthCheckConfig.StatusCodes, resp.StatusCode) {
		result.Error = "invalid response status code '" + strconv.Itoa(resp.StatusCode) + "'"
		return nil
	}

	result.IsOk = true

	return nil
}