节点配置支持压缩

This commit is contained in:
GoEdgeLab
2021-11-11 14:16:57 +08:00
parent 93a3af158a
commit 188fe12a0b

View File

@@ -1,6 +1,7 @@
package nodes package nodes
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
@@ -16,6 +17,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/andybalholm/brotli"
"github.com/go-yaml/yaml" "github.com/go-yaml/yaml"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
@@ -89,14 +91,14 @@ func (this *Node) Start() {
} }
// 读取API配置 // 读取API配置
err = this.syncConfig() err = this.syncConfig(0)
if err != nil { if err != nil {
_, err := nodeconfigs.SharedNodeConfig() _, err := nodeconfigs.SharedNodeConfig()
if err != nil { if err != nil {
// 无本地数据时,会尝试多次读取 // 无本地数据时,会尝试多次读取
tryTimes := 0 tryTimes := 0
for { for {
err := this.syncConfig() err := this.syncConfig(0)
if err != nil { if err != nil {
tryTimes++ tryTimes++
@@ -261,7 +263,11 @@ func (this *Node) loop() error {
return err return err
} }
case "configChanged": case "configChanged":
err := this.syncConfig() if !task.IsPrimary {
// 我们等等主节点配置准备完毕
time.Sleep(2 * time.Second)
}
err := this.syncConfig(task.Version)
if err != nil { if err != nil {
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id, NodeTaskId: task.Id,
@@ -287,7 +293,7 @@ func (this *Node) loop() error {
} }
// 读取API配置 // 读取API配置
func (this *Node) syncConfig() error { func (this *Node) syncConfig(taskVersion int64) error {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
@@ -319,6 +325,8 @@ func (this *Node) syncConfig() error {
// TODO 这里考虑只同步版本号有变更的 // TODO 这里考虑只同步版本号有变更的
configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{ configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{
Version: -1, // 更新所有版本 Version: -1, // 更新所有版本
Compress: true,
NodeTaskVersion: taskVersion,
}) })
if err != nil { if err != nil {
return errors.New("read config from rpc failed: " + err.Error()) return errors.New("read config from rpc failed: " + err.Error())
@@ -326,9 +334,26 @@ func (this *Node) syncConfig() error {
if !configResp.IsChanged { if !configResp.IsChanged {
return nil return nil
} }
nodeConfigUpdatedAt = time.Now().Unix()
configJSON := configResp.NodeJSON configJSON := configResp.NodeJSON
if configResp.IsCompressed {
var reader = brotli.NewReader(bytes.NewReader(configJSON))
var configBuf = &bytes.Buffer{}
var buf = make([]byte, 32*1024)
for {
n, err := reader.Read(buf)
if n > 0 {
configBuf.Write(buf[:n])
}
if err != nil {
break
}
}
configJSON = configBuf.Bytes()
}
nodeConfigUpdatedAt = time.Now().Unix()
nodeConfig := &nodeconfigs.NodeConfig{} nodeConfig := &nodeconfigs.NodeConfig{}
err = json.Unmarshal(configJSON, nodeConfig) err = json.Unmarshal(configJSON, nodeConfig)
if err != nil { if err != nil {
@@ -411,7 +436,7 @@ func (this *Node) startSyncTimer() {
continue continue
} }
case <-nodeConfigChangedNotify: case <-nodeConfigChangedNotify:
err := this.syncConfig() err := this.syncConfig(0)
if err != nil { if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error()) remotelogs.Error("NODE", "sync config error: "+err.Error())
continue continue