From 16d661a236f11aa2a371f516524ca0734173f686 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 24 Feb 2021 09:00:12 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=9B=B4=E6=96=B0API?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/rpc/rpc_client.go | 8 ++ internal/tasks/task_sync_api_nodes.go | 92 ++++++++++++++++++++++ internal/tasks/task_sync_api_nodes_test.go | 15 ++++ 3 files changed, 115 insertions(+) create mode 100644 internal/tasks/task_sync_api_nodes.go create mode 100644 internal/tasks/task_sync_api_nodes_test.go diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index b5ac8fb9..6456ec22 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -342,6 +342,12 @@ func (this *RPCClient) APIContext(apiNodeId int64) context.Context { return ctx } +// 修改配置 +func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error { + this.apiConfig = config + return this.init() +} + // 初始化 func (this *RPCClient) init() error { // 重新连接 @@ -369,7 +375,9 @@ func (this *RPCClient) init() error { if len(conns) == 0 { return errors.New("[RPC]no available endpoints") } + this.locker.Lock() this.conns = conns + this.locker.Unlock() return nil } diff --git a/internal/tasks/task_sync_api_nodes.go b/internal/tasks/task_sync_api_nodes.go new file mode 100644 index 00000000..7763103e --- /dev/null +++ b/internal/tasks/task_sync_api_nodes.go @@ -0,0 +1,92 @@ +package tasks + +import ( + "github.com/TeaOSLab/EdgeAdmin/internal/configs" + "github.com/TeaOSLab/EdgeAdmin/internal/events" + "github.com/TeaOSLab/EdgeAdmin/internal/rpc" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/logs" + "sort" + "strings" + "time" +) + +func init() { + events.On(events.EventStart, func() { + task := NewSyncAPINodesTask() + go task.Start() + }) +} + +// API节点同步任务 +type SyncAPINodesTask struct { +} + +func NewSyncAPINodesTask() *SyncAPINodesTask { + return &SyncAPINodesTask{} +} + +func (this *SyncAPINodesTask) Start() { + ticker := time.NewTicker(5 * time.Minute) + if Tea.IsTesting() { + // 快速测试 + ticker = time.NewTicker(1 * time.Minute) + } + for range ticker.C { + err := this.Loop() + if err != nil { + logs.Println("[TASK][SYNC_API_NODES]" + err.Error()) + } + } +} + +func (this *SyncAPINodesTask) Loop() error { + // 获取所有可用的节点 + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + resp, err := rpcClient.APINodeRPC().FindAllEnabledAPINodes(rpcClient.Context(0), &pb.FindAllEnabledAPINodesRequest{}) + if err != nil { + return err + } + + newEndpoints := []string{} + for _, node := range resp.Nodes { + if !node.IsOn { + continue + } + newEndpoints = append(newEndpoints, node.AccessAddrs...) + } + + // 和现有的对比 + config, err := configs.LoadAPIConfig() + if err != nil { + return err + } + if this.isSame(newEndpoints, config.RPC.Endpoints) { + return nil + } + + // 修改RPC对象配置 + config.RPC.Endpoints = newEndpoints + err = rpcClient.UpdateConfig(config) + if err != nil { + return err + } + + // 保存到文件 + err = config.WriteFile(Tea.ConfigFile("api.yaml")) + if err != nil { + return err + } + + return nil +} + +func (this *SyncAPINodesTask) isSame(endpoints1 []string, endpoints2 []string) bool { + sort.Strings(endpoints1) + sort.Strings(endpoints2) + return strings.Join(endpoints1, "&") == strings.Join(endpoints2, "&") +} diff --git a/internal/tasks/task_sync_api_nodes_test.go b/internal/tasks/task_sync_api_nodes_test.go new file mode 100644 index 00000000..2a91f5ee --- /dev/null +++ b/internal/tasks/task_sync_api_nodes_test.go @@ -0,0 +1,15 @@ +package tasks + +import ( + _ "github.com/iwind/TeaGo/bootstrap" + "testing" +) + +func TestSyncAPINodesTask_Loop(t *testing.T) { + task := NewSyncAPINodesTask() + err := task.Loop() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +}