mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-03 23:20:26 +08:00
2143 lines
55 KiB
Go
2143 lines
55 KiB
Go
package models
|
||
|
||
import (
|
||
"crypto/sha256"
|
||
"encoding/json"
|
||
"fmt"
|
||
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
|
||
dbutils "github.com/TeaOSLab/EdgeAPI/internal/db/utils"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/utils/sizes"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/utils/ttlcache"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
|
||
_ "github.com/go-sql-driver/mysql"
|
||
"github.com/iwind/TeaGo/Tea"
|
||
"github.com/iwind/TeaGo/dbs"
|
||
"github.com/iwind/TeaGo/lists"
|
||
"github.com/iwind/TeaGo/maps"
|
||
"github.com/iwind/TeaGo/rands"
|
||
"github.com/iwind/TeaGo/types"
|
||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
const (
|
||
NodeStateEnabled = 1 // 已启用
|
||
NodeStateDisabled = 0 // 已禁用
|
||
)
|
||
|
||
type NodeDAO dbs.DAO
|
||
|
||
func NewNodeDAO() *NodeDAO {
|
||
return dbs.NewDAO(&NodeDAO{
|
||
DAOObject: dbs.DAOObject{
|
||
DB: Tea.Env,
|
||
Table: "edgeNodes",
|
||
Model: new(Node),
|
||
PkName: "id",
|
||
},
|
||
}).(*NodeDAO)
|
||
}
|
||
|
||
var SharedNodeDAO *NodeDAO
|
||
|
||
func init() {
|
||
dbs.OnReady(func() {
|
||
SharedNodeDAO = NewNodeDAO()
|
||
})
|
||
}
|
||
|
||
// EnableNode 启用条目
|
||
func (this *NodeDAO) EnableNode(tx *dbs.Tx, id uint32) (rowsAffected int64, err error) {
|
||
return this.Query(tx).
|
||
Pk(id).
|
||
Set("state", NodeStateEnabled).
|
||
Update()
|
||
}
|
||
|
||
// DisableNode 禁用条目
|
||
func (this *NodeDAO) DisableNode(tx *dbs.Tx, nodeId int64) (err error) {
|
||
// 删除缓存
|
||
uniqueId, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("uniqueId").
|
||
FindStringCol("")
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(uniqueId) > 0 {
|
||
ttlcache.SharedCache.Delete("nodeId@uniqueId@" + uniqueId)
|
||
}
|
||
|
||
_, err = this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("state", NodeStateDisabled).
|
||
Update()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = this.NotifyUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = this.NotifyDNSUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 删除运行日志
|
||
return SharedNodeLogDAO.DeleteNodeLogs(tx, nodeconfigs.NodeRoleNode, nodeId)
|
||
}
|
||
|
||
// FindEnabledNode 查找启用中的条目
|
||
func (this *NodeDAO) FindEnabledNode(tx *dbs.Tx, id int64) (*Node, error) {
|
||
result, err := this.Query(tx).
|
||
Pk(id).
|
||
Attr("state", NodeStateEnabled).
|
||
Find()
|
||
if result == nil {
|
||
return nil, err
|
||
}
|
||
return result.(*Node), err
|
||
}
|
||
|
||
// FindEnabledBasicNode 获取节点的基本信息
|
||
func (this *NodeDAO) FindEnabledBasicNode(tx *dbs.Tx, nodeId int64) (*Node, error) {
|
||
one, err := this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Pk(nodeId).
|
||
Result("id", "name", "clusterId", "groupId", "isOn", "isUp").
|
||
Find()
|
||
if one == nil {
|
||
return nil, err
|
||
}
|
||
return one.(*Node), nil
|
||
}
|
||
|
||
// FindNodeName 根据主键查找名称
|
||
func (this *NodeDAO) FindNodeName(tx *dbs.Tx, id int64) (string, error) {
|
||
name, err := this.Query(tx).
|
||
Pk(id).
|
||
Result("name").
|
||
FindCol("")
|
||
return name.(string), err
|
||
}
|
||
|
||
// CreateNode 创建节点
|
||
func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterId int64, groupId int64, regionId int64) (nodeId int64, err error) {
|
||
err = this.CheckNodesLimit(tx)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
uniqueId, err := this.GenUniqueId(tx)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
secret := rands.String(32)
|
||
|
||
// 保存API Token
|
||
err = SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleNode)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
var op = NewNodeOperator()
|
||
op.AdminId = adminId
|
||
op.Name = name
|
||
op.UniqueId = uniqueId
|
||
op.Secret = secret
|
||
op.ClusterId = clusterId
|
||
op.GroupId = groupId
|
||
op.RegionId = regionId
|
||
op.IsOn = 1
|
||
op.EnableIPLists = 1
|
||
op.State = NodeStateEnabled
|
||
err = this.Save(tx, op)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 通知节点更新
|
||
nodeId = types.Int64(op.Id)
|
||
err = this.NotifyUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 通知DNS更新
|
||
err = this.NotifyDNSUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
return nodeId, nil
|
||
}
|
||
|
||
// UpdateNode 修改节点
|
||
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, secondaryClusterIds []int64, groupId int64, regionId int64, isOn bool, level int, lnAddrs []string, enableIPLists bool) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
|
||
// 老的集群
|
||
oldClusterIds, err := this.FindEnabledNodeClusterIds(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 老的级别
|
||
oldLevel, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("level").
|
||
FindIntCol(0)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
op.Name = name
|
||
op.ClusterId = clusterId
|
||
|
||
// 去重
|
||
var filteredSecondaryClusterIds = []int64{}
|
||
for _, secondaryClusterId := range secondaryClusterIds {
|
||
if secondaryClusterId <= 0 {
|
||
continue
|
||
}
|
||
if lists.ContainsInt64(filteredSecondaryClusterIds, secondaryClusterId) {
|
||
continue
|
||
}
|
||
filteredSecondaryClusterIds = append(filteredSecondaryClusterIds, secondaryClusterId)
|
||
}
|
||
filteredSecondaryClusterIdsJSON, err := json.Marshal(filteredSecondaryClusterIds)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
op.SecondaryClusterIds = filteredSecondaryClusterIdsJSON
|
||
|
||
op.GroupId = groupId
|
||
op.RegionId = regionId
|
||
op.LatestVersion = dbs.SQL("latestVersion+1")
|
||
op.IsOn = isOn
|
||
|
||
if teaconst.IsPlus {
|
||
op.Level = level
|
||
|
||
if lnAddrs == nil {
|
||
lnAddrs = []string{}
|
||
}
|
||
lnAddrsJSON, err := json.Marshal(lnAddrs)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
op.LnAddrs = lnAddrsJSON
|
||
}
|
||
|
||
op.EnableIPLists = enableIPLists
|
||
|
||
err = this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = this.NotifyUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 通知老的集群DNS更新
|
||
for _, oldClusterId := range oldClusterIds {
|
||
if oldClusterId != clusterId && !lists.ContainsInt64(secondaryClusterIds, oldClusterId) {
|
||
err = dns.SharedDNSTaskDAO.CreateClusterTask(tx, oldClusterId, dns.DNSTaskTypeClusterChange)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
|
||
// 通知子级级别变更
|
||
if oldLevel > 1 || level > 1 {
|
||
err = this.NotifyLevelUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
return this.NotifyDNSUpdate(tx, nodeId)
|
||
}
|
||
|
||
// CountAllEnabledNodes 计算所有节点数量
|
||
func (this *NodeDAO) CountAllEnabledNodes(tx *dbs.Tx) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Where("clusterId IN (SELECT id FROM "+SharedNodeClusterDAO.Table+" WHERE state=:clusterState)").
|
||
Param("clusterState", NodeClusterStateEnabled).
|
||
Count()
|
||
}
|
||
|
||
// CountAllEnabledOfflineNodes 计算所有离线节点数量
|
||
func (this *NodeDAO) CountAllEnabledOfflineNodes(tx *dbs.Tx) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Where("clusterId IN (SELECT id FROM "+SharedNodeClusterDAO.Table+" WHERE state=:clusterState)").
|
||
Param("clusterState", NodeClusterStateEnabled).
|
||
Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)").
|
||
Count()
|
||
}
|
||
|
||
// ListEnabledNodesMatch 列出单页节点
|
||
func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
|
||
clusterId int64,
|
||
installState configutils.BoolState,
|
||
activeState configutils.BoolState,
|
||
keyword string,
|
||
groupId int64,
|
||
regionId int64,
|
||
level int32,
|
||
includeSecondaryNodes bool,
|
||
order string,
|
||
offset int64,
|
||
size int64) (result []*Node, err error) {
|
||
query := this.Query(tx).
|
||
Result(this.Table + ".*"). // must have table name for table joins below
|
||
State(NodeStateEnabled).
|
||
Offset(offset).
|
||
Limit(size).
|
||
Slice(&result)
|
||
|
||
// 集群
|
||
if clusterId > 0 {
|
||
if includeSecondaryNodes {
|
||
query.Where("("+this.Table+".clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||
Param("primaryClusterId", clusterId).
|
||
Param("primaryClusterIdString", types.String(clusterId))
|
||
} else {
|
||
query.Attr(this.Table+".clusterId", clusterId)
|
||
}
|
||
} else {
|
||
query.Where(this.Table + ".clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
|
||
}
|
||
|
||
// 安装状态
|
||
switch installState {
|
||
case configutils.BoolStateAll:
|
||
// 所有
|
||
case configutils.BoolStateYes:
|
||
query.Attr("isInstalled", 1)
|
||
case configutils.BoolStateNo:
|
||
query.Attr("isInstalled", 0)
|
||
}
|
||
|
||
// 在线状态
|
||
switch activeState {
|
||
case configutils.BoolStateAll:
|
||
// 所有
|
||
case configutils.BoolStateYes:
|
||
query.Where("isActive AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60")
|
||
case configutils.BoolStateNo:
|
||
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)")
|
||
}
|
||
|
||
// 关键词
|
||
if len(keyword) > 0 {
|
||
query.Where("(name LIKE :keyword OR JSON_EXTRACT(status,'$.hostname') LIKE :keyword OR id IN (SELECT nodeId FROM "+SharedNodeIPAddressDAO.Table+" WHERE ip LIKE :keyword))").
|
||
Param("keyword", dbutils.QuoteLike(keyword))
|
||
}
|
||
|
||
// 分组
|
||
if groupId > 0 {
|
||
query.Attr("groupId", groupId)
|
||
} else if groupId < 0 {
|
||
query.Attr("groupId", 0)
|
||
}
|
||
|
||
// 区域
|
||
if regionId > 0 {
|
||
query.Attr("regionId", regionId)
|
||
}
|
||
|
||
// 级别
|
||
if level > 0 {
|
||
query.Attr("level", level)
|
||
}
|
||
|
||
// 排序
|
||
var minute = timeutil.FormatTime("YmdHi", time.Now().Unix()-60)
|
||
var nodeValueTable = SharedNodeValueDAO.Table
|
||
var valueItem = ""
|
||
var valueField = ""
|
||
var isAsc = false
|
||
var ifNullValue int64 = 0
|
||
switch order {
|
||
case "cpuAsc":
|
||
valueItem = "cpu"
|
||
valueField = "usage"
|
||
isAsc = true
|
||
ifNullValue = 100
|
||
case "cpuDesc":
|
||
valueItem = "cpu"
|
||
valueField = "usage"
|
||
isAsc = false
|
||
ifNullValue = -1
|
||
case "memoryAsc":
|
||
valueItem = "memory"
|
||
valueField = "usage"
|
||
isAsc = true
|
||
ifNullValue = 100
|
||
case "memoryDesc":
|
||
valueItem = "memory"
|
||
valueField = "usage"
|
||
isAsc = false
|
||
ifNullValue = -1
|
||
case "trafficInAsc":
|
||
valueItem = "trafficIn"
|
||
valueField = "total"
|
||
isAsc = true
|
||
ifNullValue = 60 * sizes.G
|
||
case "trafficInDesc":
|
||
valueItem = "trafficIn"
|
||
valueField = "total"
|
||
isAsc = false
|
||
ifNullValue = -1
|
||
case "trafficOutAsc":
|
||
valueItem = "trafficOut"
|
||
valueField = "total"
|
||
isAsc = true
|
||
ifNullValue = sizes.G
|
||
case "trafficOutDesc":
|
||
valueItem = "trafficOut"
|
||
valueField = "total"
|
||
isAsc = false
|
||
ifNullValue = -1
|
||
case "loadAsc":
|
||
valueItem = "load"
|
||
valueField = "load1m"
|
||
isAsc = true
|
||
ifNullValue = 1000
|
||
case "loadDesc":
|
||
valueItem = "load"
|
||
valueField = "load1m"
|
||
isAsc = false
|
||
ifNullValue = -1
|
||
default:
|
||
query.Desc("level")
|
||
}
|
||
|
||
if len(valueItem) > 0 {
|
||
query.Join(SharedNodeValueDAO, dbs.QueryJoinLeft, this.Table+".id="+nodeValueTable+".nodeId AND "+nodeValueTable+".role='"+nodeconfigs.NodeRoleNode+"' AND "+nodeValueTable+".item='"+valueItem+"' "+" AND "+nodeValueTable+".minute=:minute")
|
||
query.Param("minute", minute)
|
||
var ifNullSQL = "IFNULL(CAST(JSON_EXTRACT(value, '$." + valueField + "') AS DECIMAL (20, 6)), " + types.String(ifNullValue) + ")"
|
||
if isAsc {
|
||
query.Asc(ifNullSQL)
|
||
} else {
|
||
query.Desc(ifNullSQL)
|
||
}
|
||
}
|
||
|
||
query.DescPk()
|
||
|
||
_, err = query.FindAll()
|
||
return
|
||
}
|
||
|
||
// FindEnabledNodeWithUniqueIdAndSecret 根据节点ID和密钥查询节点
|
||
func (this *NodeDAO) FindEnabledNodeWithUniqueIdAndSecret(tx *dbs.Tx, uniqueId string, secret string) (*Node, error) {
|
||
one, err := this.Query(tx).
|
||
Attr("uniqueId", uniqueId).
|
||
Attr("secret", secret).
|
||
State(NodeStateEnabled).
|
||
Find()
|
||
|
||
if one != nil {
|
||
return one.(*Node), err
|
||
}
|
||
|
||
return nil, err
|
||
}
|
||
|
||
// FindEnabledNodeWithUniqueId 根据节点ID获取节点
|
||
func (this *NodeDAO) FindEnabledNodeWithUniqueId(tx *dbs.Tx, uniqueId string) (*Node, error) {
|
||
one, err := this.Query(tx).
|
||
Attr("uniqueId", uniqueId).
|
||
State(NodeStateEnabled).
|
||
Find()
|
||
|
||
if one != nil {
|
||
return one.(*Node), err
|
||
}
|
||
|
||
return nil, err
|
||
}
|
||
|
||
// FindNodeClusterId 获取节点集群ID
|
||
func (this *NodeDAO) FindNodeClusterId(tx *dbs.Tx, nodeId int64) (int64, error) {
|
||
col, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("clusterId").
|
||
FindCol(0)
|
||
return types.Int64(col), err
|
||
}
|
||
|
||
// FindNodeLevel 获取节点级别
|
||
func (this *NodeDAO) FindNodeLevel(tx *dbs.Tx, nodeId int64) (int, error) {
|
||
level, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("level").
|
||
FindIntCol(0)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
if level < 1 {
|
||
level = 1
|
||
}
|
||
return level, nil
|
||
}
|
||
|
||
// FindNodeLevelInfo 获取节点级别相关信息
|
||
func (this *NodeDAO) FindNodeLevelInfo(tx *dbs.Tx, nodeId int64) (*Node, error) {
|
||
one, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("id", "clusterId", "secondaryClusterIds", "groupId", "level").
|
||
Find()
|
||
if err != nil || one == nil {
|
||
return nil, err
|
||
}
|
||
var node = one.(*Node)
|
||
if node.Level < 1 {
|
||
node.Level = 1
|
||
}
|
||
return node, nil
|
||
}
|
||
|
||
// FindEnabledAndOnNodeClusterIds 获取节点所属所有可用而且启用的集群ID
|
||
func (this *NodeDAO) FindEnabledAndOnNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
|
||
one, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("clusterId", "secondaryClusterIds").
|
||
Find()
|
||
if one == nil {
|
||
return nil, err
|
||
}
|
||
var clusterId = int64(one.(*Node).ClusterId)
|
||
if clusterId > 0 {
|
||
result = append(result, clusterId)
|
||
}
|
||
|
||
for _, clusterId := range one.(*Node).DecodeSecondaryClusterIds() {
|
||
if lists.ContainsInt64(result, clusterId) {
|
||
continue
|
||
}
|
||
|
||
// 检查是否启用
|
||
isOn, err := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, clusterId)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if !isOn {
|
||
continue
|
||
}
|
||
|
||
result = append(result, clusterId)
|
||
}
|
||
return
|
||
}
|
||
|
||
// FindEnabledNodeClusterIds 获取节点所属所有可用的集群ID
|
||
func (this *NodeDAO) FindEnabledNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
|
||
one, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("clusterId", "secondaryClusterIds").
|
||
Find()
|
||
if one == nil {
|
||
return nil, err
|
||
}
|
||
var clusterId = int64(one.(*Node).ClusterId)
|
||
if clusterId > 0 {
|
||
result = append(result, clusterId)
|
||
}
|
||
|
||
for _, secondaryClusterId := range one.(*Node).DecodeSecondaryClusterIds() {
|
||
if lists.ContainsInt64(result, secondaryClusterId) {
|
||
continue
|
||
}
|
||
|
||
result = append(result, secondaryClusterId)
|
||
}
|
||
return
|
||
}
|
||
|
||
// FindEnabledNodeIdsWithClusterId 查找某个集群下的所有节点IDs
|
||
func (this *NodeDAO) FindEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId int64) ([]int64, error) {
|
||
ones, err := this.Query(tx).
|
||
Attr("clusterId", clusterId).
|
||
State(NodeClusterStateEnabled).
|
||
ResultPk().
|
||
FindAll()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
var result = []int64{}
|
||
for _, one := range ones {
|
||
result = append(result, int64(one.(*Node).Id))
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
// FindEnabledNodesWithGroupIdAndLevel 查找当前分组下的某个级别的所有节点
|
||
func (this *NodeDAO) FindEnabledNodesWithGroupIdAndLevel(tx *dbs.Tx, groupId int64, level int) (result []*Node, err error) {
|
||
if groupId <= 0 {
|
||
return
|
||
}
|
||
_, err = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Result("id", "clusterId", "secondaryClusterIds", "uniqueId", "secret", "lnAddrs").
|
||
Attr("isOn", true).
|
||
Attr("groupId", groupId).
|
||
Attr("level", level).
|
||
Slice(&result).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// FindEnabledNodesWithClusterIdAndLevel 查找当前集群下的某个级别的所有节点
|
||
func (this *NodeDAO) FindEnabledNodesWithClusterIdAndLevel(tx *dbs.Tx, clusterId int64, level int) (result []*Node, err error) {
|
||
if clusterId <= 0 {
|
||
return
|
||
}
|
||
_, err = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Result("id", "clusterId", "secondaryClusterIds", "uniqueId", "secret").
|
||
Attr("isOn", true).
|
||
Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||
Param("primaryClusterId", clusterId).
|
||
Param("primaryClusterIdString", types.String(clusterId)).
|
||
Attr("groupId", 0). // 需要去掉已经有分组的
|
||
Attr("level", level).
|
||
Slice(&result).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// FindAllNodeIdsMatch 匹配节点并返回节点ID
|
||
func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) {
|
||
var query = this.Query(tx)
|
||
query.State(NodeStateEnabled)
|
||
if clusterId > 0 {
|
||
if includeSecondaryNodes {
|
||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||
Param("primaryClusterId", clusterId).
|
||
Param("primaryClusterIdString", types.String(clusterId))
|
||
} else {
|
||
query.Attr("clusterId", clusterId)
|
||
}
|
||
} else {
|
||
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
|
||
}
|
||
if isOn == configutils.BoolStateYes {
|
||
query.Attr("isOn", true)
|
||
} else if isOn == configutils.BoolStateNo {
|
||
query.Attr("isOn", false)
|
||
}
|
||
query.Result("id")
|
||
ones, _, err := query.FindOnes()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, one := range ones {
|
||
result = append(result, one.GetInt64("id"))
|
||
}
|
||
return
|
||
}
|
||
|
||
// FindAllEnabledNodesWithClusterId 获取一个集群的所有节点
|
||
func (this *NodeDAO) FindAllEnabledNodesWithClusterId(tx *dbs.Tx, clusterId int64, includeSecondary bool) (result []*Node, err error) {
|
||
var query = this.Query(tx)
|
||
|
||
if includeSecondary {
|
||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||
Param("primaryClusterId", clusterId).
|
||
Param("primaryClusterIdString", types.String(clusterId))
|
||
} else {
|
||
query.Attr("clusterId", clusterId)
|
||
}
|
||
|
||
_, err = query.
|
||
State(NodeStateEnabled).
|
||
DescPk().
|
||
Slice(&result).
|
||
FindAll()
|
||
|
||
return
|
||
}
|
||
|
||
// FindEnabledAndOnNodeIdsWithClusterId 查找某个集群下的所有启用的节点IDs
|
||
func (this *NodeDAO) FindEnabledAndOnNodeIdsWithClusterId(tx *dbs.Tx, clusterId int64, includeSecondary bool) ([]int64, error) {
|
||
var query = this.Query(tx)
|
||
if includeSecondary {
|
||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||
Param("primaryClusterId", clusterId).
|
||
Param("primaryClusterIdString", types.String(clusterId))
|
||
} else {
|
||
query.Attr("clusterId", clusterId)
|
||
}
|
||
ones, err := query.
|
||
Attr("isOn", true).
|
||
State(NodeStateEnabled).
|
||
ResultPk().
|
||
FindAll()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
var result = []int64{}
|
||
for _, one := range ones {
|
||
result = append(result, int64(one.(*Node).Id))
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
// FindAllEnabledNodeIdsWithClusterId 获取一个集群的所有节点Ids
|
||
func (this *NodeDAO) FindAllEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId int64) (result []int64, err error) {
|
||
ones, err := this.Query(tx).
|
||
ResultPk().
|
||
State(NodeStateEnabled).
|
||
Attr("clusterId", clusterId).
|
||
FindAll()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, one := range ones {
|
||
result = append(result, int64(one.(*Node).Id))
|
||
}
|
||
return
|
||
}
|
||
|
||
// FindAllInactiveNodesWithClusterId 取得一个集群离线的节点
|
||
func (this *NodeDAO) FindAllInactiveNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
|
||
_, err = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Result("id", "name", "status").
|
||
Attr("clusterId", clusterId).
|
||
Attr("isOn", true). // 只监控启用的节点
|
||
Attr("isInstalled", true). // 只监控已经安装的节点
|
||
Attr("isActive", false).
|
||
Result("id", "name").
|
||
Slice(&result).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// CountAllEnabledNodesMatch 计算节点数量
|
||
func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
|
||
clusterId int64,
|
||
installState configutils.BoolState,
|
||
activeState configutils.BoolState,
|
||
keyword string,
|
||
groupId int64,
|
||
regionId int64,
|
||
level int32,
|
||
includeSecondaryNodes bool) (int64, error) {
|
||
query := this.Query(tx)
|
||
query.State(NodeStateEnabled)
|
||
|
||
// 集群
|
||
if clusterId > 0 {
|
||
if includeSecondaryNodes {
|
||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||
Param("primaryClusterId", clusterId).
|
||
Param("primaryClusterIdString", types.String(clusterId))
|
||
} else {
|
||
query.Attr("clusterId", clusterId)
|
||
}
|
||
} else {
|
||
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
|
||
}
|
||
|
||
// 安装状态
|
||
switch installState {
|
||
case configutils.BoolStateAll:
|
||
// 所有
|
||
case configutils.BoolStateYes:
|
||
query.Attr("isInstalled", 1)
|
||
case configutils.BoolStateNo:
|
||
query.Attr("isInstalled", 0)
|
||
}
|
||
|
||
// 在线状态
|
||
switch activeState {
|
||
case configutils.BoolStateAll:
|
||
// 所有
|
||
case configutils.BoolStateYes:
|
||
query.Where("isActive AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60")
|
||
case configutils.BoolStateNo:
|
||
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)")
|
||
}
|
||
|
||
// 关键词
|
||
if len(keyword) > 0 {
|
||
query.Where("(name LIKE :keyword OR JSON_EXTRACT(status,'$.hostname') LIKE :keyword OR id IN (SELECT nodeId FROM "+SharedNodeIPAddressDAO.Table+" WHERE ip LIKE :keyword))").
|
||
Param("keyword", dbutils.QuoteLike(keyword))
|
||
}
|
||
|
||
// 分组
|
||
if groupId > 0 {
|
||
query.Attr("groupId", groupId)
|
||
} else if groupId < 0 {
|
||
query.Attr("groupId", 0)
|
||
}
|
||
|
||
// 区域
|
||
if regionId > 0 {
|
||
query.Attr("regionId", regionId)
|
||
}
|
||
|
||
// 级别
|
||
if level > 0 {
|
||
query.Attr("level", level)
|
||
}
|
||
|
||
return query.Count()
|
||
}
|
||
|
||
// UpdateNodeStatus 更改节点状态
|
||
func (this *NodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, nodeStatus *nodeconfigs.NodeStatus) error {
|
||
if nodeStatus == nil {
|
||
return nil
|
||
}
|
||
|
||
nodeStatusJSON, err := json.Marshal(nodeStatus)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
_, err = this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("isActive", true).
|
||
Set("status", nodeStatusJSON).
|
||
Update()
|
||
return err
|
||
}
|
||
|
||
// FindNodeStatus 获取节点状态
|
||
func (this *NodeDAO) FindNodeStatus(tx *dbs.Tx, nodeId int64) (*nodeconfigs.NodeStatus, error) {
|
||
statusJSONString, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("status").
|
||
FindStringCol("")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if len(statusJSONString) == 0 {
|
||
return nil, nil
|
||
}
|
||
|
||
status := &nodeconfigs.NodeStatus{}
|
||
err = json.Unmarshal([]byte(statusJSONString), status)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return status, nil
|
||
}
|
||
|
||
// UpdateNodeIsActive 更改节点在线状态
|
||
func (this *NodeDAO) UpdateNodeIsActive(tx *dbs.Tx, nodeId int64, isActive bool) error {
|
||
b := "true"
|
||
if !isActive {
|
||
b = "false"
|
||
}
|
||
_, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Where("status IS NOT NULL").
|
||
Set("status", dbs.SQL("JSON_SET(status, '$.isActive', "+b+")")).
|
||
Update()
|
||
return err
|
||
}
|
||
|
||
// UpdateNodeIsInstalled 设置节点安装状态
|
||
func (this *NodeDAO) UpdateNodeIsInstalled(tx *dbs.Tx, nodeId int64, isInstalled bool) error {
|
||
_, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("isInstalled", isInstalled).
|
||
Set("installStatus", "null"). // 重置安装状态
|
||
Update()
|
||
return err
|
||
}
|
||
|
||
// FindNodeInstallStatus 查询节点的安装状态
|
||
func (this *NodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*NodeInstallStatus, error) {
|
||
node, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("installStatus", "isInstalled").
|
||
Find()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if node == nil {
|
||
return nil, errors.New("not found")
|
||
}
|
||
|
||
installStatus := node.(*Node).InstallStatus
|
||
isInstalled := node.(*Node).IsInstalled
|
||
if len(installStatus) == 0 {
|
||
return NewNodeInstallStatus(), nil
|
||
}
|
||
|
||
status := &NodeInstallStatus{}
|
||
err = json.Unmarshal(installStatus, status)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if isInstalled {
|
||
status.IsFinished = true
|
||
status.IsOk = true
|
||
}
|
||
return status, nil
|
||
}
|
||
|
||
// UpdateNodeInstallStatus 修改节点的安装状态
|
||
func (this *NodeDAO) UpdateNodeInstallStatus(tx *dbs.Tx, nodeId int64, status *NodeInstallStatus) error {
|
||
if status == nil {
|
||
_, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("installStatus", "null").
|
||
Update()
|
||
return err
|
||
}
|
||
|
||
data, err := json.Marshal(status)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
_, err = this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("installStatus", string(data)).
|
||
Update()
|
||
return err
|
||
}
|
||
|
||
// ComposeNodeConfig 组合配置
|
||
// TODO 提升运行速度
|
||
func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap *utils.CacheMap) (*nodeconfigs.NodeConfig, error) {
|
||
if cacheMap == nil {
|
||
cacheMap = utils.NewCacheMap()
|
||
}
|
||
|
||
node, err := this.FindEnabledNode(tx, nodeId)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if node == nil {
|
||
return nil, errors.New("node not found '" + strconv.FormatInt(nodeId, 10) + "'")
|
||
}
|
||
|
||
if node.Level < 1 {
|
||
node.Level = 1
|
||
}
|
||
|
||
var config = &nodeconfigs.NodeConfig{
|
||
Id: int64(node.Id),
|
||
NodeId: node.UniqueId,
|
||
Secret: node.Secret,
|
||
IsOn: node.IsOn,
|
||
Servers: nil,
|
||
Version: int64(node.Version),
|
||
Name: node.Name,
|
||
MaxCPU: types.Int32(node.MaxCPU),
|
||
RegionId: int64(node.RegionId),
|
||
Level: types.Int32(node.Level),
|
||
GroupId: int64(node.GroupId),
|
||
EnableIPLists: node.EnableIPLists,
|
||
APINodeAddrs: node.DecodeAPINodeAddrs(),
|
||
}
|
||
|
||
// API节点IP
|
||
apiNodeIPs, err := SharedAPINodeDAO.FindAllEnabledAPIAccessIPs(tx, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
config.AllowedIPs = append(config.AllowedIPs, apiNodeIPs...)
|
||
|
||
// 获取所有的服务
|
||
servers, err := SharedServerDAO.FindAllEnabledServersWithNode(tx, int64(node.Id))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
for _, server := range servers {
|
||
serverConfig, err := SharedServerDAO.ComposeServerConfig(tx, server, cacheMap, true, false)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if serverConfig == nil {
|
||
continue
|
||
}
|
||
config.Servers = append(config.Servers, serverConfig)
|
||
|
||
if server.IsOn && server.SupportCNAME == 1 {
|
||
config.SupportCNAME = true
|
||
}
|
||
}
|
||
|
||
// 全局设置
|
||
// TODO 根据用户的不同读取不同的全局设置
|
||
var settingCacheKey = "SharedSysSettingDAO:" + systemconfigs.SettingCodeServerGlobalConfig
|
||
settingJSONCache, ok := cacheMap.Get(settingCacheKey)
|
||
var settingJSON = []byte{}
|
||
if ok {
|
||
settingJSON = settingJSONCache.([]byte)
|
||
} else {
|
||
settingJSON, err = SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeServerGlobalConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
cacheMap.Put(settingCacheKey, settingJSON)
|
||
}
|
||
|
||
if len(settingJSON) > 0 {
|
||
globalConfig := &serverconfigs.GlobalConfig{}
|
||
err = json.Unmarshal(settingJSON, globalConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
config.GlobalConfig = globalConfig
|
||
}
|
||
|
||
var primaryClusterId = int64(node.ClusterId)
|
||
var clusterIds = []int64{primaryClusterId}
|
||
clusterIds = append(clusterIds, node.DecodeSecondaryClusterIds()...)
|
||
var clusterIndex = 0
|
||
config.WebPImagePolicies = map[int64]*nodeconfigs.WebPImagePolicy{}
|
||
config.UAMPolicies = map[int64]*nodeconfigs.UAMPolicy{}
|
||
var allowIPMaps = map[string]bool{}
|
||
for _, clusterId := range clusterIds {
|
||
nodeCluster, err := SharedNodeClusterDAO.FindClusterBasicInfo(tx, clusterId, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if nodeCluster == nil || !nodeCluster.IsOn {
|
||
continue
|
||
}
|
||
|
||
// 节点IP地址
|
||
nodeIPAddresses, err := SharedNodeIPAddressDAO.FindAllAccessibleIPAddressesWithClusterId(tx, nodeconfigs.NodeRoleNode, clusterId, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, address := range nodeIPAddresses {
|
||
var ip = address.Ip
|
||
_, ok := allowIPMaps[ip]
|
||
if !ok {
|
||
allowIPMaps[ip] = true
|
||
config.AllowedIPs = append(config.AllowedIPs, ip)
|
||
}
|
||
}
|
||
|
||
// 防火墙
|
||
var httpFirewallPolicyId = int64(nodeCluster.HttpFirewallPolicyId)
|
||
if httpFirewallPolicyId > 0 {
|
||
firewallPolicy, err := SharedHTTPFirewallPolicyDAO.ComposeFirewallPolicy(tx, httpFirewallPolicyId, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if firewallPolicy != nil {
|
||
config.HTTPFirewallPolicies = append(config.HTTPFirewallPolicies, firewallPolicy)
|
||
}
|
||
}
|
||
|
||
// 缓存策略
|
||
var httpCachePolicyId = int64(nodeCluster.CachePolicyId)
|
||
if httpCachePolicyId > 0 {
|
||
cachePolicy, err := SharedHTTPCachePolicyDAO.ComposeCachePolicy(tx, httpCachePolicyId, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if cachePolicy != nil {
|
||
config.HTTPCachePolicies = append(config.HTTPCachePolicies, cachePolicy)
|
||
}
|
||
}
|
||
|
||
// 时区
|
||
if len(config.TimeZone) == 0 {
|
||
var timeZone = nodeCluster.TimeZone
|
||
if len(timeZone) > 0 {
|
||
config.TimeZone = timeZone
|
||
}
|
||
}
|
||
|
||
// 时钟
|
||
if config.Clock == nil {
|
||
config.Clock = nodeCluster.DecodeClock()
|
||
}
|
||
|
||
// 全局配置
|
||
if config.GlobalServerConfig == nil {
|
||
config.GlobalServerConfig = nodeCluster.DecodeGlobalServerConfig()
|
||
}
|
||
|
||
// 最大线程数、TCP连接数
|
||
if clusterIndex == 0 {
|
||
config.MaxThreads = int(nodeCluster.NodeMaxThreads)
|
||
config.DDoSProtection = nodeCluster.DecodeDDoSProtection()
|
||
config.AutoOpenPorts = nodeCluster.AutoOpenPorts == 1
|
||
}
|
||
|
||
// webp
|
||
if IsNotNull(nodeCluster.Webp) {
|
||
var webpPolicy = &nodeconfigs.WebPImagePolicy{}
|
||
err = json.Unmarshal(nodeCluster.Webp, webpPolicy)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
config.WebPImagePolicies[clusterId] = webpPolicy
|
||
}
|
||
|
||
// UAM
|
||
if IsNotNull(nodeCluster.Uam) {
|
||
var uamPolicy = &nodeconfigs.UAMPolicy{}
|
||
err = json.Unmarshal(nodeCluster.Uam, uamPolicy)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
config.UAMPolicies[clusterId] = uamPolicy
|
||
}
|
||
|
||
// 自动安装nftables
|
||
if clusterIndex == 0 {
|
||
config.AutoInstallNftables = nodeCluster.AutoInstallNftables
|
||
}
|
||
|
||
clusterIndex++
|
||
}
|
||
|
||
// 缓存最大容量设置
|
||
if len(node.MaxCacheDiskCapacity) > 0 {
|
||
capacity := &shared.SizeCapacity{}
|
||
err = json.Unmarshal(node.MaxCacheDiskCapacity, capacity)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if capacity.Count > 0 {
|
||
config.MaxCacheDiskCapacity = capacity
|
||
}
|
||
}
|
||
|
||
if len(node.MaxCacheMemoryCapacity) > 0 {
|
||
capacity := &shared.SizeCapacity{}
|
||
err = json.Unmarshal(node.MaxCacheMemoryCapacity, capacity)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if capacity.Count > 0 {
|
||
config.MaxCacheMemoryCapacity = capacity
|
||
}
|
||
}
|
||
|
||
config.CacheDiskDir = node.CacheDiskDir
|
||
config.CacheDiskSubDirs = node.DecodeCacheDiskSubDirs()
|
||
|
||
// TOA
|
||
toaConfig, err := SharedNodeClusterDAO.FindClusterTOAConfig(tx, primaryClusterId, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
config.TOA = toaConfig
|
||
|
||
// 系统服务
|
||
services, err := SharedNodeClusterDAO.FindNodeClusterSystemServices(tx, primaryClusterId, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if len(services) > 0 {
|
||
config.SystemServices = services
|
||
}
|
||
|
||
// DNS Resolver
|
||
if IsNotNull(node.DnsResolver) {
|
||
var dnsResolverConfig = nodeconfigs.DefaultDNSResolverConfig()
|
||
err = json.Unmarshal(node.DnsResolver, dnsResolverConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
config.DNSResolver = dnsResolverConfig
|
||
}
|
||
|
||
// 防火墙动作
|
||
actions, err := SharedNodeClusterFirewallActionDAO.FindAllEnabledFirewallActions(tx, primaryClusterId, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, action := range actions {
|
||
actionConfig, err := SharedNodeClusterFirewallActionDAO.ComposeFirewallActionConfig(tx, action)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if actionConfig != nil {
|
||
config.FirewallActions = append(config.FirewallActions, actionConfig)
|
||
}
|
||
}
|
||
|
||
// 集群指标
|
||
metricItemIds, err := SharedNodeClusterMetricItemDAO.FindAllClusterItemIds(tx, int64(node.ClusterId), cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
var metricItems = []*serverconfigs.MetricItemConfig{}
|
||
for _, itemId := range metricItemIds {
|
||
itemConfig, err := SharedMetricItemDAO.ComposeItemConfig(tx, itemId)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if itemConfig != nil {
|
||
metricItems = append(metricItems, itemConfig)
|
||
}
|
||
}
|
||
|
||
// 公用指标
|
||
publicMetricItems, err := SharedMetricItemDAO.FindAllPublicItems(tx, serverconfigs.MetricItemCategoryHTTP, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, item := range publicMetricItems {
|
||
itemConfig := SharedMetricItemDAO.ComposeItemConfigWithItem(item)
|
||
if itemConfig != nil && !lists.ContainsInt64(metricItemIds, itemConfig.Id) {
|
||
metricItems = append(metricItems, itemConfig)
|
||
}
|
||
}
|
||
|
||
config.MetricItems = metricItems
|
||
|
||
// 产品
|
||
adminUIConfig, err := SharedSysSettingDAO.ReadAdminUIConfig(tx, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if adminUIConfig != nil {
|
||
config.ProductConfig = &nodeconfigs.ProductConfig{
|
||
Name: adminUIConfig.ProductName,
|
||
Version: adminUIConfig.Version,
|
||
}
|
||
}
|
||
|
||
// OCSP
|
||
ocspVersion, err := SharedSSLCertDAO.FindCertOCSPLatestVersion(tx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
config.OCSPVersion = ocspVersion
|
||
|
||
// DDOS Protection
|
||
var ddosProtection = node.DecodeDDoSProtection()
|
||
if ddosProtection != nil {
|
||
if config.DDoSProtection == nil {
|
||
config.DDoSProtection = ddosProtection
|
||
} else {
|
||
config.DDoSProtection.Merge(ddosProtection)
|
||
}
|
||
}
|
||
|
||
// 初始化扩展配置
|
||
err = this.composeExtConfig(tx, config, clusterIds, cacheMap)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return config, nil
|
||
}
|
||
|
||
// UpdateNodeConnectedAPINodes 修改当前连接的API节点
|
||
func (this *NodeDAO) UpdateNodeConnectedAPINodes(tx *dbs.Tx, nodeId int64, apiNodeIds []int64) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
|
||
if len(apiNodeIds) > 0 {
|
||
apiNodeIdsJSON, err := json.Marshal(apiNodeIds)
|
||
if err != nil {
|
||
return errors.Wrap(err)
|
||
}
|
||
op.ConnectedAPINodes = apiNodeIdsJSON
|
||
} else {
|
||
op.ConnectedAPINodes = "[]"
|
||
}
|
||
err := this.Save(tx, op)
|
||
return err
|
||
}
|
||
|
||
// FindEnabledNodeIdWithUniqueId 根据UniqueId获取ID
|
||
func (this *NodeDAO) FindEnabledNodeIdWithUniqueId(tx *dbs.Tx, uniqueId string) (int64, error) {
|
||
var cacheKey = "nodeId@uniqueId@" + uniqueId
|
||
var item = ttlcache.SharedCache.Read(cacheKey)
|
||
if item != nil {
|
||
return types.Int64(item.Value), nil
|
||
}
|
||
|
||
one, err := this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("uniqueId", uniqueId).
|
||
Result("id", "clusterId").
|
||
Find()
|
||
if err != nil || one == nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 检查集群
|
||
var node = one.(*Node)
|
||
var clusterId = int64(node.ClusterId)
|
||
if clusterId <= 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
isOn, err := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, clusterId)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
if !isOn {
|
||
return 0, nil
|
||
}
|
||
|
||
ttlcache.SharedCache.Write(cacheKey, int64(node.Id), time.Now().Unix()+60)
|
||
|
||
return int64(node.Id), nil
|
||
}
|
||
|
||
// CountAllEnabledNodesWithGrantId 计算使用某个认证的节点数量
|
||
func (this *NodeDAO) CountAllEnabledNodesWithGrantId(tx *dbs.Tx, grantId int64) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Where("id IN (SELECT nodeId FROM edgeNodeLogins WHERE type='ssh' AND JSON_CONTAINS(params, :grantParam))").
|
||
Param("grantParam", string(maps.Map{"grantId": grantId}.AsJSON())).
|
||
Where("clusterId IN (SELECT id FROM edgeNodeClusters WHERE state=1)").
|
||
Count()
|
||
}
|
||
|
||
// FindAllEnabledNodesWithGrantId 查找使用某个认证的所有节点
|
||
func (this *NodeDAO) FindAllEnabledNodesWithGrantId(tx *dbs.Tx, grantId int64) (result []*Node, err error) {
|
||
_, err = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Where("id IN (SELECT nodeId FROM edgeNodeLogins WHERE type='ssh' AND JSON_CONTAINS(params, :grantParam))").
|
||
Param("grantParam", string(maps.Map{"grantId": grantId}.AsJSON())).
|
||
Where("clusterId IN (SELECT id FROM edgeNodeClusters WHERE state=1)").
|
||
Slice(&result).
|
||
DescPk().
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// CountAllNotInstalledNodesWithClusterId 计算未安装的节点数量
|
||
func (this *NodeDAO) CountAllNotInstalledNodesWithClusterId(tx *dbs.Tx, clusterId int64) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("clusterId", clusterId).
|
||
Attr("isInstalled", false).
|
||
Count()
|
||
}
|
||
|
||
// FindAllNotInstalledNodesWithClusterId 查找所有未安装的节点
|
||
func (this *NodeDAO) FindAllNotInstalledNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
|
||
_, err = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("clusterId", clusterId).
|
||
Attr("isInstalled", false).
|
||
DescPk().
|
||
Slice(&result).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// CountAllLowerVersionNodesWithClusterId 计算单个集群中所有低于某个版本的节点数量
|
||
func (this *NodeDAO) CountAllLowerVersionNodesWithClusterId(tx *dbs.Tx, clusterId int64, os string, arch string, version string) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("clusterId", clusterId).
|
||
Where("status IS NOT NULL").
|
||
Where("JSON_EXTRACT(status, '$.os')=:os").
|
||
Where("JSON_EXTRACT(status, '$.arch')=:arch").
|
||
Where("(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)").
|
||
Param("os", os).
|
||
Param("arch", arch).
|
||
Param("version", utils.VersionToLong(version)).
|
||
Count()
|
||
}
|
||
|
||
// FindAllLowerVersionNodesWithClusterId 查找单个集群中所有低于某个版本的节点
|
||
func (this *NodeDAO) FindAllLowerVersionNodesWithClusterId(tx *dbs.Tx, clusterId int64, os string, arch string, version string) (result []*Node, err error) {
|
||
_, err = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("clusterId", clusterId).
|
||
Where("status IS NOT NULL").
|
||
Where("JSON_EXTRACT(status, '$.os')=:os").
|
||
Where("JSON_EXTRACT(status, '$.arch')=:arch").
|
||
Where("(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)").
|
||
Param("os", os).
|
||
Param("arch", arch).
|
||
Param("version", utils.VersionToLong(version)).
|
||
DescPk().
|
||
Slice(&result).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// CountAllLowerVersionNodes 计算所有集群中低于某个版本的节点数量
|
||
func (this *NodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Where("clusterId IN (SELECT id FROM "+SharedNodeClusterDAO.Table+" WHERE state=1)").
|
||
Where("status IS NOT NULL").
|
||
Where("(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)").
|
||
Param("version", utils.VersionToLong(version)).
|
||
Count()
|
||
}
|
||
|
||
// CountAllEnabledNodesWithGroupId 查找某个节点分组下的所有节点数量
|
||
func (this *NodeDAO) CountAllEnabledNodesWithGroupId(tx *dbs.Tx, groupId int64) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("groupId", groupId).
|
||
Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)").
|
||
Count()
|
||
}
|
||
|
||
// CountAllEnabledNodesWithRegionId 查找某个节点区域下的所有节点数量
|
||
func (this *NodeDAO) CountAllEnabledNodesWithRegionId(tx *dbs.Tx, regionId int64) (int64, error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("regionId", regionId).
|
||
Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)").
|
||
Count()
|
||
}
|
||
|
||
// CountAllNodeRegionInfo 查找所有节点区域信息数量
|
||
func (this *NodeDAO) CountAllNodeRegionInfo(tx *dbs.Tx, regionId int64) (int64, error) {
|
||
var query = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
|
||
if regionId > 0 {
|
||
query.Attr("regionId", regionId)
|
||
}
|
||
return query.Count()
|
||
}
|
||
|
||
// ListNodeRegionInfo 列出节点区域信息
|
||
func (this *NodeDAO) ListNodeRegionInfo(tx *dbs.Tx, regionId int64, offset int64, size int64) (result []*Node, err error) {
|
||
var query = this.Query(tx).
|
||
Result("id", "name", "clusterId", "regionId").
|
||
State(NodeStateEnabled).
|
||
Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)").
|
||
Asc("IF(regionId=0, 0, 1)"). // 按照 regionId 排序是为了让没有设置区域的节点排在最上面
|
||
DescPk().
|
||
Offset(offset).
|
||
Limit(size).
|
||
Slice(&result)
|
||
if regionId > 0 {
|
||
query.Attr("regionId", regionId)
|
||
}
|
||
_, err = query.FindAll()
|
||
return
|
||
}
|
||
|
||
// UpdateNodeRegionId 修改节点所在区域
|
||
func (this *NodeDAO) UpdateNodeRegionId(tx *dbs.Tx, nodeId int64, regionId int64) error {
|
||
// 这里允许 regionId 为 0
|
||
err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("regionId", regionId).
|
||
UpdateQuickly()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return this.NotifyUpdate(tx, nodeId)
|
||
}
|
||
|
||
// FindAllEnabledNodesDNSWithClusterId 获取一个集群的节点DNS信息
|
||
func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, includingLnNodes bool) (result []*Node, err error) {
|
||
if clusterId <= 0 {
|
||
return nil, nil
|
||
}
|
||
var query = this.Query(tx)
|
||
if includeSecondaryNodes {
|
||
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
|
||
Param("primaryClusterId", clusterId).
|
||
Param("primaryClusterIdString", types.String(clusterId))
|
||
} else {
|
||
query.Attr("clusterId", clusterId)
|
||
}
|
||
if !includingLnNodes {
|
||
query.Lte("level", 1)
|
||
}
|
||
_, err = query.
|
||
State(NodeStateEnabled).
|
||
Attr("isOn", true).
|
||
Attr("isUp", true).
|
||
Result("id", "name", "dnsRoutes", "isOn").
|
||
DescPk().
|
||
Slice(&result).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// CountAllEnabledNodesDNSWithClusterId 计算一个集群的节点DNS数量
|
||
func (this *NodeDAO) CountAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64) (result int64, err error) {
|
||
return this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Attr("clusterId", clusterId).
|
||
Attr("isOn", true).
|
||
Attr("isUp", true).
|
||
Result("id", "name", "dnsRoutes", "isOn").
|
||
DescPk().
|
||
Slice(&result).
|
||
Count()
|
||
}
|
||
|
||
// FindEnabledNodeDNS 获取单个节点的DNS信息
|
||
func (this *NodeDAO) FindEnabledNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error) {
|
||
one, err := this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Pk(nodeId).
|
||
Result("id", "name", "dnsRoutes", "clusterId", "isOn").
|
||
Find()
|
||
if one == nil {
|
||
return nil, err
|
||
}
|
||
return one.(*Node), nil
|
||
}
|
||
|
||
// FindStatelessNodeDNS 获取单个节点的DNS信息,无论什么状态
|
||
func (this *NodeDAO) FindStatelessNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error) {
|
||
one, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("id", "name", "dnsRoutes", "clusterId", "isOn", "state").
|
||
Find()
|
||
if err != nil || one == nil {
|
||
return nil, err
|
||
}
|
||
return one.(*Node), nil
|
||
}
|
||
|
||
// UpdateNodeDNS 修改节点的DNS信息
|
||
func (this *NodeDAO) UpdateNodeDNS(tx *dbs.Tx, nodeId int64, routes map[int64][]string) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
if routes == nil {
|
||
routes = map[int64][]string{}
|
||
}
|
||
routesJSON, err := json.Marshal(routes)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
op.DnsRoutes = routesJSON
|
||
err = this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = this.NotifyUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = this.NotifyDNSUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// FindNodeDNSResolver 查找域名DNS Resolver
|
||
func (this *NodeDAO) FindNodeDNSResolver(tx *dbs.Tx, nodeId int64) (*nodeconfigs.DNSResolverConfig, error) {
|
||
configJSON, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("dnsResolver").
|
||
FindJSONCol()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if IsNull(configJSON) {
|
||
return nodeconfigs.DefaultDNSResolverConfig(), nil
|
||
}
|
||
|
||
var config = nodeconfigs.DefaultDNSResolverConfig()
|
||
err = json.Unmarshal(configJSON, config)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return config, nil
|
||
}
|
||
|
||
// UpdateNodeDNSResolver 修改域名DNS Resolver
|
||
func (this *NodeDAO) UpdateNodeDNSResolver(tx *dbs.Tx, nodeId int64, dnsResolverConfig *nodeconfigs.DNSResolverConfig) error {
|
||
if nodeId <= 0 {
|
||
return ErrNotFound
|
||
}
|
||
|
||
configJSON, err := json.Marshal(dnsResolverConfig)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
op.DnsResolver = configJSON
|
||
err = this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return this.NotifyUpdate(tx, nodeId)
|
||
}
|
||
|
||
// UpdateNodeSystem 设置系统信息
|
||
func (this *NodeDAO) UpdateNodeSystem(tx *dbs.Tx, nodeId int64, maxCPU int32) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
op.MaxCPU = maxCPU
|
||
err := this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return this.NotifyUpdate(tx, nodeId)
|
||
}
|
||
|
||
// UpdateNodeCache 设置缓存相关
|
||
func (this *NodeDAO) UpdateNodeCache(tx *dbs.Tx, nodeId int64, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte, cacheDiskDir string, cacheDiskSubDirs []*serverconfigs.CacheDir) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
if len(maxCacheDiskCapacityJSON) > 0 {
|
||
op.MaxCacheDiskCapacity = maxCacheDiskCapacityJSON
|
||
}
|
||
if len(maxCacheMemoryCapacityJSON) > 0 {
|
||
op.MaxCacheMemoryCapacity = maxCacheMemoryCapacityJSON
|
||
}
|
||
op.CacheDiskDir = cacheDiskDir
|
||
|
||
if cacheDiskSubDirs == nil {
|
||
cacheDiskSubDirs = []*serverconfigs.CacheDir{}
|
||
}
|
||
cacheDiskSubDirsJSON, err := json.Marshal(cacheDiskSubDirs)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
op.CacheDiskSubDirs = cacheDiskSubDirsJSON
|
||
|
||
err = this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return this.NotifyUpdate(tx, nodeId)
|
||
}
|
||
|
||
// UpdateNodeUpCount 计算节点上线|下线状态
|
||
func (this *NodeDAO) UpdateNodeUpCount(tx *dbs.Tx, nodeId int64, isUp bool, maxUp int, maxDown int) (changed bool, err error) {
|
||
if nodeId <= 0 {
|
||
return false, errors.New("invalid nodeId")
|
||
}
|
||
one, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("isUp", "countUp", "countDown").
|
||
Find()
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
if one == nil {
|
||
return false, nil
|
||
}
|
||
oldIsUp := one.(*Node).IsUp
|
||
|
||
// 如果新老状态一致,则不做任何事情
|
||
if oldIsUp == isUp {
|
||
return false, nil
|
||
}
|
||
|
||
countUp := int(one.(*Node).CountUp)
|
||
countDown := int(one.(*Node).CountDown)
|
||
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
|
||
if isUp {
|
||
countUp++
|
||
countDown = 0
|
||
|
||
if countUp >= maxUp {
|
||
changed = true
|
||
op.IsUp = true
|
||
}
|
||
} else {
|
||
countDown++
|
||
countUp = 0
|
||
|
||
if countDown >= maxDown {
|
||
changed = true
|
||
op.IsUp = false
|
||
}
|
||
}
|
||
|
||
op.CountUp = countUp
|
||
op.CountDown = countDown
|
||
err = this.Save(tx, op)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
if changed {
|
||
err = this.NotifyDNSUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return true, err
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// UpdateNodeUp 设置节点上下线状态
|
||
func (this *NodeDAO) UpdateNodeUp(tx *dbs.Tx, nodeId int64, isUp bool) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
op.IsUp = isUp
|
||
op.CountUp = 0
|
||
op.CountDown = 0
|
||
err := this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// TODO 只有前后状态不一致的时候才需要更新DNS
|
||
|
||
return this.NotifyDNSUpdate(tx, nodeId)
|
||
}
|
||
|
||
// UpdateNodeActive 修改节点活跃状态
|
||
func (this *NodeDAO) UpdateNodeActive(tx *dbs.Tx, nodeId int64, isActive bool) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
_, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("isActive", isActive).
|
||
Set("inactiveNotifiedAt", 0).
|
||
Update()
|
||
return err
|
||
}
|
||
|
||
// UpdateNodeInactiveNotifiedAt 修改节点的离线通知时间
|
||
func (this *NodeDAO) UpdateNodeInactiveNotifiedAt(tx *dbs.Tx, nodeId int64, inactiveAt int64) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
_, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Set("inactiveNotifiedAt", inactiveAt).
|
||
Update()
|
||
return err
|
||
}
|
||
|
||
// FindNodeInactiveNotifiedAt 读取上次的节点离线通知时间
|
||
func (this *NodeDAO) FindNodeInactiveNotifiedAt(tx *dbs.Tx, nodeId int64) (int64, error) {
|
||
return this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("inactiveNotifiedAt").
|
||
FindInt64Col(0)
|
||
}
|
||
|
||
// FindNodeActive 检查节点活跃状态
|
||
func (this *NodeDAO) FindNodeActive(tx *dbs.Tx, nodeId int64) (bool, error) {
|
||
isActive, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("isActive").
|
||
FindIntCol(0)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
return isActive == 1, nil
|
||
}
|
||
|
||
// FindNodeVersion 查找节点的版本号
|
||
func (this *NodeDAO) FindNodeVersion(tx *dbs.Tx, nodeId int64) (int64, error) {
|
||
return this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("version").
|
||
FindInt64Col(0)
|
||
}
|
||
|
||
// GenUniqueId 生成唯一ID
|
||
func (this *NodeDAO) GenUniqueId(tx *dbs.Tx) (string, error) {
|
||
for {
|
||
uniqueId := rands.HexString(32)
|
||
ok, err := this.Query(tx).
|
||
Attr("uniqueId", uniqueId).
|
||
Exist()
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
if ok {
|
||
continue
|
||
}
|
||
return uniqueId, nil
|
||
}
|
||
}
|
||
|
||
// FindEnabledNodesWithIds 根据一组ID查找一组节点
|
||
func (this *NodeDAO) FindEnabledNodesWithIds(tx *dbs.Tx, nodeIds []int64) (result []*Node, err error) {
|
||
if len(nodeIds) == 0 {
|
||
return nil, nil
|
||
}
|
||
idStrings := []string{}
|
||
for _, nodeId := range nodeIds {
|
||
idStrings = append(idStrings, numberutils.FormatInt64(nodeId))
|
||
}
|
||
_, err = this.Query(tx).
|
||
State(NodeStateEnabled).
|
||
Where("id IN ("+strings.Join(idStrings, ", ")+")").
|
||
Result("id", "connectedAPINodes", "isActive", "isOn").
|
||
Slice(&result).
|
||
Reuse(false).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// DeleteNodeFromCluster 从集群中删除节点
|
||
func (this *NodeDAO) DeleteNodeFromCluster(tx *dbs.Tx, nodeId int64, clusterId int64) error {
|
||
one, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("clusterId", "secondaryClusterIds").
|
||
Find()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if one == nil {
|
||
return nil
|
||
}
|
||
|
||
// 提前通知DNS更新,因为后面集群会有变化
|
||
err = this.NotifyDNSUpdate(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var node = one.(*Node)
|
||
|
||
var secondaryClusterIds = []int64{}
|
||
for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() {
|
||
if secondaryClusterId == clusterId {
|
||
continue
|
||
}
|
||
secondaryClusterIds = append(secondaryClusterIds, secondaryClusterId)
|
||
}
|
||
|
||
var newClusterId = int64(node.ClusterId)
|
||
|
||
if newClusterId == clusterId {
|
||
newClusterId = 0
|
||
|
||
// 选择一个从集群作为主集群
|
||
if len(secondaryClusterIds) > 0 {
|
||
newClusterId = secondaryClusterIds[0]
|
||
secondaryClusterIds = secondaryClusterIds[1:]
|
||
}
|
||
}
|
||
|
||
secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
op.ClusterId = newClusterId
|
||
op.SecondaryClusterIds = secondaryClusterIdsJSON
|
||
|
||
if newClusterId == 0 {
|
||
op.State = NodeStateDisabled
|
||
}
|
||
|
||
return this.Save(tx, op)
|
||
}
|
||
|
||
// TransferPrimaryClusterNodes 自动转移集群下的节点
|
||
func (this *NodeDAO) TransferPrimaryClusterNodes(tx *dbs.Tx, primaryClusterId int64) error {
|
||
if primaryClusterId <= 0 {
|
||
return nil
|
||
}
|
||
ones, err := this.Query(tx).
|
||
Attr("clusterId", primaryClusterId).
|
||
Result("id", "secondaryClusterIds").
|
||
State(NodeStateEnabled).
|
||
FindAll()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, one := range ones {
|
||
var node = one.(*Node)
|
||
clusterIds := node.DecodeSecondaryClusterIds()
|
||
if len(clusterIds) == 0 {
|
||
continue
|
||
}
|
||
var clusterId = clusterIds[0]
|
||
var secondaryClusterIds = clusterIds[1:]
|
||
secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
err = this.Query(tx).
|
||
Pk(node.Id).
|
||
Set("clusterId", clusterId).
|
||
Set("secondaryClusterIds", secondaryClusterIdsJSON).
|
||
UpdateQuickly()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// FindParentNodeConfigs 查找父级节点配置
|
||
func (this *NodeDAO) FindParentNodeConfigs(tx *dbs.Tx, nodeId int64, groupId int64, clusterIds []int64, level int) (result map[int64][]*nodeconfigs.ParentNodeConfig, err error) {
|
||
result = map[int64][]*nodeconfigs.ParentNodeConfig{} // clusterId => []*ParentNodeConfig
|
||
|
||
// 当前分组的L2
|
||
var parentNodes []*Node
|
||
if groupId > 0 {
|
||
parentNodes, err = this.FindEnabledNodesWithGroupIdAndLevel(tx, groupId, level+1)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
} else if nodeId > 0 {
|
||
// 当前节点所属分组
|
||
groupId, err = this.Query(tx).Result("groupId").FindInt64Col(0)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if groupId > 0 {
|
||
parentNodes, err = this.FindEnabledNodesWithGroupIdAndLevel(tx, groupId, level+1)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
}
|
||
|
||
// 当前集群的L2
|
||
if len(parentNodes) == 0 {
|
||
for _, clusterId := range clusterIds {
|
||
clusterParentNodes, err := this.FindEnabledNodesWithClusterIdAndLevel(tx, clusterId, level+1)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if len(clusterParentNodes) > 0 {
|
||
parentNodes = append(parentNodes, clusterParentNodes...)
|
||
}
|
||
}
|
||
}
|
||
|
||
if len(parentNodes) > 0 {
|
||
for _, node := range parentNodes {
|
||
// 是否有Ln地址
|
||
var addrStrings = node.DecodeLnAddrs()
|
||
if len(addrStrings) == 0 {
|
||
// 如果没有就取节点的可访问地址
|
||
addrs, err := SharedNodeIPAddressDAO.FindNodeAccessAndUpIPAddresses(tx, int64(node.Id), nodeconfigs.NodeRoleNode)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, addr := range addrs {
|
||
if addr.IsOn {
|
||
addrStrings = append(addrStrings, addr.DNSIP())
|
||
}
|
||
}
|
||
}
|
||
|
||
// 没有地址就跳过
|
||
if len(addrStrings) == 0 {
|
||
continue
|
||
}
|
||
|
||
var secretHash = fmt.Sprintf("%x", sha256.Sum256([]byte(node.UniqueId+"@"+node.Secret)))
|
||
|
||
for _, clusterId := range node.AllClusterIds() {
|
||
parentNodeConfigs, _ := result[clusterId]
|
||
parentNodeConfigs = append(parentNodeConfigs, &nodeconfigs.ParentNodeConfig{
|
||
Id: int64(node.Id),
|
||
Addrs: addrStrings,
|
||
SecretHash: secretHash,
|
||
})
|
||
result[clusterId] = parentNodeConfigs
|
||
}
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// FindNodeDDoSProtection 获取节点的DDOS设置
|
||
func (this *NodeDAO) FindNodeDDoSProtection(tx *dbs.Tx, nodeId int64) (*ddosconfigs.ProtectionConfig, error) {
|
||
one, err := this.Query(tx).
|
||
Result("ddosProtection").
|
||
Pk(nodeId).
|
||
Find()
|
||
if one == nil || err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return one.(*Node).DecodeDDoSProtection(), nil
|
||
}
|
||
|
||
// UpdateNodeDDoSProtection 设置集群的DDoS设置
|
||
func (this *NodeDAO) UpdateNodeDDoSProtection(tx *dbs.Tx, nodeId int64, ddosProtection *ddosconfigs.ProtectionConfig) error {
|
||
if nodeId <= 0 {
|
||
return ErrNotFound
|
||
}
|
||
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
|
||
if ddosProtection == nil {
|
||
op.DdosProtection = "{}"
|
||
} else {
|
||
ddosProtectionJSON, err := json.Marshal(ddosProtection)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
op.DdosProtection = ddosProtectionJSON
|
||
}
|
||
|
||
err := this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
clusterId, err := this.FindNodeClusterId(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if clusterId > 0 {
|
||
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeDDosProtectionChanged)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// FindNodeAPIConfig 查找API相关配置信息
|
||
func (this *NodeDAO) FindNodeAPIConfig(tx *dbs.Tx, nodeId int64) (*Node, error) {
|
||
if nodeId <= 0 {
|
||
return nil, nil
|
||
}
|
||
|
||
one, err := this.Query(tx).
|
||
Pk(nodeId).
|
||
Result("apiNodeAddrs").
|
||
Find()
|
||
if err != nil || one == nil {
|
||
return nil, err
|
||
}
|
||
return one.(*Node), nil
|
||
}
|
||
|
||
// UpdateNodeAPIConfig 修改API相关配置信息
|
||
func (this *NodeDAO) UpdateNodeAPIConfig(tx *dbs.Tx, nodeId int64, apiNodeAddrs []*serverconfigs.NetworkAddressConfig) error {
|
||
if nodeId <= 0 {
|
||
return errors.New("invalid nodeId")
|
||
}
|
||
|
||
if apiNodeAddrs == nil {
|
||
apiNodeAddrs = []*serverconfigs.NetworkAddressConfig{}
|
||
}
|
||
apiNodeAddrsJSON, err := json.Marshal(apiNodeAddrs)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
var op = NewNodeOperator()
|
||
op.Id = nodeId
|
||
op.ApiNodeAddrs = apiNodeAddrsJSON
|
||
err = this.Save(tx, op)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return this.NotifyUpdate(tx, nodeId)
|
||
}
|
||
|
||
// NotifyUpdate 通知节点相关更新
|
||
func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
|
||
// 这里只需要通知单个集群即可,因为节点是公用的,更新一个就相当于更新了所有
|
||
clusterId, err := this.FindNodeClusterId(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
err = SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, 0, NodeTaskTypeConfigChanged)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// NotifyLevelUpdate 通知节点级别更新
|
||
func (this *NodeDAO) NotifyLevelUpdate(tx *dbs.Tx, nodeId int64) error {
|
||
// 这里只需要通知单个集群即可,因为节点是公用的,更新一个就相当于更新了所有
|
||
clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, clusterId := range clusterIds {
|
||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, 0, NodeTaskTypeNodeLevelChanged)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// NotifyDNSUpdate 通知节点相关DNS更新
|
||
func (this *NodeDAO) NotifyDNSUpdate(tx *dbs.Tx, nodeId int64) error {
|
||
clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, clusterId := range clusterIds {
|
||
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, nil)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if dnsInfo == nil {
|
||
continue
|
||
}
|
||
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
|
||
continue
|
||
}
|
||
err = dns.SharedDNSTaskDAO.CreateNodeTask(tx, clusterId, nodeId, dns.DNSTaskTypeNodeChange)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|