mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-05 00:43:05 +08:00
初步实现多集群共享节点
This commit is contained in:
@@ -380,7 +380,7 @@ func (this *NodeClusterDAO) FindAllEnabledClustersWithDNSDomainId(tx *dbs.Tx, dn
|
||||
_, err = this.Query(tx).
|
||||
State(NodeClusterStateEnabled).
|
||||
Attr("dnsDomainId", dnsDomainId).
|
||||
Result("id", "name", "dnsName", "dnsDomainId").
|
||||
Result("id", "name", "dnsName", "dnsDomainId", "isOn").
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
@@ -391,7 +391,7 @@ func (this *NodeClusterDAO) FindAllEnabledClustersHaveDNSDomain(tx *dbs.Tx) (res
|
||||
_, err = this.Query(tx).
|
||||
State(NodeClusterStateEnabled).
|
||||
Gt("dnsDomainId", 0).
|
||||
Result("id", "name", "dnsName", "dnsDomainId").
|
||||
Result("id", "name", "dnsName", "dnsDomainId", "isOn").
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
@@ -409,7 +409,7 @@ func (this *NodeClusterDAO) FindClusterGrantId(tx *dbs.Tx, clusterId int64) (int
|
||||
func (this *NodeClusterDAO) FindClusterDNSInfo(tx *dbs.Tx, clusterId int64) (*NodeCluster, error) {
|
||||
one, err := this.Query(tx).
|
||||
Pk(clusterId).
|
||||
Result("id", "name", "dnsName", "dnsDomainId", "dns").
|
||||
Result("id", "name", "dnsName", "dnsDomainId", "dns", "isOn").
|
||||
Find()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -499,7 +499,7 @@ func (this *NodeClusterDAO) CheckClusterDNS(tx *dbs.Tx, cluster *NodeCluster) (i
|
||||
// TODO 检查域名是否已解析
|
||||
|
||||
// 检查节点
|
||||
nodes, err := SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId)
|
||||
nodes, err := SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -837,6 +837,36 @@ func (this *NodeClusterDAO) FindLatestNodeClusters(tx *dbs.Tx, size int64) (resu
|
||||
return
|
||||
}
|
||||
|
||||
// CheckNodeClusterIsOn 获取集群是否正在启用状态
|
||||
func (this *NodeClusterDAO) CheckNodeClusterIsOn(tx *dbs.Tx, clusterId int64) (bool, error) {
|
||||
return this.Query(tx).
|
||||
Pk(clusterId).
|
||||
State(NodeClusterStateEnabled).
|
||||
Attr("isOn", true).
|
||||
Exist()
|
||||
}
|
||||
|
||||
// FindEnabledNodeClustersWithIds 查找一组集群
|
||||
func (this *NodeClusterDAO) FindEnabledNodeClustersWithIds(tx *dbs.Tx, clusterIds []int64) (result []*NodeCluster, err error) {
|
||||
if len(clusterIds) == 0 {
|
||||
return
|
||||
}
|
||||
for _, clusterId := range clusterIds {
|
||||
cluster, err := this.Query(tx).
|
||||
Pk(clusterId).
|
||||
State(NodeClusterStateEnabled).
|
||||
Find()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cluster == nil {
|
||||
continue
|
||||
}
|
||||
result = append(result, cluster.(*NodeCluster))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// NotifyUpdate 通知更新
|
||||
func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged)
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
package models
|
||||
|
||||
// 节点集群
|
||||
// NodeCluster 节点集群
|
||||
type NodeCluster struct {
|
||||
Id uint32 `field:"id"` // ID
|
||||
AdminId uint32 `field:"adminId"` // 管理员ID
|
||||
UserId uint32 `field:"userId"` // 用户ID
|
||||
IsOn uint8 `field:"isOn"` // 是否启用
|
||||
Name string `field:"name"` // 名称
|
||||
UseAllAPINodes uint8 `field:"useAllAPINodes"` // 是否使用所有API节点
|
||||
ApiNodes string `field:"apiNodes"` // 使用的API节点
|
||||
@@ -31,6 +32,7 @@ type NodeClusterOperator struct {
|
||||
Id interface{} // ID
|
||||
AdminId interface{} // 管理员ID
|
||||
UserId interface{} // 用户ID
|
||||
IsOn interface{} // 是否启用
|
||||
Name interface{} // 名称
|
||||
UseAllAPINodes interface{} // 是否使用所有API节点
|
||||
ApiNodes interface{} // 使用的API节点
|
||||
|
||||
@@ -163,7 +163,7 @@ func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterI
|
||||
}
|
||||
|
||||
// UpdateNode 修改节点
|
||||
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, groupId int64, regionId int64, maxCPU int32, isOn bool, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte) error {
|
||||
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, secondaryClusterIds []int64, groupId int64, regionId int64, maxCPU int32, isOn bool, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte) error {
|
||||
if nodeId <= 0 {
|
||||
return errors.New("invalid nodeId")
|
||||
}
|
||||
@@ -171,6 +171,24 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
|
||||
op.Id = nodeId
|
||||
op.Name = name
|
||||
op.ClusterId = clusterId
|
||||
|
||||
// 去重
|
||||
var filteredSecondaryClusterIds = []int64{}
|
||||
for _, secondaryClusterId := range secondaryClusterIds {
|
||||
if secondaryClusterId <= 0 {
|
||||
continue
|
||||
}
|
||||
if lists.ContainsInt64(filteredSecondaryClusterIds, secondaryClusterId) {
|
||||
continue
|
||||
}
|
||||
filteredSecondaryClusterIds = append(filteredSecondaryClusterIds, secondaryClusterId)
|
||||
}
|
||||
filteredSecondaryClusterIdsJSON, err := json.Marshal(filteredSecondaryClusterIds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
op.SecondaryClusterIds = filteredSecondaryClusterIdsJSON
|
||||
|
||||
op.GroupId = groupId
|
||||
op.RegionId = regionId
|
||||
op.LatestVersion = dbs.SQL("latestVersion+1")
|
||||
@@ -182,7 +200,7 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
|
||||
if len(maxCacheMemoryCapacityJSON) > 0 {
|
||||
op.MaxCacheMemoryCapacity = maxCacheMemoryCapacityJSON
|
||||
}
|
||||
err := this.Save(tx, op)
|
||||
err = this.Save(tx, op)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -212,6 +230,7 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
|
||||
keyword string,
|
||||
groupId int64,
|
||||
regionId int64,
|
||||
includeSecondaryNodes bool,
|
||||
order string,
|
||||
offset int64,
|
||||
size int64) (result []*Node, err error) {
|
||||
@@ -223,7 +242,13 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
|
||||
|
||||
// 集群
|
||||
if clusterId > 0 {
|
||||
query.Attr("clusterId", clusterId)
|
||||
if includeSecondaryNodes {
|
||||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||||
Param("primaryClusterId", clusterId).
|
||||
Param("primaryClusterIdString", types.String(clusterId))
|
||||
} else {
|
||||
query.Attr("clusterId", clusterId)
|
||||
}
|
||||
} else {
|
||||
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
|
||||
}
|
||||
@@ -327,12 +352,51 @@ func (this *NodeDAO) FindNodeClusterId(tx *dbs.Tx, nodeId int64) (int64, error)
|
||||
return types.Int64(col), err
|
||||
}
|
||||
|
||||
// FindEnabledAndOnNodeClusterIds 获取节点所属所有可用而且启用的集群ID
|
||||
func (this *NodeDAO) FindEnabledAndOnNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
|
||||
one, err := this.Query(tx).
|
||||
Pk(nodeId).
|
||||
Result("clusterId", "secondaryClusterIds").
|
||||
Find()
|
||||
if one == nil {
|
||||
return nil, err
|
||||
}
|
||||
var clusterId = int64(one.(*Node).ClusterId)
|
||||
if clusterId > 0 {
|
||||
result = append(result, clusterId)
|
||||
}
|
||||
|
||||
for _, clusterId := range one.(*Node).DecodeSecondaryClusterIds() {
|
||||
if lists.ContainsInt64(result, clusterId) {
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查是否启用
|
||||
isOn, err := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, clusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !isOn {
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, clusterId)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// FindAllNodeIdsMatch 匹配节点并返回节点ID
|
||||
func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, isOn configutils.BoolState) (result []int64, err error) {
|
||||
func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) {
|
||||
query := this.Query(tx)
|
||||
query.State(NodeStateEnabled)
|
||||
if clusterId > 0 {
|
||||
query.Attr("clusterId", clusterId)
|
||||
if includeSecondaryNodes {
|
||||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||||
Param("primaryClusterId", clusterId).
|
||||
Param("primaryClusterIdString", types.String(clusterId))
|
||||
} else {
|
||||
query.Attr("clusterId", clusterId)
|
||||
}
|
||||
} else {
|
||||
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
|
||||
}
|
||||
@@ -385,13 +449,20 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
|
||||
activeState configutils.BoolState,
|
||||
keyword string,
|
||||
groupId int64,
|
||||
regionId int64) (int64, error) {
|
||||
regionId int64,
|
||||
includeSecondaryNodes bool) (int64, error) {
|
||||
query := this.Query(tx)
|
||||
query.State(NodeStateEnabled)
|
||||
|
||||
// 集群
|
||||
if clusterId > 0 {
|
||||
query.Attr("clusterId", clusterId)
|
||||
if includeSecondaryNodes {
|
||||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||||
Param("primaryClusterId", clusterId).
|
||||
Param("primaryClusterIdString", types.String(clusterId))
|
||||
} else {
|
||||
query.Attr("clusterId", clusterId)
|
||||
}
|
||||
} else {
|
||||
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
|
||||
}
|
||||
@@ -878,10 +949,20 @@ func (this *NodeDAO) CountAllEnabledNodesWithRegionId(tx *dbs.Tx, regionId int64
|
||||
}
|
||||
|
||||
// FindAllEnabledNodesDNSWithClusterId 获取一个集群的节点DNS信息
|
||||
func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
|
||||
_, err = this.Query(tx).
|
||||
func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool) (result []*Node, err error) {
|
||||
if clusterId <= 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var query = this.Query(tx)
|
||||
if includeSecondaryNodes {
|
||||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||||
Param("primaryClusterId", clusterId).
|
||||
Param("primaryClusterIdString", types.String(clusterId))
|
||||
} else {
|
||||
query.Attr("clusterId", clusterId)
|
||||
}
|
||||
_, err = query.
|
||||
State(NodeStateEnabled).
|
||||
Attr("clusterId", clusterId).
|
||||
Attr("isOn", true).
|
||||
Attr("isUp", true).
|
||||
Result("id", "name", "dnsRoutes", "isOn").
|
||||
@@ -911,7 +992,7 @@ func (this *NodeDAO) FindEnabledNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error)
|
||||
Pk(nodeId).
|
||||
Result("id", "name", "dnsRoutes", "clusterId", "isOn").
|
||||
Find()
|
||||
if err != nil || one == nil {
|
||||
if one == nil {
|
||||
return nil, err
|
||||
}
|
||||
return one.(*Node), nil
|
||||
@@ -1108,6 +1189,89 @@ func (this *NodeDAO) FindEnabledNodesWithIds(tx *dbs.Tx, nodeIds []int64) (resul
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteNodeFromCluster 从集群中删除节点
|
||||
func (this *NodeDAO) DeleteNodeFromCluster(tx *dbs.Tx, nodeId int64, clusterId int64) error {
|
||||
one, err := this.Query(tx).
|
||||
Pk(nodeId).
|
||||
Result("clusterId", "secondaryClusterIds").
|
||||
Find()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if one == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var node = one.(*Node)
|
||||
|
||||
var secondaryClusterIds = []int64{}
|
||||
for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() {
|
||||
if secondaryClusterId == clusterId {
|
||||
continue
|
||||
}
|
||||
secondaryClusterIds = append(secondaryClusterIds, secondaryClusterId)
|
||||
}
|
||||
|
||||
var newClusterId = int64(node.ClusterId)
|
||||
|
||||
if newClusterId == clusterId {
|
||||
newClusterId = 0
|
||||
|
||||
// 选择一个从集群作为主集群
|
||||
if len(secondaryClusterIds) > 0 {
|
||||
newClusterId = secondaryClusterIds[0]
|
||||
secondaryClusterIds = secondaryClusterIds[1:]
|
||||
}
|
||||
}
|
||||
|
||||
secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
op := NewNodeOperator()
|
||||
op.Id = nodeId
|
||||
op.ClusterId = newClusterId
|
||||
op.SecondaryClusterIds = secondaryClusterIdsJSON
|
||||
return this.Save(tx, op)
|
||||
}
|
||||
|
||||
// TransferPrimaryClusterNodes 自动转移集群下的节点
|
||||
func (this *NodeDAO) TransferPrimaryClusterNodes(tx *dbs.Tx, primaryClusterId int64) error {
|
||||
if primaryClusterId <= 0 {
|
||||
return nil
|
||||
}
|
||||
ones, err := this.Query(tx).
|
||||
Attr("clusterId", primaryClusterId).
|
||||
Result("id", "secondaryClusterIds").
|
||||
State(NodeStateEnabled).
|
||||
FindAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, one := range ones {
|
||||
var node = one.(*Node)
|
||||
clusterIds := node.DecodeSecondaryClusterIds()
|
||||
if len(clusterIds) == 0 {
|
||||
continue
|
||||
}
|
||||
var clusterId = clusterIds[0]
|
||||
var secondaryClusterIds = clusterIds[1:]
|
||||
secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = this.Query(tx).
|
||||
Pk(node.Id).
|
||||
Set("clusterId", clusterId).
|
||||
Set("secondaryClusterIds", secondaryClusterIdsJSON).
|
||||
UpdateQuickly()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NotifyUpdate 通知更新
|
||||
func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
|
||||
clusterId, err := this.FindNodeClusterId(tx, nodeId)
|
||||
@@ -1122,25 +1286,25 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
|
||||
|
||||
// NotifyDNSUpdate 通知DNS更新
|
||||
func (this *NodeDAO) NotifyDNSUpdate(tx *dbs.Tx, nodeId int64) error {
|
||||
clusterId, err := this.Query(tx).
|
||||
Pk(nodeId).
|
||||
Result("clusterId").
|
||||
FindInt64Col(0) // 这里不需要加服务状态条件,因为我们即使删除也要删除对应的服务的DNS解析
|
||||
clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if clusterId <= 0 {
|
||||
return nil
|
||||
for _, clusterId := range clusterIds {
|
||||
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if dnsInfo == nil {
|
||||
continue
|
||||
}
|
||||
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
|
||||
continue
|
||||
}
|
||||
err = dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if dnsInfo == nil {
|
||||
return nil
|
||||
}
|
||||
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
|
||||
return nil
|
||||
}
|
||||
return dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -24,3 +24,13 @@ func TestNodeDAO_UpdateNodeUp(t *testing.T) {
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestNodeDAO_FindEnabledNodeClusterIds(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
var tx *dbs.Tx
|
||||
clusterIds, err := NewNodeDAO().FindEnabledAndOnNodeClusterIds(tx, 48)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(clusterIds)
|
||||
}
|
||||
|
||||
@@ -14,7 +14,8 @@ type Node struct {
|
||||
Secret string `field:"secret"` // 密钥
|
||||
Name string `field:"name"` // 节点名
|
||||
Code string `field:"code"` // 代号
|
||||
ClusterId uint32 `field:"clusterId"` // 集群ID
|
||||
ClusterId uint32 `field:"clusterId"` // 主集群ID
|
||||
SecondaryClusterIds string `field:"secondaryClusterIds"` // 从集群ID
|
||||
RegionId uint32 `field:"regionId"` // 区域ID
|
||||
GroupId uint32 `field:"groupId"` // 分组ID
|
||||
CreatedAt uint64 `field:"createdAt"` // 创建时间
|
||||
@@ -45,7 +46,8 @@ type NodeOperator struct {
|
||||
Secret interface{} // 密钥
|
||||
Name interface{} // 节点名
|
||||
Code interface{} // 代号
|
||||
ClusterId interface{} // 集群ID
|
||||
ClusterId interface{} // 主集群ID
|
||||
SecondaryClusterIds interface{} // 从集群ID
|
||||
RegionId interface{} // 区域ID
|
||||
GroupId interface{} // 分组ID
|
||||
CreatedAt interface{} // 创建时间
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// 安装状态
|
||||
// DecodeInstallStatus 安装状态
|
||||
func (this *Node) DecodeInstallStatus() (*NodeInstallStatus, error) {
|
||||
if len(this.InstallStatus) == 0 || this.InstallStatus == "null" {
|
||||
return NewNodeInstallStatus(), nil
|
||||
@@ -27,7 +27,7 @@ func (this *Node) DecodeInstallStatus() (*NodeInstallStatus, error) {
|
||||
return status, nil
|
||||
}
|
||||
|
||||
// 节点状态
|
||||
// DecodeStatus 节点状态
|
||||
func (this *Node) DecodeStatus() (*nodeconfigs.NodeStatus, error) {
|
||||
if len(this.Status) == 0 || this.Status == "null" {
|
||||
return nil, nil
|
||||
@@ -40,20 +40,21 @@ func (this *Node) DecodeStatus() (*nodeconfigs.NodeStatus, error) {
|
||||
return status, nil
|
||||
}
|
||||
|
||||
// 所有的DNS线路
|
||||
func (this *Node) DNSRouteCodes() (map[int64][]string, error) {
|
||||
// DNSRouteCodes 所有的DNS线路
|
||||
func (this *Node) DNSRouteCodes() map[int64][]string {
|
||||
routes := map[int64][]string{} // domainId => routes
|
||||
if len(this.DnsRoutes) == 0 || this.DnsRoutes == "null" {
|
||||
return routes, nil
|
||||
return routes
|
||||
}
|
||||
err := json.Unmarshal([]byte(this.DnsRoutes), &routes)
|
||||
if err != nil {
|
||||
return map[int64][]string{}, err
|
||||
// 忽略错误
|
||||
return routes
|
||||
}
|
||||
return routes, nil
|
||||
return routes
|
||||
}
|
||||
|
||||
// DNS线路
|
||||
// DNSRouteCodesForDomainId DNS线路
|
||||
func (this *Node) DNSRouteCodesForDomainId(dnsDomainId int64) ([]string, error) {
|
||||
routes := map[int64][]string{} // domainId => routes
|
||||
if len(this.DnsRoutes) == 0 || this.DnsRoutes == "null" {
|
||||
@@ -67,7 +68,7 @@ func (this *Node) DNSRouteCodesForDomainId(dnsDomainId int64) ([]string, error)
|
||||
return domainRoutes, nil
|
||||
}
|
||||
|
||||
// 连接的API
|
||||
// DecodeConnectedAPINodeIds 连接的API
|
||||
func (this *Node) DecodeConnectedAPINodeIds() ([]int64, error) {
|
||||
apiNodeIds := []int64{}
|
||||
if IsNotNull(this.ConnectedAPINodes) {
|
||||
@@ -78,3 +79,14 @@ func (this *Node) DecodeConnectedAPINodeIds() ([]int64, error) {
|
||||
}
|
||||
return apiNodeIds, nil
|
||||
}
|
||||
|
||||
// DecodeSecondaryClusterIds 从集群IDs
|
||||
func (this *Node) DecodeSecondaryClusterIds() []int64 {
|
||||
if len(this.SecondaryClusterIds) == 0 {
|
||||
return []int64{}
|
||||
}
|
||||
var result = []int64{}
|
||||
// 不需要处理错误
|
||||
_ = json.Unmarshal([]byte(this.SecondaryClusterIds), &result)
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -39,7 +40,7 @@ func init() {
|
||||
})
|
||||
}
|
||||
|
||||
// 创建单个节点任务
|
||||
// CreateNodeTask 创建单个节点任务
|
||||
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int64, taskType NodeTaskType) error {
|
||||
if clusterId <= 0 || nodeId <= 0 {
|
||||
return nil
|
||||
@@ -66,7 +67,7 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, clusterId int64, nodeId int6
|
||||
return err
|
||||
}
|
||||
|
||||
// 创建集群任务
|
||||
// CreateClusterTask 创建集群任务
|
||||
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
|
||||
if clusterId <= 0 {
|
||||
return nil
|
||||
@@ -95,15 +96,16 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, clusterId int64, taskType
|
||||
return err
|
||||
}
|
||||
|
||||
// 分解集群任务
|
||||
// ExtractClusterTask 分解集群任务
|
||||
func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
|
||||
nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, configutils.BoolStateYes)
|
||||
nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = this.Query(tx).
|
||||
Attr("clusterId", clusterId).
|
||||
Param("clusterIdString", types.String(clusterId)).
|
||||
Where("nodeId> 0").
|
||||
Attr("type", taskType).
|
||||
Delete()
|
||||
@@ -130,7 +132,7 @@ func (this *NodeTaskDAO) ExtractClusterTask(tx *dbs.Tx, clusterId int64, taskTyp
|
||||
return nil
|
||||
}
|
||||
|
||||
// 分解所有集群任务
|
||||
// ExtractAllClusterTasks 分解所有集群任务
|
||||
func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error {
|
||||
ones, err := this.Query(tx).
|
||||
Attr("nodeId", 0).
|
||||
@@ -148,7 +150,7 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 删除集群所有相关任务
|
||||
// DeleteAllClusterTasks 删除集群所有相关任务
|
||||
func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, clusterId int64) error {
|
||||
_, err := this.Query(tx).
|
||||
Attr("clusterId", clusterId).
|
||||
@@ -156,7 +158,7 @@ func (this *NodeTaskDAO) DeleteAllClusterTasks(tx *dbs.Tx, clusterId int64) erro
|
||||
return err
|
||||
}
|
||||
|
||||
// 删除节点相关任务
|
||||
// DeleteNodeTasks 删除节点相关任务
|
||||
func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, nodeId int64) error {
|
||||
_, err := this.Query(tx).
|
||||
Attr("nodeId", nodeId).
|
||||
@@ -164,7 +166,7 @@ func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, nodeId int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 查询一个节点的所有任务
|
||||
// FindDoingNodeTasks 查询一个节点的所有任务
|
||||
func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, nodeId int64) (result []*NodeTask, err error) {
|
||||
if nodeId <= 0 {
|
||||
return
|
||||
@@ -177,7 +179,7 @@ func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, nodeId int64) (result []
|
||||
return
|
||||
}
|
||||
|
||||
// 修改节点任务的完成状态
|
||||
// UpdateNodeTaskDone 修改节点任务的完成状态
|
||||
func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool, errorMessage string) error {
|
||||
_, err := this.Query(tx).
|
||||
Pk(taskId).
|
||||
@@ -188,7 +190,7 @@ func (this *NodeTaskDAO) UpdateNodeTaskDone(tx *dbs.Tx, taskId int64, isOk bool,
|
||||
return err
|
||||
}
|
||||
|
||||
// 查找正在更新的集群IDs
|
||||
// FindAllDoingTaskClusterIds 查找正在更新的集群IDs
|
||||
func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error) {
|
||||
ones, _, err := this.Query(tx).
|
||||
Result("DISTINCT(clusterId) AS clusterId").
|
||||
@@ -204,7 +206,7 @@ func (this *NodeTaskDAO) FindAllDoingTaskClusterIds(tx *dbs.Tx) ([]int64, error)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// 查询某个集群下所有的任务
|
||||
// FindAllDoingNodeTasksWithClusterId 查询某个集群下所有的任务
|
||||
func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterId int64) (result []*NodeTask, err error) {
|
||||
_, err = this.Query(tx).
|
||||
Attr("clusterId", clusterId).
|
||||
@@ -218,7 +220,7 @@ func (this *NodeTaskDAO) FindAllDoingNodeTasksWithClusterId(tx *dbs.Tx, clusterI
|
||||
return
|
||||
}
|
||||
|
||||
// 检查是否有正在执行的任务
|
||||
// ExistsDoingNodeTasks 检查是否有正在执行的任务
|
||||
func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx) (bool, error) {
|
||||
return this.Query(tx).
|
||||
Where("(isDone=0 OR (isDone=1 AND isOk=0))").
|
||||
@@ -226,14 +228,14 @@ func (this *NodeTaskDAO) ExistsDoingNodeTasks(tx *dbs.Tx) (bool, error) {
|
||||
Exist()
|
||||
}
|
||||
|
||||
// 是否有错误的任务
|
||||
// ExistsErrorNodeTasks 是否有错误的任务
|
||||
func (this *NodeTaskDAO) ExistsErrorNodeTasks(tx *dbs.Tx) (bool, error) {
|
||||
return this.Query(tx).
|
||||
Where("(isDone=1 AND isOk=0)").
|
||||
Exist()
|
||||
}
|
||||
|
||||
// 删除任务
|
||||
// DeleteNodeTask 删除任务
|
||||
func (this *NodeTaskDAO) DeleteNodeTask(tx *dbs.Tx, taskId int64) error {
|
||||
_, err := this.Query(tx).
|
||||
Pk(taskId).
|
||||
@@ -241,7 +243,7 @@ func (this *NodeTaskDAO) DeleteNodeTask(tx *dbs.Tx, taskId int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 计算正在执行的任务
|
||||
// CountDoingNodeTasks 计算正在执行的任务
|
||||
func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx) (int64, error) {
|
||||
return this.Query(tx).
|
||||
Attr("isDone", 0).
|
||||
@@ -249,7 +251,7 @@ func (this *NodeTaskDAO) CountDoingNodeTasks(tx *dbs.Tx) (int64, error) {
|
||||
Count()
|
||||
}
|
||||
|
||||
// 查找需要通知的任务
|
||||
// FindNotifyingNodeTasks 查找需要通知的任务
|
||||
func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, size int64) (result []*NodeTask, err error) {
|
||||
_, err = this.Query(tx).
|
||||
Gt("nodeId", 0).
|
||||
@@ -261,7 +263,7 @@ func (this *NodeTaskDAO) FindNotifyingNodeTasks(tx *dbs.Tx, size int64) (result
|
||||
return
|
||||
}
|
||||
|
||||
// 设置任务已通知
|
||||
// UpdateTasksNotified 设置任务已通知
|
||||
func (this *NodeTaskDAO) UpdateTasksNotified(tx *dbs.Tx, taskIds []int64) error {
|
||||
if len(taskIds) == 0 {
|
||||
return nil
|
||||
|
||||
@@ -710,21 +710,24 @@ func (this *ServerDAO) ListEnabledServersMatch(tx *dbs.Tx, offset int64, size in
|
||||
|
||||
// FindAllEnabledServersWithNode 获取节点中的所有服务
|
||||
func (this *ServerDAO) FindAllEnabledServersWithNode(tx *dbs.Tx, nodeId int64) (result []*Server, err error) {
|
||||
// 节点所在集群
|
||||
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
|
||||
// 节点所在主集群
|
||||
clusterIds, err := SharedNodeDAO.FindEnabledAndOnNodeClusterIds(tx, nodeId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if clusterId <= 0 {
|
||||
return nil, nil
|
||||
for _, clusterId := range clusterIds {
|
||||
ones, err := this.Query(tx).
|
||||
Attr("clusterId", clusterId).
|
||||
State(ServerStateEnabled).
|
||||
AscPk().
|
||||
FindAll()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, one := range ones {
|
||||
result = append(result, one.(*Server))
|
||||
}
|
||||
}
|
||||
|
||||
_, err = this.Query(tx).
|
||||
Attr("clusterId", clusterId).
|
||||
State(ServerStateEnabled).
|
||||
AscPk().
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -134,5 +134,16 @@ func TestServerDAO_ExistServerNameInCluster(t *testing.T) {
|
||||
}
|
||||
t.Log(exist)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestServerDAO_FindAllEnabledServersWithNode(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
servers, err := SharedServerDAO.FindAllEnabledServersWithNode(nil, 48)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, server := range servers {
|
||||
t.Log("serverId:", server.Id, "clusterId:", server.ClusterId)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,9 +46,9 @@ func FindAllProviderTypes() []maps.Map {
|
||||
if teaconst.IsPlus {
|
||||
typeMaps = append(typeMaps, []maps.Map{
|
||||
{
|
||||
"name": "集成EdgeDNS",
|
||||
"name": "自建EdgeDNS",
|
||||
"code": ProviderTypeLocalEdgeDNS,
|
||||
"description": "当前企业版提供的DNS服务。",
|
||||
"description": "当前企业版提供的自建DNS服务。",
|
||||
},
|
||||
// TODO 需要实现用户使用AccessId/AccessKey来连接DNS服务
|
||||
/**{
|
||||
|
||||
@@ -396,7 +396,7 @@ func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster,
|
||||
tx := this.NullTx()
|
||||
|
||||
// 节点域名
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId)
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId, true)
|
||||
if err != nil {
|
||||
return nil, nil, nil, 0, 0, false, false, err
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
stringutil "github.com/iwind/TeaGo/utils/string"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// NodeService 边缘节点相关服务
|
||||
@@ -49,10 +50,23 @@ func (this *NodeService) CreateNode(ctx context.Context, req *pb.CreateNodeReque
|
||||
}
|
||||
|
||||
// 保存DNS相关
|
||||
if req.DnsDomainId > 0 && len(req.DnsRoutes) > 0 {
|
||||
err = models.SharedNodeDAO.UpdateNodeDNS(tx, nodeId, map[int64][]string{
|
||||
req.DnsDomainId: req.DnsRoutes,
|
||||
})
|
||||
if len(req.DnsRoutes) > 0 {
|
||||
var routesMap = map[int64][]string{}
|
||||
var m = map[int64][]string{} // domainId => codes
|
||||
for _, route := range req.DnsRoutes {
|
||||
var pieces = strings.SplitN(route, "@", 2)
|
||||
if len(pieces) != 2 {
|
||||
continue
|
||||
}
|
||||
var code = pieces[0]
|
||||
var domainId = types.Int64(pieces[1])
|
||||
m[domainId] = append(m[domainId], code)
|
||||
}
|
||||
for domainId, codes := range m {
|
||||
routesMap[domainId] = codes
|
||||
}
|
||||
|
||||
err = models.SharedNodeDAO.UpdateNodeDNS(tx, nodeId, routesMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -135,7 +149,7 @@ func (this *NodeService) CountAllEnabledNodesMatch(ctx context.Context, req *pb.
|
||||
|
||||
tx := this.NullTx()
|
||||
|
||||
count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId)
|
||||
count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -188,18 +202,32 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List
|
||||
order = "trafficOutDesc"
|
||||
}
|
||||
|
||||
nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, order, req.Offset, req.Size)
|
||||
nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, true, order, req.Offset, req.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := []*pb.Node{}
|
||||
for _, node := range nodes {
|
||||
// 集群信息
|
||||
// 主集群信息
|
||||
clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 从集群
|
||||
secondaryClusters, err := models.SharedNodeClusterDAO.FindEnabledNodeClustersWithIds(tx, node.DecodeSecondaryClusterIds())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var pbSecondaryClusters = []*pb.NodeCluster{}
|
||||
for _, secondaryCluster := range secondaryClusters {
|
||||
pbSecondaryClusters = append(pbSecondaryClusters, &pb.NodeCluster{
|
||||
Id: int64(secondaryCluster.Id),
|
||||
IsOn: secondaryCluster.IsOn == 1,
|
||||
Name: secondaryCluster.Name,
|
||||
})
|
||||
}
|
||||
|
||||
// 安装信息
|
||||
installStatus, err := node.DecodeInstallStatus()
|
||||
if err != nil {
|
||||
@@ -276,13 +304,14 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List
|
||||
Id: int64(node.ClusterId),
|
||||
Name: clusterName,
|
||||
},
|
||||
InstallStatus: installStatusResult,
|
||||
MaxCPU: types.Int32(node.MaxCPU),
|
||||
IsOn: node.IsOn == 1,
|
||||
IsUp: node.IsUp == 1,
|
||||
NodeGroup: pbGroup,
|
||||
NodeRegion: pbRegion,
|
||||
DnsRoutes: pbRoutes,
|
||||
SecondaryNodeClusters: pbSecondaryClusters,
|
||||
InstallStatus: installStatusResult,
|
||||
MaxCPU: types.Int32(node.MaxCPU),
|
||||
IsOn: node.IsOn == 1,
|
||||
IsUp: node.IsUp == 1,
|
||||
NodeGroup: pbGroup,
|
||||
NodeRegion: pbRegion,
|
||||
DnsRoutes: pbRoutes,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -354,6 +383,22 @@ func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeReque
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
// DeleteNodeFromNodeCluster 从集群中删除节点
|
||||
func (this *NodeService) DeleteNodeFromNodeCluster(ctx context.Context, req *pb.DeleteNodeFromNodeClusterRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tx := this.NullTx()
|
||||
err = models.SharedNodeDAO.DeleteNodeFromCluster(tx, req.NodeId, req.NodeClusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
// UpdateNode 修改节点
|
||||
func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
@@ -385,7 +430,7 @@ func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeReque
|
||||
}
|
||||
}
|
||||
|
||||
err = models.SharedNodeDAO.UpdateNode(tx, req.NodeId, req.Name, req.NodeClusterId, req.NodeGroupId, req.NodeRegionId, req.MaxCPU, req.IsOn, maxCacheDiskCapacityJSON, maxCacheMemoryCapacityJSON)
|
||||
err = models.SharedNodeDAO.UpdateNode(tx, req.NodeId, req.Name, req.NodeClusterId, req.SecondaryNodeClusterIds, req.NodeGroupId, req.NodeRegionId, req.MaxCPU, req.IsOn, maxCacheDiskCapacityJSON, maxCacheMemoryCapacityJSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -410,10 +455,26 @@ func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeReque
|
||||
}
|
||||
|
||||
// 保存DNS相关
|
||||
if req.DnsDomainId > 0 && len(req.DnsRoutes) > 0 {
|
||||
err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, map[int64][]string{
|
||||
req.DnsDomainId: req.DnsRoutes,
|
||||
})
|
||||
nodeDNS, err := models.SharedNodeDAO.FindEnabledNodeDNS(tx, req.NodeId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if nodeDNS != nil {
|
||||
var routesMap = nodeDNS.DNSRouteCodes()
|
||||
var m = map[int64][]string{} // domainId => codes
|
||||
for _, route := range req.DnsRoutes {
|
||||
var pieces = strings.SplitN(route, "@", 2)
|
||||
if len(pieces) != 2 {
|
||||
continue
|
||||
}
|
||||
var code = pieces[0]
|
||||
var domainId = types.Int64(pieces[1])
|
||||
m[domainId] = append(m[domainId], code)
|
||||
}
|
||||
for domainId, codes := range m {
|
||||
routesMap[domainId] = codes
|
||||
}
|
||||
err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routesMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -439,12 +500,29 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable
|
||||
return &pb.FindEnabledNodeResponse{Node: nil}, nil
|
||||
}
|
||||
|
||||
// 集群信息
|
||||
// 主集群信息
|
||||
clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 从集群信息
|
||||
var secondaryPBClusters []*pb.NodeCluster
|
||||
for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() {
|
||||
cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, secondaryClusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cluster == nil {
|
||||
continue
|
||||
}
|
||||
secondaryPBClusters = append(secondaryPBClusters, &pb.NodeCluster{
|
||||
Id: int64(cluster.Id),
|
||||
IsOn: cluster.IsOn == 1,
|
||||
Name: cluster.Name,
|
||||
})
|
||||
}
|
||||
|
||||
// 认证信息
|
||||
login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, req.NodeId)
|
||||
if err != nil {
|
||||
@@ -542,6 +620,7 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable
|
||||
Id: int64(node.ClusterId),
|
||||
Name: clusterName,
|
||||
},
|
||||
SecondaryNodeClusters: secondaryPBClusters,
|
||||
Login: respLogin,
|
||||
InstallStatus: installStatusResult,
|
||||
MaxCPU: types.Int32(node.MaxCPU),
|
||||
@@ -1117,7 +1196,7 @@ func (this *NodeService) FindAllEnabledNodesDNSWithNodeClusterId(ctx context.Con
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, req.NodeClusterId)
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, req.NodeClusterId, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1155,10 +1234,11 @@ func (this *NodeService) FindAllEnabledNodesDNSWithNodeClusterId(ctx context.Con
|
||||
continue
|
||||
}
|
||||
result = append(result, &pb.NodeDNSInfo{
|
||||
Id: int64(node.Id),
|
||||
Name: node.Name,
|
||||
IpAddr: ip,
|
||||
Routes: pbRoutes,
|
||||
Id: int64(node.Id),
|
||||
Name: node.Name,
|
||||
IpAddr: ip,
|
||||
Routes: pbRoutes,
|
||||
NodeClusterId: req.NodeClusterId,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1189,7 +1269,11 @@ func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEna
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clusterId := int64(node.ClusterId)
|
||||
var clusterId = int64(node.ClusterId)
|
||||
if req.NodeClusterId > 0 {
|
||||
clusterId = req.NodeClusterId
|
||||
}
|
||||
|
||||
clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -1256,14 +1340,26 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN
|
||||
return nil, errors.New("node not found")
|
||||
}
|
||||
|
||||
routeCodeMap, err := node.DNSRouteCodes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
routeCodeMap := node.DNSRouteCodes()
|
||||
if req.DnsDomainId > 0 && len(req.Routes) > 0 {
|
||||
routeCodeMap[req.DnsDomainId] = req.Routes
|
||||
var m = map[int64][]string{} // domainId => codes
|
||||
for _, route := range req.Routes {
|
||||
var pieces = strings.SplitN(route, "@", 2)
|
||||
if len(pieces) != 2 {
|
||||
continue
|
||||
}
|
||||
var code = pieces[0]
|
||||
var domainId = types.Int64(pieces[1])
|
||||
m[domainId] = append(m[domainId], code)
|
||||
}
|
||||
for domainId, codes := range m {
|
||||
routeCodeMap[domainId] = codes
|
||||
}
|
||||
} else {
|
||||
delete(routeCodeMap, req.DnsDomainId)
|
||||
}
|
||||
|
||||
|
||||
err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routeCodeMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -78,8 +78,19 @@ func (this *NodeClusterService) DeleteNodeCluster(ctx context.Context, req *pb.D
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.NodeClusterId <= 0 {
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
tx := this.NullTx()
|
||||
|
||||
// 转移节点
|
||||
err = models.SharedNodeDAO.TransferPrimaryClusterNodes(tx, req.NodeClusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 删除集群
|
||||
err = models.SharedNodeClusterDAO.DisableNodeCluster(tx, req.NodeClusterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -128,6 +139,7 @@ func (this *NodeClusterService) FindEnabledNodeCluster(ctx context.Context, req
|
||||
HttpFirewallPolicyId: int64(cluster.HttpFirewallPolicyId),
|
||||
DnsName: cluster.DnsName,
|
||||
DnsDomainId: int64(cluster.DnsDomainId),
|
||||
IsOn: cluster.IsOn == 1,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
@@ -208,6 +220,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClusters(ctx context.Context,
|
||||
CreatedAt: int64(cluster.CreatedAt),
|
||||
UniqueId: cluster.UniqueId,
|
||||
Secret: cluster.Secret,
|
||||
IsOn: cluster.IsOn == 1,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -259,6 +272,7 @@ func (this *NodeClusterService) ListEnabledNodeClusters(ctx context.Context, req
|
||||
Secret: cluster.Secret,
|
||||
DnsName: cluster.DnsName,
|
||||
DnsDomainId: int64(cluster.DnsDomainId),
|
||||
IsOn: cluster.IsOn == 1,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -372,6 +386,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithNodeGrantId(ctx co
|
||||
CreatedAt: int64(cluster.CreatedAt),
|
||||
UniqueId: cluster.UniqueId,
|
||||
Secret: cluster.Secret,
|
||||
IsOn: cluster.IsOn == 1,
|
||||
})
|
||||
}
|
||||
return &pb.FindAllEnabledNodeClustersWithNodeGrantIdResponse{NodeClusters: result}, nil
|
||||
@@ -511,6 +526,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithDNSDomainId(ctx co
|
||||
Name: cluster.Name,
|
||||
DnsName: cluster.DnsName,
|
||||
DnsDomainId: int64(cluster.DnsDomainId),
|
||||
IsOn: cluster.IsOn == 1,
|
||||
})
|
||||
}
|
||||
return &pb.FindAllEnabledNodeClustersWithDNSDomainIdResponse{NodeClusters: result}, nil
|
||||
@@ -666,6 +682,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithHTTPCachePolicyId(
|
||||
result = append(result, &pb.NodeCluster{
|
||||
Id: int64(cluster.Id),
|
||||
Name: cluster.Name,
|
||||
IsOn: cluster.IsOn == 1,
|
||||
})
|
||||
}
|
||||
return &pb.FindAllEnabledNodeClustersWithHTTPCachePolicyIdResponse{
|
||||
@@ -707,6 +724,7 @@ func (this *NodeClusterService) FindAllEnabledNodeClustersWithHTTPFirewallPolicy
|
||||
result = append(result, &pb.NodeCluster{
|
||||
Id: int64(cluster.Id),
|
||||
Name: cluster.Name,
|
||||
IsOn: cluster.IsOn == 1,
|
||||
})
|
||||
}
|
||||
return &pb.FindAllEnabledNodeClustersWithHTTPFirewallPolicyIdResponse{
|
||||
@@ -884,6 +902,7 @@ func (this *NodeClusterService) FindLatestNodeClusters(ctx context.Context, req
|
||||
pbClusters = append(pbClusters, &pb.NodeCluster{
|
||||
Id: int64(cluster.Id),
|
||||
Name: cluster.Name,
|
||||
IsOn: cluster.IsOn == 1,
|
||||
})
|
||||
}
|
||||
return &pb.FindLatestNodeClustersResponse{NodeClusters: pbClusters}, nil
|
||||
|
||||
@@ -61,13 +61,13 @@ func (this *ServerStatBoardService) ComposeServerStatNodeClusterBoard(ctx contex
|
||||
var result = &pb.ComposeServerStatNodeClusterBoardResponse{}
|
||||
|
||||
// 统计数字
|
||||
countActiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateYes, "", 0, 0)
|
||||
countActiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateYes, "", 0, 0, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountActiveNodes = countActiveNodes
|
||||
|
||||
countInactiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateNo, "", 0, 0)
|
||||
countInactiveNodes, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.BoolStateAll, configutils.BoolStateNo, "", 0, 0, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -234,16 +234,17 @@ func (this *DNSTaskExecutor) doNode(taskId int64, nodeId int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if node.ClusterId == 0 {
|
||||
isOk = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// 转交给cluster统一处理
|
||||
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, int64(node.ClusterId), dnsmodels.DNSTaskTypeClusterChange)
|
||||
clusterIds, err := models.SharedNodeDAO.FindEnabledAndOnNodeClusterIds(tx, nodeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, clusterId := range clusterIds {
|
||||
err = dnsmodels.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dnsmodels.DNSTaskTypeClusterChange)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
isOk = true
|
||||
|
||||
@@ -287,7 +288,7 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error {
|
||||
|
||||
// 当前的节点记录
|
||||
newRecordKeys := []string{}
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId)
|
||||
nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, clusterId, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user