diff --git a/internal/db/models/node_cluster_dao.go b/internal/db/models/node_cluster_dao.go index c90fb5b7..bee28303 100644 --- a/internal/db/models/node_cluster_dao.go +++ b/internal/db/models/node_cluster_dao.go @@ -3,11 +3,13 @@ package models import ( "encoding/json" "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" + "strconv" ) const ( @@ -214,6 +216,39 @@ func (this *NodeClusterDAO) FindAllAPINodeAddrsWithCluster(clusterId int64) (res return result, nil } +// 查找健康检查设置 +func (this *NodeClusterDAO) FindClusterHealthCheckConfig(clusterId int64) (*serverconfigs.HealthCheckConfig, error) { + col, err := this.Query(). + Pk(clusterId). + Result("healthCheck"). + FindStringCol("") + if err != nil { + return nil, err + } + if len(col) == 0 || col == "null" { + return nil, nil + } + + config := &serverconfigs.HealthCheckConfig{} + err = json.Unmarshal([]byte(col), config) + if err != nil { + return nil, err + } + return config, nil +} + +// 修改健康检查设置 +func (this *NodeClusterDAO) UpdateClusterHealthCheck(clusterId int64, healthCheckJSON []byte) error { + if clusterId <= 0 { + return errors.New("invalid clusterId '" + strconv.FormatInt(clusterId, 10) + "'") + } + op := NewNodeClusterOperator() + op.Id = clusterId + op.HealthCheck = healthCheckJSON + _, err := this.Save(op) + return err +} + // 生成唯一ID func (this *NodeClusterDAO) genUniqueId() (string, error) { for { diff --git a/internal/db/models/node_cluster_model.go b/internal/db/models/node_cluster_model.go index 9334c37b..5253e510 100644 --- a/internal/db/models/node_cluster_model.go +++ b/internal/db/models/node_cluster_model.go @@ -16,6 +16,7 @@ type NodeCluster struct { AutoRegister uint8 `field:"autoRegister"` // 是否开启自动注册 UniqueId string `field:"uniqueId"` // 唯一ID Secret string `field:"secret"` // 密钥 + HealthCheck string `field:"healthCheck"` // 健康检查 } type NodeClusterOperator struct { @@ -33,6 +34,7 @@ type NodeClusterOperator struct { AutoRegister interface{} // 是否开启自动注册 UniqueId interface{} // 唯一ID Secret interface{} // 密钥 + HealthCheck interface{} // 健康检查 } func NewNodeClusterOperator() *NodeClusterOperator { diff --git a/internal/db/models/node_ip_address_dao.go b/internal/db/models/node_ip_address_dao.go index 1c795378..5bfb58d4 100644 --- a/internal/db/models/node_ip_address_dao.go +++ b/internal/db/models/node_ip_address_dao.go @@ -85,11 +85,12 @@ func (this *NodeIPAddressDAO) FindAddressName(id int64) (string, error) { } // 创建IP地址 -func (this *NodeIPAddressDAO) CreateAddress(nodeId int64, name string, ip string) (addressId int64, err error) { +func (this *NodeIPAddressDAO) CreateAddress(nodeId int64, name string, ip string, canAccess bool) (addressId int64, err error) { op := NewNodeIPAddressOperator() op.NodeId = nodeId op.Name = name - op.IP = ip + op.Ip = ip + op.CanAccess = canAccess op.State = NodeIPAddressStateEnabled _, err = this.Save(op) if err != nil { @@ -99,7 +100,7 @@ func (this *NodeIPAddressDAO) CreateAddress(nodeId int64, name string, ip string } // 修改IP地址 -func (this *NodeIPAddressDAO) UpdateAddress(addressId int64, name string, ip string) (err error) { +func (this *NodeIPAddressDAO) UpdateAddress(addressId int64, name string, ip string, canAccess bool) (err error) { if addressId <= 0 { return errors.New("invalid addressId") } @@ -107,7 +108,9 @@ func (this *NodeIPAddressDAO) UpdateAddress(addressId int64, name string, ip str op := NewNodeIPAddressOperator() op.Id = addressId op.Name = name - op.IP = ip + op.Ip = ip + op.CanAccess = canAccess + op.State = NodeIPAddressStateEnabled // 恢复状态 _, err = this.Save(op) return err } diff --git a/internal/db/models/node_ip_address_model.go b/internal/db/models/node_ip_address_model.go index fa66cb2d..bdc8362a 100644 --- a/internal/db/models/node_ip_address_model.go +++ b/internal/db/models/node_ip_address_model.go @@ -1,24 +1,26 @@ package models -// +// 节点IP地址 type NodeIPAddress struct { Id uint32 `field:"id"` // ID NodeId uint32 `field:"nodeId"` // 节点ID Name string `field:"name"` // 名称 - IP string `field:"ip"` // IP地址 + Ip string `field:"ip"` // IP地址 Description string `field:"description"` // 描述 State uint8 `field:"state"` // 状态 Order uint32 `field:"order"` // 排序 + CanAccess uint8 `field:"canAccess"` // 是否可以访问 } type NodeIPAddressOperator struct { Id interface{} // ID NodeId interface{} // 节点ID Name interface{} // 名称 - IP interface{} // IP地址 + Ip interface{} // IP地址 Description interface{} // 描述 State interface{} // 状态 Order interface{} // 排序 + CanAccess interface{} // 是否可以访问 } func NewNodeIPAddressOperator() *NodeIPAddressOperator { diff --git a/internal/db/models/node_model_ext.go b/internal/db/models/node_model_ext.go index 9eda54b0..97f7120f 100644 --- a/internal/db/models/node_model_ext.go +++ b/internal/db/models/node_model_ext.go @@ -25,3 +25,4 @@ func (this *Node) DecodeInstallStatus() (*NodeInstallStatus, error) { return status, nil } + diff --git a/internal/rpc/services/service_node_cluster.go b/internal/rpc/services/service_node_cluster.go index 7b9636ce..7971b661 100644 --- a/internal/rpc/services/service_node_cluster.go +++ b/internal/rpc/services/service_node_cluster.go @@ -6,7 +6,9 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" + "github.com/TeaOSLab/EdgeAPI/internal/tasks" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/types" "strconv" ) @@ -244,3 +246,66 @@ func (this *NodeClusterService) ListEnabledNodeClusters(ctx context.Context, req return &pb.ListEnabledNodeClustersResponse{Clusters: result}, nil } + +// 查找集群的健康检查配置 +func (this *NodeClusterService) FindNodeClusterHealthCheckConfig(ctx context.Context, req *pb.FindNodeClusterHealthCheckConfigRequest) (*pb.FindNodeClusterHealthCheckConfigResponse, error) { + // 校验请求 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + config, err := models.SharedNodeClusterDAO.FindClusterHealthCheckConfig(req.ClusterId) + if err != nil { + return nil, err + } + configJSON, err := json.Marshal(config) + if err != nil { + return nil, err + } + return &pb.FindNodeClusterHealthCheckConfigResponse{HealthCheckConfig: configJSON}, nil +} + +// 修改集群健康检查设置 +func (this *NodeClusterService) UpdateNodeClusterHealthCheck(ctx context.Context, req *pb.UpdateNodeClusterHealthCheckRequest) (*pb.RPCUpdateSuccess, error) { + // 校验请求 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + err = models.SharedNodeClusterDAO.UpdateClusterHealthCheck(req.ClusterId, req.HealthCheckJSON) + if err != nil { + return nil, err + } + return rpcutils.RPCUpdateSuccess() +} + +// 执行健康检查 +func (this *NodeClusterService) ExecuteNodeClusterHealthCheck(ctx context.Context, req *pb.ExecuteNodeClusterHealthCheckRequest) (*pb.ExecuteNodeClusterHealthCheckResponse, error) { + // 校验请求 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + executor := tasks.NewHealthCheckExecutor(req.ClusterId) + results, err := executor.Run() + if err != nil { + return nil, err + } + pbResults := []*pb.ExecuteNodeClusterHealthCheckResponse_Result{} + for _, result := range results { + pbResults = append(pbResults, &pb.ExecuteNodeClusterHealthCheckResponse_Result{ + Node: &pb.Node{ + Id: int64(result.Node.Id), + Name: result.Node.Name, + }, + NodeAddr: result.NodeAddr, + IsOk: result.IsOk, + Error: result.Error, + CostMs: types.Float32(result.CostMs), + }) + } + return &pb.ExecuteNodeClusterHealthCheckResponse{Results: pbResults}, nil +} diff --git a/internal/rpc/services/service_node_ip_address.go b/internal/rpc/services/service_node_ip_address.go index 5c553570..0d60aba7 100644 --- a/internal/rpc/services/service_node_ip_address.go +++ b/internal/rpc/services/service_node_ip_address.go @@ -18,7 +18,7 @@ func (this *NodeIPAddressService) CreateNodeIPAddress(ctx context.Context, req * return nil, err } - addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(req.NodeId, req.Name, req.Ip) + addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(req.NodeId, req.Name, req.Ip, req.CanAccess) if err != nil { return nil, err } @@ -34,7 +34,7 @@ func (this *NodeIPAddressService) UpdateNodeIPAddress(ctx context.Context, req * return nil, err } - err = models.SharedNodeIPAddressDAO.UpdateAddress(req.AddressId, req.Name, req.Ip) + err = models.SharedNodeIPAddressDAO.UpdateAddress(req.AddressId, req.Name, req.Ip, req.CanAccess) if err != nil { return nil, err } @@ -109,10 +109,11 @@ func (this *NodeIPAddressService) FindEnabledNodeIPAddress(ctx context.Context, Id: int64(address.Id), NodeId: int64(address.NodeId), Name: address.Name, - Ip: address.IP, + Ip: address.Ip, Description: address.Description, State: int64(address.State), Order: int64(address.Order), + CanAccess: address.CanAccess == 1, } } @@ -138,10 +139,11 @@ func (this *NodeIPAddressService) FindAllEnabledIPAddressesWithNodeId(ctx contex Id: int64(address.Id), NodeId: int64(address.NodeId), Name: address.Name, - Ip: address.IP, + Ip: address.Ip, Description: address.Description, State: int64(address.State), Order: int64(address.Order), + CanAccess: address.CanAccess == 1, }) } diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go new file mode 100644 index 00000000..25a11891 --- /dev/null +++ b/internal/tasks/health_check_executor.go @@ -0,0 +1,190 @@ +package tasks + +import ( + "crypto/tls" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/types" + "net/http" + "strconv" + "strings" + "sync" + "time" +) + +type HealthCheckExecutor struct { + clusterId int64 +} + +func NewHealthCheckExecutor(clusterId int64) *HealthCheckExecutor { + return &HealthCheckExecutor{clusterId: clusterId} +} + +func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { + cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(this.clusterId) + if err != nil { + return nil, err + } + if cluster == nil { + return nil, errors.New("can not find cluster with id '" + strconv.FormatInt(this.clusterId, 10) + "'") + } + if len(cluster.HealthCheck) == 0 || cluster.HealthCheck == "null" { + return nil, errors.New("health check config is not found") + } + + healthCheckConfig := &serverconfigs.HealthCheckConfig{} + err = json.Unmarshal([]byte(cluster.HealthCheck), healthCheckConfig) + if err != nil { + return nil, err + } + + results := []*HealthCheckResult{} + nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(this.clusterId) + if err != nil { + return nil, err + } + for _, node := range nodes { + if node.IsOn != 1 { + continue + } + result := &HealthCheckResult{ + Node: node, + } + + addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(int64(node.Id)) + if err != nil { + return nil, err + } + accessAddresses := []string{} + for _, addr := range addresses { + if addr.CanAccess == 1 { + accessAddresses = append(accessAddresses, addr.Ip) + } + } + if len(accessAddresses) == 0 { + result.Error = "no ip address can be used" + } else { + result.NodeAddr = accessAddresses[0] + } + + results = append(results, result) + } + + // 并行检查 + preparedResults := []*HealthCheckResult{} + for _, result := range results { + if len(result.NodeAddr) > 0 { + preparedResults = append(preparedResults, result) + } + } + + if len(preparedResults) == 0 { + return results, nil + } + + countResults := len(preparedResults) + queue := make(chan *HealthCheckResult, countResults) + for _, result := range preparedResults { + queue <- result + } + + countTries := types.Int(healthCheckConfig.CountTries) + if countTries > 10 { // 限定最多尝试10次 TODO 应该在管理界面提示用户 + countTries = 10 + } + if countTries < 1 { + countTries = 1 + } + + tryDelay := 1 * time.Second + if healthCheckConfig.TryDelay != nil { + tryDelay = healthCheckConfig.TryDelay.Duration() + + if tryDelay > 1*time.Minute { // 最多不能超过1分钟 TODO 应该在管理界面提示用户 + tryDelay = 1 * time.Minute + } + } + + countRoutines := 10 + wg := sync.WaitGroup{} + wg.Add(countResults) + for i := 0; i < countRoutines; i++ { + go func() { + for { + select { + case result := <-queue: + for i := 1; i <= countTries; i++ { + before := time.Now() + err := this.checkNode(healthCheckConfig, result) + result.CostMs = time.Since(before).Seconds() * 1000 + if err != nil { + result.Error = err.Error() + } + if result.IsOk { + break + } + if tryDelay > 0 { + time.Sleep(tryDelay) + } + } + wg.Done() + default: + return + } + } + }() + } + wg.Wait() + + return results, nil +} + +// 检查单个节点 +func (this *HealthCheckExecutor) checkNode(healthCheckConfig *serverconfigs.HealthCheckConfig, result *HealthCheckResult) error { + url := strings.ReplaceAll(healthCheckConfig.URL, "${host}", result.NodeAddr) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return err + } + + timeout := 5 * time.Second + if healthCheckConfig.Timeout != nil { + timeout = healthCheckConfig.Timeout.Duration() + } + + client := &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + MaxIdleConns: 1, + MaxIdleConnsPerHost: 1, + MaxConnsPerHost: 1, + IdleConnTimeout: 2 * time.Minute, + ExpectContinueTimeout: 1 * time.Second, + TLSHandshakeTimeout: 0, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } + defer func() { + client.CloseIdleConnections() + }() + + resp, err := client.Do(req) + if err != nil { + return err + } + _ = resp.Body.Close() + + if len(healthCheckConfig.StatusCodes) > 0 && !lists.ContainsInt(healthCheckConfig.StatusCodes, resp.StatusCode) { + result.Error = "invalid response status code '" + strconv.Itoa(resp.StatusCode) + "'" + return nil + } + + result.IsOk = true + + return nil +} diff --git a/internal/tasks/health_check_executor_test.go b/internal/tasks/health_check_executor_test.go new file mode 100644 index 00000000..9999a28d --- /dev/null +++ b/internal/tasks/health_check_executor_test.go @@ -0,0 +1,19 @@ +package tasks + +import ( + "github.com/iwind/TeaGo/dbs" + "testing" +) + +func TestHealthCheckExecutor_Run(t *testing.T) { + dbs.NotifyReady() + + executor := NewHealthCheckExecutor(10) + results, err := executor.Run() + if err != nil { + t.Fatal(err) + } + for _, result := range results { + t.Log(result.Node.Name, "addr:", result.NodeAddr, "isOk:", result.IsOk, "error:", result.Error) + } +} diff --git a/internal/tasks/health_check_result.go b/internal/tasks/health_check_result.go new file mode 100644 index 00000000..a8c8c4e0 --- /dev/null +++ b/internal/tasks/health_check_result.go @@ -0,0 +1,11 @@ +package tasks + +import "github.com/TeaOSLab/EdgeAPI/internal/db/models" + +type HealthCheckResult struct { + Node *models.Node + NodeAddr string + IsOk bool + Error string + CostMs float64 +}