节点可以单独设置所使用的API节点地址

This commit is contained in:
GoEdgeLab
2022-11-21 19:55:28 +08:00
parent 3a276a23ef
commit 74a2289db5
7 changed files with 166 additions and 83 deletions

View File

@@ -9,11 +9,15 @@ import (
// APIConfig 节点API配置 // APIConfig 节点API配置
type APIConfig struct { type APIConfig struct {
RPC struct { RPC struct {
Endpoints []string `yaml:"endpoints"` Endpoints []string `yaml:"endpoints" json:"endpoints"`
DisableUpdate bool `yaml:"disableUpdate"` DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"`
} `yaml:"rpc"` } `yaml:"rpc" json:"rpc"`
NodeId string `yaml:"nodeId"` NodeId string `yaml:"nodeId" json:"nodeId"`
Secret string `yaml:"secret"` Secret string `yaml:"secret" json:"secret"`
}
func NewAPIConfig() *APIConfig {
return &APIConfig{}
} }
func LoadAPIConfig() (*APIConfig, error) { func LoadAPIConfig() (*APIConfig, error) {

View File

@@ -3,9 +3,9 @@ package configs
// ClusterConfig 集群配置 // ClusterConfig 集群配置
type ClusterConfig struct { type ClusterConfig struct {
RPC struct { RPC struct {
Endpoints []string `yaml:"endpoints"` Endpoints []string `yaml:"endpoints" json:"endpoints"`
DisableUpdate bool `yaml:"disableUpdate"` DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"`
} `yaml:"rpc"` } `yaml:"rpc" json:"rpc"`
ClusterId string `yaml:"clusterId"` ClusterId string `yaml:"clusterId" json:"clusterId"`
Secret string `yaml:"secret"` Secret string `yaml:"secret" json:"secret"`
} }

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary" iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
@@ -65,6 +66,9 @@ type Node struct {
updatingServerMap map[int64]*serverconfigs.ServerConfig updatingServerMap map[int64]*serverconfigs.ServerConfig
lastAPINodeVersion int64
lastAPINodeAddrs []string // 以前的API节点地址
lastTaskVersion int64 lastTaskVersion int64
} }
@@ -747,8 +751,8 @@ func (this *Node) checkClusterConfig() error {
} }
var apiConfig = &configs.APIConfig{ var apiConfig = &configs.APIConfig{
RPC: struct { RPC: struct {
Endpoints []string `yaml:"endpoints"` Endpoints []string `yaml:"endpoints" json:"endpoints"`
DisableUpdate bool `yaml:"disableUpdate"` DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"`
}{ }{
Endpoints: resp.Endpoints, Endpoints: resp.Endpoints,
DisableUpdate: false, DisableUpdate: false,
@@ -1083,6 +1087,9 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error()) remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
} }
} }
// API Node地址这里不限制是否为空因为在为空时仍然要有对应的处理
this.changeAPINodeAddrs(config.APINodeAddrs)
} }
// reload server config // reload server config
@@ -1146,3 +1153,68 @@ func (this *Node) checkDisk() {
} }
} }
} }
func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) {
var addrs = []string{}
for _, addr := range apiNodeAddrs {
err := addr.Init()
if err != nil {
remotelogs.Error("NODE", "changeAPINodeAddrs: validate api node address '"+configutils.QuoteIP(addr.Host)+":"+addr.PortRange+"' failed: "+err.Error())
} else {
addrs = append(addrs, addr.FullAddresses()...)
}
}
sort.Strings(addrs)
if utils.EqualStrings(this.lastAPINodeAddrs, addrs) {
return
}
this.lastAPINodeAddrs = addrs
config, err := configs.LoadAPIConfig()
if err != nil {
remotelogs.Error("NODE", "changeAPINodeAddrs: "+err.Error())
return
}
if config == nil {
return
}
var oldEndpoints = config.RPC.Endpoints
rpcClient, err := rpc.SharedRPC()
if err != nil {
return
}
if len(addrs) > 0 {
this.lastAPINodeVersion++
var v = this.lastAPINodeVersion
// 异步检测,防止阻塞
go func(v int64) {
// 测试新的API节点地址
if rpcClient.TestEndpoints(addrs) {
config.RPC.Endpoints = addrs
} else {
config.RPC.Endpoints = oldEndpoints
this.lastAPINodeAddrs = nil // 恢复为空,以便于下次更新重试
}
// 检查测试中间有无新的变更
if v != this.lastAPINodeVersion {
return
}
err = rpcClient.UpdateConfig(config)
if err != nil {
remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error())
}
}(v)
return
}
err = rpcClient.UpdateConfig(config)
if err != nil {
remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error())
}
}

View File

@@ -1,23 +1,15 @@
package nodes package nodes
import ( import (
"context"
"crypto/tls"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"net/url"
"sort"
"strings"
"sync"
"time" "time"
) )
@@ -64,6 +56,9 @@ func (this *SyncAPINodesTask) Stop() {
} }
func (this *SyncAPINodesTask) Loop() error { func (this *SyncAPINodesTask) Loop() error {
// 如果有节点定制的API节点地址
var hasCustomizedAPINodeAddrs = sharedNodeConfig != nil && len(sharedNodeConfig.APINodeAddrs) > 0
config, err := configs.LoadAPIConfig() config, err := configs.LoadAPIConfig()
if err != nil { if err != nil {
return err return err
@@ -96,21 +91,25 @@ func (this *SyncAPINodesTask) Loop() error {
} }
// 和现有的对比 // 和现有的对比
if this.isSame(newEndpoints, config.RPC.Endpoints) { if utils.EqualStrings(newEndpoints, config.RPC.Endpoints) {
return nil return nil
} }
// 测试是否有API节点可用 // 测试是否有API节点可用
var hasOk = this.testEndpoints(newEndpoints) var hasOk = rpcClient.TestEndpoints(newEndpoints)
if !hasOk { if !hasOk {
return nil return nil
} }
// 修改RPC对象配置 // 修改RPC对象配置
config.RPC.Endpoints = newEndpoints config.RPC.Endpoints = newEndpoints
err = rpcClient.UpdateConfig(config)
if err != nil { // 更新当前RPC
return err if !hasCustomizedAPINodeAddrs {
err = rpcClient.UpdateConfig(config)
if err != nil {
return err
}
} }
// 保存到文件 // 保存到文件
@@ -121,53 +120,3 @@ func (this *SyncAPINodesTask) Loop() error {
return nil return nil
} }
func (this *SyncAPINodesTask) isSame(endpoints1 []string, endpoints2 []string) bool {
sort.Strings(endpoints1)
sort.Strings(endpoints2)
return strings.Join(endpoints1, "&") == strings.Join(endpoints2, "&")
}
func (this *SyncAPINodesTask) testEndpoints(endpoints []string) bool {
if len(endpoints) == 0 {
return false
}
var wg = sync.WaitGroup{}
wg.Add(len(endpoints))
var ok = false
for _, endpoint := range endpoints {
go func(endpoint string) {
defer wg.Done()
u, err := url.Parse(endpoint)
if err != nil {
return
}
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer func() {
cancelFunc()
}()
var conn *grpc.ClientConn
if u.Scheme == "http" {
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
} else if u.Scheme == "https" {
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
})), grpc.WithBlock())
}
if err != nil {
return
}
_ = conn.Close()
ok = true
}(endpoint)
}
wg.Wait()
return ok
}

View File

@@ -160,6 +160,64 @@ func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
return err return err
} }
// TestEndpoints 测试Endpoints是否可用
func (this *RPCClient) TestEndpoints(endpoints []string) bool {
if len(endpoints) == 0 {
return false
}
var wg = sync.WaitGroup{}
wg.Add(len(endpoints))
var ok = false
for _, endpoint := range endpoints {
go func(endpoint string) {
defer wg.Done()
u, err := url.Parse(endpoint)
if err != nil {
return
}
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer func() {
cancelFunc()
}()
var conn *grpc.ClientConn
if u.Scheme == "http" {
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
} else if u.Scheme == "https" {
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
})), grpc.WithBlock())
} else {
return
}
if err != nil {
return
}
if conn == nil {
return
}
defer func() {
_ = conn.Close()
}()
var pingService = pb.NewPingServiceClient(conn)
_, err = pingService.Ping(this.Context(), &pb.PingRequest{})
if err != nil {
return
}
ok = true
}(endpoint)
}
wg.Wait()
return ok
}
// 初始化 // 初始化
func (this *RPCClient) init() error { func (this *RPCClient) init() error {
// 重新连接 // 重新连接

View File

@@ -42,8 +42,8 @@ func ToValidUTF8string(v string) string {
return strings.ToValidUTF8(v, "") return strings.ToValidUTF8(v, "")
} }
// ContainsSameStrings 检查两个字符串slice内容是否一致 // EqualStrings 检查两个字符串slice内容是否一致
func ContainsSameStrings(s1 []string, s2 []string) bool { func EqualStrings(s1 []string, s2 []string) bool {
if len(s1) != len(s2) { if len(s1) != len(s2) {
return false return false
} }

View File

@@ -59,9 +59,9 @@ func TestFormatAddressList(t *testing.T) {
func TestContainsSameStrings(t *testing.T) { func TestContainsSameStrings(t *testing.T) {
var a = assert.NewAssertion(t) var a = assert.NewAssertion(t)
a.IsFalse(utils.ContainsSameStrings([]string{"a"}, []string{"b"})) a.IsFalse(utils.EqualStrings([]string{"a"}, []string{"b"}))
a.IsFalse(utils.ContainsSameStrings([]string{"a", "b"}, []string{"b"})) a.IsFalse(utils.EqualStrings([]string{"a", "b"}, []string{"b"}))
a.IsFalse(utils.ContainsSameStrings([]string{"a", "b"}, []string{"a", "b", "c"})) a.IsFalse(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b", "c"}))
a.IsTrue(utils.ContainsSameStrings([]string{"a", "b"}, []string{"a", "b"})) a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b"}))
a.IsTrue(utils.ContainsSameStrings([]string{"a", "b"}, []string{"b", "a"})) a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"b", "a"}))
} }