diff --git a/internal/installers/upgrade_limiter.go b/internal/installers/upgrade_limiter.go new file mode 100644 index 00000000..ff730e52 --- /dev/null +++ b/internal/installers/upgrade_limiter.go @@ -0,0 +1,90 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package installers + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/utils/sizes" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/iwind/TeaGo/types" + "sync" + "time" +) + +const ( + UpgradeLimiterDuration = 10 // node key expire time, by seconds + UpgradeLimiterConcurrent = 10 // 10 nodes + UpgradeLimiterMaxBytesPerSecond = 5 * sizes.M // max bytes per second +) + +var SharedUpgradeLimiter = NewUpgradeLimiter() + +// UpgradeLimiter 升级流量管理器 +type UpgradeLimiter struct { + nodeMap map[string]int64 // key => timestamp + + rateTimestamp int64 + rateBytes int64 + + locker sync.Mutex +} + +func NewUpgradeLimiter() *UpgradeLimiter { + return &UpgradeLimiter{ + nodeMap: map[string]int64{}, + } +} + +// UpdateNodeBytes 添加正在下载的节点流量 +func (this *UpgradeLimiter) UpdateNodeBytes(nodeType nodeconfigs.NodeRole, nodeId int64, bytes int64) { + this.locker.Lock() + defer this.locker.Unlock() + + // 先清理 + var nowTime = time.Now().Unix() + this.gc(nowTime) + + // 添加 + var key = nodeType + "_" + types.String(nodeId) + this.nodeMap[key] = nowTime + + // 流量 + if this.rateTimestamp == nowTime { + this.rateBytes += bytes + } else { + this.rateTimestamp = nowTime + this.rateBytes = bytes + } +} + +// CanUpgrade 检查是否有新的升级 +func (this *UpgradeLimiter) CanUpgrade() bool { + this.locker.Lock() + defer this.locker.Unlock() + + var nowTime = time.Now().Unix() + this.gc(nowTime) + + // 限制并发节点数 + if len(this.nodeMap) >= UpgradeLimiterConcurrent { + return false + } + + if this.rateTimestamp != nowTime { + return true + } + + // 限制下载速度 + if this.rateBytes >= UpgradeLimiterMaxBytesPerSecond { + return false + } + + return true +} + +func (this *UpgradeLimiter) gc(nowTime int64) { + for nodeKey, timestamp := range this.nodeMap { + if timestamp < nowTime-UpgradeLimiterDuration { + delete(this.nodeMap, nodeKey) + } + } +} diff --git a/internal/installers/upgrade_limiter_test.go b/internal/installers/upgrade_limiter_test.go new file mode 100644 index 00000000..3e48bde8 --- /dev/null +++ b/internal/installers/upgrade_limiter_test.go @@ -0,0 +1,27 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package installers_test + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/installers" + "github.com/TeaOSLab/EdgeAPI/internal/utils/sizes" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "testing" + "time" +) + +func TestNewUpgradeLimiter(t *testing.T) { + var limiter = installers.NewUpgradeLimiter() + limiter.UpdateNodeBytes(nodeconfigs.NodeRoleNode, 1, 1) + limiter.UpdateNodeBytes(nodeconfigs.NodeRoleNode, 2, 5*sizes.M) + t.Log("limiter:", limiter) + t.Log("canUpgrade:", limiter.CanUpgrade()) + + time.Sleep(1 * time.Second) + t.Log("canUpgrade:", limiter.CanUpgrade()) + t.Log("limiter:", limiter) + limiter.UpdateNodeBytes(nodeconfigs.NodeRoleNode, 2, 4*sizes.M) + t.Log("canUpgrade:", limiter.CanUpgrade()) + + t.Log("limiter:", limiter) +} diff --git a/internal/rpc/services/nameservers/service_ns_node.go b/internal/rpc/services/nameservers/service_ns_node.go index 24dfd75a..c991b78e 100644 --- a/internal/rpc/services/nameservers/service_ns_node.go +++ b/internal/rpc/services/nameservers/service_ns_node.go @@ -462,12 +462,12 @@ func (this *NSNodeService) CheckNSNodeLatestVersion(ctx context.Context, req *pb // DownloadNSNodeInstallationFile 下载最新DNS节点安装文件 func (this *NSNodeService) DownloadNSNodeInstallationFile(ctx context.Context, req *pb.DownloadNSNodeInstallationFileRequest) (*pb.DownloadNSNodeInstallationFileResponse, error) { - _, err := this.ValidateNSNode(ctx) + nodeId, err := this.ValidateNSNode(ctx) if err != nil { return nil, err } - file := installers.SharedDeployManager.FindNSNodeFile(req.Os, req.Arch) + var file = installers.SharedDeployManager.FindNSNodeFile(req.Os, req.Arch) if file == nil { return &pb.DownloadNSNodeInstallationFileResponse{}, nil } @@ -482,6 +482,9 @@ func (this *NSNodeService) DownloadNSNodeInstallationFile(ctx context.Context, r return nil, err } + // 增加下载速度监控 + installers.SharedUpgradeLimiter.UpdateNodeBytes(nodeconfigs.NodeRoleDNS, nodeId, int64(len(data))) + return &pb.DownloadNSNodeInstallationFileResponse{ Sum: sum, Offset: offset, diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 2a0ee728..7128484b 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -1602,12 +1602,12 @@ func (this *NodeService) UpdateNodeUp(ctx context.Context, req *pb.UpdateNodeUpR // DownloadNodeInstallationFile 下载最新边缘节点安装文件 func (this *NodeService) DownloadNodeInstallationFile(ctx context.Context, req *pb.DownloadNodeInstallationFileRequest) (*pb.DownloadNodeInstallationFileResponse, error) { - _, err := this.ValidateNode(ctx) + nodeId, err := this.ValidateNode(ctx) if err != nil { return nil, err } - file := installers.SharedDeployManager.FindNodeFile(req.Os, req.Arch) + var file = installers.SharedDeployManager.FindNodeFile(req.Os, req.Arch) if file == nil { return &pb.DownloadNodeInstallationFileResponse{}, nil } @@ -1622,6 +1622,9 @@ func (this *NodeService) DownloadNodeInstallationFile(ctx context.Context, req * return nil, err } + // 增加下载速度监控 + installers.SharedUpgradeLimiter.UpdateNodeBytes(nodeconfigs.NodeRoleNode, nodeId, int64(len(data))) + return &pb.DownloadNodeInstallationFileResponse{ Sum: sum, Offset: offset, diff --git a/internal/rpc/services/service_node_task.go b/internal/rpc/services/service_node_task.go index 91ff3e6c..e4a49102 100644 --- a/internal/rpc/services/service_node_task.go +++ b/internal/rpc/services/service_node_task.go @@ -32,7 +32,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode return nil, err } - pbTasks := []*pb.NodeTask{} + var pbTasks = []*pb.NodeTask{} for _, task := range tasks { pbTasks = append(pbTasks, &pb.NodeTask{ Id: int64(task.Id), @@ -44,7 +44,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode } // 边缘节点版本更新任务 - if nodeType == rpcutils.UserTypeNode { + if nodeType == rpcutils.UserTypeNode && installers.SharedUpgradeLimiter.CanUpgrade() { status, err := models.SharedNodeDAO.FindNodeStatus(tx, nodeId) if err != nil { return nil, err