mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-03 23:20:26 +08:00
实现远程升级节点
This commit is contained in:
@@ -548,6 +548,24 @@ func (this *NodeDAO) FindAllNotInstalledNodesWithClusterId(clusterId int64) (res
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 查找所有低于某个版本的节点
|
||||||
|
func (this *NodeDAO) FindAllLowerVersionNodesWithClusterId(clusterId int64, os string, arch string, version string) (result []*Node, err error) {
|
||||||
|
_, err = this.Query().
|
||||||
|
State(NodeStateEnabled).
|
||||||
|
Attr("clusterId", clusterId).
|
||||||
|
Where("status IS NOT NULL").
|
||||||
|
Where("JSON_EXTRACT(status, '$.os')=:os").
|
||||||
|
Where("JSON_EXTRACT(status, '$.arch')=:arch").
|
||||||
|
Where("INET_ATON(JSON_UNQUOTE(JSON_EXTRACT(status, '$.buildVersion')))<INET_ATON(:version)").
|
||||||
|
Param("os", os).
|
||||||
|
Param("arch", arch).
|
||||||
|
Param("version", version).
|
||||||
|
DescPk().
|
||||||
|
Slice(&result).
|
||||||
|
FindAll()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 生成唯一ID
|
// 生成唯一ID
|
||||||
func (this *NodeDAO) genUniqueId() (string, error) {
|
func (this *NodeDAO) genUniqueId() (string, error) {
|
||||||
for {
|
for {
|
||||||
|
|||||||
62
internal/installers/deploy_manager.go
Normal file
62
internal/installers/deploy_manager.go
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
package installers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/iwind/TeaGo/Tea"
|
||||||
|
"github.com/iwind/TeaGo/files"
|
||||||
|
stringutil "github.com/iwind/TeaGo/utils/string"
|
||||||
|
"regexp"
|
||||||
|
)
|
||||||
|
|
||||||
|
var SharedDeployManager = NewDeployManager()
|
||||||
|
|
||||||
|
type DeployFile struct {
|
||||||
|
OS string
|
||||||
|
Arch string
|
||||||
|
Version string
|
||||||
|
Path string
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeployManager struct {
|
||||||
|
dir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDeployManager() *DeployManager {
|
||||||
|
return &DeployManager{
|
||||||
|
dir: Tea.Root + "/deploy",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 加载所有文件
|
||||||
|
func (this *DeployManager) LoadFiles() []*DeployFile {
|
||||||
|
keyMap := map[string]*DeployFile{} // key => File
|
||||||
|
|
||||||
|
reg := regexp.MustCompile(`(\w+)-(\w+)-v([0-9.]+)\.zip`)
|
||||||
|
for _, file := range files.NewFile(this.dir).List() {
|
||||||
|
name := file.Name()
|
||||||
|
if !reg.MatchString(name) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
matches := reg.FindStringSubmatch(name)
|
||||||
|
osName := matches[1]
|
||||||
|
arch := matches[2]
|
||||||
|
version := matches[3]
|
||||||
|
|
||||||
|
key := osName + "_" + arch
|
||||||
|
oldFile, ok := keyMap[key]
|
||||||
|
if ok && stringutil.VersionCompare(oldFile.Version, version) > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
keyMap[key] = &DeployFile{
|
||||||
|
OS: osName,
|
||||||
|
Arch: arch,
|
||||||
|
Version: version,
|
||||||
|
Path: file.Path(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result := []*DeployFile{}
|
||||||
|
for _, v := range keyMap {
|
||||||
|
result = append(result, v)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
10
internal/installers/deploy_manager_test.go
Normal file
10
internal/installers/deploy_manager_test.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
package installers
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestDeployManager_LoadFiles(t *testing.T) {
|
||||||
|
files := NewDeployManager().LoadFiles()
|
||||||
|
for _, file := range files {
|
||||||
|
t.Logf("%#v", file)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -42,15 +42,6 @@ func (this *NodeInstaller) Install(dir string, params interface{}, installStatus
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 测试环境
|
|
||||||
_, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=test")
|
|
||||||
if err != nil {
|
|
||||||
return errors.New("test failed: " + err.Error())
|
|
||||||
}
|
|
||||||
if len(stderr) > 0 {
|
|
||||||
return errors.New("test failed: " + stderr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 上传安装文件
|
// 上传安装文件
|
||||||
filePrefix := "edge-node-" + env.OS + "-" + env.Arch
|
filePrefix := "edge-node-" + env.OS + "-" + env.Arch
|
||||||
zipFile, err := this.LookupLatestInstaller(filePrefix)
|
zipFile, err := this.LookupLatestInstaller(filePrefix)
|
||||||
@@ -66,8 +57,35 @@ func (this *NodeInstaller) Install(dir string, params interface{}, installStatus
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 测试运行环境
|
||||||
|
// 升级的节点暂时不列入测试
|
||||||
|
if !nodeParams.IsUpgrading {
|
||||||
|
_, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=test")
|
||||||
|
if err != nil {
|
||||||
|
return errors.New("test failed: " + err.Error())
|
||||||
|
}
|
||||||
|
if len(stderr) > 0 {
|
||||||
|
return errors.New("test failed: " + stderr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果是升级则优雅停止先前的进程
|
||||||
|
exePath := dir + "/edge-node/bin/edge-node"
|
||||||
|
if nodeParams.IsUpgrading {
|
||||||
|
_, err = this.client.Stat(exePath)
|
||||||
|
if err == nil {
|
||||||
|
_, _, _ = this.client.Exec(exePath + " quit")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 删除可执行文件防止冲突
|
||||||
|
err = this.client.Remove(exePath)
|
||||||
|
if err != nil {
|
||||||
|
return errors.New("remove old file failed: " + err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 解压
|
// 解压
|
||||||
_, stderr, err = this.client.Exec(dir + "/" + env.HelperName + " -cmd=unzip -zip=\"" + targetZip + "\" -target=\"" + dir + "\"")
|
_, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=unzip -zip=\"" + targetZip + "\" -target=\"" + dir + "\"")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ type NodeParams struct {
|
|||||||
Endpoints []string
|
Endpoints []string
|
||||||
NodeId string
|
NodeId string
|
||||||
Secret string
|
Secret string
|
||||||
|
IsUpgrading bool // 是否为升级
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *NodeParams) Validate() error {
|
func (this *NodeParams) Validate() error {
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ func SharedQueue() *Queue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 安装边缘节点流程控制
|
// 安装边缘节点流程控制
|
||||||
func (this *Queue) InstallNodeProcess(nodeId int64) error {
|
func (this *Queue) InstallNodeProcess(nodeId int64, isUpgrading bool) error {
|
||||||
installStatus := models.NewNodeInstallStatus()
|
installStatus := models.NewNodeInstallStatus()
|
||||||
installStatus.IsRunning = true
|
installStatus.IsRunning = true
|
||||||
installStatus.UpdatedAt = time.Now().Unix()
|
installStatus.UpdatedAt = time.Now().Unix()
|
||||||
@@ -51,7 +51,7 @@ func (this *Queue) InstallNodeProcess(nodeId int64) error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// 开始安装
|
// 开始安装
|
||||||
err = this.InstallNode(nodeId, installStatus)
|
err = this.InstallNode(nodeId, installStatus, isUpgrading)
|
||||||
|
|
||||||
// 安装结束
|
// 安装结束
|
||||||
installStatus.IsRunning = false
|
installStatus.IsRunning = false
|
||||||
@@ -78,7 +78,7 @@ func (this *Queue) InstallNodeProcess(nodeId int64) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 安装边缘节点
|
// 安装边缘节点
|
||||||
func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallStatus) error {
|
func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallStatus, isUpgrading bool) error {
|
||||||
node, err := models.SharedNodeDAO.FindEnabledNode(nodeId)
|
node, err := models.SharedNodeDAO.FindEnabledNode(nodeId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -174,6 +174,7 @@ func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallSt
|
|||||||
Endpoints: apiEndpoints,
|
Endpoints: apiEndpoints,
|
||||||
NodeId: node.UniqueId,
|
NodeId: node.UniqueId,
|
||||||
Secret: node.Secret,
|
Secret: node.Secret,
|
||||||
|
IsUpgrading: isUpgrading,
|
||||||
}
|
}
|
||||||
|
|
||||||
installer := &NodeInstaller{}
|
installer := &NodeInstaller{}
|
||||||
@@ -187,6 +188,9 @@ func (this *Queue) InstallNode(nodeId int64, installStatus *models.NodeInstallSt
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = installer.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
err = installer.Install(installDir, params, installStatus)
|
err = installer.Install(installDir, params, installStatus)
|
||||||
return err
|
return err
|
||||||
@@ -271,6 +275,9 @@ func (this *Queue) StartNode(nodeId int64) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = installer.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
// 检查命令是否存在
|
// 检查命令是否存在
|
||||||
exeFile := installDir + "/edge-node/bin/edge-node"
|
exeFile := installDir + "/edge-node/bin/edge-node"
|
||||||
@@ -369,6 +376,9 @@ func (this *Queue) StopNode(nodeId int64) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = installer.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
// 检查命令是否存在
|
// 检查命令是否存在
|
||||||
exeFile := installDir + "/edge-node/bin/edge-node"
|
exeFile := installDir + "/edge-node/bin/edge-node"
|
||||||
|
|||||||
@@ -147,3 +147,8 @@ func (this *SSHClient) WriteFile(path string, data []byte) (n int, err error) {
|
|||||||
n, err = fp.Write(data)
|
n, err = fp.Write(data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 删除文件
|
||||||
|
func (this *SSHClient) Remove(path string) error {
|
||||||
|
return this.sftp.Remove(path)
|
||||||
|
}
|
||||||
|
|||||||
@@ -407,7 +407,7 @@ func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeReq
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err = installers.SharedQueue().InstallNodeProcess(req.NodeId)
|
err = installers.SharedQueue().InstallNodeProcess(req.NodeId, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("[RPC]install node:" + err.Error())
|
logs.Println("[RPC]install node:" + err.Error())
|
||||||
}
|
}
|
||||||
@@ -416,6 +416,44 @@ func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeReq
|
|||||||
return &pb.InstallNodeResponse{}, nil
|
return &pb.InstallNodeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 升级节点
|
||||||
|
func (this *NodeService) UpgradeNode(ctx context.Context, req *pb.UpgradeNodeRequest) (*pb.UpgradeNodeResponse, error) {
|
||||||
|
// 校验节点
|
||||||
|
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = models.SharedNodeDAO.UpdateNodeIsInstalled(req.NodeId, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查状态
|
||||||
|
installStatus, err := models.SharedNodeDAO.FindNodeInstallStatus(req.NodeId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if installStatus == nil {
|
||||||
|
installStatus = &models.NodeInstallStatus{}
|
||||||
|
}
|
||||||
|
installStatus.IsOk = false
|
||||||
|
installStatus.IsFinished = false
|
||||||
|
err = models.SharedNodeDAO.UpdateNodeInstallStatus(req.NodeId, installStatus)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err = installers.SharedQueue().InstallNodeProcess(req.NodeId, true)
|
||||||
|
if err != nil {
|
||||||
|
logs.Println("[RPC]install node:" + err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return &pb.UpgradeNodeResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// 启动节点
|
// 启动节点
|
||||||
func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest) (*pb.StartNodeResponse, error) {
|
func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest) (*pb.StartNodeResponse, error) {
|
||||||
// 校验节点
|
// 校验节点
|
||||||
@@ -602,6 +640,110 @@ func (this *NodeService) FindAllNotInstalledNodesWithClusterId(ctx context.Conte
|
|||||||
return &pb.FindAllNotInstalledNodesWithClusterIdResponse{Nodes: result}, nil
|
return &pb.FindAllNotInstalledNodesWithClusterIdResponse{Nodes: result}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 列出所有需要升级的节点
|
||||||
|
func (this *NodeService) FindAllUpgradeNodesWithClusterId(ctx context.Context, req *pb.FindAllUpgradeNodesWithClusterIdRequest) (*pb.FindAllUpgradeNodesWithClusterIdResponse, error) {
|
||||||
|
// 校验请求
|
||||||
|
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取当前能升级到的最新版本
|
||||||
|
deployFiles := installers.SharedDeployManager.LoadFiles()
|
||||||
|
result := []*pb.FindAllUpgradeNodesWithClusterIdResponse_NodeUpgrade{}
|
||||||
|
for _, deployFile := range deployFiles {
|
||||||
|
nodes, err := models.SharedNodeDAO.FindAllLowerVersionNodesWithClusterId(req.ClusterId, deployFile.OS, deployFile.Arch, deployFile.Version)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, node := range nodes {
|
||||||
|
// 认证信息
|
||||||
|
login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(int64(node.Id))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var pbLogin *pb.NodeLogin = nil
|
||||||
|
if login != nil {
|
||||||
|
pbLogin = &pb.NodeLogin{
|
||||||
|
Id: int64(login.Id),
|
||||||
|
Name: login.Name,
|
||||||
|
Type: login.Type,
|
||||||
|
Params: []byte(login.Params),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IP信息
|
||||||
|
addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(int64(node.Id))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pbAddresses := []*pb.NodeIPAddress{}
|
||||||
|
for _, address := range addresses {
|
||||||
|
pbAddresses = append(pbAddresses, &pb.NodeIPAddress{
|
||||||
|
Id: int64(address.Id),
|
||||||
|
NodeId: int64(address.NodeId),
|
||||||
|
Name: address.Name,
|
||||||
|
Ip: address.Ip,
|
||||||
|
Description: address.Description,
|
||||||
|
State: int64(address.State),
|
||||||
|
Order: int64(address.Order),
|
||||||
|
CanAccess: address.CanAccess == 1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// 状态
|
||||||
|
status, err := node.DecodeStatus()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if status == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 安装信息
|
||||||
|
installStatus, err := node.DecodeInstallStatus()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pbInstallStatus := &pb.NodeInstallStatus{}
|
||||||
|
if installStatus != nil {
|
||||||
|
pbInstallStatus = &pb.NodeInstallStatus{
|
||||||
|
IsRunning: installStatus.IsRunning,
|
||||||
|
IsFinished: installStatus.IsFinished,
|
||||||
|
IsOk: installStatus.IsOk,
|
||||||
|
Error: installStatus.Error,
|
||||||
|
ErrorCode: installStatus.ErrorCode,
|
||||||
|
UpdatedAt: installStatus.UpdatedAt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pbNode := &pb.Node{
|
||||||
|
Id: int64(node.Id),
|
||||||
|
Name: node.Name,
|
||||||
|
Version: int64(node.Version),
|
||||||
|
IsInstalled: node.IsInstalled == 1,
|
||||||
|
StatusJSON: []byte(node.Status),
|
||||||
|
IsOn: node.IsOn == 1,
|
||||||
|
IpAddresses: pbAddresses,
|
||||||
|
Login: pbLogin,
|
||||||
|
InstallStatus: pbInstallStatus,
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, &pb.FindAllUpgradeNodesWithClusterIdResponse_NodeUpgrade{
|
||||||
|
Os: deployFile.OS,
|
||||||
|
Arch: deployFile.Arch,
|
||||||
|
OldVersion: status.BuildVersion,
|
||||||
|
NewVersion: deployFile.Version,
|
||||||
|
Node: pbNode,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &pb.FindAllUpgradeNodesWithClusterIdResponse{
|
||||||
|
Nodes: result,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// 读取节点安装状态
|
// 读取节点安装状态
|
||||||
func (this *NodeService) FindNodeInstallStatus(ctx context.Context, req *pb.FindNodeInstallStatusRequest) (*pb.FindNodeInstallStatusResponse, error) {
|
func (this *NodeService) FindNodeInstallStatus(ctx context.Context, req *pb.FindNodeInstallStatusRequest) (*pb.FindNodeInstallStatusResponse, error) {
|
||||||
// 校验请求
|
// 校验请求
|
||||||
|
|||||||
Reference in New Issue
Block a user