diff --git a/internal/caches/memory_fragment_pool.go b/internal/caches/memory_fragment_pool.go deleted file mode 100644 index 6a13b63..0000000 --- a/internal/caches/memory_fragment_pool.go +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . - -package caches - -import ( - teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - "github.com/TeaOSLab/EdgeNode/internal/goman" - "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" - "github.com/iwind/TeaGo/logs" - "os" - "sync" - "sync/atomic" - "time" -) - -const ( - enableFragmentPool = false - minMemoryFragmentPoolItemSize = 8 << 10 - maxMemoryFragmentPoolItemSize = 128 << 20 - maxItemsInMemoryFragmentPoolBucket = 1024 - memoryFragmentPoolBucketSegmentSize = 512 << 10 - maxMemoryFragmentPoolItemAgeSeconds = 60 -) - -var SharedFragmentMemoryPool *MemoryFragmentPool - -func init() { - if !teaconst.IsMain { - return - } - - SharedFragmentMemoryPool = NewMemoryFragmentPool() - - goman.New(func() { - var ticker = time.NewTicker(200 * time.Millisecond) - for range ticker.C { - for i := 0; i < 10; i++ { // skip N empty buckets - var isEmpty = SharedFragmentMemoryPool.GCNextBucket() - if !isEmpty { - break - } - } - } - }) -} - -type MemoryFragmentPoolItem struct { - Bytes []byte - - size int64 - createdAt int64 - - Refs int32 -} - -func (this *MemoryFragmentPoolItem) IsExpired() bool { - return this.createdAt < fasttime.Now().Unix()-maxMemoryFragmentPoolItemAgeSeconds -} - -func (this *MemoryFragmentPoolItem) Reset() { - this.Bytes = nil -} - -func (this *MemoryFragmentPoolItem) IsAvailable() bool { - return atomic.AddInt32(&this.Refs, 1) == 1 -} - -// MemoryFragmentPool memory fragments management -type MemoryFragmentPool struct { - bucketMaps []map[uint64]*MemoryFragmentPoolItem // [ { id => Zero }, ... ] - countBuckets int - gcBucketIndex int - - mu sync.RWMutex - - id uint64 - totalMemory int64 - - isOk bool - capacity int64 - - debugMode bool - countGet uint64 - countNew uint64 -} - -// NewMemoryFragmentPool create new fragment memory pool -func NewMemoryFragmentPool() *MemoryFragmentPool { - var pool = &MemoryFragmentPool{} - pool.init() - return pool -} - -func (this *MemoryFragmentPool) init() { - var capacity = int64(memutils.SystemMemoryGB()) << 30 / 16 - if capacity > 256<<20 { - this.isOk = true - this.capacity = capacity - - this.bucketMaps = []map[uint64]*MemoryFragmentPoolItem{} - for i := 0; i < maxMemoryFragmentPoolItemSize/memoryFragmentPoolBucketSegmentSize+1; i++ { - this.bucketMaps = append(this.bucketMaps, map[uint64]*MemoryFragmentPoolItem{}) - } - this.countBuckets = len(this.bucketMaps) - } - - // print statistics for debug - if len(os.Getenv("GOEDGE_DEBUG_MEMORY_FRAGMENT_POOL")) > 0 { - this.debugMode = true - - go func() { - var maxRounds = 10_000 - var ticker = time.NewTicker(10 * time.Second) - for range ticker.C { - logs.Println("reused:", this.countGet, "created:", this.countNew, "fragments:", this.Len(), "memory:", this.totalMemory>>20, "MB") - - maxRounds-- - if maxRounds <= 0 { - break - } - } - }() - } -} - -// Get try to get a bytes object -func (this *MemoryFragmentPool) Get(expectSize int64) (resultBytes []byte, ok bool) { - if !this.isOk { - return - } - - if expectSize <= 0 { - return - } - - // DO NOT check min segment size - - this.mu.RLock() - - var bucketIndex = this.bucketIndexForSize(expectSize) - var resultItemId uint64 - const maxSearchingBuckets = 20 - for i := bucketIndex; i <= bucketIndex+maxSearchingBuckets; i++ { - resultBytes, resultItemId, ok = this.findItemInMap(this.bucketMaps[i], expectSize) - if ok { - this.mu.RUnlock() - - // remove from bucket - this.mu.Lock() - delete(this.bucketMaps[i], resultItemId) - this.mu.Unlock() - - return - } - if i >= this.countBuckets { - break - } - } - this.mu.RUnlock() - - return -} - -// Put a bytes object to specified bucket -func (this *MemoryFragmentPool) Put(data []byte) (ok bool) { - if !this.isOk { - return - } - - var l = int64(cap(data)) // MUST be 'cap' instead of 'len' - - if l < minMemoryFragmentPoolItemSize || l > maxMemoryFragmentPoolItemSize { - return - } - - if atomic.LoadInt64(&this.totalMemory) >= this.capacity { - return - } - - var itemId = atomic.AddUint64(&this.id, 1) - - this.mu.Lock() - defer this.mu.Unlock() - - var bucketMap = this.bucketMaps[this.bucketIndexForSize(l)] - if len(bucketMap) >= maxItemsInMemoryFragmentPoolBucket { - return - } - - atomic.AddInt64(&this.totalMemory, l) - - bucketMap[itemId] = &MemoryFragmentPoolItem{ - Bytes: data, - size: l, - createdAt: fasttime.Now().Unix(), - } - - return true -} - -// GC fully GC -func (this *MemoryFragmentPool) GC() { - if !this.isOk { - return - } - - var totalMemory = atomic.LoadInt64(&this.totalMemory) - if totalMemory < this.capacity { - return - } - - this.mu.Lock() - defer this.mu.Unlock() - - var garbageSize = totalMemory * 1 / 10 // 10% - - // remove expired - for _, bucketMap := range this.bucketMaps { - for itemId, item := range bucketMap { - if item.IsExpired() { - delete(bucketMap, itemId) - item.Reset() - atomic.AddInt64(&this.totalMemory, -item.size) - - garbageSize -= item.size - } - } - } - - // remove others - if garbageSize > 0 { - for _, bucketMap := range this.bucketMaps { - for itemId, item := range bucketMap { - delete(bucketMap, itemId) - item.Reset() - atomic.AddInt64(&this.totalMemory, -item.size) - - garbageSize -= item.size - if garbageSize <= 0 { - break - } - } - } - } -} - -// GCNextBucket gc one bucket -func (this *MemoryFragmentPool) GCNextBucket() (isEmpty bool) { - if !this.isOk { - return - } - - var itemIds = []uint64{} - - // find - this.mu.RLock() - - var bucketIndex = this.gcBucketIndex - var bucketMap = this.bucketMaps[bucketIndex] - isEmpty = len(bucketMap) == 0 - if isEmpty { - this.mu.RUnlock() - - // move to next bucket index - bucketIndex++ - if bucketIndex >= this.countBuckets { - bucketIndex = 0 - } - this.gcBucketIndex = bucketIndex - - return - } - - for itemId, item := range bucketMap { - if item.IsExpired() { - itemIds = append(itemIds, itemId) - } - } - - this.mu.RUnlock() - - // remove - if len(itemIds) > 0 { - this.mu.Lock() - for _, itemId := range itemIds { - item, ok := bucketMap[itemId] - if !ok { - continue - } - if !item.IsAvailable() { - continue - } - delete(bucketMap, itemId) - item.Reset() - atomic.AddInt64(&this.totalMemory, -item.size) - } - this.mu.Unlock() - } - - // move to next bucket index - bucketIndex++ - if bucketIndex >= this.countBuckets { - bucketIndex = 0 - } - this.gcBucketIndex = bucketIndex - - return -} - -func (this *MemoryFragmentPool) SetCapacity(capacity int64) { - this.capacity = capacity -} - -func (this *MemoryFragmentPool) TotalSize() int64 { - return atomic.LoadInt64(&this.totalMemory) -} - -func (this *MemoryFragmentPool) Len() int { - this.mu.Lock() - defer this.mu.Unlock() - var count = 0 - for _, bucketMap := range this.bucketMaps { - count += len(bucketMap) - } - return count -} - -func (this *MemoryFragmentPool) IncreaseNew() { - if this.isOk && this.debugMode { - atomic.AddUint64(&this.countNew, 1) - } -} - -func (this *MemoryFragmentPool) bucketIndexForSize(size int64) int { - return int(size / memoryFragmentPoolBucketSegmentSize) -} - -func (this *MemoryFragmentPool) findItemInMap(bucketMap map[uint64]*MemoryFragmentPoolItem, expectSize int64) (resultBytes []byte, resultItemId uint64, ok bool) { - if len(bucketMap) == 0 { - return - } - - for itemId, item := range bucketMap { - if item.size >= expectSize { - // check if is referred - if !item.IsAvailable() { - continue - } - - // return result - if item.size != expectSize { - resultBytes = item.Bytes[:expectSize] - } else { - resultBytes = item.Bytes - } - - // reset old item - item.Reset() - atomic.AddInt64(&this.totalMemory, -item.size) - - resultItemId = itemId - - if this.debugMode { - atomic.AddUint64(&this.countGet, 1) - } - - ok = true - - return - } - } - - return -} diff --git a/internal/caches/memory_fragment_pool_test.go b/internal/caches/memory_fragment_pool_test.go deleted file mode 100644 index 13ecc25..0000000 --- a/internal/caches/memory_fragment_pool_test.go +++ /dev/null @@ -1,313 +0,0 @@ -// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . - -package caches_test - -import ( - "bytes" - "github.com/TeaOSLab/EdgeNode/internal/caches" - "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" - "github.com/iwind/TeaGo/assert" - "github.com/iwind/TeaGo/rands" - timeutil "github.com/iwind/TeaGo/utils/time" - "runtime" - "sync" - "sync/atomic" - "testing" - "time" -) - -func TestNewMemoryFragmentPool(t *testing.T) { - var a = assert.NewAssertion(t) - - var pool = caches.NewMemoryFragmentPool() - for i := 0; i < 3000; i++ { - ok := pool.Put(make([]byte, 2<<20)) - if !ok { - t.Log("finished at", i) - break - } - } - - t.Log(pool.TotalSize()>>20, "MB", pool.Len(), "items") - - { - r, ok := pool.Get(1 << 20) - a.IsTrue(ok) - a.IsTrue(len(r) == 1<<20) - } - - { - r, ok := pool.Get(2 << 20) - a.IsTrue(ok) - a.IsTrue(len(r) == 2<<20) - } - - { - r, ok := pool.Get(4 << 20) - a.IsFalse(ok) - a.IsTrue(len(r) == 0) - } - - t.Log(pool.TotalSize()>>20, "MB", pool.Len(), "items") -} - -func TestNewMemoryFragmentPool_LargeBucket(t *testing.T) { - var a = assert.NewAssertion(t) - - var pool = caches.NewMemoryFragmentPool() - { - pool.Put(make([]byte, 128<<20+1)) - a.IsTrue(pool.Len() == 0) - } - - { - pool.Put(make([]byte, 128<<20)) - a.IsTrue(pool.Len() == 1) - - pool.Get(118 << 20) - a.IsTrue(pool.Len() == 0) - } - - { - pool.Put(make([]byte, 128<<20)) - a.IsTrue(pool.Len() == 1) - - pool.Get(110 << 20) - a.IsTrue(pool.Len() == 1) - } -} - -func TestMemoryFragmentPool_Get_Exactly(t *testing.T) { - var a = assert.NewAssertion(t) - - var pool = caches.NewMemoryFragmentPool() - { - pool.Put(make([]byte, 129<<20)) - a.IsTrue(pool.Len() == 0) - } - - { - pool.Put(make([]byte, 4<<20)) - a.IsTrue(pool.Len() == 1) - } - - { - pool.Get(4 << 20) - a.IsTrue(pool.Len() == 0) - } -} - -func TestMemoryFragmentPool_Get_Round(t *testing.T) { - var a = assert.NewAssertion(t) - - var pool = caches.NewMemoryFragmentPool() - { - pool.Put(make([]byte, 8<<20)) - pool.Put(make([]byte, 8<<20)) - pool.Put(make([]byte, 8<<20)) - a.IsTrue(pool.Len() == 3) - } - - { - resultBytes, ok := pool.Get(3 << 20) - a.IsTrue(pool.Len() == 2) - if ok { - pool.Put(resultBytes) - } - } - - { - pool.Get(2 << 20) - a.IsTrue(pool.Len() == 2) - } - - { - pool.Get(1 << 20) - a.IsTrue(pool.Len() == 1) - } -} - -func TestMemoryFragmentPool_GC(t *testing.T) { - var pool = caches.NewMemoryFragmentPool() - pool.SetCapacity(32 << 20) - for i := 0; i < 16; i++ { - pool.Put(make([]byte, 4<<20)) - } - var before = time.Now() - pool.GC() - t.Log(time.Since(before).Seconds()*1000, "ms") - t.Log(pool.Len()) -} - -func TestMemoryFragmentPool_Memory(t *testing.T) { - if !testutils.IsSingleTesting() { - return - } - - var pool = caches.NewMemoryFragmentPool() - - testutils.StartMemoryStats(t, func() { - t.Log(pool.Len(), "items") - }) - - var sampleData = bytes.Repeat([]byte{'A'}, 16<<20) - - var countNew = 0 - for i := 0; i < 1000; i++ { - cacheData, ok := pool.Get(16 << 20) - if ok { - copy(cacheData, sampleData) - pool.Put(cacheData) - } else { - countNew++ - var data = make([]byte, 16<<20) - copy(data, sampleData) - pool.Put(data) - } - } - - t.Log("count new:", countNew) - t.Log("count remains:", pool.Len()) - - time.Sleep(10 * time.Minute) -} - -func TestMemoryFragmentPool_GCNextBucket(t *testing.T) { - if !testutils.IsSingleTesting() { - return - } - - var pool = caches.NewMemoryFragmentPool() - for i := 0; i < 1000; i++ { - pool.Put(make([]byte, rands.Int(0, 100)<<20)) - } - - var lastLen int - for { - pool.GCNextBucket() - var currentLen = pool.Len() - if lastLen == currentLen { - continue - } - lastLen = currentLen - - t.Log(currentLen, "items", pool.TotalSize(), "bytes", timeutil.Format("H:i:s")) - time.Sleep(100 * time.Millisecond) - - if currentLen == 0 { - break - } - } -} - -func TestMemoryFragmentPoolItem(t *testing.T) { - var a = assert.NewAssertion(t) - - var m = map[int]*caches.MemoryFragmentPoolItem{} - m[1] = &caches.MemoryFragmentPoolItem{ - Refs: 0, - } - var item = m[1] - a.IsTrue(item.Refs == 0) - a.IsTrue(atomic.AddInt32(&item.Refs, 1) == 1) - - for _, item2 := range m { - t.Log(item2) - a.IsTrue(atomic.AddInt32(&item2.Refs, 1) == 2) - } - - t.Log(m) -} - -func BenchmarkMemoryFragmentPool_Get_HIT(b *testing.B) { - runtime.GOMAXPROCS(4) - - var pool = caches.NewMemoryFragmentPool() - for i := 0; i < 3000; i++ { - ok := pool.Put(make([]byte, 2<<20)) - if !ok { - break - } - } - - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - data, ok := pool.Get(2 << 20) - if ok { - pool.Put(data) - } - } - }) -} - -func BenchmarkMemoryFragmentPool_Get_TOTALLY_MISSING(b *testing.B) { - runtime.GOMAXPROCS(4) - - var pool = caches.NewMemoryFragmentPool() - for i := 0; i < 3000; i++ { - ok := pool.Put(make([]byte, 2<<20+100)) - if !ok { - break - } - } - - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - data, ok := pool.Get(2<<20 + 200) - if ok { - pool.Put(data) - } - } - }) -} - -func BenchmarkMemoryPool_Get_HIT_MISSING(b *testing.B) { - runtime.GOMAXPROCS(4) - - var pool = caches.NewMemoryFragmentPool() - for i := 0; i < 3000; i++ { - ok := pool.Put(make([]byte, rands.Int(2, 32)<<20)) - if !ok { - break - } - } - - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - data, ok := pool.Get(4 << 20) - if ok { - pool.Put(data) - } - } - }) -} - -func BenchmarkMemoryFragmentPool_GC(b *testing.B) { - runtime.GOMAXPROCS(4) - - var pool = caches.NewMemoryFragmentPool() - pool.SetCapacity(1 << 30) - for i := 0; i < 2_000; i++ { - pool.Put(make([]byte, 1<<20)) - } - - var mu = sync.Mutex{} - - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - mu.Lock() - for i := 0; i < 100; i++ { - pool.GCNextBucket() - } - mu.Unlock() - } - }) -} diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 3b8b469..8ce8cef 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -526,11 +526,6 @@ func (this *MemoryStorage) flushItem(key string) { // 从内存中移除,并确保无论如何都会执行 defer func() { _ = this.Delete(key) - - // 重用内存,前提是确保内存不再被引用 - if enableFragmentPool && ok && item.IsDone && !item.isReferring && len(item.BodyValue) > 0 { - SharedFragmentMemoryPool.Put(item.BodyValue) - } }() if !ok { diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 94adbaf..f578adc 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -32,27 +32,11 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, ModifiedAt: fasttime.Now().Unix(), Status: status, } - if enableFragmentPool && - expectedBodySize > 0 && - expectedBodySize <= maxMemoryFragmentPoolItemSize { - bodyBytes, ok := SharedFragmentMemoryPool.Get(expectedBodySize) // try to reuse memory - if ok { - valueItem.BodyValue = bodyBytes - valueItem.IsPrepared = true - } else { - if expectedBodySize <= (16 << 20) { - var allocSize = (expectedBodySize/16384 + 1) * 16384 - valueItem.BodyValue = make([]byte, allocSize)[:expectedBodySize] - valueItem.IsPrepared = true - SharedFragmentMemoryPool.IncreaseNew() - } - } - } else { - if expectedBodySize > 0 { - valueItem.BodyValue = make([]byte, 0, expectedBodySize) - } + if expectedBodySize > 0 { + valueItem.BodyValue = make([]byte, 0, expectedBodySize) } + var w = &MemoryWriter{ storage: memoryStorage, key: key, @@ -172,13 +156,6 @@ func (this *MemoryWriter) Discard() error { this.storage.locker.Lock() delete(this.storage.valuesMap, this.hash) - if enableFragmentPool && - this.item != nil && - !this.item.isReferring && - cap(this.item.BodyValue) >= minMemoryFragmentPoolItemSize { - SharedFragmentMemoryPool.Put(this.item.BodyValue) - } - this.storage.locker.Unlock() return nil }