diff --git a/internal/utils/apinodeutils/deploy_file.go b/internal/utils/apinodeutils/deploy_file.go new file mode 100644 index 00000000..76ce2d03 --- /dev/null +++ b/internal/utils/apinodeutils/deploy_file.go @@ -0,0 +1,79 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package apinodeutils + +import ( + "crypto/md5" + "fmt" + "io" + "os" +) + +// DeployFile 部署文件描述 +type DeployFile struct { + OS string + Arch string + Version string + Path string +} + +// Sum 计算概要 +func (this *DeployFile) Sum() (string, error) { + fp, err := os.Open(this.Path) + if err != nil { + return "", err + } + defer func() { + _ = fp.Close() + }() + + m := md5.New() + buffer := make([]byte, 128*1024) + for { + n, err := fp.Read(buffer) + if err != nil { + if err == io.EOF { + break + } + return "", err + } + _, err = m.Write(buffer[:n]) + if err != nil { + return "", err + } + } + sum := m.Sum(nil) + return fmt.Sprintf("%x", sum), nil +} + +// Read 读取一个片段数据 +func (this *DeployFile) Read(offset int64) (data []byte, newOffset int64, err error) { + fp, err := os.Open(this.Path) + if err != nil { + return nil, offset, err + } + defer func() { + _ = fp.Close() + }() + + stat, err := fp.Stat() + if err != nil { + return nil, offset, err + } + if offset >= stat.Size() { + return nil, offset, io.EOF + } + + _, err = fp.Seek(offset, io.SeekStart) + if err != nil { + return nil, offset, err + } + + buffer := make([]byte, 128*1024) + n, err := fp.Read(buffer) + if err != nil { + return nil, offset, err + } + + return buffer[:n], offset + int64(n), nil +} diff --git a/internal/utils/apinodeutils/deploy_manager.go b/internal/utils/apinodeutils/deploy_manager.go new file mode 100644 index 00000000..8599df32 --- /dev/null +++ b/internal/utils/apinodeutils/deploy_manager.go @@ -0,0 +1,96 @@ +package apinodeutils + +import ( + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/files" + stringutil "github.com/iwind/TeaGo/utils/string" + "regexp" +) + +// DeployManager 节点部署文件管理器 +// 如果节点部署文件有变化,需要重启API节点以便于生效 +type DeployManager struct { + dir string +} + +// NewDeployManager 获取新节点部署文件管理器 +func NewDeployManager() *DeployManager { + var manager = &DeployManager{ + dir: Tea.Root + "/edge-api/deploy", + } + manager.LoadNodeFiles() + manager.LoadNSNodeFiles() + return manager +} + +// LoadNodeFiles 加载所有边缘节点文件 +func (this *DeployManager) LoadNodeFiles() []*DeployFile { + var keyMap = map[string]*DeployFile{} // key => File + + var reg = regexp.MustCompile(`^edge-node-(\w+)-(\w+)-v([0-9.]+)\.zip$`) + for _, file := range files.NewFile(this.dir).List() { + var name = file.Name() + if !reg.MatchString(name) { + continue + } + var matches = reg.FindStringSubmatch(name) + var osName = matches[1] + var arch = matches[2] + var version = matches[3] + + var key = osName + "_" + arch + oldFile, ok := keyMap[key] + if ok && stringutil.VersionCompare(oldFile.Version, version) > 0 { + continue + } + keyMap[key] = &DeployFile{ + OS: osName, + Arch: arch, + Version: version, + Path: file.Path(), + } + } + + var result = []*DeployFile{} + for _, v := range keyMap { + result = append(result, v) + } + + return result +} + +// LoadNSNodeFiles 加载所有NS节点安装文件 +func (this *DeployManager) LoadNSNodeFiles() []*DeployFile { + var keyMap = map[string]*DeployFile{} // key => File + + var reg = regexp.MustCompile(`^edge-dns-(\w+)-(\w+)-v([0-9.]+)\.zip$`) + for _, file := range files.NewFile(this.dir).List() { + var name = file.Name() + if !reg.MatchString(name) { + continue + } + var matches = reg.FindStringSubmatch(name) + var osName = matches[1] + var arch = matches[2] + var version = matches[3] + + var key = osName + "_" + arch + oldFile, ok := keyMap[key] + if ok && stringutil.VersionCompare(oldFile.Version, version) > 0 { + continue + } + keyMap[key] = &DeployFile{ + OS: osName, + Arch: arch, + Version: version, + Path: file.Path(), + } + } + + var result = []*DeployFile{} + for _, v := range keyMap { + result = append(result, v) + } + + return result +} diff --git a/internal/utils/apinodeutils/upgrader.go b/internal/utils/apinodeutils/upgrader.go index fa0da5c8..ae46dd5e 100644 --- a/internal/utils/apinodeutils/upgrader.go +++ b/internal/utils/apinodeutils/upgrader.go @@ -4,6 +4,7 @@ package apinodeutils import ( "compress/gzip" + "context" "crypto/md5" "errors" "fmt" @@ -62,7 +63,36 @@ func (this *Upgrader) Upgrade() error { if err != nil { return err } - versionResp, err := rpcClient.APINodeRPC().FindCurrentAPINodeVersion(sharedClient.Context(0), &pb.FindCurrentAPINodeVersionRequest{}) + + // 升级边缘节点 + err = this.upgradeNodes(sharedClient.Context(0), rpcClient) + if err != nil { + return err + } + + // 升级NS节点 + err = this.upgradeNSNodes(sharedClient.Context(0), rpcClient) + if err != nil { + return err + } + + // 升级API节点 + err = this.upgradeAPINode(sharedClient.Context(0), rpcClient) + if err != nil { + return errors.New("upgrade api node failed: " + err.Error()) + } + + return nil +} + +// Progress 查看升级进程 +func (this *Upgrader) Progress() *Progress { + return this.progress +} + +// 升级API节点 +func (this *Upgrader) upgradeAPINode(ctx context.Context, rpcClient *rpc.RPCClient) error { + versionResp, err := rpcClient.APINodeRPC().FindCurrentAPINodeVersion(ctx, &pb.FindCurrentAPINodeVersionRequest{}) if err != nil { return err } @@ -77,7 +107,7 @@ func (this *Upgrader) Upgrade() error { return errors.New(reason) } - localVersion, err := localVersion() + localVersion, err := lookupLocalVersion() if err != nil { return errors.New("lookup version failed: " + err.Error()) } @@ -196,6 +226,124 @@ func (this *Upgrader) Upgrade() error { return nil } -func (this *Upgrader) Progress() *Progress { - return this.progress +// 升级边缘节点 +func (this *Upgrader) upgradeNodes(ctx context.Context, rpcClient *rpc.RPCClient) error { + // 本地的 + var manager = NewDeployManager() + var localFileMap = map[string]*DeployFile{} // os_arch => *DeployFile + for _, deployFile := range manager.LoadNodeFiles() { + localFileMap[deployFile.OS+"_"+deployFile.Arch] = deployFile + } + + remoteFilesResp, err := rpcClient.APINodeRPC().FindLatestDeployFiles(ctx, &pb.FindLatestDeployFilesRequest{}) + if err != nil { + return err + } + + var remoteFileMap = map[string]*pb.FindLatestDeployFilesResponse_DeployFile{} // os_arch => *DeployFile + for _, nodeFile := range remoteFilesResp.NodeDeployFiles { + remoteFileMap[nodeFile.Os+"_"+nodeFile.Arch] = nodeFile + } + + // 对比版本 + for key, deployFile := range localFileMap { + remoteDeployFile, ok := remoteFileMap[key] + if !ok || stringutil.VersionCompare(remoteDeployFile.Version, deployFile.Version) < 0 { + err = this.uploadNodeDeployFile(ctx, rpcClient, deployFile.Path) + if err != nil { + return errors.New("upload deploy file '" + filepath.Base(deployFile.Path) + "' failed: " + err.Error()) + } + } + } + + return nil +} + +// 升级NS节点 +func (this *Upgrader) upgradeNSNodes(ctx context.Context, rpcClient *rpc.RPCClient) error { + // 本地的 + var manager = NewDeployManager() + var localFileMap = map[string]*DeployFile{} // os_arch => *DeployFile + for _, deployFile := range manager.LoadNSNodeFiles() { + localFileMap[deployFile.OS+"_"+deployFile.Arch] = deployFile + } + + remoteFilesResp, err := rpcClient.APINodeRPC().FindLatestDeployFiles(ctx, &pb.FindLatestDeployFilesRequest{}) + if err != nil { + return err + } + + var remoteFileMap = map[string]*pb.FindLatestDeployFilesResponse_DeployFile{} // os_arch => *DeployFile + for _, nodeFile := range remoteFilesResp.NsNodeDeployFiles { + remoteFileMap[nodeFile.Os+"_"+nodeFile.Arch] = nodeFile + } + + // 对比版本 + for key, deployFile := range localFileMap { + remoteDeployFile, ok := remoteFileMap[key] + if !ok || stringutil.VersionCompare(remoteDeployFile.Version, deployFile.Version) < 0 { + err = this.uploadNodeDeployFile(ctx, rpcClient, deployFile.Path) + if err != nil { + return errors.New("upload deploy file '" + filepath.Base(deployFile.Path) + "' failed: " + err.Error()) + } + } + } + + return nil +} + +// 上传节点文件 +func (this *Upgrader) uploadNodeDeployFile(ctx context.Context, rpcClient *rpc.RPCClient, path string) error { + fp, err := os.Open(path) + if err != nil { + return err + } + defer func() { + _ = fp.Close() + }() + + var buf = make([]byte, 128*4096) + var isFirst = true + + var hash = md5.New() + + for { + n, err := fp.Read(buf) + if n > 0 { + hash.Write(buf[:n]) + + _, uploadErr := rpcClient.APINodeRPC().UploadDeployFileToAPINode(ctx, &pb.UploadDeployFileToAPINodeRequest{ + Filename: filepath.Base(path), + Sum: "", + ChunkData: buf[:n], + IsFirstChunk: isFirst, + IsLastChunk: false, + }) + if uploadErr != nil { + return uploadErr + } + isFirst = false + } + if err != nil { + if err == io.EOF { + err = nil + + _, uploadErr := rpcClient.APINodeRPC().UploadDeployFileToAPINode(ctx, &pb.UploadDeployFileToAPINodeRequest{ + Filename: filepath.Base(path), + Sum: fmt.Sprintf("%x", hash.Sum(nil)), + ChunkData: nil, + IsFirstChunk: false, + IsLastChunk: true, + }) + if uploadErr != nil { + return uploadErr + } + + break + } + return err + } + } + + return nil } diff --git a/internal/utils/apinodeutils/utils.go b/internal/utils/apinodeutils/utils.go index ce78d80c..2f7f0997 100644 --- a/internal/utils/apinodeutils/utils.go +++ b/internal/utils/apinodeutils/utils.go @@ -39,7 +39,7 @@ func CanUpgrade(apiVersion string, osName string, arch string) (canUpgrade bool, return false, "is directory" } - localVersion, err := localVersion() + localVersion, err := lookupLocalVersion() if err != nil { return false, "lookup version failed: " + err.Error() } @@ -53,9 +53,7 @@ func CanUpgrade(apiVersion string, osName string, arch string) (canUpgrade bool, return true, "" } - - -func localVersion() (string, error) { +func lookupLocalVersion() (string, error) { var cmd = exec.Command(apiExe(), "-V") var output = &bytes.Buffer{} cmd.Stdout = output @@ -74,7 +72,6 @@ func localVersion() (string, error) { return localVersion, nil } - func apiExe() string { return Tea.Root + "/edge-api/bin/edge-api" -} \ No newline at end of file +}