数据有更改时发送通知

This commit is contained in:
GoEdgeLab
2021-08-08 15:47:48 +08:00
parent 07aa0990f8
commit e979414973
40 changed files with 548 additions and 185 deletions

View File

@@ -3,6 +3,7 @@ package models
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -302,7 +303,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
if len(resultClusterIds) > 0 { if len(resultClusterIds) > 0 {
for _, clusterId := range resultClusterIds { for _, clusterId := range resultClusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -2,6 +2,7 @@ package models
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ipconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ipconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
@@ -253,7 +254,7 @@ func (this *IPListDAO) NotifyUpdate(tx *dbs.Tx, listId int64, taskType NodeTaskT
if len(resultClusterIds) > 0 { if len(resultClusterIds) > 0 {
for _, clusterId := range resultClusterIds { for _, clusterId := range resultClusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, taskType) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, taskType)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -3,6 +3,7 @@ package models
import ( import (
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
@@ -308,7 +309,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
return err return err
} }
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -320,7 +321,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
return err return err
} }
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -1 +0,0 @@
package nameservers

View File

@@ -3,6 +3,7 @@ package nameservers
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -35,12 +36,15 @@ func init() {
} }
// EnableNSDomain 启用条目 // EnableNSDomain 启用条目
func (this *NSDomainDAO) EnableNSDomain(tx *dbs.Tx, id int64) error { func (this *NSDomainDAO) EnableNSDomain(tx *dbs.Tx, domainId int64) error {
_, err := this.Query(tx). _, err := this.Query(tx).
Pk(id). Pk(domainId).
Set("state", NSDomainStateEnabled). Set("state", NSDomainStateEnabled).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx, domainId)
} }
// DisableNSDomain 禁用条目 // DisableNSDomain 禁用条目
@@ -55,7 +59,10 @@ func (this *NSDomainDAO) DisableNSDomain(tx *dbs.Tx, domainId int64) error {
Set("state", NSDomainStateDisabled). Set("state", NSDomainStateDisabled).
Set("version", version). Set("version", version).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx, domainId)
} }
// FindEnabledNSDomain 查找启用中的条目 // FindEnabledNSDomain 查找启用中的条目
@@ -92,7 +99,16 @@ func (this *NSDomainDAO) CreateDomain(tx *dbs.Tx, clusterId int64, userId int64,
op.Version = version op.Version = version
op.IsOn = true op.IsOn = true
op.State = NSDomainStateEnabled op.State = NSDomainStateEnabled
return this.SaveInt64(tx, op) domainId, err := this.SaveInt64(tx, op)
if err != nil {
return 0, err
}
err = this.NotifyUpdate(tx, domainId)
if err != nil {
return domainId, err
}
return domainId, nil
} }
// UpdateDomain 修改域名 // UpdateDomain 修改域名
@@ -101,6 +117,14 @@ func (this *NSDomainDAO) UpdateDomain(tx *dbs.Tx, domainId int64, clusterId int6
return errors.New("invalid domainId") return errors.New("invalid domainId")
} }
oldClusterId, err := this.Query(tx).
Pk(domainId).
Result("clusterId").
FindInt64Col(0)
if err != nil {
return err
}
version, err := this.IncreaseVersion(tx) version, err := this.IncreaseVersion(tx)
if err != nil { if err != nil {
return err return err
@@ -112,7 +136,20 @@ func (this *NSDomainDAO) UpdateDomain(tx *dbs.Tx, domainId int64, clusterId int6
op.UserId = userId op.UserId = userId
op.IsOn = isOn op.IsOn = isOn
op.Version = version op.Version = version
return this.Save(tx, op) err = this.Save(tx, op)
if err != nil {
return err
}
// 通知更新
if oldClusterId > 0 && oldClusterId != clusterId {
err = models.SharedNSClusterDAO.NotifyUpdate(tx, oldClusterId)
if err != nil {
return err
}
}
return this.NotifyUpdate(tx, domainId)
} }
// CountAllEnabledDomains 计算域名数量 // CountAllEnabledDomains 计算域名数量
@@ -121,7 +158,7 @@ func (this *NSDomainDAO) CountAllEnabledDomains(tx *dbs.Tx, clusterId int64, use
if clusterId > 0 { if clusterId > 0 {
query.Attr("clusterId", clusterId) query.Attr("clusterId", clusterId)
} else { } else {
query.Where("clusterId IN (SELECT id FROM " + SharedNSClusterDAO.Table + " WHERE state=1)") query.Where("clusterId IN (SELECT id FROM " + models.SharedNSClusterDAO.Table + " WHERE state=1)")
} }
if userId > 0 { if userId > 0 {
query.Attr("userId", userId) query.Attr("userId", userId)
@@ -144,7 +181,7 @@ func (this *NSDomainDAO) ListEnabledDomains(tx *dbs.Tx, clusterId int64, userId
if clusterId > 0 { if clusterId > 0 {
query.Attr("clusterId", clusterId) query.Attr("clusterId", clusterId)
} else { } else {
query.Where("clusterId IN (SELECT id FROM " + SharedNSClusterDAO.Table + " WHERE state=1)") query.Where("clusterId IN (SELECT id FROM " + models.SharedNSClusterDAO.Table + " WHERE state=1)")
} }
if userId > 0 { if userId > 0 {
query.Attr("userId", userId) query.Attr("userId", userId)
@@ -214,22 +251,39 @@ func (this *NSDomainDAO) UpdateDomainTSIG(tx *dbs.Tx, domainId int64, tsigJSON [
return err return err
} }
return this.Query(tx). err = this.Query(tx).
Pk(domainId). Pk(domainId).
Set("tsig", tsigJSON). Set("tsig", tsigJSON).
Set("version", version). Set("version", version).
UpdateQuickly() UpdateQuickly()
}
// NotifyUpdate 通知更改
func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error {
version, err := this.IncreaseVersion(tx)
if err != nil { if err != nil {
return err return err
} }
return this.NotifyUpdate(tx, domainId)
}
// FindEnabledDomainClusterId 获取域名的集群ID
func (this *NSDomainDAO) FindEnabledDomainClusterId(tx *dbs.Tx, domainId int64) (int64, error) {
return this.Query(tx). return this.Query(tx).
Pk(domainId). Pk(domainId).
Set("version", version). State(NSDomainStateEnabled).
UpdateQuickly() Result("clusterId").
FindInt64Col(0)
}
// NotifyUpdate 通知更改
func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error {
clusterId, err := this.Query(tx).
Result("clusterId").
Pk(domainId).
FindInt64Col(0)
if err != nil {
return err
}
if clusterId > 0 {
return models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeDomainChanged)
}
return nil
} }

View File

@@ -4,6 +4,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -176,8 +177,33 @@ func (this *NSKeyDAO) NotifyUpdate(tx *dbs.Tx, keyId int64) error {
if err != nil { if err != nil {
return err return err
} }
return this.Query(tx). err = this.Query(tx).
Pk(keyId). Pk(keyId).
Set("version", version). Set("version", version).
UpdateQuickly() UpdateQuickly()
if err != nil {
return err
}
// 通知集群
domainId, err := this.Query(tx).
Pk(keyId).
Result("domainId").
FindInt64Col(0)
if err != nil {
return err
}
if domainId > 0 {
clusterId, err := SharedNSDomainDAO.FindEnabledDomainClusterId(tx, domainId)
if err != nil {
return err
}
if clusterId > 0 {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeKeyChanged)
if err != nil {
return err
}
}
}
return nil
} }

View File

@@ -5,6 +5,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -38,27 +39,33 @@ func init() {
} }
// EnableNSRecord 启用条目 // EnableNSRecord 启用条目
func (this *NSRecordDAO) EnableNSRecord(tx *dbs.Tx, id uint64) error { func (this *NSRecordDAO) EnableNSRecord(tx *dbs.Tx, recordId int64) error {
_, err := this.Query(tx). _, err := this.Query(tx).
Pk(id). Pk(recordId).
Set("state", NSRecordStateEnabled). Set("state", NSRecordStateEnabled).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx, recordId)
} }
// DisableNSRecord 禁用条目 // DisableNSRecord 禁用条目
func (this *NSRecordDAO) DisableNSRecord(tx *dbs.Tx, id int64) error { func (this *NSRecordDAO) DisableNSRecord(tx *dbs.Tx, recordId int64) error {
version, err := this.IncreaseVersion(tx) version, err := this.IncreaseVersion(tx)
if err != nil { if err != nil {
return err return err
} }
_, err = this.Query(tx). _, err = this.Query(tx).
Pk(id). Pk(recordId).
Set("state", NSRecordStateDisabled). Set("state", NSRecordStateDisabled).
Set("version", version). Set("version", version).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx, recordId)
} }
// FindEnabledNSRecord 查找启用中的条目 // FindEnabledNSRecord 查找启用中的条目
@@ -109,7 +116,16 @@ func (this *NSRecordDAO) CreateRecord(tx *dbs.Tx, domainId int64, description st
op.IsOn = true op.IsOn = true
op.State = NSRecordStateEnabled op.State = NSRecordStateEnabled
op.Version = version op.Version = version
return this.SaveInt64(tx, op) recordId, err := this.SaveInt64(tx, op)
if err != nil {
return 0, err
}
err = this.NotifyUpdate(tx, recordId)
if err != nil {
return 0, err
}
return recordId, nil
} }
// UpdateRecord 修改记录 // UpdateRecord 修改记录
@@ -144,7 +160,12 @@ func (this *NSRecordDAO) UpdateRecord(tx *dbs.Tx, recordId int64, description st
op.Version = version op.Version = version
return this.Save(tx, op) err = this.Save(tx, op)
if err != nil {
return err
}
return this.NotifyUpdate(tx, recordId)
} }
// CountAllEnabledDomainRecords 计算域名中记录数量 // CountAllEnabledDomainRecords 计算域名中记录数量
@@ -230,3 +251,31 @@ func (this *NSRecordDAO) FindEnabledRecordWithName(tx *dbs.Tx, domainId int64, r
} }
return record.(*NSRecord), nil return record.(*NSRecord), nil
} }
// NotifyUpdate 通知更新
func (this *NSRecordDAO) NotifyUpdate(tx *dbs.Tx, recordId int64) error {
domainId, err := this.Query(tx).
Pk(recordId).
Result("domainId").
FindInt64Col(0)
if err != nil {
return err
}
if domainId == 0 {
return nil
}
clusterId, err := SharedNSDomainDAO.FindEnabledDomainClusterId(tx, domainId)
if err != nil {
return err
}
if clusterId > 0 {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRecordChanged)
if err != nil {
return err
}
}
return nil
}

View File

@@ -3,6 +3,7 @@ package nameservers
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -35,12 +36,21 @@ func init() {
} }
// EnableNSRoute 启用条目 // EnableNSRoute 启用条目
func (this *NSRouteDAO) EnableNSRoute(tx *dbs.Tx, id int64) error { func (this *NSRouteDAO) EnableNSRoute(tx *dbs.Tx, routeId int64) error {
_, err := this.Query(tx). version, err := this.IncreaseVersion(tx)
Pk(id). if err != nil {
return err
}
_, err = this.Query(tx).
Pk(routeId).
Set("state", NSRouteStateEnabled). Set("state", NSRouteStateEnabled).
Set("version", version).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx)
} }
// DisableNSRoute 禁用条目 // DisableNSRoute 禁用条目
@@ -55,7 +65,10 @@ func (this *NSRouteDAO) DisableNSRoute(tx *dbs.Tx, routeId int64) error {
Set("state", NSRouteStateDisabled). Set("state", NSRouteStateDisabled).
Set("version", version). Set("version", version).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx)
} }
// FindEnabledNSRoute 查找启用中的条目 // FindEnabledNSRoute 查找启用中的条目
@@ -98,7 +111,17 @@ func (this *NSRouteDAO) CreateRoute(tx *dbs.Tx, clusterId int64, domainId int64,
op.IsOn = true op.IsOn = true
op.State = NSRouteStateEnabled op.State = NSRouteStateEnabled
op.Version = version op.Version = version
return this.SaveInt64(tx, op) routeId, err := this.SaveInt64(tx, op)
if err != nil {
return 0, err
}
err = this.NotifyUpdate(tx)
if err != nil {
return 0, err
}
return routeId, nil
} }
// UpdateRoute 修改线路 // UpdateRoute 修改线路
@@ -123,7 +146,12 @@ func (this *NSRouteDAO) UpdateRoute(tx *dbs.Tx, routeId int64, name string, rang
op.Version = version op.Version = version
return this.Save(tx, op) err = this.Save(tx, op)
if err != nil {
return err
}
return this.NotifyUpdate(tx)
} }
// UpdateRouteOrders 修改线路排序 // UpdateRouteOrders 修改线路排序
@@ -145,7 +173,8 @@ func (this *NSRouteDAO) UpdateRouteOrders(tx *dbs.Tx, routeIds []int64) error {
} }
order-- order--
} }
return nil
return this.NotifyUpdate(tx)
} }
// FindAllEnabledRoutes 列出所有线路 // FindAllEnabledRoutes 列出所有线路
@@ -190,3 +219,19 @@ func (this *NSRouteDAO) ListRoutesAfterVersion(tx *dbs.Tx, version int64, size i
FindAll() FindAll()
return return
} }
// NotifyUpdate 通知更新
func (this *NSRouteDAO) NotifyUpdate(tx *dbs.Tx) error {
// 线路变更时所有集群都要更新
clusterIds, err := models.SharedNSClusterDAO.FindAllEnabledClusterIds(tx)
if err != nil {
return err
}
for _, clusterId := range clusterIds {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRouteChanged)
if err != nil {
return err
}
}
return nil
}

View File

@@ -869,7 +869,7 @@ func (this *NodeClusterDAO) FindEnabledNodeClustersWithIds(tx *dbs.Tx, clusterId
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
} }
// NotifyDNSUpdate 通知DNS更新 // NotifyDNSUpdate 通知DNS更新

View File

@@ -2,6 +2,7 @@ package models
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -158,5 +159,5 @@ func (this *NodeClusterMetricItemDAO) ExistsClusterItem(tx *dbs.Tx, clusterId in
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
} }

View File

@@ -1290,7 +1290,7 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
return err return err
} }
if clusterId > 0 { if clusterId > 0 {
return SharedNodeTaskDAO.CreateNodeTask(tx, clusterId, nodeId, NodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, NodeTaskTypeConfigChanged)
} }
return nil return nil
} }

View File

@@ -1,8 +1,8 @@
package models package models
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -17,6 +17,14 @@ const (
NodeTaskTypeConfigChanged NodeTaskType = "configChanged" NodeTaskTypeConfigChanged NodeTaskType = "configChanged"
NodeTaskTypeIPItemChanged NodeTaskType = "ipItemChanged" NodeTaskTypeIPItemChanged NodeTaskType = "ipItemChanged"
NodeTaskTypeNodeVersionChanged NodeTaskType = "nodeVersionChanged" NodeTaskTypeNodeVersionChanged NodeTaskType = "nodeVersionChanged"
// NS相关
NSNodeTaskTypeConfigChanged NodeTaskType = "nsConfigChanged"
NSNodeTaskTypeDomainChanged NodeTaskType = "nsDomainChanged"
NSNodeTaskTypeRecordChanged NodeTaskType = "nsRecordChanged"
NSNodeTaskTypeRouteChanged NodeTaskType = "nsRouteChanged"
NSNodeTaskTypeKeyChanged NodeTaskType = "nsKeyChanged"
) )
type NodeTaskDAO dbs.DAO type NodeTaskDAO dbs.DAO
@@ -41,14 +49,15 @@ func init() {
} }
// CreateNodeTask 创建单个节点任务 // CreateNodeTask 创建单个节点任务
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int64, taskType NodeTaskType) error { func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, taskType NodeTaskType) error {
if clusterId <= 0 || nodeId <= 0 { if clusterId <= 0 || nodeId <= 0 {
return nil return nil
} }
uniqueId := numberutils.FormatInt64(nodeId) + "@node@" + taskType uniqueId := role + "@" + types.String(nodeId) + "@node@" + taskType
updatedAt := time.Now().Unix() updatedAt := time.Now().Unix()
_, _, err := this.Query(tx). _, _, err := this.Query(tx).
InsertOrUpdate(maps.Map{ InsertOrUpdate(maps.Map{
"role": role,
"clusterId": clusterId, "clusterId": clusterId,
"nodeId": nodeId, "nodeId": nodeId,
"type": taskType, "type": taskType,
@@ -68,15 +77,16 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int6
} }
// CreateClusterTask 创建集群任务 // CreateClusterTask 创建集群任务
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, taskType NodeTaskType) error {
if clusterId <= 0 { if clusterId <= 0 {
return nil return nil
} }
uniqueId := numberutils.FormatInt64(clusterId) + "@cluster@" + taskType uniqueId := role + "@" + types.String(clusterId) + "@cluster@" + taskType
updatedAt := time.Now().Unix() updatedAt := time.Now().Unix()
_, _, err := this.Query(tx). _, _, err := this.Query(tx).
InsertOrUpdate(maps.Map{ InsertOrUpdate(maps.Map{
"role": role,
"clusterId": clusterId, "clusterId": clusterId,
"nodeId": 0, "nodeId": 0,
"type": taskType, "type": taskType,
@@ -96,14 +106,15 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType
return err return err
} }
// ExtractClusterTask 分解集群任务 // ExtractNodeClusterTask 分解边缘节点集群任务
func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes) nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
if err != nil { if err != nil {
return err return err
} }
_, err = this.Query(tx). _, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleNode).
Attr("clusterId", clusterId). Attr("clusterId", clusterId).
Param("clusterIdString", types.String(clusterId)). Param("clusterIdString", types.String(clusterId)).
Where("nodeId> 0"). Where("nodeId> 0").
@@ -114,13 +125,52 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp
} }
for _, nodeId := range nodeIds { for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, clusterId, nodeId, taskType) err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, taskType)
if err != nil { if err != nil {
return err return err
} }
} }
_, err = this.Query(tx). _, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleNode).
Attr("clusterId", clusterId).
Attr("nodeId", 0).
Attr("type", taskType).
Delete()
if err != nil {
return err
}
return nil
}
// ExtractNSClusterTask 分解NS节点集群任务
func (this *NodeTaskDAO) ExtractNSClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
nodeIds, err := SharedNSNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
if err != nil {
return err
}
_, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleDNS).
Attr("clusterId", clusterId).
Param("clusterIdString", types.String(clusterId)).
Where("nodeId> 0").
Attr("type", taskType).
Delete()
if err != nil {
return err
}
for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, taskType)
if err != nil {
return err
}
}
_, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleDNS).
Attr("clusterId", clusterId). Attr("clusterId", clusterId).
Attr("nodeId", 0). Attr("nodeId", 0).
Attr("type", taskType). Attr("type", taskType).
@@ -133,8 +183,9 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp
} }
// ExtractAllClusterTasks 分解所有集群任务 // ExtractAllClusterTasks 分解所有集群任务
func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error { func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx, role string) error {
ones, err := this.Query(tx). ones, err := this.Query(tx).
Attr("role", role).
Attr("nodeId", 0). Attr("nodeId", 0).
FindAll() FindAll()
if err != nil { if err != nil {
@@ -142,36 +193,47 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error {
} }
for _, one := range ones { for _, one := range ones {
clusterId := int64(one.(*NodeTask).ClusterId) clusterId := int64(one.(*NodeTask).ClusterId)
err = this.ExtractClusterTask(tx, clusterId, one.(*NodeTask).Type) switch role {
if err != nil { case nodeconfigs.NodeRoleNode:
return err err = this.ExtractNodeClusterTask(tx, clusterId, one.(*NodeTask).Type)
if err != nil {
return err
}
case nodeconfigs.NodeRoleDNS:
err = this.ExtractNSClusterTask(tx, clusterId, one.(*NodeTask).Type)
if err != nil {
return err
}
} }
} }
return nil return nil
} }
// DeleteAllClusterTasks 删除集群所有相关任务 // DeleteAllClusterTasks 删除集群所有相关任务
func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, clusterId int64) error { func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, role string, clusterId int64) error {
_, err := this.Query(tx). _, err := this.Query(tx).
Attr("role", role).
Attr("clusterId", clusterId). Attr("clusterId", clusterId).
Delete() Delete()
return err return err
} }
// DeleteNodeTasks 删除节点相关任务 // DeleteNodeTasks 删除节点相关任务
func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, nodeId int64) error { func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, role string, nodeId int64) error {
_, err := this.Query(tx). _, err := this.Query(tx).
Attr("role", role).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Delete() Delete()
return err return err
} }
// FindDoingNodeTasks 查询一个节点的所有任务 // FindDoingNodeTasks 查询一个节点的所有任务
func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, nodeId int64) (result []*NodeTask, err error) { func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int64) (result []*NodeTask, err error) {
if nodeId <= 0 { if nodeId <= 0 {
return return
} }
_, err = this.Query(tx). _, err = this.Query(tx).
Attr("role", role).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Where("(isDone=0 OR (isDone=1 AND isOk=0))"). Where("(isDone=0 OR (isDone=1 AND isOk=0))").
Slice(&result). Slice(&result).
@@ -191,9 +253,10 @@ func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool,
} }
// FindAllDoingTaskClusterIds 查找正在更新的集群IDs // FindAllDoingTaskClusterIds 查找正在更新的集群IDs
func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error) { func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx, role string) ([]int64, error) {
ones, _, err := this.Query(tx). ones, _, err := this.Query(tx).
Result("DISTINCT(clusterId) AS clusterId"). Result("DISTINCT(clusterId) AS clusterId").
Attr("role", role).
Where("(nodeId=0 OR (isDone=0 OR (isDone=1 AND isOk=0)))"). Where("(nodeId=0 OR (isDone=0 OR (isDone=1 AND isOk=0)))").
FindOnes() FindOnes()
if err != nil { if err != nil {
@@ -207,8 +270,9 @@ func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error)
} }
// FindAllDoingNodeTasksWithClusterId 查询某个集群下所有的任务 // FindAllDoingNodeTasksWithClusterId 查询某个集群下所有的任务
func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterId int64) (result []*NodeTask, err error) { func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, role string, clusterId int64) (result []*NodeTask, err error) {
_, err = this.Query(tx). _, err = this.Query(tx).
Attr("role", role).
Attr("clusterId", clusterId). Attr("clusterId", clusterId).
Gt("nodeId", 0). Gt("nodeId", 0).
Where("(isDone=0 OR (isDone=1 AND isOk=0))"). Where("(isDone=0 OR (isDone=1 AND isOk=0))").
@@ -220,17 +284,37 @@ func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterI
return return
} }
// FindAllDoingNodeIds 查询有任务的节点IDs
func (this *NodeTaskDAO) FindAllDoingNodeIds(tx *dbs.Tx, role string) ([]int64, error) {
ones, err := this.Query(tx).
Result("DISTINCT(nodeId) AS nodeId").
Attr("role", role).
Gt("nodeId", 0).
Attr("isDone", false).
FindAll()
if err != nil {
return nil, err
}
var result []int64
for _, one := range ones {
result = append(result, int64(one.(*NodeTask).NodeId))
}
return result, nil
}
// ExistsDoingNodeTasks 检查是否有正在执行的任务 // ExistsDoingNodeTasks 检查是否有正在执行的任务
func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx) (bool, error) { func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx, role string) (bool, error) {
return this.Query(tx). return this.Query(tx).
Attr("role", role).
Where("(isDone=0 OR (isDone=1 AND isOk=0))"). Where("(isDone=0 OR (isDone=1 AND isOk=0))").
Gt("nodeId", 0). Gt("nodeId", 0).
Exist() Exist()
} }
// ExistsErrorNodeTasks 是否有错误的任务 // ExistsErrorNodeTasks 是否有错误的任务
func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx) (bool, error) { func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx, role string) (bool, error) {
return this.Query(tx). return this.Query(tx).
Attr("role", role).
Where("(isDone=1 AND isOk=0)"). Where("(isDone=1 AND isOk=0)").
Exist() Exist()
} }
@@ -244,16 +328,18 @@ func (this *NodeTaskDAO) DeleteNodeTask(tx *dbs.Tx, taskId int64) error {
} }
// CountDoingNodeTasks 计算正在执行的任务 // CountDoingNodeTasks 计算正在执行的任务
func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx) (int64, error) { func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx, role string) (int64, error) {
return this.Query(tx). return this.Query(tx).
Attr("isDone", 0). Attr("isDone", 0).
Attr("role", role).
Gt("nodeId", 0). Gt("nodeId", 0).
Count() Count()
} }
// FindNotifyingNodeTasks 查找需要通知的任务 // FindNotifyingNodeTasks 查找需要通知的任务
func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, size int64) (result []*NodeTask, err error) { func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, role string, size int64) (result []*NodeTask, err error) {
_, err = this.Query(tx). _, err = this.Query(tx).
Attr("role", role).
Gt("nodeId", 0). Gt("nodeId", 0).
Attr("isNotified", 0). Attr("isNotified", 0).
Attr("isDone", 0). Attr("isDone", 0).

View File

@@ -1,8 +1,9 @@
package models package models
// 节点同步任务 // NodeTask 节点同步任务
type NodeTask struct { type NodeTask struct {
Id uint64 `field:"id"` // ID Id uint64 `field:"id"` // ID
Role string `field:"role"` // 节点角色
NodeId uint32 `field:"nodeId"` // 节点ID NodeId uint32 `field:"nodeId"` // 节点ID
ClusterId uint32 `field:"clusterId"` // 集群ID ClusterId uint32 `field:"clusterId"` // 集群ID
Type string `field:"type"` // 任务类型 Type string `field:"type"` // 任务类型
@@ -16,6 +17,7 @@ type NodeTask struct {
type NodeTaskOperator struct { type NodeTaskOperator struct {
Id interface{} // ID Id interface{} // ID
Role interface{} // 节点角色
NodeId interface{} // 节点ID NodeId interface{} // 节点ID
ClusterId interface{} // 集群ID ClusterId interface{} // 集群ID
Type interface{} // 任务类型 Type interface{} // 任务类型

View File

@@ -1,7 +1,8 @@
package nameservers package models
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -127,6 +128,22 @@ func (this *NSClusterDAO) FindAllEnabledClusters(tx *dbs.Tx) (result []*NSCluste
return return
} }
// FindAllEnabledClusterIds 获取所有集群IDs
func (this *NSClusterDAO) FindAllEnabledClusterIds(tx *dbs.Tx) ([]int64, error) {
ones, err := this.Query(tx).
State(NSClusterStateEnabled).
ResultPk().
FindAll()
if err != nil {
return nil, err
}
var result = []int64{}
for _, one := range ones {
result = append(result, int64(one.(*NSCluster).Id))
}
return result, nil
}
// UpdateClusterAccessLog 设置访问日志 // UpdateClusterAccessLog 设置访问日志
func (this *NSClusterDAO) UpdateClusterAccessLog(tx *dbs.Tx, clusterId int64, accessLogJSON []byte) error { func (this *NSClusterDAO) UpdateClusterAccessLog(tx *dbs.Tx, clusterId int64, accessLogJSON []byte) error {
return this.Query(tx). return this.Query(tx).
@@ -143,3 +160,8 @@ func (this *NSClusterDAO) FindClusterAccessLog(tx *dbs.Tx, clusterId int64) ([]b
FindStringCol("") FindStringCol("")
return []byte(accessLog), err return []byte(accessLog), err
} }
// NotifyUpdate 通知更改
func (this *NSClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, NSNodeTaskTypeConfigChanged)
}

View File

@@ -1,4 +1,4 @@
package nameservers package models
import ( import (
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"

View File

@@ -1,4 +1,4 @@
package nameservers package models
// NSCluster 域名服务器集群 // NSCluster 域名服务器集群
type NSCluster struct { type NSCluster struct {

View File

@@ -0,0 +1 @@
package models

View File

@@ -1,8 +1,7 @@
package nameservers package models
import ( import (
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
@@ -214,7 +213,7 @@ func (this *NSNodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, cluste
secret := rands.String(32) secret := rands.String(32)
// 保存API Token // 保存API Token
err = models.SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleDNS) err = SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleDNS)
if err != nil { if err != nil {
return return
} }
@@ -281,7 +280,7 @@ func (this *NSNodeDAO) FindEnabledNodeIdWithUniqueId(tx *dbs.Tx, uniqueId string
} }
// FindNodeInstallStatus 查询节点的安装状态 // FindNodeInstallStatus 查询节点的安装状态
func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*models.NodeInstallStatus, error) { func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*NodeInstallStatus, error) {
node, err := this.Query(tx). node, err := this.Query(tx).
Pk(nodeId). Pk(nodeId).
Result("installStatus", "isInstalled"). Result("installStatus", "isInstalled").
@@ -296,10 +295,10 @@ func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*models.
installStatus := node.(*NSNode).InstallStatus installStatus := node.(*NSNode).InstallStatus
isInstalled := node.(*NSNode).IsInstalled == 1 isInstalled := node.(*NSNode).IsInstalled == 1
if len(installStatus) == 0 { if len(installStatus) == 0 {
return models.NewNodeInstallStatus(), nil return NewNodeInstallStatus(), nil
} }
status := &models.NodeInstallStatus{} status := &NodeInstallStatus{}
err = json.Unmarshal([]byte(installStatus), status) err = json.Unmarshal([]byte(installStatus), status)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -391,7 +390,7 @@ func (this *NSNodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64) (*dnsconfigs.
// 访问日志 // 访问日志
// 全局配置 // 全局配置
{ {
globalValue, err := models.SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeNSAccessLogSetting) globalValue, err := SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeNSAccessLogSetting)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -498,6 +497,29 @@ func (this *NSNodeDAO) UpdateNodeStatusIsNotified(tx *dbs.Tx, nodeId int64) erro
UpdateQuickly() UpdateQuickly()
} }
// FindAllNodeIdsMatch 匹配节点并返回节点ID
func (this *NSNodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) {
query := this.Query(tx)
query.State(NSNodeStateEnabled)
if clusterId > 0 {
query.Attr("clusterId", clusterId)
}
if isOn == configutils.BoolStateYes {
query.Attr("isOn", true)
} else if isOn == configutils.BoolStateNo {
query.Attr("isOn", false)
}
query.Result("id")
ones, _, err := query.FindOnes()
if err != nil {
return nil, err
}
for _, one := range ones {
result = append(result, one.GetInt64("id"))
}
return
}
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *NSNodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error { func (this *NSNodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
// TODO 先什么都不做 // TODO 先什么都不做

View File

@@ -1,4 +1,4 @@
package nameservers package models
import ( import (
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"

View File

@@ -1,4 +1,4 @@
package nameservers package models
// NSNode 域名服务器节点 // NSNode 域名服务器节点
type NSNode struct { type NSNode struct {

View File

@@ -1,21 +1,20 @@
package nameservers package models
import ( import (
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"time" "time"
) )
// DecodeInstallStatus 安装状态 // DecodeInstallStatus 安装状态
func (this *NSNode) DecodeInstallStatus() (*models.NodeInstallStatus, error) { func (this *NSNode) DecodeInstallStatus() (*NodeInstallStatus, error) {
if len(this.InstallStatus) == 0 || this.InstallStatus == "null" { if len(this.InstallStatus) == 0 || this.InstallStatus == "null" {
return models.NewNodeInstallStatus(), nil return NewNodeInstallStatus(), nil
} }
status := &models.NodeInstallStatus{} status := &NodeInstallStatus{}
err := json.Unmarshal([]byte(this.InstallStatus), status) err := json.Unmarshal([]byte(this.InstallStatus), status)
if err != nil { if err != nil {
return models.NewNodeInstallStatus(), err return NewNodeInstallStatus(), err
} }
// 如果N秒钟没有更新状态则认为不在运行 // 如果N秒钟没有更新状态则认为不在运行

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns" "github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
@@ -1270,11 +1271,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
} }
if oldClusterId > 0 { if oldClusterId > 0 {
err = SharedNodeTaskDAO.CreateClusterTask(tx, oldClusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }
err = SharedNodeTaskDAO.CreateClusterTask(tx, oldClusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -1285,11 +1286,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
} }
if newClusterId > 0 { if newClusterId > 0 {
err = SharedNodeTaskDAO.CreateClusterTask(tx, newClusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }
err = SharedNodeTaskDAO.CreateClusterTask(tx, newClusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -1437,7 +1438,7 @@ func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
if clusterId == 0 { if clusterId == 0 {
return nil return nil
} }
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
} }
// NotifyDNSUpdate 通知DNS更新 // NotifyDNSUpdate 通知DNS更新

View File

@@ -45,21 +45,21 @@ func (this *NSService) ComposeNSBoard(ctx context.Context, req *pb.ComposeNSBoar
result.CountNSRecords = countRecords result.CountNSRecords = countRecords
// 集群数 // 集群数
countClusters, err := nameservers.SharedNSClusterDAO.CountAllEnabledClusters(tx) countClusters, err := models.SharedNSClusterDAO.CountAllEnabledClusters(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result.CountNSClusters = countClusters result.CountNSClusters = countClusters
// 节点数 // 节点数
countNodes, err := nameservers.SharedNSNodeDAO.CountAllEnabledNodes(tx) countNodes, err := models.SharedNSNodeDAO.CountAllEnabledNodes(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result.CountNSNodes = countNodes result.CountNSNodes = countNodes
// 离线节点数 // 离线节点数
countOfflineNodes, err := nameservers.SharedNSNodeDAO.CountAllOfflineNodes(tx) countOfflineNodes, err := models.SharedNSNodeDAO.CountAllOfflineNodes(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -122,7 +122,7 @@ func (this *NSService) ComposeNSBoard(ctx context.Context, req *pb.ComposeNSBoar
return nil, err return nil, err
} }
for _, stat := range topNodeStats { for _, stat := range topNodeStats {
nodeName, err := nameservers.SharedNSNodeDAO.FindEnabledNSNodeName(tx, int64(stat.NodeId)) nodeName, err := models.SharedNSNodeDAO.FindEnabledNSNodeName(tx, int64(stat.NodeId))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -4,8 +4,9 @@ package nameservers
import ( import (
"context" "context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
) )
@@ -21,7 +22,7 @@ func (this *NSClusterService) CreateNSCluster(ctx context.Context, req *pb.Creat
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
clusterId, err := nameservers.SharedNSClusterDAO.CreateCluster(tx, req.Name, req.AccessLogJSON) clusterId, err := models.SharedNSClusterDAO.CreateCluster(tx, req.Name, req.AccessLogJSON)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -35,7 +36,7 @@ func (this *NSClusterService) UpdateNSCluster(ctx context.Context, req *pb.Updat
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
err = nameservers.SharedNSClusterDAO.UpdateCluster(tx, req.NsClusterId, req.Name, req.IsOn) err = models.SharedNSClusterDAO.UpdateCluster(tx, req.NsClusterId, req.Name, req.IsOn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -50,7 +51,7 @@ func (this *NSClusterService) FindNSClusterAccessLog(ctx context.Context, req *p
} }
var tx = this.NullTx() var tx = this.NullTx()
accessLogJSON, err := nameservers.SharedNSClusterDAO.FindClusterAccessLog(tx, req.NsClusterId) accessLogJSON, err := models.SharedNSClusterDAO.FindClusterAccessLog(tx, req.NsClusterId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -65,7 +66,7 @@ func (this *NSClusterService) UpdateNSClusterAccessLog(ctx context.Context, req
} }
var tx = this.NullTx() var tx = this.NullTx()
err = nameservers.SharedNSClusterDAO.UpdateClusterAccessLog(tx, req.NsClusterId, req.AccessLogJSON) err = models.SharedNSClusterDAO.UpdateClusterAccessLog(tx, req.NsClusterId, req.AccessLogJSON)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -79,10 +80,17 @@ func (this *NSClusterService) DeleteNSCluster(ctx context.Context, req *pb.Delet
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
err = nameservers.SharedNSClusterDAO.DisableNSCluster(tx, req.NsClusterId) err = models.SharedNSClusterDAO.DisableNSCluster(tx, req.NsClusterId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 删除任务
err = models.SharedNodeTaskDAO.DeleteAllClusterTasks(tx, nodeconfigs.NodeRoleDNS, req.NsClusterId)
if err != nil {
return nil, err
}
return this.Success() return this.Success()
} }
@@ -93,7 +101,7 @@ func (this *NSClusterService) FindEnabledNSCluster(ctx context.Context, req *pb.
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, req.NsClusterId) cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, req.NsClusterId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -115,7 +123,7 @@ func (this *NSClusterService) CountAllEnabledNSClusters(ctx context.Context, req
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
count, err := nameservers.SharedNSClusterDAO.CountAllEnabledClusters(tx) count, err := models.SharedNSClusterDAO.CountAllEnabledClusters(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -129,7 +137,7 @@ func (this *NSClusterService) ListEnabledNSClusters(ctx context.Context, req *pb
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
clusters, err := nameservers.SharedNSClusterDAO.ListEnabledClusters(tx, req.Offset, req.Size) clusters, err := models.SharedNSClusterDAO.ListEnabledClusters(tx, req.Offset, req.Size)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -152,7 +160,7 @@ func (this *NSClusterService) FindAllEnabledNSClusters(ctx context.Context, req
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
clusters, err := nameservers.SharedNSClusterDAO.FindAllEnabledClusters(tx) clusters, err := models.SharedNSClusterDAO.FindAllEnabledClusters(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -78,7 +78,7 @@ func (this *NSDomainService) FindEnabledNSDomain(ctx context.Context, req *pb.Fi
} }
// 集群 // 集群
cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId)) cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -150,7 +150,7 @@ func (this *NSDomainService) ListEnabledNSDomains(ctx context.Context, req *pb.L
pbDomains := []*pb.NSDomain{} pbDomains := []*pb.NSDomain{}
for _, domain := range domains { for _, domain := range domains {
// 集群 // 集群
cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId)) cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -5,12 +5,13 @@ package nameservers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/installers" "github.com/TeaOSLab/EdgeAPI/internal/installers"
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
stringutil "github.com/iwind/TeaGo/utils/string" stringutil "github.com/iwind/TeaGo/utils/string"
"path/filepath" "path/filepath"
@@ -30,7 +31,7 @@ func (this *NSNodeService) FindAllEnabledNSNodesWithNSClusterId(ctx context.Cont
var tx = this.NullTx() var tx = this.NullTx()
nodes, err := nameservers.SharedNSNodeDAO.FindAllEnabledNodesWithClusterId(tx, req.NsClusterId) nodes, err := models.SharedNSNodeDAO.FindAllEnabledNodesWithClusterId(tx, req.NsClusterId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -60,7 +61,7 @@ func (this *NSNodeService) CountAllEnabledNSNodes(ctx context.Context, req *pb.C
} }
var tx = this.NullTx() var tx = this.NullTx()
count, err := nameservers.SharedNSNodeDAO.CountAllEnabledNodes(tx) count, err := models.SharedNSNodeDAO.CountAllEnabledNodes(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -75,7 +76,7 @@ func (this *NSNodeService) CountAllEnabledNSNodesMatch(ctx context.Context, req
} }
var tx = this.NullTx() var tx = this.NullTx()
count, err := nameservers.SharedNSNodeDAO.CountAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword) count, err := models.SharedNSNodeDAO.CountAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -90,7 +91,7 @@ func (this *NSNodeService) ListEnabledNSNodesMatch(ctx context.Context, req *pb.
} }
var tx = this.NullTx() var tx = this.NullTx()
nodes, err := nameservers.SharedNSNodeDAO.ListAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.Offset, req.Size) nodes, err := models.SharedNSNodeDAO.ListAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.Offset, req.Size)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -144,7 +145,7 @@ func (this *NSNodeService) CountAllUpgradeNSNodesWithNSClusterId(ctx context.Con
deployFiles := installers.SharedDeployManager.LoadNSNodeFiles() deployFiles := installers.SharedDeployManager.LoadNSNodeFiles()
total := int64(0) total := int64(0)
for _, deployFile := range deployFiles { for _, deployFile := range deployFiles {
count, err := nameservers.SharedNSNodeDAO.CountAllLowerVersionNodesWithClusterId(tx, req.NsClusterId, deployFile.OS, deployFile.Arch, deployFile.Version) count, err := models.SharedNSNodeDAO.CountAllLowerVersionNodesWithClusterId(tx, req.NsClusterId, deployFile.OS, deployFile.Arch, deployFile.Version)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -163,7 +164,7 @@ func (this *NSNodeService) CreateNSNode(ctx context.Context, req *pb.CreateNSNod
tx := this.NullTx() tx := this.NullTx()
nodeId, err := nameservers.SharedNSNodeDAO.CreateNode(tx, adminId, req.Name, req.NodeClusterId) nodeId, err := models.SharedNSNodeDAO.CreateNode(tx, adminId, req.Name, req.NodeClusterId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -182,7 +183,13 @@ func (this *NSNodeService) DeleteNSNode(ctx context.Context, req *pb.DeleteNSNod
tx := this.NullTx() tx := this.NullTx()
err = nameservers.SharedNSNodeDAO.DisableNSNode(tx, req.NsNodeId) err = models.SharedNSNodeDAO.DisableNSNode(tx, req.NsNodeId)
if err != nil {
return nil, err
}
// 删除任务
err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, nodeconfigs.NodeRoleDNS, req.NsNodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -199,7 +206,7 @@ func (this *NSNodeService) FindEnabledNSNode(ctx context.Context, req *pb.FindEn
tx := this.NullTx() tx := this.NullTx()
node, err := nameservers.SharedNSNodeDAO.FindEnabledNSNode(tx, req.NsNodeId) node, err := models.SharedNSNodeDAO.FindEnabledNSNode(tx, req.NsNodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -208,7 +215,7 @@ func (this *NSNodeService) FindEnabledNSNode(ctx context.Context, req *pb.FindEn
} }
// 集群信息 // 集群信息
clusterName, err := nameservers.SharedNSClusterDAO.FindEnabledNSClusterName(tx, int64(node.ClusterId)) clusterName, err := models.SharedNSClusterDAO.FindEnabledNSClusterName(tx, int64(node.ClusterId))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -256,7 +263,7 @@ func (this *NSNodeService) UpdateNSNode(ctx context.Context, req *pb.UpdateNSNod
tx := this.NullTx() tx := this.NullTx()
err = nameservers.SharedNSNodeDAO.UpdateNode(tx, req.NsNodeId, req.Name, req.NsClusterId, req.IsOn) err = models.SharedNSNodeDAO.UpdateNode(tx, req.NsNodeId, req.Name, req.NsClusterId, req.IsOn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -287,7 +294,7 @@ func (this *NSNodeService) FindNSNodeInstallStatus(ctx context.Context, req *pb.
tx := this.NullTx() tx := this.NullTx()
installStatus, err := nameservers.SharedNSNodeDAO.FindNodeInstallStatus(tx, req.NsNodeId) installStatus, err := models.SharedNSNodeDAO.FindNodeInstallStatus(tx, req.NsNodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -315,7 +322,7 @@ func (this *NSNodeService) UpdateNSNodeIsInstalled(ctx context.Context, req *pb.
tx := this.NullTx() tx := this.NullTx()
err = nameservers.SharedNSNodeDAO.UpdateNodeIsInstalled(tx, req.NsNodeId, req.IsInstalled) err = models.SharedNSNodeDAO.UpdateNodeIsInstalled(tx, req.NsNodeId, req.IsInstalled)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -341,7 +348,7 @@ func (this *NSNodeService) UpdateNSNodeStatus(ctx context.Context, req *pb.Updat
tx := this.NullTx() tx := this.NullTx()
err = nameservers.SharedNSNodeDAO.UpdateNodeStatus(tx, nodeId, req.StatusJSON) err = models.SharedNSNodeDAO.UpdateNodeStatus(tx, nodeId, req.StatusJSON)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -357,7 +364,7 @@ func (this *NSNodeService) FindCurrentNSNodeConfig(ctx context.Context, req *pb.
} }
var tx = this.NullTx() var tx = this.NullTx()
config, err := nameservers.SharedNSNodeDAO.ComposeNodeConfig(tx, nodeId) config, err := models.SharedNSNodeDAO.ComposeNodeConfig(tx, nodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -432,7 +439,7 @@ func (this *NSNodeService) UpdateNSNodeConnectedAPINodes(ctx context.Context, re
tx := this.NullTx() tx := this.NullTx()
err = nameservers.SharedNSNodeDAO.UpdateNodeConnectedAPINodes(tx, nodeId, req.ApiNodeIds) err = models.SharedNSNodeDAO.UpdateNodeConnectedAPINodes(tx, nodeId, req.ApiNodeIds)
if err != nil { if err != nil {
return nil, errors.Wrap(err) return nil, errors.Wrap(err)
} }

View File

@@ -6,12 +6,13 @@ import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeAPI/internal/configs" "github.com/TeaOSLab/EdgeAPI/internal/configs"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"strconv" "strconv"
"sync" "sync"
@@ -48,22 +49,56 @@ var requestChanMap = map[int64]chan *CommandRequest{} // node id => chan
func NextCommandRequestId() int64 { func NextCommandRequestId() int64 {
return atomic.AddInt64(&commandRequestId, 1) return atomic.AddInt64(&commandRequestId, 1)
} }
func init() { func init() {
// 清理WaitingChannelMap dbs.OnReadyDone(func() {
ticker := time.NewTicker(30 * time.Second) // 清理WaitingChannelMap
go func() { go func() {
for range ticker.C { ticker := time.NewTicker(30 * time.Second)
nodeLocker.Lock() for range ticker.C {
for requestId, request := range responseChanMap { nodeLocker.Lock()
if time.Now().Unix()-request.Timestamp > 3600 { for requestId, request := range responseChanMap {
responseChanMap[requestId].Close() if time.Now().Unix()-request.Timestamp > 3600 {
delete(responseChanMap, requestId) responseChanMap[requestId].Close()
delete(responseChanMap, requestId)
}
} }
nodeLocker.Unlock()
} }
nodeLocker.Unlock() }()
}
}() // 自动同步连接到本API节点的NS节点任务
go func() {
defer func() {
_ = recover()
}()
// TODO 未来支持同步边缘节点
var ticker = time.NewTicker(3 * time.Second)
for range ticker.C {
nodeIds, err := models.SharedNodeTaskDAO.FindAllDoingNodeIds(nil, nodeconfigs.NodeRoleDNS)
if err != nil {
remotelogs.Error("NSNodeService_SYNC", err.Error())
continue
}
nodeLocker.Lock()
for _, nodeId := range nodeIds {
c, ok := requestChanMap[nodeId]
if ok {
select {
case c <- &CommandRequest{
Id: NextCommandRequestId(),
Code: messageconfigs.NSMessageCodeNewNodeTask,
CommandJSON: nil,
}:
default:
}
}
}
nodeLocker.Unlock()
}
}()
})
} }
// NsNodeStream 节点stream // NsNodeStream 节点stream
@@ -100,22 +135,22 @@ func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServ
tx := this.NullTx() tx := this.NullTx()
// 标记为活跃状态 // 标记为活跃状态
oldIsActive, err := nameservers.SharedNSNodeDAO.FindNodeActive(tx, nodeId) oldIsActive, err := models.SharedNSNodeDAO.FindNodeActive(tx, nodeId)
if err != nil { if err != nil {
return err return err
} }
if !oldIsActive { if !oldIsActive {
err = nameservers.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, true) err = models.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, true)
if err != nil { if err != nil {
return err return err
} }
// 发送恢复消息 // 发送恢复消息
clusterId, err := nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err := models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil { if err != nil {
return err return err
} }
nodeName, err := nameservers.SharedNSNodeDAO.FindEnabledNSNodeName(tx, nodeId) nodeName, err := models.SharedNSNodeDAO.FindEnabledNSNodeName(tx, nodeId)
if err != nil { if err != nil {
return err return err
} }
@@ -169,7 +204,7 @@ func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServ
req, err := server.Recv() req, err := server.Recv()
if err != nil { if err != nil {
// 修改节点状态 // 修改节点状态
err1 := nameservers.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, false) err1 := models.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, false)
if err1 != nil { if err1 != nil {
logs.Println(err1.Error()) logs.Println(err1.Error())
} }

View File

@@ -4,6 +4,7 @@ package nameservers
import ( import (
"context" "context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
@@ -31,7 +32,7 @@ func (this *NSRecordHourlyStatService) UploadNSRecordHourlyStats(ctx context.Con
} }
var tx = this.NullTx() var tx = this.NullTx()
clusterId, err := nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err := models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -4,6 +4,7 @@ package nameservers
import ( import (
"context" "context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
@@ -75,7 +76,7 @@ func (this *NSRouteService) FindEnabledNSRoute(ctx context.Context, req *pb.Find
// 集群 // 集群
var pbCluster *pb.NSCluster var pbCluster *pb.NSCluster
if route.ClusterId > 0 { if route.ClusterId > 0 {
cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId)) cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -130,7 +131,7 @@ func (this *NSRouteService) FindAllEnabledNSRoutes(ctx context.Context, req *pb.
// 集群 // 集群
var pbCluster *pb.NSCluster var pbCluster *pb.NSCluster
if route.ClusterId > 0 { if route.ClusterId > 0 {
cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId)) cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -6,7 +6,6 @@ import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/authority" "github.com/TeaOSLab/EdgeAPI/internal/db/models/authority"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
@@ -637,7 +636,7 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{ upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
NewVersion: teaconst.DNSNodeVersion, NewVersion: teaconst.DNSNodeVersion,
} }
countNodes, err := nameservers.SharedNSNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion) countNodes, err := models.SharedNSNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -7,7 +7,6 @@ import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/authority" "github.com/TeaOSLab/EdgeAPI/internal/db/models/authority"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/encrypt" "github.com/TeaOSLab/EdgeAPI/internal/encrypt"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
@@ -194,7 +193,7 @@ func (this *BaseService) ValidateNodeId(ctx context.Context, roles ...rpcutils.U
case rpcutils.UserTypeMonitor: case rpcutils.UserTypeMonitor:
nodeIntId, err = models.SharedMonitorNodeDAO.FindEnabledMonitorNodeIdWithUniqueId(nil, nodeId) nodeIntId, err = models.SharedMonitorNodeDAO.FindEnabledMonitorNodeIdWithUniqueId(nil, nodeId)
case rpcutils.UserTypeDNS: case rpcutils.UserTypeDNS:
nodeIntId, err = nameservers.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId) nodeIntId, err = models.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId)
case rpcutils.UserTypeAuthority: case rpcutils.UserTypeAuthority:
nodeIntId, err = authority.SharedAuthorityNodeDAO.FindEnabledAuthorityNodeIdWithUniqueId(nil, nodeId) nodeIntId, err = authority.SharedAuthorityNodeDAO.FindEnabledAuthorityNodeIdWithUniqueId(nil, nodeId)
default: default:

View File

@@ -3,7 +3,6 @@ package services
import ( import (
"context" "context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
) )
@@ -63,7 +62,7 @@ func (this *MessageService) ListUnreadMessages(ctx context.Context, req *pb.List
} }
} }
case nodeconfigs.NodeRoleDNS: case nodeconfigs.NodeRoleDNS:
cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(message.ClusterId)) cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(message.ClusterId))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -90,7 +89,7 @@ func (this *MessageService) ListUnreadMessages(ctx context.Context, req *pb.List
} }
} }
case nodeconfigs.NodeRoleDNS: case nodeconfigs.NodeRoleDNS:
node, err := nameservers.SharedNSNodeDAO.FindEnabledNSNode(tx, int64(message.NodeId)) node, err := models.SharedNSNodeDAO.FindEnabledNSNode(tx, int64(message.NodeId))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -375,7 +375,7 @@ func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeReque
} }
// 删除节点相关任务 // 删除节点相关任务
err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, req.NodeId) err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, nodeconfigs.NodeRoleNode, req.NodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -1359,7 +1359,6 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN
delete(routeCodeMap, req.DnsDomainId) delete(routeCodeMap, req.DnsDomainId)
} }
err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routeCodeMap) err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routeCodeMap)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -97,7 +97,7 @@ func (this *NodeClusterService) DeleteNodeCluster(ctx context.Context, req *pb.D
} }
// 删除相关任务 // 删除相关任务
err = models.SharedNodeTaskDAO.DeleteAllClusterTasks(tx, req.NodeClusterId) err = models.SharedNodeTaskDAO.DeleteAllClusterTasks(tx, nodeconfigs.NodeRoleNode, req.NodeClusterId)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/installers" "github.com/TeaOSLab/EdgeAPI/internal/installers"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
stringutil "github.com/iwind/TeaGo/utils/string" stringutil "github.com/iwind/TeaGo/utils/string"
@@ -17,7 +19,7 @@ type NodeTaskService struct {
// FindNodeTasks 获取单节点同步任务 // FindNodeTasks 获取单节点同步任务
func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNodeTasksRequest) (*pb.FindNodeTasksResponse, error) { func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNodeTasksRequest) (*pb.FindNodeTasksResponse, error) {
nodeId, err := this.ValidateNode(ctx) nodeType, nodeId, err := this.ValidateNodeId(ctx, rpcutils.UserTypeNode, rpcutils.UserTypeDNS)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -25,7 +27,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode
_ = req _ = req
var tx = this.NullTx() var tx = this.NullTx()
tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeId) tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeType, nodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -38,18 +40,20 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode
}) })
} }
// 版本更新任务 // 边缘节点版本更新任务
status, err := models.SharedNodeDAO.FindNodeStatus(tx, nodeId) if nodeType == rpcutils.UserTypeNode {
if err != nil { status, err := models.SharedNodeDAO.FindNodeStatus(tx, nodeId)
return nil, err if err != nil {
} return nil, err
if status != nil && len(status.OS) > 0 && len(status.Arch) > 0 && len(status.BuildVersion) > 0 { }
deployFile := installers.SharedDeployManager.FindNodeFile(status.OS, status.Arch) if status != nil && len(status.OS) > 0 && len(status.Arch) > 0 && len(status.BuildVersion) > 0 {
if deployFile != nil { deployFile := installers.SharedDeployManager.FindNodeFile(status.OS, status.Arch)
if stringutil.VersionCompare(deployFile.Version, status.BuildVersion) > 0 { if deployFile != nil {
pbTasks = append(pbTasks, &pb.NodeTask{ if stringutil.VersionCompare(deployFile.Version, status.BuildVersion) > 0 {
Type: models.NodeTaskTypeNodeVersionChanged, pbTasks = append(pbTasks, &pb.NodeTask{
}) Type: models.NodeTaskTypeNodeVersionChanged,
})
}
} }
} }
} }
@@ -59,7 +63,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode
// ReportNodeTaskDone 报告同步任务结果 // ReportNodeTaskDone 报告同步任务结果
func (this *NodeTaskService) ReportNodeTaskDone(ctx context.Context, req *pb.ReportNodeTaskDoneRequest) (*pb.RPCSuccess, error) { func (this *NodeTaskService) ReportNodeTaskDone(ctx context.Context, req *pb.ReportNodeTaskDoneRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateNode(ctx) _, _, err := this.ValidateNodeId(ctx, rpcutils.UserTypeNode, rpcutils.UserTypeDNS)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -83,7 +87,8 @@ func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.F
_ = req _ = req
var tx = this.NullTx() var tx = this.NullTx()
clusterIds, err := models.SharedNodeTaskDAO.FindAllDoingTaskClusterIds(tx) // TODO 支持NS节点
clusterIds, err := models.SharedNodeTaskDAO.FindAllDoingTaskClusterIds(tx, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -104,7 +109,8 @@ func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.F
// 错误的节点任务 // 错误的节点任务
pbNodeTasks := []*pb.NodeTask{} pbNodeTasks := []*pb.NodeTask{}
// TODO 考虑节点特别多的情形比如只显示前100个 // TODO 考虑节点特别多的情形比如只显示前100个
tasks, err := models.SharedNodeTaskDAO.FindAllDoingNodeTasksWithClusterId(tx, clusterId) // TODO 支持NS节点
tasks, err := models.SharedNodeTaskDAO.FindAllDoingNodeTasksWithClusterId(tx, nodeconfigs.NodeRoleNode, clusterId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -155,13 +161,13 @@ func (this *NodeTaskService) ExistsNodeTasks(ctx context.Context, req *pb.Exists
var tx = this.NullTx() var tx = this.NullTx()
// 是否有任务 // 是否有任务
existTask, err := models.SharedNodeTaskDAO.ExistsDoingNodeTasks(tx) existTask, err := models.SharedNodeTaskDAO.ExistsDoingNodeTasks(tx, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 是否有错误 // 是否有错误
existError, err := models.SharedNodeTaskDAO.ExistsErrorNodeTasks(tx) existError, err := models.SharedNodeTaskDAO.ExistsErrorNodeTasks(tx, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -216,7 +222,7 @@ func (this *NodeTaskService) CountDoingNodeTasks(ctx context.Context, req *pb.Co
_ = req _ = req
var tx = this.NullTx() var tx = this.NullTx()
count, err := models.SharedNodeTaskDAO.CountDoingNodeTasks(tx) count, err := models.SharedNodeTaskDAO.CountDoingNodeTasks(tx, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -238,7 +244,7 @@ func (this *NodeTaskService) FindNotifyingNodeTasks(ctx context.Context, req *pb
} }
var tx = this.NullTx() var tx = this.NullTx()
tasks, err := models.SharedNodeTaskDAO.FindNotifyingNodeTasks(tx, req.Size) tasks, err := models.SharedNodeTaskDAO.FindNotifyingNodeTasks(tx, nodeconfigs.NodeRoleNode, req.Size)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -5,7 +5,6 @@ package services
import ( import (
"context" "context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
) )
@@ -28,7 +27,7 @@ func (this *NodeValueService) CreateNodeValue(ctx context.Context, req *pb.Creat
case rpcutils.UserTypeNode: case rpcutils.UserTypeNode:
clusterId, err = models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err = models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
case rpcutils.UserTypeDNS: case rpcutils.UserTypeDNS:
clusterId, err = nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err = models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId)
case rpcutils.UserTypeUser: case rpcutils.UserTypeUser:
} }
if err != nil { if err != nil {

View File

@@ -3,7 +3,6 @@ package services
import ( import (
"context" "context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
@@ -34,7 +33,7 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context,
var clusterId int64 var clusterId int64
switch role { switch role {
case rpcutils.UserTypeDNS: case rpcutils.UserTypeDNS:
clusterId, err = nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err = models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -8,7 +8,6 @@ import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/authority" "github.com/TeaOSLab/EdgeAPI/internal/db/models/authority"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/encrypt" "github.com/TeaOSLab/EdgeAPI/internal/encrypt"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
@@ -187,7 +186,7 @@ func ValidateRequest(ctx context.Context, userTypes ...UserType) (userType UserT
nodeUserId = nodeIntId nodeUserId = nodeIntId
resultNodeId = nodeIntId resultNodeId = nodeIntId
case UserTypeDNS: case UserTypeDNS:
nodeIntId, err := nameservers.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId) nodeIntId, err := models.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId)
if err != nil { if err != nil {
return UserTypeNode, nodeIntId, 0, errors.New("context: " + err.Error()) return UserTypeNode, nodeIntId, 0, errors.New("context: " + err.Error())
} }

View File

@@ -2,6 +2,7 @@ package tasks
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"time" "time"
@@ -42,9 +43,11 @@ func (this *NodeTaskExtractor) Loop() error {
// 这里不解锁是为了让任务N秒钟之内只运行一次 // 这里不解锁是为了让任务N秒钟之内只运行一次
err = models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil) for _, role := range []string{nodeconfigs.NodeRoleNode, nodeconfigs.NodeRoleDNS} {
if err != nil { err = models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil, role)
return err if err != nil {
return err
}
} }
return nil return nil

View File

@@ -2,7 +2,6 @@ package tasks
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
@@ -59,7 +58,7 @@ func (this *NSNodeMonitorTask) loop() error {
return err return err
} }
clusters, err := nameservers.SharedNSClusterDAO.FindAllEnabledClusters(nil) clusters, err := models.SharedNSClusterDAO.FindAllEnabledClusters(nil)
if err != nil { if err != nil {
return err return err
} }
@@ -73,11 +72,11 @@ func (this *NSNodeMonitorTask) loop() error {
return nil return nil
} }
func (this *NSNodeMonitorTask) monitorCluster(cluster *nameservers.NSCluster) error { func (this *NSNodeMonitorTask) monitorCluster(cluster *models.NSCluster) error {
clusterId := int64(cluster.Id) clusterId := int64(cluster.Id)
// 检查离线节点 // 检查离线节点
inactiveNodes, err := nameservers.SharedNSNodeDAO.FindAllNotifyingInactiveNodesWithClusterId(nil, clusterId) inactiveNodes, err := models.SharedNSNodeDAO.FindAllNotifyingInactiveNodesWithClusterId(nil, clusterId)
if err != nil { if err != nil {
return err return err
} }
@@ -90,7 +89,7 @@ func (this *NSNodeMonitorTask) monitorCluster(cluster *nameservers.NSCluster) er
} }
// 修改在线状态 // 修改在线状态
err = nameservers.SharedNSNodeDAO.UpdateNodeStatusIsNotified(nil, int64(node.Id)) err = models.SharedNSNodeDAO.UpdateNodeStatusIsNotified(nil, int64(node.Id))
if err != nil { if err != nil {
return err return err
} }