实现基础的智能调度

This commit is contained in:
GoEdgeLab
2023-05-17 18:42:21 +08:00
parent 667628aff3
commit 3fd2925780
33 changed files with 507 additions and 127 deletions

View File

@@ -87,8 +87,8 @@ func (this *DNSTaskExecutor) loop() error {
return err
}
}
case dnsmodels.DNSTaskTypeClusterChange:
err = this.doCluster(taskId, taskVersion, int64(task.ClusterId))
case dnsmodels.DNSTaskTypeClusterChange, dnsmodels.DNSTaskTypeClusterNodesChange:
err = this.doCluster(taskId, taskVersion, int64(task.ClusterId), task.Type == dnsmodels.DNSTaskTypeClusterNodesChange)
if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil {
@@ -300,7 +300,7 @@ func (this *DNSTaskExecutor) doNode(taskId int64, taskVersion int64, nodeCluster
// 转交给cluster统一处理
if nodeClusterId > 0 {
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, nodeClusterId, dnsmodels.DNSTaskTypeClusterChange)
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, nodeClusterId, dnsmodels.DNSTaskTypeClusterNodesChange)
if err != nil {
return err
}
@@ -310,7 +310,7 @@ func (this *DNSTaskExecutor) doNode(taskId int64, taskVersion int64, nodeCluster
return err
}
for _, clusterId := range clusterIds {
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dnsmodels.DNSTaskTypeClusterChange)
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dnsmodels.DNSTaskTypeClusterNodesChange)
if err != nil {
return err
}
@@ -323,8 +323,8 @@ func (this *DNSTaskExecutor) doNode(taskId int64, taskVersion int64, nodeCluster
}
// 修改集群相关记录
func (this *DNSTaskExecutor) doCluster(taskId int64, taskVersion int64, clusterId int64) error {
isOk := false
func (this *DNSTaskExecutor) doCluster(taskId int64, taskVersion int64, clusterId int64, nodesOnly bool) error {
var isOk = false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId, taskVersion)
@@ -378,6 +378,14 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, taskVersion int64, clusterI
var isChanged = false
var addingNodeRecordKeysMap = map[string]bool{} // clusterDnsName_type_ip_route
for _, node := range nodes {
shouldSkip, shouldOverwrite, ipAddressesStrings, err := models.SharedNodeDAO.CheckNodeIPAddresses(tx, node)
if err != nil {
return err
}
if shouldSkip {
continue
}
routes, err := node.DNSRouteCodesForDomainId(domainId)
if err != nil {
return err
@@ -387,26 +395,36 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, taskVersion int64, clusterI
}
// 所有的IP记录
ipAddresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id), nodeconfigs.NodeRoleNode)
if err != nil {
return err
if !shouldOverwrite {
ipAddresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id), nodeconfigs.NodeRoleNode)
if err != nil {
return err
}
if len(ipAddresses) == 0 {
continue
}
for _, ipAddress := range ipAddresses {
// 检查专属节点
if !ipAddress.IsValidInCluster(clusterId) {
continue
}
var ip = ipAddress.DNSIP()
if len(ip) == 0 || !ipAddress.CanAccess || !ipAddress.IsUp || !ipAddress.IsOn {
continue
}
if net.ParseIP(ip) == nil {
continue
}
ipAddressesStrings = append(ipAddressesStrings, ip)
}
}
if len(ipAddresses) == 0 {
if len(ipAddressesStrings) == 0 {
continue
}
for _, ipAddress := range ipAddresses {
// 检查专属节点
if !ipAddress.IsValidInCluster(clusterId) {
continue
}
var ip = ipAddress.DNSIP()
if len(ip) == 0 || !ipAddress.CanAccess || !ipAddress.IsUp || !ipAddress.IsOn {
continue
}
if net.ParseIP(ip) == nil {
continue
}
for _, ip := range ipAddressesStrings {
for _, route := range routes {
var key = route + "@" + ip
_, ok := oldRecordsMap[key]
@@ -456,80 +474,82 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, taskVersion int64, clusterI
}
// 服务域名
servers, err := models.SharedServerDAO.FindAllServersDNSWithClusterId(tx, clusterId)
if err != nil {
return err
}
serverRecords := []*dnstypes.Record{} // 之所以用数组再存一遍是因为dnsName可能会重复
serverRecordsMap := map[string]*dnstypes.Record{} // dnsName => *Record
for _, record := range records {
if record.Type == dnstypes.RecordTypeCNAME && record.Value == clusterDomain+"." {
serverRecords = append(serverRecords, record)
serverRecordsMap[record.Name] = record
if !nodesOnly {
servers, err := models.SharedServerDAO.FindAllServersDNSWithClusterId(tx, clusterId)
if err != nil {
return err
}
}
// 新增的域名
var serverDNSNames = []string{}
for _, server := range servers {
var dnsName = server.DnsName
if len(dnsName) == 0 {
continue
}
serverDNSNames = append(serverDNSNames, dnsName)
_, ok := serverRecordsMap[dnsName]
if !ok {
isChanged = true
err = manager.AddRecord(domain, &dnstypes.Record{
Id: "",
Name: dnsName,
Type: dnstypes.RecordTypeCNAME,
Value: clusterDomain + ".",
Route: "", // 注意这里为空,需要在执行过程中获取默认值
TTL: ttl,
})
if err != nil {
return err
serverRecords := []*dnstypes.Record{} // 之所以用数组再存一遍是因为dnsName可能会重复
serverRecordsMap := map[string]*dnstypes.Record{} // dnsName => *Record
for _, record := range records {
if record.Type == dnstypes.RecordTypeCNAME && record.Value == clusterDomain+"." {
serverRecords = append(serverRecords, record)
serverRecordsMap[record.Name] = record
}
}
}
// 自动设置的CNAME
var cnameRecords = []string{}
if dnsConfig != nil {
cnameRecords = dnsConfig.CNAMERecords
}
for _, cnameRecord := range cnameRecords {
// 如果记录已存在,则跳过
if lists.ContainsString(serverDNSNames, cnameRecord) {
continue
}
serverDNSNames = append(serverDNSNames, cnameRecord)
_, ok := serverRecordsMap[cnameRecord]
if !ok {
isChanged = true
err = manager.AddRecord(domain, &dnstypes.Record{
Id: "",
Name: cnameRecord,
Type: dnstypes.RecordTypeCNAME,
Value: clusterDomain + ".",
Route: "", // 注意这里为空,需要在执行过程中获取默认值
TTL: ttl,
})
if err != nil {
return err
// 新增的域名
var serverDNSNames = []string{}
for _, server := range servers {
var dnsName = server.DnsName
if len(dnsName) == 0 {
continue
}
serverDNSNames = append(serverDNSNames, dnsName)
_, ok := serverRecordsMap[dnsName]
if !ok {
isChanged = true
err = manager.AddRecord(domain, &dnstypes.Record{
Id: "",
Name: dnsName,
Type: dnstypes.RecordTypeCNAME,
Value: clusterDomain + ".",
Route: "", // 注意这里为空,需要在执行过程中获取默认值
TTL: ttl,
})
if err != nil {
return err
}
}
}
}
// 多余的域名
for _, record := range serverRecords {
if !lists.ContainsString(serverDNSNames, record.Name) {
isChanged = true
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
// 自动设置的CNAME
var cnameRecords = []string{}
if dnsConfig != nil {
cnameRecords = dnsConfig.CNAMERecords
}
for _, cnameRecord := range cnameRecords {
// 如果记录已存在,则跳过
if lists.ContainsString(serverDNSNames, cnameRecord) {
continue
}
serverDNSNames = append(serverDNSNames, cnameRecord)
_, ok := serverRecordsMap[cnameRecord]
if !ok {
isChanged = true
err = manager.AddRecord(domain, &dnstypes.Record{
Id: "",
Name: cnameRecord,
Type: dnstypes.RecordTypeCNAME,
Value: clusterDomain + ".",
Route: "", // 注意这里为空,需要在执行过程中获取默认值
TTL: ttl,
})
if err != nil {
return err
}
}
}
// 多余的域名
for _, record := range serverRecords {
if !lists.ContainsString(serverDNSNames, record.Name) {
isChanged = true
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
}
}
}