From cb6919a0734cf637a02b75642fb4aff20fe23a74 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 1 Mar 2023 11:38:53 +0800 Subject: [PATCH] =?UTF-8?q?=E8=8A=82=E7=82=B9IP=E5=9C=B0=E5=9D=80=E5=8F=AF?= =?UTF-8?q?=E4=BB=A5=E8=AE=BE=E7=BD=AE=E4=B8=93=E5=B1=9E=E9=9B=86=E7=BE=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/dns/dnsutils/dns_utils.go | 2 +- internal/db/models/node_ip_address_dao.go | 26 ++- internal/db/models/node_ip_address_model.go | 40 ++--- .../db/models/node_ip_address_model_ext.go | 28 +++- internal/rpc/services/service_dns_domain.go | 9 +- internal/rpc/services/service_node.go | 7 +- .../rpc/services/service_node_ip_address.go | 155 +++++++++++++----- internal/tasks/dns_task_executor.go | 5 + 8 files changed, 200 insertions(+), 72 deletions(-) diff --git a/internal/db/models/dns/dnsutils/dns_utils.go b/internal/db/models/dns/dnsutils/dns_utils.go index 8e720b16..26171147 100644 --- a/internal/db/models/dns/dnsutils/dns_utils.go +++ b/internal/db/models/dns/dnsutils/dns_utils.go @@ -113,7 +113,7 @@ func CheckClusterDNS(tx *dbs.Tx, cluster *models.NodeCluster, checkNodeIssues bo // TODO 检查节点数量不能为0 for _, node := range nodes { - nodeId := int64(node.Id) + var nodeId = int64(node.Id) routeCodes, err := node.DNSRouteCodesForDomainId(domainId) if err != nil { diff --git a/internal/db/models/node_ip_address_dao.go b/internal/db/models/node_ip_address_dao.go index 3ab3f5b7..dd6944f6 100644 --- a/internal/db/models/node_ip_address_dao.go +++ b/internal/db/models/node_ip_address_dao.go @@ -121,7 +121,7 @@ func (this *NodeIPAddressDAO) FindAddressIsHealthy(tx *dbs.Tx, addressId int64) } // CreateAddress 创建IP地址 -func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, isUp bool, groupId int64) (addressId int64, err error) { +func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, isUp bool, groupId int64, clusterIds []int64) (addressId int64, err error) { if len(role) == 0 { role = nodeconfigs.NodeRoleNode } @@ -135,6 +135,17 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in op.IsUp = isUp op.GroupId = groupId + // 集群 + if len(clusterIds) == 0 { + op.ClusterIds = "[]" + } else { + clusterIdsJSON, err := json.Marshal(clusterIds) + if err != nil { + return 0, err + } + op.ClusterIds = clusterIdsJSON + } + op.State = NodeIPAddressStateEnabled addressId, err = this.SaveInt64(tx, op) if err != nil { @@ -156,7 +167,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in } // UpdateAddress 修改IP地址 -func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool, isUp bool) (err error) { +func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool, isUp bool, clusterIds []int64) (err error) { if addressId <= 0 { return errors.New("invalid addressId") } @@ -169,6 +180,17 @@ func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId op.IsOn = isOn op.IsUp = isUp + // 集群 + if len(clusterIds) == 0 { + op.ClusterIds = "[]" + } else { + clusterIdsJSON, err := json.Marshal(clusterIds) + if err != nil { + return err + } + op.ClusterIds = clusterIdsJSON + } + op.State = NodeIPAddressStateEnabled // 恢复状态 err = this.Save(tx, op) if err != nil { diff --git a/internal/db/models/node_ip_address_model.go b/internal/db/models/node_ip_address_model.go index 433eac9c..d14d2420 100644 --- a/internal/db/models/node_ip_address_model.go +++ b/internal/db/models/node_ip_address_model.go @@ -6,6 +6,7 @@ import "github.com/iwind/TeaGo/dbs" type NodeIPAddress struct { Id uint32 `field:"id"` // ID NodeId uint32 `field:"nodeId"` // 节点ID + ClusterIds dbs.JSON `field:"clusterIds"` // 所属集群IDs Role string `field:"role"` // 节点角色 GroupId uint32 `field:"groupId"` // 所属分组ID Name string `field:"name"` // 名称 @@ -26,25 +27,26 @@ type NodeIPAddress struct { } type NodeIPAddressOperator struct { - Id interface{} // ID - NodeId interface{} // 节点ID - Role interface{} // 节点角色 - GroupId interface{} // 所属分组ID - Name interface{} // 名称 - Ip interface{} // IP地址 - Description interface{} // 描述 - State interface{} // 状态 - Order interface{} // 排序 - CanAccess interface{} // 是否可以访问 - IsOn interface{} // 是否启用 - IsUp interface{} // 是否上线 - IsHealthy interface{} // 是否健康 - Thresholds interface{} // 上线阈值 - Connectivity interface{} // 连通性状态 - BackupIP interface{} // 备用IP - BackupThresholdId interface{} // 触发备用IP的阈值 - CountUp interface{} // UP状态次数 - CountDown interface{} // DOWN状态次数 + Id any // ID + NodeId any // 节点ID + ClusterIds any // 所属集群IDs + Role any // 节点角色 + GroupId any // 所属分组ID + Name any // 名称 + Ip any // IP地址 + Description any // 描述 + State any // 状态 + Order any // 排序 + CanAccess any // 是否可以访问 + IsOn any // 是否启用 + IsUp any // 是否上线 + IsHealthy any // 是否健康 + Thresholds any // 上线阈值 + Connectivity any // 连通性状态 + BackupIP any // 备用IP + BackupThresholdId any // 触发备用IP的阈值 + CountUp any // UP状态次数 + CountDown any // DOWN状态次数 } func NewNodeIPAddressOperator() *NodeIPAddressOperator { diff --git a/internal/db/models/node_ip_address_model_ext.go b/internal/db/models/node_ip_address_model_ext.go index 05f85069..ea35bb40 100644 --- a/internal/db/models/node_ip_address_model_ext.go +++ b/internal/db/models/node_ip_address_model_ext.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/iwind/TeaGo/lists" ) // DecodeConnectivity 解析联通数值 @@ -12,7 +13,7 @@ func (this *NodeIPAddress) DecodeConnectivity() *nodeconfigs.Connectivity { if len(this.Connectivity) > 0 { err := json.Unmarshal(this.Connectivity, connectivity) if err != nil { - remotelogs.Error("NodeIPAddress.DecodeConnectivity", "decode failed: "+err.Error()) + remotelogs.Error("NodeIPAddress", "DecodeConnectivity(): decode failed: "+err.Error()) } } return connectivity @@ -33,7 +34,7 @@ func (this *NodeIPAddress) DecodeBackupIP() string { // 阈值是否存在 b, err := SharedNodeIPAddressThresholdDAO.ExistsEnabledThreshold(nil, int64(this.BackupThresholdId)) if err != nil { - remotelogs.Error("NodeIPAddress.DNSIP", "check enabled threshold failed: "+err.Error()) + remotelogs.Error("NodeIPAddress", "DecodeBackupIP(): check enabled threshold failed: "+err.Error()) } else { if b { return this.BackupIP @@ -42,3 +43,26 @@ func (this *NodeIPAddress) DecodeBackupIP() string { } return "" } + +// DecodeClusterIds 解析集群ID +func (this *NodeIPAddress) DecodeClusterIds() []int64 { + if IsNull(this.ClusterIds) { + return nil + } + + var clusterIds = []int64{} + err := json.Unmarshal(this.ClusterIds, &clusterIds) + if err != nil { + remotelogs.Error("NodeIPAddress", "DecodeClusterIds(): "+err.Error()) + } + return clusterIds +} + +// IsValidInCluster 检查在某个集群中是否有效 +func (this *NodeIPAddress) IsValidInCluster(clusterId int64) bool { + var clusterIds = this.DecodeClusterIds() + if len(clusterIds) == 0 { + return true + } + return lists.ContainsInt64(clusterIds, clusterId) +} diff --git a/internal/rpc/services/service_dns_domain.go b/internal/rpc/services/service_dns_domain.go index 2c2de1bc..d6cb7f85 100644 --- a/internal/rpc/services/service_dns_domain.go +++ b/internal/rpc/services/service_dns_domain.go @@ -493,7 +493,12 @@ func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster, } for _, route := range routeCodes { for _, ipAddress := range ipAddresses { - ip := ipAddress.DNSIP() + // 检查专属节点 + if !ipAddress.IsValidInCluster(clusterId) { + continue + } + + var ip = ipAddress.DNSIP() if len(ip) == 0 { continue } @@ -504,7 +509,7 @@ func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster, nodeKeys = append(nodeKeys, key) record, ok := nodeRecordMapping[key] if !ok { - recordType := dnstypes.RecordTypeA + var recordType = dnstypes.RecordTypeA if utils.IsIPv6(ip) { recordType = dnstypes.RecordTypeAAAA } diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 61f675dc..cd593685 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -1347,6 +1347,11 @@ func (this *NodeService) FindAllEnabledNodesDNSWithNodeClusterId(ctx context.Con } for _, ipAddress := range ipAddresses { + // 检查专属节点 + if !ipAddress.IsValidInCluster(req.NodeClusterId) { + continue + } + var ip = ipAddress.DNSIP() if len(ip) == 0 { continue @@ -1539,7 +1544,7 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN return nil, err } } else { - _, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, true, 0) + _, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, true, 0, nil) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_ip_address.go b/internal/rpc/services/service_node_ip_address.go index 9f9f6a04..a73f7669 100644 --- a/internal/rpc/services/service_node_ip_address.go +++ b/internal/rpc/services/service_node_ip_address.go @@ -3,6 +3,8 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/types" ) @@ -21,7 +23,7 @@ func (this *NodeIPAddressService) CreateNodeIPAddress(ctx context.Context, req * var tx = this.NullTx() - addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess, req.IsUp, 0) + addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess, req.IsUp, 0, req.NodeClusterIds) if err != nil { return nil, err } @@ -47,7 +49,7 @@ func (this *NodeIPAddressService) CreateNodeIPAddresses(ctx context.Context, req var result = []int64{} for _, ip := range req.IpList { - addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, ip, req.CanAccess, req.IsUp, groupId) + addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, ip, req.CanAccess, req.IsUp, groupId, req.NodeClusterIds) if err != nil { return nil, err } @@ -67,7 +69,7 @@ func (this *NodeIPAddressService) UpdateNodeIPAddress(ctx context.Context, req * var tx = this.NullTx() - err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, adminId, req.NodeIPAddressId, req.Name, req.Ip, req.CanAccess, req.IsOn, req.IsUp) + err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, adminId, req.NodeIPAddressId, req.Name, req.Ip, req.CanAccess, req.IsOn, req.IsUp, req.ClusterIds) if err != nil { return nil, err } @@ -143,27 +145,48 @@ func (this *NodeIPAddressService) FindEnabledNodeIPAddress(ctx context.Context, if err != nil { return nil, err } + if address == nil { + return &pb.FindEnabledNodeIPAddressResponse{ + NodeIPAddress: nil, + }, nil + } - var result *pb.NodeIPAddress = nil - if address != nil { - result = &pb.NodeIPAddress{ - Id: int64(address.Id), - NodeId: int64(address.NodeId), - Role: address.Role, - Name: address.Name, - Ip: address.Ip, - Description: address.Description, - State: int64(address.State), - Order: int64(address.Order), - CanAccess: address.CanAccess, - IsOn: address.IsOn, - IsUp: address.IsUp, - IsHealthy: address.IsHealthy, - BackupIP: address.DecodeBackupIP(), + // CDN集群 + var pbNodeClusters = []*pb.NodeCluster{} + if len(address.ClusterIds) > 0 { + if address.Role == nodeconfigs.NodeRoleNode { // 边缘节点 + var clusterIds = address.DecodeClusterIds() + for _, clusterId := range clusterIds { + cluster, err := models.SharedNodeClusterDAO.FindClusterBasicInfo(tx, clusterId, nil) + if err != nil { + return nil, err + } + if cluster != nil { + pbNodeClusters = append(pbNodeClusters, &pb.NodeCluster{ + Id: int64(cluster.Id), + Name: cluster.Name, + }) + } + } } } - return &pb.FindEnabledNodeIPAddressResponse{NodeIPAddress: result}, nil + return &pb.FindEnabledNodeIPAddressResponse{NodeIPAddress: &pb.NodeIPAddress{ + Id: int64(address.Id), + NodeId: int64(address.NodeId), + Role: address.Role, + Name: address.Name, + Ip: address.Ip, + Description: address.Description, + State: int64(address.State), + Order: int64(address.Order), + CanAccess: address.CanAccess, + IsOn: address.IsOn, + IsUp: address.IsUp, + IsHealthy: address.IsHealthy, + BackupIP: address.DecodeBackupIP(), + NodeClusters: pbNodeClusters, + }}, nil } // FindAllEnabledNodeIPAddressesWithNodeId 查找节点的所有地址 @@ -181,22 +204,44 @@ func (this *NodeIPAddressService) FindAllEnabledNodeIPAddressesWithNodeId(ctx co return nil, err } - result := []*pb.NodeIPAddress{} + var result = []*pb.NodeIPAddress{} + var cacheMap = utils.NewCacheMap() for _, address := range addresses { + // 集群 + var pbNodeClusters = []*pb.NodeCluster{} + var clusterIds = address.DecodeClusterIds() + if len(clusterIds) > 0 { + if address.Role == nodeconfigs.NodeRoleNode { // 边缘节点 + for _, clusterId := range clusterIds { + nodeCluster, err := models.SharedNodeClusterDAO.FindClusterBasicInfo(tx, clusterId, cacheMap) + if err != nil { + return nil, err + } + if nodeCluster != nil { + pbNodeClusters = append(pbNodeClusters, &pb.NodeCluster{ + Id: int64(nodeCluster.Id), + Name: nodeCluster.Name, + }) + } + } + } + } + result = append(result, &pb.NodeIPAddress{ - Id: int64(address.Id), - NodeId: int64(address.NodeId), - Role: address.Role, - Name: address.Name, - Ip: address.Ip, - Description: address.Description, - State: int64(address.State), - Order: int64(address.Order), - CanAccess: address.CanAccess, - IsOn: address.IsOn, - IsUp: address.IsUp, - IsHealthy: address.IsHealthy, - BackupIP: address.DecodeBackupIP(), + Id: int64(address.Id), + NodeId: int64(address.NodeId), + Role: address.Role, + Name: address.Name, + Ip: address.Ip, + Description: address.Description, + State: int64(address.State), + Order: int64(address.Order), + CanAccess: address.CanAccess, + IsOn: address.IsOn, + IsUp: address.IsUp, + IsHealthy: address.IsHealthy, + BackupIP: address.DecodeBackupIP(), + NodeClusters: pbNodeClusters, }) } @@ -236,18 +281,38 @@ func (this *NodeIPAddressService) ListEnabledNodeIPAddresses(ctx context.Context var pbAddrs = []*pb.NodeIPAddress{} for _, addr := range addresses { + var clusterIds = addr.DecodeClusterIds() + var pbNodeClusters = []*pb.NodeCluster{} + if len(clusterIds) > 0 { + if addr.Role == nodeconfigs.NodeRoleNode { // 边缘节点 + for _, clusterId := range clusterIds { + cluster, err := models.SharedNodeClusterDAO.FindClusterBasicInfo(tx, clusterId, nil) + if err != nil { + return nil, err + } + if cluster != nil { + pbNodeClusters = append(pbNodeClusters, &pb.NodeCluster{ + Id: int64(cluster.Id), + Name: cluster.Name, + }) + } + } + } + } + pbAddrs = append(pbAddrs, &pb.NodeIPAddress{ - Id: int64(addr.Id), - NodeId: int64(addr.NodeId), - Role: addr.Role, - Name: addr.Name, - Ip: addr.Ip, - Description: addr.Description, - CanAccess: addr.CanAccess, - IsOn: addr.IsOn, - IsUp: addr.IsUp, - IsHealthy: addr.IsHealthy, - BackupIP: addr.DecodeBackupIP(), + Id: int64(addr.Id), + NodeId: int64(addr.NodeId), + Role: addr.Role, + Name: addr.Name, + Ip: addr.Ip, + Description: addr.Description, + CanAccess: addr.CanAccess, + IsOn: addr.IsOn, + IsUp: addr.IsUp, + IsHealthy: addr.IsHealthy, + BackupIP: addr.DecodeBackupIP(), + NodeClusters: pbNodeClusters, }) } return &pb.ListEnabledNodeIPAddressesResponse{NodeIPAddresses: pbAddrs}, nil diff --git a/internal/tasks/dns_task_executor.go b/internal/tasks/dns_task_executor.go index 67ec4271..2bd51ded 100644 --- a/internal/tasks/dns_task_executor.go +++ b/internal/tasks/dns_task_executor.go @@ -387,6 +387,11 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error { continue } for _, ipAddress := range ipAddresses { + // 检查专属节点 + if !ipAddress.IsValidInCluster(clusterId) { + continue + } + var ip = ipAddress.DNSIP() if len(ip) == 0 || !ipAddress.CanAccess || !ipAddress.IsUp || !ipAddress.IsOn { continue