Files
EdgeAPI/internal/tasks/dns_task_executor.go

585 lines
14 KiB
Go
Raw Normal View History

2021-01-27 23:00:02 +08:00
package tasks
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
dnsmodels "github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
2021-06-02 18:13:48 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients/dnstypes"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
2021-01-27 23:00:02 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2021-06-07 10:02:07 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2021-09-20 20:01:21 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2021-01-27 23:00:02 +08:00
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"net"
"strings"
"time"
)
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewDNSTaskExecutor().Start()
})
2021-01-27 23:00:02 +08:00
})
}
// DNSTaskExecutor is the DNS task executor.
type DNSTaskExecutor struct{} // no fields: the type only groups the executor's methods
// NewDNSTaskExecutor returns a pointer to a newly allocated DNSTaskExecutor.
func NewDNSTaskExecutor() *DNSTaskExecutor {
	executor := new(DNSTaskExecutor)
	return executor
}
// Start runs the executor forever: on every tick of a 10-second ticker it
// calls LoopWithLocker(10) and reports any returned error through
// remotelogs.Error. It never returns, so callers are expected to run it in
// its own goroutine.
func (this *DNSTaskExecutor) Start() {
	// The same number is used for the tick period and for the argument
	// handed to LoopWithLocker.
	const intervalSeconds = 10

	var ticks = time.NewTicker(intervalSeconds * time.Second)
	for range ticks.C {
		if err := this.LoopWithLocker(intervalSeconds); err != nil {
			remotelogs.Error("DNSTaskExecutor", err.Error())
		}
	}
}
// LoopWithLocker calls Loop only after SharedSysLockerDAO.Lock succeeds for
// the key "dns_task_executor".
//
// It returns the Lock error if there is one, nil without running Loop when
// Lock reports false, and otherwise whatever Loop returns.
func (this *DNSTaskExecutor) LoopWithLocker(seconds int64) error {
	// One second less than the caller's interval is passed to Lock; the
	// execution time is assumed to be one second.
	lockSeconds := seconds - 1

	acquired, err := models.SharedSysLockerDAO.Lock(nil, "dns_task_executor", lockSeconds)
	switch {
	case err != nil:
		return err
	case !acquired:
		return nil
	}
	return this.Loop()
}
// Loop loads every task returned by SharedDNSTaskDAO.FindAllDoingTasks and
// dispatches it by type to doServer, doNode, doCluster or doDomain. Tasks of
// any other type are skipped.
//
// When a handler fails, its error message is stored on the task through
// UpdateDNSTaskError and processing continues with the next task. Loop itself
// returns an error only when the task list cannot be loaded or when storing a
// handler error fails; in the latter case the remaining tasks are not
// processed.
func (this *DNSTaskExecutor) Loop() error {
	pending, err := dnsmodels.SharedDNSTaskDAO.FindAllDoingTasks(nil)
	if err != nil {
		return err
	}

	for _, task := range pending {
		var (
			taskId  = int64(task.Id)
			taskErr error
		)

		switch task.Type {
		case dnsmodels.DNSTaskTypeServerChange:
			taskErr = this.doServer(taskId, int64(task.ServerId))
		case dnsmodels.DNSTaskTypeNodeChange:
			taskErr = this.doNode(taskId, int64(task.NodeId))
		case dnsmodels.DNSTaskTypeClusterChange:
			taskErr = this.doCluster(taskId, int64(task.ClusterId))
		case dnsmodels.DNSTaskTypeDomainChange:
			taskErr = this.doDomain(taskId, int64(task.DomainId))
		default:
			continue
		}

		if taskErr == nil {
			continue
		}

		// Record the failure on the task itself rather than aborting the run.
		err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, taskErr.Error())
		if err != nil {
			return err
		}
	}

	return nil
}
// 修改服务相关记录
2021-01-27 23:00:02 +08:00
func (this *DNSTaskExecutor) doServer(taskId int64, serverId int64) error {
var tx *dbs.Tx
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
// 检查是否已通过审核
serverDNS, err := models.SharedServerDAO.FindStatelessServerDNS(tx, serverId)
if err != nil {
return err
}
if serverDNS == nil {
isOk = true
return nil
}
if len(serverDNS.DnsName) == 0 {
isOk = true
return nil
}
2021-09-20 20:01:21 +08:00
manager, domainId, domain, clusterDNSName, dnsConfig, err := this.findDNSManager(tx, int64(serverDNS.ClusterId))
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
if manager == nil {
isOk = true
return nil
}
2021-09-20 20:01:21 +08:00
var ttl int32 = 0
if dnsConfig != nil {
ttl = dnsConfig.TTL
}
2021-01-27 23:00:02 +08:00
recordName := serverDNS.DnsName
recordValue := clusterDNSName + "." + domain + "."
recordRoute := manager.DefaultRoute()
2021-06-02 18:13:48 +08:00
recordType := dnstypes.RecordTypeCNAME
2022-03-22 21:45:07 +08:00
if serverDNS.State == models.ServerStateDisabled || !serverDNS.IsOn {
2021-01-27 23:00:02 +08:00
// 检查记录是否已经存在
record, err := manager.QueryRecord(domain, recordName, recordType)
if err != nil {
return err
}
if record != nil {
// 删除
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
}
isOk = true
} else {
// 是否已存在
exist, err := dnsmodels.SharedDNSDomainDAO.ExistDomainRecord(tx, domainId, recordName, recordType, recordRoute, recordValue)
if err != nil {
return err
}
if exist {
isOk = true
return nil
}
// 检查记录是否已经存在
record, err := manager.QueryRecord(domain, recordName, recordType)
if err != nil {
return err
}
if record != nil {
if record.Value == recordValue || record.Value == strings.TrimRight(recordValue, ".") {
isOk = true
return nil
}
// 删除
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
}
2021-06-02 18:13:48 +08:00
err = manager.AddRecord(domain, &dnstypes.Record{
2021-01-27 23:00:02 +08:00
Id: "",
Name: recordName,
Type: recordType,
Value: recordValue,
Route: recordRoute,
2021-09-20 20:01:21 +08:00
TTL: ttl,
2021-01-27 23:00:02 +08:00
})
if err != nil {
return err
}
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
isOk = true
}
return nil
}
// 修改节点相关记录
2021-01-27 23:00:02 +08:00
func (this *DNSTaskExecutor) doNode(taskId int64, nodeId int64) error {
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
var tx *dbs.Tx
node, err := models.SharedNodeDAO.FindStatelessNodeDNS(tx, nodeId)
if err != nil {
return err
}
if node == nil {
isOk = true
return nil
}
// 转交给cluster统一处理
2021-07-31 22:23:11 +08:00
clusterIds, err := models.SharedNodeDAO.FindEnabledAndOnNodeClusterIds(tx, nodeId)
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
2021-07-31 22:23:11 +08:00
for _, clusterId := range clusterIds {
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dnsmodels.DNSTaskTypeClusterChange)
if err != nil {
return err
}
}
2021-01-27 23:00:02 +08:00
isOk = true
return nil
}
// 修改集群相关记录
2021-01-27 23:00:02 +08:00
func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error {
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
var tx *dbs.Tx
2021-09-20 20:01:21 +08:00
manager, domainId, domain, clusterDNSName, dnsConfig, err := this.findDNSManager(tx, clusterId)
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
if manager == nil {
isOk = true
return nil
}
var clusterDomain = clusterDNSName + "." + domain
2021-09-20 20:01:21 +08:00
var ttl int32 = 0
if dnsConfig != nil {
ttl = dnsConfig.TTL
}
2021-01-27 23:00:02 +08:00
// 以前的节点记录
records, err := manager.GetRecords(domain)
if err != nil {
return err
}
var oldRecordsMap = map[string]*dnstypes.Record{} // route@value => record
var oldCnameRecordsMap = map[string]*dnstypes.Record{} // cname => record
2021-01-27 23:00:02 +08:00
for _, record := range records {
2021-06-07 10:02:07 +08:00
if (record.Type == dnstypes.RecordTypeA || record.Type == dnstypes.RecordTypeAAAA) && record.Name == clusterDNSName {
2021-01-27 23:00:02 +08:00
key := record.Route + "@" + record.Value
oldRecordsMap[key] = record
}
if record.Type == dnstypes.RecordTypeCNAME {
oldCnameRecordsMap[record.Name] = record
}
2021-01-27 23:00:02 +08:00
}
// 当前的节点记录
newRecordKeys := []string{}
2021-07-31 22:23:11 +08:00
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId, true)
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
isChanged := false
for _, node := range nodes {
routes, err := node.DNSRouteCodesForDomainId(domainId)
if err != nil {
return err
}
if len(routes) == 0 {
routes = []string{manager.DefaultRoute()}
}
// 所有的IP记录
ipAddresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id), nodeconfigs.NodeRoleNode)
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
if len(ipAddresses) == 0 {
continue
}
for _, ipAddress := range ipAddresses {
2021-09-13 10:51:05 +08:00
ip := ipAddress.DNSIP()
2022-03-22 21:45:07 +08:00
if len(ip) == 0 || ipAddress.CanAccess == 0 || ipAddress.IsUp == 0 || !ipAddress.IsOn {
2021-01-27 23:00:02 +08:00
continue
}
if net.ParseIP(ip) == nil {
continue
}
for _, route := range routes {
key := route + "@" + ip
_, ok := oldRecordsMap[key]
if ok {
newRecordKeys = append(newRecordKeys, key)
continue
}
2021-06-07 10:02:07 +08:00
recordType := dnstypes.RecordTypeA
if utils.IsIPv6(ip) {
recordType = dnstypes.RecordTypeAAAA
}
2021-06-02 18:13:48 +08:00
err = manager.AddRecord(domain, &dnstypes.Record{
2021-01-27 23:00:02 +08:00
Id: "",
Name: clusterDNSName,
2021-06-07 10:02:07 +08:00
Type: recordType,
2021-01-27 23:00:02 +08:00
Value: ip,
Route: route,
2021-09-20 20:01:21 +08:00
TTL: ttl,
2021-01-27 23:00:02 +08:00
})
if err != nil {
return err
}
isChanged = true
newRecordKeys = append(newRecordKeys, key)
}
}
}
// 删除多余的节点解析记录
for key, record := range oldRecordsMap {
if !lists.ContainsString(newRecordKeys, key) {
isChanged = true
2021-01-27 23:00:02 +08:00
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
}
}
// 服务域名
servers, err := models.SharedServerDAO.FindAllServersDNSWithClusterId(tx, clusterId)
if err != nil {
return err
}
serverRecords := []*dnstypes.Record{} // 之所以用数组再存一遍是因为dnsName可能会重复
serverRecordsMap := map[string]*dnstypes.Record{} // dnsName => *Record
for _, record := range records {
if record.Type == dnstypes.RecordTypeCNAME && record.Value == clusterDomain+"." {
serverRecords = append(serverRecords, record)
serverRecordsMap[record.Name] = record
}
}
// 新增的域名
serverDNSNames := []string{}
for _, server := range servers {
dnsName := server.DnsName
if len(dnsName) == 0 {
continue
}
serverDNSNames = append(serverDNSNames, dnsName)
_, ok := serverRecordsMap[dnsName]
if !ok {
isChanged = true
err = manager.AddRecord(domain, &dnstypes.Record{
Id: "",
Name: dnsName,
Type: dnstypes.RecordTypeCNAME,
Value: clusterDomain + ".",
Route: "", // 注意这里为空,需要在执行过程中获取默认值
2021-09-20 20:01:21 +08:00
TTL: ttl,
})
if err != nil {
return err
}
}
}
// 自动设置的CNAME
2021-09-20 20:01:21 +08:00
var cnameRecords = []string{}
if dnsConfig != nil {
cnameRecords = dnsConfig.CNameRecords
}
for _, cnameRecord := range cnameRecords {
serverDNSNames = append(serverDNSNames, cnameRecord)
_, ok := serverRecordsMap[cnameRecord]
if !ok {
isChanged = true
err = manager.AddRecord(domain, &dnstypes.Record{
Id: "",
Name: cnameRecord,
Type: dnstypes.RecordTypeCNAME,
Value: clusterDomain + ".",
Route: "", // 注意这里为空,需要在执行过程中获取默认值
2021-09-20 20:01:21 +08:00
TTL: ttl,
})
if err != nil {
return err
}
}
}
// 多余的域名
for _, record := range serverRecords {
if !lists.ContainsString(serverDNSNames, record.Name) {
isChanged = true
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
}
}
2021-01-27 23:00:02 +08:00
// 通知更新域名
if isChanged {
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
}
isOk = true
return nil
}
func (this *DNSTaskExecutor) doDomain(taskId int64, domainId int64) error {
var tx *dbs.Tx
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
2021-08-22 11:35:33 +08:00
dnsDomain, err := dnsmodels.SharedDNSDomainDAO.FindEnabledDNSDomain(tx, domainId, nil)
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
if dnsDomain == nil {
isOk = true
return nil
}
providerId := int64(dnsDomain.ProviderId)
if providerId <= 0 {
isOk = true
return nil
}
provider, err := dnsmodels.SharedDNSProviderDAO.FindEnabledDNSProvider(tx, providerId)
if err != nil {
return err
}
if provider == nil {
isOk = true
return nil
}
manager := dnsclients.FindProvider(provider.Type)
if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
isOk = true
return nil
}
params, err := provider.DecodeAPIParams()
if err != nil {
return err
}
err = manager.Auth(params)
if err != nil {
return err
}
records, err := manager.GetRecords(dnsDomain.Name)
if err != nil {
return err
}
recordsJSON, err := json.Marshal(records)
if err != nil {
return err
}
err = dnsmodels.SharedDNSDomainDAO.UpdateDomainRecords(tx, domainId, recordsJSON)
if err != nil {
return err
}
isOk = true
return nil
}
2021-09-20 20:01:21 +08:00
func (this *DNSTaskExecutor) findDNSManager(tx *dbs.Tx, clusterId int64) (manager dnsclients.ProviderInterface, domainId int64, domain string, clusterDNSName string, dnsConfig *dnsconfigs.ClusterDNSConfig, err error) {
2021-08-22 11:35:33 +08:00
clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, nil)
2021-01-27 23:00:02 +08:00
if err != nil {
return nil, 0, "", "", nil, err
2021-01-27 23:00:02 +08:00
}
if clusterDNS == nil || len(clusterDNS.DnsName) == 0 || clusterDNS.DnsDomainId <= 0 {
return nil, 0, "", "", nil, nil
}
2021-09-20 20:01:21 +08:00
dnsConfig, err = clusterDNS.DecodeDNSConfig()
if err != nil {
return nil, 0, "", "", nil, err
2021-01-27 23:00:02 +08:00
}
2021-08-22 11:35:33 +08:00
dnsDomain, err := dnsmodels.SharedDNSDomainDAO.FindEnabledDNSDomain(tx, int64(clusterDNS.DnsDomainId), nil)
2021-01-27 23:00:02 +08:00
if err != nil {
return nil, 0, "", "", nil, err
2021-01-27 23:00:02 +08:00
}
if dnsDomain == nil {
return nil, 0, "", "", nil, nil
2021-01-27 23:00:02 +08:00
}
providerId := int64(dnsDomain.ProviderId)
if providerId <= 0 {
return nil, 0, "", "", nil, nil
2021-01-27 23:00:02 +08:00
}
provider, err := dnsmodels.SharedDNSProviderDAO.FindEnabledDNSProvider(tx, providerId)
if err != nil {
return nil, 0, "", "", nil, err
2021-01-27 23:00:02 +08:00
}
if provider == nil {
return nil, 0, "", "", nil, nil
2021-01-27 23:00:02 +08:00
}
manager = dnsclients.FindProvider(provider.Type)
if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
return nil, 0, "", "", nil, nil
2021-01-27 23:00:02 +08:00
}
params, err := provider.DecodeAPIParams()
if err != nil {
return nil, 0, "", "", nil, err
2021-01-27 23:00:02 +08:00
}
err = manager.Auth(params)
if err != nil {
return nil, 0, "", "", nil, err
2021-01-27 23:00:02 +08:00
}
2021-09-20 20:01:21 +08:00
return manager, int64(dnsDomain.Id), dnsDomain.Name, clusterDNS.DnsName, dnsConfig, nil
2021-01-27 23:00:02 +08:00
}