远程升级API节点时自动上传边缘节点安装文件

This commit is contained in:
GoEdgeLab
2023-04-23 19:48:07 +08:00
parent d943ebf843
commit 084e71b288
4 changed files with 330 additions and 10 deletions

View File

@@ -4,6 +4,7 @@ package apinodeutils
import (
"compress/gzip"
"context"
"crypto/md5"
"errors"
"fmt"
@@ -62,7 +63,36 @@ func (this *Upgrader) Upgrade() error {
if err != nil {
return err
}
versionResp, err := rpcClient.APINodeRPC().FindCurrentAPINodeVersion(sharedClient.Context(0), &pb.FindCurrentAPINodeVersionRequest{})
// 升级边缘节点
err = this.upgradeNodes(sharedClient.Context(0), rpcClient)
if err != nil {
return err
}
// 升级NS节点
err = this.upgradeNSNodes(sharedClient.Context(0), rpcClient)
if err != nil {
return err
}
// 升级API节点
err = this.upgradeAPINode(sharedClient.Context(0), rpcClient)
if err != nil {
return errors.New("upgrade api node failed: " + err.Error())
}
return nil
}
// Progress 查看升级进程
func (this *Upgrader) Progress() *Progress {
return this.progress
}
// 升级API节点
func (this *Upgrader) upgradeAPINode(ctx context.Context, rpcClient *rpc.RPCClient) error {
versionResp, err := rpcClient.APINodeRPC().FindCurrentAPINodeVersion(ctx, &pb.FindCurrentAPINodeVersionRequest{})
if err != nil {
return err
}
@@ -77,7 +107,7 @@ func (this *Upgrader) Upgrade() error {
return errors.New(reason)
}
localVersion, err := localVersion()
localVersion, err := lookupLocalVersion()
if err != nil {
return errors.New("lookup version failed: " + err.Error())
}
@@ -196,6 +226,124 @@ func (this *Upgrader) Upgrade() error {
return nil
}
func (this *Upgrader) Progress() *Progress {
return this.progress
// 升级边缘节点
func (this *Upgrader) upgradeNodes(ctx context.Context, rpcClient *rpc.RPCClient) error {
// 本地的
var manager = NewDeployManager()
var localFileMap = map[string]*DeployFile{} // os_arch => *DeployFile
for _, deployFile := range manager.LoadNodeFiles() {
localFileMap[deployFile.OS+"_"+deployFile.Arch] = deployFile
}
remoteFilesResp, err := rpcClient.APINodeRPC().FindLatestDeployFiles(ctx, &pb.FindLatestDeployFilesRequest{})
if err != nil {
return err
}
var remoteFileMap = map[string]*pb.FindLatestDeployFilesResponse_DeployFile{} // os_arch => *DeployFile
for _, nodeFile := range remoteFilesResp.NodeDeployFiles {
remoteFileMap[nodeFile.Os+"_"+nodeFile.Arch] = nodeFile
}
// 对比版本
for key, deployFile := range localFileMap {
remoteDeployFile, ok := remoteFileMap[key]
if !ok || stringutil.VersionCompare(remoteDeployFile.Version, deployFile.Version) < 0 {
err = this.uploadNodeDeployFile(ctx, rpcClient, deployFile.Path)
if err != nil {
return errors.New("upload deploy file '" + filepath.Base(deployFile.Path) + "' failed: " + err.Error())
}
}
}
return nil
}
// 升级NS节点
func (this *Upgrader) upgradeNSNodes(ctx context.Context, rpcClient *rpc.RPCClient) error {
// 本地的
var manager = NewDeployManager()
var localFileMap = map[string]*DeployFile{} // os_arch => *DeployFile
for _, deployFile := range manager.LoadNSNodeFiles() {
localFileMap[deployFile.OS+"_"+deployFile.Arch] = deployFile
}
remoteFilesResp, err := rpcClient.APINodeRPC().FindLatestDeployFiles(ctx, &pb.FindLatestDeployFilesRequest{})
if err != nil {
return err
}
var remoteFileMap = map[string]*pb.FindLatestDeployFilesResponse_DeployFile{} // os_arch => *DeployFile
for _, nodeFile := range remoteFilesResp.NsNodeDeployFiles {
remoteFileMap[nodeFile.Os+"_"+nodeFile.Arch] = nodeFile
}
// 对比版本
for key, deployFile := range localFileMap {
remoteDeployFile, ok := remoteFileMap[key]
if !ok || stringutil.VersionCompare(remoteDeployFile.Version, deployFile.Version) < 0 {
err = this.uploadNodeDeployFile(ctx, rpcClient, deployFile.Path)
if err != nil {
return errors.New("upload deploy file '" + filepath.Base(deployFile.Path) + "' failed: " + err.Error())
}
}
}
return nil
}
// 上传节点文件
func (this *Upgrader) uploadNodeDeployFile(ctx context.Context, rpcClient *rpc.RPCClient, path string) error {
fp, err := os.Open(path)
if err != nil {
return err
}
defer func() {
_ = fp.Close()
}()
var buf = make([]byte, 128*4096)
var isFirst = true
var hash = md5.New()
for {
n, err := fp.Read(buf)
if n > 0 {
hash.Write(buf[:n])
_, uploadErr := rpcClient.APINodeRPC().UploadDeployFileToAPINode(ctx, &pb.UploadDeployFileToAPINodeRequest{
Filename: filepath.Base(path),
Sum: "",
ChunkData: buf[:n],
IsFirstChunk: isFirst,
IsLastChunk: false,
})
if uploadErr != nil {
return uploadErr
}
isFirst = false
}
if err != nil {
if err == io.EOF {
err = nil
_, uploadErr := rpcClient.APINodeRPC().UploadDeployFileToAPINode(ctx, &pb.UploadDeployFileToAPINodeRequest{
Filename: filepath.Base(path),
Sum: fmt.Sprintf("%x", hash.Sum(nil)),
ChunkData: nil,
IsFirstChunk: false,
IsLastChunk: true,
})
if uploadErr != nil {
return uploadErr
}
break
}
return err
}
}
return nil
}