From a951ecbb1071af9c0427485dce0212fc50542885 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 30 Apr 2024 19:09:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E5=9C=A8=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E8=AE=BE=E7=BD=AE=E4=B8=AD=E4=BF=AE=E6=94=B9=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=9C=80=E5=A4=A7=E5=B9=B6=E5=8F=91=E8=AF=BB/=E5=86=99?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/node.go | 5 ++++ internal/utils/fs/limiter.go | 39 ++++++++++++++++++++++-------- internal/utils/fs/limiter_test.go | 22 +++++++++++++++++ internal/utils/percpu/chan.go | 36 +++++++++++++++++++++++++++ internal/utils/percpu/chan_test.go | 23 ++++++++++++++++++ 5 files changed, 115 insertions(+), 10 deletions(-) create mode 100644 internal/utils/percpu/chan.go create mode 100644 internal/utils/percpu/chan_test.go diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 5acaace..11f46ef 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -28,6 +28,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils" _ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器 _ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新 + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils" memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "github.com/TeaOSLab/EdgeNode/internal/waf" @@ -880,6 +881,10 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig, reloadAll bool) { nodeconfigs.ResetNodeConfig(config) sharedNodeConfig = config + // 并发读写数 + fsutils.ReaderLimiter.SetThreads(config.MaxConcurrentReads) + fsutils.WriterLimiter.SetThreads(config.MaxConcurrentWrites) + if reloadAll { // 缓存策略 var subDirs = config.CacheDiskSubDirs diff --git a/internal/utils/fs/limiter.go b/internal/utils/fs/limiter.go index 5206898..f1a49d3 100644 --- a/internal/utils/fs/limiter.go +++ b/internal/utils/fs/limiter.go @@ -3,26 +3,27 @@ package fsutils import ( - "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "runtime" "time" ) var maxThreads = runtime.NumCPU() -var WriterLimiter = NewLimiter(maxThreads) -var ReaderLimiter = NewLimiter(maxThreads) +var WriterLimiter = NewLimiter(max(maxThreads, 4)) +var ReaderLimiter = NewLimiter(max(maxThreads*2, 8)) type Limiter struct { - threads chan struct{} - timers chan *time.Timer + threads chan struct{} + count int + countDefault int + timers chan *time.Timer } func NewLimiter(threads int) *Limiter { if threads < 4 { threads = 4 } - if threads > 32 { - threads = 32 + if threads > 64 { + threads = 64 } var threadsChan = make(chan struct{}, threads) @@ -31,8 +32,26 @@ func NewLimiter(threads int) *Limiter { } return &Limiter{ - threads: threadsChan, - timers: make(chan *time.Timer, 2048), + countDefault: threads, + count: threads, + threads: threadsChan, + timers: make(chan *time.Timer, 4096), + } +} + +func (this *Limiter) SetThreads(newThreads int) { + if newThreads <= 0 { + newThreads = this.countDefault + } + + if newThreads != this.count { + var threadsChan = make(chan struct{}, newThreads) + for i := 0; i < newThreads; i++ { + threadsChan <- struct{}{} + } + + this.threads = threadsChan + this.count = newThreads } } @@ -72,7 +91,7 @@ func (this *Limiter) Release() { select { case this.threads <- struct{}{}: default: - remotelogs.Error("FS_LIMITER", "Limiter Ack()/Release() should appeared as a pair") + // 由于容量可能有变化,这里忽略多余的thread } } diff --git a/internal/utils/fs/limiter_test.go b/internal/utils/fs/limiter_test.go index 67a1ec6..32acfd6 100644 --- a/internal/utils/fs/limiter_test.go +++ b/internal/utils/fs/limiter_test.go @@ -6,10 +6,32 @@ import ( fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/iwind/TeaGo/assert" + "math/rand" + "sync" "testing" "time" ) +func TestLimiter_SetThreads(t *testing.T) { + var limiter = fsutils.NewLimiter(4) + + var concurrent = 1024 + + var wg = sync.WaitGroup{} + wg.Add(concurrent) + + for i := 0; i < concurrent; i++ { + go func() { + defer wg.Done() + + limiter.SetThreads(rand.Int() % 128) + limiter.TryAck() + }() + } + + wg.Wait() +} + func TestLimiter_Ack(t *testing.T) { var a = assert.NewAssertion(t) diff --git a/internal/utils/percpu/chan.go b/internal/utils/percpu/chan.go new file mode 100644 index 0000000..63777bc --- /dev/null +++ b/internal/utils/percpu/chan.go @@ -0,0 +1,36 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package percpu + +import ( + "runtime" +) + +type Chan[T any] struct { + c chan T + + count int + cList []chan T +} + +func NewChan[T any](size int) *Chan[T] { + var count = max(runtime.NumCPU(), runtime.GOMAXPROCS(0)) + var cList []chan T + for i := 0; i < count; i++ { + cList = append(cList, make(chan T, size)) + } + + return &Chan[T]{ + c: make(chan T, size), + count: count, + cList: cList, + } +} + +func (this *Chan[T]) C() chan T { + var procId = GetProcId() + if procId < this.count { + return this.cList[procId] + } + return this.c +} diff --git a/internal/utils/percpu/chan_test.go b/internal/utils/percpu/chan_test.go new file mode 100644 index 0000000..7169e26 --- /dev/null +++ b/internal/utils/percpu/chan_test.go @@ -0,0 +1,23 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package percpu_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/percpu" + "github.com/TeaOSLab/EdgeNode/internal/zero" + "testing" +) + +func TestChan_C(t *testing.T) { + var c = percpu.NewChan[zero.Zero](10) + c.C() <- zero.Zero{} + + t.Log(<-c.C()) + + select { + case <-c.C(): + t.Fatal("should not return from here") + default: + t.Log("ok") + } +}