Files
EdgeAPI/internal/db/models/node_dao.go

1420 lines
37 KiB
Go
Raw Normal View History

2020-07-22 22:17:53 +08:00
package models
import (
2020-09-13 20:37:28 +08:00
"encoding/json"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
2021-01-27 23:00:02 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
2020-09-26 08:06:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
2021-01-19 12:05:35 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
2020-07-22 22:17:53 +08:00
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2021-07-19 17:58:16 +08:00
"github.com/iwind/TeaGo/lists"
2020-10-25 21:27:46 +08:00
"github.com/iwind/TeaGo/maps"
2020-07-24 09:17:48 +08:00
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
2020-09-26 08:06:40 +08:00
"strconv"
"strings"
2020-07-22 22:17:53 +08:00
)
// Node states as written to the "state" column.
const (
	NodeStateEnabled  = 1 // enabled
	NodeStateDisabled = 0 // disabled
)

// nodeIdCacheMap maps a node's uniqueId to its numeric nodeId.
var nodeIdCacheMap = make(map[string]int64)
2020-07-22 22:17:53 +08:00
// NodeDAO is the data access object for the "edgeNodes" table.
type NodeDAO dbs.DAO

// NewNodeDAO creates a NodeDAO bound to the "edgeNodes" table, using Node as
// the model and "id" as the primary key.
func NewNodeDAO() *NodeDAO {
	dao := &NodeDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeNodes",
			Model:  new(Node),
			PkName: "id",
		},
	}
	return dbs.NewDAO(dao).(*NodeDAO)
}
2020-10-13 20:05:13 +08:00
// SharedNodeDAO is the package-wide NodeDAO instance; it is assigned by the
// dbs.OnReady callback registered in init.
var SharedNodeDAO *NodeDAO

func init() {
	setup := func() {
		SharedNodeDAO = NewNodeDAO()
	}
	dbs.OnReady(setup)
}
2020-07-22 22:17:53 +08:00
2021-04-13 20:01:21 +08:00
// EnableNode 启用条目
func (this *NodeDAO) EnableNode(tx *dbs.Tx, id uint32) (rowsAffected int64, err error) {
return this.Query(tx).
2020-07-22 22:17:53 +08:00
Pk(id).
Set("state", NodeStateEnabled).
Update()
}
2021-04-13 20:01:21 +08:00
// DisableNode 禁用条目
func (this *NodeDAO) DisableNode(tx *dbs.Tx, nodeId int64) (err error) {
// 删除缓存
uniqueId, err := this.Query(tx).
Pk(nodeId).
Result("uniqueId").
FindStringCol("")
if err != nil {
return err
}
if len(uniqueId) > 0 {
SharedCacheLocker.Lock()
delete(nodeIdCacheMap, uniqueId)
SharedCacheLocker.Unlock()
}
_, err = this.Query(tx).
Pk(nodeId).
2020-07-22 22:17:53 +08:00
Set("state", NodeStateDisabled).
Update()
if err != nil {
return err
}
2021-01-27 23:00:02 +08:00
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return err
}
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return err
}
return nil
2020-07-22 22:17:53 +08:00
}
2021-04-13 20:01:21 +08:00
// FindEnabledNode 查找启用中的条目
func (this *NodeDAO) FindEnabledNode(tx *dbs.Tx, id int64) (*Node, error) {
result, err := this.Query(tx).
2020-07-22 22:17:53 +08:00
Pk(id).
Attr("state", NodeStateEnabled).
Find()
if result == nil {
return nil, err
}
return result.(*Node), err
}
2021-08-31 17:24:52 +08:00
// FindEnabledBasicNode loads only the basic columns (id, name, clusterId,
// isOn, isUp) of an enabled node. It returns a nil node when no row is found.
func (this *NodeDAO) FindEnabledBasicNode(tx *dbs.Tx, nodeId int64) (*Node, error) {
	query := this.Query(tx).State(NodeStateEnabled).Pk(nodeId)
	query.Result("id", "name", "clusterId", "isOn", "isUp")

	row, err := query.Find()
	if row != nil {
		return row.(*Node), nil
	}
	return nil, err
}
2021-04-13 20:01:21 +08:00
// FindNodeName 根据主键查找名称
func (this *NodeDAO) FindNodeName(tx *dbs.Tx, id int64) (string, error) {
name, err := this.Query(tx).
2020-07-22 22:17:53 +08:00
Pk(id).
Result("name").
FindCol("")
return name.(string), err
}
2020-07-24 09:17:48 +08:00
2021-04-13 20:01:21 +08:00
// CreateNode 创建节点
func (this *NodeDAO) CreateNode(tx *dbs.Tx, adminId int64, name string, clusterId int64, groupId int64, regionId int64) (nodeId int64, err error) {
// 检查节点数量
if teaconst.MaxNodes > 0 {
count, err := this.Query(tx).
State(NodeStateEnabled).
Count()
if err != nil {
return 0, err
}
if int64(teaconst.MaxNodes) <= count {
return 0, errors.New("[企业版]超出最大节点数限制:" + types.String(teaconst.MaxNodes) + ",请购买更多配额")
}
}
2021-01-27 23:00:02 +08:00
uniqueId, err := this.GenUniqueId(tx)
2020-08-21 12:32:33 +08:00
if err != nil {
return 0, err
}
2020-09-06 16:19:54 +08:00
secret := rands.String(32)
// 保存API Token
err = SharedApiTokenDAO.CreateAPIToken(tx, uniqueId, secret, nodeconfigs.NodeRoleNode)
2020-09-06 16:19:54 +08:00
if err != nil {
return
}
2020-07-24 09:17:48 +08:00
op := NewNodeOperator()
op.AdminId = adminId
2020-07-24 09:17:48 +08:00
op.Name = name
2020-08-21 12:32:33 +08:00
op.UniqueId = uniqueId
2020-09-06 16:19:54 +08:00
op.Secret = secret
2020-07-24 09:17:48 +08:00
op.ClusterId = clusterId
op.GroupId = groupId
2020-12-10 16:11:35 +08:00
op.RegionId = regionId
2020-07-29 19:02:28 +08:00
op.IsOn = 1
2020-07-24 09:17:48 +08:00
op.State = NodeStateEnabled
err = this.Save(tx, op)
2020-07-24 09:17:48 +08:00
if err != nil {
return 0, err
}
2021-01-27 23:00:02 +08:00
// 通知节点更新
nodeId = types.Int64(op.Id)
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return 0, err
}
// 通知DNS更新
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return 0, err
}
return nodeId, nil
2020-07-24 09:17:48 +08:00
}
2021-04-13 20:01:21 +08:00
// UpdateNode 修改节点
2021-09-13 16:47:40 +08:00
func (this *NodeDAO) UpdateNode(tx *dbs.Tx, nodeId int64, name string, clusterId int64, secondaryClusterIds []int64, groupId int64, regionId int64, isOn bool) error {
2020-07-30 22:41:49 +08:00
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
// 老的集群
oldClusterIds, err := this.FindEnabledNodeClusterIds(tx, nodeId)
if err != nil {
return err
}
2020-07-24 09:17:48 +08:00
op := NewNodeOperator()
op.Id = nodeId
op.Name = name
op.ClusterId = clusterId
2021-07-31 22:23:11 +08:00
// 去重
var filteredSecondaryClusterIds = []int64{}
for _, secondaryClusterId := range secondaryClusterIds {
if secondaryClusterId <= 0 {
continue
}
if lists.ContainsInt64(filteredSecondaryClusterIds, secondaryClusterId) {
continue
}
filteredSecondaryClusterIds = append(filteredSecondaryClusterIds, secondaryClusterId)
}
filteredSecondaryClusterIdsJSON, err := json.Marshal(filteredSecondaryClusterIds)
if err != nil {
return err
}
op.SecondaryClusterIds = filteredSecondaryClusterIdsJSON
op.GroupId = groupId
2020-12-10 16:11:35 +08:00
op.RegionId = regionId
2020-08-21 12:32:33 +08:00
op.LatestVersion = dbs.SQL("latestVersion+1")
op.IsOn = isOn
2021-07-31 22:23:11 +08:00
err = this.Save(tx, op)
2020-08-21 12:32:33 +08:00
if err != nil {
return err
2020-08-21 12:32:33 +08:00
}
2021-01-27 23:00:02 +08:00
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return err
}
// 通知老的集群更新
for _, oldClusterId := range oldClusterIds {
if oldClusterId != clusterId && !lists.ContainsInt64(secondaryClusterIds, oldClusterId) {
err = dns.SharedDNSTaskDAO.CreateClusterTask(tx, oldClusterId, dns.DNSTaskTypeClusterChange)
if err != nil {
return err
}
}
}
2021-01-27 23:00:02 +08:00
return this.NotifyDNSUpdate(tx, nodeId)
2020-08-21 12:32:33 +08:00
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodes 计算所有节点数量
func (this *NodeDAO) CountAllEnabledNodes(tx *dbs.Tx) (int64, error) {
return this.Query(tx).
2020-07-24 09:17:48 +08:00
State(NodeStateEnabled).
Where("clusterId IN (SELECT id FROM "+SharedNodeClusterDAO.Table+" WHERE state=:clusterState)").
Param("clusterState", NodeClusterStateEnabled).
2020-07-24 09:17:48 +08:00
Count()
}
2021-04-13 20:01:21 +08:00
// ListEnabledNodesMatch 列出单页节点
func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
clusterId int64,
installState configutils.BoolState,
activeState configutils.BoolState,
keyword string,
groupId int64,
regionId int64,
2021-07-31 22:23:11 +08:00
includeSecondaryNodes bool,
order string,
offset int64,
size int64) (result []*Node, err error) {
query := this.Query(tx).
2020-07-24 09:17:48 +08:00
State(NodeStateEnabled).
2020-07-29 19:02:28 +08:00
Offset(offset).
Limit(size).
2020-08-30 16:12:00 +08:00
Slice(&result)
2020-09-13 20:37:28 +08:00
// 集群
2020-08-30 16:12:00 +08:00
if clusterId > 0 {
2021-07-31 22:23:11 +08:00
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
2020-08-30 16:12:00 +08:00
}
2020-09-13 20:37:28 +08:00
// 安装状态
switch installState {
2020-09-26 08:06:40 +08:00
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
2020-09-13 20:37:28 +08:00
query.Attr("isInstalled", 1)
2020-09-26 08:06:40 +08:00
case configutils.BoolStateNo:
2020-09-13 20:37:28 +08:00
query.Attr("isInstalled", 0)
}
2020-09-26 08:06:40 +08:00
// 在线状态
switch activeState {
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
2020-09-28 16:25:39 +08:00
query.Where("JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60")
2020-09-26 08:06:40 +08:00
case configutils.BoolStateNo:
2020-09-28 16:25:39 +08:00
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)")
2020-09-26 08:06:40 +08:00
}
2020-10-28 18:21:15 +08:00
// 关键词
if len(keyword) > 0 {
query.Where("(name LIKE :keyword OR JSON_EXTRACT(status,'$.hostname') LIKE :keyword OR id IN (SELECT nodeId FROM "+SharedNodeIPAddressDAO.Table+" WHERE ip LIKE :keyword))").
Param("keyword", "%"+keyword+"%")
}
// 分组
if groupId > 0 {
query.Attr("groupId", groupId)
}
2020-12-10 16:11:35 +08:00
// 区域
if regionId > 0 {
query.Attr("regionId", regionId)
}
// 排序
switch order {
case "cpuAsc":
query.Asc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.cpuUsage'), 0), 0)")
case "cpuDesc":
query.Desc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.cpuUsage'), 0), 0)")
case "memoryAsc":
query.Asc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.memoryUsage'), 0), 0)")
case "memoryDesc":
query.Desc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.memoryUsage'), 0), 0)")
case "trafficInAsc":
query.Asc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficInBytes'), 0), 0)")
case "trafficInDesc":
query.Desc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficInBytes'), 0), 0)")
case "trafficOutAsc":
query.Asc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficOutBytes'), 0), 0)")
case "trafficOutDesc":
query.Desc("IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficOutBytes'), 0), 0)")
}
query.DescPk()
2020-08-30 16:12:00 +08:00
_, err = query.FindAll()
2020-07-24 09:17:48 +08:00
return
}
2020-08-21 12:32:33 +08:00
2021-04-13 20:01:21 +08:00
// FindEnabledNodeWithUniqueIdAndSecret 根据节点ID和密钥查询节点
func (this *NodeDAO) FindEnabledNodeWithUniqueIdAndSecret(tx *dbs.Tx, uniqueId string, secret string) (*Node, error) {
one, err := this.Query(tx).
2020-08-21 12:32:33 +08:00
Attr("uniqueId", uniqueId).
Attr("secret", secret).
State(NodeStateEnabled).
Find()
if one != nil {
return one.(*Node), err
}
return nil, err
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeWithUniqueId 根据节点ID获取节点
func (this *NodeDAO) FindEnabledNodeWithUniqueId(tx *dbs.Tx, uniqueId string) (*Node, error) {
one, err := this.Query(tx).
2020-08-21 12:32:33 +08:00
Attr("uniqueId", uniqueId).
State(NodeStateEnabled).
Find()
if one != nil {
return one.(*Node), err
}
return nil, err
}
2021-04-13 20:01:21 +08:00
// FindNodeClusterId 获取节点集群ID
func (this *NodeDAO) FindNodeClusterId(tx *dbs.Tx, nodeId int64) (int64, error) {
col, err := this.Query(tx).
2020-08-21 12:32:33 +08:00
Pk(nodeId).
Result("clusterId").
FindCol(0)
return types.Int64(col), err
}
2021-07-31 22:23:11 +08:00
// FindEnabledAndOnNodeClusterIds returns the ids of the clusters a node
// belongs to: its primary cluster (when positive) followed by every secondary
// cluster for which CheckNodeClusterIsOn reports true, without duplicates.
//
// NOTE(review): the primary cluster is added without an isOn check; only
// secondary clusters are checked — confirm this is intended.
func (this *NodeDAO) FindEnabledAndOnNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
	one, err := this.Query(tx).
		Pk(nodeId).
		Result("clusterId", "secondaryClusterIds").
		Find()
	if one == nil {
		return nil, err
	}
	node := one.(*Node)

	if primaryId := int64(node.ClusterId); primaryId > 0 {
		result = append(result, primaryId)
	}

	for _, secondaryId := range node.DecodeSecondaryClusterIds() {
		if lists.ContainsInt64(result, secondaryId) {
			continue
		}

		// Only keep clusters that are switched on.
		isOn, checkErr := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, secondaryId)
		if checkErr != nil {
			return nil, checkErr
		}
		if isOn {
			result = append(result, secondaryId)
		}
	}

	return result, err
}
// FindEnabledNodeClusterIds returns the ids of the clusters a node belongs to:
// its primary cluster (when positive) followed by its secondary clusters,
// without duplicates. It returns nil when the node row is not found.
func (this *NodeDAO) FindEnabledNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
	one, err := this.Query(tx).
		Pk(nodeId).
		Result("clusterId", "secondaryClusterIds").
		Find()
	if one == nil {
		return nil, err
	}
	node := one.(*Node)

	if primaryId := int64(node.ClusterId); primaryId > 0 {
		result = append(result, primaryId)
	}

	for _, secondaryId := range node.DecodeSecondaryClusterIds() {
		if !lists.ContainsInt64(result, secondaryId) {
			result = append(result, secondaryId)
		}
	}

	return result, err
}
2021-04-13 20:01:21 +08:00
// FindAllNodeIdsMatch 匹配节点并返回节点ID
2021-07-31 22:23:11 +08:00
func (this *NodeDAO) FindAllNodeIdsMatch(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool, isOn configutils.BoolState) (result []int64, err error) {
query := this.Query(tx)
2020-08-21 12:32:33 +08:00
query.State(NodeStateEnabled)
if clusterId > 0 {
2021-07-31 22:23:11 +08:00
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
2020-08-21 12:32:33 +08:00
}
if isOn == configutils.BoolStateYes {
query.Attr("isOn", true)
} else if isOn == configutils.BoolStateNo {
query.Attr("isOn", false)
}
2020-08-21 12:32:33 +08:00
query.Result("id")
ones, _, err := query.FindOnes()
if err != nil {
return nil, err
}
for _, one := range ones {
result = append(result, one.GetInt64("id"))
}
return
}
2021-04-13 20:01:21 +08:00
// FindAllEnabledNodesWithClusterId 获取一个集群的所有节点
func (this *NodeDAO) FindAllEnabledNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
_, err = this.Query(tx).
2020-10-04 14:27:14 +08:00
State(NodeStateEnabled).
Attr("clusterId", clusterId).
DescPk().
Slice(&result).
FindAll()
return
}
2021-04-13 20:01:21 +08:00
// FindAllInactiveNodesWithClusterId 取得一个集群离线的节点
func (this *NodeDAO) FindAllInactiveNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
_, err = this.Query(tx).
2020-10-25 18:26:46 +08:00
State(NodeStateEnabled).
Attr("clusterId", clusterId).
2021-09-13 16:47:40 +08:00
Attr("isOn", true). // 只监控启用的节点
2020-10-25 21:27:46 +08:00
Attr("isInstalled", true). // 只监控已经安装的节点
2021-09-13 16:47:40 +08:00
Attr("isActive", true). // 当前已经在线的
2020-10-25 18:26:46 +08:00
Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)").
2021-04-13 20:01:21 +08:00
Result("id", "name").
2020-10-25 18:26:46 +08:00
Slice(&result).
FindAll()
return
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesMatch 计算节点数量
func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
clusterId int64,
installState configutils.BoolState,
activeState configutils.BoolState,
keyword string,
groupId int64,
2021-07-31 22:23:11 +08:00
regionId int64,
includeSecondaryNodes bool) (int64, error) {
query := this.Query(tx)
2020-08-30 16:12:00 +08:00
query.State(NodeStateEnabled)
2020-09-13 20:37:28 +08:00
// 集群
2020-08-30 16:12:00 +08:00
if clusterId > 0 {
2021-07-31 22:23:11 +08:00
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
} else {
query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
2020-08-30 16:12:00 +08:00
}
2020-09-13 20:37:28 +08:00
// 安装状态
switch installState {
2020-09-26 08:06:40 +08:00
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
2020-09-13 20:37:28 +08:00
query.Attr("isInstalled", 1)
2020-09-26 08:06:40 +08:00
case configutils.BoolStateNo:
2020-09-13 20:37:28 +08:00
query.Attr("isInstalled", 0)
}
2020-09-26 08:06:40 +08:00
// 在线状态
switch activeState {
case configutils.BoolStateAll:
// 所有
case configutils.BoolStateYes:
2020-09-28 16:25:39 +08:00
query.Where("JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60")
2020-09-26 08:06:40 +08:00
case configutils.BoolStateNo:
2020-09-28 16:25:39 +08:00
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)")
2020-09-26 08:06:40 +08:00
}
2020-10-28 18:21:15 +08:00
// 关键词
if len(keyword) > 0 {
query.Where("(name LIKE :keyword OR JSON_EXTRACT(status,'$.hostname') LIKE :keyword OR id IN (SELECT nodeId FROM "+SharedNodeIPAddressDAO.Table+" WHERE ip LIKE :keyword))").
Param("keyword", "%"+keyword+"%")
}
// 分组
if groupId > 0 {
query.Attr("groupId", groupId)
}
2020-12-10 16:11:35 +08:00
// 区域
if regionId > 0 {
query.Attr("regionId", regionId)
}
2020-08-30 16:12:00 +08:00
return query.Count()
}
2021-04-13 20:01:21 +08:00
// UpdateNodeStatus 更改节点状态
func (this *NodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, statusJSON []byte) error {
_, err := this.Query(tx).
2020-08-21 12:32:33 +08:00
Pk(nodeId).
Set("status", string(statusJSON)).
Update()
return err
}
2021-06-10 19:21:45 +08:00
// FindNodeStatus reads and decodes the node's status column.
// It returns (nil, nil) when the column is empty.
func (this *NodeDAO) FindNodeStatus(tx *dbs.Tx, nodeId int64) (*nodeconfigs.NodeStatus, error) {
	raw, err := this.Query(tx).Pk(nodeId).Result("status").FindStringCol("")
	if err != nil {
		return nil, err
	}
	if raw == "" {
		return nil, nil
	}

	status := &nodeconfigs.NodeStatus{}
	if err = json.Unmarshal([]byte(raw), status); err != nil {
		return nil, err
	}
	return status, nil
}
2021-04-13 20:01:21 +08:00
// UpdateNodeIsActive 更改节点在线状态
func (this *NodeDAO) UpdateNodeIsActive(tx *dbs.Tx, nodeId int64, isActive bool) error {
2020-10-25 18:26:46 +08:00
b := "true"
if !isActive {
b = "false"
}
_, err := this.Query(tx).
2020-10-25 18:26:46 +08:00
Pk(nodeId).
Where("status IS NOT NULL").
Set("status", dbs.SQL("JSON_SET(status, '$.isActive', "+b+")")).
Update()
return err
}
2021-04-13 20:01:21 +08:00
// UpdateNodeIsInstalled 设置节点安装状态
func (this *NodeDAO) UpdateNodeIsInstalled(tx *dbs.Tx, nodeId int64, isInstalled bool) error {
_, err := this.Query(tx).
2020-09-06 16:19:54 +08:00
Pk(nodeId).
Set("isInstalled", isInstalled).
2020-09-13 20:37:28 +08:00
Set("installStatus", "null"). // 重置安装状态
Update()
return err
}
2021-04-13 20:01:21 +08:00
// FindNodeInstallStatus 查询节点的安装状态
func (this *NodeDAO) FindNodeInstallStatus(tx *dbs.Tx, nodeId int64) (*NodeInstallStatus, error) {
node, err := this.Query(tx).
2020-09-13 20:37:28 +08:00
Pk(nodeId).
Result("installStatus", "isInstalled").
Find()
2020-09-13 20:37:28 +08:00
if err != nil {
return nil, err
}
if node == nil {
return nil, errors.New("not found")
}
2020-09-13 20:37:28 +08:00
installStatus := node.(*Node).InstallStatus
isInstalled := node.(*Node).IsInstalled == 1
2020-09-13 20:37:28 +08:00
if len(installStatus) == 0 {
return NewNodeInstallStatus(), nil
}
status := &NodeInstallStatus{}
err = json.Unmarshal([]byte(installStatus), status)
if err != nil {
return nil, err
}
if isInstalled {
status.IsFinished = true
status.IsOk = true
}
return status, nil
2020-09-13 20:37:28 +08:00
}
2021-04-13 20:01:21 +08:00
// UpdateNodeInstallStatus 修改节点的安装状态
func (this *NodeDAO) UpdateNodeInstallStatus(tx *dbs.Tx, nodeId int64, status *NodeInstallStatus) error {
2020-09-13 20:37:28 +08:00
if status == nil {
_, err := this.Query(tx).
2020-09-13 20:37:28 +08:00
Pk(nodeId).
Set("installStatus", "null").
Update()
return err
}
data, err := json.Marshal(status)
if err != nil {
return err
}
_, err = this.Query(tx).
2020-09-13 20:37:28 +08:00
Pk(nodeId).
Set("installStatus", string(data)).
2020-09-06 16:19:54 +08:00
Update()
return err
}
2021-04-13 20:01:21 +08:00
// ComposeNodeConfig 组合配置
2020-12-17 17:36:20 +08:00
// TODO 提升运行速度
2021-08-22 11:35:33 +08:00
func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, cacheMap maps.Map) (*nodeconfigs.NodeConfig, error) {
node, err := this.FindEnabledNode(tx, nodeId)
2020-09-26 08:06:40 +08:00
if err != nil {
return nil, err
}
if node == nil {
return nil, errors.New("node not found '" + strconv.FormatInt(nodeId, 10) + "'")
}
config := &nodeconfigs.NodeConfig{
Id: int64(node.Id),
NodeId: node.UniqueId,
2021-07-18 15:52:34 +08:00
Secret: node.Secret,
IsOn: node.IsOn == 1,
Servers: nil,
Version: int64(node.Version),
Name: node.Name,
MaxCPU: types.Int32(node.MaxCPU),
RegionId: int64(node.RegionId),
2020-09-26 08:06:40 +08:00
}
// 获取所有的服务
servers, err := SharedServerDAO.FindAllEnabledServersWithNode(tx, int64(node.Id))
2020-09-26 08:06:40 +08:00
if err != nil {
return nil, err
}
for _, server := range servers {
2021-08-22 11:35:33 +08:00
serverConfig, err := SharedServerDAO.ComposeServerConfig(tx, server, cacheMap)
2020-09-26 08:06:40 +08:00
if err != nil {
return nil, err
}
2021-08-01 14:56:08 +08:00
if serverConfig == nil {
continue
}
2020-09-26 08:06:40 +08:00
config.Servers = append(config.Servers, serverConfig)
}
// 全局设置
// TODO 根据用户的不同读取不同的全局设置
2021-01-19 12:05:35 +08:00
settingJSON, err := SharedSysSettingDAO.ReadSetting(tx, systemconfigs.SettingCodeServerGlobalConfig)
2020-09-26 08:06:40 +08:00
if err != nil {
return nil, err
}
if len(settingJSON) > 0 {
globalConfig := &serverconfigs.GlobalConfig{}
err = json.Unmarshal(settingJSON, globalConfig)
if err != nil {
return nil, err
}
config.GlobalConfig = globalConfig
}
2021-08-01 14:56:08 +08:00
var primaryClusterId = int64(node.ClusterId)
var clusterIds = []int64{primaryClusterId}
clusterIds = append(clusterIds, node.DecodeSecondaryClusterIds()...)
for _, clusterId := range clusterIds {
2021-08-22 11:35:33 +08:00
httpFirewallPolicyId, err := SharedNodeClusterDAO.FindClusterHTTPFirewallPolicyId(tx, clusterId, cacheMap)
2020-12-17 17:36:20 +08:00
if err != nil {
return nil, err
}
2021-08-01 14:56:08 +08:00
if httpFirewallPolicyId > 0 {
2021-08-22 11:35:33 +08:00
firewallPolicy, err := SharedHTTPFirewallPolicyDAO.ComposeFirewallPolicy(tx, httpFirewallPolicyId, cacheMap)
2021-08-01 14:56:08 +08:00
if err != nil {
return nil, err
}
if firewallPolicy != nil {
config.HTTPFirewallPolicies = append(config.HTTPFirewallPolicies, firewallPolicy)
}
2020-12-17 17:36:20 +08:00
}
2021-08-01 14:56:08 +08:00
// 缓存策略
2021-08-22 11:35:33 +08:00
httpCachePolicyId, err := SharedNodeClusterDAO.FindClusterHTTPCachePolicyId(tx, clusterId, cacheMap)
2020-12-17 17:36:20 +08:00
if err != nil {
return nil, err
}
2021-08-01 14:56:08 +08:00
if httpCachePolicyId > 0 {
2021-08-22 11:35:33 +08:00
cachePolicy, err := SharedHTTPCachePolicyDAO.ComposeCachePolicy(tx, httpCachePolicyId, cacheMap)
2021-08-01 14:56:08 +08:00
if err != nil {
return nil, err
}
if cachePolicy != nil {
config.HTTPCachePolicies = append(config.HTTPCachePolicies, cachePolicy)
}
2020-12-17 17:36:20 +08:00
}
}
// 缓存最大容量设置
if len(node.MaxCacheDiskCapacity) > 0 {
capacity := &shared.SizeCapacity{}
err = json.Unmarshal([]byte(node.MaxCacheDiskCapacity), capacity)
if err != nil {
return nil, err
}
if capacity.Count > 0 {
config.MaxCacheDiskCapacity = capacity
}
}
if len(node.MaxCacheMemoryCapacity) > 0 {
capacity := &shared.SizeCapacity{}
err = json.Unmarshal([]byte(node.MaxCacheMemoryCapacity), capacity)
if err != nil {
return nil, err
}
if capacity.Count > 0 {
config.MaxCacheMemoryCapacity = capacity
}
}
2020-12-02 14:26:03 +08:00
// TOA
2021-08-01 14:56:08 +08:00
toaConfig, err := SharedNodeClusterDAO.FindClusterTOAConfig(tx, primaryClusterId)
2020-12-02 14:26:03 +08:00
if err != nil {
return nil, err
}
config.TOA = toaConfig
2021-01-11 18:16:04 +08:00
// 系统服务
2021-08-01 14:56:08 +08:00
services, err := SharedNodeClusterDAO.FindNodeClusterSystemServices(tx, primaryClusterId)
2021-01-11 18:16:04 +08:00
if err != nil {
return nil, err
}
if len(services) > 0 {
config.SystemServices = services
}
2021-02-06 17:38:04 +08:00
// 防火墙动作
2021-08-01 14:56:08 +08:00
actions, err := SharedNodeClusterFirewallActionDAO.FindAllEnabledFirewallActions(tx, primaryClusterId)
2021-02-06 17:38:04 +08:00
if err != nil {
return nil, err
}
for _, action := range actions {
actionConfig, err := SharedNodeClusterFirewallActionDAO.ComposeFirewallActionConfig(tx, action)
if err != nil {
return nil, err
}
if actionConfig != nil {
config.FirewallActions = append(config.FirewallActions, actionConfig)
}
}
2021-07-19 17:58:16 +08:00
// 集群指标
2021-06-30 19:59:49 +08:00
metricItemIds, err := SharedNodeClusterMetricItemDAO.FindAllClusterItemIds(tx, int64(node.ClusterId))
if err != nil {
return nil, err
}
var metricItems = []*serverconfigs.MetricItemConfig{}
for _, itemId := range metricItemIds {
itemConfig, err := SharedMetricItemDAO.ComposeItemConfig(tx, itemId)
if err != nil {
return nil, err
}
if itemConfig != nil {
metricItems = append(metricItems, itemConfig)
}
}
2021-07-19 17:58:16 +08:00
// 公用指标
publicMetricItems, err := SharedMetricItemDAO.FindAllPublicItems(tx)
if err != nil {
return nil, err
}
for _, item := range publicMetricItems {
itemConfig := SharedMetricItemDAO.ComposeItemConfigWithItem(item)
if itemConfig != nil && !lists.ContainsInt64(metricItemIds, itemConfig.Id) {
metricItems = append(metricItems, itemConfig)
}
}
2021-06-30 19:59:49 +08:00
config.MetricItems = metricItems
2021-06-27 21:59:37 +08:00
2020-09-26 08:06:40 +08:00
return config, nil
}
2021-04-13 20:01:21 +08:00
// UpdateNodeConnectedAPINodes 修改当前连接的API节点
func (this *NodeDAO) UpdateNodeConnectedAPINodes(tx *dbs.Tx, nodeId int64, apiNodeIds []int64) error {
2020-10-04 14:27:14 +08:00
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
op := NewNodeOperator()
op.Id = nodeId
if len(apiNodeIds) > 0 {
apiNodeIdsJSON, err := json.Marshal(apiNodeIds)
if err != nil {
return errors.Wrap(err)
}
op.ConnectedAPINodes = apiNodeIdsJSON
} else {
op.ConnectedAPINodes = "[]"
}
err := this.Save(tx, op)
2020-10-04 14:27:14 +08:00
return err
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeIdWithUniqueId 根据UniqueId获取ID
func (this *NodeDAO) FindEnabledNodeIdWithUniqueId(tx *dbs.Tx, uniqueId string) (int64, error) {
return this.Query(tx).
2020-10-14 18:44:34 +08:00
State(NodeStateEnabled).
Attr("uniqueId", uniqueId).
ResultPk().
FindInt64Col(0)
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeIdWithUniqueIdCacheable returns the primary key of the
// enabled node with the given uniqueId, consulting nodeIdCacheMap first.
// A positive id found in the database is stored in the cache.
func (this *NodeDAO) FindEnabledNodeIdWithUniqueIdCacheable(tx *dbs.Tx, uniqueId string) (int64, error) {
	SharedCacheLocker.RLock()
	cachedId, found := nodeIdCacheMap[uniqueId]
	SharedCacheLocker.RUnlock()
	if found {
		return cachedId, nil
	}

	nodeId, err := this.Query(tx).
		State(NodeStateEnabled).
		Attr("uniqueId", uniqueId).
		ResultPk().
		FindInt64Col(0)
	if err != nil {
		return 0, err
	}
	if nodeId <= 0 {
		return nodeId, nil
	}

	SharedCacheLocker.Lock()
	nodeIdCacheMap[uniqueId] = nodeId
	SharedCacheLocker.Unlock()
	return nodeId, nil
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesWithGrantId 计算使用某个认证的节点数量
func (this *NodeDAO) CountAllEnabledNodesWithGrantId(tx *dbs.Tx, grantId int64) (int64, error) {
return this.Query(tx).
2020-10-25 21:27:46 +08:00
State(NodeStateEnabled).
Where("id IN (SELECT nodeId FROM edgeNodeLogins WHERE type='ssh' AND JSON_CONTAINS(params, :grantParam))").
Param("grantParam", string(maps.Map{"grantId": grantId}.AsJSON())).
Where("clusterId IN (SELECT id FROM edgeNodeClusters WHERE state=1)").
2020-10-25 21:27:46 +08:00
Count()
}
2021-04-13 20:01:21 +08:00
// FindAllEnabledNodesWithGrantId 查找使用某个认证的所有节点
func (this *NodeDAO) FindAllEnabledNodesWithGrantId(tx *dbs.Tx, grantId int64) (result []*Node, err error) {
_, err = this.Query(tx).
2020-10-25 21:27:46 +08:00
State(NodeStateEnabled).
Where("id IN (SELECT nodeId FROM edgeNodeLogins WHERE type='ssh' AND JSON_CONTAINS(params, :grantParam))").
Param("grantParam", string(maps.Map{"grantId": grantId}.AsJSON())).
Where("clusterId IN (SELECT id FROM edgeNodeClusters WHERE state=1)").
2020-10-25 21:27:46 +08:00
Slice(&result).
DescPk().
FindAll()
return
}
2021-04-13 20:01:21 +08:00
// CountAllNotInstalledNodesWithClusterId counts enabled nodes in a cluster
// whose isInstalled flag is false.
func (this *NodeDAO) CountAllNotInstalledNodesWithClusterId(tx *dbs.Tx, clusterId int64) (int64, error) {
	query := this.Query(tx)
	query.State(NodeStateEnabled)
	query.Attr("clusterId", clusterId)
	query.Attr("isInstalled", false)
	return query.Count()
}
2021-04-13 20:01:21 +08:00
// FindAllNotInstalledNodesWithClusterId returns enabled nodes in a cluster
// whose isInstalled flag is false, ordered by primary key descending.
func (this *NodeDAO) FindAllNotInstalledNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
	query := this.Query(tx)
	query.State(NodeStateEnabled)
	query.Attr("clusterId", clusterId)
	query.Attr("isInstalled", false)
	query.DescPk()
	query.Slice(&result)

	_, err = query.FindAll()
	return result, err
}
2021-05-11 22:48:04 +08:00
// CountAllLowerVersionNodesWithClusterId counts enabled nodes in a cluster
// that report the given os and arch in their status and whose
// buildVersionCode is missing or lower than version.
func (this *NodeDAO) CountAllLowerVersionNodesWithClusterId(tx *dbs.Tx, clusterId int64, os string, arch string, version string) (int64, error) {
	query := this.Query(tx)
	query.State(NodeStateEnabled)
	query.Attr("clusterId", clusterId)
	query.Where("status IS NOT NULL")
	query.Where("JSON_EXTRACT(status, '$.os')=:os")
	query.Where("JSON_EXTRACT(status, '$.arch')=:arch")
	query.Where("(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)")
	query.Param("os", os)
	query.Param("arch", arch)
	query.Param("version", utils.VersionToLong(version))
	return query.Count()
}
2021-05-11 22:48:04 +08:00
// FindAllLowerVersionNodesWithClusterId 查找单个集群中所有低于某个版本的节点
func (this *NodeDAO) FindAllLowerVersionNodesWithClusterId(tx *dbs.Tx, clusterId int64, os string, arch string, version string) (result []*Node, err error) {
_, err = this.Query(tx).
2020-10-28 12:35:36 +08:00
State(NodeStateEnabled).
Attr("clusterId", clusterId).
Where("status IS NOT NULL").
Where("JSON_EXTRACT(status, '$.os')=:os").
Where("JSON_EXTRACT(status, '$.arch')=:arch").
Where("(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)").
2020-10-28 12:35:36 +08:00
Param("os", os).
Param("arch", arch).
Param("version", utils.VersionToLong(version)).
2020-10-28 12:35:36 +08:00
DescPk().
Slice(&result).
FindAll()
return
}
2021-05-11 22:48:04 +08:00
// CountAllLowerVersionNodes counts enabled nodes, across all clusters with
// state=1, that have a status and whose buildVersionCode is missing or lower
// than version.
func (this *NodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) (int64, error) {
	query := this.Query(tx)
	query.State(NodeStateEnabled)
	query.Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)")
	query.Where("status IS NOT NULL")
	query.Where("(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)")
	query.Param("version", utils.VersionToLong(version))
	return query.Count()
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesWithGroupId 查找某个节点分组下的所有节点数量
func (this *NodeDAO) CountAllEnabledNodesWithGroupId(tx *dbs.Tx, groupId int64) (int64, error) {
return this.Query(tx).
2020-10-28 18:21:15 +08:00
State(NodeStateEnabled).
Attr("groupId", groupId).
Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)").
2020-10-28 18:21:15 +08:00
Count()
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesWithRegionId 查找某个节点区域下的所有节点数量
func (this *NodeDAO) CountAllEnabledNodesWithRegionId(tx *dbs.Tx, regionId int64) (int64, error) {
return this.Query(tx).
2020-12-10 15:03:03 +08:00
State(NodeStateEnabled).
Attr("regionId", regionId).
Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)").
2020-12-10 15:03:03 +08:00
Count()
}
2021-04-13 20:01:21 +08:00
// FindAllEnabledNodesDNSWithClusterId 获取一个集群的节点DNS信息
2021-07-31 22:23:11 +08:00
func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64, includeSecondaryNodes bool) (result []*Node, err error) {
if clusterId <= 0 {
return nil, nil
}
var query = this.Query(tx)
if includeSecondaryNodes {
query.Where("(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))").
Param("primaryClusterId", clusterId).
Param("primaryClusterIdString", types.String(clusterId))
} else {
query.Attr("clusterId", clusterId)
}
_, err = query.
2020-11-14 09:41:58 +08:00
State(NodeStateEnabled).
Attr("isOn", true).
Attr("isUp", true).
2020-11-14 21:28:07 +08:00
Result("id", "name", "dnsRoutes", "isOn").
2020-11-14 09:41:58 +08:00
DescPk().
Slice(&result).
FindAll()
return
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesDNSWithClusterId 计算一个集群的节点DNS数量
func (this *NodeDAO) CountAllEnabledNodesDNSWithClusterId(tx *dbs.Tx, clusterId int64) (result int64, err error) {
return this.Query(tx).
2020-12-23 16:49:47 +08:00
State(NodeStateEnabled).
Attr("clusterId", clusterId).
Attr("isOn", true).
Attr("isUp", true).
Result("id", "name", "dnsRoutes", "isOn").
DescPk().
Slice(&result).
Count()
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeDNS 获取单个节点的DNS信息
func (this *NodeDAO) FindEnabledNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error) {
one, err := this.Query(tx).
2020-11-14 09:41:58 +08:00
State(NodeStateEnabled).
Pk(nodeId).
Result("id", "name", "dnsRoutes", "clusterId", "isOn").
2020-11-14 09:41:58 +08:00
Find()
2021-07-31 22:23:11 +08:00
if one == nil {
2020-11-14 09:41:58 +08:00
return nil, err
}
return one.(*Node), nil
}
2021-04-13 20:01:21 +08:00
// FindStatelessNodeDNS 获取单个节点的DNS信息无论什么状态
2021-01-27 23:00:02 +08:00
func (this *NodeDAO) FindStatelessNodeDNS(tx *dbs.Tx, nodeId int64) (*Node, error) {
one, err := this.Query(tx).
Pk(nodeId).
Result("id", "name", "dnsRoutes", "clusterId", "isOn", "state").
Find()
if err != nil || one == nil {
return nil, err
}
return one.(*Node), nil
}
2021-04-13 20:01:21 +08:00
// UpdateNodeDNS 修改节点的DNS信息
func (this *NodeDAO) UpdateNodeDNS(tx *dbs.Tx, nodeId int64, routes map[int64][]string) error {
2020-11-14 09:41:58 +08:00
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
if routes == nil {
2020-11-16 13:03:20 +08:00
routes = map[int64][]string{}
2020-11-14 09:41:58 +08:00
}
routesJSON, err := json.Marshal(routes)
if err != nil {
return err
}
op := NewNodeOperator()
op.Id = nodeId
op.DnsRoutes = routesJSON
err = this.Save(tx, op)
if err != nil {
return err
}
2021-01-27 23:00:02 +08:00
err = this.NotifyUpdate(tx, nodeId)
if err != nil {
return err
}
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return err
}
return nil
2020-11-14 09:41:58 +08:00
}
2021-09-13 16:47:40 +08:00
// UpdateNodeSystem stores the system settings of a node (currently maxCPU)
// and raises the node update notification.
//
// A non-positive nodeId is rejected with an error.
func (this *NodeDAO) UpdateNodeSystem(tx *dbs.Tx, nodeId int64, maxCPU int32) error {
	if nodeId <= 0 {
		return errors.New("invalid nodeId")
	}

	op := NewNodeOperator()
	op.Id = nodeId
	op.MaxCPU = maxCPU

	if err := this.Save(tx, op); err != nil {
		return err
	}
	return this.NotifyUpdate(tx, nodeId)
}
// UpdateNodeCache stores the cache capacity settings of a node and raises the
// node update notification.
//
// Each capacity is assigned only when its JSON argument is non-empty; an empty
// argument leaves the corresponding operator field unset.
// A non-positive nodeId is rejected with an error.
func (this *NodeDAO) UpdateNodeCache(tx *dbs.Tx, nodeId int64, maxCacheDiskCapacityJSON []byte, maxCacheMemoryCapacityJSON []byte) error {
	if nodeId <= 0 {
		return errors.New("invalid nodeId")
	}

	op := NewNodeOperator()
	op.Id = nodeId
	if len(maxCacheDiskCapacityJSON) != 0 {
		op.MaxCacheDiskCapacity = maxCacheDiskCapacityJSON
	}
	if len(maxCacheMemoryCapacityJSON) != 0 {
		op.MaxCacheMemoryCapacity = maxCacheMemoryCapacityJSON
	}

	if err := this.Save(tx, op); err != nil {
		return err
	}
	return this.NotifyUpdate(tx, nodeId)
}
2021-04-13 20:01:21 +08:00
// UpdateNodeUpCount 计算节点上线|下线状态
func (this *NodeDAO) UpdateNodeUpCount(tx *dbs.Tx, nodeId int64, isUp bool, maxUp int, maxDown int) (changed bool, err error) {
if nodeId <= 0 {
return false, errors.New("invalid nodeId")
}
one, err := this.Query(tx).
Pk(nodeId).
Result("isUp", "countUp", "countDown").
Find()
if err != nil {
return false, err
}
if one == nil {
return false, nil
}
oldIsUp := one.(*Node).IsUp == 1
// 如果新老状态一致,则不做任何事情
if oldIsUp == isUp {
2020-11-16 09:20:24 +08:00
return false, nil
}
countUp := int(one.(*Node).CountUp)
countDown := int(one.(*Node).CountDown)
op := NewNodeOperator()
op.Id = nodeId
if isUp {
countUp++
countDown = 0
if countUp >= maxUp {
changed = true
op.IsUp = true
}
} else {
countDown++
countUp = 0
if countDown >= maxDown {
changed = true
op.IsUp = false
}
}
op.CountUp = countUp
op.CountDown = countDown
err = this.Save(tx, op)
if err != nil {
return false, err
}
if changed {
err = this.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return true, err
}
2021-01-27 23:00:02 +08:00
}
return
}
2021-04-13 20:01:21 +08:00
// UpdateNodeUp sets the isUp flag of a node directly, resets both the up and
// down counters to zero, and raises a DNS update notification.
//
// A non-positive nodeId is rejected with an error.
func (this *NodeDAO) UpdateNodeUp(tx *dbs.Tx, nodeId int64, isUp bool) error {
	if nodeId <= 0 {
		return errors.New("invalid nodeId")
	}

	var op = NewNodeOperator()
	op.Id = nodeId
	op.IsUp = isUp
	op.CountUp = 0
	op.CountDown = 0

	if err := this.Save(tx, op); err != nil {
		return err
	}

	// TODO: only update DNS when the state actually changed.
	return this.NotifyDNSUpdate(tx, nodeId)
}
2021-04-13 20:01:21 +08:00
// UpdateNodeActive 修改节点活跃状态
func (this *NodeDAO) UpdateNodeActive(tx *dbs.Tx, nodeId int64, isActive bool) error {
2020-11-16 09:20:24 +08:00
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
_, err := this.Query(tx).
2020-11-16 09:20:24 +08:00
Pk(nodeId).
Set("isActive", isActive).
Update()
return err
}
2021-04-13 20:01:21 +08:00
// FindNodeActive 检查节点活跃状态
func (this *NodeDAO) FindNodeActive(tx *dbs.Tx, nodeId int64) (bool, error) {
isActive, err := this.Query(tx).
2020-11-16 09:20:24 +08:00
Pk(nodeId).
Result("isActive").
FindIntCol(0)
if err != nil {
return false, err
}
return isActive == 1, nil
}
2021-04-13 20:01:21 +08:00
// FindNodeVersion 查找节点的版本号
func (this *NodeDAO) FindNodeVersion(tx *dbs.Tx, nodeId int64) (int64, error) {
return this.Query(tx).
2020-12-02 14:26:03 +08:00
Pk(nodeId).
Result("version").
FindInt64Col(0)
}
2021-04-13 20:01:21 +08:00
// GenUniqueId 生成唯一ID
2021-01-27 23:00:02 +08:00
func (this *NodeDAO) GenUniqueId(tx *dbs.Tx) (string, error) {
2020-08-21 12:32:33 +08:00
for {
uniqueId := rands.HexString(32)
ok, err := this.Query(tx).
2020-08-21 12:32:33 +08:00
Attr("uniqueId", uniqueId).
Exist()
if err != nil {
return "", err
}
if ok {
continue
}
return uniqueId, nil
}
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodesWithIds loads the enabled nodes whose ids are in nodeIds,
// selecting only the id, connectedAPINodes, isActive and isOn columns.
//
// An empty nodeIds yields (nil, nil) without querying.
func (this *NodeDAO) FindEnabledNodesWithIds(tx *dbs.Tx, nodeIds []int64) (result []*Node, err error) {
	if len(nodeIds) == 0 {
		return nil, nil
	}

	// Render the ids as decimal strings for the IN (...) list.
	var formattedIds = make([]string, len(nodeIds))
	for index, nodeId := range nodeIds {
		formattedIds[index] = numberutils.FormatInt64(nodeId)
	}
	var inCondition = "id IN (" + strings.Join(formattedIds, ", ") + ")"

	_, err = this.Query(tx).
		State(NodeStateEnabled).
		Where(inCondition).
		Result("id", "connectedAPINodes", "isActive", "isOn").
		Slice(&result).
		Reuse(false).
		FindAll()
	return result, err
}
2021-07-31 22:23:11 +08:00
// DeleteNodeFromCluster 从集群中删除节点
func (this *NodeDAO) DeleteNodeFromCluster(tx *dbs.Tx, nodeId int64, clusterId int64) error {
one, err := this.Query(tx).
Pk(nodeId).
Result("clusterId", "secondaryClusterIds").
Find()
if err != nil {
return err
}
if one == nil {
return nil
}
var node = one.(*Node)
var secondaryClusterIds = []int64{}
for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() {
if secondaryClusterId == clusterId {
continue
}
secondaryClusterIds = append(secondaryClusterIds, secondaryClusterId)
}
var newClusterId = int64(node.ClusterId)
if newClusterId == clusterId {
newClusterId = 0
// 选择一个从集群作为主集群
if len(secondaryClusterIds) > 0 {
newClusterId = secondaryClusterIds[0]
secondaryClusterIds = secondaryClusterIds[1:]
}
}
secondaryClusterIdsJSON, err := json.Marshal(secondaryClusterIds)
if err != nil {
return err
}
op := NewNodeOperator()
op.Id = nodeId
op.ClusterId = newClusterId
op.SecondaryClusterIds = secondaryClusterIdsJSON
if newClusterId == 0 {
op.State = NodeStateDisabled
}
2021-07-31 22:23:11 +08:00
return this.Save(tx, op)
}
// TransferPrimaryClusterNodes moves the enabled nodes of a primary cluster to
// their first secondary cluster.
//
// For each enabled node whose clusterId equals primaryClusterId and which has
// at least one secondary cluster, the first secondary cluster becomes the
// primary and the rest stay secondary. Nodes without secondary clusters are
// left untouched. A non-positive primaryClusterId is a no-op.
func (this *NodeDAO) TransferPrimaryClusterNodes(tx *dbs.Tx, primaryClusterId int64) error {
	if primaryClusterId <= 0 {
		return nil
	}

	nodes, err := this.Query(tx).
		Attr("clusterId", primaryClusterId).
		Result("id", "secondaryClusterIds").
		State(NodeStateEnabled).
		FindAll()
	if err != nil {
		return err
	}

	for _, item := range nodes {
		var node = item.(*Node)

		var candidateIds = node.DecodeSecondaryClusterIds()
		if len(candidateIds) == 0 {
			continue
		}

		// Head becomes the new primary cluster, tail stays secondary.
		remainingJSON, err := json.Marshal(candidateIds[1:])
		if err != nil {
			return err
		}

		err = this.Query(tx).
			Pk(node.Id).
			Set("clusterId", candidateIds[0]).
			Set("secondaryClusterIds", remainingJSON).
			UpdateQuickly()
		if err != nil {
			return err
		}
	}
	return nil
}
2021-04-13 20:01:21 +08:00
// NotifyUpdate 通知更新
func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
clusterId, err := this.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
if clusterId > 0 {
2021-08-08 15:47:48 +08:00
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, NodeTaskTypeConfigChanged)
}
return nil
}
2021-01-27 23:00:02 +08:00
2021-04-13 20:01:21 +08:00
// NotifyDNSUpdate 通知DNS更新
2021-01-27 23:00:02 +08:00
func (this *NodeDAO) NotifyDNSUpdate(tx *dbs.Tx, nodeId int64) error {
2021-07-31 22:23:11 +08:00
clusterIds, err := this.FindEnabledAndOnNodeClusterIds(tx, nodeId)
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
2021-07-31 22:23:11 +08:00
for _, clusterId := range clusterIds {
2021-08-22 11:35:33 +08:00
dnsInfo, err := SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, nil)
2021-07-31 22:23:11 +08:00
if err != nil {
return err
}
if dnsInfo == nil {
continue
}
if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId <= 0 {
continue
}
err = dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange)
if err != nil {
return err
}
2021-01-27 23:00:02 +08:00
}
2021-07-31 22:23:11 +08:00
return nil
2021-01-27 23:00:02 +08:00
}