From 188fe12a0b9a5e34d6dd59ad5cd6f8099bf6a9d1 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Thu, 11 Nov 2021 14:16:57 +0800 Subject: [PATCH] =?UTF-8?q?=E8=8A=82=E7=82=B9=E9=85=8D=E7=BD=AE=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=8E=8B=E7=BC=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/node.go | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 7fed7d2..c13b17d 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -1,6 +1,7 @@ package nodes import ( + "bytes" "encoding/json" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" @@ -16,6 +17,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/andybalholm/brotli" "github.com/go-yaml/yaml" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/lists" @@ -89,14 +91,14 @@ func (this *Node) Start() { } // 读取API配置 - err = this.syncConfig() + err = this.syncConfig(0) if err != nil { _, err := nodeconfigs.SharedNodeConfig() if err != nil { // 无本地数据时,会尝试多次读取 tryTimes := 0 for { - err := this.syncConfig() + err := this.syncConfig(0) if err != nil { tryTimes++ @@ -261,7 +263,11 @@ func (this *Node) loop() error { return err } case "configChanged": - err := this.syncConfig() + if !task.IsPrimary { + // 我们等等主节点配置准备完毕 + time.Sleep(2 * time.Second) + } + err := this.syncConfig(task.Version) if err != nil { _, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ NodeTaskId: task.Id, @@ -287,7 +293,7 @@ func (this *Node) loop() error { } // 读取API配置 -func (this *Node) syncConfig() error { +func (this *Node) syncConfig(taskVersion int64) error { this.locker.Lock() defer this.locker.Unlock() @@ -318,7 +324,9 @@ func (this *Node) syncConfig() error { // TODO 这里考虑只同步版本号有变更的 configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{ - Version: -1, // 更新所有版本 + Version: -1, // 更新所有版本 + Compress: true, + NodeTaskVersion: taskVersion, }) if err != nil { return errors.New("read config from rpc failed: " + err.Error()) @@ -326,9 +334,26 @@ func (this *Node) syncConfig() error { if !configResp.IsChanged { return nil } - nodeConfigUpdatedAt = time.Now().Unix() configJSON := configResp.NodeJSON + if configResp.IsCompressed { + var reader = brotli.NewReader(bytes.NewReader(configJSON)) + var configBuf = &bytes.Buffer{} + var buf = make([]byte, 32*1024) + for { + n, err := reader.Read(buf) + if n > 0 { + configBuf.Write(buf[:n]) + } + if err != nil { + break + } + } + configJSON = configBuf.Bytes() + } + + nodeConfigUpdatedAt = time.Now().Unix() + nodeConfig := &nodeconfigs.NodeConfig{} err = json.Unmarshal(configJSON, nodeConfig) if err != nil { @@ -411,7 +436,7 @@ func (this *Node) startSyncTimer() { continue } case <-nodeConfigChangedNotify: - err := this.syncConfig() + err := this.syncConfig(0) if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) continue