From 9c6a8c64a1cfc23b557c71190d79cffdf891c541 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 28 Oct 2020 12:35:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E8=BF=9C=E7=A8=8B=E5=8D=87?= =?UTF-8?q?=E7=BA=A7=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/node_dao.go | 18 +++ internal/installers/deploy_manager.go | 62 +++++++++ internal/installers/deploy_manager_test.go | 10 ++ internal/installers/installer_node.go | 38 ++++-- internal/installers/params_node.go | 7 +- internal/installers/queue.go | 22 +++- internal/installers/ssh_client.go | 5 + internal/rpc/services/service_node.go | 144 ++++++++++++++++++++- 8 files changed, 286 insertions(+), 20 deletions(-) create mode 100644 internal/installers/deploy_manager.go create mode 100644 internal/installers/deploy_manager_test.go diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 85221a3b..b2762420 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -548,6 +548,24 @@ func (this *NodeDAO) FindAllNotInstalledNodesWithClusterId(clusterId int64) (res return } +// 查找所有低于某个版本的节点 +func (this *NodeDAO) FindAllLowerVersionNodesWithClusterId(clusterId int64, os string, arch string, version string) (result []*Node, err error) { + _, err = this.Query(). + State(NodeStateEnabled). + Attr("clusterId", clusterId). + Where("status IS NOT NULL"). + Where("JSON_EXTRACT(status, '$.os')=:os"). + Where("JSON_EXTRACT(status, '$.arch')=:arch"). + Where("INET_ATON(JSON_UNQUOTE(JSON_EXTRACT(status, '$.buildVersion'))) File + + reg := regexp.MustCompile(`(\w+)-(\w+)-v([0-9.]+)\.zip`) + for _, file := range files.NewFile(this.dir).List() { + name := file.Name() + if !reg.MatchString(name) { + continue + } + matches := reg.FindStringSubmatch(name) + osName := matches[1] + arch := matches[2] + version := matches[3] + + key := osName + "_" + arch + oldFile, ok := keyMap[key] + if ok && stringutil.VersionCompare(oldFile.Version, version) > 0 { + continue + } + keyMap[key] = &DeployFile{ + OS: osName, + Arch: arch, + Version: version, + Path: file.Path(), + } + } + + result := []*DeployFile{} + for _, v := range keyMap { + result = append(result, v) + } + return result +} diff --git a/internal/installers/deploy_manager_test.go b/internal/installers/deploy_manager_test.go new file mode 100644 index 00000000..1a98ddcb --- /dev/null +++ b/internal/installers/deploy_manager_test.go @@ -0,0 +1,10 @@ +package installers + +import "testing" + +func TestDeployManager_LoadFiles(t *testing.T) { + files := NewDeployManager().LoadFiles() + for _, file := range files { + t.Logf("%#v", file) + } +} diff --git a/internal/installers/installer_node.go b/internal/installers/installer_node.go index c2399ba9..2318a166 100644 --- a/internal/installers/installer_node.go +++ b/internal/installers/installer_node.go @@ -42,15 +42,6 @@ func (this *NodeInstaller) Install(dir string, params interface{}, installStatus return err } - // 测试环境 - _, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=test") - if err != nil { - return errors.New("test failed: " + err.Error()) - } - if len(stderr) > 0 { - return errors.New("test failed: " + stderr) - } - // 上传安装文件 filePrefix := "edge-node-" + env.OS + "-" + env.Arch zipFile, err := this.LookupLatestInstaller(filePrefix) @@ -66,8 +57,35 @@ func (this *NodeInstaller) Install(dir string, params interface{}, installStatus return err } + // 测试运行环境 + // 升级的节点暂时不列入测试 + if !nodeParams.IsUpgrading { + _, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=test") + if err != nil { + return errors.New("test failed: " + err.Error()) + } + if len(stderr) > 0 { + return errors.New("test failed: " + stderr) + } + } + + // 如果是升级则优雅停止先前的进程 + exePath := dir + "/edge-node/bin/edge-node" + if nodeParams.IsUpgrading { + _, err = this.client.Stat(exePath) + if err == nil { + _, _, _ = this.client.Exec(exePath + " quit") + } + + // 删除可执行文件防止冲突 + err = this.client.Remove(exePath) + if err != nil { + return errors.New("remove old file failed: " + err.Error()) + } + } + // 解压 - _, stderr, err = this.client.Exec(dir + "/" + env.HelperName + " -cmd=unzip -zip=\"" + targetZip + "\" -target=\"" + dir + "\"") + _, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=unzip -zip=\"" + targetZip + "\" -target=\"" + dir + "\"") if err != nil { return err } diff --git a/internal/installers/params_node.go b/internal/installers/params_node.go index 1f7bdbca..b8251508 100644 --- a/internal/installers/params_node.go +++ b/internal/installers/params_node.go @@ -6,9 +6,10 @@ import ( ) type NodeParams struct { - Endpoints []string - NodeId string - Secret string + Endpoints []string + NodeId string + Secret string + IsUpgrading bool // 是否为升级 } func (this *NodeParams) Validate() error { diff --git a/internal/installers/queue.go b/internal/installers/queue.go index 52644db8..17a7b322 100644 --- a/internal/installers/queue.go +++ b/internal/installers/queue.go @@ -24,7 +24,7 @@ func SharedQueue() *Queue { } // 安装边缘节点流程控制 -func (this *Queue) InstallNodeProcess(nodeId int64) error { +func (this *Queue) InstallNodeProcess(nodeId int64, isUpgrading bool) error { installStatus := models.NewNodeInstallStatus() installStatus.IsRunning = true installStatus.UpdatedAt = time.Now().Unix() @@ -51,7 +51,7 @@ func (this *Queue) InstallNodeProcess(nodeId int64) error { }() // 开始安装 - err = this.InstallNode(nodeId, installStatus) + err = this.InstallNode(nodeId, installStatus, isUpgrading) // 安装结束 installStatus.IsRunning = false @@ -78,7 +78,7 @@ func (this *Queue) InstallNodeProcess(nodeId int64) error { } // 安装边缘节点 -func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallStatus) error { +func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallStatus, isUpgrading bool) error { node, err := models.SharedNodeDAO.FindEnabledNode(nodeId) if err != nil { return err @@ -171,9 +171,10 @@ func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallSt } params := &NodeParams{ - Endpoints: apiEndpoints, - NodeId: node.UniqueId, - Secret: node.Secret, + Endpoints: apiEndpoints, + NodeId: node.UniqueId, + Secret: node.Secret, + IsUpgrading: isUpgrading, } installer := &NodeInstaller{} @@ -187,6 +188,9 @@ func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallSt if err != nil { return err } + defer func() { + _ = installer.Close() + }() err = installer.Install(installDir, params, installStatus) return err @@ -271,6 +275,9 @@ func (this *Queue) StartNode(nodeId int64) error { if err != nil { return err } + defer func() { + _ = installer.Close() + }() // 检查命令是否存在 exeFile := installDir + "/edge-node/bin/edge-node" @@ -369,6 +376,9 @@ func (this *Queue) StopNode(nodeId int64) error { if err != nil { return err } + defer func() { + _ = installer.Close() + }() // 检查命令是否存在 exeFile := installDir + "/edge-node/bin/edge-node" diff --git a/internal/installers/ssh_client.go b/internal/installers/ssh_client.go index 60816e39..ab6b487c 100644 --- a/internal/installers/ssh_client.go +++ b/internal/installers/ssh_client.go @@ -147,3 +147,8 @@ func (this *SSHClient) WriteFile(path string, data []byte) (n int, err error) { n, err = fp.Write(data) return } + +// 删除文件 +func (this *SSHClient) Remove(path string) error { + return this.sftp.Remove(path) +} diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 817bc670..cc22d17a 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -407,7 +407,7 @@ func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeReq } go func() { - err = installers.SharedQueue().InstallNodeProcess(req.NodeId) + err = installers.SharedQueue().InstallNodeProcess(req.NodeId, false) if err != nil { logs.Println("[RPC]install node:" + err.Error()) } @@ -416,6 +416,44 @@ func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeReq return &pb.InstallNodeResponse{}, nil } +// 升级节点 +func (this *NodeService) UpgradeNode(ctx context.Context, req *pb.UpgradeNodeRequest) (*pb.UpgradeNodeResponse, error) { + // 校验节点 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + err = models.SharedNodeDAO.UpdateNodeIsInstalled(req.NodeId, false) + if err != nil { + return nil, err + } + + // 检查状态 + installStatus, err := models.SharedNodeDAO.FindNodeInstallStatus(req.NodeId) + if err != nil { + return nil, err + } + if installStatus == nil { + installStatus = &models.NodeInstallStatus{} + } + installStatus.IsOk = false + installStatus.IsFinished = false + err = models.SharedNodeDAO.UpdateNodeInstallStatus(req.NodeId, installStatus) + if err != nil { + return nil, err + } + + go func() { + err = installers.SharedQueue().InstallNodeProcess(req.NodeId, true) + if err != nil { + logs.Println("[RPC]install node:" + err.Error()) + } + }() + + return &pb.UpgradeNodeResponse{}, nil +} + // 启动节点 func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest) (*pb.StartNodeResponse, error) { // 校验节点 @@ -602,6 +640,110 @@ func (this *NodeService) FindAllNotInstalledNodesWithClusterId(ctx context.Conte return &pb.FindAllNotInstalledNodesWithClusterIdResponse{Nodes: result}, nil } +// 列出所有需要升级的节点 +func (this *NodeService) FindAllUpgradeNodesWithClusterId(ctx context.Context, req *pb.FindAllUpgradeNodesWithClusterIdRequest) (*pb.FindAllUpgradeNodesWithClusterIdResponse, error) { + // 校验请求 + _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) + if err != nil { + return nil, err + } + + // 获取当前能升级到的最新版本 + deployFiles := installers.SharedDeployManager.LoadFiles() + result := []*pb.FindAllUpgradeNodesWithClusterIdResponse_NodeUpgrade{} + for _, deployFile := range deployFiles { + nodes, err := models.SharedNodeDAO.FindAllLowerVersionNodesWithClusterId(req.ClusterId, deployFile.OS, deployFile.Arch, deployFile.Version) + if err != nil { + return nil, err + } + for _, node := range nodes { + // 认证信息 + login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(int64(node.Id)) + if err != nil { + return nil, err + } + var pbLogin *pb.NodeLogin = nil + if login != nil { + pbLogin = &pb.NodeLogin{ + Id: int64(login.Id), + Name: login.Name, + Type: login.Type, + Params: []byte(login.Params), + } + } + + // IP信息 + addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(int64(node.Id)) + if err != nil { + return nil, err + } + + pbAddresses := []*pb.NodeIPAddress{} + for _, address := range addresses { + pbAddresses = append(pbAddresses, &pb.NodeIPAddress{ + Id: int64(address.Id), + NodeId: int64(address.NodeId), + Name: address.Name, + Ip: address.Ip, + Description: address.Description, + State: int64(address.State), + Order: int64(address.Order), + CanAccess: address.CanAccess == 1, + }) + } + + // 状态 + status, err := node.DecodeStatus() + if err != nil { + return nil, err + } + if status == nil { + continue + } + + // 安装信息 + installStatus, err := node.DecodeInstallStatus() + if err != nil { + return nil, err + } + pbInstallStatus := &pb.NodeInstallStatus{} + if installStatus != nil { + pbInstallStatus = &pb.NodeInstallStatus{ + IsRunning: installStatus.IsRunning, + IsFinished: installStatus.IsFinished, + IsOk: installStatus.IsOk, + Error: installStatus.Error, + ErrorCode: installStatus.ErrorCode, + UpdatedAt: installStatus.UpdatedAt, + } + } + + pbNode := &pb.Node{ + Id: int64(node.Id), + Name: node.Name, + Version: int64(node.Version), + IsInstalled: node.IsInstalled == 1, + StatusJSON: []byte(node.Status), + IsOn: node.IsOn == 1, + IpAddresses: pbAddresses, + Login: pbLogin, + InstallStatus: pbInstallStatus, + } + + result = append(result, &pb.FindAllUpgradeNodesWithClusterIdResponse_NodeUpgrade{ + Os: deployFile.OS, + Arch: deployFile.Arch, + OldVersion: status.BuildVersion, + NewVersion: deployFile.Version, + Node: pbNode, + }) + } + } + return &pb.FindAllUpgradeNodesWithClusterIdResponse{ + Nodes: result, + }, nil +} + // 读取节点安装状态 func (this *NodeService) FindNodeInstallStatus(ctx context.Context, req *pb.FindNodeInstallStatusRequest) (*pb.FindNodeInstallStatusResponse, error) { // 校验请求