diff --git a/internal/db/models/node_cluster_dao.go b/internal/db/models/node_cluster_dao.go index 84c832f4..c90fb5b7 100644 --- a/internal/db/models/node_cluster_dao.go +++ b/internal/db/models/node_cluster_dao.go @@ -1,10 +1,12 @@ package models import ( + "encoding/json" "errors" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" ) @@ -52,7 +54,7 @@ func (this *NodeClusterDAO) DisableNodeCluster(id int64) error { return err } -// 查找启用中的条目 +// 查找集群 func (this *NodeClusterDAO) FindEnabledNodeCluster(id int64) (*NodeCluster, error) { result, err := this.Query(). Pk(id). @@ -64,6 +66,16 @@ func (this *NodeClusterDAO) FindEnabledNodeCluster(id int64) (*NodeCluster, erro return result.(*NodeCluster), err } +// 根据UniqueId获取ID +// TODO 增加缓存 +func (this *NodeClusterDAO) FindEnabledClusterIdWithUniqueId(uniqueId string) (int64, error) { + return this.Query(). + State(NodeClusterStateEnabled). + Attr("uniqueId", uniqueId). + ResultPk(). + FindInt64Col(0) +} + // 根据主键查找名称 func (this *NodeClusterDAO) FindNodeClusterName(id int64) (string, error) { return this.Query(). @@ -85,12 +97,25 @@ func (this *NodeClusterDAO) FindAllEnableClusters() (result []*NodeCluster, err // 创建集群 func (this *NodeClusterDAO) CreateCluster(name string, grantId int64, installDir string) (clusterId int64, err error) { + uniqueId, err := this.genUniqueId() + if err != nil { + return 0, err + } + + secret := rands.String(32) + err = SharedApiTokenDAO.CreateAPIToken(uniqueId, secret, NodeRoleCluster) + if err != nil { + return 0, err + } + op := NewNodeClusterOperator() op.Name = name op.GrantId = grantId op.InstallDir = installDir op.UseAllAPINodes = 1 op.ApiNodes = "[]" + op.UniqueId = uniqueId + op.Secret = secret op.State = NodeClusterStateEnabled _, err = this.Save(op) if err != nil { @@ -128,6 +153,80 @@ func (this *NodeClusterDAO) ListEnabledClusters(offset, size int64) (result []*N Offset(offset). Limit(size). Slice(&result). + DescPk(). FindAll() return } + +// 查找所有API节点地址 +func (this *NodeClusterDAO) FindAllAPINodeAddrsWithCluster(clusterId int64) (result []string, err error) { + one, err := this.Query(). + Pk(clusterId). + Result("useAllAPINodes", "apiNodes"). + Find() + if err != nil { + return nil, err + } + if one == nil { + return nil, nil + } + cluster := one.(*NodeCluster) + if cluster.UseAllAPINodes == 1 { + apiNodes, err := SharedAPINodeDAO.FindAllEnabledAPINodes() + if err != nil { + return nil, err + } + for _, apiNode := range apiNodes { + if apiNode.IsOn != 1 { + continue + } + addrs, err := apiNode.DecodeAccessAddrStrings() + if err != nil { + return nil, err + } + result = append(result, addrs...) + } + return result, nil + } + + apiNodeIds := []int64{} + if !IsNotNull(cluster.ApiNodes) { + return + } + err = json.Unmarshal([]byte(cluster.ApiNodes), &apiNodeIds) + if err != nil { + return nil, err + } + for _, apiNodeId := range apiNodeIds { + apiNode, err := SharedAPINodeDAO.FindEnabledAPINode(apiNodeId) + if err != nil { + return nil, err + } + if apiNode == nil || apiNode.IsOn != 1 { + continue + } + addrs, err := apiNode.DecodeAccessAddrStrings() + if err != nil { + return nil, err + } + result = append(result, addrs...) + } + return result, nil +} + +// 生成唯一ID +func (this *NodeClusterDAO) genUniqueId() (string, error) { + for { + uniqueId := rands.HexString(32) + ok, err := this.Query(). + Attr("uniqueId", uniqueId). + Exist() + if err != nil { + return "", err + } + if ok { + continue + } + return uniqueId, nil + } +} diff --git a/internal/db/models/node_cluster_model.go b/internal/db/models/node_cluster_model.go index 1b53597a..9334c37b 100644 --- a/internal/db/models/node_cluster_model.go +++ b/internal/db/models/node_cluster_model.go @@ -13,6 +13,9 @@ type NodeCluster struct { CreatedAt uint64 `field:"createdAt"` // 创建时间 GrantId uint32 `field:"grantId"` // 默认认证方式 State uint8 `field:"state"` // 状态 + AutoRegister uint8 `field:"autoRegister"` // 是否开启自动注册 + UniqueId string `field:"uniqueId"` // 唯一ID + Secret string `field:"secret"` // 密钥 } type NodeClusterOperator struct { @@ -27,6 +30,9 @@ type NodeClusterOperator struct { CreatedAt interface{} // 创建时间 GrantId interface{} // 默认认证方式 State interface{} // 状态 + AutoRegister interface{} // 是否开启自动注册 + UniqueId interface{} // 唯一ID + Secret interface{} // 密钥 } func NewNodeClusterOperator() *NodeClusterOperator { diff --git a/internal/db/models/node_roles.go b/internal/db/models/node_roles.go index 8f817e75..22860244 100644 --- a/internal/db/models/node_roles.go +++ b/internal/db/models/node_roles.go @@ -12,4 +12,5 @@ const ( NodeRoleDNS NodeRole = "dns" NodeRoleMonitor NodeRole = "monitor" NodeRoleNode NodeRole = "node" + NodeRoleCluster NodeRole = "cluster" ) diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 9ac500d4..1aacd404 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -42,13 +42,52 @@ func (this *NodeService) CreateNode(ctx context.Context, req *pb.CreateNodeReque }, nil } +// 注册集群节点 +func (this *NodeService) RegisterClusterNode(ctx context.Context, req *pb.RegisterClusterNodeRequest) (*pb.RegisterClusterNodeResponse, error) { + // 校验请求 + _, clusterId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeCluster) + if err != nil { + return nil, err + } + + nodeId, err := models.SharedNodeDAO.CreateNode(req.Name, clusterId) + if err != nil { + return nil, err + } + err = models.SharedNodeDAO.UpdateNodeIsInstalled(nodeId, true) + if err != nil { + return nil, err + } + + node, err := models.SharedNodeDAO.FindEnabledNode(nodeId) + if err != nil { + return nil, err + } + if node == nil { + return nil, errors.New("can not find node after creating") + } + + // 获取集群可以使用的所有API节点 + apiAddrs, err := models.SharedNodeClusterDAO.FindAllAPINodeAddrsWithCluster(clusterId) + if err != nil { + return nil, err + } + + return &pb.RegisterClusterNodeResponse{ + UniqueId: node.UniqueId, + Secret: node.Secret, + Endpoints: apiAddrs, + }, nil +} + // 计算节点数量 func (this *NodeService) CountAllEnabledNodes(ctx context.Context, req *pb.CountAllEnabledNodesRequest) (*pb.CountAllEnabledNodesResponse, error) { - _ = req + // 校验请求 _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) if err != nil { return nil, err } + count, err := models.SharedNodeDAO.CountAllEnabledNodes() if err != nil { return nil, err diff --git a/internal/rpc/services/service_node_cluster.go b/internal/rpc/services/service_node_cluster.go index b050e1f6..7b9636ce 100644 --- a/internal/rpc/services/service_node_cluster.go +++ b/internal/rpc/services/service_node_cluster.go @@ -2,9 +2,12 @@ package services import ( "context" + "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "strconv" ) type NodeClusterService struct { @@ -77,13 +80,66 @@ func (this *NodeClusterService) FindEnabledNodeCluster(ctx context.Context, req CreatedAt: int64(cluster.CreatedAt), InstallDir: cluster.InstallDir, GrantId: int64(cluster.GrantId), + UniqueId: cluster.UniqueId, + Secret: cluster.Secret, }}, nil } +// 查找集群的API节点信息 +func (this *NodeClusterService) FindAPINodesWithNodeCluster(ctx context.Context, req *pb.FindAPINodesWithNodeClusterRequest) (*pb.FindAPINodesWithNodeClusterResponse, error) { + // 校验请求 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(req.ClusterId) + if err != nil { + return nil, err + } + if cluster == nil { + return nil, errors.New("can not find cluster with id '" + strconv.FormatInt(req.ClusterId, 10) + "'") + } + + result := &pb.FindAPINodesWithNodeClusterResponse{} + result.UseAllAPINodes = cluster.UseAllAPINodes == 1 + + apiNodeIds := []int64{} + if len(cluster.ApiNodes) > 0 && cluster.ApiNodes != "null" { + err = json.Unmarshal([]byte(cluster.ApiNodes), &apiNodeIds) + if err != nil { + return nil, err + } + if len(apiNodeIds) > 0 { + apiNodes := []*pb.APINode{} + for _, apiNodeId := range apiNodeIds { + apiNode, err := models.SharedAPINodeDAO.FindEnabledAPINode(apiNodeId) + if err != nil { + return nil, err + } + apiNodeAddrs, err := apiNode.DecodeAccessAddrStrings() + if err != nil { + return nil, err + } + apiNodes = append(apiNodes, &pb.APINode{ + Id: int64(apiNode.Id), + IsOn: apiNode.IsOn == 1, + ClusterId: int64(apiNode.ClusterId), + Name: apiNode.Name, + Description: apiNode.Description, + AccessAddrs: apiNodeAddrs, + }) + } + result.ApiNodes = apiNodes + } + } + + return result, nil +} + // 查找所有可用的集群 func (this *NodeClusterService) FindAllEnabledNodeClusters(ctx context.Context, req *pb.FindAllEnabledNodeClustersRequest) (*pb.FindAllEnabledNodeClustersResponse, error) { - _ = req - + // 校验请求 _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) if err != nil { return nil, err @@ -100,6 +156,8 @@ func (this *NodeClusterService) FindAllEnabledNodeClusters(ctx context.Context, Id: int64(cluster.Id), Name: cluster.Name, CreatedAt: int64(cluster.CreatedAt), + UniqueId: cluster.UniqueId, + Secret: cluster.Secret, }) } @@ -137,6 +195,8 @@ func (this *NodeClusterService) FindAllChangedNodeClusters(ctx context.Context, Id: int64(cluster.Id), Name: cluster.Name, CreatedAt: int64(cluster.CreatedAt), + UniqueId: cluster.UniqueId, + Secret: cluster.Secret, }) } return &pb.FindAllChangedNodeClustersResponse{Clusters: result}, nil @@ -177,6 +237,8 @@ func (this *NodeClusterService) ListEnabledNodeClusters(ctx context.Context, req CreatedAt: int64(cluster.CreatedAt), GrantId: int64(cluster.GrantId), InstallDir: cluster.InstallDir, + UniqueId: cluster.UniqueId, + Secret: cluster.Secret, }) } diff --git a/internal/rpc/utils/utils.go b/internal/rpc/utils/utils.go index cce49ae6..4aa3c3e2 100644 --- a/internal/rpc/utils/utils.go +++ b/internal/rpc/utils/utils.go @@ -24,6 +24,7 @@ const ( UserTypeUser = "user" UserTypeProvider = "provider" UserTypeNode = "node" + UserTypeCluster = "cluster" UserTypeMonitor = "monitor" UserTypeStat = "stat" UserTypeDNS = "dns" @@ -52,7 +53,7 @@ func ValidateRequest(ctx context.Context, userTypes ...UserType) (userType UserT } nodeUserId := int64(0) if apiToken == nil { - return UserTypeNode, 0, errors.New("context: invalid api token") + return UserTypeNode, 0, errors.New("context: can not find api token for node '" + nodeId + "'") } tokens := md.Get("token") @@ -106,6 +107,15 @@ func ValidateRequest(ctx context.Context, userTypes ...UserType) (userType UserT return UserTypeNode, 0, errors.New("context: not found node with id '" + nodeId + "'") } nodeUserId = nodeIntId + case UserTypeCluster: + clusterId, err := models.SharedNodeClusterDAO.FindEnabledClusterIdWithUniqueId(nodeId) + if err != nil { + return UserTypeCluster, 0, errors.New("context: " + err.Error()) + } + if clusterId <= 0 { + return UserTypeCluster, 0, errors.New("context: not found cluster with id '" + nodeId + "'") + } + nodeUserId = clusterId } if nodeUserId > 0 {