diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 107c210..73f08e3 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -1,6 +1,7 @@ package nodes import ( + "context" "encoding/json" "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" @@ -13,6 +14,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/logs" "io" "net/http" "os/exec" @@ -55,10 +57,16 @@ func (this *APIStream) loop() error { return errors.Wrap(err) } isQuiting := false + ctx, cancelFunc := context.WithCancel(rpcClient.Context()) + nodeStream, err := rpcClient.NodeRPC().NodeStream(ctx) events.On(events.EventQuit, func() { isQuiting = true + + remotelogs.Println("API_STREAM", "quiting") + if nodeStream != nil { + cancelFunc() + } }) - nodeStream, err := rpcClient.NodeRPC().NodeStream(rpcClient.Context()) if err != nil { if isQuiting { return nil @@ -69,12 +77,14 @@ func (this *APIStream) loop() error { for { if isQuiting { + logs.Println("API_STREAM", "quit") break } message, err := nodeStream.Recv() if err != nil { if isQuiting { + remotelogs.Println("API_STREAM", "quit") return nil } return errors.Wrap(err) diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 166a1d7..089902f 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -33,8 +33,10 @@ import ( var sharedNodeConfig *nodeconfigs.NodeConfig var nodeTaskNotify = make(chan bool, 8) +var DaemonIsOn = false +var DaemonPid = 0 -// 节点 +// Node 节点 type Node struct { isLoaded bool } @@ -43,7 +45,7 @@ func NewNode() *Node { return &Node{} } -// 检查配置 +// Test 检查配置 func (this *Node) Test() error { // 检查是否能连接API rpcClient, err := rpc.SharedRPC() @@ -58,8 +60,15 @@ func (this *Node) Test() error { return nil } -// 启动 +// Start 启动 func (this *Node) Start() { + _, ok := os.LookupEnv("EdgeDaemon") + if ok { + remotelogs.Println("NODE", "start from daemon") + DaemonIsOn = true + DaemonPid = os.Getppid() + } + // 启动事件 events.Notify(events.EventStart) @@ -146,7 +155,7 @@ func (this *Node) Start() { select {} } -// 实现守护进程 +// Daemon 实现守护进程 func (this *Node) Daemon() { path := os.TempDir() + "/edge-node.sock" isDebug := lists.ContainsString(os.Args, "debug") @@ -164,6 +173,10 @@ func (this *Node) Daemon() { if err != nil { return err } + + // 可以标记当前是从守护进程启动的 + _ = os.Setenv("EdgeDaemon", "on") + cmd := exec.Command(exe) err = cmd.Start() if err != nil { @@ -191,7 +204,7 @@ func (this *Node) Daemon() { } } -// 安装系统服务 +// InstallSystemService 安装系统服务 func (this *Node) InstallSystemService() error { shortName := teaconst.SystemdServiceName @@ -285,6 +298,8 @@ func (this *Node) loop() error { if err != nil { return err } + case "nodeVersionChanged": + go sharedUpgradeManager.Start() } } diff --git a/internal/nodes/upgrade_manager.go b/internal/nodes/upgrade_manager.go new file mode 100644 index 0000000..ff88b6d --- /dev/null +++ b/internal/nodes/upgrade_manager.go @@ -0,0 +1,252 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "crypto/md5" + "fmt" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/logs" + stringutil "github.com/iwind/TeaGo/utils/string" + "os" + "os/exec" + "runtime" + "time" +) + +var sharedUpgradeManager = NewUpgradeManager() + +// UpgradeManager 节点升级管理器 +// TODO 需要在集群中设置是否自动更新 +type UpgradeManager struct { + isInstalling bool + lastFile string +} + +// NewUpgradeManager 获取新对象 +func NewUpgradeManager() *UpgradeManager { + return &UpgradeManager{} +} + +// Start 启动升级 +func (this *UpgradeManager) Start() { + // 测试环境下不更新 + if Tea.IsTesting() { + return + } + + if this.isInstalling { + return + } + this.isInstalling = true + + // 还原安装状态 + defer func() { + this.isInstalling = false + }() + + remotelogs.Println("UPGRADE_MANAGER", "upgrading node ...") + err := this.install() + if err != nil { + remotelogs.Error("UPGRADE_MANAGER", "download failed: "+err.Error()) + return + } + + remotelogs.Println("UPGRADE_MANAGER", "upgrade successfully") + + go func() { + err = this.restart() + if err != nil { + logs.Println("UPGRADE_MANAGER", err.Error()) + } + }() +} + +func (this *UpgradeManager) install() error { + // 检查是否有已下载但未安装成功的 + if len(this.lastFile) > 0 { + _, err := os.Stat(this.lastFile) + if err == nil { + err = this.unzip(this.lastFile) + if err != nil { + return err + } + this.lastFile = "" + return nil + } + } + + // 创建临时文件 + dir := Tea.Root + "/tmp" + _, err := os.Stat(dir) + if err != nil { + if os.IsNotExist(err) { + err = os.Mkdir(dir, 0777) + if err != nil { + return err + } + } else { + return err + } + } + + remotelogs.Println("UPGRADE_MANAGER", "downloading new node ...") + + path := dir + "/edge-node" + ".tmp" + fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777) + if err != nil { + return err + } + isClosed := false + defer func() { + if !isClosed { + _ = fp.Close() + } + }() + + client, err := rpc.SharedRPC() + if err != nil { + return err + } + + var offset int64 + var h = md5.New() + var sum = "" + var filename = "" + for { + resp, err := client.NodeRPC().DownloadNodeInstallationFile(client.Context(), &pb.DownloadNodeInstallationFileRequest{ + Os: runtime.GOOS, + Arch: runtime.GOARCH, + ChunkOffset: offset, + }) + if err != nil { + return err + } + if len(resp.Sum) == 0 { + return nil + } + sum = resp.Sum + filename = resp.Filename + if stringutil.VersionCompare(resp.Version, teaconst.Version) <= 0 { + return nil + } + if len(resp.ChunkData) == 0 { + break + } + + // 写入文件 + _, err = fp.Write(resp.ChunkData) + if err != nil { + return err + } + _, err = h.Write(resp.ChunkData) + if err != nil { + return err + } + + offset = resp.Offset + } + + if len(filename) == 0 { + return nil + } + + isClosed = true + err = fp.Close() + if err != nil { + return err + } + + if fmt.Sprintf("%x", h.Sum(nil)) != sum { + _ = os.Remove(path) + return nil + } + + // 改成zip + zipPath := dir + "/" + filename + err = os.Rename(path, zipPath) + if err != nil { + return err + } + this.lastFile = zipPath + + // 解压 + err = this.unzip(zipPath) + if err != nil { + return err + } + + return nil +} + +// 解压 +func (this *UpgradeManager) unzip(zipPath string) error { + var isOk = false + defer func() { + if isOk { + // 只有解压并覆盖成功后才会删除 + _ = os.Remove(zipPath) + } + }() + + // 解压 + var target = Tea.Root + if Tea.IsTesting() { + // 测试环境下只解压在tmp目录 + target = Tea.Root + "/tmp" + } + + // 先改先前的可执行文件 + err := os.Rename(target+"/bin/edge-node", target+"/bin/.edge-node.old") + hasBackup := err == nil + defer func() { + if !isOk && hasBackup { + // 失败时还原 + _ = os.Rename(target+"/bin/.edge-node.old", target+"/bin/edge-node") + } + }() + + unzip := utils.NewUnzip(zipPath, target, "edge-node/") + err = unzip.Run() + if err != nil { + return err + } + + isOk = true + + return nil +} + +// 重启 +func (this *UpgradeManager) restart() error { + // 重新启动 + if DaemonIsOn && DaemonPid == os.Getppid() { + os.Exit(0) // TODO 试着更优雅重启 + } else { + exe, err := os.Executable() + if err != nil { + return err + } + + // quit + events.Notify(events.EventQuit) + + // 启动 + cmd := exec.Command(exe, "start") + err = cmd.Start() + if err != nil { + return err + } + + // 退出当前进程 + time.Sleep(1 * time.Second) + os.Exit(0) + } + return nil +} diff --git a/internal/nodes/upgrade_manager_test.go b/internal/nodes/upgrade_manager_test.go new file mode 100644 index 0000000..7f88af2 --- /dev/null +++ b/internal/nodes/upgrade_manager_test.go @@ -0,0 +1,16 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + _ "github.com/iwind/TeaGo/bootstrap" + "testing" +) + +func TestUpgradeManager_install(t *testing.T) { + err := NewUpgradeManager().install() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/utils/unzip.go b/internal/utils/unzip.go new file mode 100644 index 0000000..95e4bdb --- /dev/null +++ b/internal/utils/unzip.go @@ -0,0 +1,98 @@ +package utils + +import ( + "archive/zip" + "errors" + "io" + "os" + "strings" +) + +type Unzip struct { + zipFile string + targetDir string + stripPrefix string +} + +func NewUnzip(zipFile string, targetDir string, stripPrefix string) *Unzip { + return &Unzip{ + zipFile: zipFile, + targetDir: targetDir, + stripPrefix: stripPrefix, + } +} + +func (this *Unzip) Run() error { + if len(this.zipFile) == 0 { + return errors.New("zip file should not be empty") + } + if len(this.targetDir) == 0 { + return errors.New("target dir should not be empty") + } + + reader, err := zip.OpenReader(this.zipFile) + if err != nil { + return err + } + + defer func() { + _ = reader.Close() + }() + + for _, file := range reader.File { + info := file.FileInfo() + filename := file.Name + if len(this.stripPrefix) > 0 { + filename = strings.TrimPrefix(filename, this.stripPrefix) + } + target := this.targetDir + "/" + filename + + // 目录 + if info.IsDir() { + stat, err := os.Stat(target) + if err != nil { + if !os.IsNotExist(err) { + return err + } else { + err = os.MkdirAll(target, info.Mode()) + if err != nil { + return err + } + } + } else if !stat.IsDir() { + err = os.MkdirAll(target, info.Mode()) + if err != nil { + return err + } + } + continue + } + + // 文件 + err := func(file *zip.File, target string) error { + fileReader, err := file.Open() + if err != nil { + return err + } + defer func() { + _ = fileReader.Close() + }() + + fileWriter, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, file.FileInfo().Mode()) + if err != nil { + return err + } + defer func() { + _ = fileWriter.Close() + }() + + _, err = io.Copy(fileWriter, fileReader) + return err + }(file, target) + if err != nil { + return err + } + } + + return nil +}