2020-07-22 22:17:53 +08:00
package models
import (
2020-09-13 20:37:28 +08:00
"encoding/json"
2021-01-27 23:00:02 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
2020-10-04 14:27:14 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-01-18 12:35:29 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2021-01-17 16:48:00 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
2020-09-26 08:06:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2021-05-12 21:39:08 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
2021-01-19 12:05:35 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
2020-07-22 22:17:53 +08:00
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2021-07-19 17:58:16 +08:00
"github.com/iwind/TeaGo/lists"
2020-10-25 21:27:46 +08:00
"github.com/iwind/TeaGo/maps"
2020-07-24 09:17:48 +08:00
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
2020-09-26 08:06:40 +08:00
"strconv"
2021-01-17 16:48:00 +08:00
"strings"
2020-07-22 22:17:53 +08:00
)
// Values written to / matched against the "state" column of a node row.
const (
NodeStateEnabled = 1 // enabled
NodeStateDisabled = 0 // disabled
)
2021-01-21 10:06:59 +08:00
// nodeIdCacheMap caches node ids by unique id. The accesses visible in this
// file are performed while holding SharedCacheLocker.
var nodeIdCacheMap = map [ string ] int64 { } // uniqueId => nodeId
2020-07-22 22:17:53 +08:00
// NodeDAO is the data-access object for rows of the "edgeNodes" table.
type NodeDAO dbs.DAO

// NewNodeDAO builds a NodeDAO configured with Tea.Env as the database, the
// "edgeNodes" table, Node as the row model and "id" as the primary key name.
func NewNodeDAO() *NodeDAO {
	dao := &NodeDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeNodes",
			Model:  new(Node),
			PkName: "id",
		},
	}
	return dbs.NewDAO(dao).(*NodeDAO)
}
2020-10-13 20:05:13 +08:00
// SharedNodeDAO is the package-level NodeDAO instance. It is assigned inside
// the callback registered with dbs.OnReady, so it is nil until that callback
// has run.
var SharedNodeDAO *NodeDAO

func init() {
	dbs.OnReady(func() { SharedNodeDAO = NewNodeDAO() })
}
2020-07-22 22:17:53 +08:00
2021-04-13 20:01:21 +08:00
// EnableNode 启用条目
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) EnableNode ( tx * dbs . Tx , id uint32 ) ( rowsAffected int64 , err error ) {
return this . Query ( tx ) .
2020-07-22 22:17:53 +08:00
Pk ( id ) .
Set ( "state" , NodeStateEnabled ) .
Update ( )
}
2021-04-13 20:01:21 +08:00
// DisableNode 禁用条目
2021-01-17 16:48:00 +08:00
func ( this * NodeDAO ) DisableNode ( tx * dbs . Tx , nodeId int64 ) ( err error ) {
2021-01-21 10:06:59 +08:00
// 删除缓存
uniqueId , err := this . Query ( tx ) .
Pk ( nodeId ) .
Result ( "uniqueId" ) .
FindStringCol ( "" )
if err != nil {
return err
}
if len ( uniqueId ) > 0 {
SharedCacheLocker . Lock ( )
delete ( nodeIdCacheMap , uniqueId )
SharedCacheLocker . Unlock ( )
}
2021-01-01 23:31:30 +08:00
_ , err = this . Query ( tx ) .
2021-01-17 16:48:00 +08:00
Pk ( nodeId ) .
2020-07-22 22:17:53 +08:00
Set ( "state" , NodeStateDisabled ) .
Update ( )
2021-01-17 16:48:00 +08:00
if err != nil {
return err
}
2021-01-27 23:00:02 +08:00
err = this . NotifyUpdate ( tx , nodeId )
if err != nil {
return err
}
err = this . NotifyDNSUpdate ( tx , nodeId )
if err != nil {
return err
}
return nil
2020-07-22 22:17:53 +08:00
}
2021-04-13 20:01:21 +08:00
// FindEnabledNode 查找启用中的条目
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindEnabledNode ( tx * dbs . Tx , id int64 ) ( * Node , error ) {
result , err := this . Query ( tx ) .
2020-07-22 22:17:53 +08:00
Pk ( id ) .
Attr ( "state" , NodeStateEnabled ) .
Find ( )
if result == nil {
return nil , err
}
return result . ( * Node ) , err
}
2021-04-13 20:01:21 +08:00
// FindNodeName 根据主键查找名称
2021-01-17 16:48:00 +08:00
func ( this * NodeDAO ) FindNodeName ( tx * dbs . Tx , id int64 ) ( string , error ) {
2021-01-01 23:31:30 +08:00
name , err := this . Query ( tx ) .
2020-07-22 22:17:53 +08:00
Pk ( id ) .
Result ( "name" ) .
FindCol ( "" )
return name . ( string ) , err
}
2020-07-24 09:17:48 +08:00
2021-04-13 20:01:21 +08:00
// CreateNode 创建节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) CreateNode ( tx * dbs . Tx , adminId int64 , name string , clusterId int64 , groupId int64 , regionId int64 ) ( nodeId int64 , err error ) {
2021-01-27 23:00:02 +08:00
uniqueId , err := this . GenUniqueId ( tx )
2020-08-21 12:32:33 +08:00
if err != nil {
return 0 , err
}
2020-09-06 16:19:54 +08:00
secret := rands . String ( 32 )
// 保存API Token
2021-05-26 14:40:05 +08:00
err = SharedApiTokenDAO . CreateAPIToken ( tx , uniqueId , secret , nodeconfigs . NodeRoleNode )
2020-09-06 16:19:54 +08:00
if err != nil {
return
}
2020-07-24 09:17:48 +08:00
op := NewNodeOperator ( )
2020-11-27 09:57:21 +08:00
op . AdminId = adminId
2020-07-24 09:17:48 +08:00
op . Name = name
2020-08-21 12:32:33 +08:00
op . UniqueId = uniqueId
2020-09-06 16:19:54 +08:00
op . Secret = secret
2020-07-24 09:17:48 +08:00
op . ClusterId = clusterId
2020-10-28 20:00:43 +08:00
op . GroupId = groupId
2020-12-10 16:11:35 +08:00
op . RegionId = regionId
2020-07-29 19:02:28 +08:00
op . IsOn = 1
2020-07-24 09:17:48 +08:00
op . State = NodeStateEnabled
2021-01-01 23:31:30 +08:00
err = this . Save ( tx , op )
2020-07-24 09:17:48 +08:00
if err != nil {
return 0 , err
}
2021-01-27 23:00:02 +08:00
// 通知节点更新
nodeId = types . Int64 ( op . Id )
err = this . NotifyUpdate ( tx , nodeId )
if err != nil {
return 0 , err
}
// 通知DNS更新
err = this . NotifyDNSUpdate ( tx , nodeId )
if err != nil {
return 0 , err
}
return nodeId , nil
2020-07-24 09:17:48 +08:00
}
2021-04-13 20:01:21 +08:00
// UpdateNode 修改节点
2021-07-31 22:23:11 +08:00
func ( this * NodeDAO ) UpdateNode ( tx * dbs . Tx , nodeId int64 , name string , clusterId int64 , secondaryClusterIds [ ] int64 , groupId int64 , regionId int64 , maxCPU int32 , isOn bool , maxCacheDiskCapacityJSON [ ] byte , maxCacheMemoryCapacityJSON [ ] byte ) error {
2020-07-30 22:41:49 +08:00
if nodeId <= 0 {
return errors . New ( "invalid nodeId" )
}
2020-07-24 09:17:48 +08:00
op := NewNodeOperator ( )
op . Id = nodeId
op . Name = name
op . ClusterId = clusterId
2021-07-31 22:23:11 +08:00
// 去重
var filteredSecondaryClusterIds = [ ] int64 { }
for _ , secondaryClusterId := range secondaryClusterIds {
if secondaryClusterId <= 0 {
continue
}
if lists . ContainsInt64 ( filteredSecondaryClusterIds , secondaryClusterId ) {
continue
}
filteredSecondaryClusterIds = append ( filteredSecondaryClusterIds , secondaryClusterId )
}
filteredSecondaryClusterIdsJSON , err := json . Marshal ( filteredSecondaryClusterIds )
if err != nil {
return err
}
op . SecondaryClusterIds = filteredSecondaryClusterIdsJSON
2020-10-28 20:00:43 +08:00
op . GroupId = groupId
2020-12-10 16:11:35 +08:00
op . RegionId = regionId
2020-08-21 12:32:33 +08:00
op . LatestVersion = dbs . SQL ( "latestVersion+1" )
2020-10-10 12:31:55 +08:00
op . MaxCPU = maxCPU
op . IsOn = isOn
2021-05-12 21:39:08 +08:00
if len ( maxCacheDiskCapacityJSON ) > 0 {
op . MaxCacheDiskCapacity = maxCacheDiskCapacityJSON
}
if len ( maxCacheMemoryCapacityJSON ) > 0 {
op . MaxCacheMemoryCapacity = maxCacheMemoryCapacityJSON
}
2021-07-31 22:23:11 +08:00
err = this . Save ( tx , op )
2020-08-21 12:32:33 +08:00
if err != nil {
2021-01-17 16:48:00 +08:00
return err
2020-08-21 12:32:33 +08:00
}
2021-01-17 16:48:00 +08:00
2021-01-27 23:00:02 +08:00
err = this . NotifyUpdate ( tx , nodeId )
if err != nil {
return err
}
return this . NotifyDNSUpdate ( tx , nodeId )
2020-08-21 12:32:33 +08:00
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodes 计算所有节点数量
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) CountAllEnabledNodes ( tx * dbs . Tx ) ( int64 , error ) {
return this . Query ( tx ) .
2020-07-24 09:17:48 +08:00
State ( NodeStateEnabled ) .
2021-01-21 18:55:34 +08:00
Where ( "clusterId IN (SELECT id FROM " + SharedNodeClusterDAO . Table + " WHERE state=:clusterState)" ) .
Param ( "clusterState" , NodeClusterStateEnabled ) .
2020-07-24 09:17:48 +08:00
Count ( )
}
2021-04-13 20:01:21 +08:00
// ListEnabledNodesMatch 列出单页节点
2021-07-07 15:18:48 +08:00
func ( this * NodeDAO ) ListEnabledNodesMatch ( tx * dbs . Tx ,
clusterId int64 ,
installState configutils . BoolState ,
activeState configutils . BoolState ,
keyword string ,
groupId int64 ,
regionId int64 ,
2021-07-31 22:23:11 +08:00
includeSecondaryNodes bool ,
2021-07-07 15:18:48 +08:00
order string ,
offset int64 ,
size int64 ) ( result [ ] * Node , err error ) {
2021-01-01 23:31:30 +08:00
query := this . Query ( tx ) .
2020-07-24 09:17:48 +08:00
State ( NodeStateEnabled ) .
2020-07-29 19:02:28 +08:00
Offset ( offset ) .
Limit ( size ) .
2020-08-30 16:12:00 +08:00
Slice ( & result )
2020-09-13 20:37:28 +08:00
// 集群
2020-08-30 16:12:00 +08:00
if clusterId > 0 {
2021-07-31 22:23:11 +08:00
if includeSecondaryNodes {
query . Where ( "(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))" ) .
Param ( "primaryClusterId" , clusterId ) .
Param ( "primaryClusterIdString" , types . String ( clusterId ) )
} else {
query . Attr ( "clusterId" , clusterId )
}
2021-07-14 13:59:16 +08:00
} else {
query . Where ( "clusterId IN (SELECT id FROM " + SharedNodeClusterDAO . Table + " WHERE state=1)" )
2020-08-30 16:12:00 +08:00
}
2020-09-13 20:37:28 +08:00
// 安装状态
switch installState {
2020-09-26 08:06:40 +08:00
case configutils . BoolStateAll :
// 所有
case configutils . BoolStateYes :
2020-09-13 20:37:28 +08:00
query . Attr ( "isInstalled" , 1 )
2020-09-26 08:06:40 +08:00
case configutils . BoolStateNo :
2020-09-13 20:37:28 +08:00
query . Attr ( "isInstalled" , 0 )
}
2020-09-26 08:06:40 +08:00
// 在线状态
switch activeState {
case configutils . BoolStateAll :
// 所有
case configutils . BoolStateYes :
2020-09-28 16:25:39 +08:00
query . Where ( "JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60" )
2020-09-26 08:06:40 +08:00
case configutils . BoolStateNo :
2020-09-28 16:25:39 +08:00
query . Where ( "(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)" )
2020-09-26 08:06:40 +08:00
}
2020-10-28 18:21:15 +08:00
// 关键词
if len ( keyword ) > 0 {
query . Where ( "(name LIKE :keyword OR JSON_EXTRACT(status,'$.hostname') LIKE :keyword OR id IN (SELECT nodeId FROM " + SharedNodeIPAddressDAO . Table + " WHERE ip LIKE :keyword))" ) .
Param ( "keyword" , "%" + keyword + "%" )
}
// 分组
if groupId > 0 {
query . Attr ( "groupId" , groupId )
}
2020-12-10 16:11:35 +08:00
// 区域
if regionId > 0 {
query . Attr ( "regionId" , regionId )
}
2021-07-07 15:18:48 +08:00
// 排序
switch order {
case "cpuAsc" :
query . Asc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.cpuUsage'), 0), 0)" )
case "cpuDesc" :
query . Desc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.cpuUsage'), 0), 0)" )
case "memoryAsc" :
query . Asc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.memoryUsage'), 0), 0)" )
case "memoryDesc" :
query . Desc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.memoryUsage'), 0), 0)" )
case "trafficInAsc" :
query . Asc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficInBytes'), 0), 0)" )
case "trafficInDesc" :
query . Desc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficInBytes'), 0), 0)" )
case "trafficOutAsc" :
query . Asc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficOutBytes'), 0), 0)" )
case "trafficOutDesc" :
query . Desc ( "IF(JSON_EXTRACT(status, '$.updatedAt')>UNIX_TIMESTAMP()-120, IFNULL(JSON_EXTRACT(status, '$.trafficOutBytes'), 0), 0)" )
}
query . DescPk ( )
2020-08-30 16:12:00 +08:00
_ , err = query . FindAll ( )
2020-07-24 09:17:48 +08:00
return
}
2020-08-21 12:32:33 +08:00
2021-04-13 20:01:21 +08:00
// FindEnabledNodeWithUniqueIdAndSecret 根据节点ID和密钥查询节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindEnabledNodeWithUniqueIdAndSecret ( tx * dbs . Tx , uniqueId string , secret string ) ( * Node , error ) {
one , err := this . Query ( tx ) .
2020-08-21 12:32:33 +08:00
Attr ( "uniqueId" , uniqueId ) .
Attr ( "secret" , secret ) .
State ( NodeStateEnabled ) .
Find ( )
if one != nil {
return one . ( * Node ) , err
}
return nil , err
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeWithUniqueId 根据节点ID获取节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindEnabledNodeWithUniqueId ( tx * dbs . Tx , uniqueId string ) ( * Node , error ) {
one , err := this . Query ( tx ) .
2020-08-21 12:32:33 +08:00
Attr ( "uniqueId" , uniqueId ) .
State ( NodeStateEnabled ) .
Find ( )
if one != nil {
return one . ( * Node ) , err
}
return nil , err
}
2021-04-13 20:01:21 +08:00
// FindNodeClusterId 获取节点集群ID
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindNodeClusterId ( tx * dbs . Tx , nodeId int64 ) ( int64 , error ) {
col , err := this . Query ( tx ) .
2020-08-21 12:32:33 +08:00
Pk ( nodeId ) .
Result ( "clusterId" ) .
FindCol ( 0 )
return types . Int64 ( col ) , err
}
2021-07-31 22:23:11 +08:00
// FindEnabledAndOnNodeClusterIds collects the cluster ids a node belongs to.
//
// The primary clusterId is added first when it is positive. Each secondary
// cluster id is then added if it is not already in the result and
// SharedNodeClusterDAO.CheckNodeClusterIsOn reports true for it.
// NOTE(review): the primary cluster is added without the is-on check —
// confirm that this is intended.
// A nil result is returned when the node row is not found.
func (this *NodeDAO) FindEnabledAndOnNodeClusterIds(tx *dbs.Tx, nodeId int64) (result []int64, err error) {
	one, err := this.Query(tx).
		Pk(nodeId).
		Result("clusterId", "secondaryClusterIds").
		Find()
	if one == nil {
		return nil, err
	}
	node := one.(*Node)

	if primaryId := int64(node.ClusterId); primaryId > 0 {
		result = append(result, primaryId)
	}

	for _, secondaryId := range node.DecodeSecondaryClusterIds() {
		if lists.ContainsInt64(result, secondaryId) {
			continue
		}

		// Only keep clusters that are switched on.
		isOn, checkErr := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, secondaryId)
		if checkErr != nil {
			return nil, checkErr
		}
		if isOn {
			result = append(result, secondaryId)
		}
	}

	return result, err
}
2021-04-13 20:01:21 +08:00
// FindAllNodeIdsMatch 匹配节点并返回节点ID
2021-07-31 22:23:11 +08:00
func ( this * NodeDAO ) FindAllNodeIdsMatch ( tx * dbs . Tx , clusterId int64 , includeSecondaryNodes bool , isOn configutils . BoolState ) ( result [ ] int64 , err error ) {
2021-01-01 23:31:30 +08:00
query := this . Query ( tx )
2020-08-21 12:32:33 +08:00
query . State ( NodeStateEnabled )
if clusterId > 0 {
2021-07-31 22:23:11 +08:00
if includeSecondaryNodes {
query . Where ( "(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))" ) .
Param ( "primaryClusterId" , clusterId ) .
Param ( "primaryClusterIdString" , types . String ( clusterId ) )
} else {
query . Attr ( "clusterId" , clusterId )
}
2021-07-14 13:59:16 +08:00
} else {
query . Where ( "clusterId IN (SELECT id FROM " + SharedNodeClusterDAO . Table + " WHERE state=1)" )
2020-08-21 12:32:33 +08:00
}
2021-01-17 16:48:00 +08:00
if isOn == configutils . BoolStateYes {
query . Attr ( "isOn" , true )
} else if isOn == configutils . BoolStateNo {
query . Attr ( "isOn" , false )
}
2020-08-21 12:32:33 +08:00
query . Result ( "id" )
ones , _ , err := query . FindOnes ( )
if err != nil {
return nil , err
}
for _ , one := range ones {
result = append ( result , one . GetInt64 ( "id" ) )
}
return
}
2021-04-13 20:01:21 +08:00
// FindAllEnabledNodesWithClusterId 获取一个集群的所有节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindAllEnabledNodesWithClusterId ( tx * dbs . Tx , clusterId int64 ) ( result [ ] * Node , err error ) {
_ , err = this . Query ( tx ) .
2020-10-04 14:27:14 +08:00
State ( NodeStateEnabled ) .
Attr ( "clusterId" , clusterId ) .
DescPk ( ) .
Slice ( & result ) .
FindAll ( )
return
}
2021-04-13 20:01:21 +08:00
// FindAllInactiveNodesWithClusterId 取得一个集群离线的节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindAllInactiveNodesWithClusterId ( tx * dbs . Tx , clusterId int64 ) ( result [ ] * Node , err error ) {
_ , err = this . Query ( tx ) .
2020-10-25 18:26:46 +08:00
State ( NodeStateEnabled ) .
Attr ( "clusterId" , clusterId ) .
Attr ( "isOn" , true ) . // 只监控启用的节点
2020-10-25 21:27:46 +08:00
Attr ( "isInstalled" , true ) . // 只监控已经安装的节点
2020-11-16 09:20:24 +08:00
Attr ( "isActive" , true ) . // 当前已经在线的
2020-10-25 18:26:46 +08:00
Where ( "(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)" ) .
2021-04-13 20:01:21 +08:00
Result ( "id" , "name" ) .
2020-10-25 18:26:46 +08:00
Slice ( & result ) .
FindAll ( )
return
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesMatch 计算节点数量
2021-07-07 15:18:48 +08:00
func ( this * NodeDAO ) CountAllEnabledNodesMatch ( tx * dbs . Tx ,
clusterId int64 ,
installState configutils . BoolState ,
activeState configutils . BoolState ,
keyword string ,
groupId int64 ,
2021-07-31 22:23:11 +08:00
regionId int64 ,
includeSecondaryNodes bool ) ( int64 , error ) {
2021-01-01 23:31:30 +08:00
query := this . Query ( tx )
2020-08-30 16:12:00 +08:00
query . State ( NodeStateEnabled )
2020-09-13 20:37:28 +08:00
// 集群
2020-08-30 16:12:00 +08:00
if clusterId > 0 {
2021-07-31 22:23:11 +08:00
if includeSecondaryNodes {
query . Where ( "(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))" ) .
Param ( "primaryClusterId" , clusterId ) .
Param ( "primaryClusterIdString" , types . String ( clusterId ) )
} else {
query . Attr ( "clusterId" , clusterId )
}
2021-07-14 13:59:16 +08:00
} else {
query . Where ( "clusterId IN (SELECT id FROM " + SharedNodeClusterDAO . Table + " WHERE state=1)" )
2020-08-30 16:12:00 +08:00
}
2020-09-13 20:37:28 +08:00
// 安装状态
switch installState {
2020-09-26 08:06:40 +08:00
case configutils . BoolStateAll :
// 所有
case configutils . BoolStateYes :
2020-09-13 20:37:28 +08:00
query . Attr ( "isInstalled" , 1 )
2020-09-26 08:06:40 +08:00
case configutils . BoolStateNo :
2020-09-13 20:37:28 +08:00
query . Attr ( "isInstalled" , 0 )
}
2020-09-26 08:06:40 +08:00
// 在线状态
switch activeState {
case configutils . BoolStateAll :
// 所有
case configutils . BoolStateYes :
2020-09-28 16:25:39 +08:00
query . Where ( "JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60" )
2020-09-26 08:06:40 +08:00
case configutils . BoolStateNo :
2020-09-28 16:25:39 +08:00
query . Where ( "(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)" )
2020-09-26 08:06:40 +08:00
}
2020-10-28 18:21:15 +08:00
// 关键词
if len ( keyword ) > 0 {
query . Where ( "(name LIKE :keyword OR JSON_EXTRACT(status,'$.hostname') LIKE :keyword OR id IN (SELECT nodeId FROM " + SharedNodeIPAddressDAO . Table + " WHERE ip LIKE :keyword))" ) .
Param ( "keyword" , "%" + keyword + "%" )
}
// 分组
if groupId > 0 {
query . Attr ( "groupId" , groupId )
}
2020-12-10 16:11:35 +08:00
// 区域
if regionId > 0 {
query . Attr ( "regionId" , regionId )
}
2020-08-30 16:12:00 +08:00
return query . Count ( )
}
2021-04-13 20:01:21 +08:00
// UpdateNodeStatus 更改节点状态
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) UpdateNodeStatus ( tx * dbs . Tx , nodeId int64 , statusJSON [ ] byte ) error {
_ , err := this . Query ( tx ) .
2020-08-21 12:32:33 +08:00
Pk ( nodeId ) .
Set ( "status" , string ( statusJSON ) ) .
Update ( )
return err
}
2021-06-10 19:21:45 +08:00
// FindNodeStatus reads the "status" column of a node and decodes it from JSON.
// It returns (nil, nil) when the column value is empty.
func (this *NodeDAO) FindNodeStatus(tx *dbs.Tx, nodeId int64) (*nodeconfigs.NodeStatus, error) {
	raw, err := this.Query(tx).
		Pk(nodeId).
		Result("status").
		FindStringCol("")
	if err != nil {
		return nil, err
	}
	if raw == "" {
		return nil, nil
	}

	status := new(nodeconfigs.NodeStatus)
	if err = json.Unmarshal([]byte(raw), status); err != nil {
		return nil, err
	}
	return status, nil
}
2021-04-13 20:01:21 +08:00
// UpdateNodeIsActive 更改节点在线状态
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) UpdateNodeIsActive ( tx * dbs . Tx , nodeId int64 , isActive bool ) error {
2020-10-25 18:26:46 +08:00
b := "true"
if ! isActive {
b = "false"
}
2021-01-01 23:31:30 +08:00
_ , err := this . Query ( tx ) .
2020-10-25 18:26:46 +08:00
Pk ( nodeId ) .
Where ( "status IS NOT NULL" ) .
Set ( "status" , dbs . SQL ( "JSON_SET(status, '$.isActive', " + b + ")" ) ) .
Update ( )
return err
}
2021-04-13 20:01:21 +08:00
// UpdateNodeIsInstalled 设置节点安装状态
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) UpdateNodeIsInstalled ( tx * dbs . Tx , nodeId int64 , isInstalled bool ) error {
_ , err := this . Query ( tx ) .
2020-09-06 16:19:54 +08:00
Pk ( nodeId ) .
Set ( "isInstalled" , isInstalled ) .
2020-09-13 20:37:28 +08:00
Set ( "installStatus" , "null" ) . // 重置安装状态
Update ( )
return err
}
2021-04-13 20:01:21 +08:00
// FindNodeInstallStatus 查询节点的安装状态
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindNodeInstallStatus ( tx * dbs . Tx , nodeId int64 ) ( * NodeInstallStatus , error ) {
node , err := this . Query ( tx ) .
2020-09-13 20:37:28 +08:00
Pk ( nodeId ) .
2020-10-26 21:14:56 +08:00
Result ( "installStatus" , "isInstalled" ) .
Find ( )
2020-09-13 20:37:28 +08:00
if err != nil {
return nil , err
}
2020-10-26 21:14:56 +08:00
if node == nil {
return nil , errors . New ( "not found" )
}
2020-09-13 20:37:28 +08:00
2020-10-26 21:14:56 +08:00
installStatus := node . ( * Node ) . InstallStatus
isInstalled := node . ( * Node ) . IsInstalled == 1
2020-09-13 20:37:28 +08:00
if len ( installStatus ) == 0 {
return NewNodeInstallStatus ( ) , nil
}
status := & NodeInstallStatus { }
err = json . Unmarshal ( [ ] byte ( installStatus ) , status )
2020-10-26 21:14:56 +08:00
if err != nil {
return nil , err
}
if isInstalled {
status . IsFinished = true
status . IsOk = true
}
return status , nil
2020-09-13 20:37:28 +08:00
}
2021-04-13 20:01:21 +08:00
// UpdateNodeInstallStatus 修改节点的安装状态
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) UpdateNodeInstallStatus ( tx * dbs . Tx , nodeId int64 , status * NodeInstallStatus ) error {
2020-09-13 20:37:28 +08:00
if status == nil {
2021-01-01 23:31:30 +08:00
_ , err := this . Query ( tx ) .
2020-09-13 20:37:28 +08:00
Pk ( nodeId ) .
Set ( "installStatus" , "null" ) .
Update ( )
return err
}
data , err := json . Marshal ( status )
if err != nil {
return err
}
2021-01-01 23:31:30 +08:00
_ , err = this . Query ( tx ) .
2020-09-13 20:37:28 +08:00
Pk ( nodeId ) .
Set ( "installStatus" , string ( data ) ) .
2020-09-06 16:19:54 +08:00
Update ( )
return err
}
2021-04-13 20:01:21 +08:00
// ComposeNodeConfig 组合配置
2020-12-17 17:36:20 +08:00
// TODO 提升运行速度
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) ComposeNodeConfig ( tx * dbs . Tx , nodeId int64 ) ( * nodeconfigs . NodeConfig , error ) {
node , err := this . FindEnabledNode ( tx , nodeId )
2020-09-26 08:06:40 +08:00
if err != nil {
return nil , err
}
if node == nil {
return nil , errors . New ( "node not found '" + strconv . FormatInt ( nodeId , 10 ) + "'" )
}
config := & nodeconfigs . NodeConfig {
2020-12-11 17:28:20 +08:00
Id : int64 ( node . Id ) ,
NodeId : node . UniqueId ,
2021-07-18 15:52:34 +08:00
Secret : node . Secret ,
2020-12-11 17:28:20 +08:00
IsOn : node . IsOn == 1 ,
Servers : nil ,
Version : int64 ( node . Version ) ,
Name : node . Name ,
MaxCPU : types . Int32 ( node . MaxCPU ) ,
RegionId : int64 ( node . RegionId ) ,
2020-09-26 08:06:40 +08:00
}
// 获取所有的服务
2021-01-01 23:31:30 +08:00
servers , err := SharedServerDAO . FindAllEnabledServersWithNode ( tx , int64 ( node . Id ) )
2020-09-26 08:06:40 +08:00
if err != nil {
return nil , err
}
for _ , server := range servers {
if len ( server . Config ) == 0 {
continue
}
serverConfig := & serverconfigs . ServerConfig { }
err = json . Unmarshal ( [ ] byte ( server . Config ) , serverConfig )
if err != nil {
return nil , err
}
config . Servers = append ( config . Servers , serverConfig )
}
// 全局设置
// TODO 根据用户的不同读取不同的全局设置
2021-01-19 12:05:35 +08:00
settingJSON , err := SharedSysSettingDAO . ReadSetting ( tx , systemconfigs . SettingCodeServerGlobalConfig )
2020-09-26 08:06:40 +08:00
if err != nil {
return nil , err
}
if len ( settingJSON ) > 0 {
globalConfig := & serverconfigs . GlobalConfig { }
err = json . Unmarshal ( settingJSON , globalConfig )
if err != nil {
return nil , err
}
config . GlobalConfig = globalConfig
}
2020-12-17 17:36:20 +08:00
// WAF
clusterId := int64 ( node . ClusterId )
2021-01-01 23:31:30 +08:00
httpFirewallPolicyId , err := SharedNodeClusterDAO . FindClusterHTTPFirewallPolicyId ( tx , clusterId )
2020-12-17 17:36:20 +08:00
if err != nil {
return nil , err
}
if httpFirewallPolicyId > 0 {
2021-01-01 23:31:30 +08:00
firewallPolicy , err := SharedHTTPFirewallPolicyDAO . ComposeFirewallPolicy ( tx , httpFirewallPolicyId )
2020-12-17 17:36:20 +08:00
if err != nil {
return nil , err
}
if firewallPolicy != nil {
config . HTTPFirewallPolicy = firewallPolicy
}
}
// 缓存策略
2021-01-01 23:31:30 +08:00
httpCachePolicyId , err := SharedNodeClusterDAO . FindClusterHTTPCachePolicyId ( tx , clusterId )
2020-12-17 17:36:20 +08:00
if err != nil {
return nil , err
}
if httpCachePolicyId > 0 {
2021-01-01 23:31:30 +08:00
cachePolicy , err := SharedHTTPCachePolicyDAO . ComposeCachePolicy ( tx , httpCachePolicyId )
2020-12-17 17:36:20 +08:00
if err != nil {
return nil , err
}
if cachePolicy != nil {
config . HTTPCachePolicy = cachePolicy
}
}
2021-05-12 21:39:08 +08:00
// 缓存最大容量设置
if len ( node . MaxCacheDiskCapacity ) > 0 {
capacity := & shared . SizeCapacity { }
err = json . Unmarshal ( [ ] byte ( node . MaxCacheDiskCapacity ) , capacity )
if err != nil {
return nil , err
}
if capacity . Count > 0 {
config . MaxCacheDiskCapacity = capacity
}
}
if len ( node . MaxCacheMemoryCapacity ) > 0 {
capacity := & shared . SizeCapacity { }
err = json . Unmarshal ( [ ] byte ( node . MaxCacheMemoryCapacity ) , capacity )
if err != nil {
return nil , err
}
if capacity . Count > 0 {
config . MaxCacheMemoryCapacity = capacity
}
}
2020-12-02 14:26:03 +08:00
// TOA
2021-01-01 23:31:30 +08:00
toaConfig , err := SharedNodeClusterDAO . FindClusterTOAConfig ( tx , clusterId )
2020-12-02 14:26:03 +08:00
if err != nil {
return nil , err
}
config . TOA = toaConfig
2021-01-11 18:16:04 +08:00
// 系统服务
services , err := SharedNodeClusterDAO . FindNodeClusterSystemServices ( tx , clusterId )
if err != nil {
return nil , err
}
if len ( services ) > 0 {
config . SystemServices = services
}
2021-02-06 17:38:04 +08:00
// 防火墙动作
actions , err := SharedNodeClusterFirewallActionDAO . FindAllEnabledFirewallActions ( tx , clusterId )
if err != nil {
return nil , err
}
for _ , action := range actions {
actionConfig , err := SharedNodeClusterFirewallActionDAO . ComposeFirewallActionConfig ( tx , action )
if err != nil {
return nil , err
}
if actionConfig != nil {
config . FirewallActions = append ( config . FirewallActions , actionConfig )
}
}
2021-07-19 17:58:16 +08:00
// 集群指标
2021-06-30 19:59:49 +08:00
metricItemIds , err := SharedNodeClusterMetricItemDAO . FindAllClusterItemIds ( tx , int64 ( node . ClusterId ) )
if err != nil {
return nil , err
}
var metricItems = [ ] * serverconfigs . MetricItemConfig { }
for _ , itemId := range metricItemIds {
itemConfig , err := SharedMetricItemDAO . ComposeItemConfig ( tx , itemId )
if err != nil {
return nil , err
}
if itemConfig != nil {
metricItems = append ( metricItems , itemConfig )
}
}
2021-07-19 17:58:16 +08:00
// 公用指标
publicMetricItems , err := SharedMetricItemDAO . FindAllPublicItems ( tx )
if err != nil {
return nil , err
}
for _ , item := range publicMetricItems {
itemConfig := SharedMetricItemDAO . ComposeItemConfigWithItem ( item )
if itemConfig != nil && ! lists . ContainsInt64 ( metricItemIds , itemConfig . Id ) {
metricItems = append ( metricItems , itemConfig )
}
}
2021-06-30 19:59:49 +08:00
config . MetricItems = metricItems
2021-06-27 21:59:37 +08:00
2020-09-26 08:06:40 +08:00
return config , nil
}
2021-04-13 20:01:21 +08:00
// UpdateNodeConnectedAPINodes 修改当前连接的API节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) UpdateNodeConnectedAPINodes ( tx * dbs . Tx , nodeId int64 , apiNodeIds [ ] int64 ) error {
2020-10-04 14:27:14 +08:00
if nodeId <= 0 {
return errors . New ( "invalid nodeId" )
}
op := NewNodeOperator ( )
op . Id = nodeId
if len ( apiNodeIds ) > 0 {
apiNodeIdsJSON , err := json . Marshal ( apiNodeIds )
if err != nil {
return errors . Wrap ( err )
}
op . ConnectedAPINodes = apiNodeIdsJSON
} else {
op . ConnectedAPINodes = "[]"
}
2021-01-01 23:31:30 +08:00
err := this . Save ( tx , op )
2020-10-04 14:27:14 +08:00
return err
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeIdWithUniqueId 根据UniqueId获取ID
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindEnabledNodeIdWithUniqueId ( tx * dbs . Tx , uniqueId string ) ( int64 , error ) {
return this . Query ( tx ) .
2020-10-14 18:44:34 +08:00
State ( NodeStateEnabled ) .
Attr ( "uniqueId" , uniqueId ) .
ResultPk ( ) .
FindInt64Col ( 0 )
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeIdWithUniqueIdCacheable 根据UniqueId获取ID, 并可以使用缓存
2021-01-21 10:06:59 +08:00
func ( this * NodeDAO ) FindEnabledNodeIdWithUniqueIdCacheable ( tx * dbs . Tx , uniqueId string ) ( int64 , error ) {
SharedCacheLocker . RLock ( )
nodeId , ok := nodeIdCacheMap [ uniqueId ]
if ok {
SharedCacheLocker . RUnlock ( )
return nodeId , nil
}
SharedCacheLocker . RUnlock ( )
nodeId , err := this . Query ( tx ) .
State ( NodeStateEnabled ) .
Attr ( "uniqueId" , uniqueId ) .
ResultPk ( ) .
FindInt64Col ( 0 )
if err != nil {
return 0 , err
}
if nodeId > 0 {
SharedCacheLocker . Lock ( )
nodeIdCacheMap [ uniqueId ] = nodeId
SharedCacheLocker . Unlock ( )
}
return nodeId , nil
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesWithGrantId 计算使用某个认证的节点数量
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) CountAllEnabledNodesWithGrantId ( tx * dbs . Tx , grantId int64 ) ( int64 , error ) {
return this . Query ( tx ) .
2020-10-25 21:27:46 +08:00
State ( NodeStateEnabled ) .
Where ( "id IN (SELECT nodeId FROM edgeNodeLogins WHERE type='ssh' AND JSON_CONTAINS(params, :grantParam))" ) .
Param ( "grantParam" , string ( maps . Map { "grantId" : grantId } . AsJSON ( ) ) ) .
2020-11-27 12:05:49 +08:00
Where ( "clusterId IN (SELECT id FROM edgeNodeClusters WHERE state=1)" ) .
2020-10-25 21:27:46 +08:00
Count ( )
}
2021-04-13 20:01:21 +08:00
// FindAllEnabledNodesWithGrantId 查找使用某个认证的所有节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindAllEnabledNodesWithGrantId ( tx * dbs . Tx , grantId int64 ) ( result [ ] * Node , err error ) {
_ , err = this . Query ( tx ) .
2020-10-25 21:27:46 +08:00
State ( NodeStateEnabled ) .
Where ( "id IN (SELECT nodeId FROM edgeNodeLogins WHERE type='ssh' AND JSON_CONTAINS(params, :grantParam))" ) .
Param ( "grantParam" , string ( maps . Map { "grantId" : grantId } . AsJSON ( ) ) ) .
2020-11-27 12:05:49 +08:00
Where ( "clusterId IN (SELECT id FROM edgeNodeClusters WHERE state=1)" ) .
2020-10-25 21:27:46 +08:00
Slice ( & result ) .
DescPk ( ) .
FindAll ( )
return
}
2021-04-13 20:01:21 +08:00
// CountAllNotInstalledNodesWithClusterId 计算未安装的节点数量
2021-01-31 16:02:45 +08:00
func ( this * NodeDAO ) CountAllNotInstalledNodesWithClusterId ( tx * dbs . Tx , clusterId int64 ) ( int64 , error ) {
return this . Query ( tx ) .
State ( NodeStateEnabled ) .
Attr ( "clusterId" , clusterId ) .
Attr ( "isInstalled" , false ) .
Count ( )
}
2021-04-13 20:01:21 +08:00
// FindAllNotInstalledNodesWithClusterId 查找所有未安装的节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindAllNotInstalledNodesWithClusterId ( tx * dbs . Tx , clusterId int64 ) ( result [ ] * Node , err error ) {
_ , err = this . Query ( tx ) .
2020-10-26 21:14:56 +08:00
State ( NodeStateEnabled ) .
Attr ( "clusterId" , clusterId ) .
Attr ( "isInstalled" , false ) .
DescPk ( ) .
Slice ( & result ) .
FindAll ( )
return
}
2021-05-11 22:48:04 +08:00
// CountAllLowerVersionNodesWithClusterId 计算单个集群中所有低于某个版本的节点数量
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) CountAllLowerVersionNodesWithClusterId ( tx * dbs . Tx , clusterId int64 , os string , arch string , version string ) ( int64 , error ) {
return this . Query ( tx ) .
2020-11-27 10:41:19 +08:00
State ( NodeStateEnabled ) .
Attr ( "clusterId" , clusterId ) .
Where ( "status IS NOT NULL" ) .
Where ( "JSON_EXTRACT(status, '$.os')=:os" ) .
Where ( "JSON_EXTRACT(status, '$.arch')=:arch" ) .
2021-01-18 12:35:29 +08:00
Where ( "(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)" ) .
2020-11-27 10:41:19 +08:00
Param ( "os" , os ) .
Param ( "arch" , arch ) .
2021-01-18 12:35:29 +08:00
Param ( "version" , utils . VersionToLong ( version ) ) .
2020-11-27 10:41:19 +08:00
Count ( )
}
2021-05-11 22:48:04 +08:00
// FindAllLowerVersionNodesWithClusterId 查找单个集群中所有低于某个版本的节点
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindAllLowerVersionNodesWithClusterId ( tx * dbs . Tx , clusterId int64 , os string , arch string , version string ) ( result [ ] * Node , err error ) {
_ , err = this . Query ( tx ) .
2020-10-28 12:35:36 +08:00
State ( NodeStateEnabled ) .
Attr ( "clusterId" , clusterId ) .
Where ( "status IS NOT NULL" ) .
Where ( "JSON_EXTRACT(status, '$.os')=:os" ) .
Where ( "JSON_EXTRACT(status, '$.arch')=:arch" ) .
2021-01-18 12:35:29 +08:00
Where ( "(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)" ) .
2020-10-28 12:35:36 +08:00
Param ( "os" , os ) .
Param ( "arch" , arch ) .
2021-01-18 12:35:29 +08:00
Param ( "version" , utils . VersionToLong ( version ) ) .
2020-10-28 12:35:36 +08:00
DescPk ( ) .
Slice ( & result ) .
FindAll ( )
return
}
2021-05-11 22:48:04 +08:00
// CountAllLowerVersionNodes counts, across all clusters with state=1, the
// enabled nodes that have a status and whose buildVersionCode is either
// missing or lower than the code derived from version.
func (this *NodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) (int64, error) {
	var query = this.Query(tx).
		State(NodeStateEnabled).
		Where("clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1)").
		Where("status IS NOT NULL").
		Where("(JSON_EXTRACT(status, '$.buildVersionCode') IS NULL OR JSON_EXTRACT(status, '$.buildVersionCode')<:version)")
	query.Param("version", utils.VersionToLong(version))
	return query.Count()
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesWithGroupId 查找某个节点分组下的所有节点数量
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) CountAllEnabledNodesWithGroupId ( tx * dbs . Tx , groupId int64 ) ( int64 , error ) {
return this . Query ( tx ) .
2020-10-28 18:21:15 +08:00
State ( NodeStateEnabled ) .
Attr ( "groupId" , groupId ) .
Count ( )
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesWithRegionId 查找某个节点区域下的所有节点数量
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) CountAllEnabledNodesWithRegionId ( tx * dbs . Tx , regionId int64 ) ( int64 , error ) {
return this . Query ( tx ) .
2020-12-10 15:03:03 +08:00
State ( NodeStateEnabled ) .
Attr ( "regionId" , regionId ) .
2021-02-06 19:40:53 +08:00
Where ( "clusterId IN (SELECT id FROM " + SharedNodeClusterDAO . Table + " WHERE state=1)" ) .
2020-12-10 15:03:03 +08:00
Count ( )
}
2021-04-13 20:01:21 +08:00
// FindAllEnabledNodesDNSWithClusterId 获取一个集群的节点DNS信息
2021-07-31 22:23:11 +08:00
func ( this * NodeDAO ) FindAllEnabledNodesDNSWithClusterId ( tx * dbs . Tx , clusterId int64 , includeSecondaryNodes bool ) ( result [ ] * Node , err error ) {
if clusterId <= 0 {
return nil , nil
}
var query = this . Query ( tx )
if includeSecondaryNodes {
query . Where ( "(clusterId=:primaryClusterId OR JSON_CONTAINS(secondaryClusterIds, :primaryClusterIdString))" ) .
Param ( "primaryClusterId" , clusterId ) .
Param ( "primaryClusterIdString" , types . String ( clusterId ) )
} else {
query . Attr ( "clusterId" , clusterId )
}
_ , err = query .
2020-11-14 09:41:58 +08:00
State ( NodeStateEnabled ) .
Attr ( "isOn" , true ) .
2020-11-15 21:17:42 +08:00
Attr ( "isUp" , true ) .
2020-11-14 21:28:07 +08:00
Result ( "id" , "name" , "dnsRoutes" , "isOn" ) .
2020-11-14 09:41:58 +08:00
DescPk ( ) .
Slice ( & result ) .
FindAll ( )
return
}
2021-04-13 20:01:21 +08:00
// CountAllEnabledNodesDNSWithClusterId 计算一个集群的节点DNS数量
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) CountAllEnabledNodesDNSWithClusterId ( tx * dbs . Tx , clusterId int64 ) ( result int64 , err error ) {
return this . Query ( tx ) .
2020-12-23 16:49:47 +08:00
State ( NodeStateEnabled ) .
Attr ( "clusterId" , clusterId ) .
Attr ( "isOn" , true ) .
Attr ( "isUp" , true ) .
Result ( "id" , "name" , "dnsRoutes" , "isOn" ) .
DescPk ( ) .
Slice ( & result ) .
Count ( )
}
2021-04-13 20:01:21 +08:00
// FindEnabledNodeDNS 获取单个节点的DNS信息
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindEnabledNodeDNS ( tx * dbs . Tx , nodeId int64 ) ( * Node , error ) {
one , err := this . Query ( tx ) .
2020-11-14 09:41:58 +08:00
State ( NodeStateEnabled ) .
Pk ( nodeId ) .
2020-12-03 18:19:22 +08:00
Result ( "id" , "name" , "dnsRoutes" , "clusterId" , "isOn" ) .
2020-11-14 09:41:58 +08:00
Find ( )
2021-07-31 22:23:11 +08:00
if one == nil {
2020-11-14 09:41:58 +08:00
return nil , err
}
return one . ( * Node ) , nil
}
2021-04-13 20:01:21 +08:00
// FindStatelessNodeDNS 获取单个节点的DNS信息, 无论什么状态
2021-01-27 23:00:02 +08:00
func ( this * NodeDAO ) FindStatelessNodeDNS ( tx * dbs . Tx , nodeId int64 ) ( * Node , error ) {
one , err := this . Query ( tx ) .
Pk ( nodeId ) .
Result ( "id" , "name" , "dnsRoutes" , "clusterId" , "isOn" , "state" ) .
Find ( )
if err != nil || one == nil {
return nil , err
}
return one . ( * Node ) , nil
}
2021-04-13 20:01:21 +08:00
// UpdateNodeDNS 修改节点的DNS信息
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) UpdateNodeDNS ( tx * dbs . Tx , nodeId int64 , routes map [ int64 ] [ ] string ) error {
2020-11-14 09:41:58 +08:00
if nodeId <= 0 {
return errors . New ( "invalid nodeId" )
}
if routes == nil {
2020-11-16 13:03:20 +08:00
routes = map [ int64 ] [ ] string { }
2020-11-14 09:41:58 +08:00
}
routesJSON , err := json . Marshal ( routes )
if err != nil {
return err
}
op := NewNodeOperator ( )
op . Id = nodeId
op . DnsRoutes = routesJSON
2021-01-01 23:31:30 +08:00
err = this . Save ( tx , op )
2021-01-17 16:48:00 +08:00
if err != nil {
return err
}
2021-01-27 23:00:02 +08:00
err = this . NotifyUpdate ( tx , nodeId )
if err != nil {
return err
}
err = this . NotifyDNSUpdate ( tx , nodeId )
if err != nil {
return err
}
return nil
2020-11-14 09:41:58 +08:00
}
2021-04-13 20:01:21 +08:00
// UpdateNodeUpCount 计算节点上线|下线状态
2021-02-24 19:26:59 +08:00
func ( this * NodeDAO ) UpdateNodeUpCount ( tx * dbs . Tx , nodeId int64 , isUp bool , maxUp int , maxDown int ) ( changed bool , err error ) {
2020-11-15 21:17:42 +08:00
if nodeId <= 0 {
return false , errors . New ( "invalid nodeId" )
}
2021-01-01 23:31:30 +08:00
one , err := this . Query ( tx ) .
2020-11-15 21:17:42 +08:00
Pk ( nodeId ) .
Result ( "isUp" , "countUp" , "countDown" ) .
Find ( )
if err != nil {
return false , err
}
if one == nil {
return false , nil
}
oldIsUp := one . ( * Node ) . IsUp == 1
// 如果新老状态一致,则不做任何事情
if oldIsUp == isUp {
2020-11-16 09:20:24 +08:00
return false , nil
2020-11-15 21:17:42 +08:00
}
countUp := int ( one . ( * Node ) . CountUp )
countDown := int ( one . ( * Node ) . CountDown )
op := NewNodeOperator ( )
op . Id = nodeId
if isUp {
countUp ++
countDown = 0
if countUp >= maxUp {
changed = true
op . IsUp = true
}
} else {
countDown ++
countUp = 0
if countDown >= maxDown {
changed = true
op . IsUp = false
}
}
op . CountUp = countUp
op . CountDown = countDown
2021-01-01 23:31:30 +08:00
err = this . Save ( tx , op )
2020-11-15 21:17:42 +08:00
if err != nil {
return false , err
}
2021-01-17 16:48:00 +08:00
2021-01-27 23:00:02 +08:00
err = this . NotifyDNSUpdate ( tx , nodeId )
if err != nil {
return false , err
}
2020-11-15 21:17:42 +08:00
return
}
2021-04-13 20:01:21 +08:00
// UpdateNodeUp 设置节点上下线状态
2021-02-24 19:26:59 +08:00
func ( this * NodeDAO ) UpdateNodeUp ( tx * dbs . Tx , nodeId int64 , isUp bool ) error {
if nodeId <= 0 {
return errors . New ( "invalid nodeId" )
}
op := NewNodeOperator ( )
op . Id = nodeId
op . IsUp = isUp
op . CountDown = 0
op . CountDown = 0
err := this . Save ( tx , op )
if err != nil {
return err
}
return this . NotifyDNSUpdate ( tx , nodeId )
}
2021-04-13 20:01:21 +08:00
// UpdateNodeActive 修改节点活跃状态
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) UpdateNodeActive ( tx * dbs . Tx , nodeId int64 , isActive bool ) error {
2020-11-16 09:20:24 +08:00
if nodeId <= 0 {
return errors . New ( "invalid nodeId" )
}
2021-01-01 23:31:30 +08:00
_ , err := this . Query ( tx ) .
2020-11-16 09:20:24 +08:00
Pk ( nodeId ) .
Set ( "isActive" , isActive ) .
Update ( )
return err
}
2021-04-13 20:01:21 +08:00
// FindNodeActive 检查节点活跃状态
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindNodeActive ( tx * dbs . Tx , nodeId int64 ) ( bool , error ) {
isActive , err := this . Query ( tx ) .
2020-11-16 09:20:24 +08:00
Pk ( nodeId ) .
Result ( "isActive" ) .
FindIntCol ( 0 )
if err != nil {
return false , err
}
return isActive == 1 , nil
}
2021-04-13 20:01:21 +08:00
// FindNodeVersion 查找节点的版本号
2021-01-01 23:31:30 +08:00
func ( this * NodeDAO ) FindNodeVersion ( tx * dbs . Tx , nodeId int64 ) ( int64 , error ) {
return this . Query ( tx ) .
2020-12-02 14:26:03 +08:00
Pk ( nodeId ) .
Result ( "version" ) .
FindInt64Col ( 0 )
}
2021-04-13 20:01:21 +08:00
// GenUniqueId 生成唯一ID
2021-01-27 23:00:02 +08:00
func ( this * NodeDAO ) GenUniqueId ( tx * dbs . Tx ) ( string , error ) {
2020-08-21 12:32:33 +08:00
for {
uniqueId := rands . HexString ( 32 )
2021-01-01 23:31:30 +08:00
ok , err := this . Query ( tx ) .
2020-08-21 12:32:33 +08:00
Attr ( "uniqueId" , uniqueId ) .
Exist ( )
if err != nil {
return "" , err
}
if ok {
continue
}
return uniqueId , nil
}
}
2021-01-17 16:48:00 +08:00
2021-04-13 20:01:21 +08:00
// FindEnabledNodesWithIds 根据一组ID查找一组节点
2021-01-17 16:48:00 +08:00
func ( this * NodeDAO ) FindEnabledNodesWithIds ( tx * dbs . Tx , nodeIds [ ] int64 ) ( result [ ] * Node , err error ) {
if len ( nodeIds ) == 0 {
return nil , nil
}
idStrings := [ ] string { }
for _ , nodeId := range nodeIds {
idStrings = append ( idStrings , numberutils . FormatInt64 ( nodeId ) )
}
_ , err = this . Query ( tx ) .
State ( NodeStateEnabled ) .
Where ( "id IN (" + strings . Join ( idStrings , ", " ) + ")" ) .
Result ( "id" , "connectedAPINodes" , "isActive" , "isOn" ) .
Slice ( & result ) .
Reuse ( false ) .
FindAll ( )
return
}
2021-07-31 22:23:11 +08:00
// DeleteNodeFromCluster detaches a node from the given cluster.
//
// The cluster is removed from the node's secondary cluster ids. If it was the
// node's primary cluster, the first remaining secondary cluster is promoted to
// primary, or the primary is set to 0 when none remains. A node that does not
// exist is silently ignored.
func (this *NodeDAO) DeleteNodeFromCluster(tx *dbs.Tx, nodeId int64, clusterId int64) error {
	record, err := this.Query(tx).
		Pk(nodeId).
		Result("clusterId", "secondaryClusterIds").
		Find()
	if err != nil {
		return err
	}
	if record == nil {
		return nil
	}
	node := record.(*Node)

	// Keep this a non-nil slice so that an empty list is marshalled as "[]".
	remainingIds := []int64{}
	for _, id := range node.DecodeSecondaryClusterIds() {
		if id != clusterId {
			remainingIds = append(remainingIds, id)
		}
	}

	primaryId := int64(node.ClusterId)
	if primaryId == clusterId {
		// Promote a secondary cluster to primary when one is available.
		if len(remainingIds) > 0 {
			primaryId = remainingIds[0]
			remainingIds = remainingIds[1:]
		} else {
			primaryId = 0
		}
	}

	remainingJSON, err := json.Marshal(remainingIds)
	if err != nil {
		return err
	}

	var op = NewNodeOperator()
	op.Id = nodeId
	op.ClusterId = primaryId
	op.SecondaryClusterIds = remainingJSON
	return this.Save(tx, op)
}
// TransferPrimaryClusterNodes moves the enabled nodes whose primary cluster is
// primaryClusterId onto their first secondary cluster: that cluster becomes
// the new primary and is dropped from the secondary list. Nodes without any
// secondary cluster are left untouched. A non-positive primaryClusterId is a
// no-op.
func (this *NodeDAO) TransferPrimaryClusterNodes(tx *dbs.Tx, primaryClusterId int64) error {
	if primaryClusterId <= 0 {
		return nil
	}

	records, err := this.Query(tx).
		Attr("clusterId", primaryClusterId).
		Result("id", "secondaryClusterIds").
		State(NodeStateEnabled).
		FindAll()
	if err != nil {
		return err
	}

	for _, record := range records {
		node := record.(*Node)
		candidateIds := node.DecodeSecondaryClusterIds()
		if len(candidateIds) == 0 {
			continue
		}

		// The head becomes the new primary; the tail stays secondary.
		remainingJSON, err := json.Marshal(candidateIds[1:])
		if err != nil {
			return err
		}

		var update = this.Query(tx).
			Pk(node.Id).
			Set("clusterId", candidateIds[0]).
			Set("secondaryClusterIds", remainingJSON)
		if err = update.UpdateQuickly(); err != nil {
			return err
		}
	}

	return nil
}
2021-04-13 20:01:21 +08:00
// NotifyUpdate 通知更新
2021-01-17 16:48:00 +08:00
func ( this * NodeDAO ) NotifyUpdate ( tx * dbs . Tx , nodeId int64 ) error {
clusterId , err := this . FindNodeClusterId ( tx , nodeId )
if err != nil {
return err
}
if clusterId > 0 {
return SharedNodeTaskDAO . CreateNodeTask ( tx , clusterId , nodeId , NodeTaskTypeConfigChanged )
}
return nil
}
2021-01-27 23:00:02 +08:00
2021-04-13 20:01:21 +08:00
// NotifyDNSUpdate 通知DNS更新
2021-01-27 23:00:02 +08:00
func ( this * NodeDAO ) NotifyDNSUpdate ( tx * dbs . Tx , nodeId int64 ) error {
2021-07-31 22:23:11 +08:00
clusterIds , err := this . FindEnabledAndOnNodeClusterIds ( tx , nodeId )
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
2021-07-31 22:23:11 +08:00
for _ , clusterId := range clusterIds {
dnsInfo , err := SharedNodeClusterDAO . FindClusterDNSInfo ( tx , clusterId )
if err != nil {
return err
}
if dnsInfo == nil {
continue
}
if len ( dnsInfo . DnsName ) == 0 || dnsInfo . DnsDomainId <= 0 {
continue
}
err = dns . SharedDNSTaskDAO . CreateNodeTask ( tx , nodeId , dns . DNSTaskTypeNodeChange )
if err != nil {
return err
}
2021-01-27 23:00:02 +08:00
}
2021-07-31 22:23:11 +08:00
return nil
2021-01-27 23:00:02 +08:00
}