diff --git a/cmd/installer-helper/main.go b/cmd/installer-helper/main.go index 9394c49d..919567a5 100644 --- a/cmd/installer-helper/main.go +++ b/cmd/installer-helper/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "github.com/TeaOSLab/EdgeAPI/internal/utils" + "net" "os" ) @@ -21,6 +22,14 @@ func main() { if len(cmd) == 0 { stderr("need '-cmd=COMMAND' argument") + } else if cmd == "test" { + // 检查是否正在运行 + path := os.TempDir() + "/edge-node.sock" + conn, err := net.Dial("unix", path) + if err == nil { + _ = conn.Close() + stderr("test node status: edge node is running now, can not install again") + } } else if cmd == "unzip" { // 解压 if len(zipPath) == 0 { stderr("ERROR: need '-zip=PATH' argument") diff --git a/internal/installers/installer_interface.go b/internal/installers/installer_interface.go index 751d8333..734105e9 100644 --- a/internal/installers/installer_interface.go +++ b/internal/installers/installer_interface.go @@ -1,11 +1,13 @@ package installers +import "github.com/TeaOSLab/EdgeAPI/internal/db/models" + type InstallerInterface interface { // 登录SSH服务 Login(credentials *Credentials) error // 安装 - Install(dir string, params interface{}) error + Install(dir string, params interface{}, installStatus *models.NodeInstallStatus) error // 关闭连接的SSH服务 Close() error diff --git a/internal/installers/installer_node.go b/internal/installers/installer_node.go index 21bbc656..c2399ba9 100644 --- a/internal/installers/installer_node.go +++ b/internal/installers/installer_node.go @@ -3,14 +3,16 @@ package installers import ( "bytes" "errors" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "path/filepath" + "regexp" ) type NodeInstaller struct { BaseInstaller } -func (this *NodeInstaller) Install(dir string, params interface{}) error { +func (this *NodeInstaller) Install(dir string, params interface{}, installStatus *models.NodeInstallStatus) error { if params == nil { return errors.New("'params' required for node installation") } @@ -28,6 +30,7 @@ func (this *NodeInstaller) Install(dir string, params interface{}) error { if err != nil { err = this.client.MkdirAll(dir) if err != nil { + installStatus.ErrorCode = "CREATE_ROOT_DIRECTORY_FAILED" return errors.New("create directory '" + dir + "' failed: " + err.Error()) } } @@ -35,9 +38,19 @@ func (this *NodeInstaller) Install(dir string, params interface{}) error { // 安装助手 env, err := this.InstallHelper(dir) if err != nil { + installStatus.ErrorCode = "INSTALL_HELPER_FAILED" return err } + // 测试环境 + _, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=test") + if err != nil { + return errors.New("test failed: " + err.Error()) + } + if len(stderr) > 0 { + return errors.New("test failed: " + stderr) + } + // 上传安装文件 filePrefix := "edge-node-" + env.OS + "-" + env.Arch zipFile, err := this.LookupLatestInstaller(filePrefix) @@ -54,7 +67,7 @@ func (this *NodeInstaller) Install(dir string, params interface{}) error { } // 解压 - _, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=unzip -zip=\"" + targetZip + "\" -target=\"" + dir + "\"") + _, stderr, err = this.client.Exec(dir + "/" + env.HelperName + " -cmd=unzip -zip=\"" + targetZip + "\" -target=\"" + dir + "\"") if err != nil { return err } @@ -81,6 +94,20 @@ func (this *NodeInstaller) Install(dir string, params interface{}) error { } } + // 测试 + _, stderr, err = this.client.Exec(dir + "/edge-node/bin/edge-node test") + if err != nil { + installStatus.ErrorCode = "TEST_FAILED" + return errors.New("test edge node failed: " + err.Error()) + } + if len(stderr) > 0 { + if regexp.MustCompile(`(?i)rpc`).MatchString(stderr) { + installStatus.ErrorCode = "RPC_TEST_FAILED" + } + + return errors.New("test edge node failed: " + stderr) + } + // 启动 _, stderr, err = this.client.Exec(dir + "/edge-node/bin/edge-node start") if err != nil { diff --git a/internal/installers/installer_node_test.go b/internal/installers/installer_node_test.go index 60e8bded..e7828e8b 100644 --- a/internal/installers/installer_node_test.go +++ b/internal/installers/installer_node_test.go @@ -1,6 +1,9 @@ package installers -import "testing" +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "testing" +) func TestNodeInstaller_Install(t *testing.T) { var installer InstallerInterface = &NodeInstaller{} @@ -28,7 +31,7 @@ func TestNodeInstaller_Install(t *testing.T) { Endpoints: []string{"192.168.2.40:8003"}, NodeId: "313fdb1b90d0a63c736f307b4d1ca358", Secret: "Pl3u5kYqBDZddp7raw6QfHiuGPRCWF54", - }) + }, &models.NodeInstallStatus{}) if err != nil { t.Fatal(err) } diff --git a/internal/installers/queue.go b/internal/installers/queue.go index 1d0e2afc..52644db8 100644 --- a/internal/installers/queue.go +++ b/internal/installers/queue.go @@ -188,6 +188,202 @@ func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallSt return err } - err = installer.Install(installDir, params) + err = installer.Install(installDir, params, installStatus) return err } + +// 启动边缘节点 +func (this *Queue) StartNode(nodeId int64) error { + node, err := models.SharedNodeDAO.FindEnabledNode(nodeId) + if err != nil { + return err + } + if node == nil { + return errors.New("can not find node, ID:'" + numberutils.FormatInt64(nodeId) + "'") + } + + // 登录信息 + login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nodeId) + if err != nil { + return err + } + if login == nil { + return errors.New("can not find node login information") + } + loginParams, err := login.DecodeSSHParams() + if err != nil { + return err + } + + if len(loginParams.Host) == 0 { + return errors.New("ssh host should not be empty") + } + + if loginParams.Port <= 0 { + return errors.New("ssh port is invalid") + } + + if loginParams.GrantId == 0 { + // 从集群中读取 + grantId, err := models.SharedNodeClusterDAO.FindClusterGrantId(int64(node.ClusterId)) + if err != nil { + return err + } + if grantId == 0 { + return errors.New("can not find node grant") + } + loginParams.GrantId = grantId + } + grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(loginParams.GrantId) + if err != nil { + return err + } + if grant == nil { + return errors.New("can not find user grant with id '" + numberutils.FormatInt64(loginParams.GrantId) + "'") + } + + // 安装目录 + installDir := node.InstallDir + if len(installDir) == 0 { + clusterId := node.ClusterId + cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(int64(clusterId)) + if err != nil { + return err + } + if cluster == nil { + return errors.New("can not find cluster, ID:'" + fmt.Sprintf("%d", clusterId) + "'") + } + installDir = cluster.InstallDir + if len(installDir) == 0 { + // 默认是 $登录用户/edge-node + installDir = "/" + grant.Username + "/edge-node" + } + } + + installer := &NodeInstaller{} + err = installer.Login(&Credentials{ + Host: loginParams.Host, + Port: loginParams.Port, + Username: grant.Username, + Password: grant.Password, + PrivateKey: grant.PrivateKey, + }) + if err != nil { + return err + } + + // 检查命令是否存在 + exeFile := installDir + "/edge-node/bin/edge-node" + _, err = installer.client.Stat(exeFile) + if err != nil { + return errors.New("edge node is not installed correctly, can not find executable file: " + exeFile) + } + + _, stderr, err := installer.client.Exec(exeFile + " start") + if err != nil { + return errors.New("start failed: " + err.Error()) + } + if len(stderr) > 0 { + return errors.New("start failed: " + stderr) + } + + return nil +} + +// 停止节点 +func (this *Queue) StopNode(nodeId int64) error { + node, err := models.SharedNodeDAO.FindEnabledNode(nodeId) + if err != nil { + return err + } + if node == nil { + return errors.New("can not find node, ID:'" + numberutils.FormatInt64(nodeId) + "'") + } + + // 登录信息 + login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nodeId) + if err != nil { + return err + } + if login == nil { + return errors.New("can not find node login information") + } + loginParams, err := login.DecodeSSHParams() + if err != nil { + return err + } + + if len(loginParams.Host) == 0 { + return errors.New("ssh host should not be empty") + } + + if loginParams.Port <= 0 { + return errors.New("ssh port is invalid") + } + + if loginParams.GrantId == 0 { + // 从集群中读取 + grantId, err := models.SharedNodeClusterDAO.FindClusterGrantId(int64(node.ClusterId)) + if err != nil { + return err + } + if grantId == 0 { + return errors.New("can not find node grant") + } + loginParams.GrantId = grantId + } + grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(loginParams.GrantId) + if err != nil { + return err + } + if grant == nil { + return errors.New("can not find user grant with id '" + numberutils.FormatInt64(loginParams.GrantId) + "'") + } + + // 安装目录 + installDir := node.InstallDir + if len(installDir) == 0 { + clusterId := node.ClusterId + cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(int64(clusterId)) + if err != nil { + return err + } + if cluster == nil { + return errors.New("can not find cluster, ID:'" + fmt.Sprintf("%d", clusterId) + "'") + } + installDir = cluster.InstallDir + if len(installDir) == 0 { + // 默认是 $登录用户/edge-node + installDir = "/" + grant.Username + "/edge-node" + } + } + + installer := &NodeInstaller{} + err = installer.Login(&Credentials{ + Host: loginParams.Host, + Port: loginParams.Port, + Username: grant.Username, + Password: grant.Password, + PrivateKey: grant.PrivateKey, + }) + if err != nil { + return err + } + + // 检查命令是否存在 + exeFile := installDir + "/edge-node/bin/edge-node" + _, err = installer.client.Stat(exeFile) + if err != nil { + return errors.New("edge node is not installed correctly, can not find executable file: " + exeFile) + } + + _, stderr, err := installer.client.Exec(exeFile + " stop") + if err != nil { + return errors.New("start failed: " + err.Error()) + } + if len(stderr) > 0 { + return errors.New("start failed: " + stderr) + } + + return nil +} diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 0e0a499d..817bc670 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -149,7 +149,7 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List Name: node.Name, Version: int64(node.Version), IsInstalled: node.IsInstalled == 1, - Status: node.Status, + StatusJSON: []byte(node.Status), Cluster: &pb.NodeCluster{ Id: int64(node.ClusterId), Name: clusterName, @@ -304,7 +304,7 @@ func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnable return &pb.FindEnabledNodeResponse{Node: &pb.Node{ Id: int64(node.Id), Name: node.Name, - Status: node.Status, + StatusJSON: []byte(node.Status), UniqueId: node.UniqueId, Version: int64(node.Version), LatestVersion: int64(node.LatestVersion), @@ -400,6 +400,7 @@ func (this *NodeService) UpdateNodeIsInstalled(ctx context.Context, req *pb.Upda // 安装节点 func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeRequest) (*pb.InstallNodeResponse, error) { + // 校验节点 _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) if err != nil { return nil, err @@ -415,6 +416,42 @@ func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeReq return &pb.InstallNodeResponse{}, nil } +// 启动节点 +func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest) (*pb.StartNodeResponse, error) { + // 校验节点 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + err = installers.SharedQueue().StartNode(req.NodeId) + if err != nil { + return &pb.StartNodeResponse{ + IsOk: false, + Error: err.Error(), + }, nil + } + return &pb.StartNodeResponse{IsOk: true}, nil +} + +// 停止节点 +func (this *NodeService) StopNode(ctx context.Context, req *pb.StopNodeRequest) (*pb.StopNodeResponse, error) { + // 校验节点 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + err = installers.SharedQueue().StopNode(req.NodeId) + if err != nil { + return &pb.StopNodeResponse{ + IsOk: false, + Error: err.Error(), + }, nil + } + return &pb.StopNodeResponse{IsOk: true}, nil +} + // 更改节点连接的API节点信息 func (this *NodeService) UpdateNodeConnectedAPINodes(ctx context.Context, req *pb.UpdateNodeConnectedAPINodesRequest) (*pb.RPCUpdateSuccess, error) { // 校验节点 @@ -472,7 +509,7 @@ func (this *NodeService) FindAllEnabledNodesWithGrantId(ctx context.Context, req Name: node.Name, Version: int64(node.Version), IsInstalled: node.IsInstalled == 1, - Status: node.Status, + StatusJSON: []byte(node.Status), Cluster: &pb.NodeCluster{ Id: int64(node.ClusterId), Name: clusterName, @@ -555,7 +592,7 @@ func (this *NodeService) FindAllNotInstalledNodesWithClusterId(ctx context.Conte Name: node.Name, Version: int64(node.Version), IsInstalled: node.IsInstalled == 1, - Status: node.Status, + StatusJSON: []byte(node.Status), IsOn: node.IsOn == 1, Login: pbLogin, IpAddresses: pbAddresses,