From f38e80e82d3e0cff52fa73e03a9b7900a0f0146a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Tue, 20 Jul 2021 18:17:25 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=9B=BF=E6=8D=A2API?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E6=97=B6=E5=A2=9E=E5=8A=A0=E5=AF=B9=E6=96=B0?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E7=9A=84=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/task_sync_api_nodes.go | 60 ++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/internal/nodes/task_sync_api_nodes.go b/internal/nodes/task_sync_api_nodes.go index 81bc55d..d497128 100644 --- a/internal/nodes/task_sync_api_nodes.go +++ b/internal/nodes/task_sync_api_nodes.go @@ -1,6 +1,8 @@ package nodes import ( + "context" + "crypto/tls" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/events" @@ -8,8 +10,12 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/logs" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "net/url" "sort" "strings" + "sync" "time" ) @@ -20,7 +26,7 @@ func init() { }) } -// API节点同步任务 +// SyncAPINodesTask API节点同步任务 type SyncAPINodesTask struct { } @@ -74,6 +80,12 @@ func (this *SyncAPINodesTask) Loop() error { return nil } + // 测试是否有API节点可用 + hasOk := this.testEndpoints(newEndpoints) + if !hasOk { + return nil + } + // 修改RPC对象配置 config.RPC.Endpoints = newEndpoints err = rpcClient.UpdateConfig(config) @@ -95,3 +107,49 @@ func (this *SyncAPINodesTask) isSame(endpoints1 []string, endpoints2 []string) b sort.Strings(endpoints2) return strings.Join(endpoints1, "&") == strings.Join(endpoints2, "&") } + + + +func (this *SyncAPINodesTask) testEndpoints(endpoints []string) bool { + if len(endpoints) == 0 { + return false + } + + var wg = sync.WaitGroup{} + wg.Add(len(endpoints)) + + var ok = false + + for _, endpoint := range endpoints { + go func(endpoint string) { + defer wg.Done() + + u, err := url.Parse(endpoint) + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer func() { + cancel() + }() + var conn *grpc.ClientConn + if u.Scheme == "http" { + conn, err = grpc.DialContext(ctx, u.Host, grpc.WithInsecure(), grpc.WithBlock()) + } else if u.Scheme == "https" { + conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + InsecureSkipVerify: true, + })), grpc.WithBlock()) + } + if err != nil { + return + } + _ = conn.Close() + + ok = true + }(endpoint) + } + wg.Wait() + + return ok +}