增加DNS同步任务状态显示

This commit is contained in:
GoEdgeLab
2021-01-27 23:00:02 +08:00
parent ff722baf22
commit a73a04521d
19 changed files with 1087 additions and 243 deletions

View File

@@ -0,0 +1,153 @@
package dns
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"time"
)
type DNSTaskType = string
const (
DNSTaskTypeClusterChange DNSTaskType = "clusterChange"
DNSTaskTypeNodeChange DNSTaskType = "nodeChange"
DNSTaskTypeServerChange DNSTaskType = "serverChange"
DNSTaskTypeDomainChange DNSTaskType = "domainChange"
)
type DNSTaskDAO dbs.DAO
func NewDNSTaskDAO() *DNSTaskDAO {
return dbs.NewDAO(&DNSTaskDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeDNSTasks",
Model: new(DNSTask),
PkName: "id",
},
}).(*DNSTaskDAO)
}
var SharedDNSTaskDAO *DNSTaskDAO
func init() {
dbs.OnReady(func() {
SharedDNSTaskDAO = NewDNSTaskDAO()
})
}
// 生成任务
func (this *DNSTaskDAO) CreateDNSTask(tx *dbs.Tx, clusterId int64, serverId int64, nodeId int64, domainId int64, taskType string) error {
if clusterId <= 0 && serverId <= 0 && nodeId <= 0 {
return nil
}
err := this.Query(tx).InsertOrUpdateQuickly(maps.Map{
"clusterId": clusterId,
"serverId": serverId,
"nodeId": nodeId,
"domainId": domainId,
"updatedAt": time.Now().Unix(),
"type": taskType,
"isDone": false,
"isOk": false,
"error": "",
}, maps.Map{
"updatedAt": time.Now().Unix(),
"isDone": false,
"isOk": false,
"error": "",
})
return err
}
// 生成集群任务
func (this *DNSTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType DNSTaskType) error {
return this.CreateDNSTask(tx, clusterId, 0, 0, 0, taskType)
}
// 生成节点任务
func (this *DNSTaskDAO) CreateNodeTask(tx *dbs.Tx, nodeId int64, taskType DNSTaskType) error {
return this.CreateDNSTask(tx, 0, 0, nodeId, 0, taskType)
}
// 生成服务任务
func (this *DNSTaskDAO) CreateServerTask(tx *dbs.Tx, serverId int64, taskType DNSTaskType) error {
return this.CreateDNSTask(tx, 0, serverId, 0, 0, taskType)
}
// 生成域名更新任务
func (this *DNSTaskDAO) CreateDomainTask(tx *dbs.Tx, domainId int64, taskType DNSTaskType) error {
return this.CreateDNSTask(tx, 0, 0, 0, domainId, taskType)
}
// 查找所有正在执行的任务
func (this *DNSTaskDAO) FindAllDoingTasks(tx *dbs.Tx) (result []*DNSTask, err error) {
_, err = this.Query(tx).
Attr("isDone", 0).
AscPk().
Slice(&result).
FindAll()
return
}
// 查找正在执行的和错误的任务
func (this *DNSTaskDAO) FindAllDoingOrErrorTasks(tx *dbs.Tx) (result []*DNSTask, err error) {
_, err = this.Query(tx).
Where("(isDone=0 OR (isDone=1 AND isOk=0))").
AscPk().
Slice(&result).
FindAll()
return
}
// 检查是否有正在执行的任务
func (this *DNSTaskDAO) ExistDoingTasks(tx *dbs.Tx) (bool, error) {
return this.Query(tx).
Attr("isDone", 0).
Exist()
}
// 检查是否有错误的任务
func (this *DNSTaskDAO) ExistErrorTasks(tx *dbs.Tx) (bool, error) {
return this.Query(tx).
Attr("isDone", 1).
Attr("isOk", 0).
Exist()
}
// 删除任务
func (this *DNSTaskDAO) DeleteDNSTask(tx *dbs.Tx, taskId int64) error {
_, err := this.Query(tx).
Pk(taskId).
Delete()
return err
}
// 设置任务错误
func (this *DNSTaskDAO) UpdateDNSTaskError(tx *dbs.Tx, taskId int64, err string) error {
if taskId <= 0 {
return errors.New("invalid taskId")
}
op := NewDNSTaskOperator()
op.Id = taskId
op.IsDone = true
op.Error = err
op.IsOk = false
return this.Save(tx, op)
}
// 设置任务完成
func (this *DNSTaskDAO) UpdateDNSTaskDone(tx *dbs.Tx, taskId int64) error {
if taskId <= 0 {
return errors.New("invalid taskId")
}
op := NewDNSTaskOperator()
op.Id = taskId
op.IsDone = true
op.IsOk = true
op.Error = ""
return this.Save(tx, op)
}

View File

@@ -0,0 +1,17 @@
package dns
import (
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/dbs"
"testing"
)
func TestDNSTaskDAO_CreateDNSTask(t *testing.T) {
dbs.NotifyReady()
err := SharedDNSTaskDAO.CreateDNSTask(nil, 1, 2, 3, "taskType")
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -0,0 +1,32 @@
package dns
// DNS更新任务
type DNSTask struct {
Id uint64 `field:"id"` // ID
ClusterId uint32 `field:"clusterId"` // 集群ID
ServerId uint32 `field:"serverId"` // 服务ID
NodeId uint32 `field:"nodeId"` // 节点ID
DomainId uint32 `field:"domainId"` // 域名ID
Type string `field:"type"` // 任务类型
UpdatedAt uint64 `field:"updatedAt"` // 更新时间
IsDone uint8 `field:"isDone"` // 是否已完成
IsOk uint8 `field:"isOk"` // 是否成功
Error string `field:"error"` // 错误信息
}
type DNSTaskOperator struct {
Id interface{} // ID
ClusterId interface{} // 集群ID
ServerId interface{} // 服务ID
NodeId interface{} // 节点ID
DomainId interface{} // 域名ID
Type interface{} // 任务类型
UpdatedAt interface{} // 更新时间
IsDone interface{} // 是否已完成
IsOk interface{} // 是否成功
Error interface{} // 错误信息
}
func NewDNSTaskOperator() *DNSTaskOperator {
return &DNSTaskOperator{}
}

View File

@@ -0,0 +1 @@
package dns

View File

@@ -120,7 +120,7 @@ func (this *NodeClusterDAO) FindAllEnableClusterIds(tx *dbs.Tx) (result []int64,
// 创建集群
func (this *NodeClusterDAO) CreateCluster(tx *dbs.Tx, adminId int64, name string, grantId int64, installDir string, dnsDomainId int64, dnsName string, cachePolicyId int64, httpFirewallPolicyId int64, systemServices map[string]maps.Map) (clusterId int64, err error) {
uniqueId, err := this.genUniqueId(tx)
uniqueId, err := this.GenUniqueId(tx)
if err != nil {
return 0, err
}
@@ -454,7 +454,11 @@ func (this *NodeClusterDAO) UpdateClusterDNS(tx *dbs.Tx, clusterId int64, dnsNam
if err != nil {
return err
}
return this.NotifyUpdate(tx, clusterId)
err = this.NotifyUpdate(tx, clusterId)
if err != nil {
return err
}
return this.NotifyDNSUpdate(tx, clusterId)
}
// 检查集群的DNS问题
@@ -545,7 +549,7 @@ func (this *NodeClusterDAO) CheckClusterDNS(tx *dbs.Tx, cluster *NodeCluster) (i
}
// 检查IP地址
ipAddr, err := SharedNodeIPAddressDAO.FindFirstNodeIPAddress(tx, nodeId)
ipAddr, err := SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, nodeId)
if err != nil {
return nil, err
}
@@ -801,7 +805,7 @@ func (this *NodeClusterDAO) FindNodeClusterSystemServices(tx *dbs.Tx, clusterId
}
// 生成唯一ID
func (this *NodeClusterDAO) genUniqueId(tx *dbs.Tx) (string, error) {
func (this *NodeClusterDAO) GenUniqueId(tx *dbs.Tx) (string, error) {
for {
uniqueId := rands.HexString(32)
ok, err := this.Query(tx).
@@ -821,3 +825,13 @@ func (this *NodeClusterDAO) genUniqueId(tx *dbs.Tx) (string, error) {
func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
}
// 通知DNS更新
// TODO 更新新的DNS解析记录的同时需要删除老的DNS解析记录
func (this *NodeClusterDAO) NotifyDNSUpdate(tx *dbs.Tx, clusterId int64) error {
err := dns.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dns.DNSTaskTypeClusterChange)
if err != nil {
return err
}
return nil
}

View File

@@ -2,6 +2,7 @@ package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
@@ -12,7 +13,6 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
@@ -80,7 +80,17 @@ func (this *NodeDAO) DisableNode(tx *dbs.Tx, nodeId int64) (err error) {
return err
}
return this.NotifyUpdate(tx, nodeId)
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return err
}
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return err
}
return nil
}
// 查找启用中的条目
@@ -106,7 +116,7 @@ func (this *NodeDAO) FindNodeName(tx *dbs.Tx, id int64) (string, error) {
// 创建节点
func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterId int64, groupId int64, regionId int64) (nodeId int64, err error) {
uniqueId, err := this.genUniqueId(tx)
uniqueId, err := this.GenUniqueId(tx)
if err != nil {
return 0, err
}
@@ -134,7 +144,20 @@ func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterI
return 0, err
}
return types.Int64(op.Id), nil
// 通知节点更新
nodeId = types.Int64(op.Id)
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return 0, err
}
// 通知DNS更新
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return 0, err
}
return nodeId, nil
}
// 修改节点
@@ -156,7 +179,12 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
return err
}
return this.NotifyUpdate(tx, nodeId)
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return err
}
return this.NotifyDNSUpdate(tx, nodeId)
}
// 计算所有节点数量
@@ -670,7 +698,6 @@ func (this *NodeDAO) FindAllLowerVersionNodesWithClusterId(tx *dbs.Tx, clusterId
DescPk().
Slice(&result).
FindAll()
logs.Println(len(result), version) // TODO
return
}
@@ -730,6 +757,18 @@ func (this *NodeDAO) FindEnabledNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error)
return one.(*Node), nil
}
// 获取单个节点的DNS信息无论什么状态
func (this *NodeDAO) FindStatelessNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error) {
one, err := this.Query(tx).
Pk(nodeId).
Result("id", "name", "dnsRoutes", "clusterId", "isOn", "state").
Find()
if err != nil || one == nil {
return nil, err
}
return one.(*Node), nil
}
// 修改节点的DNS信息
func (this *NodeDAO) UpdateNodeDNS(tx *dbs.Tx, nodeId int64, routes map[int64][]string) error {
if nodeId <= 0 {
@@ -750,7 +789,17 @@ func (this *NodeDAO) UpdateNodeDNS(tx *dbs.Tx, nodeId int64, routes map[int64][]
return err
}
return this.NotifyUpdate(tx, nodeId)
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return err
}
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return err
}
return nil
}
// 计算节点上线|下线状态
@@ -806,6 +855,11 @@ func (this *NodeDAO) UpdateNodeUp(tx *dbs.Tx, nodeId int64, isUp bool, maxUp int
return false, err
}
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return false, err
}
return
}
@@ -842,7 +896,7 @@ func (this *NodeDAO) FindNodeVersion(tx *dbs.Tx, nodeId int64) (int64, error) {
}
// 生成唯一ID
func (this *NodeDAO) genUniqueId(tx *dbs.Tx) (string, error) {
func (this *NodeDAO) GenUniqueId(tx *dbs.Tx) (string, error) {
for {
uniqueId := rands.HexString(32)
ok, err := this.Query(tx).
@@ -888,3 +942,28 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
}
return nil
}
// 通知DNS更新
func (this *NodeDAO) NotifyDNSUpdate(tx *dbs.Tx, nodeId int64) error {
clusterId, err := this.Query(tx).
Pk(nodeId).
Result("clusterId").
FindInt64Col(0) // 这里不需要加服务状态条件因为我们即使删除也要删除对应的服务的DNS解析
if err != nil {
return err
}
if clusterId <= 0 {
return nil
}
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
if err != nil {
return err
}
if dnsInfo == nil {
return nil
}
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
return nil
}
return dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange)
}

View File

@@ -96,6 +96,12 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, name strin
if err != nil {
return 0, err
}
err = SharedNodeDAO.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return 0, err
}
return types.Int64(op.Id), nil
}
@@ -134,7 +140,15 @@ func (this *NodeIPAddressDAO) UpdateAddressNodeId(tx *dbs.Tx, addressId int64, n
Set("nodeId", nodeId).
Set("state", NodeIPAddressStateEnabled). // 恢复状态
Update()
return err
if err != nil {
return err
}
err = SharedNodeDAO.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return err
}
return nil
}
// 查找节点的所有的IP地址
@@ -150,7 +164,7 @@ func (this *NodeIPAddressDAO) FindAllEnabledAddressesWithNode(tx *dbs.Tx, nodeId
}
// 查找节点的第一个可访问的IP地址
func (this *NodeIPAddressDAO) FindFirstNodeIPAddress(tx *dbs.Tx, nodeId int64) (string, error) {
func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64) (string, error) {
return this.Query(tx).
Attr("nodeId", nodeId).
State(NodeIPAddressStateEnabled).
@@ -162,7 +176,7 @@ func (this *NodeIPAddressDAO) FindFirstNodeIPAddress(tx *dbs.Tx, nodeId int64) (
}
// 查找节点的第一个可访问的IP地址ID
func (this *NodeIPAddressDAO) FindFirstNodeIPAddressId(tx *dbs.Tx, nodeId int64) (int64, error) {
func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddressId(tx *dbs.Tx, nodeId int64) (int64, error) {
return this.Query(tx).
Attr("nodeId", nodeId).
State(NodeIPAddressStateEnabled).
@@ -172,3 +186,16 @@ func (this *NodeIPAddressDAO) FindFirstNodeIPAddressId(tx *dbs.Tx, nodeId int64)
Result("id").
FindInt64Col(0)
}
// 查找节点所有的可访问的IP地址
func (this *NodeIPAddressDAO) FindNodeAccessIPAddresses(tx *dbs.Tx, nodeId int64) (result []*NodeIPAddress, err error) {
_, err = this.Query(tx).
Attr("nodeId", nodeId).
State(NodeIPAddressStateEnabled).
Attr("canAccess", true).
Desc("order").
AscPk().
Slice(&result).
FindAll()
return
}

View File

@@ -65,15 +65,24 @@ func (this *ServerDAO) EnableServer(tx *dbs.Tx, id uint32) (rowsAffected int64,
}
// 禁用条目
func (this *ServerDAO) DisableServer(tx *dbs.Tx, id int64) (err error) {
func (this *ServerDAO) DisableServer(tx *dbs.Tx, serverId int64) (err error) {
_, err = this.Query(tx).
Pk(id).
Pk(serverId).
Set("state", ServerStateDisabled).
Update()
if err != nil {
return err
}
return this.NotifyUpdate(tx, id)
err = this.NotifyUpdate(tx, serverId)
if err != nil {
return err
}
err = this.NotifyDNSUpdate(tx, serverId)
if err != nil {
return err
}
return nil
}
// 查找启用中的服务
@@ -88,6 +97,15 @@ func (this *ServerDAO) FindEnabledServer(tx *dbs.Tx, serverId int64) (*Server, e
return result.(*Server), err
}
// 查找服务名称
func (this *ServerDAO) FindEnabledServerName(tx *dbs.Tx, serverId int64) (string, error) {
return this.Query(tx).
Pk(serverId).
State(ServerStateEnabled).
Result("name").
FindStringCol("")
}
// 查找服务基本信息
func (this *ServerDAO) FindEnabledServerBasic(tx *dbs.Tx, serverId int64) (*Server, error) {
result, err := this.Query(tx).
@@ -186,7 +204,7 @@ func (this *ServerDAO) CreateServer(tx *dbs.Tx,
op.GroupIds = groupIdsJSON
}
dnsName, err := this.genDNSName(tx)
dnsName, err := this.GenDNSName(tx)
if err != nil {
return 0, err
}
@@ -203,11 +221,18 @@ func (this *ServerDAO) CreateServer(tx *dbs.Tx,
serverId = types.Int64(op.Id)
// 通知配置更改
err = this.NotifyUpdate(tx, serverId)
if err != nil {
return 0, err
}
// 通知DNS更改
err = this.NotifyDNSUpdate(tx, serverId)
if err != nil {
return 0, err
}
return serverId, nil
}
@@ -244,7 +269,8 @@ func (this *ServerDAO) UpdateServerBasic(tx *dbs.Tx, serverId int64, name string
return err
}
return nil
// 因为可能有isOn的原因所以需要修改
return this.NotifyDNSUpdate(tx, serverId)
}
// 设置用户相关的基本信息
@@ -487,7 +513,7 @@ func (this *ServerDAO) InitServerWeb(tx *dbs.Tx, serverId int64) (int64, error)
}
// 查找ServerNames配置
func (this *ServerDAO) FindServerNames(tx *dbs.Tx, serverId int64) (serverNamesJSON []byte, isAuditing bool, auditingServerNamesJSON []byte, auditingResultJSON []byte, err error) {
func (this *ServerDAO) FindServerServerNames(tx *dbs.Tx, serverId int64) (serverNamesJSON []byte, isAuditing bool, auditingServerNamesJSON []byte, auditingResultJSON []byte, err error) {
if serverId <= 0 {
return
}
@@ -574,7 +600,12 @@ func (this *ServerDAO) UpdateServerAuditing(tx *dbs.Tx, serverId int64, result *
return err
}
return this.NotifyUpdate(tx, serverId)
err = this.NotifyUpdate(tx, serverId)
if err != nil {
return err
}
return this.NotifyDNSUpdate(tx, serverId)
}
// 修改反向代理配置
@@ -1085,7 +1116,7 @@ func (this *ServerDAO) GenerateServerDNSName(tx *dbs.Tx, serverId int64) (string
if serverId <= 0 {
return "", errors.New("invalid serverId")
}
dnsName, err := this.genDNSName(tx)
dnsName, err := this.GenDNSName(tx)
if err != nil {
return "", err
}
@@ -1102,6 +1133,11 @@ func (this *ServerDAO) GenerateServerDNSName(tx *dbs.Tx, serverId int64) (string
return "", err
}
err = this.NotifyDNSUpdate(tx, serverId)
if err != nil {
return "", err
}
return dnsName, nil
}
@@ -1121,6 +1157,18 @@ func (this *ServerDAO) FindServerDNSName(tx *dbs.Tx, serverId int64) (string, er
FindStringCol("")
}
// 查询服务的DNS相关信息并且不关注状态
func (this *ServerDAO) FindStatelessServerDNS(tx *dbs.Tx, serverId int64) (*Server, error) {
one, err := this.Query(tx).
Pk(serverId).
Result("id", "dnsName", "isOn", "state", "clusterId").
Find()
if err != nil || one == nil {
return nil, err
}
return one.(*Server), nil
}
// 获取当前服务的管理员ID和用户ID
func (this *ServerDAO) FindServerAdminIdAndUserId(tx *dbs.Tx, serverId int64) (adminId int64, userId int64, err error) {
one, err := this.Query(tx).
@@ -1271,6 +1319,22 @@ func (this *ServerDAO) CheckPortIsUsing(tx *dbs.Tx, clusterId int64, port int, e
Exist()
}
// 生成DNS Name
func (this *ServerDAO) GenDNSName(tx *dbs.Tx) (string, error) {
for {
dnsName := rands.HexString(8)
exist, err := this.Query(tx).
Attr("dnsName", dnsName).
Exist()
if err != nil {
return "", err
}
if !exist {
return dnsName, nil
}
}
}
// 同步集群
func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
// 更新配置
@@ -1290,18 +1354,27 @@ func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
}
// 生成DNS Name
func (this *ServerDAO) genDNSName(tx *dbs.Tx) (string, error) {
for {
dnsName := rands.HexString(8)
exist, err := this.Query(tx).
Attr("dnsName", dnsName).
Exist()
if err != nil {
return "", err
}
if !exist {
return dnsName, nil
}
// 通知DNS更新
func (this *ServerDAO) NotifyDNSUpdate(tx *dbs.Tx, serverId int64) error {
clusterId, err := this.Query(tx).
Pk(serverId).
Result("clusterId").
FindInt64Col(0) // 这里不需要加服务状态条件因为我们即使删除也要删除对应的服务的DNS解析
if err != nil {
return err
}
if clusterId <= 0 {
return nil
}
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
if err != nil {
return err
}
if dnsInfo == nil {
return nil
}
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
return nil
}
return dns.SharedDNSTaskDAO.CreateServerTask(tx, serverId, dns.DNSTaskTypeServerChange)
}

View File

@@ -8,7 +8,3 @@ const (
EventQuit Event = "quit" // quit node gracefully
EventReload Event = "reload" // reload config
)
// 节点更新事件
// TODO 改成事件
var NodeDNSChanges = make(chan int64, 128)

View File

@@ -233,6 +233,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err
pb.RegisterServerClientSystemMonthlyStatServiceServer(rpcServer, &services.ServerClientSystemMonthlyStatService{})
pb.RegisterServerClientBrowserMonthlyStatServiceServer(rpcServer, &services.ServerClientBrowserMonthlyStatService{})
pb.RegisterServerHTTPFirewallDailyStatServiceServer(rpcServer, &services.ServerHTTPFirewallDailyStatService{})
pb.RegisterDNSTaskServiceServer(rpcServer, &services.DNSTaskService{})
err := rpcServer.Serve(listener)
if err != nil {
return errors.New("[API_NODE]start rpc failed: " + err.Error())

View File

@@ -12,6 +12,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/maps"
"net"
)
// DNS域名相关服务
@@ -410,11 +411,11 @@ func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster,
// 新增的节点域名
nodeKeys := []string{}
for _, node := range nodes {
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(tx, int64(node.Id))
ipAddresses, err := models.SharedNodeIPAddressDAO.FindNodeAccessIPAddresses(tx, int64(node.Id))
if err != nil {
return nil, nil, nil, 0, 0, false, false, err
}
if len(ipAddr) == 0 {
if len(ipAddresses) == 0 {
continue
}
routeCodes, err := node.DNSRouteCodesForDomainId(int64(cluster.DnsDomainId))
@@ -425,23 +426,32 @@ func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster,
continue
}
for _, route := range routeCodes {
key := ipAddr + "_" + route
nodeKeys = append(nodeKeys, key)
record, ok := nodeRecordMapping[key]
if !ok {
result = append(result, maps.Map{
"action": "create",
"record": &dnsclients.Record{
Id: "",
Name: clusterDnsName,
Type: dnsclients.RecordTypeA,
Value: ipAddr,
Route: route,
},
})
nodesChanged = true
} else {
doneNodeRecords = append(doneNodeRecords, record)
for _, ipAddress := range ipAddresses {
ip := ipAddress.Ip
if len(ip) == 0 {
continue
}
if net.ParseIP(ip) == nil {
continue
}
key := ip + "_" + route
nodeKeys = append(nodeKeys, key)
record, ok := nodeRecordMapping[key]
if !ok {
result = append(result, maps.Map{
"action": "create",
"record": &dnsclients.Record{
Id: "",
Name: clusterDnsName,
Type: dnsclients.RecordTypeA,
Value: ip,
Route: route,
},
})
nodesChanged = true
} else {
doneNodeRecords = append(doneNodeRecords, record)
}
}
}
}

View File

@@ -0,0 +1,110 @@
package services
import (
"context"
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// DNS同步相关任务
type DNSTaskService struct {
BaseService
}
// 检查是否有正在执行的任务
func (this *DNSTaskService) ExistsDNSTasks(ctx context.Context, req *pb.ExistsDNSTasksRequest) (*pb.ExistsDNSTasksResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
existDoingTasks, err := dns.SharedDNSTaskDAO.ExistDoingTasks(tx)
if err != nil {
return nil, err
}
existErrorTasks, err := dns.SharedDNSTaskDAO.ExistErrorTasks(tx)
if err != nil {
return nil, err
}
return &pb.ExistsDNSTasksResponse{
ExistTasks: existDoingTasks,
ExistError: existErrorTasks,
}, nil
}
// 查找正在执行的所有任务
func (this *DNSTaskService) FindAllDoingDNSTasks(ctx context.Context, req *pb.FindAllDoingDNSTasksRequest) (*pb.FindAllDoingDNSTasksResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
tasks, err := dns.SharedDNSTaskDAO.FindAllDoingOrErrorTasks(tx)
if err != nil {
return nil, err
}
pbTasks := []*pb.DNSTask{}
for _, task := range tasks {
pbTask := &pb.DNSTask{
Id: int64(task.Id),
Type: task.Type,
IsDone: task.IsDone == 1,
IsOk: task.IsOk == 1,
Error: task.Error,
UpdatedAt: int64(task.UpdatedAt),
}
switch task.Type {
case dns.DNSTaskTypeClusterChange:
clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(task.ClusterId))
if err != nil {
return nil, err
}
if len(clusterName) == 0 {
clusterName = "集群[" + fmt.Sprintf("%d", task.ClusterId) + "]"
}
pbTask.NodeCluster = &pb.NodeCluster{Id: int64(task.ClusterId), Name: clusterName}
case dns.DNSTaskTypeNodeChange:
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(task.NodeId))
if err != nil {
return nil, err
}
if len(nodeName) == 0 {
nodeName = "节点[" + fmt.Sprintf("%d", task.NodeId) + "]"
}
pbTask.Node = &pb.Node{Id: int64(task.NodeId), Name: nodeName}
case dns.DNSTaskTypeServerChange:
serverName, err := models.SharedServerDAO.FindEnabledServerName(tx, int64(task.ServerId))
if err != nil {
return nil, err
}
if len(serverName) == 0 {
serverName = "服务[" + fmt.Sprintf("%d", task.ServerId) + "]"
}
pbTask.Server = &pb.Server{Id: int64(task.ServerId), Name: serverName}
}
pbTasks = append(pbTasks, pbTask)
}
return &pb.FindAllDoingDNSTasksResponse{DnsTasks: pbTasks}, nil
}
// 删除任务
func (this *DNSTaskService) DeleteDNSTask(ctx context.Context, req *pb.DeleteDNSTaskRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
err = dns.SharedDNSTaskDAO.DeleteDNSTask(this.NullTx(), req.DnsTaskId)
if err != nil {
return nil, err
}
return this.Success()
}

View File

@@ -7,31 +7,16 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/events"
"github.com/TeaOSLab/EdgeAPI/internal/installers"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"net"
)
func init() {
dbs.OnReady(func() {
go func() {
service := &NodeService{}
for nodeId := range events.NodeDNSChanges {
err := service.notifyNodeDNSChanged(nodeId)
if err != nil {
logs.Println("[ERROR]change node dns: " + err.Error())
}
}
}()
})
}
// 边缘节点相关服务
type NodeService struct {
BaseService
@@ -69,14 +54,6 @@ func (this *NodeService) CreateNode(ctx context.Context, req *pb.CreateNodeReque
}
}
// 同步DNS
go func() {
err := this.notifyNodeDNSChanged(nodeId)
if err != nil {
logs.Println("sync node DNS error: " + err.Error())
}
}()
return &pb.CreateNodeResponse{
NodeId: nodeId,
}, nil
@@ -344,14 +321,6 @@ func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeReque
return nil, err
}
// 同步DNS
go func() {
err := this.notifyNodeDNSChanged(req.NodeId)
if err != nil {
logs.Println("sync node DNS error: " + err.Error())
}
}()
// 删除节点相关任务
err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, req.NodeId)
if err != nil {
@@ -404,15 +373,6 @@ func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeReque
}
}
// 同步DNS
go func() {
// TODO 只有状态变化的时候才需要同步
err := this.notifyNodeDNSChanged(req.NodeId)
if err != nil {
logs.Println("sync node DNS error: " + err.Error())
}
}()
return this.Success()
}
@@ -675,14 +635,6 @@ func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest
}, nil
}
// 同步DNS
go func() {
err := this.notifyNodeDNSChanged(req.NodeId)
if err != nil {
logs.Println("sync node DNS error: " + err.Error())
}
}()
return &pb.StartNodeResponse{IsOk: true}, nil
}
@@ -702,14 +654,6 @@ func (this *NodeService) StopNode(ctx context.Context, req *pb.StopNodeRequest)
}, nil
}
// 同步DNS
go func() {
err := this.notifyNodeDNSChanged(req.NodeId)
if err != nil {
logs.Println("sync node DNS error: " + err.Error())
}
}()
return &pb.StopNodeResponse{IsOk: true}, nil
}
@@ -1098,7 +1042,7 @@ func (this *NodeService) FindAllEnabledNodesDNSWithClusterId(ctx context.Context
}
result := []*pb.NodeDNSInfo{}
for _, node := range nodes {
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(tx, int64(node.Id))
ipAddresses, err := models.SharedNodeIPAddressDAO.FindNodeAccessIPAddresses(tx, int64(node.Id))
if err != nil {
return nil, err
}
@@ -1121,12 +1065,21 @@ func (this *NodeService) FindAllEnabledNodesDNSWithClusterId(ctx context.Context
}
}
result = append(result, &pb.NodeDNSInfo{
Id: int64(node.Id),
Name: node.Name,
IpAddr: ipAddr,
Routes: pbRoutes,
})
for _, ipAddress := range ipAddresses {
ip := ipAddress.Ip
if len(ip) == 0 {
continue
}
if net.ParseIP(ip) == nil {
continue
}
result = append(result, &pb.NodeDNSInfo{
Id: int64(node.Id),
Name: node.Name,
IpAddr: ip,
Routes: pbRoutes,
})
}
}
return &pb.FindAllEnabledNodesDNSWithClusterIdResponse{Nodes: result}, nil
}
@@ -1150,7 +1103,7 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna
return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil
}
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddress(tx, int64(node.Id))
ipAddr, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id))
if err != nil {
return nil, err
}
@@ -1237,7 +1190,7 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN
// 修改IP
if len(req.IpAddr) > 0 {
ipAddrId, err := models.SharedNodeIPAddressDAO.FindFirstNodeIPAddressId(tx, req.NodeId)
ipAddrId, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddressId(tx, req.NodeId)
if err != nil {
return nil, err
}
@@ -1257,50 +1210,6 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN
return this.Success()
}
// 自动同步DNS状态
func (this *NodeService) notifyNodeDNSChanged(nodeId int64) error {
tx := this.NullTx()
clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
dnsInfo, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
if err != nil {
return err
}
if dnsInfo == nil {
return nil
}
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId == 0 {
return nil
}
dnsConfig, err := dnsInfo.DecodeDNSConfig()
if err != nil {
return err
}
if !dnsConfig.NodesAutoSync {
return nil
}
// 执行同步
domainService := &DNSDomainService{}
resp, err := domainService.syncClusterDNS(&pb.SyncDNSDomainDataRequest{
DnsDomainId: int64(dnsInfo.DnsDomainId),
NodeClusterId: clusterId,
})
if err != nil {
return err
}
if !resp.IsOk {
err = models.SharedMessageDAO.CreateClusterMessage(tx, clusterId, models.MessageTypeClusterDNSSyncFailed, models.LevelError, "集群DNS同步失败"+resp.Error, nil)
if err != nil {
logs.Println("[NODE_SERVICE]" + err.Error())
}
}
return nil
}
// 计算某个区域下的节点数量
func (this *NodeService) CountAllEnabledNodesWithNodeRegionId(ctx context.Context, req *pb.CountAllEnabledNodesWithNodeRegionIdRequest) (*pb.RPCCountResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)

View File

@@ -11,7 +11,6 @@ import (
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
@@ -119,17 +118,6 @@ func (this *ServerService) UpdateServerBasic(ctx context.Context, req *pb.Update
return nil, err
}
// 检查服务变化
oldIsOn := server.IsOn == 1
if oldIsOn != req.IsOn {
go func() {
err := this.notifyServerDNSChanged(req.ServerId)
if err != nil {
logs.Println("[DNS]notify server changed: " + err.Error())
}
}()
}
return this.Success()
}
@@ -373,7 +361,7 @@ func (this *ServerService) FindServerNames(ctx context.Context, req *pb.FindServ
}
}
serverNamesJSON, isAuditing, auditingServerNamesJSON, auditingResultJSON, err := models.SharedServerDAO.FindServerNames(tx, req.ServerId)
serverNamesJSON, isAuditing, auditingServerNamesJSON, auditingResultJSON, err := models.SharedServerDAO.FindServerServerNames(tx, req.ServerId)
if err != nil {
return nil, err
}
@@ -474,14 +462,6 @@ func (this *ServerService) UpdateServerNamesAuditing(ctx context.Context, req *p
}
}
// 通知服务更新
go func() {
err := this.notifyServerDNSChanged(req.ServerId)
if err != nil {
logs.Println("[DNS]notify server changed: " + err.Error())
}
}()
return this.Success()
}
@@ -1095,50 +1075,6 @@ func (this *ServerService) FindEnabledServerDNS(ctx context.Context, req *pb.Fin
}, nil
}
// 自动同步DNS状态
func (this *ServerService) notifyServerDNSChanged(serverId int64) error {
tx := this.NullTx()
clusterId, err := models.SharedServerDAO.FindServerClusterId(tx, serverId)
if err != nil {
return err
}
dnsInfo, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
if err != nil {
return err
}
if dnsInfo == nil {
return nil
}
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId == 0 {
return nil
}
dnsConfig, err := dnsInfo.DecodeDNSConfig()
if err != nil {
return err
}
if !dnsConfig.ServersAutoSync {
return nil
}
// 执行同步
domainService := &DNSDomainService{}
resp, err := domainService.syncClusterDNS(&pb.SyncDNSDomainDataRequest{
DnsDomainId: int64(dnsInfo.DnsDomainId),
NodeClusterId: clusterId,
})
if err != nil {
return err
}
if !resp.IsOk {
err = models.SharedMessageDAO.CreateClusterMessage(tx, clusterId, models.MessageTypeClusterDNSSyncFailed, models.LevelError, "集群DNS同步失败"+resp.Error, nil)
if err != nil {
logs.Println("[NODE_SERVICE]" + err.Error())
}
}
return nil
}
// 检查服务是否属于某个用户
func (this *ServerService) CheckUserServer(ctx context.Context, req *pb.CheckUserServerRequest) (*pb.RPCSuccess, error) {
userId, err := this.ValidateUser(ctx)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,470 @@
package tasks
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
dnsmodels "github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"net"
"strings"
"time"
)
func init() {
dbs.OnReadyDone(func() {
go NewDNSTaskExecutor().Start()
})
}
// DNS任务执行器
type DNSTaskExecutor struct {
}
func NewDNSTaskExecutor() *DNSTaskExecutor {
return &DNSTaskExecutor{}
}
func (this *DNSTaskExecutor) Start() {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
err := this.LoopWithLocker(10)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}
func (this *DNSTaskExecutor) LoopWithLocker(seconds int64) error {
ok, err := models.SharedSysLockerDAO.Lock(nil, "dns_task_executor", seconds-1) // 假设执行时间为1秒
if err != nil {
return err
}
if !ok {
return nil
}
return this.Loop()
}
func (this *DNSTaskExecutor) Loop() error {
tasks, err := dnsmodels.SharedDNSTaskDAO.FindAllDoingTasks(nil)
if err != nil {
return err
}
for _, task := range tasks {
taskId := int64(task.Id)
switch task.Type {
case dnsmodels.DNSTaskTypeServerChange:
err = this.doServer(taskId, int64(task.ServerId))
if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil {
return err
}
}
case dnsmodels.DNSTaskTypeNodeChange:
err = this.doNode(taskId, int64(task.NodeId))
if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil {
return err
}
}
case dnsmodels.DNSTaskTypeClusterChange:
err = this.doCluster(taskId, int64(task.ClusterId))
if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil {
return err
}
}
case dnsmodels.DNSTaskTypeDomainChange:
err = this.doDomain(taskId, int64(task.DomainId))
if err != nil {
err = dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskError(nil, taskId, err.Error())
if err != nil {
return err
}
}
}
}
return nil
}
func (this *DNSTaskExecutor) doServer(taskId int64, serverId int64) error {
var tx *dbs.Tx
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
// 检查是否已通过审核
serverDNS, err := models.SharedServerDAO.FindStatelessServerDNS(tx, serverId)
if err != nil {
return err
}
if serverDNS == nil {
isOk = true
return nil
}
if len(serverDNS.DnsName) == 0 {
isOk = true
return nil
}
manager, domainId, domain, clusterDNSName, err := this.findDNSManager(tx, int64(serverDNS.ClusterId))
if err != nil {
return err
}
if manager == nil {
isOk = true
return nil
}
recordName := serverDNS.DnsName
recordValue := clusterDNSName + "." + domain + "."
recordRoute := manager.DefaultRoute()
recordType := dnsclients.RecordTypeCName
if serverDNS.State == models.ServerStateDisabled || serverDNS.IsOn == 0 {
// 检查记录是否已经存在
record, err := manager.QueryRecord(domain, recordName, recordType)
if err != nil {
return err
}
if record != nil {
// 删除
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
}
isOk = true
} else {
// 是否已存在
exist, err := dnsmodels.SharedDNSDomainDAO.ExistDomainRecord(tx, domainId, recordName, recordType, recordRoute, recordValue)
if err != nil {
return err
}
if exist {
isOk = true
return nil
}
// 检查记录是否已经存在
record, err := manager.QueryRecord(domain, recordName, recordType)
if err != nil {
return err
}
if record != nil {
if record.Value == recordValue || record.Value == strings.TrimRight(recordValue, ".") {
isOk = true
return nil
}
// 删除
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
}
err = manager.AddRecord(domain, &dnsclients.Record{
Id: "",
Name: recordName,
Type: recordType,
Value: recordValue,
Route: recordRoute,
})
if err != nil {
return err
}
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
isOk = true
}
return nil
}
func (this *DNSTaskExecutor) doNode(taskId int64, nodeId int64) error {
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
var tx *dbs.Tx
node, err := models.SharedNodeDAO.FindStatelessNodeDNS(tx, nodeId)
if err != nil {
return err
}
if node == nil {
isOk = true
return nil
}
if node.ClusterId == 0 {
isOk = true
return nil
}
// 转交给cluster统一处理
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, int64(node.ClusterId), dnsmodels.DNSTaskTypeClusterChange)
if err != nil {
return err
}
isOk = true
return nil
}
func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error {
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
var tx *dbs.Tx
manager, domainId, domain, clusterDNSName, err := this.findDNSManager(tx, clusterId)
if err != nil {
return err
}
if manager == nil {
isOk = true
return nil
}
// 以前的节点记录
records, err := manager.GetRecords(domain)
if err != nil {
return err
}
oldRecordsMap := map[string]*dnsclients.Record{} // route@value => record
for _, record := range records {
if record.Type == dnsclients.RecordTypeA && record.Name == clusterDNSName {
key := record.Route + "@" + record.Value
oldRecordsMap[key] = record
}
}
// 当前的节点记录
newRecordKeys := []string{}
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId)
if err != nil {
return err
}
isChanged := false
for _, node := range nodes {
routes, err := node.DNSRouteCodesForDomainId(domainId)
if err != nil {
return err
}
if len(routes) == 0 {
routes = []string{manager.DefaultRoute()}
}
// 所有的IP记录
ipAddresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id))
if err != nil {
return err
}
if len(ipAddresses) == 0 {
continue
}
for _, ipAddress := range ipAddresses {
ip := ipAddress.Ip
if len(ip) == 0 {
continue
}
if net.ParseIP(ip) == nil {
continue
}
for _, route := range routes {
key := route + "@" + ip
_, ok := oldRecordsMap[key]
if ok {
newRecordKeys = append(newRecordKeys, key)
continue
}
err = manager.AddRecord(domain, &dnsclients.Record{
Id: "",
Name: clusterDNSName,
Type: dnsclients.RecordTypeA,
Value: ip,
Route: route,
})
if err != nil {
return err
}
isChanged = true
newRecordKeys = append(newRecordKeys, key)
}
}
}
// 删除多余的节点解析记录
for key, record := range oldRecordsMap {
if !lists.ContainsString(newRecordKeys, key) {
err = manager.DeleteRecord(domain, record)
if err != nil {
return err
}
}
}
// 通知更新域名
if isChanged {
err = dnsmodels.SharedDNSTaskDAO.CreateDomainTask(tx, domainId, dnsmodels.DNSTaskTypeDomainChange)
if err != nil {
return err
}
}
isOk = true
return nil
}
func (this *DNSTaskExecutor) doDomain(taskId int64, domainId int64) error {
var tx *dbs.Tx
isOk := false
defer func() {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
}
}
}()
dnsDomain, err := dnsmodels.SharedDNSDomainDAO.FindEnabledDNSDomain(tx, domainId)
if err != nil {
return err
}
if dnsDomain == nil {
isOk = true
return nil
}
providerId := int64(dnsDomain.ProviderId)
if providerId <= 0 {
isOk = true
return nil
}
provider, err := dnsmodels.SharedDNSProviderDAO.FindEnabledDNSProvider(tx, providerId)
if err != nil {
return err
}
if provider == nil {
isOk = true
return nil
}
manager := dnsclients.FindProvider(provider.Type)
if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
isOk = true
return nil
}
params, err := provider.DecodeAPIParams()
if err != nil {
return err
}
err = manager.Auth(params)
if err != nil {
return err
}
records, err := manager.GetRecords(dnsDomain.Name)
if err != nil {
return err
}
recordsJSON, err := json.Marshal(records)
if err != nil {
return err
}
err = dnsmodels.SharedDNSDomainDAO.UpdateDomainRecords(tx, domainId, recordsJSON)
if err != nil {
return err
}
isOk = true
return nil
}
func (this *DNSTaskExecutor) findDNSManager(tx *dbs.Tx, clusterId int64) (manager dnsclients.ProviderInterface, domainId int64, domain string, clusterDNSName string, err error) {
clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
if err != nil {
return nil, 0, "", "", err
}
if clusterDNS == nil || len(clusterDNS.DnsName) == 0 || clusterDNS.DnsDomainId <= 0 {
return nil, 0, "", "", nil
}
dnsDomain, err := dnsmodels.SharedDNSDomainDAO.FindEnabledDNSDomain(tx, int64(clusterDNS.DnsDomainId))
if err != nil {
return nil, 0, "", "", err
}
if dnsDomain == nil {
return nil, 0, "", "", nil
}
providerId := int64(dnsDomain.ProviderId)
if providerId <= 0 {
return nil, 0, "", "", nil
}
provider, err := dnsmodels.SharedDNSProviderDAO.FindEnabledDNSProvider(tx, providerId)
if err != nil {
return nil, 0, "", "", err
}
if provider == nil {
return nil, 0, "", "", nil
}
manager = dnsclients.FindProvider(provider.Type)
if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
return nil, 0, "", "", nil
}
params, err := provider.DecodeAPIParams()
if err != nil {
return nil, 0, "", "", err
}
err = manager.Auth(params)
if err != nil {
return nil, 0, "", "", err
}
return manager, int64(dnsDomain.Id), dnsDomain.Name, clusterDNS.DnsName, nil
}

View File

@@ -0,0 +1,17 @@
package tasks
import (
"github.com/iwind/TeaGo/dbs"
"testing"
)
func TestDNSTaskExecutor_Loop(t *testing.T) {
dbs.NotifyReady()
executor := NewDNSTaskExecutor()
err := executor.Loop()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -4,8 +4,8 @@ import (
"crypto/tls"
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/events"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
@@ -56,7 +56,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
Node: node,
}
ipAddr, err := models.NewNodeIPAddressDAO().FindFirstNodeIPAddress(nil, int64(node.Id))
ipAddr, err := models.NewNodeIPAddressDAO().FindFirstNodeAccessIPAddress(nil, int64(node.Id))
if err != nil {
return nil, err
}
@@ -133,11 +133,10 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) {
if err != nil {
logs.Println("[HEALTH_CHECK]" + err.Error())
} else if isChanged {
// 通知更新
select {
case events.NodeDNSChanges <- int64(result.Node.Id):
default:
// 通知DNS更新
err = dns.SharedDNSTaskDAO.CreateNodeTask(nil, int64(result.Node.Id), dns.DNSTaskTypeNodeChange)
if err != nil {
logs.Println("[HEALTH_CHECK]" + err.Error())
}
// 通知恢复或下线

View File

@@ -8,7 +8,7 @@ import (
)
func init() {
dbs.OnReady(func() {
dbs.OnReadyDone(func() {
go NewNodeTaskExtractor().Start()
})
}