diff --git a/internal/db/models/node_cluster_dao.go b/internal/db/models/node_cluster_dao.go index a40c6827..83954676 100644 --- a/internal/db/models/node_cluster_dao.go +++ b/internal/db/models/node_cluster_dao.go @@ -380,7 +380,7 @@ func (this *NodeClusterDAO) FindAllEnabledClustersWithDNSDomainId(tx *dbs.Tx, dn _, err = this.Query(tx). State(NodeClusterStateEnabled). Attr("dnsDomainId", dnsDomainId). - Result("id", "name", "dnsName", "dnsDomainId"). + Result("id", "name", "dnsName", "dnsDomainId", "isOn"). Slice(&result). FindAll() return @@ -391,7 +391,7 @@ func (this *NodeClusterDAO) FindAllEnabledClustersHaveDNSDomain(tx *dbs.Tx) (res _, err = this.Query(tx). State(NodeClusterStateEnabled). Gt("dnsDomainId", 0). - Result("id", "name", "dnsName", "dnsDomainId"). + Result("id", "name", "dnsName", "dnsDomainId", "isOn"). Slice(&result). FindAll() return @@ -409,7 +409,7 @@ func (this *NodeClusterDAO) FindClusterGrantId(tx *dbs.Tx, clusterId int64) (int func (this *NodeClusterDAO) FindClusterDNSInfo(tx *dbs.Tx, clusterId int64) (*NodeCluster, error) { one, err := this.Query(tx). Pk(clusterId). - Result("id", "name", "dnsName", "dnsDomainId", "dns"). + Result("id", "name", "dnsName", "dnsDomainId", "dns", "isOn"). Find() if err != nil { return nil, err @@ -499,7 +499,7 @@ func (this *NodeClusterDAO) CheckClusterDNS(tx *dbs.Tx, cluster *NodeCluster) (i // TODO 检查域名是否已解析 // 检查节点 - nodes, err := SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId) + nodes, err := SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId, true) if err != nil { return nil, err } @@ -837,6 +837,36 @@ func (this *NodeClusterDAO) FindLatestNodeClusters(tx *dbs.Tx, size int64) (resu return } +// CheckNodeClusterIsOn 获取集群是否正在启用状态 +func (this *NodeClusterDAO) CheckNodeClusterIsOn(tx *dbs.Tx, clusterId int64) (bool, error) { + return this.Query(tx). + Pk(clusterId). + State(NodeClusterStateEnabled). + Attr("isOn", true). + Exist() +} + +// FindEnabledNodeClustersWithIds 查找一组集群 +func (this *NodeClusterDAO) FindEnabledNodeClustersWithIds(tx *dbs.Tx, clusterIds []int64) (result []*NodeCluster, err error) { + if len(clusterIds) == 0 { + return + } + for _, clusterId := range clusterIds { + cluster, err := this.Query(tx). + Pk(clusterId). + State(NodeClusterStateEnabled). + Find() + if err != nil { + return nil, err + } + if cluster == nil { + continue + } + result = append(result, cluster.(*NodeCluster)) + } + return +} + // NotifyUpdate 通知更新 func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) diff --git a/internal/db/models/node_cluster_model.go b/internal/db/models/node_cluster_model.go index 5b29ac86..2dea7e2b 100644 --- a/internal/db/models/node_cluster_model.go +++ b/internal/db/models/node_cluster_model.go @@ -1,10 +1,11 @@ package models -// 节点集群 +// NodeCluster 节点集群 type NodeCluster struct { Id uint32 `field:"id"` // ID AdminId uint32 `field:"adminId"` // 管理员ID UserId uint32 `field:"userId"` // 用户ID + IsOn uint8 `field:"isOn"` // 是否启用 Name string `field:"name"` // 名称 UseAllAPINodes uint8 `field:"useAllAPINodes"` // 是否使用所有API节点 ApiNodes string `field:"apiNodes"` // 使用的API节点 @@ -31,6 +32,7 @@ type NodeClusterOperator struct { Id interface{} // ID AdminId interface{} // 管理员ID UserId interface{} // 用户ID + IsOn interface{} // 是否启用 Name interface{} // 名称 UseAllAPINodes interface{} // 是否使用所有API节点 ApiNodes interface{} // 使用的API节点 diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 944caecf..6c22c9a8 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -163,7 +163,7 @@ func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterI } // UpdateNode 修改节点 -func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, groupId int64, regionId int64, maxCPU int32, isOn bool, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte) error { +func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, secondaryClusterIds []int64, groupId int64, regionId int64, maxCPU int32, isOn bool, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte) error { if nodeId <= 0 { return errors.New("invalid nodeId") } @@ -171,6 +171,24 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId op.Id = nodeId op.Name = name op.ClusterId = clusterId + + // 去重 + var filteredSecondaryClusterIds = []int64{} + for _, secondaryClusterId := range secondaryClusterIds { + if secondaryClusterId <= 0 { + continue + } + if lists.ContainsInt64(filteredSecondaryClusterIds, secondaryClusterId) { + continue + } + filteredSecondaryClusterIds = append(filteredSecondaryClusterIds, secondaryClusterId) + } + filteredSecondaryClusterIdsJSON, err := json.Marshal(filteredSecondaryClusterIds) + if err != nil { + return err + } + op.SecondaryClusterIds = filteredSecondaryClusterIdsJSON + op.GroupId = groupId op.RegionId = regionId op.LatestVersion = dbs.SQL("latestVersion+1") @@ -182,7 +200,7 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId if len(maxCacheMemoryCapacityJSON) > 0 { op.MaxCacheMemoryCapacity = maxCacheMemoryCapacityJSON } - err := this.Save(tx, op) + err = this.Save(tx, op) if err != nil { return err } @@ -212,6 +230,7 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx, keyword string, groupId int64, regionId int64, + includeSecondaryNodes bool, order string, offset int64, size int64) (result []*Node, err error) { @@ -223,7 +242,13 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx, // 集群 if clusterId > 0 { - query.Attr("clusterId", clusterId) + if includeSecondaryNodes { + query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))"). + Param("primaryClusterId", clusterId). + Param("primaryClusterIdString", types.String(clusterId)) + } else { + query.Attr("clusterId", clusterId) + } } else { query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)") } @@ -327,12 +352,51 @@ func (this *NodeDAO) FindNodeClusterId(tx *dbs.Tx, nodeId int64) (int64, error) return types.Int64(col), err } +// FindEnabledAndOnNodeClusterIds 获取节点所属所有可用而且启用的集群ID +func (this *NodeDAO) FindEnabledAndOnNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) { + one, err := this.Query(tx). + Pk(nodeId). + Result("clusterId", "secondaryClusterIds"). + Find() + if one == nil { + return nil, err + } + var clusterId = int64(one.(*Node).ClusterId) + if clusterId > 0 { + result = append(result, clusterId) + } + + for _, clusterId := range one.(*Node).DecodeSecondaryClusterIds() { + if lists.ContainsInt64(result, clusterId) { + continue + } + + // 检查是否启用 + isOn, err := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, clusterId) + if err != nil { + return nil, err + } + if !isOn { + continue + } + + result = append(result, clusterId) + } + return +} + // FindAllNodeIdsMatch 匹配节点并返回节点ID -func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, isOn configutils.BoolState) (result []int64, err error) { +func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) { query := this.Query(tx) query.State(NodeStateEnabled) if clusterId > 0 { - query.Attr("clusterId", clusterId) + if includeSecondaryNodes { + query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))"). + Param("primaryClusterId", clusterId). + Param("primaryClusterIdString", types.String(clusterId)) + } else { + query.Attr("clusterId", clusterId) + } } else { query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)") } @@ -385,13 +449,20 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx, activeState configutils.BoolState, keyword string, groupId int64, - regionId int64) (int64, error) { + regionId int64, + includeSecondaryNodes bool) (int64, error) { query := this.Query(tx) query.State(NodeStateEnabled) // 集群 if clusterId > 0 { - query.Attr("clusterId", clusterId) + if includeSecondaryNodes { + query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))"). + Param("primaryClusterId", clusterId). + Param("primaryClusterIdString", types.String(clusterId)) + } else { + query.Attr("clusterId", clusterId) + } } else { query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)") } @@ -878,10 +949,20 @@ func (this *NodeDAO) CountAllEnabledNodesWithRegionId(tx *dbs.Tx, regionId int64 } // FindAllEnabledNodesDNSWithClusterId 获取一个集群的节点DNS信息 -func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) { - _, err = this.Query(tx). +func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool) (result []*Node, err error) { + if clusterId <= 0 { + return nil, nil + } + var query = this.Query(tx) + if includeSecondaryNodes { + query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))"). + Param("primaryClusterId", clusterId). + Param("primaryClusterIdString", types.String(clusterId)) + } else { + query.Attr("clusterId", clusterId) + } + _, err = query. State(NodeStateEnabled). - Attr("clusterId", clusterId). Attr("isOn", true). Attr("isUp", true). Result("id", "name", "dnsRoutes", "isOn"). @@ -911,7 +992,7 @@ func (this *NodeDAO) FindEnabledNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error) Pk(nodeId). Result("id", "name", "dnsRoutes", "clusterId", "isOn"). Find() - if err != nil || one == nil { + if one == nil { return nil, err } return one.(*Node), nil @@ -1108,6 +1189,89 @@ func (this *NodeDAO) FindEnabledNodesWithIds(tx *dbs.Tx, nodeIds []int64) (resul return } +// DeleteNodeFromCluster 从集群中删除节点 +func (this *NodeDAO) DeleteNodeFromCluster(tx *dbs.Tx, nodeId int64, clusterId int64) error { + one, err := this.Query(tx). + Pk(nodeId). + Result("clusterId", "secondaryClusterIds"). + Find() + if err != nil { + return err + } + if one == nil { + return nil + } + + var node = one.(*Node) + + var secondaryClusterIds = []int64{} + for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() { + if secondaryClusterId == clusterId { + continue + } + secondaryClusterIds = append(secondaryClusterIds, secondaryClusterId) + } + + var newClusterId = int64(node.ClusterId) + + if newClusterId == clusterId { + newClusterId = 0 + + // 选择一个从集群作为主集群 + if len(secondaryClusterIds) > 0 { + newClusterId = secondaryClusterIds[0] + secondaryClusterIds = secondaryClusterIds[1:] + } + } + + secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds) + if err != nil { + return err + } + op := NewNodeOperator() + op.Id = nodeId + op.ClusterId = newClusterId + op.SecondaryClusterIds = secondaryClusterIdsJSON + return this.Save(tx, op) +} + +// TransferPrimaryClusterNodes 自动转移集群下的节点 +func (this *NodeDAO) TransferPrimaryClusterNodes(tx *dbs.Tx, primaryClusterId int64) error { + if primaryClusterId <= 0 { + return nil + } + ones, err := this.Query(tx). + Attr("clusterId", primaryClusterId). + Result("id", "secondaryClusterIds"). + State(NodeStateEnabled). + FindAll() + if err != nil { + return err + } + for _, one := range ones { + var node = one.(*Node) + clusterIds := node.DecodeSecondaryClusterIds() + if len(clusterIds) == 0 { + continue + } + var clusterId = clusterIds[0] + var secondaryClusterIds = clusterIds[1:] + secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds) + if err != nil { + return err + } + err = this.Query(tx). + Pk(node.Id). + Set("clusterId", clusterId). + Set("secondaryClusterIds", secondaryClusterIdsJSON). + UpdateQuickly() + if err != nil { + return err + } + } + return nil +} + // NotifyUpdate 通知更新 func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error { clusterId, err := this.FindNodeClusterId(tx, nodeId) @@ -1122,25 +1286,25 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error { // NotifyDNSUpdate 通知DNS更新 func (this *NodeDAO) NotifyDNSUpdate(tx *dbs.Tx, nodeId int64) error { - clusterId, err := this.Query(tx). - Pk(nodeId). - Result("clusterId"). - FindInt64Col(0) // 这里不需要加服务状态条件,因为我们即使删除也要删除对应的服务的DNS解析 + clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId) if err != nil { return err } - if clusterId <= 0 { - return nil + for _, clusterId := range clusterIds { + dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId) + if err != nil { + return err + } + if dnsInfo == nil { + continue + } + if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 { + continue + } + err = dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange) + if err != nil { + return err + } } - dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId) - if err != nil { - return err - } - if dnsInfo == nil { - return nil - } - if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 { - return nil - } - return dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange) + return nil } diff --git a/internal/db/models/node_dao_test.go b/internal/db/models/node_dao_test.go index 38da5fe9..3994c7c2 100644 --- a/internal/db/models/node_dao_test.go +++ b/internal/db/models/node_dao_test.go @@ -24,3 +24,13 @@ func TestNodeDAO_UpdateNodeUp(t *testing.T) { } t.Log("ok") } + +func TestNodeDAO_FindEnabledNodeClusterIds(t *testing.T) { + dbs.NotifyReady() + var tx *dbs.Tx + clusterIds, err := NewNodeDAO().FindEnabledAndOnNodeClusterIds(tx, 48) + if err != nil { + t.Fatal(err) + } + t.Log(clusterIds) +} diff --git a/internal/db/models/node_model.go b/internal/db/models/node_model.go index daf9146e..d46aa755 100644 --- a/internal/db/models/node_model.go +++ b/internal/db/models/node_model.go @@ -14,7 +14,8 @@ type Node struct { Secret string `field:"secret"` // 密钥 Name string `field:"name"` // 节点名 Code string `field:"code"` // 代号 - ClusterId uint32 `field:"clusterId"` // 集群ID + ClusterId uint32 `field:"clusterId"` // 主集群ID + SecondaryClusterIds string `field:"secondaryClusterIds"` // 从集群ID RegionId uint32 `field:"regionId"` // 区域ID GroupId uint32 `field:"groupId"` // 分组ID CreatedAt uint64 `field:"createdAt"` // 创建时间 @@ -45,7 +46,8 @@ type NodeOperator struct { Secret interface{} // 密钥 Name interface{} // 节点名 Code interface{} // 代号 - ClusterId interface{} // 集群ID + ClusterId interface{} // 主集群ID + SecondaryClusterIds interface{} // 从集群ID RegionId interface{} // 区域ID GroupId interface{} // 分组ID CreatedAt interface{} // 创建时间 diff --git a/internal/db/models/node_model_ext.go b/internal/db/models/node_model_ext.go index eca59dc5..e04512ba 100644 --- a/internal/db/models/node_model_ext.go +++ b/internal/db/models/node_model_ext.go @@ -6,7 +6,7 @@ import ( "time" ) -// 安装状态 +// DecodeInstallStatus 安装状态 func (this *Node) DecodeInstallStatus() (*NodeInstallStatus, error) { if len(this.InstallStatus) == 0 || this.InstallStatus == "null" { return NewNodeInstallStatus(), nil @@ -27,7 +27,7 @@ func (this *Node) DecodeInstallStatus() (*NodeInstallStatus, error) { return status, nil } -// 节点状态 +// DecodeStatus 节点状态 func (this *Node) DecodeStatus() (*nodeconfigs.NodeStatus, error) { if len(this.Status) == 0 || this.Status == "null" { return nil, nil @@ -40,20 +40,21 @@ func (this *Node) DecodeStatus() (*nodeconfigs.NodeStatus, error) { return status, nil } -// 所有的DNS线路 -func (this *Node) DNSRouteCodes() (map[int64][]string, error) { +// DNSRouteCodes 所有的DNS线路 +func (this *Node) DNSRouteCodes() map[int64][]string { routes := map[int64][]string{} // domainId => routes if len(this.DnsRoutes) == 0 || this.DnsRoutes == "null" { - return routes, nil + return routes } err := json.Unmarshal([]byte(this.DnsRoutes), &routes) if err != nil { - return map[int64][]string{}, err + // 忽略错误 + return routes } - return routes, nil + return routes } -// DNS线路 +// DNSRouteCodesForDomainId DNS线路 func (this *Node) DNSRouteCodesForDomainId(dnsDomainId int64) ([]string, error) { routes := map[int64][]string{} // domainId => routes if len(this.DnsRoutes) == 0 || this.DnsRoutes == "null" { @@ -67,7 +68,7 @@ func (this *Node) DNSRouteCodesForDomainId(dnsDomainId int64) ([]string, error) return domainRoutes, nil } -// 连接的API +// DecodeConnectedAPINodeIds 连接的API func (this *Node) DecodeConnectedAPINodeIds() ([]int64, error) { apiNodeIds := []int64{} if IsNotNull(this.ConnectedAPINodes) { @@ -78,3 +79,14 @@ func (this *Node) DecodeConnectedAPINodeIds() ([]int64, error) { } return apiNodeIds, nil } + +// DecodeSecondaryClusterIds 从集群IDs +func (this *Node) DecodeSecondaryClusterIds() []int64 { + if len(this.SecondaryClusterIds) == 0 { + return []int64{} + } + var result = []int64{} + // 不需要处理错误 + _ = json.Unmarshal([]byte(this.SecondaryClusterIds), &result) + return result +} diff --git a/internal/db/models/node_task_dao.go b/internal/db/models/node_task_dao.go index 979c220f..2d511910 100644 --- a/internal/db/models/node_task_dao.go +++ b/internal/db/models/node_task_dao.go @@ -7,6 +7,7 @@ import ( "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/types" "time" ) @@ -39,7 +40,7 @@ func init() { }) } -// 创建单个节点任务 +// CreateNodeTask 创建单个节点任务 func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int64, taskType NodeTaskType) error { if clusterId <= 0 || nodeId <= 0 { return nil @@ -66,7 +67,7 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int6 return err } -// 创建集群任务 +// CreateClusterTask 创建集群任务 func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { if clusterId <= 0 { return nil @@ -95,15 +96,16 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType return err } -// 分解集群任务 +// ExtractClusterTask 分解集群任务 func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { - nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, configutils.BoolStateYes) + nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes) if err != nil { return err } _, err = this.Query(tx). Attr("clusterId", clusterId). + Param("clusterIdString", types.String(clusterId)). Where("nodeId> 0"). Attr("type", taskType). Delete() @@ -130,7 +132,7 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp return nil } -// 分解所有集群任务 +// ExtractAllClusterTasks 分解所有集群任务 func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error { ones, err := this.Query(tx). Attr("nodeId", 0). @@ -148,7 +150,7 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error { return nil } -// 删除集群所有相关任务 +// DeleteAllClusterTasks 删除集群所有相关任务 func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, clusterId int64) error { _, err := this.Query(tx). Attr("clusterId", clusterId). @@ -156,7 +158,7 @@ func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, clusterId int64) erro return err } -// 删除节点相关任务 +// DeleteNodeTasks 删除节点相关任务 func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, nodeId int64) error { _, err := this.Query(tx). Attr("nodeId", nodeId). @@ -164,7 +166,7 @@ func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, nodeId int64) error { return err } -// 查询一个节点的所有任务 +// FindDoingNodeTasks 查询一个节点的所有任务 func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, nodeId int64) (result []*NodeTask, err error) { if nodeId <= 0 { return @@ -177,7 +179,7 @@ func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, nodeId int64) (result [] return } -// 修改节点任务的完成状态 +// UpdateNodeTaskDone 修改节点任务的完成状态 func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool, errorMessage string) error { _, err := this.Query(tx). Pk(taskId). @@ -188,7 +190,7 @@ func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool, return err } -// 查找正在更新的集群IDs +// FindAllDoingTaskClusterIds 查找正在更新的集群IDs func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error) { ones, _, err := this.Query(tx). Result("DISTINCT(clusterId) AS clusterId"). @@ -204,7 +206,7 @@ func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error) return result, nil } -// 查询某个集群下所有的任务 +// FindAllDoingNodeTasksWithClusterId 查询某个集群下所有的任务 func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterId int64) (result []*NodeTask, err error) { _, err = this.Query(tx). Attr("clusterId", clusterId). @@ -218,7 +220,7 @@ func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterI return } -// 检查是否有正在执行的任务 +// ExistsDoingNodeTasks 检查是否有正在执行的任务 func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx) (bool, error) { return this.Query(tx). Where("(isDone=0 OR (isDone=1 AND isOk=0))"). @@ -226,14 +228,14 @@ func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx) (bool, error) { Exist() } -// 是否有错误的任务 +// ExistsErrorNodeTasks 是否有错误的任务 func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx) (bool, error) { return this.Query(tx). Where("(isDone=1 AND isOk=0)"). Exist() } -// 删除任务 +// DeleteNodeTask 删除任务 func (this *NodeTaskDAO) DeleteNodeTask(tx *dbs.Tx, taskId int64) error { _, err := this.Query(tx). Pk(taskId). @@ -241,7 +243,7 @@ func (this *NodeTaskDAO) DeleteNodeTask(tx *dbs.Tx, taskId int64) error { return err } -// 计算正在执行的任务 +// CountDoingNodeTasks 计算正在执行的任务 func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx) (int64, error) { return this.Query(tx). Attr("isDone", 0). @@ -249,7 +251,7 @@ func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx) (int64, error) { Count() } -// 查找需要通知的任务 +// FindNotifyingNodeTasks 查找需要通知的任务 func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, size int64) (result []*NodeTask, err error) { _, err = this.Query(tx). Gt("nodeId", 0). @@ -261,7 +263,7 @@ func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, size int64) (result return } -// 设置任务已通知 +// UpdateTasksNotified 设置任务已通知 func (this *NodeTaskDAO) UpdateTasksNotified(tx *dbs.Tx, taskIds []int64) error { if len(taskIds) == 0 { return nil diff --git a/internal/db/models/server_dao.go b/internal/db/models/server_dao.go index 6259602a..b433a392 100644 --- a/internal/db/models/server_dao.go +++ b/internal/db/models/server_dao.go @@ -710,21 +710,24 @@ func (this *ServerDAO) ListEnabledServersMatch(tx *dbs.Tx, offset int64, size in // FindAllEnabledServersWithNode 获取节点中的所有服务 func (this *ServerDAO) FindAllEnabledServersWithNode(tx *dbs.Tx, nodeId int64) (result []*Server, err error) { - // 节点所在集群 - clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId) + // 节点所在主集群 + clusterIds, err := SharedNodeDAO.FindEnabledAndOnNodeClusterIds(tx, nodeId) if err != nil { return nil, err } - if clusterId <= 0 { - return nil, nil + for _, clusterId := range clusterIds { + ones, err := this.Query(tx). + Attr("clusterId", clusterId). + State(ServerStateEnabled). + AscPk(). + FindAll() + if err != nil { + return nil, err + } + for _, one := range ones { + result = append(result, one.(*Server)) + } } - - _, err = this.Query(tx). - Attr("clusterId", clusterId). - State(ServerStateEnabled). - AscPk(). - Slice(&result). - FindAll() return } diff --git a/internal/db/models/server_dao_test.go b/internal/db/models/server_dao_test.go index 3e2546d0..c12727de 100644 --- a/internal/db/models/server_dao_test.go +++ b/internal/db/models/server_dao_test.go @@ -134,5 +134,16 @@ func TestServerDAO_ExistServerNameInCluster(t *testing.T) { } t.Log(exist) } - +} + +func TestServerDAO_FindAllEnabledServersWithNode(t *testing.T) { + dbs.NotifyReady() + + servers, err := SharedServerDAO.FindAllEnabledServersWithNode(nil, 48) + if err != nil { + t.Fatal(err) + } + for _, server := range servers { + t.Log("serverId:", server.Id, "clusterId:", server.ClusterId) + } } diff --git a/internal/dnsclients/types.go b/internal/dnsclients/types.go index 3a11fe9f..2cf8f1dc 100644 --- a/internal/dnsclients/types.go +++ b/internal/dnsclients/types.go @@ -46,9 +46,9 @@ func FindAllProviderTypes() []maps.Map { if teaconst.IsPlus { typeMaps = append(typeMaps, []maps.Map{ { - "name": "集成EdgeDNS", + "name": "自建EdgeDNS", "code": ProviderTypeLocalEdgeDNS, - "description": "当前企业版提供的DNS服务。", + "description": "当前企业版提供的自建DNS服务。", }, // TODO 需要实现用户使用AccessId/AccessKey来连接DNS服务 /**{ diff --git a/internal/rpc/services/service_dns_domain.go b/internal/rpc/services/service_dns_domain.go index 1cc52a42..068de6bc 100644 --- a/internal/rpc/services/service_dns_domain.go +++ b/internal/rpc/services/service_dns_domain.go @@ -396,7 +396,7 @@ func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster, tx := this.NullTx() // 节点域名 - nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId) + nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId, true) if err != nil { return nil, nil, nil, 0, 0, false, false, err } diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 3804d4be..842cd829 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -19,6 +19,7 @@ import ( stringutil "github.com/iwind/TeaGo/utils/string" "net" "path/filepath" + "strings" ) // NodeService 边缘节点相关服务 @@ -49,10 +50,23 @@ func (this *NodeService) CreateNode(ctx context.Context, req *pb.CreateNodeReque } // 保存DNS相关 - if req.DnsDomainId > 0 && len(req.DnsRoutes) > 0 { - err = models.SharedNodeDAO.UpdateNodeDNS(tx, nodeId, map[int64][]string{ - req.DnsDomainId: req.DnsRoutes, - }) + if len(req.DnsRoutes) > 0 { + var routesMap = map[int64][]string{} + var m = map[int64][]string{} // domainId => codes + for _, route := range req.DnsRoutes { + var pieces = strings.SplitN(route, "@", 2) + if len(pieces) != 2 { + continue + } + var code = pieces[0] + var domainId = types.Int64(pieces[1]) + m[domainId] = append(m[domainId], code) + } + for domainId, codes := range m { + routesMap[domainId] = codes + } + + err = models.SharedNodeDAO.UpdateNodeDNS(tx, nodeId, routesMap) if err != nil { return nil, err } @@ -135,7 +149,7 @@ func (this *NodeService) CountAllEnabledNodesMatch(ctx context.Context, req *pb. tx := this.NullTx() - count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId) + count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, true) if err != nil { return nil, err } @@ -188,18 +202,32 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List order = "trafficOutDesc" } - nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, order, req.Offset, req.Size) + nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, true, order, req.Offset, req.Size) if err != nil { return nil, err } result := []*pb.Node{} for _, node := range nodes { - // 集群信息 + // 主集群信息 clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId)) if err != nil { return nil, err } + // 从集群 + secondaryClusters, err := models.SharedNodeClusterDAO.FindEnabledNodeClustersWithIds(tx, node.DecodeSecondaryClusterIds()) + if err != nil { + return nil, err + } + var pbSecondaryClusters = []*pb.NodeCluster{} + for _, secondaryCluster := range secondaryClusters { + pbSecondaryClusters = append(pbSecondaryClusters, &pb.NodeCluster{ + Id: int64(secondaryCluster.Id), + IsOn: secondaryCluster.IsOn == 1, + Name: secondaryCluster.Name, + }) + } + // 安装信息 installStatus, err := node.DecodeInstallStatus() if err != nil { @@ -276,13 +304,14 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List Id: int64(node.ClusterId), Name: clusterName, }, - InstallStatus: installStatusResult, - MaxCPU: types.Int32(node.MaxCPU), - IsOn: node.IsOn == 1, - IsUp: node.IsUp == 1, - NodeGroup: pbGroup, - NodeRegion: pbRegion, - DnsRoutes: pbRoutes, + SecondaryNodeClusters: pbSecondaryClusters, + InstallStatus: installStatusResult, + MaxCPU: types.Int32(node.MaxCPU), + IsOn: node.IsOn == 1, + IsUp: node.IsUp == 1, + NodeGroup: pbGroup, + NodeRegion: pbRegion, + DnsRoutes: pbRoutes, }) } @@ -354,6 +383,22 @@ func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeReque return this.Success() } +// DeleteNodeFromNodeCluster 从集群中删除节点 +func (this *NodeService) DeleteNodeFromNodeCluster(ctx context.Context, req *pb.DeleteNodeFromNodeClusterRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + tx := this.NullTx() + err = models.SharedNodeDAO.DeleteNodeFromCluster(tx, req.NodeId, req.NodeClusterId) + if err != nil { + return nil, err + } + + return this.Success() +} + // UpdateNode 修改节点 func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) @@ -385,7 +430,7 @@ func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeReque } } - err = models.SharedNodeDAO.UpdateNode(tx, req.NodeId, req.Name, req.NodeClusterId, req.NodeGroupId, req.NodeRegionId, req.MaxCPU, req.IsOn, maxCacheDiskCapacityJSON, maxCacheMemoryCapacityJSON) + err = models.SharedNodeDAO.UpdateNode(tx, req.NodeId, req.Name, req.NodeClusterId, req.SecondaryNodeClusterIds, req.NodeGroupId, req.NodeRegionId, req.MaxCPU, req.IsOn, maxCacheDiskCapacityJSON, maxCacheMemoryCapacityJSON) if err != nil { return nil, err } @@ -410,10 +455,26 @@ func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeReque } // 保存DNS相关 - if req.DnsDomainId > 0 && len(req.DnsRoutes) > 0 { - err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, map[int64][]string{ - req.DnsDomainId: req.DnsRoutes, - }) + nodeDNS, err := models.SharedNodeDAO.FindEnabledNodeDNS(tx, req.NodeId) + if err != nil { + return nil, err + } + if nodeDNS != nil { + var routesMap = nodeDNS.DNSRouteCodes() + var m = map[int64][]string{} // domainId => codes + for _, route := range req.DnsRoutes { + var pieces = strings.SplitN(route, "@", 2) + if len(pieces) != 2 { + continue + } + var code = pieces[0] + var domainId = types.Int64(pieces[1]) + m[domainId] = append(m[domainId], code) + } + for domainId, codes := range m { + routesMap[domainId] = codes + } + err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routesMap) if err != nil { return nil, err } @@ -439,12 +500,29 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable return &pb.FindEnabledNodeResponse{Node: nil}, nil } - // 集群信息 + // 主集群信息 clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId)) if err != nil { return nil, err } + // 从集群信息 + var secondaryPBClusters []*pb.NodeCluster + for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() { + cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, secondaryClusterId) + if err != nil { + return nil, err + } + if cluster == nil { + continue + } + secondaryPBClusters = append(secondaryPBClusters, &pb.NodeCluster{ + Id: int64(cluster.Id), + IsOn: cluster.IsOn == 1, + Name: cluster.Name, + }) + } + // 认证信息 login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, req.NodeId) if err != nil { @@ -542,6 +620,7 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable Id: int64(node.ClusterId), Name: clusterName, }, + SecondaryNodeClusters: secondaryPBClusters, Login: respLogin, InstallStatus: installStatusResult, MaxCPU: types.Int32(node.MaxCPU), @@ -1117,7 +1196,7 @@ func (this *NodeService) FindAllEnabledNodesDNSWithNodeClusterId(ctx context.Con return nil, err } - nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, req.NodeClusterId) + nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, req.NodeClusterId, true) if err != nil { return nil, err } @@ -1155,10 +1234,11 @@ func (this *NodeService) FindAllEnabledNodesDNSWithNodeClusterId(ctx context.Con continue } result = append(result, &pb.NodeDNSInfo{ - Id: int64(node.Id), - Name: node.Name, - IpAddr: ip, - Routes: pbRoutes, + Id: int64(node.Id), + Name: node.Name, + IpAddr: ip, + Routes: pbRoutes, + NodeClusterId: req.NodeClusterId, }) } } @@ -1189,7 +1269,11 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna return nil, err } - clusterId := int64(node.ClusterId) + var clusterId = int64(node.ClusterId) + if req.NodeClusterId > 0 { + clusterId = req.NodeClusterId + } + clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId) if err != nil { return nil, err @@ -1256,14 +1340,26 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN return nil, errors.New("node not found") } - routeCodeMap, err := node.DNSRouteCodes() - if err != nil { - return nil, err - } + routeCodeMap := node.DNSRouteCodes() if req.DnsDomainId > 0 && len(req.Routes) > 0 { - routeCodeMap[req.DnsDomainId] = req.Routes + var m = map[int64][]string{} // domainId => codes + for _, route := range req.Routes { + var pieces = strings.SplitN(route, "@", 2) + if len(pieces) != 2 { + continue + } + var code = pieces[0] + var domainId = types.Int64(pieces[1]) + m[domainId] = append(m[domainId], code) + } + for domainId, codes := range m { + routeCodeMap[domainId] = codes + } + } else { + delete(routeCodeMap, req.DnsDomainId) } + err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routeCodeMap) if err != nil { return nil, err diff --git a/internal/rpc/services/service_node_cluster.go b/internal/rpc/services/service_node_cluster.go index 9f4b84dc..b2d96358 100644 --- a/internal/rpc/services/service_node_cluster.go +++ b/internal/rpc/services/service_node_cluster.go @@ -78,8 +78,19 @@ func (this *NodeClusterService) DeleteNodeCluster(ctx context.Context, req *pb.D return nil, err } + if req.NodeClusterId <= 0 { + return this.Success() + } + tx := this.NullTx() + // 转移节点 + err = models.SharedNodeDAO.TransferPrimaryClusterNodes(tx, req.NodeClusterId) + if err != nil { + return nil, err + } + + // 删除集群 err = models.SharedNodeClusterDAO.DisableNodeCluster(tx, req.NodeClusterId) if err != nil { return nil, err @@ -128,6 +139,7 @@ func (this *NodeClusterService) FindEnabledNodeCluster(ctx context.Context, req HttpFirewallPolicyId: int64(cluster.HttpFirewallPolicyId), DnsName: cluster.DnsName, DnsDomainId: int64(cluster.DnsDomainId), + IsOn: cluster.IsOn == 1, }}, nil } @@ -208,6 +220,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClusters(ctx context.Context, CreatedAt: int64(cluster.CreatedAt), UniqueId: cluster.UniqueId, Secret: cluster.Secret, + IsOn: cluster.IsOn == 1, }) } @@ -259,6 +272,7 @@ func (this *NodeClusterService) ListEnabledNodeClusters(ctx context.Context, req Secret: cluster.Secret, DnsName: cluster.DnsName, DnsDomainId: int64(cluster.DnsDomainId), + IsOn: cluster.IsOn == 1, }) } @@ -372,6 +386,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithNodeGrantId(ctx co CreatedAt: int64(cluster.CreatedAt), UniqueId: cluster.UniqueId, Secret: cluster.Secret, + IsOn: cluster.IsOn == 1, }) } return &pb.FindAllEnabledNodeClustersWithNodeGrantIdResponse{NodeClusters: result}, nil @@ -511,6 +526,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithDNSDomainId(ctx co Name: cluster.Name, DnsName: cluster.DnsName, DnsDomainId: int64(cluster.DnsDomainId), + IsOn: cluster.IsOn == 1, }) } return &pb.FindAllEnabledNodeClustersWithDNSDomainIdResponse{NodeClusters: result}, nil @@ -666,6 +682,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithHTTPCachePolicyId( result = append(result, &pb.NodeCluster{ Id: int64(cluster.Id), Name: cluster.Name, + IsOn: cluster.IsOn == 1, }) } return &pb.FindAllEnabledNodeClustersWithHTTPCachePolicyIdResponse{ @@ -707,6 +724,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithHTTPFirewallPolicy result = append(result, &pb.NodeCluster{ Id: int64(cluster.Id), Name: cluster.Name, + IsOn: cluster.IsOn == 1, }) } return &pb.FindAllEnabledNodeClustersWithHTTPFirewallPolicyIdResponse{ @@ -884,6 +902,7 @@ func (this *NodeClusterService) FindLatestNodeClusters(ctx context.Context, req pbClusters = append(pbClusters, &pb.NodeCluster{ Id: int64(cluster.Id), Name: cluster.Name, + IsOn: cluster.IsOn == 1, }) } return &pb.FindLatestNodeClustersResponse{NodeClusters: pbClusters}, nil diff --git a/internal/rpc/services/service_server_stat_board.go b/internal/rpc/services/service_server_stat_board.go index 17e9b1e4..b789889e 100644 --- a/internal/rpc/services/service_server_stat_board.go +++ b/internal/rpc/services/service_server_stat_board.go @@ -61,13 +61,13 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex var result = &pb.ComposeServerStatNodeClusterBoardResponse{} // 统计数字 - countActiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateYes, "", 0, 0) + countActiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateYes, "", 0, 0, true) if err != nil { return nil, err } result.CountActiveNodes = countActiveNodes - countInactiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateNo, "", 0, 0) + countInactiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateNo, "", 0, 0, true) if err != nil { return nil, err } diff --git a/internal/tasks/dns_task_executor.go b/internal/tasks/dns_task_executor.go index 3f44d761..0199ac01 100644 --- a/internal/tasks/dns_task_executor.go +++ b/internal/tasks/dns_task_executor.go @@ -234,16 +234,17 @@ func (this *DNSTaskExecutor) doNode(taskId int64, nodeId int64) error { return nil } - if node.ClusterId == 0 { - isOk = true - return nil - } - // 转交给cluster统一处理 - err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, int64(node.ClusterId), dnsmodels.DNSTaskTypeClusterChange) + clusterIds, err := models.SharedNodeDAO.FindEnabledAndOnNodeClusterIds(tx, nodeId) if err != nil { return err } + for _, clusterId := range clusterIds { + err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dnsmodels.DNSTaskTypeClusterChange) + if err != nil { + return err + } + } isOk = true @@ -287,7 +288,7 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error { // 当前的节点记录 newRecordKeys := []string{} - nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId) + nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId, true) if err != nil { return err }