diff --git a/internal/db/models/ip_item_dao.go b/internal/db/models/ip_item_dao.go index 01f90e0a..ec21b4d7 100644 --- a/internal/db/models/ip_item_dao.go +++ b/internal/db/models/ip_item_dao.go @@ -3,6 +3,7 @@ package models import ( "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -302,7 +303,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error { if len(resultClusterIds) > 0 { for _, clusterId := range resultClusterIds { - err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeIPItemChanged) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged) if err != nil { return err } diff --git a/internal/db/models/ip_list_dao.go b/internal/db/models/ip_list_dao.go index 554f8e4b..541ecdc7 100644 --- a/internal/db/models/ip_list_dao.go +++ b/internal/db/models/ip_list_dao.go @@ -2,6 +2,7 @@ package models import ( "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ipconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" @@ -253,7 +254,7 @@ func (this *IPListDAO) NotifyUpdate(tx *dbs.Tx, listId int64, taskType NodeTaskT if len(resultClusterIds) > 0 { for _, clusterId := range resultClusterIds { - err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, taskType) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, taskType) if err != nil { return err } diff --git a/internal/db/models/metric_item_dao.go b/internal/db/models/metric_item_dao.go index e9fa2604..f5c1e0ef 100644 --- a/internal/db/models/metric_item_dao.go +++ b/internal/db/models/metric_item_dao.go @@ -3,6 +3,7 @@ package models import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" @@ -308,7 +309,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool) return err } for _, clusterId := range clusterIds { - err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) if err != nil { return err } @@ -320,7 +321,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool) return err } for _, clusterId := range clusterIds { - err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) if err != nil { return err } diff --git a/internal/db/models/nameservers/ns_cluster_model_ext.go b/internal/db/models/nameservers/ns_cluster_model_ext.go deleted file mode 100644 index e0f8e7e2..00000000 --- a/internal/db/models/nameservers/ns_cluster_model_ext.go +++ /dev/null @@ -1 +0,0 @@ -package nameservers diff --git a/internal/db/models/nameservers/ns_domain_dao.go b/internal/db/models/nameservers/ns_domain_dao.go index 13e417b4..41be69a2 100644 --- a/internal/db/models/nameservers/ns_domain_dao.go +++ b/internal/db/models/nameservers/ns_domain_dao.go @@ -3,6 +3,7 @@ package nameservers import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -35,12 +36,15 @@ func init() { } // EnableNSDomain 启用条目 -func (this *NSDomainDAO) EnableNSDomain(tx *dbs.Tx, id int64) error { +func (this *NSDomainDAO) EnableNSDomain(tx *dbs.Tx, domainId int64) error { _, err := this.Query(tx). - Pk(id). + Pk(domainId). Set("state", NSDomainStateEnabled). Update() - return err + if err != nil { + return err + } + return this.NotifyUpdate(tx, domainId) } // DisableNSDomain 禁用条目 @@ -55,7 +59,10 @@ func (this *NSDomainDAO) DisableNSDomain(tx *dbs.Tx, domainId int64) error { Set("state", NSDomainStateDisabled). Set("version", version). Update() - return err + if err != nil { + return err + } + return this.NotifyUpdate(tx, domainId) } // FindEnabledNSDomain 查找启用中的条目 @@ -92,7 +99,16 @@ func (this *NSDomainDAO) CreateDomain(tx *dbs.Tx, clusterId int64, userId int64, op.Version = version op.IsOn = true op.State = NSDomainStateEnabled - return this.SaveInt64(tx, op) + domainId, err := this.SaveInt64(tx, op) + if err != nil { + return 0, err + } + + err = this.NotifyUpdate(tx, domainId) + if err != nil { + return domainId, err + } + return domainId, nil } // UpdateDomain 修改域名 @@ -101,6 +117,14 @@ func (this *NSDomainDAO) UpdateDomain(tx *dbs.Tx, domainId int64, clusterId int6 return errors.New("invalid domainId") } + oldClusterId, err := this.Query(tx). + Pk(domainId). + Result("clusterId"). + FindInt64Col(0) + if err != nil { + return err + } + version, err := this.IncreaseVersion(tx) if err != nil { return err @@ -112,7 +136,20 @@ func (this *NSDomainDAO) UpdateDomain(tx *dbs.Tx, domainId int64, clusterId int6 op.UserId = userId op.IsOn = isOn op.Version = version - return this.Save(tx, op) + err = this.Save(tx, op) + if err != nil { + return err + } + + // 通知更新 + if oldClusterId > 0 && oldClusterId != clusterId { + err = models.SharedNSClusterDAO.NotifyUpdate(tx, oldClusterId) + if err != nil { + return err + } + } + + return this.NotifyUpdate(tx, domainId) } // CountAllEnabledDomains 计算域名数量 @@ -121,7 +158,7 @@ func (this *NSDomainDAO) CountAllEnabledDomains(tx *dbs.Tx, clusterId int64, use if clusterId > 0 { query.Attr("clusterId", clusterId) } else { - query.Where("clusterId IN (SELECT id FROM " + SharedNSClusterDAO.Table + " WHERE state=1)") + query.Where("clusterId IN (SELECT id FROM " + models.SharedNSClusterDAO.Table + " WHERE state=1)") } if userId > 0 { query.Attr("userId", userId) @@ -144,7 +181,7 @@ func (this *NSDomainDAO) ListEnabledDomains(tx *dbs.Tx, clusterId int64, userId if clusterId > 0 { query.Attr("clusterId", clusterId) } else { - query.Where("clusterId IN (SELECT id FROM " + SharedNSClusterDAO.Table + " WHERE state=1)") + query.Where("clusterId IN (SELECT id FROM " + models.SharedNSClusterDAO.Table + " WHERE state=1)") } if userId > 0 { query.Attr("userId", userId) @@ -214,22 +251,39 @@ func (this *NSDomainDAO) UpdateDomainTSIG(tx *dbs.Tx, domainId int64, tsigJSON [ return err } - return this.Query(tx). + err = this.Query(tx). Pk(domainId). Set("tsig", tsigJSON). Set("version", version). UpdateQuickly() -} - -// NotifyUpdate 通知更改 -func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error { - version, err := this.IncreaseVersion(tx) if err != nil { return err } + return this.NotifyUpdate(tx, domainId) +} + +// FindEnabledDomainClusterId 获取域名的集群ID +func (this *NSDomainDAO) FindEnabledDomainClusterId(tx *dbs.Tx, domainId int64) (int64, error) { return this.Query(tx). Pk(domainId). - Set("version", version). - UpdateQuickly() + State(NSDomainStateEnabled). + Result("clusterId"). + FindInt64Col(0) +} + +// NotifyUpdate 通知更改 +func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error { + clusterId, err := this.Query(tx). + Result("clusterId"). + Pk(domainId). + FindInt64Col(0) + if err != nil { + return err + } + if clusterId > 0 { + return models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeDomainChanged) + } + + return nil } diff --git a/internal/db/models/nameservers/ns_key_dao.go b/internal/db/models/nameservers/ns_key_dao.go index 4decd932..4a646d6b 100644 --- a/internal/db/models/nameservers/ns_key_dao.go +++ b/internal/db/models/nameservers/ns_key_dao.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -176,8 +177,33 @@ func (this *NSKeyDAO) NotifyUpdate(tx *dbs.Tx, keyId int64) error { if err != nil { return err } - return this.Query(tx). + err = this.Query(tx). Pk(keyId). Set("version", version). UpdateQuickly() + if err != nil { + return err + } + + // 通知集群 + domainId, err := this.Query(tx). + Pk(keyId). + Result("domainId"). + FindInt64Col(0) + if err != nil { + return err + } + if domainId > 0 { + clusterId, err := SharedNSDomainDAO.FindEnabledDomainClusterId(tx, domainId) + if err != nil { + return err + } + if clusterId > 0 { + err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeKeyChanged) + if err != nil { + return err + } + } + } + return nil } diff --git a/internal/db/models/nameservers/ns_record_dao.go b/internal/db/models/nameservers/ns_record_dao.go index 02661819..6cfdc655 100644 --- a/internal/db/models/nameservers/ns_record_dao.go +++ b/internal/db/models/nameservers/ns_record_dao.go @@ -5,6 +5,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -38,27 +39,33 @@ func init() { } // EnableNSRecord 启用条目 -func (this *NSRecordDAO) EnableNSRecord(tx *dbs.Tx, id uint64) error { +func (this *NSRecordDAO) EnableNSRecord(tx *dbs.Tx, recordId int64) error { _, err := this.Query(tx). - Pk(id). + Pk(recordId). Set("state", NSRecordStateEnabled). Update() - return err + if err != nil { + return err + } + return this.NotifyUpdate(tx, recordId) } // DisableNSRecord 禁用条目 -func (this *NSRecordDAO) DisableNSRecord(tx *dbs.Tx, id int64) error { +func (this *NSRecordDAO) DisableNSRecord(tx *dbs.Tx, recordId int64) error { version, err := this.IncreaseVersion(tx) if err != nil { return err } _, err = this.Query(tx). - Pk(id). + Pk(recordId). Set("state", NSRecordStateDisabled). Set("version", version). Update() - return err + if err != nil { + return err + } + return this.NotifyUpdate(tx, recordId) } // FindEnabledNSRecord 查找启用中的条目 @@ -109,7 +116,16 @@ func (this *NSRecordDAO) CreateRecord(tx *dbs.Tx, domainId int64, description st op.IsOn = true op.State = NSRecordStateEnabled op.Version = version - return this.SaveInt64(tx, op) + recordId, err := this.SaveInt64(tx, op) + if err != nil { + return 0, err + } + + err = this.NotifyUpdate(tx, recordId) + if err != nil { + return 0, err + } + return recordId, nil } // UpdateRecord 修改记录 @@ -144,7 +160,12 @@ func (this *NSRecordDAO) UpdateRecord(tx *dbs.Tx, recordId int64, description st op.Version = version - return this.Save(tx, op) + err = this.Save(tx, op) + if err != nil { + return err + } + + return this.NotifyUpdate(tx, recordId) } // CountAllEnabledDomainRecords 计算域名中记录数量 @@ -230,3 +251,31 @@ func (this *NSRecordDAO) FindEnabledRecordWithName(tx *dbs.Tx, domainId int64, r } return record.(*NSRecord), nil } + +// NotifyUpdate 通知更新 +func (this *NSRecordDAO) NotifyUpdate(tx *dbs.Tx, recordId int64) error { + domainId, err := this.Query(tx). + Pk(recordId). + Result("domainId"). + FindInt64Col(0) + if err != nil { + return err + } + + if domainId == 0 { + return nil + } + + clusterId, err := SharedNSDomainDAO.FindEnabledDomainClusterId(tx, domainId) + if err != nil { + return err + } + + if clusterId > 0 { + err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRecordChanged) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/db/models/nameservers/ns_route_dao.go b/internal/db/models/nameservers/ns_route_dao.go index b84e11f3..7dbf9440 100644 --- a/internal/db/models/nameservers/ns_route_dao.go +++ b/internal/db/models/nameservers/ns_route_dao.go @@ -3,6 +3,7 @@ package nameservers import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -35,12 +36,21 @@ func init() { } // EnableNSRoute 启用条目 -func (this *NSRouteDAO) EnableNSRoute(tx *dbs.Tx, id int64) error { - _, err := this.Query(tx). - Pk(id). +func (this *NSRouteDAO) EnableNSRoute(tx *dbs.Tx, routeId int64) error { + version, err := this.IncreaseVersion(tx) + if err != nil { + return err + } + + _, err = this.Query(tx). + Pk(routeId). Set("state", NSRouteStateEnabled). + Set("version", version). Update() - return err + if err != nil { + return err + } + return this.NotifyUpdate(tx) } // DisableNSRoute 禁用条目 @@ -55,7 +65,10 @@ func (this *NSRouteDAO) DisableNSRoute(tx *dbs.Tx, routeId int64) error { Set("state", NSRouteStateDisabled). Set("version", version). Update() - return err + if err != nil { + return err + } + return this.NotifyUpdate(tx) } // FindEnabledNSRoute 查找启用中的条目 @@ -98,7 +111,17 @@ func (this *NSRouteDAO) CreateRoute(tx *dbs.Tx, clusterId int64, domainId int64, op.IsOn = true op.State = NSRouteStateEnabled op.Version = version - return this.SaveInt64(tx, op) + routeId, err := this.SaveInt64(tx, op) + if err != nil { + return 0, err + } + + err = this.NotifyUpdate(tx) + if err != nil { + return 0, err + } + + return routeId, nil } // UpdateRoute 修改线路 @@ -123,7 +146,12 @@ func (this *NSRouteDAO) UpdateRoute(tx *dbs.Tx, routeId int64, name string, rang op.Version = version - return this.Save(tx, op) + err = this.Save(tx, op) + if err != nil { + return err + } + + return this.NotifyUpdate(tx) } // UpdateRouteOrders 修改线路排序 @@ -145,7 +173,8 @@ func (this *NSRouteDAO) UpdateRouteOrders(tx *dbs.Tx, routeIds []int64) error { } order-- } - return nil + + return this.NotifyUpdate(tx) } // FindAllEnabledRoutes 列出所有线路 @@ -190,3 +219,19 @@ func (this *NSRouteDAO) ListRoutesAfterVersion(tx *dbs.Tx, version int64, size i FindAll() return } + +// NotifyUpdate 通知更新 +func (this *NSRouteDAO) NotifyUpdate(tx *dbs.Tx) error { + // 线路变更时所有集群都要更新 + clusterIds, err := models.SharedNSClusterDAO.FindAllEnabledClusterIds(tx) + if err != nil { + return err + } + for _, clusterId := range clusterIds { + err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRouteChanged) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/db/models/node_cluster_dao.go b/internal/db/models/node_cluster_dao.go index 83954676..76276a97 100644 --- a/internal/db/models/node_cluster_dao.go +++ b/internal/db/models/node_cluster_dao.go @@ -869,7 +869,7 @@ func (this *NodeClusterDAO) FindEnabledNodeClustersWithIds(tx *dbs.Tx, clusterId // NotifyUpdate 通知更新 func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { - return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) + return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) } // NotifyDNSUpdate 通知DNS更新 diff --git a/internal/db/models/node_cluster_metric_item_dao.go b/internal/db/models/node_cluster_metric_item_dao.go index b2e4789d..8b21a97c 100644 --- a/internal/db/models/node_cluster_metric_item_dao.go +++ b/internal/db/models/node_cluster_metric_item_dao.go @@ -2,6 +2,7 @@ package models import ( "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -158,5 +159,5 @@ func (this *NodeClusterMetricItemDAO) ExistsClusterItem(tx *dbs.Tx, clusterId in // NotifyUpdate 通知更新 func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { - return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) + return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) } diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 0284fe9d..73662ab8 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -1290,7 +1290,7 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error { return err } if clusterId > 0 { - return SharedNodeTaskDAO.CreateNodeTask(tx, clusterId, nodeId, NodeTaskTypeConfigChanged) + return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, NodeTaskTypeConfigChanged) } return nil } diff --git a/internal/db/models/node_task_dao.go b/internal/db/models/node_task_dao.go index 2d511910..22e837e9 100644 --- a/internal/db/models/node_task_dao.go +++ b/internal/db/models/node_task_dao.go @@ -1,8 +1,8 @@ package models import ( - "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -17,6 +17,14 @@ const ( NodeTaskTypeConfigChanged NodeTaskType = "configChanged" NodeTaskTypeIPItemChanged NodeTaskType = "ipItemChanged" NodeTaskTypeNodeVersionChanged NodeTaskType = "nodeVersionChanged" + + // NS相关 + + NSNodeTaskTypeConfigChanged NodeTaskType = "nsConfigChanged" + NSNodeTaskTypeDomainChanged NodeTaskType = "nsDomainChanged" + NSNodeTaskTypeRecordChanged NodeTaskType = "nsRecordChanged" + NSNodeTaskTypeRouteChanged NodeTaskType = "nsRouteChanged" + NSNodeTaskTypeKeyChanged NodeTaskType = "nsKeyChanged" ) type NodeTaskDAO dbs.DAO @@ -41,14 +49,15 @@ func init() { } // CreateNodeTask 创建单个节点任务 -func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int64, taskType NodeTaskType) error { +func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, taskType NodeTaskType) error { if clusterId <= 0 || nodeId <= 0 { return nil } - uniqueId := numberutils.FormatInt64(nodeId) + "@node@" + taskType + uniqueId := role + "@" + types.String(nodeId) + "@node@" + taskType updatedAt := time.Now().Unix() _, _, err := this.Query(tx). InsertOrUpdate(maps.Map{ + "role": role, "clusterId": clusterId, "nodeId": nodeId, "type": taskType, @@ -68,15 +77,16 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int6 } // CreateClusterTask 创建集群任务 -func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { +func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, taskType NodeTaskType) error { if clusterId <= 0 { return nil } - uniqueId := numberutils.FormatInt64(clusterId) + "@cluster@" + taskType + uniqueId := role + "@" + types.String(clusterId) + "@cluster@" + taskType updatedAt := time.Now().Unix() _, _, err := this.Query(tx). InsertOrUpdate(maps.Map{ + "role": role, "clusterId": clusterId, "nodeId": 0, "type": taskType, @@ -96,14 +106,15 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType return err } -// ExtractClusterTask 分解集群任务 -func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { +// ExtractNodeClusterTask 分解边缘节点集群任务 +func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes) if err != nil { return err } _, err = this.Query(tx). + Attr("role", nodeconfigs.NodeRoleNode). Attr("clusterId", clusterId). Param("clusterIdString", types.String(clusterId)). Where("nodeId> 0"). @@ -114,13 +125,52 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp } for _, nodeId := range nodeIds { - err = this.CreateNodeTask(tx, clusterId, nodeId, taskType) + err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, taskType) if err != nil { return err } } _, err = this.Query(tx). + Attr("role", nodeconfigs.NodeRoleNode). + Attr("clusterId", clusterId). + Attr("nodeId", 0). + Attr("type", taskType). + Delete() + if err != nil { + return err + } + + return nil +} + +// ExtractNSClusterTask 分解NS节点集群任务 +func (this *NodeTaskDAO) ExtractNSClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { + nodeIds, err := SharedNSNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes) + if err != nil { + return err + } + + _, err = this.Query(tx). + Attr("role", nodeconfigs.NodeRoleDNS). + Attr("clusterId", clusterId). + Param("clusterIdString", types.String(clusterId)). + Where("nodeId> 0"). + Attr("type", taskType). + Delete() + if err != nil { + return err + } + + for _, nodeId := range nodeIds { + err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, taskType) + if err != nil { + return err + } + } + + _, err = this.Query(tx). + Attr("role", nodeconfigs.NodeRoleDNS). Attr("clusterId", clusterId). Attr("nodeId", 0). Attr("type", taskType). @@ -133,8 +183,9 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp } // ExtractAllClusterTasks 分解所有集群任务 -func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error { +func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx, role string) error { ones, err := this.Query(tx). + Attr("role", role). Attr("nodeId", 0). FindAll() if err != nil { @@ -142,36 +193,47 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error { } for _, one := range ones { clusterId := int64(one.(*NodeTask).ClusterId) - err = this.ExtractClusterTask(tx, clusterId, one.(*NodeTask).Type) - if err != nil { - return err + switch role { + case nodeconfigs.NodeRoleNode: + err = this.ExtractNodeClusterTask(tx, clusterId, one.(*NodeTask).Type) + if err != nil { + return err + } + case nodeconfigs.NodeRoleDNS: + err = this.ExtractNSClusterTask(tx, clusterId, one.(*NodeTask).Type) + if err != nil { + return err + } } } return nil } // DeleteAllClusterTasks 删除集群所有相关任务 -func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, clusterId int64) error { +func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, role string, clusterId int64) error { _, err := this.Query(tx). + Attr("role", role). Attr("clusterId", clusterId). Delete() return err } // DeleteNodeTasks 删除节点相关任务 -func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, nodeId int64) error { +func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, role string, nodeId int64) error { _, err := this.Query(tx). + Attr("role", role). Attr("nodeId", nodeId). Delete() return err } // FindDoingNodeTasks 查询一个节点的所有任务 -func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, nodeId int64) (result []*NodeTask, err error) { +func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int64) (result []*NodeTask, err error) { if nodeId <= 0 { return } _, err = this.Query(tx). + Attr("role", role). Attr("nodeId", nodeId). Where("(isDone=0 OR (isDone=1 AND isOk=0))"). Slice(&result). @@ -191,9 +253,10 @@ func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool, } // FindAllDoingTaskClusterIds 查找正在更新的集群IDs -func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error) { +func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx, role string) ([]int64, error) { ones, _, err := this.Query(tx). Result("DISTINCT(clusterId) AS clusterId"). + Attr("role", role). Where("(nodeId=0 OR (isDone=0 OR (isDone=1 AND isOk=0)))"). FindOnes() if err != nil { @@ -207,8 +270,9 @@ func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error) } // FindAllDoingNodeTasksWithClusterId 查询某个集群下所有的任务 -func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterId int64) (result []*NodeTask, err error) { +func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, role string, clusterId int64) (result []*NodeTask, err error) { _, err = this.Query(tx). + Attr("role", role). Attr("clusterId", clusterId). Gt("nodeId", 0). Where("(isDone=0 OR (isDone=1 AND isOk=0))"). @@ -220,17 +284,37 @@ func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterI return } +// FindAllDoingNodeIds 查询有任务的节点IDs +func (this *NodeTaskDAO) FindAllDoingNodeIds(tx *dbs.Tx, role string) ([]int64, error) { + ones, err := this.Query(tx). + Result("DISTINCT(nodeId) AS nodeId"). + Attr("role", role). + Gt("nodeId", 0). + Attr("isDone", false). + FindAll() + if err != nil { + return nil, err + } + var result []int64 + for _, one := range ones { + result = append(result, int64(one.(*NodeTask).NodeId)) + } + return result, nil +} + // ExistsDoingNodeTasks 检查是否有正在执行的任务 -func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx) (bool, error) { +func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx, role string) (bool, error) { return this.Query(tx). + Attr("role", role). Where("(isDone=0 OR (isDone=1 AND isOk=0))"). Gt("nodeId", 0). Exist() } // ExistsErrorNodeTasks 是否有错误的任务 -func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx) (bool, error) { +func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx, role string) (bool, error) { return this.Query(tx). + Attr("role", role). Where("(isDone=1 AND isOk=0)"). Exist() } @@ -244,16 +328,18 @@ func (this *NodeTaskDAO) DeleteNodeTask(tx *dbs.Tx, taskId int64) error { } // CountDoingNodeTasks 计算正在执行的任务 -func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx) (int64, error) { +func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx, role string) (int64, error) { return this.Query(tx). Attr("isDone", 0). + Attr("role", role). Gt("nodeId", 0). Count() } // FindNotifyingNodeTasks 查找需要通知的任务 -func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, size int64) (result []*NodeTask, err error) { +func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, role string, size int64) (result []*NodeTask, err error) { _, err = this.Query(tx). + Attr("role", role). Gt("nodeId", 0). Attr("isNotified", 0). Attr("isDone", 0). diff --git a/internal/db/models/node_task_model.go b/internal/db/models/node_task_model.go index 8774f886..8c48b5f8 100644 --- a/internal/db/models/node_task_model.go +++ b/internal/db/models/node_task_model.go @@ -1,8 +1,9 @@ package models -// 节点同步任务 +// NodeTask 节点同步任务 type NodeTask struct { Id uint64 `field:"id"` // ID + Role string `field:"role"` // 节点角色 NodeId uint32 `field:"nodeId"` // 节点ID ClusterId uint32 `field:"clusterId"` // 集群ID Type string `field:"type"` // 任务类型 @@ -16,6 +17,7 @@ type NodeTask struct { type NodeTaskOperator struct { Id interface{} // ID + Role interface{} // 节点角色 NodeId interface{} // 节点ID ClusterId interface{} // 集群ID Type interface{} // 任务类型 diff --git a/internal/db/models/nameservers/ns_cluster_dao.go b/internal/db/models/ns_cluster_dao.go similarity index 83% rename from internal/db/models/nameservers/ns_cluster_dao.go rename to internal/db/models/ns_cluster_dao.go index c3fdf06d..f909da8d 100644 --- a/internal/db/models/nameservers/ns_cluster_dao.go +++ b/internal/db/models/ns_cluster_dao.go @@ -1,7 +1,8 @@ -package nameservers +package models import ( "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -127,6 +128,22 @@ func (this *NSClusterDAO) FindAllEnabledClusters(tx *dbs.Tx) (result []*NSCluste return } +// FindAllEnabledClusterIds 获取所有集群IDs +func (this *NSClusterDAO) FindAllEnabledClusterIds(tx *dbs.Tx) ([]int64, error) { + ones, err := this.Query(tx). + State(NSClusterStateEnabled). + ResultPk(). + FindAll() + if err != nil { + return nil, err + } + var result = []int64{} + for _, one := range ones { + result = append(result, int64(one.(*NSCluster).Id)) + } + return result, nil +} + // UpdateClusterAccessLog 设置访问日志 func (this *NSClusterDAO) UpdateClusterAccessLog(tx *dbs.Tx, clusterId int64, accessLogJSON []byte) error { return this.Query(tx). @@ -143,3 +160,8 @@ func (this *NSClusterDAO) FindClusterAccessLog(tx *dbs.Tx, clusterId int64) ([]b FindStringCol("") return []byte(accessLog), err } + +// NotifyUpdate 通知更改 +func (this *NSClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { + return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, NSNodeTaskTypeConfigChanged) +} diff --git a/internal/db/models/nameservers/ns_cluster_dao_test.go b/internal/db/models/ns_cluster_dao_test.go similarity index 81% rename from internal/db/models/nameservers/ns_cluster_dao_test.go rename to internal/db/models/ns_cluster_dao_test.go index e29fe962..224e9db7 100644 --- a/internal/db/models/nameservers/ns_cluster_dao_test.go +++ b/internal/db/models/ns_cluster_dao_test.go @@ -1,4 +1,4 @@ -package nameservers +package models import ( _ "github.com/go-sql-driver/mysql" diff --git a/internal/db/models/nameservers/ns_cluster_model.go b/internal/db/models/ns_cluster_model.go similarity index 97% rename from internal/db/models/nameservers/ns_cluster_model.go rename to internal/db/models/ns_cluster_model.go index f4a3fac2..d401a39f 100644 --- a/internal/db/models/nameservers/ns_cluster_model.go +++ b/internal/db/models/ns_cluster_model.go @@ -1,4 +1,4 @@ -package nameservers +package models // NSCluster 域名服务器集群 type NSCluster struct { diff --git a/internal/db/models/ns_cluster_model_ext.go b/internal/db/models/ns_cluster_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/ns_cluster_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/nameservers/ns_node_dao.go b/internal/db/models/ns_node_dao.go similarity index 93% rename from internal/db/models/nameservers/ns_node_dao.go rename to internal/db/models/ns_node_dao.go index 8ee2483b..64dc358c 100644 --- a/internal/db/models/nameservers/ns_node_dao.go +++ b/internal/db/models/ns_node_dao.go @@ -1,8 +1,7 @@ -package nameservers +package models import ( "encoding/json" - "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" @@ -214,7 +213,7 @@ func (this *NSNodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, cluste secret := rands.String(32) // 保存API Token - err = models.SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleDNS) + err = SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleDNS) if err != nil { return } @@ -281,7 +280,7 @@ func (this *NSNodeDAO) FindEnabledNodeIdWithUniqueId(tx *dbs.Tx, uniqueId string } // FindNodeInstallStatus 查询节点的安装状态 -func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*models.NodeInstallStatus, error) { +func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*NodeInstallStatus, error) { node, err := this.Query(tx). Pk(nodeId). Result("installStatus", "isInstalled"). @@ -296,10 +295,10 @@ func (this *NSNodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*models. installStatus := node.(*NSNode).InstallStatus isInstalled := node.(*NSNode).IsInstalled == 1 if len(installStatus) == 0 { - return models.NewNodeInstallStatus(), nil + return NewNodeInstallStatus(), nil } - status := &models.NodeInstallStatus{} + status := &NodeInstallStatus{} err = json.Unmarshal([]byte(installStatus), status) if err != nil { return nil, err @@ -391,7 +390,7 @@ func (this *NSNodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64) (*dnsconfigs. // 访问日志 // 全局配置 { - globalValue, err := models.SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeNSAccessLogSetting) + globalValue, err := SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeNSAccessLogSetting) if err != nil { return nil, err } @@ -498,6 +497,29 @@ func (this *NSNodeDAO) UpdateNodeStatusIsNotified(tx *dbs.Tx, nodeId int64) erro UpdateQuickly() } +// FindAllNodeIdsMatch 匹配节点并返回节点ID +func (this *NSNodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) { + query := this.Query(tx) + query.State(NSNodeStateEnabled) + if clusterId > 0 { + query.Attr("clusterId", clusterId) + } + if isOn == configutils.BoolStateYes { + query.Attr("isOn", true) + } else if isOn == configutils.BoolStateNo { + query.Attr("isOn", false) + } + query.Result("id") + ones, _, err := query.FindOnes() + if err != nil { + return nil, err + } + for _, one := range ones { + result = append(result, one.GetInt64("id")) + } + return +} + // NotifyUpdate 通知更新 func (this *NSNodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error { // TODO 先什么都不做 diff --git a/internal/db/models/nameservers/ns_node_dao_test.go b/internal/db/models/ns_node_dao_test.go similarity index 81% rename from internal/db/models/nameservers/ns_node_dao_test.go rename to internal/db/models/ns_node_dao_test.go index e29fe962..224e9db7 100644 --- a/internal/db/models/nameservers/ns_node_dao_test.go +++ b/internal/db/models/ns_node_dao_test.go @@ -1,4 +1,4 @@ -package nameservers +package models import ( _ "github.com/go-sql-driver/mysql" diff --git a/internal/db/models/nameservers/ns_node_model.go b/internal/db/models/ns_node_model.go similarity index 99% rename from internal/db/models/nameservers/ns_node_model.go rename to internal/db/models/ns_node_model.go index 9268af5f..cc3e03ae 100644 --- a/internal/db/models/nameservers/ns_node_model.go +++ b/internal/db/models/ns_node_model.go @@ -1,4 +1,4 @@ -package nameservers +package models // NSNode 域名服务器节点 type NSNode struct { diff --git a/internal/db/models/nameservers/ns_node_model_ext.go b/internal/db/models/ns_node_model_ext.go similarity index 75% rename from internal/db/models/nameservers/ns_node_model_ext.go rename to internal/db/models/ns_node_model_ext.go index 49d1f0e1..7f87a8a4 100644 --- a/internal/db/models/nameservers/ns_node_model_ext.go +++ b/internal/db/models/ns_node_model_ext.go @@ -1,21 +1,20 @@ -package nameservers +package models import ( "encoding/json" - "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "time" ) // DecodeInstallStatus 安装状态 -func (this *NSNode) DecodeInstallStatus() (*models.NodeInstallStatus, error) { +func (this *NSNode) DecodeInstallStatus() (*NodeInstallStatus, error) { if len(this.InstallStatus) == 0 || this.InstallStatus == "null" { - return models.NewNodeInstallStatus(), nil + return NewNodeInstallStatus(), nil } - status := &models.NodeInstallStatus{} + status := &NodeInstallStatus{} err := json.Unmarshal([]byte(this.InstallStatus), status) if err != nil { - return models.NewNodeInstallStatus(), err + return NewNodeInstallStatus(), err } // 如果N秒钟没有更新状态,则认为不在运行 diff --git a/internal/db/models/server_dao.go b/internal/db/models/server_dao.go index 6eeaa829..94622c12 100644 --- a/internal/db/models/server_dao.go +++ b/internal/db/models/server_dao.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models/dns" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" @@ -1270,11 +1271,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC } if oldClusterId > 0 { - err = SharedNodeTaskDAO.CreateClusterTask(tx, oldClusterId, NodeTaskTypeConfigChanged) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeConfigChanged) if err != nil { return err } - err = SharedNodeTaskDAO.CreateClusterTask(tx, oldClusterId, NodeTaskTypeIPItemChanged) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeIPItemChanged) if err != nil { return err } @@ -1285,11 +1286,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC } if newClusterId > 0 { - err = SharedNodeTaskDAO.CreateClusterTask(tx, newClusterId, NodeTaskTypeConfigChanged) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeConfigChanged) if err != nil { return err } - err = SharedNodeTaskDAO.CreateClusterTask(tx, newClusterId, NodeTaskTypeIPItemChanged) + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeIPItemChanged) if err != nil { return err } @@ -1437,7 +1438,7 @@ func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error { if clusterId == 0 { return nil } - return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) + return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) } // NotifyDNSUpdate 通知DNS更新 diff --git a/internal/rpc/services/nameservers/service_ns.go b/internal/rpc/services/nameservers/service_ns.go index 76ead017..8af9eeaf 100644 --- a/internal/rpc/services/nameservers/service_ns.go +++ b/internal/rpc/services/nameservers/service_ns.go @@ -45,21 +45,21 @@ func (this *NSService) ComposeNSBoard(ctx context.Context, req *pb.ComposeNSBoar result.CountNSRecords = countRecords // 集群数 - countClusters, err := nameservers.SharedNSClusterDAO.CountAllEnabledClusters(tx) + countClusters, err := models.SharedNSClusterDAO.CountAllEnabledClusters(tx) if err != nil { return nil, err } result.CountNSClusters = countClusters // 节点数 - countNodes, err := nameservers.SharedNSNodeDAO.CountAllEnabledNodes(tx) + countNodes, err := models.SharedNSNodeDAO.CountAllEnabledNodes(tx) if err != nil { return nil, err } result.CountNSNodes = countNodes // 离线节点数 - countOfflineNodes, err := nameservers.SharedNSNodeDAO.CountAllOfflineNodes(tx) + countOfflineNodes, err := models.SharedNSNodeDAO.CountAllOfflineNodes(tx) if err != nil { return nil, err } @@ -122,7 +122,7 @@ func (this *NSService) ComposeNSBoard(ctx context.Context, req *pb.ComposeNSBoar return nil, err } for _, stat := range topNodeStats { - nodeName, err := nameservers.SharedNSNodeDAO.FindEnabledNSNodeName(tx, int64(stat.NodeId)) + nodeName, err := models.SharedNSNodeDAO.FindEnabledNSNodeName(tx, int64(stat.NodeId)) if err != nil { return nil, err } diff --git a/internal/rpc/services/nameservers/service_ns_cluster.go b/internal/rpc/services/nameservers/service_ns_cluster.go index 1c5db62b..b07a99f2 100644 --- a/internal/rpc/services/nameservers/service_ns_cluster.go +++ b/internal/rpc/services/nameservers/service_ns_cluster.go @@ -4,8 +4,9 @@ package nameservers import ( "context" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" ) @@ -21,7 +22,7 @@ func (this *NSClusterService) CreateNSCluster(ctx context.Context, req *pb.Creat return nil, err } var tx = this.NullTx() - clusterId, err := nameservers.SharedNSClusterDAO.CreateCluster(tx, req.Name, req.AccessLogJSON) + clusterId, err := models.SharedNSClusterDAO.CreateCluster(tx, req.Name, req.AccessLogJSON) if err != nil { return nil, err } @@ -35,7 +36,7 @@ func (this *NSClusterService) UpdateNSCluster(ctx context.Context, req *pb.Updat return nil, err } var tx = this.NullTx() - err = nameservers.SharedNSClusterDAO.UpdateCluster(tx, req.NsClusterId, req.Name, req.IsOn) + err = models.SharedNSClusterDAO.UpdateCluster(tx, req.NsClusterId, req.Name, req.IsOn) if err != nil { return nil, err } @@ -50,7 +51,7 @@ func (this *NSClusterService) FindNSClusterAccessLog(ctx context.Context, req *p } var tx = this.NullTx() - accessLogJSON, err := nameservers.SharedNSClusterDAO.FindClusterAccessLog(tx, req.NsClusterId) + accessLogJSON, err := models.SharedNSClusterDAO.FindClusterAccessLog(tx, req.NsClusterId) if err != nil { return nil, err } @@ -65,7 +66,7 @@ func (this *NSClusterService) UpdateNSClusterAccessLog(ctx context.Context, req } var tx = this.NullTx() - err = nameservers.SharedNSClusterDAO.UpdateClusterAccessLog(tx, req.NsClusterId, req.AccessLogJSON) + err = models.SharedNSClusterDAO.UpdateClusterAccessLog(tx, req.NsClusterId, req.AccessLogJSON) if err != nil { return nil, err } @@ -79,10 +80,17 @@ func (this *NSClusterService) DeleteNSCluster(ctx context.Context, req *pb.Delet return nil, err } var tx = this.NullTx() - err = nameservers.SharedNSClusterDAO.DisableNSCluster(tx, req.NsClusterId) + err = models.SharedNSClusterDAO.DisableNSCluster(tx, req.NsClusterId) if err != nil { return nil, err } + + // 删除任务 + err = models.SharedNodeTaskDAO.DeleteAllClusterTasks(tx, nodeconfigs.NodeRoleDNS, req.NsClusterId) + if err != nil { + return nil, err + } + return this.Success() } @@ -93,7 +101,7 @@ func (this *NSClusterService) FindEnabledNSCluster(ctx context.Context, req *pb. return nil, err } var tx = this.NullTx() - cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, req.NsClusterId) + cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, req.NsClusterId) if err != nil { return nil, err } @@ -115,7 +123,7 @@ func (this *NSClusterService) CountAllEnabledNSClusters(ctx context.Context, req return nil, err } var tx = this.NullTx() - count, err := nameservers.SharedNSClusterDAO.CountAllEnabledClusters(tx) + count, err := models.SharedNSClusterDAO.CountAllEnabledClusters(tx) if err != nil { return nil, err } @@ -129,7 +137,7 @@ func (this *NSClusterService) ListEnabledNSClusters(ctx context.Context, req *pb return nil, err } var tx = this.NullTx() - clusters, err := nameservers.SharedNSClusterDAO.ListEnabledClusters(tx, req.Offset, req.Size) + clusters, err := models.SharedNSClusterDAO.ListEnabledClusters(tx, req.Offset, req.Size) if err != nil { return nil, err } @@ -152,7 +160,7 @@ func (this *NSClusterService) FindAllEnabledNSClusters(ctx context.Context, req return nil, err } var tx = this.NullTx() - clusters, err := nameservers.SharedNSClusterDAO.FindAllEnabledClusters(tx) + clusters, err := models.SharedNSClusterDAO.FindAllEnabledClusters(tx) if err != nil { return nil, err } diff --git a/internal/rpc/services/nameservers/service_ns_domain.go b/internal/rpc/services/nameservers/service_ns_domain.go index 7ce19ffd..c86cba1c 100644 --- a/internal/rpc/services/nameservers/service_ns_domain.go +++ b/internal/rpc/services/nameservers/service_ns_domain.go @@ -78,7 +78,7 @@ func (this *NSDomainService) FindEnabledNSDomain(ctx context.Context, req *pb.Fi } // 集群 - cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId)) + cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId)) if err != nil { return nil, err } @@ -150,7 +150,7 @@ func (this *NSDomainService) ListEnabledNSDomains(ctx context.Context, req *pb.L pbDomains := []*pb.NSDomain{} for _, domain := range domains { // 集群 - cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId)) + cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(domain.ClusterId)) if err != nil { return nil, err } diff --git a/internal/rpc/services/nameservers/service_ns_node.go b/internal/rpc/services/nameservers/service_ns_node.go index 3d56b9e1..8ff6af21 100644 --- a/internal/rpc/services/nameservers/service_ns_node.go +++ b/internal/rpc/services/nameservers/service_ns_node.go @@ -5,12 +5,13 @@ package nameservers import ( "context" "encoding/json" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/installers" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" stringutil "github.com/iwind/TeaGo/utils/string" "path/filepath" @@ -30,7 +31,7 @@ func (this *NSNodeService) FindAllEnabledNSNodesWithNSClusterId(ctx context.Cont var tx = this.NullTx() - nodes, err := nameservers.SharedNSNodeDAO.FindAllEnabledNodesWithClusterId(tx, req.NsClusterId) + nodes, err := models.SharedNSNodeDAO.FindAllEnabledNodesWithClusterId(tx, req.NsClusterId) if err != nil { return nil, err } @@ -60,7 +61,7 @@ func (this *NSNodeService) CountAllEnabledNSNodes(ctx context.Context, req *pb.C } var tx = this.NullTx() - count, err := nameservers.SharedNSNodeDAO.CountAllEnabledNodes(tx) + count, err := models.SharedNSNodeDAO.CountAllEnabledNodes(tx) if err != nil { return nil, err } @@ -75,7 +76,7 @@ func (this *NSNodeService) CountAllEnabledNSNodesMatch(ctx context.Context, req } var tx = this.NullTx() - count, err := nameservers.SharedNSNodeDAO.CountAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword) + count, err := models.SharedNSNodeDAO.CountAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword) if err != nil { return nil, err } @@ -90,7 +91,7 @@ func (this *NSNodeService) ListEnabledNSNodesMatch(ctx context.Context, req *pb. } var tx = this.NullTx() - nodes, err := nameservers.SharedNSNodeDAO.ListAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.Offset, req.Size) + nodes, err := models.SharedNSNodeDAO.ListAllEnabledNodesMatch(tx, req.NsClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.Offset, req.Size) if err != nil { return nil, err } @@ -144,7 +145,7 @@ func (this *NSNodeService) CountAllUpgradeNSNodesWithNSClusterId(ctx context.Con deployFiles := installers.SharedDeployManager.LoadNSNodeFiles() total := int64(0) for _, deployFile := range deployFiles { - count, err := nameservers.SharedNSNodeDAO.CountAllLowerVersionNodesWithClusterId(tx, req.NsClusterId, deployFile.OS, deployFile.Arch, deployFile.Version) + count, err := models.SharedNSNodeDAO.CountAllLowerVersionNodesWithClusterId(tx, req.NsClusterId, deployFile.OS, deployFile.Arch, deployFile.Version) if err != nil { return nil, err } @@ -163,7 +164,7 @@ func (this *NSNodeService) CreateNSNode(ctx context.Context, req *pb.CreateNSNod tx := this.NullTx() - nodeId, err := nameservers.SharedNSNodeDAO.CreateNode(tx, adminId, req.Name, req.NodeClusterId) + nodeId, err := models.SharedNSNodeDAO.CreateNode(tx, adminId, req.Name, req.NodeClusterId) if err != nil { return nil, err } @@ -182,7 +183,13 @@ func (this *NSNodeService) DeleteNSNode(ctx context.Context, req *pb.DeleteNSNod tx := this.NullTx() - err = nameservers.SharedNSNodeDAO.DisableNSNode(tx, req.NsNodeId) + err = models.SharedNSNodeDAO.DisableNSNode(tx, req.NsNodeId) + if err != nil { + return nil, err + } + + // 删除任务 + err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, nodeconfigs.NodeRoleDNS, req.NsNodeId) if err != nil { return nil, err } @@ -199,7 +206,7 @@ func (this *NSNodeService) FindEnabledNSNode(ctx context.Context, req *pb.FindEn tx := this.NullTx() - node, err := nameservers.SharedNSNodeDAO.FindEnabledNSNode(tx, req.NsNodeId) + node, err := models.SharedNSNodeDAO.FindEnabledNSNode(tx, req.NsNodeId) if err != nil { return nil, err } @@ -208,7 +215,7 @@ func (this *NSNodeService) FindEnabledNSNode(ctx context.Context, req *pb.FindEn } // 集群信息 - clusterName, err := nameservers.SharedNSClusterDAO.FindEnabledNSClusterName(tx, int64(node.ClusterId)) + clusterName, err := models.SharedNSClusterDAO.FindEnabledNSClusterName(tx, int64(node.ClusterId)) if err != nil { return nil, err } @@ -256,7 +263,7 @@ func (this *NSNodeService) UpdateNSNode(ctx context.Context, req *pb.UpdateNSNod tx := this.NullTx() - err = nameservers.SharedNSNodeDAO.UpdateNode(tx, req.NsNodeId, req.Name, req.NsClusterId, req.IsOn) + err = models.SharedNSNodeDAO.UpdateNode(tx, req.NsNodeId, req.Name, req.NsClusterId, req.IsOn) if err != nil { return nil, err } @@ -287,7 +294,7 @@ func (this *NSNodeService) FindNSNodeInstallStatus(ctx context.Context, req *pb. tx := this.NullTx() - installStatus, err := nameservers.SharedNSNodeDAO.FindNodeInstallStatus(tx, req.NsNodeId) + installStatus, err := models.SharedNSNodeDAO.FindNodeInstallStatus(tx, req.NsNodeId) if err != nil { return nil, err } @@ -315,7 +322,7 @@ func (this *NSNodeService) UpdateNSNodeIsInstalled(ctx context.Context, req *pb. tx := this.NullTx() - err = nameservers.SharedNSNodeDAO.UpdateNodeIsInstalled(tx, req.NsNodeId, req.IsInstalled) + err = models.SharedNSNodeDAO.UpdateNodeIsInstalled(tx, req.NsNodeId, req.IsInstalled) if err != nil { return nil, err } @@ -341,7 +348,7 @@ func (this *NSNodeService) UpdateNSNodeStatus(ctx context.Context, req *pb.Updat tx := this.NullTx() - err = nameservers.SharedNSNodeDAO.UpdateNodeStatus(tx, nodeId, req.StatusJSON) + err = models.SharedNSNodeDAO.UpdateNodeStatus(tx, nodeId, req.StatusJSON) if err != nil { return nil, err } @@ -357,7 +364,7 @@ func (this *NSNodeService) FindCurrentNSNodeConfig(ctx context.Context, req *pb. } var tx = this.NullTx() - config, err := nameservers.SharedNSNodeDAO.ComposeNodeConfig(tx, nodeId) + config, err := models.SharedNSNodeDAO.ComposeNodeConfig(tx, nodeId) if err != nil { return nil, err } @@ -432,7 +439,7 @@ func (this *NSNodeService) UpdateNSNodeConnectedAPINodes(ctx context.Context, re tx := this.NullTx() - err = nameservers.SharedNSNodeDAO.UpdateNodeConnectedAPINodes(tx, nodeId, req.ApiNodeIds) + err = models.SharedNSNodeDAO.UpdateNodeConnectedAPINodes(tx, nodeId, req.ApiNodeIds) if err != nil { return nil, errors.Wrap(err) } diff --git a/internal/rpc/services/nameservers/service_ns_node_stream.go b/internal/rpc/services/nameservers/service_ns_node_stream.go index 98a9303d..07a4e0f8 100644 --- a/internal/rpc/services/nameservers/service_ns_node_stream.go +++ b/internal/rpc/services/nameservers/service_ns_node_stream.go @@ -6,12 +6,13 @@ import ( "fmt" "github.com/TeaOSLab/EdgeAPI/internal/configs" "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/logs" "strconv" "sync" @@ -48,22 +49,56 @@ var requestChanMap = map[int64]chan *CommandRequest{} // node id => chan func NextCommandRequestId() int64 { return atomic.AddInt64(&commandRequestId, 1) } - func init() { - // 清理WaitingChannelMap - ticker := time.NewTicker(30 * time.Second) - go func() { - for range ticker.C { - nodeLocker.Lock() - for requestId, request := range responseChanMap { - if time.Now().Unix()-request.Timestamp > 3600 { - responseChanMap[requestId].Close() - delete(responseChanMap, requestId) + dbs.OnReadyDone(func() { + // 清理WaitingChannelMap + go func() { + ticker := time.NewTicker(30 * time.Second) + for range ticker.C { + nodeLocker.Lock() + for requestId, request := range responseChanMap { + if time.Now().Unix()-request.Timestamp > 3600 { + responseChanMap[requestId].Close() + delete(responseChanMap, requestId) + } } + nodeLocker.Unlock() } - nodeLocker.Unlock() - } - }() + }() + + // 自动同步连接到本API节点的NS节点任务 + go func() { + defer func() { + _ = recover() + }() + + // TODO 未来支持同步边缘节点 + var ticker = time.NewTicker(3 * time.Second) + for range ticker.C { + nodeIds, err := models.SharedNodeTaskDAO.FindAllDoingNodeIds(nil, nodeconfigs.NodeRoleDNS) + if err != nil { + remotelogs.Error("NSNodeService_SYNC", err.Error()) + continue + } + nodeLocker.Lock() + for _, nodeId := range nodeIds { + c, ok := requestChanMap[nodeId] + if ok { + select { + case c <- &CommandRequest{ + Id: NextCommandRequestId(), + Code: messageconfigs.NSMessageCodeNewNodeTask, + CommandJSON: nil, + }: + default: + + } + } + } + nodeLocker.Unlock() + } + }() + }) } // NsNodeStream 节点stream @@ -100,22 +135,22 @@ func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServ tx := this.NullTx() // 标记为活跃状态 - oldIsActive, err := nameservers.SharedNSNodeDAO.FindNodeActive(tx, nodeId) + oldIsActive, err := models.SharedNSNodeDAO.FindNodeActive(tx, nodeId) if err != nil { return err } if !oldIsActive { - err = nameservers.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, true) + err = models.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, true) if err != nil { return err } // 发送恢复消息 - clusterId, err := nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) + clusterId, err := models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) if err != nil { return err } - nodeName, err := nameservers.SharedNSNodeDAO.FindEnabledNSNodeName(tx, nodeId) + nodeName, err := models.SharedNSNodeDAO.FindEnabledNSNodeName(tx, nodeId) if err != nil { return err } @@ -169,7 +204,7 @@ func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServ req, err := server.Recv() if err != nil { // 修改节点状态 - err1 := nameservers.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, false) + err1 := models.SharedNSNodeDAO.UpdateNodeActive(tx, nodeId, false) if err1 != nil { logs.Println(err1.Error()) } diff --git a/internal/rpc/services/nameservers/service_ns_record_hourly_stat.go b/internal/rpc/services/nameservers/service_ns_record_hourly_stat.go index 4f0f4fb4..8ed32acd 100644 --- a/internal/rpc/services/nameservers/service_ns_record_hourly_stat.go +++ b/internal/rpc/services/nameservers/service_ns_record_hourly_stat.go @@ -4,6 +4,7 @@ package nameservers import ( "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services" @@ -31,7 +32,7 @@ func (this *NSRecordHourlyStatService) UploadNSRecordHourlyStats(ctx context.Con } var tx = this.NullTx() - clusterId, err := nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) + clusterId, err := models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) if err != nil { return nil, err } diff --git a/internal/rpc/services/nameservers/service_ns_route.go b/internal/rpc/services/nameservers/service_ns_route.go index 24072b20..0bca0903 100644 --- a/internal/rpc/services/nameservers/service_ns_route.go +++ b/internal/rpc/services/nameservers/service_ns_route.go @@ -4,6 +4,7 @@ package nameservers import ( "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/rpc/services" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" @@ -75,7 +76,7 @@ func (this *NSRouteService) FindEnabledNSRoute(ctx context.Context, req *pb.Find // 集群 var pbCluster *pb.NSCluster if route.ClusterId > 0 { - cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId)) + cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId)) if err != nil { return nil, err } @@ -130,7 +131,7 @@ func (this *NSRouteService) FindAllEnabledNSRoutes(ctx context.Context, req *pb. // 集群 var pbCluster *pb.NSCluster if route.ClusterId > 0 { - cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId)) + cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(route.ClusterId)) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_admin.go b/internal/rpc/services/service_admin.go index 774e928d..1fa0feca 100644 --- a/internal/rpc/services/service_admin.go +++ b/internal/rpc/services/service_admin.go @@ -6,7 +6,6 @@ import ( teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/authority" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" @@ -637,7 +636,7 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{ NewVersion: teaconst.DNSNodeVersion, } - countNodes, err := nameservers.SharedNSNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion) + countNodes, err := models.SharedNSNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_base.go b/internal/rpc/services/service_base.go index c04b8991..e8e5caa2 100644 --- a/internal/rpc/services/service_base.go +++ b/internal/rpc/services/service_base.go @@ -7,7 +7,6 @@ import ( teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/authority" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/encrypt" "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" @@ -194,7 +193,7 @@ func (this *BaseService) ValidateNodeId(ctx context.Context, roles ...rpcutils.U case rpcutils.UserTypeMonitor: nodeIntId, err = models.SharedMonitorNodeDAO.FindEnabledMonitorNodeIdWithUniqueId(nil, nodeId) case rpcutils.UserTypeDNS: - nodeIntId, err = nameservers.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId) + nodeIntId, err = models.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId) case rpcutils.UserTypeAuthority: nodeIntId, err = authority.SharedAuthorityNodeDAO.FindEnabledAuthorityNodeIdWithUniqueId(nil, nodeId) default: diff --git a/internal/rpc/services/service_message.go b/internal/rpc/services/service_message.go index 2697c4ed..ff7097f0 100644 --- a/internal/rpc/services/service_message.go +++ b/internal/rpc/services/service_message.go @@ -3,7 +3,6 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" ) @@ -63,7 +62,7 @@ func (this *MessageService) ListUnreadMessages(ctx context.Context, req *pb.List } } case nodeconfigs.NodeRoleDNS: - cluster, err := nameservers.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(message.ClusterId)) + cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(tx, int64(message.ClusterId)) if err != nil { return nil, err } @@ -90,7 +89,7 @@ func (this *MessageService) ListUnreadMessages(ctx context.Context, req *pb.List } } case nodeconfigs.NodeRoleDNS: - node, err := nameservers.SharedNSNodeDAO.FindEnabledNSNode(tx, int64(message.NodeId)) + node, err := models.SharedNSNodeDAO.FindEnabledNSNode(tx, int64(message.NodeId)) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 842cd829..9fab29d4 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -375,7 +375,7 @@ func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeReque } // 删除节点相关任务 - err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, req.NodeId) + err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, nodeconfigs.NodeRoleNode, req.NodeId) if err != nil { return nil, err } @@ -1359,7 +1359,6 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN delete(routeCodeMap, req.DnsDomainId) } - err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routeCodeMap) if err != nil { return nil, err diff --git a/internal/rpc/services/service_node_cluster.go b/internal/rpc/services/service_node_cluster.go index b2d96358..18b085d0 100644 --- a/internal/rpc/services/service_node_cluster.go +++ b/internal/rpc/services/service_node_cluster.go @@ -97,7 +97,7 @@ func (this *NodeClusterService) DeleteNodeCluster(ctx context.Context, req *pb.D } // 删除相关任务 - err = models.SharedNodeTaskDAO.DeleteAllClusterTasks(tx, req.NodeClusterId) + err = models.SharedNodeTaskDAO.DeleteAllClusterTasks(tx, nodeconfigs.NodeRoleNode, req.NodeClusterId) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_task.go b/internal/rpc/services/service_node_task.go index f80a0503..8b3ece55 100644 --- a/internal/rpc/services/service_node_task.go +++ b/internal/rpc/services/service_node_task.go @@ -4,6 +4,8 @@ import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/installers" + rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/dbs" stringutil "github.com/iwind/TeaGo/utils/string" @@ -17,7 +19,7 @@ type NodeTaskService struct { // FindNodeTasks 获取单节点同步任务 func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNodeTasksRequest) (*pb.FindNodeTasksResponse, error) { - nodeId, err := this.ValidateNode(ctx) + nodeType, nodeId, err := this.ValidateNodeId(ctx, rpcutils.UserTypeNode, rpcutils.UserTypeDNS) if err != nil { return nil, err } @@ -25,7 +27,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode _ = req var tx = this.NullTx() - tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeId) + tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeType, nodeId) if err != nil { return nil, err } @@ -38,18 +40,20 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode }) } - // 版本更新任务 - status, err := models.SharedNodeDAO.FindNodeStatus(tx, nodeId) - if err != nil { - return nil, err - } - if status != nil && len(status.OS) > 0 && len(status.Arch) > 0 && len(status.BuildVersion) > 0 { - deployFile := installers.SharedDeployManager.FindNodeFile(status.OS, status.Arch) - if deployFile != nil { - if stringutil.VersionCompare(deployFile.Version, status.BuildVersion) > 0 { - pbTasks = append(pbTasks, &pb.NodeTask{ - Type: models.NodeTaskTypeNodeVersionChanged, - }) + // 边缘节点版本更新任务 + if nodeType == rpcutils.UserTypeNode { + status, err := models.SharedNodeDAO.FindNodeStatus(tx, nodeId) + if err != nil { + return nil, err + } + if status != nil && len(status.OS) > 0 && len(status.Arch) > 0 && len(status.BuildVersion) > 0 { + deployFile := installers.SharedDeployManager.FindNodeFile(status.OS, status.Arch) + if deployFile != nil { + if stringutil.VersionCompare(deployFile.Version, status.BuildVersion) > 0 { + pbTasks = append(pbTasks, &pb.NodeTask{ + Type: models.NodeTaskTypeNodeVersionChanged, + }) + } } } } @@ -59,7 +63,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode // ReportNodeTaskDone 报告同步任务结果 func (this *NodeTaskService) ReportNodeTaskDone(ctx context.Context, req *pb.ReportNodeTaskDoneRequest) (*pb.RPCSuccess, error) { - _, err := this.ValidateNode(ctx) + _, _, err := this.ValidateNodeId(ctx, rpcutils.UserTypeNode, rpcutils.UserTypeDNS) if err != nil { return nil, err } @@ -83,7 +87,8 @@ func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.F _ = req var tx = this.NullTx() - clusterIds, err := models.SharedNodeTaskDAO.FindAllDoingTaskClusterIds(tx) + // TODO 支持NS节点 + clusterIds, err := models.SharedNodeTaskDAO.FindAllDoingTaskClusterIds(tx, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } @@ -104,7 +109,8 @@ func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.F // 错误的节点任务 pbNodeTasks := []*pb.NodeTask{} // TODO 考虑节点特别多的情形,比如只显示前100个 - tasks, err := models.SharedNodeTaskDAO.FindAllDoingNodeTasksWithClusterId(tx, clusterId) + // TODO 支持NS节点 + tasks, err := models.SharedNodeTaskDAO.FindAllDoingNodeTasksWithClusterId(tx, nodeconfigs.NodeRoleNode, clusterId) if err != nil { return nil, err } @@ -155,13 +161,13 @@ func (this *NodeTaskService) ExistsNodeTasks(ctx context.Context, req *pb.Exists var tx = this.NullTx() // 是否有任务 - existTask, err := models.SharedNodeTaskDAO.ExistsDoingNodeTasks(tx) + existTask, err := models.SharedNodeTaskDAO.ExistsDoingNodeTasks(tx, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } // 是否有错误 - existError, err := models.SharedNodeTaskDAO.ExistsErrorNodeTasks(tx) + existError, err := models.SharedNodeTaskDAO.ExistsErrorNodeTasks(tx, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } @@ -216,7 +222,7 @@ func (this *NodeTaskService) CountDoingNodeTasks(ctx context.Context, req *pb.Co _ = req var tx = this.NullTx() - count, err := models.SharedNodeTaskDAO.CountDoingNodeTasks(tx) + count, err := models.SharedNodeTaskDAO.CountDoingNodeTasks(tx, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } @@ -238,7 +244,7 @@ func (this *NodeTaskService) FindNotifyingNodeTasks(ctx context.Context, req *pb } var tx = this.NullTx() - tasks, err := models.SharedNodeTaskDAO.FindNotifyingNodeTasks(tx, req.Size) + tasks, err := models.SharedNodeTaskDAO.FindNotifyingNodeTasks(tx, nodeconfigs.NodeRoleNode, req.Size) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_value.go b/internal/rpc/services/service_node_value.go index 507a8db8..8c5b396a 100644 --- a/internal/rpc/services/service_node_value.go +++ b/internal/rpc/services/service_node_value.go @@ -5,7 +5,6 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" ) @@ -28,7 +27,7 @@ func (this *NodeValueService) CreateNodeValue(ctx context.Context, req *pb.Creat case rpcutils.UserTypeNode: clusterId, err = models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) case rpcutils.UserTypeDNS: - clusterId, err = nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) + clusterId, err = models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) case rpcutils.UserTypeUser: } if err != nil { diff --git a/internal/rpc/services/service_server_daily_stat.go b/internal/rpc/services/service_server_daily_stat.go index 3224275d..a1880e7d 100644 --- a/internal/rpc/services/service_server_daily_stat.go +++ b/internal/rpc/services/service_server_daily_stat.go @@ -3,7 +3,6 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" @@ -34,7 +33,7 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context, var clusterId int64 switch role { case rpcutils.UserTypeDNS: - clusterId, err = nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) + clusterId, err = models.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) if err != nil { return nil, err } diff --git a/internal/rpc/utils/utils.go b/internal/rpc/utils/utils.go index 8290c14a..f1632a3a 100644 --- a/internal/rpc/utils/utils.go +++ b/internal/rpc/utils/utils.go @@ -8,7 +8,6 @@ import ( teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/authority" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/encrypt" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/iwind/TeaGo/lists" @@ -187,7 +186,7 @@ func ValidateRequest(ctx context.Context, userTypes ...UserType) (userType UserT nodeUserId = nodeIntId resultNodeId = nodeIntId case UserTypeDNS: - nodeIntId, err := nameservers.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId) + nodeIntId, err := models.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId) if err != nil { return UserTypeNode, nodeIntId, 0, errors.New("context: " + err.Error()) } diff --git a/internal/tasks/node_task_extractor.go b/internal/tasks/node_task_extractor.go index c0870265..45bff8b3 100644 --- a/internal/tasks/node_task_extractor.go +++ b/internal/tasks/node_task_extractor.go @@ -2,6 +2,7 @@ package tasks import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/logs" "time" @@ -42,9 +43,11 @@ func (this *NodeTaskExtractor) Loop() error { // 这里不解锁,是为了让任务N秒钟之内只运行一次 - err = models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil) - if err != nil { - return err + for _, role := range []string{nodeconfigs.NodeRoleNode, nodeconfigs.NodeRoleDNS} { + err = models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil, role) + if err != nil { + return err + } } return nil diff --git a/internal/tasks/ns_node_monitor_task.go b/internal/tasks/ns_node_monitor_task.go index 2de03984..537317bf 100644 --- a/internal/tasks/ns_node_monitor_task.go +++ b/internal/tasks/ns_node_monitor_task.go @@ -2,7 +2,6 @@ package tasks import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" @@ -59,7 +58,7 @@ func (this *NSNodeMonitorTask) loop() error { return err } - clusters, err := nameservers.SharedNSClusterDAO.FindAllEnabledClusters(nil) + clusters, err := models.SharedNSClusterDAO.FindAllEnabledClusters(nil) if err != nil { return err } @@ -73,11 +72,11 @@ func (this *NSNodeMonitorTask) loop() error { return nil } -func (this *NSNodeMonitorTask) monitorCluster(cluster *nameservers.NSCluster) error { +func (this *NSNodeMonitorTask) monitorCluster(cluster *models.NSCluster) error { clusterId := int64(cluster.Id) // 检查离线节点 - inactiveNodes, err := nameservers.SharedNSNodeDAO.FindAllNotifyingInactiveNodesWithClusterId(nil, clusterId) + inactiveNodes, err := models.SharedNSNodeDAO.FindAllNotifyingInactiveNodesWithClusterId(nil, clusterId) if err != nil { return err } @@ -90,7 +89,7 @@ func (this *NSNodeMonitorTask) monitorCluster(cluster *nameservers.NSCluster) er } // 修改在线状态 - err = nameservers.SharedNSNodeDAO.UpdateNodeStatusIsNotified(nil, int64(node.Id)) + err = models.SharedNSNodeDAO.UpdateNodeStatusIsNotified(nil, int64(node.Id)) if err != nil { return err }