mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-23 15:00:25 +08:00
实现完整的集群、域名同步
This commit is contained in:
@@ -5,8 +5,12 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
)
|
||||
|
||||
@@ -176,47 +180,33 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 检查集群设置完整性
|
||||
clusters, err := models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(req.DnsDomainId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, cluster := range clusters {
|
||||
if len(cluster.DnsName) == 0 {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
|
||||
}
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(int64(cluster.Id))
|
||||
// 查询集群信息
|
||||
clusters := []*models.NodeCluster{}
|
||||
if req.NodeClusterId > 0 {
|
||||
cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(req.NodeClusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, node := range nodes {
|
||||
if node.IsOn == 0 {
|
||||
continue
|
||||
}
|
||||
ipAddress, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(int64(node.Id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(ipAddress) == 0 {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
|
||||
}
|
||||
route, err := node.DNSRoute(req.DnsDomainId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(route) == 0 {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
|
||||
}
|
||||
if cluster == nil {
|
||||
return &pb.SyncDNSDomainDataResponse{
|
||||
IsOk: false,
|
||||
Error: "找不到要同步的集群",
|
||||
ShouldFix: false,
|
||||
}, nil
|
||||
}
|
||||
if int64(cluster.DnsDomainId) != req.DnsDomainId {
|
||||
return &pb.SyncDNSDomainDataResponse{
|
||||
IsOk: false,
|
||||
Error: "集群设置的域名和参数不符",
|
||||
ShouldFix: false,
|
||||
}, nil
|
||||
}
|
||||
clusters = append(clusters, cluster)
|
||||
} else {
|
||||
clusters, err = models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(req.DnsDomainId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 检查服务设置完整性
|
||||
servers, err := models.SharedServerDAO.FindAllServersToFixWithDNSDomainId(req.DnsDomainId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(servers) > 0 {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "有问题需要修复", ShouldFix: true}, nil
|
||||
}
|
||||
|
||||
// 域名信息
|
||||
@@ -256,7 +246,7 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "调用API认证失败:" + err.Error()}, nil
|
||||
}
|
||||
|
||||
// 线路
|
||||
// 更新线路
|
||||
routes, err := manager.GetRoutes(domainName)
|
||||
if err != nil {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "获取线路失败:" + err.Error()}, nil
|
||||
@@ -270,6 +260,17 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 检查集群设置
|
||||
for _, cluster := range clusters {
|
||||
issues, err := models.SharedNodeClusterDAO.CheckClusterDNS(cluster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(issues) > 0 {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "发现问题需要修复", ShouldFix: true}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 所有记录
|
||||
records, err := manager.GetRecords(domainName)
|
||||
if err != nil {
|
||||
@@ -284,11 +285,55 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 修正集群域名
|
||||
// TODO
|
||||
// 对比变化
|
||||
allChanges := []maps.Map{}
|
||||
for _, cluster := range clusters {
|
||||
changes, _, _, _, _, err := this.findClusterDNSChanges(cluster, records, domainName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
allChanges = append(allChanges, changes...)
|
||||
}
|
||||
logs.Println("====")
|
||||
for _, change := range allChanges {
|
||||
action := change.GetString("action")
|
||||
record := change.Get("record").(*dnsclients.Record)
|
||||
|
||||
// 修正服务域名
|
||||
// TODO
|
||||
if len(record.Route) == 0 {
|
||||
record.Route = manager.DefaultRoute()
|
||||
}
|
||||
|
||||
switch action {
|
||||
case "create":
|
||||
err = manager.AddRecord(domainName, record)
|
||||
if err != nil {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "创建域名记录失败:" + err.Error()}, nil
|
||||
}
|
||||
case "delete":
|
||||
err = manager.DeleteRecord(domainName, record)
|
||||
if err != nil {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "删除域名记录失败:" + err.Error()}, nil
|
||||
}
|
||||
}
|
||||
|
||||
logs.Println(action, record.Name, record.Type, record.Value, record.Route) // TODO 仅供调试
|
||||
}
|
||||
|
||||
// 重新更新记录
|
||||
if len(allChanges) > 0 {
|
||||
records, err := manager.GetRecords(domainName)
|
||||
if err != nil {
|
||||
return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "重新获取域名解析记录失败:" + err.Error()}, nil
|
||||
}
|
||||
recordsJSON, err := json.Marshal(records)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = models.SharedDNSDomainDAO.UpdateDomainRecords(domainId, recordsJSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &pb.SyncDNSDomainDataResponse{
|
||||
IsOk: true,
|
||||
@@ -321,105 +366,32 @@ func (this *DNSDomainService) convertDomainToPB(domain *models.DNSDomain) (*pb.D
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
recordsMapping := map[string][]*dnsclients.Record{} // name_type => *Record
|
||||
for _, record := range records {
|
||||
key := record.Name + "_" + record.Type
|
||||
_, ok := recordsMapping[key]
|
||||
if ok {
|
||||
recordsMapping[key] = append(recordsMapping[key], record)
|
||||
} else {
|
||||
recordsMapping[key] = []*dnsclients.Record{record}
|
||||
}
|
||||
}
|
||||
|
||||
// 集群域名
|
||||
clusterRecords := []*pb.DNSRecord{}
|
||||
allClusterResolved := true
|
||||
{
|
||||
// 检查是否所有的集群都已经被解析
|
||||
clusters, err := models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(domainId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, cluster := range clusters {
|
||||
clusterId := int64(cluster.Id)
|
||||
dnsName := cluster.DnsName
|
||||
|
||||
// 子节点
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(clusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeMapping := map[string]*models.Node{} // name_type_value_route => *Node
|
||||
for _, node := range nodes {
|
||||
if node.IsOn == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(int64(node.Id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
route, err := node.DNSRoute(domainId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeMapping[dnsName+"_A_"+ipAddr+"_"+route] = node
|
||||
}
|
||||
|
||||
// 已有的记录
|
||||
nodeRecordsMapping := map[string]*dnsclients.Record{} // name_type_value_route => *Record
|
||||
nodeRecords, _ := recordsMapping[dnsName+"_A"]
|
||||
for _, record := range nodeRecords {
|
||||
key := record.Name + "_" + record.Type + "_" + record.Value + "_" + record.Route
|
||||
nodeRecordsMapping[key] = record
|
||||
}
|
||||
|
||||
// 检查有无多余的子节点
|
||||
for key, record := range nodeRecordsMapping {
|
||||
_, ok := nodeMapping[key]
|
||||
if !ok {
|
||||
allClusterResolved = false
|
||||
continue
|
||||
}
|
||||
clusterRecords = append(clusterRecords, this.convertRecordToPB(record))
|
||||
}
|
||||
|
||||
// 检查有无少的子节点
|
||||
for key := range nodeMapping {
|
||||
_, ok := nodeRecordsMapping[key]
|
||||
if !ok {
|
||||
allClusterResolved = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
countNodeRecords := 0
|
||||
nodesChanged := false
|
||||
|
||||
// 服务域名
|
||||
serverRecords := []*pb.DNSRecord{}
|
||||
allServersResolved := true
|
||||
countServerRecords := 0
|
||||
serversChanged := false
|
||||
|
||||
// 检查是否所有的服务都已经被解析
|
||||
{
|
||||
dnsNames, err := models.SharedServerDAO.FindAllServerDNSNamesWithDNSDomainId(domainId)
|
||||
// 检查是否所有的集群都已经被解析
|
||||
clusters, err := models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(domainId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, cluster := range clusters {
|
||||
_, nodeRecords, serverRecords, nodesChanged2, serversChanged2, err := this.findClusterDNSChanges(cluster, records, domain.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, dnsName := range dnsNames {
|
||||
if len(dnsName) == 0 {
|
||||
allServersResolved = true
|
||||
continue
|
||||
}
|
||||
key := dnsName + "_CNAME"
|
||||
recordList, ok := recordsMapping[key]
|
||||
if !ok {
|
||||
allServersResolved = false
|
||||
continue
|
||||
}
|
||||
for _, record := range recordList {
|
||||
serverRecords = append(serverRecords, this.convertRecordToPB(record))
|
||||
}
|
||||
countNodeRecords += len(nodeRecords)
|
||||
countServerRecords += len(serverRecords)
|
||||
if nodesChanged2 {
|
||||
nodesChanged = true
|
||||
}
|
||||
if serversChanged2 {
|
||||
serversChanged = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -433,16 +405,16 @@ func (this *DNSDomainService) convertDomainToPB(domain *models.DNSDomain) (*pb.D
|
||||
}
|
||||
|
||||
return &pb.DNSDomain{
|
||||
Id: int64(domain.Id),
|
||||
ProviderId: int64(domain.ProviderId),
|
||||
Name: domain.Name,
|
||||
IsOn: domain.IsOn == 1,
|
||||
DataUpdatedAt: int64(domain.DataUpdatedAt),
|
||||
ClusterRecords: clusterRecords,
|
||||
AllClustersResolved: allClusterResolved,
|
||||
ServerRecords: serverRecords,
|
||||
AllServersResolved: allServersResolved,
|
||||
Routes: routes,
|
||||
Id: int64(domain.Id),
|
||||
ProviderId: int64(domain.ProviderId),
|
||||
Name: domain.Name,
|
||||
IsOn: domain.IsOn == 1,
|
||||
DataUpdatedAt: int64(domain.DataUpdatedAt),
|
||||
CountNodeRecords: int64(countNodeRecords),
|
||||
NodesChanged: nodesChanged,
|
||||
CountServerRecords: int64(countServerRecords),
|
||||
ServersChanged: serversChanged,
|
||||
Routes: routes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -456,3 +428,126 @@ func (this *DNSDomainService) convertRecordToPB(record *dnsclients.Record) *pb.D
|
||||
Route: record.Route,
|
||||
}
|
||||
}
|
||||
|
||||
// 检查集群节点变化
|
||||
func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster, records []*dnsclients.Record, domainName string) (result []maps.Map, doneNodeRecords []*dnsclients.Record, doneServerRecords []*dnsclients.Record, nodesChanged bool, serversChanged bool, err error) {
|
||||
clusterId := int64(cluster.Id)
|
||||
clusterDnsName := cluster.DnsName
|
||||
clusterDomain := clusterDnsName + "." + domainName
|
||||
|
||||
// 节点域名
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(clusterId)
|
||||
if err != nil {
|
||||
return nil, nil, nil, false, false, err
|
||||
}
|
||||
nodeRecords := []*dnsclients.Record{} // 之所以用数组再存一遍,是因为dnsName可能会重复
|
||||
nodeRecordMapping := map[string]*dnsclients.Record{} // value_route => *Record
|
||||
for _, record := range records {
|
||||
if record.Type == dnsclients.RecordTypeA && record.Name == clusterDnsName {
|
||||
nodeRecords = append(nodeRecords, record)
|
||||
nodeRecordMapping[record.Value+"_"+record.Route] = record
|
||||
}
|
||||
}
|
||||
|
||||
// 新增的节点域名
|
||||
nodeKeys := []string{}
|
||||
for _, node := range nodes {
|
||||
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(int64(node.Id))
|
||||
if err != nil {
|
||||
return nil, nil, nil, false, false, err
|
||||
}
|
||||
if len(ipAddr) == 0 {
|
||||
continue
|
||||
}
|
||||
route, err := node.DNSRoute(int64(cluster.DnsDomainId))
|
||||
if err != nil {
|
||||
return nil, nil, nil, false, false, err
|
||||
}
|
||||
if len(route) == 0 {
|
||||
continue
|
||||
}
|
||||
key := ipAddr + "_" + route
|
||||
nodeKeys = append(nodeKeys, key)
|
||||
record, ok := nodeRecordMapping[key]
|
||||
if !ok {
|
||||
result = append(result, maps.Map{
|
||||
"action": "create",
|
||||
"record": &dnsclients.Record{
|
||||
Id: "",
|
||||
Name: clusterDnsName,
|
||||
Type: dnsclients.RecordTypeA,
|
||||
Value: ipAddr,
|
||||
Route: route,
|
||||
},
|
||||
})
|
||||
nodesChanged = true
|
||||
} else {
|
||||
doneNodeRecords = append(doneNodeRecords, record)
|
||||
}
|
||||
}
|
||||
|
||||
// 多余的节点域名
|
||||
for _, record := range nodeRecords {
|
||||
key := record.Value + "_" + record.Route
|
||||
if !lists.ContainsString(nodeKeys, key) {
|
||||
nodesChanged = true
|
||||
result = append(result, maps.Map{
|
||||
"action": "delete",
|
||||
"record": record,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 服务域名
|
||||
servers, err := models.SharedServerDAO.FindAllServersDNSWithClusterId(clusterId)
|
||||
if err != nil {
|
||||
return nil, nil, nil, false, false, err
|
||||
}
|
||||
serverRecords := []*dnsclients.Record{} // 之所以用数组再存一遍,是因为dnsName可能会重复
|
||||
serverRecordsMap := map[string]*dnsclients.Record{} // dnsName => *Record
|
||||
for _, record := range records {
|
||||
if record.Type == dnsclients.RecordTypeCName && record.Value == clusterDomain+"." {
|
||||
serverRecords = append(serverRecords, record)
|
||||
serverRecordsMap[record.Name] = record
|
||||
}
|
||||
}
|
||||
|
||||
// 新增的域名
|
||||
serverDNSNames := []string{}
|
||||
for _, server := range servers {
|
||||
dnsName := server.DnsName
|
||||
if len(dnsName) == 0 {
|
||||
return nil, nil, nil, false, false, errors.New("server '" + numberutils.FormatInt64(int64(server.Id)) + "' 'dnsName' should not empty")
|
||||
}
|
||||
serverDNSNames = append(serverDNSNames, dnsName)
|
||||
record, ok := serverRecordsMap[dnsName]
|
||||
if !ok {
|
||||
serversChanged = true
|
||||
result = append(result, maps.Map{
|
||||
"action": "create",
|
||||
"record": &dnsclients.Record{
|
||||
Id: "",
|
||||
Name: dnsName,
|
||||
Type: dnsclients.RecordTypeCName,
|
||||
Value: clusterDomain + ".",
|
||||
Route: "", // 注意这里为空,需要在执行过程中获取默认值
|
||||
},
|
||||
})
|
||||
} else {
|
||||
doneServerRecords = append(doneServerRecords, record)
|
||||
}
|
||||
}
|
||||
|
||||
// 多余的域名
|
||||
for _, record := range serverRecords {
|
||||
if !lists.ContainsString(serverDNSNames, record.Name) {
|
||||
serversChanged = true
|
||||
result = append(result, maps.Map{
|
||||
"action": "delete",
|
||||
"record": record,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user