2020-10-17 21:15:31 +08:00
package tasks
import (
2021-02-24 16:20:22 +08:00
"context"
2020-10-17 21:15:31 +08:00
"crypto/tls"
"encoding/json"
2021-11-18 14:30:53 +08:00
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
2020-10-17 21:15:31 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-12-14 10:49:29 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
2021-08-29 16:01:31 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2021-06-07 10:06:16 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2021-09-13 13:46:20 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
2021-05-26 14:40:05 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2021-10-19 16:31:05 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeutils"
2020-10-17 21:15:31 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/lists"
2021-10-19 16:31:05 +08:00
"github.com/iwind/TeaGo/maps"
2020-10-17 21:15:31 +08:00
"github.com/iwind/TeaGo/types"
2021-02-24 16:20:22 +08:00
"net"
2020-10-17 21:15:31 +08:00
"net/http"
"strconv"
"strings"
"sync"
"time"
)
// HealthCheckExecutor runs health checks for the nodes of a single cluster.
type HealthCheckExecutor struct {
	// clusterId identifies the cluster whose nodes are checked.
	clusterId int64
}
// NewHealthCheckExecutor returns an executor bound to the cluster with the
// given id.
func NewHealthCheckExecutor(clusterId int64) *HealthCheckExecutor {
	executor := new(HealthCheckExecutor)
	executor.clusterId = clusterId
	return executor
}
func ( this * HealthCheckExecutor ) Run ( ) ( [ ] * HealthCheckResult , error ) {
2021-01-01 23:31:30 +08:00
cluster , err := models . NewNodeClusterDAO ( ) . FindEnabledNodeCluster ( nil , this . clusterId )
2020-10-17 21:15:31 +08:00
if err != nil {
return nil , err
}
if cluster == nil {
return nil , errors . New ( "can not find cluster with id '" + strconv . FormatInt ( this . clusterId , 10 ) + "'" )
}
2022-03-21 21:39:36 +08:00
if ! cluster . HealthCheck . IsNotNull ( ) {
2020-10-17 21:15:31 +08:00
return nil , errors . New ( "health check config is not found" )
}
healthCheckConfig := & serverconfigs . HealthCheckConfig { }
2022-03-21 21:39:36 +08:00
err = json . Unmarshal ( cluster . HealthCheck , healthCheckConfig )
2020-10-17 21:15:31 +08:00
if err != nil {
return nil , err
}
results := [ ] * HealthCheckResult { }
2021-01-01 23:31:30 +08:00
nodes , err := models . NewNodeDAO ( ) . FindAllEnabledNodesWithClusterId ( nil , this . clusterId )
2020-10-17 21:15:31 +08:00
if err != nil {
return nil , err
}
for _ , node := range nodes {
2022-03-22 21:45:07 +08:00
if ! node . IsOn {
2020-10-17 21:15:31 +08:00
continue
}
result := & HealthCheckResult {
Node : node ,
}
2021-11-18 14:30:53 +08:00
ipAddr , ipAddrId , err := models . NewNodeIPAddressDAO ( ) . FindFirstNodeAccessIPAddress ( nil , int64 ( node . Id ) , nodeconfigs . NodeRoleNode )
2020-10-17 21:15:31 +08:00
if err != nil {
return nil , err
}
2020-11-15 21:17:42 +08:00
if len ( ipAddr ) == 0 {
2020-10-17 21:15:31 +08:00
result . Error = "no ip address can be used"
} else {
2020-11-15 21:17:42 +08:00
result . NodeAddr = ipAddr
2021-11-18 14:30:53 +08:00
result . NodeAddrId = ipAddrId
2020-10-17 21:15:31 +08:00
}
results = append ( results , result )
}
// 并行检查
preparedResults := [ ] * HealthCheckResult { }
for _ , result := range results {
if len ( result . NodeAddr ) > 0 {
preparedResults = append ( preparedResults , result )
}
}
if len ( preparedResults ) == 0 {
return results , nil
}
countResults := len ( preparedResults )
queue := make ( chan * HealthCheckResult , countResults )
for _ , result := range preparedResults {
queue <- result
}
countTries := types . Int ( healthCheckConfig . CountTries )
if countTries > 10 { // 限定最多尝试10次 TODO 应该在管理界面提示用户
countTries = 10
}
if countTries < 1 {
2021-08-29 16:01:31 +08:00
countTries = 3
2020-10-17 21:15:31 +08:00
}
tryDelay := 1 * time . Second
if healthCheckConfig . TryDelay != nil {
tryDelay = healthCheckConfig . TryDelay . Duration ( )
if tryDelay > 1 * time . Minute { // 最多不能超过1分钟 TODO 应该在管理界面提示用户
tryDelay = 1 * time . Minute
}
}
countRoutines := 10
wg := sync . WaitGroup { }
wg . Add ( countResults )
for i := 0 ; i < countRoutines ; i ++ {
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2020-10-17 21:15:31 +08:00
for {
select {
case result := <- queue :
2021-11-18 14:30:53 +08:00
func ( ) {
for i := 1 ; i <= countTries ; i ++ {
before := time . Now ( )
err := this . checkNode ( healthCheckConfig , result )
result . CostMs = time . Since ( before ) . Seconds ( ) * 1000
if err != nil {
result . Error = err . Error ( )
}
if result . IsOk {
break
}
if tryDelay > 0 {
time . Sleep ( tryDelay )
}
2020-10-17 21:15:31 +08:00
}
2021-11-18 14:30:53 +08:00
// 修改节点IP状态
if teaconst . IsPlus {
isChanged , err := models . SharedNodeIPAddressDAO . UpdateAddressHealthCount ( nil , result . NodeAddrId , result . IsOk , healthCheckConfig . CountUp , healthCheckConfig . CountDown )
if err != nil {
remotelogs . Error ( "HEALTH_CHECK_EXECUTOR" , err . Error ( ) )
return
}
if isChanged {
2021-12-16 10:09:19 +08:00
// 发送消息
var message = ""
var messageType string
var messageLevel string
if result . IsOk {
message = "健康检查成功,节点\"" + result . Node . Name + "\", IP\"" + result . NodeAddr + "\"已恢复上线"
messageType = models . MessageTypeHealthCheckNodeUp
messageLevel = models . MessageLevelSuccess
} else {
message = "健康检查失败,节点\"" + result . Node . Name + "\", IP\"" + result . NodeAddr + "\"已自动下线"
messageType = models . MessageTypeHealthCheckNodeDown
messageLevel = models . MessageLevelError
}
err = models . NewMessageDAO ( ) . CreateNodeMessage ( nil , nodeconfigs . NodeRoleNode , this . clusterId , int64 ( result . Node . Id ) , messageType , messageLevel , message , message , nil , false )
if err != nil {
remotelogs . Error ( "HEALTH_CHECK_EXECUTOR" , err . Error ( ) )
return
}
2021-11-18 14:30:53 +08:00
// 触发阈值
err = models . SharedNodeIPAddressDAO . FireThresholds ( nil , nodeconfigs . NodeRoleNode , int64 ( result . Node . Id ) )
if err != nil {
remotelogs . Error ( "HEALTH_CHECK_EXECUTOR" , err . Error ( ) )
return
}
}
2021-12-16 10:09:19 +08:00
// 我们只处理IP的上下线, 不修改节点的状态
return
2020-10-17 21:15:31 +08:00
}
2021-11-18 14:30:53 +08:00
// 修改节点状态
if healthCheckConfig . AutoDown {
isChanged , err := models . SharedNodeDAO . UpdateNodeUpCount ( nil , int64 ( result . Node . Id ) , result . IsOk , healthCheckConfig . CountUp , healthCheckConfig . CountDown )
if err != nil {
remotelogs . Error ( "HEALTH_CHECK_EXECUTOR" , err . Error ( ) )
} else if isChanged {
// 通知恢复或下线
if result . IsOk {
message := "健康检查成功,节点\"" + result . Node . Name + "\"已恢复上线"
err = models . NewMessageDAO ( ) . CreateNodeMessage ( nil , nodeconfigs . NodeRoleNode , this . clusterId , int64 ( result . Node . Id ) , models . MessageTypeHealthCheckNodeUp , models . MessageLevelSuccess , message , message , nil , false )
} else {
message := "健康检查失败,节点\"" + result . Node . Name + "\"已自动下线"
err = models . NewMessageDAO ( ) . CreateNodeMessage ( nil , nodeconfigs . NodeRoleNode , this . clusterId , int64 ( result . Node . Id ) , models . MessageTypeHealthCheckNodeDown , models . MessageLevelError , message , message , nil , false )
}
2020-11-16 09:20:24 +08:00
}
2020-11-15 21:17:42 +08:00
}
2021-11-18 14:30:53 +08:00
} ( )
2020-11-15 21:17:42 +08:00
2020-10-17 21:15:31 +08:00
wg . Done ( )
default :
return
}
}
2021-12-14 10:49:29 +08:00
} )
2020-10-17 21:15:31 +08:00
}
wg . Wait ( )
return results , nil
}
// 检查单个节点
func ( this * HealthCheckExecutor ) checkNode ( healthCheckConfig * serverconfigs . HealthCheckConfig , result * HealthCheckResult ) error {
2021-06-07 10:06:16 +08:00
// 支持IPv6
if utils . IsIPv6 ( result . NodeAddr ) {
result . NodeAddr = "[" + result . NodeAddr + "]"
}
2021-10-19 16:31:05 +08:00
if len ( healthCheckConfig . URL ) == 0 {
healthCheckConfig . URL = "http://${host}/"
}
2020-10-17 21:15:31 +08:00
url := strings . ReplaceAll ( healthCheckConfig . URL , "${host}" , result . NodeAddr )
req , err := http . NewRequest ( http . MethodGet , url , nil )
if err != nil {
return err
}
2021-10-19 16:31:05 +08:00
if len ( healthCheckConfig . UserAgent ) > 0 {
req . Header . Set ( "User-Agent" , healthCheckConfig . UserAgent )
} else {
req . Header . Set ( "User-Agent" , "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36" )
}
2021-11-29 09:52:47 +08:00
key , err := nodeutils . Base64EncodeMap ( maps . Map {
2021-10-19 16:31:05 +08:00
"onlyBasicRequest" : healthCheckConfig . OnlyBasicRequest ,
2021-11-29 09:52:47 +08:00
} )
2021-10-19 16:31:05 +08:00
if err != nil {
return err
}
req . Header . Set ( serverconfigs . HealthCheckHeaderName , key )
2020-10-17 21:15:31 +08:00
timeout := 5 * time . Second
if healthCheckConfig . Timeout != nil {
timeout = healthCheckConfig . Timeout . Duration ( )
}
client := & http . Client {
Timeout : timeout ,
Transport : & http . Transport {
2021-02-24 16:20:22 +08:00
DialContext : func ( ctx context . Context , network , addr string ) ( net . Conn , error ) {
_ , port , err := net . SplitHostPort ( addr )
if err != nil {
return nil , err
}
2021-09-13 13:46:20 +08:00
conn , err := net . Dial ( network , configutils . QuoteIP ( result . NodeAddr ) + ":" + port )
2021-07-08 18:37:11 +08:00
if err == nil {
return conn , nil
}
2021-09-13 13:46:20 +08:00
return net . DialTimeout ( network , configutils . QuoteIP ( result . NodeAddr ) + ":" + port , timeout )
2021-02-24 16:20:22 +08:00
} ,
2020-10-17 21:15:31 +08:00
MaxIdleConns : 1 ,
MaxIdleConnsPerHost : 1 ,
MaxConnsPerHost : 1 ,
IdleConnTimeout : 2 * time . Minute ,
ExpectContinueTimeout : 1 * time . Second ,
TLSHandshakeTimeout : 0 ,
TLSClientConfig : & tls . Config {
InsecureSkipVerify : true ,
} ,
} ,
}
defer func ( ) {
client . CloseIdleConnections ( )
} ( )
resp , err := client . Do ( req )
if err != nil {
return err
}
_ = resp . Body . Close ( )
if len ( healthCheckConfig . StatusCodes ) > 0 && ! lists . ContainsInt ( healthCheckConfig . StatusCodes , resp . StatusCode ) {
result . Error = "invalid response status code '" + strconv . Itoa ( resp . StatusCode ) + "'"
return nil
}
result . IsOk = true
return nil
}