mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 23:20:25 +08:00
可以直接在节点启动时自动注册节点
This commit is contained in:
1
build/configs/.gitignore
vendored
1
build/configs/.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
node.json
|
||||
api.yaml
|
||||
cluster.yaml
|
||||
4
build/configs/cluster.template.yaml
Normal file
4
build/configs/cluster.template.yaml
Normal file
@@ -0,0 +1,4 @@
|
||||
rpc:
|
||||
endpoints: [ "" ]
|
||||
clusterId: ""
|
||||
secret: ""
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
// 节点API配置
|
||||
type APIConfig struct {
|
||||
RPC struct {
|
||||
Endpoints []string `yaml:"endpoints"`
|
||||
@@ -28,3 +29,13 @@ func LoadAPIConfig() (*APIConfig, error) {
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// 保存到文件
|
||||
func (this *APIConfig) WriteFile(path string) error {
|
||||
data, err := yaml.Marshal(this)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = ioutil.WriteFile(path, data, 0666)
|
||||
return err
|
||||
}
|
||||
|
||||
10
internal/configs/cluster_config.go
Normal file
10
internal/configs/cluster_config.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package configs
|
||||
|
||||
// 集群配置
|
||||
type ClusterConfig struct {
|
||||
RPC struct {
|
||||
Endpoints []string `yaml:"endpoints"`
|
||||
} `yaml:"rpc"`
|
||||
ClusterId string `yaml:"clusterId"`
|
||||
Secret string `yaml:"secret"`
|
||||
}
|
||||
@@ -48,6 +48,7 @@ func (this *APIStream) loop() error {
|
||||
return errors.Wrap(err)
|
||||
}
|
||||
this.stream = nodeStream
|
||||
|
||||
for {
|
||||
message, err := nodeStream.Recv()
|
||||
if err != nil {
|
||||
|
||||
@@ -6,9 +6,15 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/go-yaml/yaml"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
tealogs "github.com/iwind/TeaGo/logs"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
@@ -69,20 +75,37 @@ func (this *Node) Start() {
|
||||
|
||||
// 读取API配置
|
||||
func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
// 检查api.yaml是否存在
|
||||
apiConfigFile := Tea.ConfigFile("api.yaml")
|
||||
_, err := os.Stat(apiConfigFile)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
clusterErr := this.checkClusterConfig()
|
||||
if clusterErr != nil {
|
||||
if os.IsNotExist(clusterErr) {
|
||||
return err
|
||||
}
|
||||
return clusterErr
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return errors.New("[NODE]create rpc client failed: " + err.Error())
|
||||
return errors.New("create rpc client failed: " + err.Error())
|
||||
}
|
||||
// TODO 这里考虑只同步版本号有变更的
|
||||
configResp, err := rpcClient.NodeRPC().ComposeNodeConfig(rpcClient.Context(), &pb.ComposeNodeConfigRequest{})
|
||||
if err != nil {
|
||||
return errors.New("[NODE]read config from rpc failed: " + err.Error())
|
||||
return errors.New("read config from rpc failed: " + err.Error())
|
||||
}
|
||||
configJSON := configResp.NodeJSON
|
||||
nodeConfig := &nodeconfigs.NodeConfig{}
|
||||
err = json.Unmarshal(configJSON, nodeConfig)
|
||||
if err != nil {
|
||||
return errors.New("[NODE]decode config failed: " + err.Error())
|
||||
return errors.New("decode config failed: " + err.Error())
|
||||
}
|
||||
|
||||
// 写入到文件中
|
||||
@@ -151,3 +174,55 @@ func (this *Node) startSyncTimer() {
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 检查集群设置
|
||||
func (this *Node) checkClusterConfig() error {
|
||||
configFile := Tea.ConfigFile("cluster.yaml")
|
||||
data, err := ioutil.ReadFile(configFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
config := &configs.ClusterConfig{}
|
||||
err = yaml.Unmarshal(data, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rpcClient, err := rpc.NewRPCClient(&configs.APIConfig{
|
||||
RPC: config.RPC,
|
||||
NodeId: config.ClusterId,
|
||||
Secret: config.Secret,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tealogs.Println("[NODE]registering node ...")
|
||||
resp, err := rpcClient.NodeRPC().RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tealogs.Println("[NODE]registered successfully")
|
||||
|
||||
// 写入到配置文件中
|
||||
if len(resp.Endpoints) == 0 {
|
||||
resp.Endpoints = []string{}
|
||||
}
|
||||
apiConfig := &configs.APIConfig{
|
||||
RPC: struct {
|
||||
Endpoints []string `yaml:"endpoints"`
|
||||
}{
|
||||
Endpoints: resp.Endpoints,
|
||||
},
|
||||
NodeId: resp.UniqueId,
|
||||
Secret: resp.Secret,
|
||||
}
|
||||
tealogs.Println("[NODE]writing 'configs/api.yaml' ...")
|
||||
err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tealogs.Println("[NODE]wrote 'configs/api.yaml' successfully")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -72,6 +72,7 @@ func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
|
||||
return pb.NewHTTPAccessLogServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
// 节点上下文信息
|
||||
func (this *RPCClient) Context() context.Context {
|
||||
ctx := context.Background()
|
||||
m := maps.Map{
|
||||
@@ -94,6 +95,29 @@ func (this *RPCClient) Context() context.Context {
|
||||
return ctx
|
||||
}
|
||||
|
||||
// 集群上下文
|
||||
func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) context.Context {
|
||||
ctx := context.Background()
|
||||
m := maps.Map{
|
||||
"timestamp": time.Now().Unix(),
|
||||
"type": "cluster",
|
||||
"userId": 0,
|
||||
}
|
||||
method, err := encrypt.NewMethodInstance(teaconst.EncryptMethod, clusterSecret, clusterId)
|
||||
if err != nil {
|
||||
utils.PrintError(err)
|
||||
return context.Background()
|
||||
}
|
||||
data, err := method.Encrypt(m.AsJSON())
|
||||
if err != nil {
|
||||
utils.PrintError(err)
|
||||
return context.Background()
|
||||
}
|
||||
token := base64.StdEncoding.EncodeToString(data)
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, "nodeId", clusterId, "token", token)
|
||||
return ctx
|
||||
}
|
||||
|
||||
// 随机选择一个连接
|
||||
func (this *RPCClient) pickConn() *grpc.ClientConn {
|
||||
if len(this.conns) == 0 {
|
||||
|
||||
Reference in New Issue
Block a user