diff --git a/internal/caches/max_open_files.go b/internal/caches/max_open_files.go index dfcb30b..c911230 100644 --- a/internal/caches/max_open_files.go +++ b/internal/caches/max_open_files.go @@ -3,44 +3,24 @@ package caches import ( - teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/goman" "sync/atomic" "time" ) const ( - minOpenFilesValue int32 = 4 - maxOpenFilesValue int32 = 65535 - modeSlow int32 = 1 modeFast int32 = 2 ) // MaxOpenFiles max open files manager type MaxOpenFiles struct { - step int32 - maxOpenFiles int32 - ptr *int32 - ticker *time.Ticker - mode int32 - - lastOpens int32 - currentOpens int32 + ticker *time.Ticker + mode int32 } -func NewMaxOpenFiles(step int32) *MaxOpenFiles { - if step <= 0 { - step = 2 - } - var f = &MaxOpenFiles{ - step: step, - maxOpenFiles: minOpenFilesValue, - } - if teaconst.DiskIsFast { - f.maxOpenFiles = 32 - } - f.ptr = &f.maxOpenFiles +func NewMaxOpenFiles() *MaxOpenFiles { + var f = &MaxOpenFiles{} f.ticker = time.NewTicker(1 * time.Second) f.init() return f @@ -49,50 +29,24 @@ func NewMaxOpenFiles(step int32) *MaxOpenFiles { func (this *MaxOpenFiles) init() { goman.New(func() { for range this.ticker.C { - var mod = atomic.LoadInt32(&this.mode) - switch mod { - case modeSlow: - // we decrease more quickly, with more steps - if atomic.AddInt32(this.ptr, -this.step*2) <= 0 { - atomic.StoreInt32(this.ptr, minOpenFilesValue) - } - case modeFast: - // we increase only when file opens increases - var currentOpens = atomic.LoadInt32(&this.currentOpens) - if currentOpens > this.lastOpens { - if atomic.AddInt32(this.ptr, this.step) >= maxOpenFilesValue { - atomic.StoreInt32(this.ptr, maxOpenFilesValue) - } - } - this.lastOpens = currentOpens - atomic.StoreInt32(&this.currentOpens, 0) - } - // reset mode - atomic.StoreInt32(&this.mode, 0) + atomic.StoreInt32(&this.mode, modeFast) } }) } func (this *MaxOpenFiles) Fast() { - if atomic.LoadInt32(&this.mode) == 0 { - this.mode = modeFast - } - atomic.AddInt32(&this.currentOpens, 1) + atomic.AddInt32(&this.mode, modeFast) +} + +func (this *MaxOpenFiles) FinishAll() { + this.Fast() } func (this *MaxOpenFiles) Slow() { atomic.StoreInt32(&this.mode, modeSlow) } -func (this *MaxOpenFiles) Max() int32 { - if atomic.LoadInt32(&this.mode) == modeSlow { - return 0 - } - - var v = atomic.LoadInt32(this.ptr) - if v <= minOpenFilesValue { - return minOpenFilesValue - } - return v +func (this *MaxOpenFiles) Next() bool { + return atomic.LoadInt32(&this.mode) != modeSlow } diff --git a/internal/caches/max_open_files_test.go b/internal/caches/max_open_files_test.go index 814d21d..2b37ab4 100644 --- a/internal/caches/max_open_files_test.go +++ b/internal/caches/max_open_files_test.go @@ -9,20 +9,27 @@ import ( ) func TestNewMaxOpenFiles(t *testing.T) { - var maxOpenFiles = caches.NewMaxOpenFiles(2) + var maxOpenFiles = caches.NewMaxOpenFiles() maxOpenFiles.Fast() - t.Log(maxOpenFiles.Max()) + t.Log("fast:", maxOpenFiles.Next()) + + maxOpenFiles.Slow() + t.Log("slow:", maxOpenFiles.Next()) + time.Sleep(1*time.Second + 1*time.Millisecond) + t.Log("slow 1 second:", maxOpenFiles.Next()) + + maxOpenFiles.Slow() + t.Log("slow:", maxOpenFiles.Next()) + + maxOpenFiles.Slow() + t.Log("slow:", maxOpenFiles.Next()) - maxOpenFiles.Fast() time.Sleep(1 * time.Second) - t.Log(maxOpenFiles.Max()) + t.Log("slow 1 second:", maxOpenFiles.Next()) maxOpenFiles.Slow() - t.Log(maxOpenFiles.Max()) + t.Log("slow:", maxOpenFiles.Next()) - maxOpenFiles.Slow() - t.Log(maxOpenFiles.Max()) - - maxOpenFiles.Slow() - t.Log(maxOpenFiles.Max()) + maxOpenFiles.Fast() + t.Log("fast:", maxOpenFiles.Next()) } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index bee5cb2..321fedc 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -63,7 +63,7 @@ const ( var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool var sharedWritingFileKeyLocker = sync.Mutex{} -var maxOpenFiles = NewMaxOpenFiles(2) +var maxOpenFiles = NewMaxOpenFiles() const maxOpenFilesSlowCost = 500 * time.Microsecond // 0.5ms @@ -428,7 +428,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz return nil, ErrFileIsWriting } - if !isFlushing && len(sharedWritingFileKeyMap) >= int(maxOpenFiles.Max()) { + if !isFlushing && !maxOpenFiles.Next() { sharedWritingFileKeyLocker.Unlock() return nil, ErrTooManyOpenFiles } @@ -439,6 +439,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz if !isOk { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) + if len(sharedWritingFileKeyMap) == 0 { + maxOpenFiles.FinishAll() + } sharedWritingFileKeyLocker.Unlock() } }() @@ -608,12 +611,18 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, ranges, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) + if len(sharedWritingFileKeyMap) == 0 { + maxOpenFiles.FinishAll() + } sharedWritingFileKeyLocker.Unlock() }), nil } else { return NewFileWriter(this, writer, key, expiredAt, -1, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) + if len(sharedWritingFileKeyMap) == 0 { + maxOpenFiles.FinishAll() + } sharedWritingFileKeyLocker.Unlock() }), nil }