实现节点自动升级成最新版本

This commit is contained in:
刘祥超
2021-06-10 19:19:15 +08:00
parent c1af8b36a4
commit 085adcf1c4
5 changed files with 397 additions and 6 deletions

View File

@@ -1,6 +1,7 @@
package nodes
import (
"context"
"encoding/json"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
@@ -13,6 +14,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
"io"
"net/http"
"os/exec"
@@ -55,10 +57,16 @@ func (this *APIStream) loop() error {
return errors.Wrap(err)
}
isQuiting := false
ctx, cancelFunc := context.WithCancel(rpcClient.Context())
nodeStream, err := rpcClient.NodeRPC().NodeStream(ctx)
events.On(events.EventQuit, func() {
isQuiting = true
remotelogs.Println("API_STREAM", "quiting")
if nodeStream != nil {
cancelFunc()
}
})
nodeStream, err := rpcClient.NodeRPC().NodeStream(rpcClient.Context())
if err != nil {
if isQuiting {
return nil
@@ -69,12 +77,14 @@ func (this *APIStream) loop() error {
for {
if isQuiting {
logs.Println("API_STREAM", "quit")
break
}
message, err := nodeStream.Recv()
if err != nil {
if isQuiting {
remotelogs.Println("API_STREAM", "quit")
return nil
}
return errors.Wrap(err)

View File

@@ -33,8 +33,10 @@ import (
var sharedNodeConfig *nodeconfigs.NodeConfig
var nodeTaskNotify = make(chan bool, 8)
var DaemonIsOn = false
var DaemonPid = 0
// 节点
// Node 节点
type Node struct {
isLoaded bool
}
@@ -43,7 +45,7 @@ func NewNode() *Node {
return &Node{}
}
// 检查配置
// Test 检查配置
func (this *Node) Test() error {
// 检查是否能连接API
rpcClient, err := rpc.SharedRPC()
@@ -58,8 +60,15 @@ func (this *Node) Test() error {
return nil
}
// 启动
// Start 启动
func (this *Node) Start() {
_, ok := os.LookupEnv("EdgeDaemon")
if ok {
remotelogs.Println("NODE", "start from daemon")
DaemonIsOn = true
DaemonPid = os.Getppid()
}
// 启动事件
events.Notify(events.EventStart)
@@ -146,7 +155,7 @@ func (this *Node) Start() {
select {}
}
// 实现守护进程
// Daemon 实现守护进程
func (this *Node) Daemon() {
path := os.TempDir() + "/edge-node.sock"
isDebug := lists.ContainsString(os.Args, "debug")
@@ -164,6 +173,10 @@ func (this *Node) Daemon() {
if err != nil {
return err
}
// 可以标记当前是从守护进程启动的
_ = os.Setenv("EdgeDaemon", "on")
cmd := exec.Command(exe)
err = cmd.Start()
if err != nil {
@@ -191,7 +204,7 @@ func (this *Node) Daemon() {
}
}
// 安装系统服务
// InstallSystemService 安装系统服务
func (this *Node) InstallSystemService() error {
shortName := teaconst.SystemdServiceName
@@ -285,6 +298,8 @@ func (this *Node) loop() error {
if err != nil {
return err
}
case "nodeVersionChanged":
go sharedUpgradeManager.Start()
}
}

View File

@@ -0,0 +1,252 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"crypto/md5"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
stringutil "github.com/iwind/TeaGo/utils/string"
"os"
"os/exec"
"runtime"
"time"
)
var sharedUpgradeManager = NewUpgradeManager()
// UpgradeManager 节点升级管理器
// TODO 需要在集群中设置是否自动更新
type UpgradeManager struct {
isInstalling bool
lastFile string
}
// NewUpgradeManager 获取新对象
func NewUpgradeManager() *UpgradeManager {
return &UpgradeManager{}
}
// Start 启动升级
func (this *UpgradeManager) Start() {
// 测试环境下不更新
if Tea.IsTesting() {
return
}
if this.isInstalling {
return
}
this.isInstalling = true
// 还原安装状态
defer func() {
this.isInstalling = false
}()
remotelogs.Println("UPGRADE_MANAGER", "upgrading node ...")
err := this.install()
if err != nil {
remotelogs.Error("UPGRADE_MANAGER", "download failed: "+err.Error())
return
}
remotelogs.Println("UPGRADE_MANAGER", "upgrade successfully")
go func() {
err = this.restart()
if err != nil {
logs.Println("UPGRADE_MANAGER", err.Error())
}
}()
}
func (this *UpgradeManager) install() error {
// 检查是否有已下载但未安装成功的
if len(this.lastFile) > 0 {
_, err := os.Stat(this.lastFile)
if err == nil {
err = this.unzip(this.lastFile)
if err != nil {
return err
}
this.lastFile = ""
return nil
}
}
// 创建临时文件
dir := Tea.Root + "/tmp"
_, err := os.Stat(dir)
if err != nil {
if os.IsNotExist(err) {
err = os.Mkdir(dir, 0777)
if err != nil {
return err
}
} else {
return err
}
}
remotelogs.Println("UPGRADE_MANAGER", "downloading new node ...")
path := dir + "/edge-node" + ".tmp"
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
if err != nil {
return err
}
isClosed := false
defer func() {
if !isClosed {
_ = fp.Close()
}
}()
client, err := rpc.SharedRPC()
if err != nil {
return err
}
var offset int64
var h = md5.New()
var sum = ""
var filename = ""
for {
resp, err := client.NodeRPC().DownloadNodeInstallationFile(client.Context(), &pb.DownloadNodeInstallationFileRequest{
Os: runtime.GOOS,
Arch: runtime.GOARCH,
ChunkOffset: offset,
})
if err != nil {
return err
}
if len(resp.Sum) == 0 {
return nil
}
sum = resp.Sum
filename = resp.Filename
if stringutil.VersionCompare(resp.Version, teaconst.Version) <= 0 {
return nil
}
if len(resp.ChunkData) == 0 {
break
}
// 写入文件
_, err = fp.Write(resp.ChunkData)
if err != nil {
return err
}
_, err = h.Write(resp.ChunkData)
if err != nil {
return err
}
offset = resp.Offset
}
if len(filename) == 0 {
return nil
}
isClosed = true
err = fp.Close()
if err != nil {
return err
}
if fmt.Sprintf("%x", h.Sum(nil)) != sum {
_ = os.Remove(path)
return nil
}
// 改成zip
zipPath := dir + "/" + filename
err = os.Rename(path, zipPath)
if err != nil {
return err
}
this.lastFile = zipPath
// 解压
err = this.unzip(zipPath)
if err != nil {
return err
}
return nil
}
// 解压
func (this *UpgradeManager) unzip(zipPath string) error {
var isOk = false
defer func() {
if isOk {
// 只有解压并覆盖成功后才会删除
_ = os.Remove(zipPath)
}
}()
// 解压
var target = Tea.Root
if Tea.IsTesting() {
// 测试环境下只解压在tmp目录
target = Tea.Root + "/tmp"
}
// 先改先前的可执行文件
err := os.Rename(target+"/bin/edge-node", target+"/bin/.edge-node.old")
hasBackup := err == nil
defer func() {
if !isOk && hasBackup {
// 失败时还原
_ = os.Rename(target+"/bin/.edge-node.old", target+"/bin/edge-node")
}
}()
unzip := utils.NewUnzip(zipPath, target, "edge-node/")
err = unzip.Run()
if err != nil {
return err
}
isOk = true
return nil
}
// 重启
func (this *UpgradeManager) restart() error {
// 重新启动
if DaemonIsOn && DaemonPid == os.Getppid() {
os.Exit(0) // TODO 试着更优雅重启
} else {
exe, err := os.Executable()
if err != nil {
return err
}
// quit
events.Notify(events.EventQuit)
// 启动
cmd := exec.Command(exe, "start")
err = cmd.Start()
if err != nil {
return err
}
// 退出当前进程
time.Sleep(1 * time.Second)
os.Exit(0)
}
return nil
}

View File

@@ -0,0 +1,16 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
_ "github.com/iwind/TeaGo/bootstrap"
"testing"
)
func TestUpgradeManager_install(t *testing.T) {
err := NewUpgradeManager().install()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

98
internal/utils/unzip.go Normal file
View File

@@ -0,0 +1,98 @@
package utils
import (
"archive/zip"
"errors"
"io"
"os"
"strings"
)
type Unzip struct {
zipFile string
targetDir string
stripPrefix string
}
func NewUnzip(zipFile string, targetDir string, stripPrefix string) *Unzip {
return &Unzip{
zipFile: zipFile,
targetDir: targetDir,
stripPrefix: stripPrefix,
}
}
func (this *Unzip) Run() error {
if len(this.zipFile) == 0 {
return errors.New("zip file should not be empty")
}
if len(this.targetDir) == 0 {
return errors.New("target dir should not be empty")
}
reader, err := zip.OpenReader(this.zipFile)
if err != nil {
return err
}
defer func() {
_ = reader.Close()
}()
for _, file := range reader.File {
info := file.FileInfo()
filename := file.Name
if len(this.stripPrefix) > 0 {
filename = strings.TrimPrefix(filename, this.stripPrefix)
}
target := this.targetDir + "/" + filename
// 目录
if info.IsDir() {
stat, err := os.Stat(target)
if err != nil {
if !os.IsNotExist(err) {
return err
} else {
err = os.MkdirAll(target, info.Mode())
if err != nil {
return err
}
}
} else if !stat.IsDir() {
err = os.MkdirAll(target, info.Mode())
if err != nil {
return err
}
}
continue
}
// 文件
err := func(file *zip.File, target string) error {
fileReader, err := file.Open()
if err != nil {
return err
}
defer func() {
_ = fileReader.Close()
}()
fileWriter, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, file.FileInfo().Mode())
if err != nil {
return err
}
defer func() {
_ = fileWriter.Close()
}()
_, err = io.Copy(fileWriter, fileReader)
return err
}(file, target)
if err != nil {
return err
}
}
return nil
}