From 51d144cb53a52473f24bddfee01d414761f27760 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sat, 20 Nov 2021 18:59:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=BE=B9=E7=BC=98=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E5=9C=A8=E7=BA=BF=E7=8A=B6=E6=80=81=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/configs/api_config.go | 11 ++--- internal/const/vars.go | 1 + internal/db/models/node_dao.go | 9 +++-- internal/rpc/services/service_api_node.go | 42 ++++++++++++++++++++ internal/rpc/services/service_node_stream.go | 9 +++++ 5 files changed, 63 insertions(+), 9 deletions(-) diff --git a/internal/configs/api_config.go b/internal/configs/api_config.go index 1df34565..5859f8d9 100644 --- a/internal/configs/api_config.go +++ b/internal/configs/api_config.go @@ -13,7 +13,7 @@ import ( var sharedAPIConfig *APIConfig = nil var PaddingId string -// API节点配置 +// APIConfig API节点配置 type APIConfig struct { NodeId string `yaml:"nodeId" json:"nodeId"` Secret string `yaml:"secret" json:"secret"` @@ -21,7 +21,7 @@ type APIConfig struct { numberId int64 // 数字ID } -// 获取共享配置 +// SharedAPIConfig 获取共享配置 func SharedAPIConfig() (*APIConfig, error) { sharedLocker.Lock() defer sharedLocker.Unlock() @@ -96,18 +96,19 @@ func SharedAPIConfig() (*APIConfig, error) { return config, nil } -// 设置数字ID +// SetNumberId 设置数字ID func (this *APIConfig) SetNumberId(numberId int64) { this.numberId = numberId + teaconst.NodeId = numberId PaddingId = fmt.Sprintf("%08d", numberId) } -// 获取数字ID +// NumberId 获取数字ID func (this *APIConfig) NumberId() int64 { return this.numberId } -// 保存到文件 +// WriteFile 保存到文件 func (this *APIConfig) WriteFile(path string) error { data, err := yaml.Marshal(this) if err != nil { diff --git a/internal/const/vars.go b/internal/const/vars.go index efac89ed..bb20feef 100644 --- a/internal/const/vars.go +++ b/internal/const/vars.go @@ -5,4 +5,5 @@ package teaconst var ( IsPlus = false MaxNodes int32 = 0 + NodeId int64 = 0 ) diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 0d77dd24..6b20a582 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -315,7 +315,7 @@ func (this *NodeDAO) ListEnabledNodesMatch(tx *dbs.Tx, case configutils.BoolStateAll: // 所有 case configutils.BoolStateYes: - query.Where("JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60") + query.Where("isActive AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60") case configutils.BoolStateNo: query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)") } @@ -519,9 +519,9 @@ func (this *NodeDAO) FindAllInactiveNodesWithClusterId(tx *dbs.Tx, clusterId int _, err = this.Query(tx). State(NodeStateEnabled). Attr("clusterId", clusterId). - Attr("isOn", true). // 只监控启用的节点 + Attr("isOn", true). // 只监控启用的节点 Attr("isInstalled", true). // 只监控已经安装的节点 - Attr("isActive", true). // 当前已经在线的 + Attr("isActive", true). // 当前已经在线的 Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)"). Result("id", "name"). Slice(&result). @@ -569,7 +569,7 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx, case configutils.BoolStateAll: // 所有 case configutils.BoolStateYes: - query.Where("JSON_EXTRACT(status, '$.isActive') AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60") + query.Where("isActive AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')<=60") case configutils.BoolStateNo: query.Where("(status IS NULL OR NOT JSON_EXTRACT(status, '$.isActive') OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>60)") } @@ -597,6 +597,7 @@ func (this *NodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx, func (this *NodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, statusJSON []byte) error { _, err := this.Query(tx). Pk(nodeId). + Set("isActive", true). Set("status", string(statusJSON)). Update() return err diff --git a/internal/rpc/services/service_api_node.go b/internal/rpc/services/service_api_node.go index b969ea6f..da2421d4 100644 --- a/internal/rpc/services/service_api_node.go +++ b/internal/rpc/services/service_api_node.go @@ -6,6 +6,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" ) type APINodeService struct { @@ -231,6 +232,47 @@ func (this *APINodeService) FindCurrentAPINodeVersion(ctx context.Context, req * return &pb.FindCurrentAPINodeVersionResponse{Version: teaconst.Version}, nil } +// FindCurrentAPINode 获取当前API节点的信息 +func (this *APINodeService) FindCurrentAPINode(ctx context.Context, req *pb.FindCurrentAPINodeRequest) (*pb.FindCurrentAPINodeResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var nodeId = teaconst.NodeId + var tx *dbs.Tx + node, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, nodeId) + if err != nil { + return nil, err + } + if node == nil { + return &pb.FindCurrentAPINodeResponse{ApiNode: nil}, nil + } + + accessAddrs, err := node.DecodeAccessAddrStrings() + if err != nil { + return nil, err + } + + return &pb.FindCurrentAPINodeResponse{ApiNode: &pb.APINode{ + Id: int64(node.Id), + IsOn: node.IsOn == 1, + NodeClusterId: 0, + UniqueId: "", + Secret: "", + Name: "", + Description: "", + HttpJSON: nil, + HttpsJSON: nil, + RestIsOn: false, + RestHTTPJSON: nil, + RestHTTPSJSON: nil, + AccessAddrsJSON: []byte(node.AccessAddrs), + AccessAddrs: accessAddrs, + StatusJSON: nil, + }}, nil +} + // CountAllEnabledAPINodesWithSSLCertId 计算使用某个SSL证书的API节点数量 func (this *APINodeService) CountAllEnabledAPINodesWithSSLCertId(ctx context.Context, req *pb.CountAllEnabledAPINodesWithSSLCertIdRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx, 0) diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index 762f5d21..31c16888 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/configs" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" @@ -82,6 +83,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } defer func() { + // 修改当前API节点的主边缘节点 if primaryNodeId == nodeId { primaryNodeId = 0 @@ -94,6 +96,12 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } nodeLocker.Unlock() } + + // 修改在线状态 + err = models.SharedNodeDAO.UpdateNodeActive(nil, nodeId, false) + if err != nil { + remotelogs.Error("NODE_SERVICE", "change node active failed: "+err.Error()) + } }() // 返回连接成功 @@ -125,6 +133,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro if err != nil { return err } + if !oldIsActive { err = models.SharedNodeDAO.UpdateNodeActive(tx, nodeId, true) if err != nil {