优化边缘节点在线状态管理

This commit is contained in:
刘祥超
2021-11-20 18:59:35 +08:00
parent 311751596c
commit c1b7cf11c3
5 changed files with 63 additions and 9 deletions

View File

@@ -13,7 +13,7 @@ import (
var sharedAPIConfig *APIConfig = nil var sharedAPIConfig *APIConfig = nil
var PaddingId string var PaddingId string
// API节点配置 // APIConfig API节点配置
type APIConfig struct { type APIConfig struct {
NodeId string `yaml:"nodeId" json:"nodeId"` NodeId string `yaml:"nodeId" json:"nodeId"`
Secret string `yaml:"secret" json:"secret"` Secret string `yaml:"secret" json:"secret"`
@@ -21,7 +21,7 @@ type APIConfig struct {
numberId int64 // 数字ID numberId int64 // 数字ID
} }
// 获取共享配置 // SharedAPIConfig 获取共享配置
func SharedAPIConfig() (*APIConfig, error) { func SharedAPIConfig() (*APIConfig, error) {
sharedLocker.Lock() sharedLocker.Lock()
defer sharedLocker.Unlock() defer sharedLocker.Unlock()
@@ -96,18 +96,19 @@ func SharedAPIConfig() (*APIConfig, error) {
return config, nil return config, nil
} }
// 设置数字ID // SetNumberId 设置数字ID
func (this *APIConfig) SetNumberId(numberId int64) { func (this *APIConfig) SetNumberId(numberId int64) {
this.numberId = numberId this.numberId = numberId
teaconst.NodeId = numberId
PaddingId = fmt.Sprintf("%08d", numberId) PaddingId = fmt.Sprintf("%08d", numberId)
} }
// 获取数字ID // NumberId 获取数字ID
func (this *APIConfig) NumberId() int64 { func (this *APIConfig) NumberId() int64 {
return this.numberId return this.numberId
} }
// 保存到文件 // WriteFile 保存到文件
func (this *APIConfig) WriteFile(path string) error { func (this *APIConfig) WriteFile(path string) error {
data, err := yaml.Marshal(this) data, err := yaml.Marshal(this)
if err != nil { if err != nil {

View File

@@ -5,4 +5,5 @@ package teaconst
var ( var (
IsPlus = false IsPlus = false
MaxNodes int32 = 0 MaxNodes int32 = 0
NodeId int64 = 0
) )

View File

@@ -315,7 +315,7 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx,
case configutils.BoolStateAll: case configutils.BoolStateAll:
// 所有 // 所有
case configutils.BoolStateYes: case configutils.BoolStateYes:
query.Where("JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60") query.Where("isActive AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60")
case configutils.BoolStateNo: case configutils.BoolStateNo:
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)") query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)")
} }
@@ -569,7 +569,7 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
case configutils.BoolStateAll: case configutils.BoolStateAll:
// 所有 // 所有
case configutils.BoolStateYes: case configutils.BoolStateYes:
query.Where("JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60") query.Where("isActive AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60")
case configutils.BoolStateNo: case configutils.BoolStateNo:
query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)") query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)")
} }
@@ -597,6 +597,7 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx,
func (this *NodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, statusJSON []byte) error { func (this *NodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, statusJSON []byte) error {
_, err := this.Query(tx). _, err := this.Query(tx).
Pk(nodeId). Pk(nodeId).
Set("isActive", true).
Set("status", string(statusJSON)). Set("status", string(statusJSON)).
Update() Update()
return err return err

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
) )
type APINodeService struct { type APINodeService struct {
@@ -231,6 +232,47 @@ func (this *APINodeService) FindCurrentAPINodeVersion(ctx context.Context, req *
return &pb.FindCurrentAPINodeVersionResponse{Version: teaconst.Version}, nil return &pb.FindCurrentAPINodeVersionResponse{Version: teaconst.Version}, nil
} }
// FindCurrentAPINode 获取当前API节点的信息
func (this *APINodeService) FindCurrentAPINode(ctx context.Context, req *pb.FindCurrentAPINodeRequest) (*pb.FindCurrentAPINodeResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var nodeId = teaconst.NodeId
var tx *dbs.Tx
node, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, nodeId)
if err != nil {
return nil, err
}
if node == nil {
return &pb.FindCurrentAPINodeResponse{ApiNode: nil}, nil
}
accessAddrs, err := node.DecodeAccessAddrStrings()
if err != nil {
return nil, err
}
return &pb.FindCurrentAPINodeResponse{ApiNode: &pb.APINode{
Id: int64(node.Id),
IsOn: node.IsOn == 1,
NodeClusterId: 0,
UniqueId: "",
Secret: "",
Name: "",
Description: "",
HttpJSON: nil,
HttpsJSON: nil,
RestIsOn: false,
RestHTTPJSON: nil,
RestHTTPSJSON: nil,
AccessAddrsJSON: []byte(node.AccessAddrs),
AccessAddrs: accessAddrs,
StatusJSON: nil,
}}, nil
}
// CountAllEnabledAPINodesWithSSLCertId 计算使用某个SSL证书的API节点数量 // CountAllEnabledAPINodesWithSSLCertId 计算使用某个SSL证书的API节点数量
func (this *APINodeService) CountAllEnabledAPINodesWithSSLCertId(ctx context.Context, req *pb.CountAllEnabledAPINodesWithSSLCertIdRequest) (*pb.RPCCountResponse, error) { func (this *APINodeService) CountAllEnabledAPINodesWithSSLCertId(ctx context.Context, req *pb.CountAllEnabledAPINodesWithSSLCertIdRequest) (*pb.RPCCountResponse, error) {
_, err := this.ValidateAdmin(ctx, 0) _, err := this.ValidateAdmin(ctx, 0)

View File

@@ -7,6 +7,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/configs" "github.com/TeaOSLab/EdgeAPI/internal/configs"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
@@ -82,6 +83,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
} }
defer func() { defer func() {
// 修改当前API节点的主边缘节点
if primaryNodeId == nodeId { if primaryNodeId == nodeId {
primaryNodeId = 0 primaryNodeId = 0
@@ -94,6 +96,12 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
} }
nodeLocker.Unlock() nodeLocker.Unlock()
} }
// 修改在线状态
err = models.SharedNodeDAO.UpdateNodeActive(nil, nodeId, false)
if err != nil {
remotelogs.Error("NODE_SERVICE", "change node active failed: "+err.Error())
}
}() }()
// 返回连接成功 // 返回连接成功
@@ -125,6 +133,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro
if err != nil { if err != nil {
return err return err
} }
if !oldIsActive { if !oldIsActive {
err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true) err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true)
if err != nil { if err != nil {