商业版支持L2节点

This commit is contained in:
刘祥超
2022-04-04 12:08:08 +08:00
parent d884777a55
commit cd7e01c2f0
10 changed files with 305 additions and 28 deletions

View File

@@ -1,7 +1,9 @@
package models
import (
"crypto/sha256"
"encoding/json"
"fmt"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
dbutils "github.com/TeaOSLab/EdgeAPI/internal/db/utils"
@@ -191,7 +193,7 @@ func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterI
}
// UpdateNode 修改节点
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, secondaryClusterIds []int64, groupId int64, regionId int64, isOn bool) error {
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, secondaryClusterIds []int64, groupId int64, regionId int64, isOn bool, level int) error {
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
@@ -202,7 +204,16 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
return err
}
op := NewNodeOperator()
// 老的级别
oldLevel, err := this.Query(tx).
Pk(nodeId).
Result("level").
FindIntCol(0)
if err != nil {
return err
}
var op = NewNodeOperator()
op.Id = nodeId
op.Name = name
op.ClusterId = clusterId
@@ -228,6 +239,11 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
op.RegionId = regionId
op.LatestVersion = dbs.SQL("latestVersion+1")
op.IsOn = isOn
if teaconst.IsPlus {
op.Level = level
}
err = this.Save(tx, op)
if err != nil {
return err
@@ -238,7 +254,7 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
return err
}
// 通知老的集群更新
// 通知老的集群DNS更新
for _, oldClusterId := range oldClusterIds {
if oldClusterId != clusterId && !lists.ContainsInt64(secondaryClusterIds, oldClusterId) {
err = dns.SharedDNSTaskDAO.CreateClusterTask(tx, oldClusterId, dns.DNSTaskTypeClusterChange)
@@ -248,6 +264,14 @@ func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId
}
}
// 通知子级级别变更
if oldLevel > 1 || level > 1 {
err = this.NotifyLevelUpdate(tx, nodeId)
if err != nil {
return err
}
}
return this.NotifyDNSUpdate(tx, nodeId)
}
@@ -278,6 +302,7 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
keyword string,
groupId int64,
regionId int64,
level int32,
includeSecondaryNodes bool,
order string,
offset int64,
@@ -337,6 +362,11 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
query.Attr("regionId", regionId)
}
// 级别
if level > 0 {
query.Attr("level", level)
}
// 排序
switch order {
case "cpuAsc":
@@ -355,6 +385,8 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
query.Asc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficOutBytes'), 0), 0)")
case "trafficOutDesc":
query.Desc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficOutBytes'), 0), 0)")
default:
query.Desc("level")
}
query.DescPk()
@@ -400,6 +432,37 @@ func (this *NodeDAO) FindNodeClusterId(tx *dbs.Tx, nodeId int64) (int64, error)
return types.Int64(col), err
}
// FindNodeLevel 获取节点级别
func (this *NodeDAO) FindNodeLevel(tx *dbs.Tx, nodeId int64) (int, error) {
level, err := this.Query(tx).
Pk(nodeId).
Result("level").
FindIntCol(0)
if err != nil {
return 0, err
}
if level < 1 {
level = 1
}
return level, nil
}
// FindNodeLevelInfo 获取节点级别相关信息
func (this *NodeDAO) FindNodeLevelInfo(tx *dbs.Tx, nodeId int64) (*Node, error) {
one, err := this.Query(tx).
Pk(nodeId).
Result("id", "clusterId", "secondaryClusterIds", "groupId", "level").
Find()
if err != nil || one == nil {
return nil, err
}
var node = one.(*Node)
if node.Level < 1 {
node.Level = 1
}
return node, nil
}
// FindEnabledAndOnNodeClusterIds 获取节点所属所有可用而且启用的集群ID
func (this *NodeDAO) FindEnabledAndOnNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
one, err := this.Query(tx).
@@ -474,6 +537,41 @@ func (this *NodeDAO) FindEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId int64
return result, nil
}
// FindEnabledNodesWithGroupIdAndLevel 查找当前分组下的某个级别的所有节点
func (this *NodeDAO) FindEnabledNodesWithGroupIdAndLevel(tx *dbs.Tx, groupId int64, level int) (result []*Node, err error) {
if groupId <= 0 {
return
}
_, err = this.Query(tx).
State(NodeStateEnabled).
Result("id", "clusterId", "secondaryClusterIds", "uniqueId", "secret").
Attr("isOn", true).
Attr("groupId", groupId).
Attr("level", level).
Slice(&result).
FindAll()
return
}
// FindEnabledNodesWithClusterIdAndLevel 查找当前集群下的某个级别的所有节点
func (this *NodeDAO) FindEnabledNodesWithClusterIdAndLevel(tx *dbs.Tx, clusterId int64, level int) (result []*Node, err error) {
if clusterId <= 0 {
return
}
_, err = this.Query(tx).
State(NodeStateEnabled).
Result("id", "clusterId", "secondaryClusterIds", "uniqueId", "secret").
Attr("isOn", true).
Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId)).
Attr("groupId", 0). // 需要去掉已经有分组的
Attr("level", level).
Slice(&result).
FindAll()
return
}
// FindAllNodeIdsMatch 匹配节点并返回节点ID
func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) {
var query = this.Query(tx)
@@ -555,6 +653,7 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
keyword string,
groupId int64,
regionId int64,
level int32,
includeSecondaryNodes bool) (int64, error) {
query := this.Query(tx)
query.State(NodeStateEnabled)
@@ -608,6 +707,11 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
query.Attr("regionId", regionId)
}
// 级别
if level > 0 {
query.Attr("level", level)
}
return query.Count()
}
@@ -733,6 +837,10 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap *utils
return nil, errors.New("node not found '" + strconv.FormatInt(nodeId, 10) + "'")
}
if node.Level < 1 {
node.Level = 1
}
var config = &nodeconfigs.NodeConfig{
Id: int64(node.Id),
NodeId: node.UniqueId,
@@ -743,6 +851,8 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap *utils
Name: node.Name,
MaxCPU: types.Int32(node.MaxCPU),
RegionId: int64(node.RegionId),
Level: types.Int32(node.Level),
GroupId: int64(node.GroupId),
}
// API节点IP
@@ -968,7 +1078,7 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap *utils
config.OCSPVersion = ocspVersion
// 初始化扩展配置
err = this.composeExtConfig(tx, config, cacheMap)
err = this.composeExtConfig(tx, config, clusterIds, cacheMap)
if err != nil {
return nil, err
}
@@ -1509,19 +1619,99 @@ func (this *NodeDAO) TransferPrimaryClusterNodes(tx *dbs.Tx, primaryClusterId in
return nil
}
// NotifyUpdate 通知更新
// FindParentNodeConfigs 查找父级节点配置
func (this *NodeDAO) FindParentNodeConfigs(tx *dbs.Tx, nodeId int64, groupId int64, clusterIds []int64, level int) (result map[int64][]*nodeconfigs.ParentNodeConfig, err error) {
result = map[int64][]*nodeconfigs.ParentNodeConfig{} // clusterId => []*ParentNodeConfig
// 当前分组的L2
var parentNodes []*Node
if groupId > 0 {
parentNodes, err = this.FindEnabledNodesWithGroupIdAndLevel(tx, groupId, level+1)
if err != nil {
return nil, err
}
}
// 当前集群的L2
if len(parentNodes) == 0 {
for _, clusterId := range clusterIds {
clusterParentNodes, err := this.FindEnabledNodesWithClusterIdAndLevel(tx, clusterId, level+1)
if err != nil {
return nil, err
}
if len(clusterParentNodes) > 0 {
parentNodes = append(parentNodes, clusterParentNodes...)
}
}
}
if len(parentNodes) > 0 {
for _, node := range parentNodes {
addrs, err := SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id), nodeconfigs.NodeRoleNode)
if err != nil {
return nil, err
}
var addrStrings = []string{}
for _, addr := range addrs {
if addr.IsOn {
addrStrings = append(addrStrings, addr.DNSIP())
}
}
// 没有地址就跳过
if len(addrStrings) == 0 {
continue
}
var secretHash = fmt.Sprintf("%x", sha256.Sum256([]byte(node.UniqueId+"@"+node.Secret)))
for _, clusterId := range node.AllClusterIds() {
parentNodeConfigs, _ := result[clusterId]
parentNodeConfigs = append(parentNodeConfigs, &nodeconfigs.ParentNodeConfig{
Id: int64(node.Id),
Addrs: addrStrings,
SecretHash: secretHash,
})
result[clusterId] = parentNodeConfigs
}
}
}
return
}
// NotifyUpdate 通知节点相关更新
func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
// 这里只需要通知单个集群即可,因为节点是公用的,更新一个就相当于更新了所有
clusterId, err := this.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
if clusterId > 0 {
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, NodeTaskTypeConfigChanged, 0)
err = SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, NodeTaskTypeConfigChanged, 0)
if err != nil {
return err
}
return nil
}
// NotifyLevelUpdate 通知节点级别更新
func (this *NodeDAO) NotifyLevelUpdate(tx *dbs.Tx, nodeId int64) error {
// 这里只需要通知单个集群即可,因为节点是公用的,更新一个就相当于更新了所有
clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId)
if err != nil {
return err
}
for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeNodeLevelChanged)
if err != nil {
return err
}
}
return nil
}
// NotifyDNSUpdate 通知DNS更新
// NotifyDNSUpdate 通知节点相关DNS更新
func (this *NodeDAO) NotifyDNSUpdate(tx *dbs.Tx, nodeId int64) error {
clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId)
if err != nil {