限制文件句柄缓存内存使用

This commit is contained in:
刘祥超
2023-10-11 21:51:05 +08:00
parent f01eae3590
commit 6ff3230bab
6 changed files with 123 additions and 60 deletions

View File

@@ -12,14 +12,16 @@ type OpenFile struct {
meta []byte meta []byte
header []byte header []byte
version int64 version int64
size int64
} }
func NewOpenFile(fp *os.File, meta []byte, header []byte, version int64) *OpenFile { func NewOpenFile(fp *os.File, meta []byte, header []byte, version int64, size int64) *OpenFile {
return &OpenFile{ return &OpenFile{
fp: fp, fp: fp,
meta: meta, meta: meta,
header: header, header: header,
version: version, version: version,
size: size,
} }
} }

View File

@@ -3,7 +3,9 @@
package caches package caches
import ( import (
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist" "github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
@@ -14,6 +16,10 @@ import (
"time" "time"
) )
const (
maxOpenFileSize = 256 << 20
)
type OpenFileCache struct { type OpenFileCache struct {
poolMap map[string]*OpenFilePool // file path => Pool poolMap map[string]*OpenFilePool // file path => Pool
poolList *linkedlist.List[*OpenFilePool] poolList *linkedlist.List[*OpenFilePool]
@@ -21,19 +27,23 @@ type OpenFileCache struct {
locker sync.RWMutex locker sync.RWMutex
maxSize int maxCount int
count int capacitySize int64
count int
usedSize int64
} }
func NewOpenFileCache(maxSize int) (*OpenFileCache, error) { func NewOpenFileCache(maxCount int) (*OpenFileCache, error) {
if maxSize <= 0 { if maxCount <= 0 {
maxSize = 16384 maxCount = 16384
} }
var cache = &OpenFileCache{ var cache = &OpenFileCache{
maxSize: maxSize, maxCount: maxCount,
poolMap: map[string]*OpenFilePool{}, poolMap: map[string]*OpenFilePool{},
poolList: linkedlist.NewList[*OpenFilePool](), poolList: linkedlist.NewList[*OpenFilePool](),
capacitySize: (int64(utils.SystemMemoryGB()) << 30) / 16,
} }
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
@@ -58,24 +68,36 @@ func (this *OpenFileCache) Get(filename string) *OpenFile {
pool, ok := this.poolMap[filename] pool, ok := this.poolMap[filename]
this.locker.RUnlock() this.locker.RUnlock()
if ok { if ok {
file, consumed := pool.Get() file, consumed, consumedSize := pool.Get()
if consumed { if consumed {
this.locker.Lock() this.locker.Lock()
this.count-- this.count--
this.usedSize -= consumedSize
// pool如果为空也不需要从列表中删除避免put时需要重新创建 // pool如果为空也不需要从列表中删除避免put时需要重新创建
this.locker.Unlock() this.locker.Unlock()
} }
return file return file
} }
return nil return nil
} }
func (this *OpenFileCache) Put(filename string, file *OpenFile) { func (this *OpenFileCache) Put(filename string, file *OpenFile) {
if file.size > maxOpenFileSize {
return
}
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
// 如果超过当前容量,则关闭最早的
if this.count >= this.maxCount || this.usedSize >= this.capacitySize {
this.consumeHead()
return
}
pool, ok := this.poolMap[filename] pool, ok := this.poolMap[filename]
var success bool var success bool
if ok { if ok {
@@ -92,35 +114,7 @@ func (this *OpenFileCache) Put(filename string, file *OpenFile) {
// 检查长度 // 检查长度
if success { if success {
this.count++ this.count++
this.usedSize += file.size
// 如果超过当前容量,则关闭最早的
if this.count > this.maxSize {
var delta = this.maxSize / 100 // 清理1%
if delta == 0 {
delta = 1
}
for i := 0; i < delta; i++ {
var head = this.poolList.Head()
if head == nil {
break
}
var headPool = head.Value
headFile, consumed := headPool.Get()
if consumed {
this.count--
if headFile != nil {
_ = headFile.Close()
}
}
if headPool.Len() == 0 {
delete(this.poolMap, headPool.filename)
this.poolList.Remove(head)
_ = this.watcher.Remove(headPool.filename)
}
}
}
} }
} }
@@ -136,6 +130,7 @@ func (this *OpenFileCache) Close(filename string) {
this.poolList.Remove(pool.linkItem) this.poolList.Remove(pool.linkItem)
_ = this.watcher.Remove(filename) _ = this.watcher.Remove(filename)
this.count -= pool.Len() this.count -= pool.Len()
this.usedSize -= pool.usedSize
} }
this.locker.Unlock() this.locker.Unlock()
@@ -155,14 +150,19 @@ func (this *OpenFileCache) CloseAll() {
this.poolList.Reset() this.poolList.Reset()
_ = this.watcher.Close() _ = this.watcher.Close()
this.count = 0 this.count = 0
this.usedSize = 0
this.locker.Unlock() this.locker.Unlock()
} }
func (this *OpenFileCache) SetCapacity(capacityBytes int64) {
this.capacitySize = capacityBytes
}
func (this *OpenFileCache) Debug() { func (this *OpenFileCache) Debug() {
var ticker = time.NewTicker(5 * time.Second) var ticker = time.NewTicker(5 * time.Second)
goman.New(func() { goman.New(func() {
for range ticker.C { for range ticker.C {
logs.Println("==== " + types.String(this.count) + " ====") logs.Println("==== " + types.String(this.count) + ", " + fmt.Sprintf("%.4fMB", float64(this.usedSize)/(1<<20)) + " ====")
this.poolList.Range(func(item *linkedlist.Item[*OpenFilePool]) (goNext bool) { this.poolList.Range(func(item *linkedlist.Item[*OpenFilePool]) (goNext bool) {
logs.Println(filepath.Base(item.Value.Filename()), item.Value.Len()) logs.Println(filepath.Base(item.Value.Filename()), item.Value.Len())
return true return true
@@ -170,3 +170,35 @@ func (this *OpenFileCache) Debug() {
} }
}) })
} }
func (this *OpenFileCache) consumeHead() {
var delta = 1
if this.count > 100 {
delta = 2
}
for i := 0; i < delta; i++ {
var head = this.poolList.Head()
if head == nil {
break
}
var headPool = head.Value
headFile, consumed, consumedSize := headPool.Get()
if consumed {
this.count--
this.usedSize -= consumedSize
if headFile != nil {
_ = headFile.Close()
}
}
if headPool.Len() == 0 {
delete(this.poolMap, headPool.filename)
this.poolList.Remove(head)
_ = this.watcher.Remove(headPool.filename)
}
}
}

View File

@@ -5,6 +5,7 @@ package caches_test
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/types"
"testing" "testing"
"time" "time"
) )
@@ -15,13 +16,14 @@ func TestNewOpenFileCache_Close(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cache.Debug() cache.Debug()
cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20))
cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20))
cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20))
cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20))
cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0, 1<<20))
cache.Get("b.txt") cache.Get("b.txt")
cache.Get("d.txt") cache.Get("d.txt") // not exist
cache.Close("a.txt") cache.Close("a.txt")
if testutils.IsSingleTesting() { if testutils.IsSingleTesting() {
@@ -29,15 +31,34 @@ func TestNewOpenFileCache_Close(t *testing.T) {
} }
} }
func TestNewOpenFileCache_OverSize(t *testing.T) {
cache, err := caches.NewOpenFileCache(1024)
if err != nil {
t.Fatal(err)
}
cache.SetCapacity(1 << 30)
cache.Debug()
for i := 0; i < 100; i++ {
cache.Put("a"+types.String(i)+".txt", caches.NewOpenFile(nil, nil, nil, 0, 128<<20))
}
if testutils.IsSingleTesting() {
time.Sleep(100 * time.Second)
}
}
func TestNewOpenFileCache_CloseAll(t *testing.T) { func TestNewOpenFileCache_CloseAll(t *testing.T) {
cache, err := caches.NewOpenFileCache(1024) cache, err := caches.NewOpenFileCache(1024)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
cache.Debug() cache.Debug()
cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("a.txt", caches.NewOpenFile(nil, nil, nil, 0, 1))
cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("b.txt", caches.NewOpenFile(nil, nil, nil, 0, 1))
cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0)) cache.Put("c.txt", caches.NewOpenFile(nil, nil, nil, 0, 1))
cache.Get("b.txt") cache.Get("b.txt")
cache.Get("d.txt") cache.Get("d.txt")
cache.CloseAll() cache.CloseAll()

View File

@@ -13,6 +13,7 @@ type OpenFilePool struct {
filename string filename string
version int64 version int64
isClosed bool isClosed bool
usedSize int64
} }
func NewOpenFilePool(filename string) *OpenFilePool { func NewOpenFilePool(filename string) *OpenFilePool {
@@ -29,27 +30,29 @@ func (this *OpenFilePool) Filename() string {
return this.filename return this.filename
} }
func (this *OpenFilePool) Get() (*OpenFile, bool) { func (this *OpenFilePool) Get() (resultFile *OpenFile, consumed bool, consumedSize int64) {
// 如果已经关闭,直接返回 // 如果已经关闭,直接返回
if this.isClosed { if this.isClosed {
return nil, false return nil, false, 0
} }
select { select {
case file := <-this.c: case file := <-this.c:
if file != nil { if file != nil {
this.usedSize -= file.size
err := file.SeekStart() err := file.SeekStart()
if err != nil { if err != nil {
_ = file.Close() _ = file.Close()
return nil, true return nil, true, file.size
} }
file.version = this.version file.version = this.version
return file, true return file, true, file.size
} }
return nil, false return nil, false, 0
default: default:
return nil, false return nil, false, 0
} }
} }
@@ -69,6 +72,7 @@ func (this *OpenFilePool) Put(file *OpenFile) bool {
// 加入Pool // 加入Pool
select { select {
case this.c <- file: case this.c <- file:
this.usedSize += file.size
return true return true
default: default:
// 多余的直接关闭 // 多余的直接关闭
@@ -81,6 +85,10 @@ func (this *OpenFilePool) Len() int {
return len(this.c) return len(this.c)
} }
func (this *OpenFilePool) TotalSize() int64 {
return this.usedSize
}
func (this *OpenFilePool) SetClosing() { func (this *OpenFilePool) SetClosing() {
this.isClosed = true this.isClosed = true
} }

View File

@@ -13,15 +13,15 @@ func TestOpenFilePool_Get(t *testing.T) {
var pool = caches.NewOpenFilePool("a") var pool = caches.NewOpenFilePool("a")
t.Log(pool.Filename()) t.Log(pool.Filename())
t.Log(pool.Get()) t.Log(pool.Get())
t.Log(pool.Put(caches.NewOpenFile(nil, nil, nil, 0))) t.Log(pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1)))
t.Log(pool.Get()) t.Log(pool.Get())
t.Log(pool.Get()) t.Log(pool.Get())
} }
func TestOpenFilePool_Close(t *testing.T) { func TestOpenFilePool_Close(t *testing.T) {
var pool = caches.NewOpenFilePool("a") var pool = caches.NewOpenFilePool("a")
pool.Put(caches.NewOpenFile(nil, nil, nil, 0)) pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1))
pool.Put(caches.NewOpenFile(nil, nil, nil, 0)) pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1))
pool.Close() pool.Close()
} }
@@ -35,7 +35,7 @@ func TestOpenFilePool_Concurrent(t *testing.T) {
defer wg.Done() defer wg.Done()
if rands.Int(0, 1) == 1 { if rands.Int(0, 1) == 1 {
pool.Put(caches.NewOpenFile(nil, nil, nil, 0)) pool.Put(caches.NewOpenFile(nil, nil, nil, 0, 1))
} }
if rands.Int(0, 1) == 0 { if rands.Int(0, 1) == 0 {
pool.Get() pool.Get()

View File

@@ -366,7 +366,7 @@ func (this *FileReader) Close() error {
} else { } else {
var cacheMeta = make([]byte, len(this.meta)) var cacheMeta = make([]byte, len(this.meta))
copy(cacheMeta, this.meta) copy(cacheMeta, this.meta)
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified())) this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified(), this.bodySize))
} }
return nil return nil
} }