diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 694d8a45..2ccc19ef 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -402,6 +402,27 @@ func (this *NodeDAO) UpdateNodeStatus(tx *dbs.Tx, nodeId int64, statusJSON []byt return err } +// FindNodeStatus 获取节点状态 +func (this *NodeDAO) FindNodeStatus(tx *dbs.Tx, nodeId int64) (*nodeconfigs.NodeStatus, error) { + statusJSONString, err := this.Query(tx). + Pk(nodeId). + Result("status"). + FindStringCol("") + if err != nil { + return nil, err + } + if len(statusJSONString) == 0 { + return nil, nil + } + + status := &nodeconfigs.NodeStatus{} + err = json.Unmarshal([]byte(statusJSONString), status) + if err != nil { + return nil, err + } + return status, nil +} + // UpdateNodeIsActive 更改节点在线状态 func (this *NodeDAO) UpdateNodeIsActive(tx *dbs.Tx, nodeId int64, isActive bool) error { b := "true" diff --git a/internal/db/models/node_task_dao.go b/internal/db/models/node_task_dao.go index 6d1c9a5d..979c220f 100644 --- a/internal/db/models/node_task_dao.go +++ b/internal/db/models/node_task_dao.go @@ -13,8 +13,9 @@ import ( type NodeTaskType = string const ( - NodeTaskTypeConfigChanged NodeTaskType = "configChanged" - NodeTaskTypeIPItemChanged NodeTaskType = "ipItemChanged" + NodeTaskTypeConfigChanged NodeTaskType = "configChanged" + NodeTaskTypeIPItemChanged NodeTaskType = "ipItemChanged" + NodeTaskTypeNodeVersionChanged NodeTaskType = "nodeVersionChanged" ) type NodeTaskDAO dbs.DAO diff --git a/internal/installers/deploy_file.go b/internal/installers/deploy_file.go new file mode 100644 index 00000000..79029c72 --- /dev/null +++ b/internal/installers/deploy_file.go @@ -0,0 +1,79 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package installers + +import ( + "crypto/md5" + "fmt" + "io" + "os" +) + +// DeployFile 部署文件描述 +type DeployFile struct { + OS string + Arch string + Version string + Path string +} + +// Sum 计算概要 +func (this *DeployFile) Sum() (string, error) { + fp, err := os.Open(this.Path) + if err != nil { + return "", err + } + defer func() { + _ = fp.Close() + }() + + m := md5.New() + buffer := make([]byte, 128*1024) + for { + n, err := fp.Read(buffer) + if err != nil { + if err == io.EOF { + break + } + return "", err + } + _, err = m.Write(buffer[:n]) + if err != nil { + return "", err + } + } + sum := m.Sum(nil) + return fmt.Sprintf("%x", sum), nil +} + +// Read 读取一个片段数据 +func (this *DeployFile) Read(offset int64) (data []byte, newOffset int64, err error) { + fp, err := os.Open(this.Path) + if err != nil { + return nil, offset, err + } + defer func() { + _ = fp.Close() + }() + + stat, err := fp.Stat() + if err != nil { + return nil, offset, err + } + if offset >= stat.Size() { + return nil, offset, io.EOF + } + + _, err = fp.Seek(offset, io.SeekStart) + if err != nil { + return nil, offset, err + } + + buffer := make([]byte, 128*1024) + n, err := fp.Read(buffer) + if err != nil { + return nil, offset, err + } + + return buffer[:n], offset + int64(n), nil +} diff --git a/internal/installers/deploy_file_test.go b/internal/installers/deploy_file_test.go new file mode 100644 index 00000000..955fbcff --- /dev/null +++ b/internal/installers/deploy_file_test.go @@ -0,0 +1,36 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package installers + +import ( + "io" + "testing" +) + +func TestDeployFile_Sum(t *testing.T) { + d := &DeployFile{Path: "deploy_test.txt"} + sum, err := d.Sum() + if err != nil { + t.Log("err:", err) + return + } + t.Log("sum:", sum) +} + +func TestDeployFile_Read(t *testing.T) { + d := &DeployFile{Path: "deploy_test.txt"} + + var offset int64 + for i := 0; i < 3; i++ { + data, newOffset, err := d.Read(offset) + if err != nil { + if err == io.EOF { + break + } + t.Log("err: ", err) + return + } + t.Log("offset:", newOffset, "data:", string(data)) + offset = newOffset + } +} diff --git a/internal/installers/deploy_manager.go b/internal/installers/deploy_manager.go index 70df1f43..4ed68e37 100644 --- a/internal/installers/deploy_manager.go +++ b/internal/installers/deploy_manager.go @@ -9,17 +9,11 @@ import ( var SharedDeployManager = NewDeployManager() -type DeployFile struct { - OS string - Arch string - Version string - Path string -} - type DeployManager struct { dir string } +// NewDeployManager 节点部署文件管理器 func NewDeployManager() *DeployManager { return &DeployManager{ dir: Tea.Root + "/deploy", @@ -61,7 +55,15 @@ func (this *DeployManager) LoadNodeFiles() []*DeployFile { return result } - +// FindNodeFile 查找特别平台的节点文件 +func (this *DeployManager) FindNodeFile(os string, arch string) *DeployFile { + for _, file := range this.LoadNodeFiles() { + if file.OS == os && file.Arch == arch { + return file + } + } + return nil +} // LoadNSNodeFiles 加载所有文件 func (this *DeployManager) LoadNSNodeFiles() []*DeployFile { @@ -96,4 +98,4 @@ func (this *DeployManager) LoadNSNodeFiles() []*DeployFile { result = append(result, v) } return result -} \ No newline at end of file +} diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index db43245c..f520a14f 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -18,6 +18,7 @@ import ( "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" "net" + "path/filepath" ) // NodeService 边缘节点相关服务 @@ -1349,3 +1350,31 @@ func (this *NodeService) UpdateNodeUp(ctx context.Context, req *pb.UpdateNodeUpR return this.Success() } + +// DownloadNodeInstallationFile 下载最新边缘节点安装文件 +func (this *NodeService) DownloadNodeInstallationFile(ctx context.Context, req *pb.DownloadNodeInstallationFileRequest) (*pb.DownloadNodeInstallationFileResponse, error) { + _, err := this.ValidateNode(ctx) + if err != nil { + return nil, err + } + + file := installers.SharedDeployManager.FindNodeFile(req.Os, req.Arch) + if file == nil { + return &pb.DownloadNodeInstallationFileResponse{}, nil + } + + sum, err := file.Sum() + if err != nil { + return nil, err + } + + data, offset, err := file.Read(req.ChunkOffset) + + return &pb.DownloadNodeInstallationFileResponse{ + Sum: sum, + Offset: offset, + ChunkData: data, + Version: file.Version, + Filename: filepath.Base(file.Path), + }, nil +} diff --git a/internal/rpc/services/service_node_task.go b/internal/rpc/services/service_node_task.go index becc8cb5..f80a0503 100644 --- a/internal/rpc/services/service_node_task.go +++ b/internal/rpc/services/service_node_task.go @@ -3,17 +3,19 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/installers" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/dbs" + stringutil "github.com/iwind/TeaGo/utils/string" "time" ) -// 节点同步任务相关服务 +// NodeTaskService 节点同步任务相关服务 type NodeTaskService struct { BaseService } -// 获取单节点同步任务 +// FindNodeTasks 获取单节点同步任务 func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNodeTasksRequest) (*pb.FindNodeTasksResponse, error) { nodeId, err := this.ValidateNode(ctx) if err != nil { @@ -36,10 +38,26 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode }) } + // 版本更新任务 + status, err := models.SharedNodeDAO.FindNodeStatus(tx, nodeId) + if err != nil { + return nil, err + } + if status != nil && len(status.OS) > 0 && len(status.Arch) > 0 && len(status.BuildVersion) > 0 { + deployFile := installers.SharedDeployManager.FindNodeFile(status.OS, status.Arch) + if deployFile != nil { + if stringutil.VersionCompare(deployFile.Version, status.BuildVersion) > 0 { + pbTasks = append(pbTasks, &pb.NodeTask{ + Type: models.NodeTaskTypeNodeVersionChanged, + }) + } + } + } + return &pb.FindNodeTasksResponse{NodeTasks: pbTasks}, nil } -// 报告同步任务结果 +// ReportNodeTaskDone 报告同步任务结果 func (this *NodeTaskService) ReportNodeTaskDone(ctx context.Context, req *pb.ReportNodeTaskDoneRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateNode(ctx) if err != nil { @@ -55,7 +73,7 @@ func (this *NodeTaskService) ReportNodeTaskDone(ctx context.Context, req *pb.Rep return this.Success() } -// 获取所有正在同步的集群信息 +// FindNodeClusterTasks 获取所有正在同步的集群信息 func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.FindNodeClusterTasksRequest) (*pb.FindNodeClusterTasksResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -125,7 +143,7 @@ func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.F return &pb.FindNodeClusterTasksResponse{ClusterTasks: pbClusterTasks}, nil } -// 检查是否有正在执行的任务 +// ExistsNodeTasks 检查是否有正在执行的任务 func (this *NodeTaskService) ExistsNodeTasks(ctx context.Context, req *pb.ExistsNodeTasksRequest) (*pb.ExistsNodeTasksResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -154,7 +172,7 @@ func (this *NodeTaskService) ExistsNodeTasks(ctx context.Context, req *pb.Exists }, nil } -// 删除任务 +// DeleteNodeTask 删除任务 func (this *NodeTaskService) DeleteNodeTask(ctx context.Context, req *pb.DeleteNodeTaskRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -170,7 +188,7 @@ func (this *NodeTaskService) DeleteNodeTask(ctx context.Context, req *pb.DeleteN return this.Success() } -// 批量删除任务 +// DeleteNodeTasks 批量删除任务 func (this *NodeTaskService) DeleteNodeTasks(ctx context.Context, req *pb.DeleteNodeTasksRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -188,7 +206,7 @@ func (this *NodeTaskService) DeleteNodeTasks(ctx context.Context, req *pb.Delete return this.Success() } -// 计算正在执行的任务数量 +// CountDoingNodeTasks 计算正在执行的任务数量 func (this *NodeTaskService) CountDoingNodeTasks(ctx context.Context, req *pb.CountDoingNodeTasksRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -205,7 +223,7 @@ func (this *NodeTaskService) CountDoingNodeTasks(ctx context.Context, req *pb.Co return this.SuccessCount(count) } -// 查找需要通知的任务 +// FindNotifyingNodeTasks 查找需要通知的任务 func (this *NodeTaskService) FindNotifyingNodeTasks(ctx context.Context, req *pb.FindNotifyingNodeTasksRequest) (*pb.FindNotifyingNodeTasksResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -241,7 +259,7 @@ func (this *NodeTaskService) FindNotifyingNodeTasks(ctx context.Context, req *pb return &pb.FindNotifyingNodeTasksResponse{NodeTasks: pbTasks}, nil } -// 设置任务已通知 +// UpdateNodeTasksNotified 设置任务已通知 func (this *NodeTaskService) UpdateNodeTasksNotified(ctx context.Context, req *pb.UpdateNodeTasksNotifiedRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil {