初步实现多集群共享节点

This commit is contained in:
GoEdgeLab
2021-07-31 22:23:11 +08:00
parent 4407973626
commit 83ae3095cd
15 changed files with 468 additions and 116 deletions

View File

@@ -163,7 +163,7 @@ func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterI
}
// UpdateNode 修改节点
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, groupId int64, regionId int64, maxCPU int32, isOn bool, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte) error {
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, secondaryClusterIds []int64, groupId int64, regionId int64, maxCPU int32, isOn bool, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte) error {
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
@@ -171,6 +171,24 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
op.Id = nodeId
op.Name = name
op.ClusterId = clusterId
// 去重
var filteredSecondaryClusterIds = []int64{}
for _, secondaryClusterId := range secondaryClusterIds {
if secondaryClusterId <= 0 {
continue
}
if lists.ContainsInt64(filteredSecondaryClusterIds, secondaryClusterId) {
continue
}
filteredSecondaryClusterIds = append(filteredSecondaryClusterIds, secondaryClusterId)
}
filteredSecondaryClusterIdsJSON, err := json.Marshal(filteredSecondaryClusterIds)
if err != nil {
return err
}
op.SecondaryClusterIds = filteredSecondaryClusterIdsJSON
op.GroupId = groupId
op.RegionId = regionId
op.LatestVersion = dbs.SQL("latestVersion+1")
@@ -182,7 +200,7 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
if len(maxCacheMemoryCapacityJSON) > 0 {
op.MaxCacheMemoryCapacity = maxCacheMemoryCapacityJSON
}
err := this.Save(tx, op)
err = this.Save(tx, op)
if err != nil {
return err
}
@@ -212,6 +230,7 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
keyword string,
groupId int64,
regionId int64,
includeSecondaryNodes bool,
order string,
offset int64,
size int64) (result []*Node, err error) {
@@ -223,7 +242,13 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
// 集群
if clusterId > 0 {
query.Attr("clusterId", clusterId)
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
}
@@ -327,12 +352,51 @@ func (this *NodeDAO) FindNodeClusterId(tx *dbs.Tx, nodeId int64) (int64, error)
return types.Int64(col), err
}
// FindEnabledAndOnNodeClusterIds 获取节点所属所有可用而且启用的集群ID
func (this *NodeDAO) FindEnabledAndOnNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
one, err := this.Query(tx).
Pk(nodeId).
Result("clusterId", "secondaryClusterIds").
Find()
if one == nil {
return nil, err
}
var clusterId = int64(one.(*Node).ClusterId)
if clusterId > 0 {
result = append(result, clusterId)
}
for _, clusterId := range one.(*Node).DecodeSecondaryClusterIds() {
if lists.ContainsInt64(result, clusterId) {
continue
}
// 检查是否启用
isOn, err := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, clusterId)
if err != nil {
return nil, err
}
if !isOn {
continue
}
result = append(result, clusterId)
}
return
}
// FindAllNodeIdsMatch 匹配节点并返回节点ID
func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, isOn configutils.BoolState) (result []int64, err error) {
func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) {
query := this.Query(tx)
query.State(NodeStateEnabled)
if clusterId > 0 {
query.Attr("clusterId", clusterId)
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
}
@@ -385,13 +449,20 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
activeState configutils.BoolState,
keyword string,
groupId int64,
regionId int64) (int64, error) {
regionId int64,
includeSecondaryNodes bool) (int64, error) {
query := this.Query(tx)
query.State(NodeStateEnabled)
// 集群
if clusterId > 0 {
query.Attr("clusterId", clusterId)
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
}
@@ -878,10 +949,20 @@ func (this *NodeDAO) CountAllEnabledNodesWithRegionId(tx *dbs.Tx, regionId int64
}
// FindAllEnabledNodesDNSWithClusterId 获取一个集群的节点DNS信息
func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
_, err = this.Query(tx).
func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool) (result []*Node, err error) {
if clusterId <= 0 {
return nil, nil
}
var query = this.Query(tx)
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
_, err = query.
State(NodeStateEnabled).
Attr("clusterId", clusterId).
Attr("isOn", true).
Attr("isUp", true).
Result("id", "name", "dnsRoutes", "isOn").
@@ -911,7 +992,7 @@ func (this *NodeDAO) FindEnabledNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error)
Pk(nodeId).
Result("id", "name", "dnsRoutes", "clusterId", "isOn").
Find()
if err != nil || one == nil {
if one == nil {
return nil, err
}
return one.(*Node), nil
@@ -1108,6 +1189,89 @@ func (this *NodeDAO) FindEnabledNodesWithIds(tx *dbs.Tx, nodeIds []int64) (resul
return
}
// DeleteNodeFromCluster 从集群中删除节点
func (this *NodeDAO) DeleteNodeFromCluster(tx *dbs.Tx, nodeId int64, clusterId int64) error {
one, err := this.Query(tx).
Pk(nodeId).
Result("clusterId", "secondaryClusterIds").
Find()
if err != nil {
return err
}
if one == nil {
return nil
}
var node = one.(*Node)
var secondaryClusterIds = []int64{}
for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() {
if secondaryClusterId == clusterId {
continue
}
secondaryClusterIds = append(secondaryClusterIds, secondaryClusterId)
}
var newClusterId = int64(node.ClusterId)
if newClusterId == clusterId {
newClusterId = 0
// 选择一个从集群作为主集群
if len(secondaryClusterIds) > 0 {
newClusterId = secondaryClusterIds[0]
secondaryClusterIds = secondaryClusterIds[1:]
}
}
secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds)
if err != nil {
return err
}
op := NewNodeOperator()
op.Id = nodeId
op.ClusterId = newClusterId
op.SecondaryClusterIds = secondaryClusterIdsJSON
return this.Save(tx, op)
}
// TransferPrimaryClusterNodes 自动转移集群下的节点
func (this *NodeDAO) TransferPrimaryClusterNodes(tx *dbs.Tx, primaryClusterId int64) error {
if primaryClusterId <= 0 {
return nil
}
ones, err := this.Query(tx).
Attr("clusterId", primaryClusterId).
Result("id", "secondaryClusterIds").
State(NodeStateEnabled).
FindAll()
if err != nil {
return err
}
for _, one := range ones {
var node = one.(*Node)
clusterIds := node.DecodeSecondaryClusterIds()
if len(clusterIds) == 0 {
continue
}
var clusterId = clusterIds[0]
var secondaryClusterIds = clusterIds[1:]
secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds)
if err != nil {
return err
}
err = this.Query(tx).
Pk(node.Id).
Set("clusterId", clusterId).
Set("secondaryClusterIds", secondaryClusterIdsJSON).
UpdateQuickly()
if err != nil {
return err
}
}
return nil
}
// NotifyUpdate 通知更新
func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
clusterId, err := this.FindNodeClusterId(tx, nodeId)
@@ -1122,25 +1286,25 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
// NotifyDNSUpdate 通知DNS更新
func (this *NodeDAO) NotifyDNSUpdate(tx *dbs.Tx, nodeId int64) error {
clusterId, err := this.Query(tx).
Pk(nodeId).
Result("clusterId").
FindInt64Col(0) // 这里不需要加服务状态条件因为我们即使删除也要删除对应的服务的DNS解析
clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId)
if err != nil {
return err
}
if clusterId <= 0 {
return nil
for _, clusterId := range clusterIds {
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
if err != nil {
return err
}
if dnsInfo == nil {
continue
}
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
continue
}
err = dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange)
if err != nil {
return err
}
}
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId)
if err != nil {
return err
}
if dnsInfo == nil {
return nil
}
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
return nil
}
return dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange)
return nil
}