节点所属集群删除后,不再接收API请求

This commit is contained in:
刘祥超
2022-10-23 19:56:58 +08:00
parent 88dae56b6c
commit e21a3c5f8c
21 changed files with 1443 additions and 34 deletions

View File

@@ -77,7 +77,7 @@ func (this *ApiTokenDAO) FindEnabledTokenWithNodeCacheable(tx *dbs.Tx, nodeId st
State(ApiTokenStateEnabled).
Find()
if one != nil {
token := one.(*ApiToken)
token = one.(*ApiToken)
SharedCacheLocker.Lock()
apiTokenCacheMap[nodeId] = token
SharedCacheLocker.Unlock()

View File

@@ -11,6 +11,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/sizes"
"github.com/TeaOSLab/EdgeAPI/internal/utils/ttlcache"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
@@ -35,8 +36,6 @@ const (
NodeStateDisabled = 0 // 已禁用
)
var nodeIdCacheMap = map[string]int64{} // uniqueId => nodeId
type NodeDAO dbs.DAO
func NewNodeDAO() *NodeDAO {
@@ -77,9 +76,7 @@ func (this *NodeDAO) DisableNode(tx *dbs.Tx, nodeId int64) (err error) {
return err
}
if len(uniqueId) > 0 {
SharedCacheLocker.Lock()
delete(nodeIdCacheMap, uniqueId)
SharedCacheLocker.Unlock()
ttlcache.SharedCache.Delete("nodeId@uniqueId@" + uniqueId)
}
_, err = this.Query(tx).
@@ -1279,36 +1276,39 @@ func (this *NodeDAO) UpdateNodeConnectedAPINodes(tx *dbs.Tx, nodeId int64, apiNo
// FindEnabledNodeIdWithUniqueId 根据UniqueId获取ID
func (this *NodeDAO) FindEnabledNodeIdWithUniqueId(tx *dbs.Tx, uniqueId string) (int64, error) {
return this.Query(tx).
State(NodeStateEnabled).
Attr("uniqueId", uniqueId).
ResultPk().
FindInt64Col(0)
}
// FindEnabledNodeIdWithUniqueIdCacheable 根据UniqueId获取ID并可以使用缓存
func (this *NodeDAO) FindEnabledNodeIdWithUniqueIdCacheable(tx *dbs.Tx, uniqueId string) (int64, error) {
SharedCacheLocker.RLock()
nodeId, ok := nodeIdCacheMap[uniqueId]
if ok {
SharedCacheLocker.RUnlock()
return nodeId, nil
var cacheKey = "nodeId@uniqueId@" + uniqueId
var item = ttlcache.SharedCache.Read(cacheKey)
if item != nil {
return types.Int64(item.Value), nil
}
SharedCacheLocker.RUnlock()
nodeId, err := this.Query(tx).
one, err := this.Query(tx).
State(NodeStateEnabled).
Attr("uniqueId", uniqueId).
ResultPk().
FindInt64Col(0)
Result("id", "clusterId").
Find()
if err != nil || one == nil {
return 0, err
}
// 检查集群
var node = one.(*Node)
var clusterId = int64(node.ClusterId)
if clusterId <= 0 {
return 0, nil
}
isOn, err := SharedNodeClusterDAO.CheckNodeClusterIsOn(tx, clusterId)
if err != nil {
return 0, err
}
if nodeId > 0 {
SharedCacheLocker.Lock()
nodeIdCacheMap[uniqueId] = nodeId
SharedCacheLocker.Unlock()
if !isOn {
return 0, nil
}
return nodeId, nil
ttlcache.SharedCache.Write(cacheKey, int64(node.Id), time.Now().Unix()+60)
return int64(node.Id), nil
}
// CountAllEnabledNodesWithGrantId 计算使用某个认证的节点数量

View File

@@ -77,3 +77,34 @@ func TestNodeDAO_ComposeNodeConfig_ParentNodes(t *testing.T) {
}
logs.PrintAsJSON(nodeConfig.ParentNodes, t)
}
func TestNodeDAO_FindEnabledNodeIdWithUniqueId(t *testing.T) {
dbs.NotifyReady()
var tx *dbs.Tx
// init
{
_, err := models.SharedNodeDAO.FindEnabledNodeIdWithUniqueId(tx, "a186380dbd26ccd49e75d178ec59df1b")
if err != nil {
t.Fatal(err)
}
}
var before = time.Now()
nodeId, err := models.SharedNodeDAO.FindEnabledNodeIdWithUniqueId(tx, "a186380dbd26ccd49e75d178ec59df1b")
if err != nil {
t.Fatal(err)
}
t.Log("cost:", time.Since(before).Seconds()*1000, "ms")
t.Log("nodeId:", nodeId)
{
before = time.Now()
nodeId, err := models.SharedNodeDAO.FindEnabledNodeIdWithUniqueId(tx, "a186380dbd26ccd49e75d178ec59df1b")
if err != nil {
t.Fatal(err)
}
t.Log("cost:", time.Since(before).Seconds()*1000, "ms")
t.Log("nodeId:", nodeId)
}
}

View File

@@ -11,7 +11,7 @@ func TestNodeTaskDAO_CreateNodeTask(t *testing.T) {
dbs.NotifyReady()
var tx *dbs.Tx
err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, NodeTaskTypeConfigChanged, 0)
err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, 0, NodeTaskTypeConfigChanged, 0)
if err != nil {
t.Fatal(err)
}
@@ -22,7 +22,7 @@ func TestNodeTaskDAO_CreateClusterTask(t *testing.T) {
dbs.NotifyReady()
var tx *dbs.Tx
err := SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, 1, 0, NodeTaskTypeConfigChanged)
err := SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, 1, 0, 0, NodeTaskTypeConfigChanged)
if err != nil {
t.Fatal(err)
}
@@ -33,7 +33,7 @@ func TestNodeTaskDAO_ExtractClusterTask(t *testing.T) {
dbs.NotifyReady()
var tx *dbs.Tx
err := SharedNodeTaskDAO.ExtractNodeClusterTask(tx, 1, 0, NodeTaskTypeConfigChanged)
err := SharedNodeTaskDAO.ExtractNodeClusterTask(tx, 1, 0, 0, NodeTaskTypeConfigChanged)
if err != nil {
t.Fatal(err)
}