From acc2df7fea54acd4c184a40a26c524fe0ce226e4 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 24 Feb 2021 11:01:06 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=90=8C=E6=AD=A5API?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/task_sync_api_nodes.go | 97 +++++++++++++++++++++++++++ internal/rpc/rpc_client.go | 8 +++ 2 files changed, 105 insertions(+) create mode 100644 internal/nodes/task_sync_api_nodes.go diff --git a/internal/nodes/task_sync_api_nodes.go b/internal/nodes/task_sync_api_nodes.go new file mode 100644 index 0000000..81bc55d --- /dev/null +++ b/internal/nodes/task_sync_api_nodes.go @@ -0,0 +1,97 @@ +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/configs" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/logs" + "sort" + "strings" + "time" +) + +func init() { + events.On(events.EventStart, func() { + task := NewSyncAPINodesTask() + go task.Start() + }) +} + +// API节点同步任务 +type SyncAPINodesTask struct { +} + +func NewSyncAPINodesTask() *SyncAPINodesTask { + return &SyncAPINodesTask{} +} + +func (this *SyncAPINodesTask) Start() { + ticker := time.NewTicker(5 * time.Minute) + if Tea.IsTesting() { + // 快速测试 + ticker = time.NewTicker(1 * time.Minute) + } + events.On(events.EventQuit, func() { + remotelogs.Println("SYNC_API_NODES_TASK", "quit task") + ticker.Stop() + }) + for range ticker.C { + err := this.Loop() + if err != nil { + logs.Println("[TASK][SYNC_API_NODES_TASK]" + err.Error()) + } + } +} + +func (this *SyncAPINodesTask) Loop() error { + // 获取所有可用的节点 + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + resp, err := rpcClient.APINodeRPC().FindAllEnabledAPINodes(rpcClient.Context(), &pb.FindAllEnabledAPINodesRequest{}) + if err != nil { + return err + } + + newEndpoints := []string{} + for _, node := range resp.Nodes { + if !node.IsOn { + continue + } + newEndpoints = append(newEndpoints, node.AccessAddrs...) + } + + // 和现有的对比 + config, err := configs.LoadAPIConfig() + if err != nil { + return err + } + if this.isSame(newEndpoints, config.RPC.Endpoints) { + return nil + } + + // 修改RPC对象配置 + config.RPC.Endpoints = newEndpoints + err = rpcClient.UpdateConfig(config) + if err != nil { + return err + } + + // 保存到文件 + err = config.WriteFile(Tea.ConfigFile("api.yaml")) + if err != nil { + return err + } + + return nil +} + +func (this *SyncAPINodesTask) isSame(endpoints1 []string, endpoints2 []string) bool { + sort.Strings(endpoints1) + sort.Strings(endpoints2) + return strings.Join(endpoints1, "&") == strings.Join(endpoints2, "&") +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 57c56bb..47462de 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -158,6 +158,12 @@ func (this *RPCClient) Close() { } } +// 修改配置 +func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error { + this.apiConfig = config + return this.init() +} + // 初始化 func (this *RPCClient) init() error { // 重新连接 @@ -185,6 +191,8 @@ func (this *RPCClient) init() error { if len(conns) == 0 { return errors.New("[RPC]no available endpoints") } + + // 这里不需要加锁,防止和pickConn()冲突 this.conns = conns return nil }