From f1ffe8c438e6f03eab4525c377924e8cacabd855 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sat, 17 Oct 2020 11:14:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E7=9B=B4=E6=8E=A5=E5=9C=A8?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E5=90=AF=E5=8A=A8=E6=97=B6=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build/configs/.gitignore | 3 +- build/configs/cluster.template.yaml | 4 ++ internal/configs/api_config.go | 11 ++++ internal/configs/cluster_config.go | 10 ++++ internal/nodes/api_stream.go | 1 + internal/nodes/node.go | 81 +++++++++++++++++++++++++++-- internal/rpc/rpc_client.go | 24 +++++++++ 7 files changed, 130 insertions(+), 4 deletions(-) create mode 100644 build/configs/cluster.template.yaml create mode 100644 internal/configs/cluster_config.go diff --git a/build/configs/.gitignore b/build/configs/.gitignore index bd58da1..b227765 100644 --- a/build/configs/.gitignore +++ b/build/configs/.gitignore @@ -1,2 +1,3 @@ node.json -api.yaml \ No newline at end of file +api.yaml +cluster.yaml \ No newline at end of file diff --git a/build/configs/cluster.template.yaml b/build/configs/cluster.template.yaml new file mode 100644 index 0000000..5fe7149 --- /dev/null +++ b/build/configs/cluster.template.yaml @@ -0,0 +1,4 @@ +rpc: + endpoints: [ "" ] +clusterId: "" +secret: "" \ No newline at end of file diff --git a/internal/configs/api_config.go b/internal/configs/api_config.go index 5c89cc5..16b829b 100644 --- a/internal/configs/api_config.go +++ b/internal/configs/api_config.go @@ -6,6 +6,7 @@ import ( "io/ioutil" ) +// 节点API配置 type APIConfig struct { RPC struct { Endpoints []string `yaml:"endpoints"` @@ -28,3 +29,13 @@ func LoadAPIConfig() (*APIConfig, error) { return config, nil } + +// 保存到文件 +func (this *APIConfig) WriteFile(path string) error { + data, err := yaml.Marshal(this) + if err != nil { + return err + } + err = ioutil.WriteFile(path, data, 0666) + return err +} diff --git a/internal/configs/cluster_config.go b/internal/configs/cluster_config.go new file mode 100644 index 0000000..94b23d5 --- /dev/null +++ b/internal/configs/cluster_config.go @@ -0,0 +1,10 @@ +package configs + +// 集群配置 +type ClusterConfig struct { + RPC struct { + Endpoints []string `yaml:"endpoints"` + } `yaml:"rpc"` + ClusterId string `yaml:"clusterId"` + Secret string `yaml:"secret"` +} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 3b60ef4..898416d 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -48,6 +48,7 @@ func (this *APIStream) loop() error { return errors.Wrap(err) } this.stream = nodeStream + for { message, err := nodeStream.Recv() if err != nil { diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 811a5b9..383d34f 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -6,9 +6,15 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/go-yaml/yaml" + "github.com/iwind/TeaGo/Tea" + tealogs "github.com/iwind/TeaGo/logs" + "io/ioutil" + "os" "runtime" "time" ) @@ -69,20 +75,37 @@ func (this *Node) Start() { // 读取API配置 func (this *Node) syncConfig(isFirstTime bool) error { + // 检查api.yaml是否存在 + apiConfigFile := Tea.ConfigFile("api.yaml") + _, err := os.Stat(apiConfigFile) + if err != nil { + if os.IsNotExist(err) { + clusterErr := this.checkClusterConfig() + if clusterErr != nil { + if os.IsNotExist(clusterErr) { + return err + } + return clusterErr + } + } else { + return err + } + } + rpcClient, err := rpc.SharedRPC() if err != nil { - return errors.New("[NODE]create rpc client failed: " + err.Error()) + return errors.New("create rpc client failed: " + err.Error()) } // TODO 这里考虑只同步版本号有变更的 configResp, err := rpcClient.NodeRPC().ComposeNodeConfig(rpcClient.Context(), &pb.ComposeNodeConfigRequest{}) if err != nil { - return errors.New("[NODE]read config from rpc failed: " + err.Error()) + return errors.New("read config from rpc failed: " + err.Error()) } configJSON := configResp.NodeJSON nodeConfig := &nodeconfigs.NodeConfig{} err = json.Unmarshal(configJSON, nodeConfig) if err != nil { - return errors.New("[NODE]decode config failed: " + err.Error()) + return errors.New("decode config failed: " + err.Error()) } // 写入到文件中 @@ -151,3 +174,55 @@ func (this *Node) startSyncTimer() { } }() } + +// 检查集群设置 +func (this *Node) checkClusterConfig() error { + configFile := Tea.ConfigFile("cluster.yaml") + data, err := ioutil.ReadFile(configFile) + if err != nil { + return err + } + config := &configs.ClusterConfig{} + err = yaml.Unmarshal(data, config) + if err != nil { + return err + } + + rpcClient, err := rpc.NewRPCClient(&configs.APIConfig{ + RPC: config.RPC, + NodeId: config.ClusterId, + Secret: config.Secret, + }) + if err != nil { + return err + } + + tealogs.Println("[NODE]registering node ...") + resp, err := rpcClient.NodeRPC().RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME}) + if err != nil { + return err + } + tealogs.Println("[NODE]registered successfully") + + // 写入到配置文件中 + if len(resp.Endpoints) == 0 { + resp.Endpoints = []string{} + } + apiConfig := &configs.APIConfig{ + RPC: struct { + Endpoints []string `yaml:"endpoints"` + }{ + Endpoints: resp.Endpoints, + }, + NodeId: resp.UniqueId, + Secret: resp.Secret, + } + tealogs.Println("[NODE]writing 'configs/api.yaml' ...") + err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml")) + if err != nil { + return err + } + tealogs.Println("[NODE]wrote 'configs/api.yaml' successfully") + + return nil +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index a97752e..c6b53de 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -72,6 +72,7 @@ func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient { return pb.NewHTTPAccessLogServiceClient(this.pickConn()) } +// 节点上下文信息 func (this *RPCClient) Context() context.Context { ctx := context.Background() m := maps.Map{ @@ -94,6 +95,29 @@ func (this *RPCClient) Context() context.Context { return ctx } +// 集群上下文 +func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) context.Context { + ctx := context.Background() + m := maps.Map{ + "timestamp": time.Now().Unix(), + "type": "cluster", + "userId": 0, + } + method, err := encrypt.NewMethodInstance(teaconst.EncryptMethod, clusterSecret, clusterId) + if err != nil { + utils.PrintError(err) + return context.Background() + } + data, err := method.Encrypt(m.AsJSON()) + if err != nil { + utils.PrintError(err) + return context.Background() + } + token := base64.StdEncoding.EncodeToString(data) + ctx = metadata.AppendToOutgoingContext(ctx, "nodeId", clusterId, "token", token) + return ctx +} + // 随机选择一个连接 func (this *RPCClient) pickConn() *grpc.ClientConn { if len(this.conns) == 0 {