mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 07:50:25 +08:00 
			
		
		
		
	实现完整的集群、域名同步
This commit is contained in:
		@@ -7,6 +7,7 @@ import (
 | 
			
		||||
	"github.com/iwind/TeaGo/Tea"
 | 
			
		||||
	"github.com/iwind/TeaGo/dbs"
 | 
			
		||||
	"github.com/iwind/TeaGo/types"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
@@ -142,6 +143,7 @@ func (this *DNSDomainDAO) UpdateDomainRecords(domainId int64, recordsJSON []byte
 | 
			
		||||
	op := NewDNSDomainOperator()
 | 
			
		||||
	op.Id = domainId
 | 
			
		||||
	op.Records = recordsJSON
 | 
			
		||||
	op.DataUpdatedAt = time.Now().Unix()
 | 
			
		||||
	_, err := this.Save(op)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
@@ -154,6 +156,7 @@ func (this *DNSDomainDAO) UpdateDomainRoutes(domainId int64, routesJSON []byte)
 | 
			
		||||
	op := NewDNSDomainOperator()
 | 
			
		||||
	op.Id = domainId
 | 
			
		||||
	op.Routes = routesJSON
 | 
			
		||||
	op.DataUpdatedAt = time.Now().Unix()
 | 
			
		||||
	_, err := this.Save(op)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1 +1,43 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
 | 
			
		||||
	"github.com/iwind/TeaGo/lists"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// 获取所有的线路
 | 
			
		||||
func (this *DNSDomain) DecodeRoutes() ([]string, error) {
 | 
			
		||||
	if len(this.Routes) == 0 || this.Routes == "null" {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
	result := []string{}
 | 
			
		||||
	err := json.Unmarshal([]byte(this.Routes), &result)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 检查是否包含某个线路
 | 
			
		||||
func (this *DNSDomain) ContainsRoute(route string) (bool, error) {
 | 
			
		||||
	routes, err := this.DecodeRoutes()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return false, err
 | 
			
		||||
	}
 | 
			
		||||
	return lists.ContainsString(routes, route), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 获取所有的记录
 | 
			
		||||
func (this *DNSDomain) DecodeRecords() ([]*dnsclients.Record, error) {
 | 
			
		||||
	records := this.Records
 | 
			
		||||
	if len(records) == 0 || records == "null" {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
	result := []*dnsclients.Record{}
 | 
			
		||||
	err := json.Unmarshal([]byte(records), &result)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -3,6 +3,8 @@ package models
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
			
		||||
	_ "github.com/go-sql-driver/mysql"
 | 
			
		||||
	"github.com/iwind/TeaGo/Tea"
 | 
			
		||||
@@ -319,7 +321,18 @@ func (this *NodeClusterDAO) FindAllEnabledClustersWithDNSDomainId(dnsDomainId in
 | 
			
		||||
	_, err = this.Query().
 | 
			
		||||
		State(NodeClusterStateEnabled).
 | 
			
		||||
		Attr("dnsDomainId", dnsDomainId).
 | 
			
		||||
		Result("id", "name", "dnsName").
 | 
			
		||||
		Result("id", "name", "dnsName", "dnsDomainId").
 | 
			
		||||
		Slice(&result).
 | 
			
		||||
		FindAll()
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 查询已经设置了域名的集群
 | 
			
		||||
func (this *NodeClusterDAO) FindAllEnabledClustersHaveDNSDomain() (result []*NodeCluster, err error) {
 | 
			
		||||
	_, err = this.Query().
 | 
			
		||||
		State(NodeClusterStateEnabled).
 | 
			
		||||
		Gt("dnsDomainId", 0).
 | 
			
		||||
		Result("id", "name", "dnsName", "dnsDomainId").
 | 
			
		||||
		Slice(&result).
 | 
			
		||||
		FindAll()
 | 
			
		||||
	return
 | 
			
		||||
@@ -337,7 +350,7 @@ func (this *NodeClusterDAO) FindClusterGrantId(clusterId int64) (int64, error) {
 | 
			
		||||
func (this *NodeClusterDAO) FindClusterDNSInfo(clusterId int64) (*NodeCluster, error) {
 | 
			
		||||
	one, err := this.Query().
 | 
			
		||||
		Pk(clusterId).
 | 
			
		||||
		Result("dnsName", "dnsDomainId").
 | 
			
		||||
		Result("id", "name", "dnsName", "dnsDomainId").
 | 
			
		||||
		Find()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
@@ -371,6 +384,116 @@ func (this *NodeClusterDAO) UpdateClusterDNS(clusterId int64, dnsName string, dn
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 检查集群的DNS问题
 | 
			
		||||
func (this *NodeClusterDAO) CheckClusterDNS(cluster *NodeCluster) (issues []*pb.DNSIssue, err error) {
 | 
			
		||||
	clusterId := int64(cluster.Id)
 | 
			
		||||
	domainId := int64(cluster.DnsDomainId)
 | 
			
		||||
 | 
			
		||||
	// 检查域名
 | 
			
		||||
	domain, err := SharedDNSDomainDAO.FindEnabledDNSDomain(domainId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if domain == nil {
 | 
			
		||||
		issues = append(issues, &pb.DNSIssue{
 | 
			
		||||
			Target:      cluster.Name,
 | 
			
		||||
			TargetId:    clusterId,
 | 
			
		||||
			Type:        "cluster",
 | 
			
		||||
			Description: "域名选择错误,需要重新选择",
 | 
			
		||||
			Params:      nil,
 | 
			
		||||
		})
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 检查二级域名
 | 
			
		||||
	if len(cluster.DnsName) == 0 {
 | 
			
		||||
		issues = append(issues, &pb.DNSIssue{
 | 
			
		||||
			Target:      cluster.Name,
 | 
			
		||||
			TargetId:    clusterId,
 | 
			
		||||
			Type:        "cluster",
 | 
			
		||||
			Description: "没有设置二级域名",
 | 
			
		||||
			Params:      nil,
 | 
			
		||||
		})
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO 检查域名格式
 | 
			
		||||
 | 
			
		||||
	// TODO 检查域名是否已解析
 | 
			
		||||
 | 
			
		||||
	// 检查节点
 | 
			
		||||
	nodes, err := SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(clusterId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO 检查节点数量不能为0
 | 
			
		||||
 | 
			
		||||
	for _, node := range nodes {
 | 
			
		||||
		nodeId := int64(node.Id)
 | 
			
		||||
 | 
			
		||||
		route, err := node.DNSRoute(domainId)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		if len(route) == 0 {
 | 
			
		||||
			issues = append(issues, &pb.DNSIssue{
 | 
			
		||||
				Target:      node.Name,
 | 
			
		||||
				TargetId:    nodeId,
 | 
			
		||||
				Type:        "node",
 | 
			
		||||
				Description: "没有选择节点所属线路",
 | 
			
		||||
				Params: map[string]string{
 | 
			
		||||
					"clusterName": cluster.Name,
 | 
			
		||||
					"clusterId":   numberutils.FormatInt64(clusterId),
 | 
			
		||||
				},
 | 
			
		||||
			})
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 检查线路是否在已有线路中
 | 
			
		||||
		routeOk, err := domain.ContainsRoute(route)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		if !routeOk {
 | 
			
		||||
			issues = append(issues, &pb.DNSIssue{
 | 
			
		||||
				Target:      node.Name,
 | 
			
		||||
				TargetId:    nodeId,
 | 
			
		||||
				Type:        "node",
 | 
			
		||||
				Description: "线路已经失效,请重新选择",
 | 
			
		||||
				Params: map[string]string{
 | 
			
		||||
					"clusterName": cluster.Name,
 | 
			
		||||
					"clusterId":   numberutils.FormatInt64(clusterId),
 | 
			
		||||
				},
 | 
			
		||||
			})
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 检查IP地址
 | 
			
		||||
		ipAddr, err := SharedNodeIPAddressDAO.FindFirstNodeIPAddress(nodeId)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		if len(ipAddr) == 0 {
 | 
			
		||||
			issues = append(issues, &pb.DNSIssue{
 | 
			
		||||
				Target:      node.Name,
 | 
			
		||||
				TargetId:    nodeId,
 | 
			
		||||
				Type:        "node",
 | 
			
		||||
				Description: "没有设置IP地址",
 | 
			
		||||
				Params: map[string]string{
 | 
			
		||||
					"clusterName": cluster.Name,
 | 
			
		||||
					"clusterId":   numberutils.FormatInt64(clusterId),
 | 
			
		||||
				},
 | 
			
		||||
			})
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// TODO 检查是否有解析记录
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 生成唯一ID
 | 
			
		||||
func (this *NodeClusterDAO) genUniqueId() (string, error) {
 | 
			
		||||
	for {
 | 
			
		||||
 
 | 
			
		||||
@@ -604,7 +604,7 @@ func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(clusterId int64) (resul
 | 
			
		||||
		State(NodeStateEnabled).
 | 
			
		||||
		Attr("clusterId", clusterId).
 | 
			
		||||
		Attr("isOn", true).
 | 
			
		||||
		Result("id", "name", "dnsRoutes").
 | 
			
		||||
		Result("id", "name", "dnsRoutes", "isOn").
 | 
			
		||||
		DescPk().
 | 
			
		||||
		Slice(&result).
 | 
			
		||||
		FindAll()
 | 
			
		||||
 
 | 
			
		||||
@@ -828,26 +828,6 @@ func (this *ServerDAO) FindAllServerDNSNamesWithDNSDomainId(dnsDomainId int64) (
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 查找DNS名称为空的所有服务
 | 
			
		||||
func (this *ServerDAO) FindAllServersToFixWithDNSDomainId(dnsDomainId int64) (result []*Server, err error) {
 | 
			
		||||
	clusterIds, err := SharedNodeClusterDAO.FindAllEnabledClusterIdsWithDNSDomainId(dnsDomainId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if len(clusterIds) == 0 {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
	_, err = this.Query().
 | 
			
		||||
		State(ServerStateEnabled).
 | 
			
		||||
		Attr("isOn", true).
 | 
			
		||||
		Attr("clusterId", clusterIds).
 | 
			
		||||
		Result("dnsName").
 | 
			
		||||
		Reuse(false). // 避免因为IN语句造成内存占用过多
 | 
			
		||||
		Slice(&result).
 | 
			
		||||
		FindAll()
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 获取某个集群下的服务DNS信息
 | 
			
		||||
func (this *ServerDAO) FindAllServersDNSWithClusterId(clusterId int64) (result []*Server, err error) {
 | 
			
		||||
	_, err = this.Query().
 | 
			
		||||
 
 | 
			
		||||
@@ -202,3 +202,8 @@ func (this *DNSPodProvider) post(path string, params map[string]string) (maps.Ma
 | 
			
		||||
 | 
			
		||||
	return m, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 默认线路
 | 
			
		||||
func (this *DNSPodProvider) DefaultRoute() string {
 | 
			
		||||
	return "默认"
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -21,4 +21,7 @@ type ProviderInterface interface {
 | 
			
		||||
 | 
			
		||||
	// 删除记录
 | 
			
		||||
	DeleteRecord(domain string, record *Record) error
 | 
			
		||||
 | 
			
		||||
	// 默认线路
 | 
			
		||||
	DefaultRoute() string
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -183,6 +183,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err
 | 
			
		||||
	pb.RegisterLogServiceServer(rpcServer, &services.LogService{})
 | 
			
		||||
	pb.RegisterDNSProviderServiceServer(rpcServer, &services.DNSProviderService{})
 | 
			
		||||
	pb.RegisterDNSDomainServiceServer(rpcServer, &services.DNSDomainService{})
 | 
			
		||||
	pb.RegisterDNSServiceServer(rpcServer, &services.DNSService{})
 | 
			
		||||
	err := rpcServer.Serve(listener)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errors.New("[API]start rpc failed: " + err.Error())
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										40
									
								
								internal/rpc/services/service_dns.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								internal/rpc/services/service_dns.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,40 @@
 | 
			
		||||
package services
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
 | 
			
		||||
	rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// DNS相关服务
 | 
			
		||||
type DNSService struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 查找问题
 | 
			
		||||
func (this *DNSService) FindAllDNSIssues(ctx context.Context, req *pb.FindAllDNSIssuesRequest) (*pb.FindAllDNSIssuesResponse, error) {
 | 
			
		||||
	// 校验请求
 | 
			
		||||
	_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	result := []*pb.DNSIssue{}
 | 
			
		||||
 | 
			
		||||
	clusters, err := models.SharedNodeClusterDAO.FindAllEnabledClustersHaveDNSDomain()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, cluster := range clusters {
 | 
			
		||||
		issues, err := models.SharedNodeClusterDAO.CheckClusterDNS(cluster)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		if len(issues) > 0 {
 | 
			
		||||
			result = append(result, issues...)
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &pb.FindAllDNSIssuesResponse{Issues: result}, nil
 | 
			
		||||
}
 | 
			
		||||
@@ -5,8 +5,12 @@ import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
			
		||||
	rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	"github.com/iwind/TeaGo/lists"
 | 
			
		||||
	"github.com/iwind/TeaGo/logs"
 | 
			
		||||
	"github.com/iwind/TeaGo/maps"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -176,47 +180,33 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 检查集群设置完整性
 | 
			
		||||
	clusters, err := models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(req.DnsDomainId)
 | 
			
		||||
	// 查询集群信息
 | 
			
		||||
	clusters := []*models.NodeCluster{}
 | 
			
		||||
	if req.NodeClusterId > 0 {
 | 
			
		||||
		cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(req.NodeClusterId)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	for _, cluster := range clusters {
 | 
			
		||||
		if len(cluster.DnsName) == 0 {
 | 
			
		||||
			return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
 | 
			
		||||
		if cluster == nil {
 | 
			
		||||
			return &pb.SyncDNSDomainDataResponse{
 | 
			
		||||
				IsOk:      false,
 | 
			
		||||
				Error:     "找不到要同步的集群",
 | 
			
		||||
				ShouldFix: false,
 | 
			
		||||
			}, nil
 | 
			
		||||
		}
 | 
			
		||||
		nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(int64(cluster.Id))
 | 
			
		||||
		if int64(cluster.DnsDomainId) != req.DnsDomainId {
 | 
			
		||||
			return &pb.SyncDNSDomainDataResponse{
 | 
			
		||||
				IsOk:      false,
 | 
			
		||||
				Error:     "集群设置的域名和参数不符",
 | 
			
		||||
				ShouldFix: false,
 | 
			
		||||
			}, nil
 | 
			
		||||
		}
 | 
			
		||||
		clusters = append(clusters, cluster)
 | 
			
		||||
	} else {
 | 
			
		||||
		clusters, err = models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(req.DnsDomainId)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		for _, node := range nodes {
 | 
			
		||||
			if node.IsOn == 0 {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			ipAddress, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(int64(node.Id))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			if len(ipAddress) == 0 {
 | 
			
		||||
				return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
 | 
			
		||||
			}
 | 
			
		||||
			route, err := node.DNSRoute(req.DnsDomainId)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			if len(route) == 0 {
 | 
			
		||||
				return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 检查服务设置完整性
 | 
			
		||||
	servers, err := models.SharedServerDAO.FindAllServersToFixWithDNSDomainId(req.DnsDomainId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if len(servers) > 0 {
 | 
			
		||||
		return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 域名信息
 | 
			
		||||
@@ -256,7 +246,7 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
 | 
			
		||||
		return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "调用API认证失败:" + err.Error()}, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 线路
 | 
			
		||||
	// 更新线路
 | 
			
		||||
	routes, err := manager.GetRoutes(domainName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "获取线路失败:" + err.Error()}, nil
 | 
			
		||||
@@ -270,6 +260,17 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 检查集群设置
 | 
			
		||||
	for _, cluster := range clusters {
 | 
			
		||||
		issues, err := models.SharedNodeClusterDAO.CheckClusterDNS(cluster)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		if len(issues) > 0 {
 | 
			
		||||
			return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "发现问题需要修复", ShouldFix: true}, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 所有记录
 | 
			
		||||
	records, err := manager.GetRecords(domainName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -284,11 +285,55 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 修正集群域名
 | 
			
		||||
	// TODO
 | 
			
		||||
	// 对比变化
 | 
			
		||||
	allChanges := []maps.Map{}
 | 
			
		||||
	for _, cluster := range clusters {
 | 
			
		||||
		changes, _, _, _, _, err := this.findClusterDNSChanges(cluster, records, domainName)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		allChanges = append(allChanges, changes...)
 | 
			
		||||
	}
 | 
			
		||||
	logs.Println("====")
 | 
			
		||||
	for _, change := range allChanges {
 | 
			
		||||
		action := change.GetString("action")
 | 
			
		||||
		record := change.Get("record").(*dnsclients.Record)
 | 
			
		||||
 | 
			
		||||
	// 修正服务域名
 | 
			
		||||
	// TODO
 | 
			
		||||
		if len(record.Route) == 0 {
 | 
			
		||||
			record.Route = manager.DefaultRoute()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		switch action {
 | 
			
		||||
		case "create":
 | 
			
		||||
			err = manager.AddRecord(domainName, record)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "创建域名记录失败:" + err.Error()}, nil
 | 
			
		||||
			}
 | 
			
		||||
		case "delete":
 | 
			
		||||
			err = manager.DeleteRecord(domainName, record)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "删除域名记录失败:" + err.Error()}, nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		logs.Println(action, record.Name, record.Type, record.Value, record.Route) // TODO 仅供调试
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 重新更新记录
 | 
			
		||||
	if len(allChanges) > 0 {
 | 
			
		||||
		records, err := manager.GetRecords(domainName)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "重新获取域名解析记录失败:" + err.Error()}, nil
 | 
			
		||||
		}
 | 
			
		||||
		recordsJSON, err := json.Marshal(records)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		err = models.SharedDNSDomainDAO.UpdateDomainRecords(domainId, recordsJSON)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &pb.SyncDNSDomainDataResponse{
 | 
			
		||||
		IsOk: true,
 | 
			
		||||
@@ -321,105 +366,32 @@ func (this *DNSDomainService) convertDomainToPB(domain *models.DNSDomain) (*pb.D
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	recordsMapping := map[string][]*dnsclients.Record{} // name_type => *Record
 | 
			
		||||
	for _, record := range records {
 | 
			
		||||
		key := record.Name + "_" + record.Type
 | 
			
		||||
		_, ok := recordsMapping[key]
 | 
			
		||||
		if ok {
 | 
			
		||||
			recordsMapping[key] = append(recordsMapping[key], record)
 | 
			
		||||
		} else {
 | 
			
		||||
			recordsMapping[key] = []*dnsclients.Record{record}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 集群域名
 | 
			
		||||
	clusterRecords := []*pb.DNSRecord{}
 | 
			
		||||
	allClusterResolved := true
 | 
			
		||||
	{
 | 
			
		||||
	countNodeRecords := 0
 | 
			
		||||
	nodesChanged := false
 | 
			
		||||
 | 
			
		||||
	// 服务域名
 | 
			
		||||
	countServerRecords := 0
 | 
			
		||||
	serversChanged := false
 | 
			
		||||
 | 
			
		||||
	// 检查是否所有的集群都已经被解析
 | 
			
		||||
	clusters, err := models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(domainId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, cluster := range clusters {
 | 
			
		||||
			clusterId := int64(cluster.Id)
 | 
			
		||||
			dnsName := cluster.DnsName
 | 
			
		||||
 | 
			
		||||
			// 子节点
 | 
			
		||||
			nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(clusterId)
 | 
			
		||||
		_, nodeRecords, serverRecords, nodesChanged2, serversChanged2, err := this.findClusterDNSChanges(cluster, records, domain.Name)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
			nodeMapping := map[string]*models.Node{} // name_type_value_route => *Node
 | 
			
		||||
			for _, node := range nodes {
 | 
			
		||||
				if node.IsOn == 0 {
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(int64(node.Id))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
				route, err := node.DNSRoute(domainId)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
				nodeMapping[dnsName+"_A_"+ipAddr+"_"+route] = node
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// 已有的记录
 | 
			
		||||
			nodeRecordsMapping := map[string]*dnsclients.Record{} // name_type_value_route => *Record
 | 
			
		||||
			nodeRecords, _ := recordsMapping[dnsName+"_A"]
 | 
			
		||||
			for _, record := range nodeRecords {
 | 
			
		||||
				key := record.Name + "_" + record.Type + "_" + record.Value + "_" + record.Route
 | 
			
		||||
				nodeRecordsMapping[key] = record
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// 检查有无多余的子节点
 | 
			
		||||
			for key, record := range nodeRecordsMapping {
 | 
			
		||||
				_, ok := nodeMapping[key]
 | 
			
		||||
				if !ok {
 | 
			
		||||
					allClusterResolved = false
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				clusterRecords = append(clusterRecords, this.convertRecordToPB(record))
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// 检查有无少的子节点
 | 
			
		||||
			for key := range nodeMapping {
 | 
			
		||||
				_, ok := nodeRecordsMapping[key]
 | 
			
		||||
				if !ok {
 | 
			
		||||
					allClusterResolved = false
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 服务域名
 | 
			
		||||
	serverRecords := []*pb.DNSRecord{}
 | 
			
		||||
	allServersResolved := true
 | 
			
		||||
 | 
			
		||||
	// 检查是否所有的服务都已经被解析
 | 
			
		||||
	{
 | 
			
		||||
		dnsNames, err := models.SharedServerDAO.FindAllServerDNSNamesWithDNSDomainId(domainId)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		for _, dnsName := range dnsNames {
 | 
			
		||||
			if len(dnsName) == 0 {
 | 
			
		||||
				allServersResolved = true
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			key := dnsName + "_CNAME"
 | 
			
		||||
			recordList, ok := recordsMapping[key]
 | 
			
		||||
			if !ok {
 | 
			
		||||
				allServersResolved = false
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			for _, record := range recordList {
 | 
			
		||||
				serverRecords = append(serverRecords, this.convertRecordToPB(record))
 | 
			
		||||
		countNodeRecords += len(nodeRecords)
 | 
			
		||||
		countServerRecords += len(serverRecords)
 | 
			
		||||
		if nodesChanged2 {
 | 
			
		||||
			nodesChanged = true
 | 
			
		||||
		}
 | 
			
		||||
		if serversChanged2 {
 | 
			
		||||
			serversChanged = true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -438,10 +410,10 @@ func (this *DNSDomainService) convertDomainToPB(domain *models.DNSDomain) (*pb.D
 | 
			
		||||
		Name:               domain.Name,
 | 
			
		||||
		IsOn:               domain.IsOn == 1,
 | 
			
		||||
		DataUpdatedAt:      int64(domain.DataUpdatedAt),
 | 
			
		||||
		ClusterRecords:      clusterRecords,
 | 
			
		||||
		AllClustersResolved: allClusterResolved,
 | 
			
		||||
		ServerRecords:       serverRecords,
 | 
			
		||||
		AllServersResolved:  allServersResolved,
 | 
			
		||||
		CountNodeRecords:   int64(countNodeRecords),
 | 
			
		||||
		NodesChanged:       nodesChanged,
 | 
			
		||||
		CountServerRecords: int64(countServerRecords),
 | 
			
		||||
		ServersChanged:     serversChanged,
 | 
			
		||||
		Routes:             routes,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
@@ -456,3 +428,126 @@ func (this *DNSDomainService) convertRecordToPB(record *dnsclients.Record) *pb.D
 | 
			
		||||
		Route: record.Route,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 检查集群节点变化
 | 
			
		||||
func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster, records []*dnsclients.Record, domainName string) (result []maps.Map, doneNodeRecords []*dnsclients.Record, doneServerRecords []*dnsclients.Record, nodesChanged bool, serversChanged bool, err error) {
 | 
			
		||||
	clusterId := int64(cluster.Id)
 | 
			
		||||
	clusterDnsName := cluster.DnsName
 | 
			
		||||
	clusterDomain := clusterDnsName + "." + domainName
 | 
			
		||||
 | 
			
		||||
	// 节点域名
 | 
			
		||||
	nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(clusterId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, nil, false, false, err
 | 
			
		||||
	}
 | 
			
		||||
	nodeRecords := []*dnsclients.Record{}                // 之所以用数组再存一遍,是因为dnsName可能会重复
 | 
			
		||||
	nodeRecordMapping := map[string]*dnsclients.Record{} // value_route => *Record
 | 
			
		||||
	for _, record := range records {
 | 
			
		||||
		if record.Type == dnsclients.RecordTypeA && record.Name == clusterDnsName {
 | 
			
		||||
			nodeRecords = append(nodeRecords, record)
 | 
			
		||||
			nodeRecordMapping[record.Value+"_"+record.Route] = record
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 新增的节点域名
 | 
			
		||||
	nodeKeys := []string{}
 | 
			
		||||
	for _, node := range nodes {
 | 
			
		||||
		ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(int64(node.Id))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, nil, nil, false, false, err
 | 
			
		||||
		}
 | 
			
		||||
		if len(ipAddr) == 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		route, err := node.DNSRoute(int64(cluster.DnsDomainId))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, nil, nil, false, false, err
 | 
			
		||||
		}
 | 
			
		||||
		if len(route) == 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		key := ipAddr + "_" + route
 | 
			
		||||
		nodeKeys = append(nodeKeys, key)
 | 
			
		||||
		record, ok := nodeRecordMapping[key]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			result = append(result, maps.Map{
 | 
			
		||||
				"action": "create",
 | 
			
		||||
				"record": &dnsclients.Record{
 | 
			
		||||
					Id:    "",
 | 
			
		||||
					Name:  clusterDnsName,
 | 
			
		||||
					Type:  dnsclients.RecordTypeA,
 | 
			
		||||
					Value: ipAddr,
 | 
			
		||||
					Route: route,
 | 
			
		||||
				},
 | 
			
		||||
			})
 | 
			
		||||
			nodesChanged = true
 | 
			
		||||
		} else {
 | 
			
		||||
			doneNodeRecords = append(doneNodeRecords, record)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 多余的节点域名
 | 
			
		||||
	for _, record := range nodeRecords {
 | 
			
		||||
		key := record.Value + "_" + record.Route
 | 
			
		||||
		if !lists.ContainsString(nodeKeys, key) {
 | 
			
		||||
			nodesChanged = true
 | 
			
		||||
			result = append(result, maps.Map{
 | 
			
		||||
				"action": "delete",
 | 
			
		||||
				"record": record,
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 服务域名
 | 
			
		||||
	servers, err := models.SharedServerDAO.FindAllServersDNSWithClusterId(clusterId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, nil, false, false, err
 | 
			
		||||
	}
 | 
			
		||||
	serverRecords := []*dnsclients.Record{}             // 之所以用数组再存一遍,是因为dnsName可能会重复
 | 
			
		||||
	serverRecordsMap := map[string]*dnsclients.Record{} // dnsName => *Record
 | 
			
		||||
	for _, record := range records {
 | 
			
		||||
		if record.Type == dnsclients.RecordTypeCName && record.Value == clusterDomain+"." {
 | 
			
		||||
			serverRecords = append(serverRecords, record)
 | 
			
		||||
			serverRecordsMap[record.Name] = record
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 新增的域名
 | 
			
		||||
	serverDNSNames := []string{}
 | 
			
		||||
	for _, server := range servers {
 | 
			
		||||
		dnsName := server.DnsName
 | 
			
		||||
		if len(dnsName) == 0 {
 | 
			
		||||
			return nil, nil, nil, false, false, errors.New("server '" + numberutils.FormatInt64(int64(server.Id)) + "' 'dnsName' should not empty")
 | 
			
		||||
		}
 | 
			
		||||
		serverDNSNames = append(serverDNSNames, dnsName)
 | 
			
		||||
		record, ok := serverRecordsMap[dnsName]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			serversChanged = true
 | 
			
		||||
			result = append(result, maps.Map{
 | 
			
		||||
				"action": "create",
 | 
			
		||||
				"record": &dnsclients.Record{
 | 
			
		||||
					Id:    "",
 | 
			
		||||
					Name:  dnsName,
 | 
			
		||||
					Type:  dnsclients.RecordTypeCName,
 | 
			
		||||
					Value: clusterDomain + ".",
 | 
			
		||||
					Route: "", // 注意这里为空,需要在执行过程中获取默认值
 | 
			
		||||
				},
 | 
			
		||||
			})
 | 
			
		||||
		} else {
 | 
			
		||||
			doneServerRecords = append(doneServerRecords, record)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 多余的域名
 | 
			
		||||
	for _, record := range serverRecords {
 | 
			
		||||
		if !lists.ContainsString(serverDNSNames, record.Name) {
 | 
			
		||||
			serversChanged = true
 | 
			
		||||
			result = append(result, maps.Map{
 | 
			
		||||
				"action": "delete",
 | 
			
		||||
				"record": record,
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -909,6 +909,10 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dnsDomainId := int64(clusterDNS.DnsDomainId)
 | 
			
		||||
	dnsDomainName, err := models.SharedDNSDomainDAO.FindDNSDomainName(dnsDomainId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var route = ""
 | 
			
		||||
	if dnsDomainId > 0 {
 | 
			
		||||
@@ -931,6 +935,7 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna
 | 
			
		||||
			Route:         route,
 | 
			
		||||
			ClusterId:     clusterId,
 | 
			
		||||
			DnsDomainId:   dnsDomainId,
 | 
			
		||||
			DnsDomainName: dnsDomainName,
 | 
			
		||||
		},
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -479,3 +479,41 @@ func (this *NodeClusterService) UpdateNodeClusterDNS(ctx context.Context, req *p
 | 
			
		||||
	}
 | 
			
		||||
	return rpcutils.Success()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 检查集群的DNS是否有变化
 | 
			
		||||
func (this *NodeClusterService) CheckNodeClusterDNSChanges(ctx context.Context, req *pb.CheckNodeClusterDNSChangesRequest) (*pb.CheckNodeClusterDNSChangesResponse, error) {
 | 
			
		||||
	// 校验请求
 | 
			
		||||
	_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cluster, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(req.NodeClusterId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if cluster == nil || len(cluster.DnsName) == 0 || cluster.DnsDomainId <= 0 {
 | 
			
		||||
		return &pb.CheckNodeClusterDNSChangesResponse{IsChanged: false}, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	domainId := int64(cluster.DnsDomainId)
 | 
			
		||||
	domain, err := models.SharedDNSDomainDAO.FindEnabledDNSDomain(domainId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if domain == nil {
 | 
			
		||||
		return &pb.CheckNodeClusterDNSChangesResponse{IsChanged: false}, nil
 | 
			
		||||
	}
 | 
			
		||||
	records, err := domain.DecodeRecords()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	service := &DNSDomainService{}
 | 
			
		||||
	changes, _, _, _, _, err := service.findClusterDNSChanges(cluster, records, domain.Name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return &pb.CheckNodeClusterDNSChangesResponse{IsChanged: len(changes) > 0}, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user