From a7dd101dbf85756f946cc7c8efd06b0376dd365f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Thu, 11 Jan 2024 15:25:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A5=97=E9=A4=90=E5=8F=AF=E4=BB=A5=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E5=B8=A6=E5=AE=BD=E9=99=90=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/http_request.go | 7 ++- internal/nodes/http_request_traffic_limit.go | 25 +++++++- internal/nodes/http_writer.go | 2 + internal/nodes/http_writer_ext.go | 4 ++ internal/nodes/node_tasks.go | 2 + internal/nodes/node_tasks_ext.go | 4 ++ internal/rpc/rpc_client.go | 2 + internal/utils/ratelimit/bandwidth.go | 61 +++++++++++++++++++ internal/utils/ratelimit/bandwidth_test.go | 27 ++++++++ internal/{ => utils}/ratelimit/counter.go | 0 .../{ => utils}/ratelimit/counter_test.go | 14 +++-- 11 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 internal/utils/ratelimit/bandwidth.go create mode 100644 internal/utils/ratelimit/bandwidth_test.go rename internal/{ => utils}/ratelimit/counter.go (100%) rename internal/{ => utils}/ratelimit/counter_test.go (65%) diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index f86a884..8216b87 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -201,9 +201,10 @@ func (this *HTTPRequest) Do() { // 流量限制 if this.ReqServer.TrafficLimitStatus != nil && this.ReqServer.TrafficLimitStatus.IsValid() { - this.doTrafficLimit() - this.doEnd() - return + if this.doTrafficLimit(this.ReqServer.TrafficLimitStatus) { + this.doEnd() + return + } } // WAF diff --git a/internal/nodes/http_request_traffic_limit.go b/internal/nodes/http_request_traffic_limit.go index 95fb15b..6c6ca5d 100644 --- a/internal/nodes/http_request_traffic_limit.go +++ b/internal/nodes/http_request_traffic_limit.go @@ -7,7 +7,19 @@ import ( ) // 流量限制 -func (this *HTTPRequest) doTrafficLimit() { +func (this *HTTPRequest) doTrafficLimit(status *serverconfigs.TrafficLimitStatus) (blocked bool) { + if status == nil { + return false + } + + // 如果是网站单独设置的流量限制,则检查是否已关闭 + var config = this.ReqServer.TrafficLimit + if (config == nil || !config.IsOn) && status.PlanId == 0 { + return false + } + + // 如果是套餐设置的流量限制,即使套餐变更了(变更套餐或者变更套餐的限制),仍然会提示流量超限 + this.tags = append(this.tags, "trafficLimit") var statusCode = 509 @@ -17,10 +29,19 @@ func (this *HTTPRequest) doTrafficLimit() { this.writer.Header().Set("Content-Type", "text/html; charset=utf-8") this.writer.WriteHeader(statusCode) - var config = this.ReqServer.TrafficLimit + // check plan traffic limit + if (config == nil || !config.IsOn) && this.ReqServer.PlanId() > 0 && this.nodeConfig != nil { + var planConfig = this.nodeConfig.FindPlan(this.ReqServer.PlanId()) + if planConfig != nil && planConfig.TrafficLimit != nil && planConfig.TrafficLimit.IsOn { + config = planConfig.TrafficLimit + } + } + if config != nil && len(config.NoticePageBody) != 0 { _, _ = this.writer.WriteString(this.Format(config.NoticePageBody)) } else { _, _ = this.writer.WriteString(this.Format(serverconfigs.DefaultTrafficLimitNoticePageBody)) } + + return true } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 510810c..756214a 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -807,6 +807,8 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { } n, err = this.writer.Write(data) + this.checkPlanBandwidth(n) + return } diff --git a/internal/nodes/http_writer_ext.go b/internal/nodes/http_writer_ext.go index b5bbb0f..46cc93b 100644 --- a/internal/nodes/http_writer_ext.go +++ b/internal/nodes/http_writer_ext.go @@ -11,3 +11,7 @@ import ( func (this *HTTPWriter) canSendfile() (*os.File, bool) { return nil, false } + +func (this *HTTPWriter) checkPlanBandwidth(n int) { + // stub +} diff --git a/internal/nodes/node_tasks.go b/internal/nodes/node_tasks.go index b65b953..91f5a2a 100644 --- a/internal/nodes/node_tasks.go +++ b/internal/nodes/node_tasks.go @@ -102,6 +102,8 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error { err = this.execNetworkSecurityPolicyChangedTask(rpcClient) case "webPPolicyChanged": err = this.execWebPPolicyChangedTask(rpcClient) + case "planChanged": + err = this.execPlanChangedTask(rpcClient) default: // 特殊任务 if strings.HasPrefix(task.Type, "ipListDeleted") { // 删除IP名单 diff --git a/internal/nodes/node_tasks_ext.go b/internal/nodes/node_tasks_ext.go index f65310c..5863677 100644 --- a/internal/nodes/node_tasks_ext.go +++ b/internal/nodes/node_tasks_ext.go @@ -34,3 +34,7 @@ func (this *Node) execNetworkSecurityPolicyChangedTask(rpcClient *rpc.RPCClient) // stub return nil } + +func (this *Node) execPlanChangedTask(rpcClient *rpc.RPCClient) error { + return nil +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 375efcd..a006e01 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -55,6 +55,7 @@ type RPCClient struct { ClientAgentIPRPC pb.ClientAgentIPServiceClient AuthorityKeyRPC pb.AuthorityKeyServiceClient UpdatingServerListRPC pb.UpdatingServerListServiceClient + PlanRPC pb.PlanServiceClient } func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { @@ -91,6 +92,7 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { client.ClientAgentIPRPC = pb.NewClientAgentIPServiceClient(client) client.AuthorityKeyRPC = pb.NewAuthorityKeyServiceClient(client) client.UpdatingServerListRPC = pb.NewUpdatingServerListServiceClient(client) + client.PlanRPC = pb.NewPlanServiceClient(client) err := client.init() if err != nil { diff --git a/internal/utils/ratelimit/bandwidth.go b/internal/utils/ratelimit/bandwidth.go new file mode 100644 index 0000000..993ca96 --- /dev/null +++ b/internal/utils/ratelimit/bandwidth.go @@ -0,0 +1,61 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package ratelimit + +import ( + "context" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "sync/atomic" + "time" +) + +// Bandwidth lossy bandwidth limiter +type Bandwidth struct { + totalBytes int64 + + currentTimestamp int64 + currentBytes int64 +} + +// NewBandwidth create new bandwidth limiter +func NewBandwidth(totalBytes int64) *Bandwidth { + return &Bandwidth{totalBytes: totalBytes} +} + +// Ack acquire next chance to send data +func (this *Bandwidth) Ack(ctx context.Context, newBytes int) { + if newBytes <= 0 { + return + } + if this.totalBytes <= 0 { + return + } + + var timestamp = fasttime.Now().Unix() + if this.currentTimestamp != 0 && this.currentTimestamp != timestamp { + this.currentTimestamp = timestamp + this.currentBytes = int64(newBytes) + + // 第一次发送直接放行,不需要判断 + return + } + + if this.currentTimestamp == 0 { + this.currentTimestamp = timestamp + } + if atomic.AddInt64(&this.currentBytes, int64(newBytes)) <= this.totalBytes { + return + } + + var timeout = time.NewTimer(1 * time.Second) + if ctx != nil { + select { + case <-timeout.C: + case <-ctx.Done(): + } + } else { + select { + case <-timeout.C: + } + } +} diff --git a/internal/utils/ratelimit/bandwidth_test.go b/internal/utils/ratelimit/bandwidth_test.go new file mode 100644 index 0000000..c87b424 --- /dev/null +++ b/internal/utils/ratelimit/bandwidth_test.go @@ -0,0 +1,27 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package ratelimit_test + +import ( + "context" + "github.com/TeaOSLab/EdgeNode/internal/utils/ratelimit" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "testing" +) + +func TestBandwidth(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + var bandwidth = ratelimit.NewBandwidth(32 << 10) + bandwidth.Ack(context.Background(), 123) + bandwidth.Ack(context.Background(), 16 << 10) + bandwidth.Ack(context.Background(), 32 << 10) +} + +func TestBandwidth_0(t *testing.T) { + var bandwidth = ratelimit.NewBandwidth(0) + bandwidth.Ack(context.Background(), 123) + bandwidth.Ack(context.Background(), 123456) +} diff --git a/internal/ratelimit/counter.go b/internal/utils/ratelimit/counter.go similarity index 100% rename from internal/ratelimit/counter.go rename to internal/utils/ratelimit/counter.go diff --git a/internal/ratelimit/counter_test.go b/internal/utils/ratelimit/counter_test.go similarity index 65% rename from internal/ratelimit/counter_test.go rename to internal/utils/ratelimit/counter_test.go index 9587064..4db62bb 100644 --- a/internal/ratelimit/counter_test.go +++ b/internal/utils/ratelimit/counter_test.go @@ -1,14 +1,20 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -package ratelimit +package ratelimit_test import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/ratelimit" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "testing" "time" ) func TestCounter_ACK(t *testing.T) { - var counter = NewCounter(10) + if !testutils.IsSingleTesting() { + return + } + + var counter = ratelimit.NewCounter(10) go func() { for i := 0; i < 10; i++ { @@ -26,7 +32,7 @@ func TestCounter_ACK(t *testing.T) { } func TestCounter_Release(t *testing.T) { - var counter = NewCounter(10) + var counter = ratelimit.NewCounter(10) for i := 0; i < 10; i++ { counter.Ack() @@ -34,5 +40,5 @@ func TestCounter_Release(t *testing.T) { for i := 0; i < 10; i++ { counter.Release() } - t.Log(len(counter.sem)) + t.Log(counter.Len()) }