数据有更改时发送通知

This commit is contained in:
刘祥超
2021-08-08 15:47:48 +08:00
parent c893de8af7
commit 473a2db335
40 changed files with 548 additions and 185 deletions

View File

@@ -3,6 +3,7 @@ package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -302,7 +303,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
if len(resultClusterIds) > 0 {
for _, clusterId := range resultClusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeIPItemChanged)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged)
if err != nil {
return err
}

View File

@@ -2,6 +2,7 @@ package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ipconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
@@ -253,7 +254,7 @@ func (this *IPListDAO) NotifyUpdate(tx *dbs.Tx, listId int64, taskType NodeTaskT
if len(resultClusterIds) > 0 {
for _, clusterId := range resultClusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, taskType)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, taskType)
if err != nil {
return err
}

View File

@@ -3,6 +3,7 @@ package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
@@ -308,7 +309,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
return err
}
for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
if err != nil {
return err
}
@@ -320,7 +321,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
return err
}
for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
if err != nil {
return err
}

View File

@@ -1 +0,0 @@
package nameservers

View File

@@ -3,6 +3,7 @@ package nameservers
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -35,12 +36,15 @@ func init() {
}
// EnableNSDomain 启用条目
func (this *NSDomainDAO) EnableNSDomain(tx *dbs.Tx, id int64) error {
func (this *NSDomainDAO) EnableNSDomain(tx *dbs.Tx, domainId int64) error {
_, err := this.Query(tx).
Pk(id).
Pk(domainId).
Set("state", NSDomainStateEnabled).
Update()
return err
if err != nil {
return err
}
return this.NotifyUpdate(tx, domainId)
}
// DisableNSDomain 禁用条目
@@ -55,7 +59,10 @@ func (this *NSDomainDAO) DisableNSDomain(tx *dbs.Tx, domainId int64) error {
Set("state", NSDomainStateDisabled).
Set("version", version).
Update()
return err
if err != nil {
return err
}
return this.NotifyUpdate(tx, domainId)
}
// FindEnabledNSDomain 查找启用中的条目
@@ -92,7 +99,16 @@ func (this *NSDomainDAO) CreateDomain(tx *dbs.Tx, clusterId int64, userId int64,
op.Version = version
op.IsOn = true
op.State = NSDomainStateEnabled
return this.SaveInt64(tx, op)
domainId, err := this.SaveInt64(tx, op)
if err != nil {
return 0, err
}
err = this.NotifyUpdate(tx, domainId)
if err != nil {
return domainId, err
}
return domainId, nil
}
// UpdateDomain 修改域名
@@ -101,6 +117,14 @@ func (this *NSDomainDAO) UpdateDomain(tx *dbs.Tx, domainId int64, clusterId int6
return errors.New("invalid domainId")
}
oldClusterId, err := this.Query(tx).
Pk(domainId).
Result("clusterId").
FindInt64Col(0)
if err != nil {
return err
}
version, err := this.IncreaseVersion(tx)
if err != nil {
return err
@@ -112,7 +136,20 @@ func (this *NSDomainDAO) UpdateDomain(tx *dbs.Tx, domainId int64, clusterId int6
op.UserId = userId
op.IsOn = isOn
op.Version = version
return this.Save(tx, op)
err = this.Save(tx, op)
if err != nil {
return err
}
// 通知更新
if oldClusterId > 0 && oldClusterId != clusterId {
err = models.SharedNSClusterDAO.NotifyUpdate(tx, oldClusterId)
if err != nil {
return err
}
}
return this.NotifyUpdate(tx, domainId)
}
// CountAllEnabledDomains 计算域名数量
@@ -121,7 +158,7 @@ func (this *NSDomainDAO) CountAllEnabledDomains(tx *dbs.Tx, clusterId int64, use
if clusterId > 0 {
query.Attr("clusterId", clusterId)
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNSClusterDAO.Table + " WHERE state=1)")
query.Where("clusterId IN (SELECT id FROM " + models.SharedNSClusterDAO.Table + " WHERE state=1)")
}
if userId > 0 {
query.Attr("userId", userId)
@@ -144,7 +181,7 @@ func (this *NSDomainDAO) ListEnabledDomains(tx *dbs.Tx, clusterId int64, userId
if clusterId > 0 {
query.Attr("clusterId", clusterId)
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNSClusterDAO.Table + " WHERE state=1)")
query.Where("clusterId IN (SELECT id FROM " + models.SharedNSClusterDAO.Table + " WHERE state=1)")
}
if userId > 0 {
query.Attr("userId", userId)
@@ -214,22 +251,39 @@ func (this *NSDomainDAO) UpdateDomainTSIG(tx *dbs.Tx, domainId int64, tsigJSON [
return err
}
return this.Query(tx).
err = this.Query(tx).
Pk(domainId).
Set("tsig", tsigJSON).
Set("version", version).
UpdateQuickly()
}
// NotifyUpdate 通知更改
func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error {
version, err := this.IncreaseVersion(tx)
if err != nil {
return err
}
return this.NotifyUpdate(tx, domainId)
}
// FindEnabledDomainClusterId 获取域名的集群ID
func (this *NSDomainDAO) FindEnabledDomainClusterId(tx *dbs.Tx, domainId int64) (int64, error) {
return this.Query(tx).
Pk(domainId).
Set("version", version).
UpdateQuickly()
State(NSDomainStateEnabled).
Result("clusterId").
FindInt64Col(0)
}
// NotifyUpdate 通知更改
func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error {
clusterId, err := this.Query(tx).
Result("clusterId").
Pk(domainId).
FindInt64Col(0)
if err != nil {
return err
}
if clusterId > 0 {
return models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeDomainChanged)
}
return nil
}

View File

@@ -4,6 +4,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -176,8 +177,33 @@ func (this *NSKeyDAO) NotifyUpdate(tx *dbs.Tx, keyId int64) error {
if err != nil {
return err
}
return this.Query(tx).
err = this.Query(tx).
Pk(keyId).
Set("version", version).
UpdateQuickly()
if err != nil {
return err
}
// 通知集群
domainId, err := this.Query(tx).
Pk(keyId).
Result("domainId").
FindInt64Col(0)
if err != nil {
return err
}
if domainId > 0 {
clusterId, err := SharedNSDomainDAO.FindEnabledDomainClusterId(tx, domainId)
if err != nil {
return err
}
if clusterId > 0 {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeKeyChanged)
if err != nil {
return err
}
}
}
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -38,27 +39,33 @@ func init() {
}
// EnableNSRecord 启用条目
func (this *NSRecordDAO) EnableNSRecord(tx *dbs.Tx, id uint64) error {
func (this *NSRecordDAO) EnableNSRecord(tx *dbs.Tx, recordId int64) error {
_, err := this.Query(tx).
Pk(id).
Pk(recordId).
Set("state", NSRecordStateEnabled).
Update()
return err
if err != nil {
return err
}
return this.NotifyUpdate(tx, recordId)
}
// DisableNSRecord 禁用条目
func (this *NSRecordDAO) DisableNSRecord(tx *dbs.Tx, id int64) error {
func (this *NSRecordDAO) DisableNSRecord(tx *dbs.Tx, recordId int64) error {
version, err := this.IncreaseVersion(tx)
if err != nil {
return err
}
_, err = this.Query(tx).
Pk(id).
Pk(recordId).
Set("state", NSRecordStateDisabled).
Set("version", version).
Update()
return err
if err != nil {
return err
}
return this.NotifyUpdate(tx, recordId)
}
// FindEnabledNSRecord 查找启用中的条目
@@ -109,7 +116,16 @@ func (this *NSRecordDAO) CreateRecord(tx *dbs.Tx, domainId int64, description st
op.IsOn = true
op.State = NSRecordStateEnabled
op.Version = version
return this.SaveInt64(tx, op)
recordId, err := this.SaveInt64(tx, op)
if err != nil {
return 0, err
}
err = this.NotifyUpdate(tx, recordId)
if err != nil {
return 0, err
}
return recordId, nil
}
// UpdateRecord 修改记录
@@ -144,7 +160,12 @@ func (this *NSRecordDAO) UpdateRecord(tx *dbs.Tx, recordId int64, description st
op.Version = version
return this.Save(tx, op)
err = this.Save(tx, op)
if err != nil {
return err
}
return this.NotifyUpdate(tx, recordId)
}
// CountAllEnabledDomainRecords 计算域名中记录数量
@@ -230,3 +251,31 @@ func (this *NSRecordDAO) FindEnabledRecordWithName(tx *dbs.Tx, domainId int64, r
}
return record.(*NSRecord), nil
}
// NotifyUpdate 通知更新
func (this *NSRecordDAO) NotifyUpdate(tx *dbs.Tx, recordId int64) error {
domainId, err := this.Query(tx).
Pk(recordId).
Result("domainId").
FindInt64Col(0)
if err != nil {
return err
}
if domainId == 0 {
return nil
}
clusterId, err := SharedNSDomainDAO.FindEnabledDomainClusterId(tx, domainId)
if err != nil {
return err
}
if clusterId > 0 {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRecordChanged)
if err != nil {
return err
}
}
return nil
}

View File

@@ -3,6 +3,7 @@ package nameservers
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -35,12 +36,21 @@ func init() {
}
// EnableNSRoute 启用条目
func (this *NSRouteDAO) EnableNSRoute(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
func (this *NSRouteDAO) EnableNSRoute(tx *dbs.Tx, routeId int64) error {
version, err := this.IncreaseVersion(tx)
if err != nil {
return err
}
_, err = this.Query(tx).
Pk(routeId).
Set("state", NSRouteStateEnabled).
Set("version", version).
Update()
return err
if err != nil {
return err
}
return this.NotifyUpdate(tx)
}
// DisableNSRoute 禁用条目
@@ -55,7 +65,10 @@ func (this *NSRouteDAO) DisableNSRoute(tx *dbs.Tx, routeId int64) error {
Set("state", NSRouteStateDisabled).
Set("version", version).
Update()
return err
if err != nil {
return err
}
return this.NotifyUpdate(tx)
}
// FindEnabledNSRoute 查找启用中的条目
@@ -98,7 +111,17 @@ func (this *NSRouteDAO) CreateRoute(tx *dbs.Tx, clusterId int64, domainId int64,
op.IsOn = true
op.State = NSRouteStateEnabled
op.Version = version
return this.SaveInt64(tx, op)
routeId, err := this.SaveInt64(tx, op)
if err != nil {
return 0, err
}
err = this.NotifyUpdate(tx)
if err != nil {
return 0, err
}
return routeId, nil
}
// UpdateRoute 修改线路
@@ -123,7 +146,12 @@ func (this *NSRouteDAO) UpdateRoute(tx *dbs.Tx, routeId int64, name string, rang
op.Version = version
return this.Save(tx, op)
err = this.Save(tx, op)
if err != nil {
return err
}
return this.NotifyUpdate(tx)
}
// UpdateRouteOrders 修改线路排序
@@ -145,7 +173,8 @@ func (this *NSRouteDAO) UpdateRouteOrders(tx *dbs.Tx, routeIds []int64) error {
}
order--
}
return nil
return this.NotifyUpdate(tx)
}
// FindAllEnabledRoutes 列出所有线路
@@ -190,3 +219,19 @@ func (this *NSRouteDAO) ListRoutesAfterVersion(tx *dbs.Tx, version int64, size i
FindAll()
return
}
// NotifyUpdate 通知更新
func (this *NSRouteDAO) NotifyUpdate(tx *dbs.Tx) error {
// 线路变更时所有集群都要更新
clusterIds, err := models.SharedNSClusterDAO.FindAllEnabledClusterIds(tx)
if err != nil {
return err
}
for _, clusterId := range clusterIds {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRouteChanged)
if err != nil {
return err
}
}
return nil
}

View File

@@ -869,7 +869,7 @@ func (this *NodeClusterDAO) FindEnabledNodeClustersWithIds(tx *dbs.Tx, clusterId
// NotifyUpdate 通知更新
func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
}
// NotifyDNSUpdate 通知DNS更新

View File

@@ -2,6 +2,7 @@ package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -158,5 +159,5 @@ func (this *NodeClusterMetricItemDAO) ExistsClusterItem(tx *dbs.Tx, clusterId in
// NotifyUpdate 通知更新
func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
}

View File

@@ -1290,7 +1290,7 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
return err
}
if clusterId > 0 {
return SharedNodeTaskDAO.CreateNodeTask(tx, clusterId, nodeId, NodeTaskTypeConfigChanged)
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, NodeTaskTypeConfigChanged)
}
return nil
}

View File

@@ -1,8 +1,8 @@
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -17,6 +17,14 @@ const (
NodeTaskTypeConfigChanged NodeTaskType = "configChanged"
NodeTaskTypeIPItemChanged NodeTaskType = "ipItemChanged"
NodeTaskTypeNodeVersionChanged NodeTaskType = "nodeVersionChanged"
// NS相关
NSNodeTaskTypeConfigChanged NodeTaskType = "nsConfigChanged"
NSNodeTaskTypeDomainChanged NodeTaskType = "nsDomainChanged"
NSNodeTaskTypeRecordChanged NodeTaskType = "nsRecordChanged"
NSNodeTaskTypeRouteChanged NodeTaskType = "nsRouteChanged"
NSNodeTaskTypeKeyChanged NodeTaskType = "nsKeyChanged"
)
type NodeTaskDAO dbs.DAO
@@ -41,14 +49,15 @@ func init() {
}
// CreateNodeTask 创建单个节点任务
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int64, taskType NodeTaskType) error {
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, taskType NodeTaskType) error {
if clusterId <= 0 || nodeId <= 0 {
return nil
}
uniqueId := numberutils.FormatInt64(nodeId) + "@node@" + taskType
uniqueId := role + "@" + types.String(nodeId) + "@node@" + taskType
updatedAt := time.Now().Unix()
_, _, err := this.Query(tx).
InsertOrUpdate(maps.Map{
"role": role,
"clusterId": clusterId,
"nodeId": nodeId,
"type": taskType,
@@ -68,15 +77,16 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int6
}
// CreateClusterTask 创建集群任务
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, taskType NodeTaskType) error {
if clusterId <= 0 {
return nil
}
uniqueId := numberutils.FormatInt64(clusterId) + "@cluster@" + taskType
uniqueId := role + "@" + types.String(clusterId) + "@cluster@" + taskType
updatedAt := time.Now().Unix()
_, _, err := this.Query(tx).
InsertOrUpdate(maps.Map{
"role": role,
"clusterId": clusterId,
"nodeId": 0,
"type": taskType,
@@ -96,14 +106,15 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType
return err
}
// ExtractClusterTask 分解集群任务
func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
// ExtractNodeClusterTask 分解边缘节点集群任务
func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
if err != nil {
return err
}
_, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleNode).
Attr("clusterId", clusterId).
Param("clusterIdString", types.String(clusterId)).
Where("nodeId> 0").
@@ -114,13 +125,52 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp
}
for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, clusterId, nodeId, taskType)
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, taskType)
if err != nil {
return err
}
}
_, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleNode).
Attr("clusterId", clusterId).
Attr("nodeId", 0).
Attr("type", taskType).
Delete()
if err != nil {
return err
}
return nil
}
// ExtractNSClusterTask 分解NS节点集群任务
func (this *NodeTaskDAO) ExtractNSClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
nodeIds, err := SharedNSNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
if err != nil {
return err
}
_, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleDNS).
Attr("clusterId", clusterId).
Param("clusterIdString", types.String(clusterId)).
Where("nodeId> 0").
Attr("type", taskType).
Delete()
if err != nil {
return err
}
for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, taskType)
if err != nil {
return err
}
}
_, err = this.Query(tx).
Attr("role", nodeconfigs.NodeRoleDNS).
Attr("clusterId", clusterId).
Attr("nodeId", 0).
Attr("type", taskType).
@@ -133,8 +183,9 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp
}
// ExtractAllClusterTasks 分解所有集群任务
func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error {
func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx, role string) error {
ones, err := this.Query(tx).
Attr("role", role).
Attr("nodeId", 0).
FindAll()
if err != nil {
@@ -142,36 +193,47 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error {
}
for _, one := range ones {
clusterId := int64(one.(*NodeTask).ClusterId)
err = this.ExtractClusterTask(tx, clusterId, one.(*NodeTask).Type)
if err != nil {
return err
switch role {
case nodeconfigs.NodeRoleNode:
err = this.ExtractNodeClusterTask(tx, clusterId, one.(*NodeTask).Type)
if err != nil {
return err
}
case nodeconfigs.NodeRoleDNS:
err = this.ExtractNSClusterTask(tx, clusterId, one.(*NodeTask).Type)
if err != nil {
return err
}
}
}
return nil
}
// DeleteAllClusterTasks 删除集群所有相关任务
func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, clusterId int64) error {
func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, role string, clusterId int64) error {
_, err := this.Query(tx).
Attr("role", role).
Attr("clusterId", clusterId).
Delete()
return err
}
// DeleteNodeTasks 删除节点相关任务
func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, nodeId int64) error {
func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, role string, nodeId int64) error {
_, err := this.Query(tx).
Attr("role", role).
Attr("nodeId", nodeId).
Delete()
return err
}
// FindDoingNodeTasks 查询一个节点的所有任务
func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, nodeId int64) (result []*NodeTask, err error) {
func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int64) (result []*NodeTask, err error) {
if nodeId <= 0 {
return
}
_, err = this.Query(tx).
Attr("role", role).
Attr("nodeId", nodeId).
Where("(isDone=0 OR (isDone=1 AND isOk=0))").
Slice(&result).
@@ -191,9 +253,10 @@ func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool,
}
// FindAllDoingTaskClusterIds 查找正在更新的集群IDs
func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error) {
func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx, role string) ([]int64, error) {
ones, _, err := this.Query(tx).
Result("DISTINCT(clusterId) AS clusterId").
Attr("role", role).
Where("(nodeId=0 OR (isDone=0 OR (isDone=1 AND isOk=0)))").
FindOnes()
if err != nil {
@@ -207,8 +270,9 @@ func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error)
}
// FindAllDoingNodeTasksWithClusterId 查询某个集群下所有的任务
func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterId int64) (result []*NodeTask, err error) {
func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, role string, clusterId int64) (result []*NodeTask, err error) {
_, err = this.Query(tx).
Attr("role", role).
Attr("clusterId", clusterId).
Gt("nodeId", 0).
Where("(isDone=0 OR (isDone=1 AND isOk=0))").
@@ -220,17 +284,37 @@ func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterI
return
}
// FindAllDoingNodeIds 查询有任务的节点IDs
func (this *NodeTaskDAO) FindAllDoingNodeIds(tx *dbs.Tx, role string) ([]int64, error) {
ones, err := this.Query(tx).
Result("DISTINCT(nodeId) AS nodeId").
Attr("role", role).
Gt("nodeId", 0).
Attr("isDone", false).
FindAll()
if err != nil {
return nil, err
}
var result []int64
for _, one := range ones {
result = append(result, int64(one.(*NodeTask).NodeId))
}
return result, nil
}
// ExistsDoingNodeTasks 检查是否有正在执行的任务
func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx) (bool, error) {
func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx, role string) (bool, error) {
return this.Query(tx).
Attr("role", role).
Where("(isDone=0 OR (isDone=1 AND isOk=0))").
Gt("nodeId", 0).
Exist()
}
// ExistsErrorNodeTasks 是否有错误的任务
func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx) (bool, error) {
func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx, role string) (bool, error) {
return this.Query(tx).
Attr("role", role).
Where("(isDone=1 AND isOk=0)").
Exist()
}
@@ -244,16 +328,18 @@ func (this *NodeTaskDAO) DeleteNodeTask(tx *dbs.Tx, taskId int64) error {
}
// CountDoingNodeTasks 计算正在执行的任务
func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx) (int64, error) {
func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx, role string) (int64, error) {
return this.Query(tx).
Attr("isDone", 0).
Attr("role", role).
Gt("nodeId", 0).
Count()
}
// FindNotifyingNodeTasks 查找需要通知的任务
func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, size int64) (result []*NodeTask, err error) {
func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, role string, size int64) (result []*NodeTask, err error) {
_, err = this.Query(tx).
Attr("role", role).
Gt("nodeId", 0).
Attr("isNotified", 0).
Attr("isDone", 0).

View File

@@ -1,8 +1,9 @@
package models
// 节点同步任务
// NodeTask 节点同步任务
type NodeTask struct {
Id uint64 `field:"id"` // ID
Role string `field:"role"` // 节点角色
NodeId uint32 `field:"nodeId"` // 节点ID
ClusterId uint32 `field:"clusterId"` // 集群ID
Type string `field:"type"` // 任务类型
@@ -16,6 +17,7 @@ type NodeTask struct {
type NodeTaskOperator struct {
Id interface{} // ID
Role interface{} // 节点角色
NodeId interface{} // 节点ID
ClusterId interface{} // 集群ID
Type interface{} // 任务类型

View File

@@ -1,7 +1,8 @@
package nameservers
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -127,6 +128,22 @@ func (this *NSClusterDAO) FindAllEnabledClusters(tx *dbs.Tx) (result []*NSCluste
return
}
// FindAllEnabledClusterIds 获取所有集群IDs
func (this *NSClusterDAO) FindAllEnabledClusterIds(tx *dbs.Tx) ([]int64, error) {
ones, err := this.Query(tx).
State(NSClusterStateEnabled).
ResultPk().
FindAll()
if err != nil {
return nil, err
}
var result = []int64{}
for _, one := range ones {
result = append(result, int64(one.(*NSCluster).Id))
}
return result, nil
}
// UpdateClusterAccessLog 设置访问日志
func (this *NSClusterDAO) UpdateClusterAccessLog(tx *dbs.Tx, clusterId int64, accessLogJSON []byte) error {
return this.Query(tx).
@@ -143,3 +160,8 @@ func (this *NSClusterDAO) FindClusterAccessLog(tx *dbs.Tx, clusterId int64) ([]b
FindStringCol("")
return []byte(accessLog), err
}
// NotifyUpdate 通知更改
func (this *NSClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, NSNodeTaskTypeConfigChanged)
}

View File

@@ -1,4 +1,4 @@
package nameservers
package models
import (
_ "github.com/go-sql-driver/mysql"

View File

@@ -1,4 +1,4 @@
package nameservers
package models
// NSCluster 域名服务器集群
type NSCluster struct {

View File

@@ -0,0 +1 @@
package models

View File

@@ -1,8 +1,7 @@
package nameservers
package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
@@ -214,7 +213,7 @@ func (this *NSNodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, cluste
secret := rands.String(32)
// 保存API Token
err = models.SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleDNS)
err = SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleDNS)
if err != nil {
return
}
@@ -281,7 +280,7 @@ func (this *NSNodeDAO) FindEnabledNodeIdWithUniqueId(tx *dbs.Tx, uniqueId string
}
// FindNodeInstallStatus 查询节点的安装状态
func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*models.NodeInstallStatus, error) {
func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*NodeInstallStatus, error) {
node, err := this.Query(tx).
Pk(nodeId).
Result("installStatus", "isInstalled").
@@ -296,10 +295,10 @@ func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*models.
installStatus := node.(*NSNode).InstallStatus
isInstalled := node.(*NSNode).IsInstalled == 1
if len(installStatus) == 0 {
return models.NewNodeInstallStatus(), nil
return NewNodeInstallStatus(), nil
}
status := &models.NodeInstallStatus{}
status := &NodeInstallStatus{}
err = json.Unmarshal([]byte(installStatus), status)
if err != nil {
return nil, err
@@ -391,7 +390,7 @@ func (this *NSNodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64) (*dnsconfigs.
// 访问日志
// 全局配置
{
globalValue, err := models.SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeNSAccessLogSetting)
globalValue, err := SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeNSAccessLogSetting)
if err != nil {
return nil, err
}
@@ -498,6 +497,29 @@ func (this *NSNodeDAO) UpdateNodeStatusIsNotified(tx *dbs.Tx, nodeId int64) erro
UpdateQuickly()
}
// FindAllNodeIdsMatch 匹配节点并返回节点ID
func (this *NSNodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) {
query := this.Query(tx)
query.State(NSNodeStateEnabled)
if clusterId > 0 {
query.Attr("clusterId", clusterId)
}
if isOn == configutils.BoolStateYes {
query.Attr("isOn", true)
} else if isOn == configutils.BoolStateNo {
query.Attr("isOn", false)
}
query.Result("id")
ones, _, err := query.FindOnes()
if err != nil {
return nil, err
}
for _, one := range ones {
result = append(result, one.GetInt64("id"))
}
return
}
// NotifyUpdate 通知更新
func (this *NSNodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
// TODO 先什么都不做

View File

@@ -1,4 +1,4 @@
package nameservers
package models
import (
_ "github.com/go-sql-driver/mysql"

View File

@@ -1,4 +1,4 @@
package nameservers
package models
// NSNode 域名服务器节点
type NSNode struct {

View File

@@ -1,21 +1,20 @@
package nameservers
package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"time"
)
// DecodeInstallStatus 安装状态
func (this *NSNode) DecodeInstallStatus() (*models.NodeInstallStatus, error) {
func (this *NSNode) DecodeInstallStatus() (*NodeInstallStatus, error) {
if len(this.InstallStatus) == 0 || this.InstallStatus == "null" {
return models.NewNodeInstallStatus(), nil
return NewNodeInstallStatus(), nil
}
status := &models.NodeInstallStatus{}
status := &NodeInstallStatus{}
err := json.Unmarshal([]byte(this.InstallStatus), status)
if err != nil {
return models.NewNodeInstallStatus(), err
return NewNodeInstallStatus(), err
}
// 如果N秒钟没有更新状态则认为不在运行

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
@@ -1270,11 +1271,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
}
if oldClusterId > 0 {
err = SharedNodeTaskDAO.CreateClusterTask(tx, oldClusterId, NodeTaskTypeConfigChanged)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeConfigChanged)
if err != nil {
return err
}
err = SharedNodeTaskDAO.CreateClusterTask(tx, oldClusterId, NodeTaskTypeIPItemChanged)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeIPItemChanged)
if err != nil {
return err
}
@@ -1285,11 +1286,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
}
if newClusterId > 0 {
err = SharedNodeTaskDAO.CreateClusterTask(tx, newClusterId, NodeTaskTypeConfigChanged)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeConfigChanged)
if err != nil {
return err
}
err = SharedNodeTaskDAO.CreateClusterTask(tx, newClusterId, NodeTaskTypeIPItemChanged)
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeIPItemChanged)
if err != nil {
return err
}
@@ -1437,7 +1438,7 @@ func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
if clusterId == 0 {
return nil
}
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
}
// NotifyDNSUpdate 通知DNS更新