mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 15:00:26 +08:00
可以在集群设置中修改节点最大并发读/写数
This commit is contained in:
@@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||||
_ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器
|
_ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器
|
||||||
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
|
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
|
||||||
|
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
|
||||||
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
|
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/waf"
|
"github.com/TeaOSLab/EdgeNode/internal/waf"
|
||||||
@@ -880,6 +881,10 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig, reloadAll bool) {
|
|||||||
nodeconfigs.ResetNodeConfig(config)
|
nodeconfigs.ResetNodeConfig(config)
|
||||||
sharedNodeConfig = config
|
sharedNodeConfig = config
|
||||||
|
|
||||||
|
// 并发读写数
|
||||||
|
fsutils.ReaderLimiter.SetThreads(config.MaxConcurrentReads)
|
||||||
|
fsutils.WriterLimiter.SetThreads(config.MaxConcurrentWrites)
|
||||||
|
|
||||||
if reloadAll {
|
if reloadAll {
|
||||||
// 缓存策略
|
// 缓存策略
|
||||||
var subDirs = config.CacheDiskSubDirs
|
var subDirs = config.CacheDiskSubDirs
|
||||||
|
|||||||
@@ -3,26 +3,27 @@
|
|||||||
package fsutils
|
package fsutils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var maxThreads = runtime.NumCPU()
|
var maxThreads = runtime.NumCPU()
|
||||||
var WriterLimiter = NewLimiter(maxThreads)
|
var WriterLimiter = NewLimiter(max(maxThreads, 4))
|
||||||
var ReaderLimiter = NewLimiter(maxThreads)
|
var ReaderLimiter = NewLimiter(max(maxThreads*2, 8))
|
||||||
|
|
||||||
type Limiter struct {
|
type Limiter struct {
|
||||||
threads chan struct{}
|
threads chan struct{}
|
||||||
timers chan *time.Timer
|
count int
|
||||||
|
countDefault int
|
||||||
|
timers chan *time.Timer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLimiter(threads int) *Limiter {
|
func NewLimiter(threads int) *Limiter {
|
||||||
if threads < 4 {
|
if threads < 4 {
|
||||||
threads = 4
|
threads = 4
|
||||||
}
|
}
|
||||||
if threads > 32 {
|
if threads > 64 {
|
||||||
threads = 32
|
threads = 64
|
||||||
}
|
}
|
||||||
|
|
||||||
var threadsChan = make(chan struct{}, threads)
|
var threadsChan = make(chan struct{}, threads)
|
||||||
@@ -31,8 +32,26 @@ func NewLimiter(threads int) *Limiter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &Limiter{
|
return &Limiter{
|
||||||
threads: threadsChan,
|
countDefault: threads,
|
||||||
timers: make(chan *time.Timer, 2048),
|
count: threads,
|
||||||
|
threads: threadsChan,
|
||||||
|
timers: make(chan *time.Timer, 4096),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *Limiter) SetThreads(newThreads int) {
|
||||||
|
if newThreads <= 0 {
|
||||||
|
newThreads = this.countDefault
|
||||||
|
}
|
||||||
|
|
||||||
|
if newThreads != this.count {
|
||||||
|
var threadsChan = make(chan struct{}, newThreads)
|
||||||
|
for i := 0; i < newThreads; i++ {
|
||||||
|
threadsChan <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.threads = threadsChan
|
||||||
|
this.count = newThreads
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,7 +91,7 @@ func (this *Limiter) Release() {
|
|||||||
select {
|
select {
|
||||||
case this.threads <- struct{}{}:
|
case this.threads <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
remotelogs.Error("FS_LIMITER", "Limiter Ack()/Release() should appeared as a pair")
|
// 由于容量可能有变化,这里忽略多余的thread
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,10 +6,32 @@ import (
|
|||||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
|
||||||
"github.com/iwind/TeaGo/assert"
|
"github.com/iwind/TeaGo/assert"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestLimiter_SetThreads(t *testing.T) {
|
||||||
|
var limiter = fsutils.NewLimiter(4)
|
||||||
|
|
||||||
|
var concurrent = 1024
|
||||||
|
|
||||||
|
var wg = sync.WaitGroup{}
|
||||||
|
wg.Add(concurrent)
|
||||||
|
|
||||||
|
for i := 0; i < concurrent; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
limiter.SetThreads(rand.Int() % 128)
|
||||||
|
limiter.TryAck()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func TestLimiter_Ack(t *testing.T) {
|
func TestLimiter_Ack(t *testing.T) {
|
||||||
var a = assert.NewAssertion(t)
|
var a = assert.NewAssertion(t)
|
||||||
|
|
||||||
|
|||||||
36
internal/utils/percpu/chan.go
Normal file
36
internal/utils/percpu/chan.go
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package percpu
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Chan[T any] struct {
|
||||||
|
c chan T
|
||||||
|
|
||||||
|
count int
|
||||||
|
cList []chan T
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewChan[T any](size int) *Chan[T] {
|
||||||
|
var count = max(runtime.NumCPU(), runtime.GOMAXPROCS(0))
|
||||||
|
var cList []chan T
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
cList = append(cList, make(chan T, size))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Chan[T]{
|
||||||
|
c: make(chan T, size),
|
||||||
|
count: count,
|
||||||
|
cList: cList,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *Chan[T]) C() chan T {
|
||||||
|
var procId = GetProcId()
|
||||||
|
if procId < this.count {
|
||||||
|
return this.cList[procId]
|
||||||
|
}
|
||||||
|
return this.c
|
||||||
|
}
|
||||||
23
internal/utils/percpu/chan_test.go
Normal file
23
internal/utils/percpu/chan_test.go
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package percpu_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/utils/percpu"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestChan_C(t *testing.T) {
|
||||||
|
var c = percpu.NewChan[zero.Zero](10)
|
||||||
|
c.C() <- zero.Zero{}
|
||||||
|
|
||||||
|
t.Log(<-c.C())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.C():
|
||||||
|
t.Fatal("should not return from here")
|
||||||
|
default:
|
||||||
|
t.Log("ok")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user