diff --git a/internal/configs/api_config.go b/internal/configs/api_config.go index d5be986..1f0d712 100644 --- a/internal/configs/api_config.go +++ b/internal/configs/api_config.go @@ -9,11 +9,15 @@ import ( // APIConfig 节点API配置 type APIConfig struct { RPC struct { - Endpoints []string `yaml:"endpoints"` - DisableUpdate bool `yaml:"disableUpdate"` - } `yaml:"rpc"` - NodeId string `yaml:"nodeId"` - Secret string `yaml:"secret"` + Endpoints []string `yaml:"endpoints" json:"endpoints"` + DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"` + } `yaml:"rpc" json:"rpc"` + NodeId string `yaml:"nodeId" json:"nodeId"` + Secret string `yaml:"secret" json:"secret"` +} + +func NewAPIConfig() *APIConfig { + return &APIConfig{} } func LoadAPIConfig() (*APIConfig, error) { diff --git a/internal/configs/cluster_config.go b/internal/configs/cluster_config.go index 0eff4d1..8bdcb7c 100644 --- a/internal/configs/cluster_config.go +++ b/internal/configs/cluster_config.go @@ -3,9 +3,9 @@ package configs // ClusterConfig 集群配置 type ClusterConfig struct { RPC struct { - Endpoints []string `yaml:"endpoints"` - DisableUpdate bool `yaml:"disableUpdate"` - } `yaml:"rpc"` - ClusterId string `yaml:"clusterId"` - Secret string `yaml:"secret"` + Endpoints []string `yaml:"endpoints" json:"endpoints"` + DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"` + } `yaml:"rpc" json:"rpc"` + ClusterId string `yaml:"clusterId" json:"clusterId"` + Secret string `yaml:"secret" json:"secret"` } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index dd08d31..357e03d 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/configutils" iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" @@ -65,6 +66,9 @@ type Node struct { updatingServerMap map[int64]*serverconfigs.ServerConfig + lastAPINodeVersion int64 + lastAPINodeAddrs []string // 以前的API节点地址 + lastTaskVersion int64 } @@ -747,8 +751,8 @@ func (this *Node) checkClusterConfig() error { } var apiConfig = &configs.APIConfig{ RPC: struct { - Endpoints []string `yaml:"endpoints"` - DisableUpdate bool `yaml:"disableUpdate"` + Endpoints []string `yaml:"endpoints" json:"endpoints"` + DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"` }{ Endpoints: resp.Endpoints, DisableUpdate: false, @@ -1083,6 +1087,9 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig) { remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error()) } } + + // API Node地址,这里不限制是否为空,因为在为空时仍然要有对应的处理 + this.changeAPINodeAddrs(config.APINodeAddrs) } // reload server config @@ -1146,3 +1153,68 @@ func (this *Node) checkDisk() { } } } + +func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) { + var addrs = []string{} + for _, addr := range apiNodeAddrs { + err := addr.Init() + if err != nil { + remotelogs.Error("NODE", "changeAPINodeAddrs: validate api node address '"+configutils.QuoteIP(addr.Host)+":"+addr.PortRange+"' failed: "+err.Error()) + } else { + addrs = append(addrs, addr.FullAddresses()...) + } + } + sort.Strings(addrs) + + if utils.EqualStrings(this.lastAPINodeAddrs, addrs) { + return + } + + this.lastAPINodeAddrs = addrs + + config, err := configs.LoadAPIConfig() + if err != nil { + remotelogs.Error("NODE", "changeAPINodeAddrs: "+err.Error()) + return + } + if config == nil { + return + } + var oldEndpoints = config.RPC.Endpoints + + rpcClient, err := rpc.SharedRPC() + if err != nil { + return + } + if len(addrs) > 0 { + this.lastAPINodeVersion++ + var v = this.lastAPINodeVersion + + // 异步检测,防止阻塞 + go func(v int64) { + // 测试新的API节点地址 + if rpcClient.TestEndpoints(addrs) { + config.RPC.Endpoints = addrs + } else { + config.RPC.Endpoints = oldEndpoints + this.lastAPINodeAddrs = nil // 恢复为空,以便于下次更新重试 + } + + // 检查测试中间有无新的变更 + if v != this.lastAPINodeVersion { + return + } + + err = rpcClient.UpdateConfig(config) + if err != nil { + remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error()) + } + }(v) + return + } + + err = rpcClient.UpdateConfig(config) + if err != nil { + remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error()) + } +} diff --git a/internal/nodes/task_sync_api_nodes.go b/internal/nodes/task_sync_api_nodes.go index e167fbb..259b5b0 100644 --- a/internal/nodes/task_sync_api_nodes.go +++ b/internal/nodes/task_sync_api_nodes.go @@ -1,23 +1,15 @@ package nodes import ( - "context" - "crypto/tls" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/trackers" + "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/logs" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "net/url" - "sort" - "strings" - "sync" "time" ) @@ -64,6 +56,9 @@ func (this *SyncAPINodesTask) Stop() { } func (this *SyncAPINodesTask) Loop() error { + // 如果有节点定制的API节点地址 + var hasCustomizedAPINodeAddrs = sharedNodeConfig != nil && len(sharedNodeConfig.APINodeAddrs) > 0 + config, err := configs.LoadAPIConfig() if err != nil { return err @@ -96,21 +91,25 @@ func (this *SyncAPINodesTask) Loop() error { } // 和现有的对比 - if this.isSame(newEndpoints, config.RPC.Endpoints) { + if utils.EqualStrings(newEndpoints, config.RPC.Endpoints) { return nil } // 测试是否有API节点可用 - var hasOk = this.testEndpoints(newEndpoints) + var hasOk = rpcClient.TestEndpoints(newEndpoints) if !hasOk { return nil } // 修改RPC对象配置 config.RPC.Endpoints = newEndpoints - err = rpcClient.UpdateConfig(config) - if err != nil { - return err + + // 更新当前RPC + if !hasCustomizedAPINodeAddrs { + err = rpcClient.UpdateConfig(config) + if err != nil { + return err + } } // 保存到文件 @@ -121,53 +120,3 @@ func (this *SyncAPINodesTask) Loop() error { return nil } - -func (this *SyncAPINodesTask) isSame(endpoints1 []string, endpoints2 []string) bool { - sort.Strings(endpoints1) - sort.Strings(endpoints2) - return strings.Join(endpoints1, "&") == strings.Join(endpoints2, "&") -} - -func (this *SyncAPINodesTask) testEndpoints(endpoints []string) bool { - if len(endpoints) == 0 { - return false - } - - var wg = sync.WaitGroup{} - wg.Add(len(endpoints)) - - var ok = false - - for _, endpoint := range endpoints { - go func(endpoint string) { - defer wg.Done() - - u, err := url.Parse(endpoint) - if err != nil { - return - } - - ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) - defer func() { - cancelFunc() - }() - var conn *grpc.ClientConn - if u.Scheme == "http" { - conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) - } else if u.Scheme == "https" { - conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ - InsecureSkipVerify: true, - })), grpc.WithBlock()) - } - if err != nil { - return - } - _ = conn.Close() - - ok = true - }(endpoint) - } - wg.Wait() - - return ok -} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index d181738..652d464 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -160,6 +160,64 @@ func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error { return err } +// TestEndpoints 测试Endpoints是否可用 +func (this *RPCClient) TestEndpoints(endpoints []string) bool { + if len(endpoints) == 0 { + return false + } + + var wg = sync.WaitGroup{} + wg.Add(len(endpoints)) + + var ok = false + + for _, endpoint := range endpoints { + go func(endpoint string) { + defer wg.Done() + + u, err := url.Parse(endpoint) + if err != nil { + return + } + + ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) + defer func() { + cancelFunc() + }() + var conn *grpc.ClientConn + if u.Scheme == "http" { + conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + } else if u.Scheme == "https" { + conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + InsecureSkipVerify: true, + })), grpc.WithBlock()) + } else { + return + } + if err != nil { + return + } + if conn == nil { + return + } + defer func() { + _ = conn.Close() + }() + + var pingService = pb.NewPingServiceClient(conn) + _, err = pingService.Ping(this.Context(), &pb.PingRequest{}) + if err != nil { + return + } + + ok = true + }(endpoint) + } + wg.Wait() + + return ok +} + // 初始化 func (this *RPCClient) init() error { // 重新连接 diff --git a/internal/utils/string.go b/internal/utils/string.go index 3a41437..dc3745c 100644 --- a/internal/utils/string.go +++ b/internal/utils/string.go @@ -42,8 +42,8 @@ func ToValidUTF8string(v string) string { return strings.ToValidUTF8(v, "") } -// ContainsSameStrings 检查两个字符串slice内容是否一致 -func ContainsSameStrings(s1 []string, s2 []string) bool { +// EqualStrings 检查两个字符串slice内容是否一致 +func EqualStrings(s1 []string, s2 []string) bool { if len(s1) != len(s2) { return false } diff --git a/internal/utils/string_test.go b/internal/utils/string_test.go index 5c6084d..fa2bb38 100644 --- a/internal/utils/string_test.go +++ b/internal/utils/string_test.go @@ -59,9 +59,9 @@ func TestFormatAddressList(t *testing.T) { func TestContainsSameStrings(t *testing.T) { var a = assert.NewAssertion(t) - a.IsFalse(utils.ContainsSameStrings([]string{"a"}, []string{"b"})) - a.IsFalse(utils.ContainsSameStrings([]string{"a", "b"}, []string{"b"})) - a.IsFalse(utils.ContainsSameStrings([]string{"a", "b"}, []string{"a", "b", "c"})) - a.IsTrue(utils.ContainsSameStrings([]string{"a", "b"}, []string{"a", "b"})) - a.IsTrue(utils.ContainsSameStrings([]string{"a", "b"}, []string{"b", "a"})) + a.IsFalse(utils.EqualStrings([]string{"a"}, []string{"b"})) + a.IsFalse(utils.EqualStrings([]string{"a", "b"}, []string{"b"})) + a.IsFalse(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b", "c"})) + a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b"})) + a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"b", "a"})) }