mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-16 18:10:30 +08:00
节点可以单独设置所使用的API节点地址
This commit is contained in:
@@ -9,11 +9,15 @@ import (
|
|||||||
// APIConfig 节点API配置
|
// APIConfig 节点API配置
|
||||||
type APIConfig struct {
|
type APIConfig struct {
|
||||||
RPC struct {
|
RPC struct {
|
||||||
Endpoints []string `yaml:"endpoints"`
|
Endpoints []string `yaml:"endpoints" json:"endpoints"`
|
||||||
DisableUpdate bool `yaml:"disableUpdate"`
|
DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"`
|
||||||
} `yaml:"rpc"`
|
} `yaml:"rpc" json:"rpc"`
|
||||||
NodeId string `yaml:"nodeId"`
|
NodeId string `yaml:"nodeId" json:"nodeId"`
|
||||||
Secret string `yaml:"secret"`
|
Secret string `yaml:"secret" json:"secret"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAPIConfig() *APIConfig {
|
||||||
|
return &APIConfig{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadAPIConfig() (*APIConfig, error) {
|
func LoadAPIConfig() (*APIConfig, error) {
|
||||||
|
|||||||
@@ -3,9 +3,9 @@ package configs
|
|||||||
// ClusterConfig 集群配置
|
// ClusterConfig 集群配置
|
||||||
type ClusterConfig struct {
|
type ClusterConfig struct {
|
||||||
RPC struct {
|
RPC struct {
|
||||||
Endpoints []string `yaml:"endpoints"`
|
Endpoints []string `yaml:"endpoints" json:"endpoints"`
|
||||||
DisableUpdate bool `yaml:"disableUpdate"`
|
DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"`
|
||||||
} `yaml:"rpc"`
|
} `yaml:"rpc" json:"rpc"`
|
||||||
ClusterId string `yaml:"clusterId"`
|
ClusterId string `yaml:"clusterId" json:"clusterId"`
|
||||||
Secret string `yaml:"secret"`
|
Secret string `yaml:"secret" json:"secret"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
||||||
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
|
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||||
@@ -65,6 +66,9 @@ type Node struct {
|
|||||||
|
|
||||||
updatingServerMap map[int64]*serverconfigs.ServerConfig
|
updatingServerMap map[int64]*serverconfigs.ServerConfig
|
||||||
|
|
||||||
|
lastAPINodeVersion int64
|
||||||
|
lastAPINodeAddrs []string // 以前的API节点地址
|
||||||
|
|
||||||
lastTaskVersion int64
|
lastTaskVersion int64
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -747,8 +751,8 @@ func (this *Node) checkClusterConfig() error {
|
|||||||
}
|
}
|
||||||
var apiConfig = &configs.APIConfig{
|
var apiConfig = &configs.APIConfig{
|
||||||
RPC: struct {
|
RPC: struct {
|
||||||
Endpoints []string `yaml:"endpoints"`
|
Endpoints []string `yaml:"endpoints" json:"endpoints"`
|
||||||
DisableUpdate bool `yaml:"disableUpdate"`
|
DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"`
|
||||||
}{
|
}{
|
||||||
Endpoints: resp.Endpoints,
|
Endpoints: resp.Endpoints,
|
||||||
DisableUpdate: false,
|
DisableUpdate: false,
|
||||||
@@ -1083,6 +1087,9 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
|
|||||||
remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
|
remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// API Node地址,这里不限制是否为空,因为在为空时仍然要有对应的处理
|
||||||
|
this.changeAPINodeAddrs(config.APINodeAddrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// reload server config
|
// reload server config
|
||||||
@@ -1146,3 +1153,68 @@ func (this *Node) checkDisk() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) {
|
||||||
|
var addrs = []string{}
|
||||||
|
for _, addr := range apiNodeAddrs {
|
||||||
|
err := addr.Init()
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("NODE", "changeAPINodeAddrs: validate api node address '"+configutils.QuoteIP(addr.Host)+":"+addr.PortRange+"' failed: "+err.Error())
|
||||||
|
} else {
|
||||||
|
addrs = append(addrs, addr.FullAddresses()...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Strings(addrs)
|
||||||
|
|
||||||
|
if utils.EqualStrings(this.lastAPINodeAddrs, addrs) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.lastAPINodeAddrs = addrs
|
||||||
|
|
||||||
|
config, err := configs.LoadAPIConfig()
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("NODE", "changeAPINodeAddrs: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if config == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var oldEndpoints = config.RPC.Endpoints
|
||||||
|
|
||||||
|
rpcClient, err := rpc.SharedRPC()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(addrs) > 0 {
|
||||||
|
this.lastAPINodeVersion++
|
||||||
|
var v = this.lastAPINodeVersion
|
||||||
|
|
||||||
|
// 异步检测,防止阻塞
|
||||||
|
go func(v int64) {
|
||||||
|
// 测试新的API节点地址
|
||||||
|
if rpcClient.TestEndpoints(addrs) {
|
||||||
|
config.RPC.Endpoints = addrs
|
||||||
|
} else {
|
||||||
|
config.RPC.Endpoints = oldEndpoints
|
||||||
|
this.lastAPINodeAddrs = nil // 恢复为空,以便于下次更新重试
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查测试中间有无新的变更
|
||||||
|
if v != this.lastAPINodeVersion {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rpcClient.UpdateConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error())
|
||||||
|
}
|
||||||
|
}(v)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rpcClient.UpdateConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,23 +1,15 @@
|
|||||||
package nodes
|
package nodes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/trackers"
|
"github.com/TeaOSLab/EdgeNode/internal/trackers"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
"github.com/iwind/TeaGo/Tea"
|
||||||
"github.com/iwind/TeaGo/logs"
|
"github.com/iwind/TeaGo/logs"
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/credentials"
|
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
|
||||||
"net/url"
|
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -64,6 +56,9 @@ func (this *SyncAPINodesTask) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *SyncAPINodesTask) Loop() error {
|
func (this *SyncAPINodesTask) Loop() error {
|
||||||
|
// 如果有节点定制的API节点地址
|
||||||
|
var hasCustomizedAPINodeAddrs = sharedNodeConfig != nil && len(sharedNodeConfig.APINodeAddrs) > 0
|
||||||
|
|
||||||
config, err := configs.LoadAPIConfig()
|
config, err := configs.LoadAPIConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -96,21 +91,25 @@ func (this *SyncAPINodesTask) Loop() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 和现有的对比
|
// 和现有的对比
|
||||||
if this.isSame(newEndpoints, config.RPC.Endpoints) {
|
if utils.EqualStrings(newEndpoints, config.RPC.Endpoints) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 测试是否有API节点可用
|
// 测试是否有API节点可用
|
||||||
var hasOk = this.testEndpoints(newEndpoints)
|
var hasOk = rpcClient.TestEndpoints(newEndpoints)
|
||||||
if !hasOk {
|
if !hasOk {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 修改RPC对象配置
|
// 修改RPC对象配置
|
||||||
config.RPC.Endpoints = newEndpoints
|
config.RPC.Endpoints = newEndpoints
|
||||||
err = rpcClient.UpdateConfig(config)
|
|
||||||
if err != nil {
|
// 更新当前RPC
|
||||||
return err
|
if !hasCustomizedAPINodeAddrs {
|
||||||
|
err = rpcClient.UpdateConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 保存到文件
|
// 保存到文件
|
||||||
@@ -121,53 +120,3 @@ func (this *SyncAPINodesTask) Loop() error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *SyncAPINodesTask) isSame(endpoints1 []string, endpoints2 []string) bool {
|
|
||||||
sort.Strings(endpoints1)
|
|
||||||
sort.Strings(endpoints2)
|
|
||||||
return strings.Join(endpoints1, "&") == strings.Join(endpoints2, "&")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *SyncAPINodesTask) testEndpoints(endpoints []string) bool {
|
|
||||||
if len(endpoints) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg = sync.WaitGroup{}
|
|
||||||
wg.Add(len(endpoints))
|
|
||||||
|
|
||||||
var ok = false
|
|
||||||
|
|
||||||
for _, endpoint := range endpoints {
|
|
||||||
go func(endpoint string) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
u, err := url.Parse(endpoint)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer func() {
|
|
||||||
cancelFunc()
|
|
||||||
}()
|
|
||||||
var conn *grpc.ClientConn
|
|
||||||
if u.Scheme == "http" {
|
|
||||||
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
|
|
||||||
} else if u.Scheme == "https" {
|
|
||||||
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
|
|
||||||
InsecureSkipVerify: true,
|
|
||||||
})), grpc.WithBlock())
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = conn.Close()
|
|
||||||
|
|
||||||
ok = true
|
|
||||||
}(endpoint)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -160,6 +160,64 @@ func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestEndpoints 测试Endpoints是否可用
|
||||||
|
func (this *RPCClient) TestEndpoints(endpoints []string) bool {
|
||||||
|
if len(endpoints) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg = sync.WaitGroup{}
|
||||||
|
wg.Add(len(endpoints))
|
||||||
|
|
||||||
|
var ok = false
|
||||||
|
|
||||||
|
for _, endpoint := range endpoints {
|
||||||
|
go func(endpoint string) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
u, err := url.Parse(endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer func() {
|
||||||
|
cancelFunc()
|
||||||
|
}()
|
||||||
|
var conn *grpc.ClientConn
|
||||||
|
if u.Scheme == "http" {
|
||||||
|
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
|
||||||
|
} else if u.Scheme == "https" {
|
||||||
|
conn, err = grpc.DialContext(ctx, u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
})), grpc.WithBlock())
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if conn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var pingService = pb.NewPingServiceClient(conn)
|
||||||
|
_, err = pingService.Ping(this.Context(), &pb.PingRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ok = true
|
||||||
|
}(endpoint)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
// 初始化
|
// 初始化
|
||||||
func (this *RPCClient) init() error {
|
func (this *RPCClient) init() error {
|
||||||
// 重新连接
|
// 重新连接
|
||||||
|
|||||||
@@ -42,8 +42,8 @@ func ToValidUTF8string(v string) string {
|
|||||||
return strings.ToValidUTF8(v, "")
|
return strings.ToValidUTF8(v, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainsSameStrings 检查两个字符串slice内容是否一致
|
// EqualStrings 检查两个字符串slice内容是否一致
|
||||||
func ContainsSameStrings(s1 []string, s2 []string) bool {
|
func EqualStrings(s1 []string, s2 []string) bool {
|
||||||
if len(s1) != len(s2) {
|
if len(s1) != len(s2) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,9 +59,9 @@ func TestFormatAddressList(t *testing.T) {
|
|||||||
|
|
||||||
func TestContainsSameStrings(t *testing.T) {
|
func TestContainsSameStrings(t *testing.T) {
|
||||||
var a = assert.NewAssertion(t)
|
var a = assert.NewAssertion(t)
|
||||||
a.IsFalse(utils.ContainsSameStrings([]string{"a"}, []string{"b"}))
|
a.IsFalse(utils.EqualStrings([]string{"a"}, []string{"b"}))
|
||||||
a.IsFalse(utils.ContainsSameStrings([]string{"a", "b"}, []string{"b"}))
|
a.IsFalse(utils.EqualStrings([]string{"a", "b"}, []string{"b"}))
|
||||||
a.IsFalse(utils.ContainsSameStrings([]string{"a", "b"}, []string{"a", "b", "c"}))
|
a.IsFalse(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b", "c"}))
|
||||||
a.IsTrue(utils.ContainsSameStrings([]string{"a", "b"}, []string{"a", "b"}))
|
a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b"}))
|
||||||
a.IsTrue(utils.ContainsSameStrings([]string{"a", "b"}, []string{"b", "a"}))
|
a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"b", "a"}))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user