mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-11 05:00:25 +08:00
改进MaxOpenFiles算法
This commit is contained in:
@@ -3,44 +3,24 @@
|
|||||||
package caches
|
package caches
|
||||||
|
|
||||||
import (
|
import (
|
||||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
minOpenFilesValue int32 = 4
|
|
||||||
maxOpenFilesValue int32 = 65535
|
|
||||||
|
|
||||||
modeSlow int32 = 1
|
modeSlow int32 = 1
|
||||||
modeFast int32 = 2
|
modeFast int32 = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
// MaxOpenFiles max open files manager
|
// MaxOpenFiles max open files manager
|
||||||
type MaxOpenFiles struct {
|
type MaxOpenFiles struct {
|
||||||
step int32
|
|
||||||
maxOpenFiles int32
|
|
||||||
ptr *int32
|
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
mode int32
|
mode int32
|
||||||
|
|
||||||
lastOpens int32
|
|
||||||
currentOpens int32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMaxOpenFiles(step int32) *MaxOpenFiles {
|
func NewMaxOpenFiles() *MaxOpenFiles {
|
||||||
if step <= 0 {
|
var f = &MaxOpenFiles{}
|
||||||
step = 2
|
|
||||||
}
|
|
||||||
var f = &MaxOpenFiles{
|
|
||||||
step: step,
|
|
||||||
maxOpenFiles: minOpenFilesValue,
|
|
||||||
}
|
|
||||||
if teaconst.DiskIsFast {
|
|
||||||
f.maxOpenFiles = 32
|
|
||||||
}
|
|
||||||
f.ptr = &f.maxOpenFiles
|
|
||||||
f.ticker = time.NewTicker(1 * time.Second)
|
f.ticker = time.NewTicker(1 * time.Second)
|
||||||
f.init()
|
f.init()
|
||||||
return f
|
return f
|
||||||
@@ -49,50 +29,24 @@ func NewMaxOpenFiles(step int32) *MaxOpenFiles {
|
|||||||
func (this *MaxOpenFiles) init() {
|
func (this *MaxOpenFiles) init() {
|
||||||
goman.New(func() {
|
goman.New(func() {
|
||||||
for range this.ticker.C {
|
for range this.ticker.C {
|
||||||
var mod = atomic.LoadInt32(&this.mode)
|
|
||||||
switch mod {
|
|
||||||
case modeSlow:
|
|
||||||
// we decrease more quickly, with more steps
|
|
||||||
if atomic.AddInt32(this.ptr, -this.step*2) <= 0 {
|
|
||||||
atomic.StoreInt32(this.ptr, minOpenFilesValue)
|
|
||||||
}
|
|
||||||
case modeFast:
|
|
||||||
// we increase only when file opens increases
|
|
||||||
var currentOpens = atomic.LoadInt32(&this.currentOpens)
|
|
||||||
if currentOpens > this.lastOpens {
|
|
||||||
if atomic.AddInt32(this.ptr, this.step) >= maxOpenFilesValue {
|
|
||||||
atomic.StoreInt32(this.ptr, maxOpenFilesValue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.lastOpens = currentOpens
|
|
||||||
atomic.StoreInt32(&this.currentOpens, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset mode
|
// reset mode
|
||||||
atomic.StoreInt32(&this.mode, 0)
|
atomic.StoreInt32(&this.mode, modeFast)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MaxOpenFiles) Fast() {
|
func (this *MaxOpenFiles) Fast() {
|
||||||
if atomic.LoadInt32(&this.mode) == 0 {
|
atomic.AddInt32(&this.mode, modeFast)
|
||||||
this.mode = modeFast
|
}
|
||||||
}
|
|
||||||
atomic.AddInt32(&this.currentOpens, 1)
|
func (this *MaxOpenFiles) FinishAll() {
|
||||||
|
this.Fast()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MaxOpenFiles) Slow() {
|
func (this *MaxOpenFiles) Slow() {
|
||||||
atomic.StoreInt32(&this.mode, modeSlow)
|
atomic.StoreInt32(&this.mode, modeSlow)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MaxOpenFiles) Max() int32 {
|
func (this *MaxOpenFiles) Next() bool {
|
||||||
if atomic.LoadInt32(&this.mode) == modeSlow {
|
return atomic.LoadInt32(&this.mode) != modeSlow
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
var v = atomic.LoadInt32(this.ptr)
|
|
||||||
if v <= minOpenFilesValue {
|
|
||||||
return minOpenFilesValue
|
|
||||||
}
|
|
||||||
return v
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,20 +9,27 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestNewMaxOpenFiles(t *testing.T) {
|
func TestNewMaxOpenFiles(t *testing.T) {
|
||||||
var maxOpenFiles = caches.NewMaxOpenFiles(2)
|
var maxOpenFiles = caches.NewMaxOpenFiles()
|
||||||
maxOpenFiles.Fast()
|
maxOpenFiles.Fast()
|
||||||
t.Log(maxOpenFiles.Max())
|
t.Log("fast:", maxOpenFiles.Next())
|
||||||
|
|
||||||
|
maxOpenFiles.Slow()
|
||||||
|
t.Log("slow:", maxOpenFiles.Next())
|
||||||
|
time.Sleep(1*time.Second + 1*time.Millisecond)
|
||||||
|
t.Log("slow 1 second:", maxOpenFiles.Next())
|
||||||
|
|
||||||
|
maxOpenFiles.Slow()
|
||||||
|
t.Log("slow:", maxOpenFiles.Next())
|
||||||
|
|
||||||
|
maxOpenFiles.Slow()
|
||||||
|
t.Log("slow:", maxOpenFiles.Next())
|
||||||
|
|
||||||
maxOpenFiles.Fast()
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
t.Log(maxOpenFiles.Max())
|
t.Log("slow 1 second:", maxOpenFiles.Next())
|
||||||
|
|
||||||
maxOpenFiles.Slow()
|
maxOpenFiles.Slow()
|
||||||
t.Log(maxOpenFiles.Max())
|
t.Log("slow:", maxOpenFiles.Next())
|
||||||
|
|
||||||
maxOpenFiles.Slow()
|
maxOpenFiles.Fast()
|
||||||
t.Log(maxOpenFiles.Max())
|
t.Log("fast:", maxOpenFiles.Next())
|
||||||
|
|
||||||
maxOpenFiles.Slow()
|
|
||||||
t.Log(maxOpenFiles.Max())
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ const (
|
|||||||
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
|
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
|
||||||
var sharedWritingFileKeyLocker = sync.Mutex{}
|
var sharedWritingFileKeyLocker = sync.Mutex{}
|
||||||
|
|
||||||
var maxOpenFiles = NewMaxOpenFiles(2)
|
var maxOpenFiles = NewMaxOpenFiles()
|
||||||
|
|
||||||
const maxOpenFilesSlowCost = 500 * time.Microsecond // 0.5ms
|
const maxOpenFilesSlowCost = 500 * time.Microsecond // 0.5ms
|
||||||
|
|
||||||
@@ -428,7 +428,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
|
|||||||
return nil, ErrFileIsWriting
|
return nil, ErrFileIsWriting
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isFlushing && len(sharedWritingFileKeyMap) >= int(maxOpenFiles.Max()) {
|
if !isFlushing && !maxOpenFiles.Next() {
|
||||||
sharedWritingFileKeyLocker.Unlock()
|
sharedWritingFileKeyLocker.Unlock()
|
||||||
return nil, ErrTooManyOpenFiles
|
return nil, ErrTooManyOpenFiles
|
||||||
}
|
}
|
||||||
@@ -439,6 +439,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
|
|||||||
if !isOk {
|
if !isOk {
|
||||||
sharedWritingFileKeyLocker.Lock()
|
sharedWritingFileKeyLocker.Lock()
|
||||||
delete(sharedWritingFileKeyMap, key)
|
delete(sharedWritingFileKeyMap, key)
|
||||||
|
if len(sharedWritingFileKeyMap) == 0 {
|
||||||
|
maxOpenFiles.FinishAll()
|
||||||
|
}
|
||||||
sharedWritingFileKeyLocker.Unlock()
|
sharedWritingFileKeyLocker.Unlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -608,12 +611,18 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
|
|||||||
return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, ranges, func() {
|
return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, ranges, func() {
|
||||||
sharedWritingFileKeyLocker.Lock()
|
sharedWritingFileKeyLocker.Lock()
|
||||||
delete(sharedWritingFileKeyMap, key)
|
delete(sharedWritingFileKeyMap, key)
|
||||||
|
if len(sharedWritingFileKeyMap) == 0 {
|
||||||
|
maxOpenFiles.FinishAll()
|
||||||
|
}
|
||||||
sharedWritingFileKeyLocker.Unlock()
|
sharedWritingFileKeyLocker.Unlock()
|
||||||
}), nil
|
}), nil
|
||||||
} else {
|
} else {
|
||||||
return NewFileWriter(this, writer, key, expiredAt, -1, func() {
|
return NewFileWriter(this, writer, key, expiredAt, -1, func() {
|
||||||
sharedWritingFileKeyLocker.Lock()
|
sharedWritingFileKeyLocker.Lock()
|
||||||
delete(sharedWritingFileKeyMap, key)
|
delete(sharedWritingFileKeyMap, key)
|
||||||
|
if len(sharedWritingFileKeyMap) == 0 {
|
||||||
|
maxOpenFiles.FinishAll()
|
||||||
|
}
|
||||||
sharedWritingFileKeyLocker.Unlock()
|
sharedWritingFileKeyLocker.Unlock()
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user