diff --git a/internal/caches/open_file.go b/internal/caches/open_file.go index e4f884c..4565d1e 100644 --- a/internal/caches/open_file.go +++ b/internal/caches/open_file.go @@ -12,14 +12,16 @@ type OpenFile struct { meta []byte header []byte version int64 + size int64 } -func NewOpenFile(fp *os.File, meta []byte, header []byte, version int64) *OpenFile { +func NewOpenFile(fp *os.File, meta []byte, header []byte, version int64, size int64) *OpenFile { return &OpenFile{ fp: fp, meta: meta, header: header, version: version, + size: size, } } diff --git a/internal/caches/open_file_cache.go b/internal/caches/open_file_cache.go index def7177..c814964 100644 --- a/internal/caches/open_file_cache.go +++ b/internal/caches/open_file_cache.go @@ -3,7 +3,9 @@ package caches import ( + "fmt" "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist" "github.com/fsnotify/fsnotify" "github.com/iwind/TeaGo/logs" @@ -14,6 +16,10 @@ import ( "time" ) +const ( + maxOpenFileSize = 256 << 20 +) + type OpenFileCache struct { poolMap map[string]*OpenFilePool // file path => Pool poolList *linkedlist.List[*OpenFilePool] @@ -21,19 +27,23 @@ type OpenFileCache struct { locker sync.RWMutex - maxSize int - count int + maxCount int + capacitySize int64 + + count int + usedSize int64 } -func NewOpenFileCache(maxSize int) (*OpenFileCache, error) { - if maxSize <= 0 { - maxSize = 16384 +func NewOpenFileCache(maxCount int) (*OpenFileCache, error) { + if maxCount <= 0 { + maxCount = 16384 } var cache = &OpenFileCache{ - maxSize: maxSize, - poolMap: map[string]*OpenFilePool{}, - poolList: linkedlist.NewList[*OpenFilePool](), + maxCount: maxCount, + poolMap: map[string]*OpenFilePool{}, + poolList: linkedlist.NewList[*OpenFilePool](), + capacitySize: (int64(utils.SystemMemoryGB()) << 30) / 16, } watcher, err := fsnotify.NewWatcher() @@ -58,24 +68,36 @@ func (this *OpenFileCache) Get(filename string) *OpenFile { pool, ok := this.poolMap[filename] this.locker.RUnlock() if ok { - file, consumed := pool.Get() + file, consumed, consumedSize := pool.Get() if consumed { this.locker.Lock() this.count-- + this.usedSize -= consumedSize // pool如果为空,也不需要从列表中删除,避免put时需要重新创建 this.locker.Unlock() } + return file } return nil } func (this *OpenFileCache) Put(filename string, file *OpenFile) { + if file.size > maxOpenFileSize { + return + } + this.locker.Lock() defer this.locker.Unlock() + // 如果超过当前容量,则关闭最早的 + if this.count >= this.maxCount || this.usedSize >= this.capacitySize { + this.consumeHead() + return + } + pool, ok := this.poolMap[filename] var success bool if ok { @@ -92,35 +114,7 @@ func (this *OpenFileCache) Put(filename string, file *OpenFile) { // 检查长度 if success { this.count++ - - // 如果超过当前容量,则关闭最早的 - if this.count > this.maxSize { - var delta = this.maxSize / 100 // 清理1% - if delta == 0 { - delta = 1 - } - for i := 0; i < delta; i++ { - var head = this.poolList.Head() - if head == nil { - break - } - - var headPool = head.Value - headFile, consumed := headPool.Get() - if consumed { - this.count-- - if headFile != nil { - _ = headFile.Close() - } - } - - if headPool.Len() == 0 { - delete(this.poolMap, headPool.filename) - this.poolList.Remove(head) - _ = this.watcher.Remove(headPool.filename) - } - } - } + this.usedSize += file.size } } @@ -136,6 +130,7 @@ func (this *OpenFileCache) Close(filename string) { this.poolList.Remove(pool.linkItem) _ = this.watcher.Remove(filename) this.count -= pool.Len() + this.usedSize -= pool.usedSize } this.locker.Unlock() @@ -155,14 +150,19 @@ func (this *OpenFileCache) CloseAll() { this.poolList.Reset() _ = this.watcher.Close() this.count = 0 + this.usedSize = 0 this.locker.Unlock() } +func (this *OpenFileCache) SetCapacity(capacityBytes int64) { + this.capacitySize = capacityBytes +} + func (this *OpenFileCache) Debug() { var ticker = time.NewTicker(5 * time.Second) goman.New(func() { for range ticker.C { - logs.Println("==== " + types.String(this.count) + " ====") + logs.Println("==== " + types.String(this.count) + ", " + fmt.Sprintf("%.4fMB", float64(this.usedSize)/(1<<20)) + " ====") this.poolList.Range(func(item *linkedlist.Item[*OpenFilePool]) (goNext bool) { logs.Println(filepath.Base(item.Value.Filename()), item.Value.Len()) return true @@ -170,3 +170,35 @@ func (this *OpenFileCache) Debug() { } }) } + +func (this *OpenFileCache) consumeHead() { + var delta = 1 + + if this.count > 100 { + delta = 2 + } + + for i := 0; i < delta; i++ { + var head = this.poolList.Head() + if head == nil { + break + } + + var headPool = head.Value + headFile, consumed, consumedSize := headPool.Get() + if consumed { + this.count-- + this.usedSize -= consumedSize + + if headFile != nil { + _ = headFile.Close() + } + } + + if headPool.Len() == 0 { + delete(this.poolMap, headPool.filename) + this.poolList.Remove(head) + _ = this.watcher.Remove(headPool.filename) + } + } +} diff --git a/internal/caches/open_file_cache_test.go b/internal/caches/open_file_cache_test.go index 85211a6..c06c98a 100644 --- a/internal/caches/open_file_cache_test.go +++ b/internal/caches/open_file_cache_test.go @@ -5,6 +5,7 @@ package caches_test import ( "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/iwind/TeaGo/types" "testing" "time" ) @@ -15,13 +16,14 @@ func TestNewOpenFileCache_Close(t *testing.T) { t.Fatal(err) } cache.Debug() - cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0)) - cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) - cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) - cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) - cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0)) + cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20)) + cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20)) + cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20)) + cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20)) + cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20)) + cache.Get("b.txt") - cache.Get("d.txt") + cache.Get("d.txt") // not exist cache.Close("a.txt") if testutils.IsSingleTesting() { @@ -29,15 +31,34 @@ func TestNewOpenFileCache_Close(t *testing.T) { } } +func TestNewOpenFileCache_OverSize(t *testing.T) { + cache, err := caches.NewOpenFileCache(1024) + if err != nil { + t.Fatal(err) + } + + cache.SetCapacity(1 << 30) + + cache.Debug() + + for i := 0; i < 100; i++ { + cache.Put("a"+types.String(i)+".txt", caches.NewOpenFile(nil, nil, nil, 0, 128<<20)) + } + + if testutils.IsSingleTesting() { + time.Sleep(100 * time.Second) + } +} + func TestNewOpenFileCache_CloseAll(t *testing.T) { cache, err := caches.NewOpenFileCache(1024) if err != nil { t.Fatal(err) } cache.Debug() - cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0)) - cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) - cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0)) + cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0, 1)) + cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1)) + cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0, 1)) cache.Get("b.txt") cache.Get("d.txt") cache.CloseAll() diff --git a/internal/caches/open_file_pool.go b/internal/caches/open_file_pool.go index 7934f15..8375234 100644 --- a/internal/caches/open_file_pool.go +++ b/internal/caches/open_file_pool.go @@ -13,6 +13,7 @@ type OpenFilePool struct { filename string version int64 isClosed bool + usedSize int64 } func NewOpenFilePool(filename string) *OpenFilePool { @@ -29,27 +30,29 @@ func (this *OpenFilePool) Filename() string { return this.filename } -func (this *OpenFilePool) Get() (*OpenFile, bool) { +func (this *OpenFilePool) Get() (resultFile *OpenFile, consumed bool, consumedSize int64) { // 如果已经关闭,直接返回 if this.isClosed { - return nil, false + return nil, false, 0 } select { case file := <-this.c: if file != nil { + this.usedSize -= file.size + err := file.SeekStart() if err != nil { _ = file.Close() - return nil, true + return nil, true, file.size } file.version = this.version - return file, true + return file, true, file.size } - return nil, false + return nil, false, 0 default: - return nil, false + return nil, false, 0 } } @@ -69,6 +72,7 @@ func (this *OpenFilePool) Put(file *OpenFile) bool { // 加入Pool select { case this.c <- file: + this.usedSize += file.size return true default: // 多余的直接关闭 @@ -81,6 +85,10 @@ func (this *OpenFilePool) Len() int { return len(this.c) } +func (this *OpenFilePool) TotalSize() int64 { + return this.usedSize +} + func (this *OpenFilePool) SetClosing() { this.isClosed = true } diff --git a/internal/caches/open_file_pool_test.go b/internal/caches/open_file_pool_test.go index ca7f8bd..bc1d58c 100644 --- a/internal/caches/open_file_pool_test.go +++ b/internal/caches/open_file_pool_test.go @@ -13,15 +13,15 @@ func TestOpenFilePool_Get(t *testing.T) { var pool = caches.NewOpenFilePool("a") t.Log(pool.Filename()) t.Log(pool.Get()) - t.Log(pool.Put(caches.NewOpenFile(nil, nil, nil, 0))) + t.Log(pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1))) t.Log(pool.Get()) t.Log(pool.Get()) } func TestOpenFilePool_Close(t *testing.T) { var pool = caches.NewOpenFilePool("a") - pool.Put(caches.NewOpenFile(nil, nil, nil, 0)) - pool.Put(caches.NewOpenFile(nil, nil, nil, 0)) + pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1)) + pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1)) pool.Close() } @@ -35,7 +35,7 @@ func TestOpenFilePool_Concurrent(t *testing.T) { defer wg.Done() if rands.Int(0, 1) == 1 { - pool.Put(caches.NewOpenFile(nil, nil, nil, 0)) + pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1)) } if rands.Int(0, 1) == 0 { pool.Get() diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index 29d987f..9c2146f 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -366,7 +366,7 @@ func (this *FileReader) Close() error { } else { var cacheMeta = make([]byte, len(this.meta)) copy(cacheMeta, this.meta) - this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified())) + this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified(), this.bodySize)) } return nil }