diff --git a/internal/caches/memory_fragment_pool.go b/internal/caches/memory_fragment_pool.go
new file mode 100644
index 0000000..7de47f3
--- /dev/null
+++ b/internal/caches/memory_fragment_pool.go
@@ -0,0 +1,353 @@
+// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
+
+package caches
+
+import (
+	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
+	"github.com/TeaOSLab/EdgeNode/internal/goman"
+	"github.com/TeaOSLab/EdgeNode/internal/utils"
+	"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
+	"github.com/iwind/TeaGo/logs"
+	"os"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	minMemoryFragmentPoolItemSize       = 8 << 10   // 8 KB
+	maxMemoryFragmentPoolItemSize       = 128 << 20 // 128 MB
+	maxItemsInMemoryFragmentPoolBucket  = 1024
+	memoryFragmentPoolBucketSegmentSize = 512 << 10 // 512 KB per bucket
+	maxMemoryFragmentPoolItemAgeSeconds = 60
+)
+
+var SharedFragmentMemoryPool = NewMemoryFragmentPool()
+
+func init() {
+	if !teaconst.IsMain {
+		return
+	}
+
+	goman.New(func() {
+		var ticker = time.NewTicker(100 * time.Millisecond)
+		for range ticker.C {
+			SharedFragmentMemoryPool.GCNextBucket()
+		}
+	})
+}
+
+type MemoryFragmentPoolItem struct {
+	Bytes []byte
+
+	size      int64
+	createdAt int64
+
+	Refs int32
+}
+
+func (this *MemoryFragmentPoolItem) IsExpired() bool {
+	return this.createdAt < fasttime.Now().Unix()-maxMemoryFragmentPoolItemAgeSeconds
+}
+
+func (this *MemoryFragmentPoolItem) Reset() {
+	this.Bytes = nil
+}
+
+// IsAvailable atomically claims the item; it reports true only for the first caller.
+func (this *MemoryFragmentPoolItem) IsAvailable() bool {
+	return atomic.AddInt32(&this.Refs, 1) == 1
+}
+
+// MemoryFragmentPool manages reusable memory fragments
+type MemoryFragmentPool struct {
+	bucketMaps    []map[uint64]*MemoryFragmentPoolItem // [ { id => item }, ... ]
+	countBuckets  int
+	gcBucketIndex int
+
+	mu sync.RWMutex
+
+	id          uint64
+	totalMemory int64
+
+	isOk     bool
+	capacity int64
+
+	debugMode bool
+	countGet  uint64
+	countNew  uint64
+}
+
+// NewMemoryFragmentPool creates a new fragment memory pool
+func NewMemoryFragmentPool() *MemoryFragmentPool {
+	var pool = &MemoryFragmentPool{}
+	pool.init()
+	return pool
+}
+
+func (this *MemoryFragmentPool) init() {
+	// use 1/16 of system memory; the pool only activates when that exceeds 256 MB
+	var capacity = int64(utils.SystemMemoryGB()) << 30 / 16
+	if capacity > 256<<20 {
+		this.isOk = true
+		this.capacity = capacity
+
+		this.bucketMaps = []map[uint64]*MemoryFragmentPoolItem{}
+		for i := 0; i < maxMemoryFragmentPoolItemSize/memoryFragmentPoolBucketSegmentSize+1; i++ {
+			this.bucketMaps = append(this.bucketMaps, map[uint64]*MemoryFragmentPoolItem{})
+		}
+		this.countBuckets = len(this.bucketMaps)
+	}
+
+	// print statistics for debugging
+	if len(os.Getenv("GOEDGE_DEBUG_MEMORY_FRAGMENT_POOL")) > 0 {
+		this.debugMode = true
+
+		go func() {
+			var maxRounds = 10_000
+			var ticker = time.NewTicker(10 * time.Second)
+			for range ticker.C {
+				logs.Println("reused:", this.countGet, "created:", this.countNew, "fragments:", this.Len(), "memory:", this.totalMemory>>20, "MB")
+
+				maxRounds--
+				if maxRounds <= 0 {
+					break
+				}
+			}
+		}()
+	}
+}
+
+// Get tries to obtain a bytes object from the pool; on success len(resultBytes) == expectSize
+func (this *MemoryFragmentPool) Get(expectSize int64) (resultBytes []byte, ok bool) {
+	if !this.isOk {
+		return
+	}
+
+	if expectSize < minMemoryFragmentPoolItemSize {
+		// covers expectSize <= 0 as well
+		return
+	}
+
+	this.mu.RLock()
+
+	var bucketIndex = this.bucketIndexForSize(expectSize)
+	var resultItemId uint64
+	const maxSearchingBuckets = 20
+	for i := bucketIndex; i <= bucketIndex+maxSearchingBuckets; i++ {
+		// check bounds before indexing, to avoid running past the last bucket
+		if i >= this.countBuckets {
+			break
+		}
+		resultBytes, resultItemId, ok = this.findItemInMap(this.bucketMaps[i], expectSize)
+		if ok {
+			this.mu.RUnlock()
+
+			// remove from bucket; the item has already been claimed atomically,
+			// so no other goroutine can return it between the two locks
+			this.mu.Lock()
+			delete(this.bucketMaps[i], resultItemId)
+			this.mu.Unlock()
+
+			return
+		}
+	}
+	this.mu.RUnlock()
+
+	return
+}
+
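+// A minimal usage sketch (illustrative only, not part of the API contract):
+// callers try Get first, fall back to make, and hand the buffer back via Put:
+//
+//	buf, ok := SharedFragmentMemoryPool.Get(64 << 10)
+//	if !ok {
+//		buf = make([]byte, 64<<10)
+//	}
+//	// ... use buf ...
+//	SharedFragmentMemoryPool.Put(buf)
+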
+// Put adds a bytes object to the bucket matching its capacity
+func (this *MemoryFragmentPool) Put(data []byte) (ok bool) {
+	if !this.isOk {
+		return
+	}
+
+	var l = int64(cap(data)) // MUST be 'cap' instead of 'len'
+
+	if l < minMemoryFragmentPoolItemSize || l > maxMemoryFragmentPoolItemSize {
+		return
+	}
+
+	if atomic.LoadInt64(&this.totalMemory) >= this.capacity {
+		return
+	}
+
+	var itemId = atomic.AddUint64(&this.id, 1)
+
+	this.mu.Lock()
+	defer this.mu.Unlock()
+
+	var bucketMap = this.bucketMaps[this.bucketIndexForSize(l)]
+	if len(bucketMap) >= maxItemsInMemoryFragmentPoolBucket {
+		return
+	}
+
+	atomic.AddInt64(&this.totalMemory, l)
+
+	bucketMap[itemId] = &MemoryFragmentPoolItem{
+		Bytes:     data,
+		size:      l,
+		createdAt: fasttime.Now().Unix(),
+	}
+
+	return true
+}
+
+// GC performs a full garbage collection pass when the pool is over capacity
+func (this *MemoryFragmentPool) GC() {
+	if !this.isOk {
+		return
+	}
+
+	var totalMemory = atomic.LoadInt64(&this.totalMemory)
+	if totalMemory < this.capacity {
+		return
+	}
+
+	this.mu.Lock()
+	defer this.mu.Unlock()
+
+	var garbageSize = totalMemory / 10 // reclaim 10%
+
+	// remove expired items
+	for _, bucketMap := range this.bucketMaps {
+		for itemId, item := range bucketMap {
+			if item.IsExpired() {
+				delete(bucketMap, itemId)
+				item.Reset()
+				atomic.AddInt64(&this.totalMemory, -item.size)
+
+				garbageSize -= item.size
+			}
+		}
+	}
+
+	// remove more items until the target amount has been reclaimed
+	if garbageSize > 0 {
+	Loop:
+		for _, bucketMap := range this.bucketMaps {
+			for itemId, item := range bucketMap {
+				delete(bucketMap, itemId)
+				item.Reset()
+				atomic.AddInt64(&this.totalMemory, -item.size)
+
+				garbageSize -= item.size
+				if garbageSize <= 0 {
+					break Loop
+				}
+			}
+		}
+	}
+}
+
+// GCNextBucket garbage-collects a single bucket, then advances to the next one
+func (this *MemoryFragmentPool) GCNextBucket() {
+	if !this.isOk {
+		return
+	}
+
+	var itemIds = []uint64{}
+
+	// find expired items
+	this.mu.RLock()
+
+	var bucketIndex = this.gcBucketIndex
+	var bucketMap = this.bucketMaps[bucketIndex]
+	for itemId, item := range bucketMap {
+		if item.IsExpired() {
+			itemIds = append(itemIds, itemId)
+		}
+	}
+
+	this.mu.RUnlock()
+
+	// remove them
+	if len(itemIds) > 0 {
+		this.mu.Lock()
+		for _, itemId := range itemIds {
+			item, ok := bucketMap[itemId]
+			if !ok {
+				continue
+			}
+			if !item.IsAvailable() {
+				// the item was handed out between the two locks; skip it
+				continue
+			}
+			delete(bucketMap, itemId)
+			item.Reset()
+			atomic.AddInt64(&this.totalMemory, -item.size)
+		}
+		this.mu.Unlock()
+	}
+
+	// move to the next bucket index
+	bucketIndex++
+	if bucketIndex >= len(this.bucketMaps) {
+		bucketIndex = 0
+	}
+	this.gcBucketIndex = bucketIndex
+}
+
+func (this *MemoryFragmentPool) SetCapacity(capacity int64) {
+	this.capacity = capacity
+}
+
+func (this *MemoryFragmentPool) TotalSize() int64 {
+	return atomic.LoadInt64(&this.totalMemory)
+}
+
+func (this *MemoryFragmentPool) Len() int {
+	this.mu.RLock()
+	defer this.mu.RUnlock()
+	var count = 0
+	for _, bucketMap := range this.bucketMaps {
+		count += len(bucketMap)
+	}
+	return count
+}
+
+func (this *MemoryFragmentPool) IncreaseNew() {
+	if this.isOk && this.debugMode {
+		atomic.AddUint64(&this.countNew, 1)
+	}
+}
+
+func (this *MemoryFragmentPool) bucketIndexForSize(size int64) int {
+	return int(size / memoryFragmentPoolBucketSegmentSize)
+}
+
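+// findItemInMap scans one bucket for an item whose size is at least expectSize.
+// A matching item is claimed atomically through IsAvailable, so two concurrent
+// readers holding the read lock can never receive the same buffer.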
+func (this *MemoryFragmentPool) findItemInMap(bucketMap map[uint64]*MemoryFragmentPoolItem, expectSize int64) (resultBytes []byte, resultItemId uint64, ok bool) {
+	if len(bucketMap) == 0 {
+		return
+	}
+
+	for itemId, item := range bucketMap {
+		if item.size >= expectSize {
+			// skip items that are already referenced
+			if !item.IsAvailable() {
+				continue
+			}
+
+			// build the result
+			if item.size != expectSize {
+				resultBytes = item.Bytes[:expectSize]
+			} else {
+				resultBytes = item.Bytes
+			}
+
+			// reset the old item
+			item.Reset()
+			atomic.AddInt64(&this.totalMemory, -item.size)
+
+			resultItemId = itemId
+
+			if this.debugMode {
+				atomic.AddUint64(&this.countGet, 1)
+			}
+
+			ok = true
+
+			return
+		}
+	}
+
+	return
+}
diff --git a/internal/caches/memory_fragment_pool_test.go b/internal/caches/memory_fragment_pool_test.go
new file mode 100644
index 0000000..89ea522
--- /dev/null
+++ b/internal/caches/memory_fragment_pool_test.go
@@ -0,0 +1,302 @@
+// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
+
+package caches_test
+
+import (
+	"bytes"
+	"github.com/TeaOSLab/EdgeNode/internal/caches"
+	"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
+	"github.com/iwind/TeaGo/assert"
+	"github.com/iwind/TeaGo/rands"
+	timeutil "github.com/iwind/TeaGo/utils/time"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestNewMemoryFragmentPool(t *testing.T) {
+	var a = assert.NewAssertion(t)
+
+	var pool = caches.NewMemoryFragmentPool()
+	for i := 0; i < 3000; i++ {
+		ok := pool.Put(make([]byte, 2<<20))
+		if !ok {
+			t.Log("finished at", i)
+			break
+		}
+	}
+
+	t.Log(pool.TotalSize()>>20, "MB", pool.Len(), "items")
+
+	{
+		r, ok := pool.Get(1 << 20)
+		a.IsTrue(ok)
+		a.IsTrue(len(r) == 1<<20)
+	}
+
+	{
+		r, ok := pool.Get(2 << 20)
+		a.IsTrue(ok)
+		a.IsTrue(len(r) == 2<<20)
+	}
+
+	{
+		r, ok := pool.Get(4 << 20)
+		a.IsFalse(ok)
+		a.IsTrue(len(r) == 0)
+	}
+
+	t.Log(pool.TotalSize()>>20, "MB", pool.Len(), "items")
+}
+
+func TestNewMemoryFragmentPool_LargeBucket(t *testing.T) {
+	var a = assert.NewAssertion(t)
+
+	var pool = caches.NewMemoryFragmentPool()
+	{
+		pool.Put(make([]byte, 128<<20+1))
+		a.IsTrue(pool.Len() == 0)
+	}
+
+	{
+		pool.Put(make([]byte, 128<<20))
+		a.IsTrue(pool.Len() == 1)
+
+		pool.Get(118 << 20)
+		a.IsTrue(pool.Len() == 0)
+	}
+
+	{
+		pool.Put(make([]byte, 128<<20))
+		a.IsTrue(pool.Len() == 1)
+
+		pool.Get(110 << 20)
+		a.IsTrue(pool.Len() == 1)
+	}
+}
+
+func TestMemoryFragmentPool_Get_Exactly(t *testing.T) {
+	var a = assert.NewAssertion(t)
+
+	var pool = caches.NewMemoryFragmentPool()
+	{
+		pool.Put(make([]byte, 129<<20))
+		a.IsTrue(pool.Len() == 0)
+	}
+
+	{
+		pool.Put(make([]byte, 4<<20))
+		a.IsTrue(pool.Len() == 1)
+	}
+
+	{
+		pool.Get(4 << 20)
+		a.IsTrue(pool.Len() == 0)
+	}
+}
+
+func TestMemoryFragmentPool_Get_Round(t *testing.T) {
+	var a = assert.NewAssertion(t)
+
+	var pool = caches.NewMemoryFragmentPool()
+	{
+		pool.Put(make([]byte, 8<<20))
+		pool.Put(make([]byte, 8<<20))
+		pool.Put(make([]byte, 8<<20))
+		a.IsTrue(pool.Len() == 3)
+	}
+
+	{
+		resultBytes, ok := pool.Get(3 << 20)
+		a.IsTrue(pool.Len() == 2)
+		if ok {
+			pool.Put(resultBytes)
+		}
+	}
+
+	{
+		pool.Get(2 << 20)
+		a.IsTrue(pool.Len() == 2)
+	}
+
+	{
+		pool.Get(1 << 20)
+		a.IsTrue(pool.Len() == 1)
+	}
+}
+
+func TestMemoryFragmentPool_GC(t *testing.T) {
+	var pool = caches.NewMemoryFragmentPool()
+	pool.SetCapacity(32 << 20)
+	for i := 0; i < 16; i++ {
+		pool.Put(make([]byte, 4<<20))
+	}
+	var before = time.Now()
+	pool.GC()
+	t.Log(time.Since(before).Seconds()*1000, "ms")
+	t.Log(pool.Len())
+}
+
+func TestMemoryFragmentPool_Memory(t *testing.T) {
+	if !testutils.IsSingleTesting() {
+		return
+	}
+
+	var pool = caches.NewMemoryFragmentPool()
+
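+	// report the pool size alongside memory stats while the loop below recycles 16 MB buffers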
+	testutils.StartMemoryStats(t, func() {
+		t.Log(pool.Len(), "items")
+	})
+
+	var sampleData = bytes.Repeat([]byte{'A'}, 16<<20)
+
+	var countNew = 0
+	for i := 0; i < 1000; i++ {
+		cacheData, ok := pool.Get(16 << 20)
+		if ok {
+			copy(cacheData, sampleData)
+			pool.Put(cacheData)
+		} else {
+			countNew++
+			var data = make([]byte, 16<<20)
+			copy(data, sampleData)
+			pool.Put(data)
+		}
+	}
+
+	t.Log("count new:", countNew)
+	t.Log("count remains:", pool.Len())
+
+	time.Sleep(10 * time.Minute)
+}
+
+func TestMemoryFragmentPool_GCNextBucket(t *testing.T) {
+	if !testutils.IsSingleTesting() {
+		return
+	}
+
+	var pool = caches.NewMemoryFragmentPool()
+	for i := 0; i < 100; i++ {
+		pool.Put(make([]byte, 2<<20))
+	}
+
+	for i := 0; i < 100; i++ {
+		pool.GCNextBucket()
+		t.Log(pool.Len(), timeutil.Format("H:i:s"))
+		time.Sleep(10 * time.Second)
+	}
+}
+
+func TestMemoryFragmentPoolItem(t *testing.T) {
+	var a = assert.NewAssertion(t)
+
+	var m = map[int]*caches.MemoryFragmentPoolItem{}
+	m[1] = &caches.MemoryFragmentPoolItem{
+		Refs: 0,
+	}
+	var item = m[1]
+	a.IsTrue(item.Refs == 0)
+	a.IsTrue(atomic.AddInt32(&item.Refs, 1) == 1)
+
+	for _, item2 := range m {
+		t.Log(item2)
+		a.IsTrue(atomic.AddInt32(&item2.Refs, 1) == 2)
+	}
+
+	t.Log(m)
+}
+
+func BenchmarkMemoryFragmentPool_Get_HIT(b *testing.B) {
+	runtime.GOMAXPROCS(4)
+
+	var pool = caches.NewMemoryFragmentPool()
+	for i := 0; i < 3000; i++ {
+		ok := pool.Put(make([]byte, 2<<20))
+		if !ok {
+			break
+		}
+	}
+
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			data, ok := pool.Get(2 << 20)
+			if ok {
+				pool.Put(data)
+			}
+		}
+	})
+}
+
+func BenchmarkMemoryFragmentPool_Get_TOTALLY_MISSING(b *testing.B) {
+	runtime.GOMAXPROCS(4)
+
+	var pool = caches.NewMemoryFragmentPool()
+	for i := 0; i < 3000; i++ {
+		ok := pool.Put(make([]byte, 2<<20+100))
+		if !ok {
+			break
+		}
+	}
+
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			data, ok := pool.Get(2<<20 + 200)
+			if ok {
+				pool.Put(data)
+			}
+		}
+	})
+}
+
+func BenchmarkMemoryPool_Get_HIT_MISSING(b *testing.B) {
+	runtime.GOMAXPROCS(4)
+
+	var pool = caches.NewMemoryFragmentPool()
+	for i := 0; i < 3000; i++ {
+		ok := pool.Put(make([]byte, rands.Int(2, 32)<<20))
+		if !ok {
+			break
+		}
+	}
+
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			data, ok := pool.Get(4 << 20)
+			if ok {
+				pool.Put(data)
+			}
+		}
+	})
+}
+
+func BenchmarkMemoryFragmentPool_GC(b *testing.B) {
+	runtime.GOMAXPROCS(4)
+
+	var pool = caches.NewMemoryFragmentPool()
+	pool.SetCapacity(1 << 30)
+	for i := 0; i < 2_000; i++ {
+		pool.Put(make([]byte, 1<<20))
+	}
+
+	var mu = sync.Mutex{}
+
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			mu.Lock()
+			for i := 0; i < 100; i++ {
+				pool.GCNextBucket()
+			}
+			mu.Unlock()
+		}
+	})
+}
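The hunks below wire the pool into the memory cache: NewMemoryWriter asks the pool for a body buffer before falling back to make, and flushItem hands the buffer back once the item has been deleted and no reader still references it. A minimal sketch of that allocate/recycle cycle, for orientation only (allocateBody and recycleBody are hypothetical helper names, shown inside package caches so the pool identifiers resolve):

package caches

// allocateBody fetches a body buffer from the shared fragment pool when
// possible, falling back to a fresh allocation (hypothetical helper).
func allocateBody(expectedBodySize int64) []byte {
	bodyBytes, ok := SharedFragmentMemoryPool.Get(expectedBodySize)
	if ok {
		return bodyBytes // reused fragment; len(bodyBytes) == expectedBodySize
	}
	if expectedBodySize >= minMemoryFragmentPoolItemSize {
		SharedFragmentMemoryPool.IncreaseNew() // count a pool miss (debug statistics only)
	}
	return make([]byte, expectedBodySize)
}

// recycleBody returns a buffer to the pool unless a reader still holds it
// (hypothetical helper mirroring the flushItem change below).
func recycleBody(body []byte, stillReferenced bool) {
	if stillReferenced || len(body) == 0 {
		return
	}
	SharedFragmentMemoryPool.Put(body) // silently rejected if out of size range or the pool is full
}

Note that, unlike make, a reused fragment is not zeroed, so callers are expected to overwrite the whole buffer before exposing it.
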
diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go
index 413ceb4..dfb5e75 100644
--- a/internal/caches/storage_file.go
+++ b/internal/caches/storage_file.go
@@ -1348,19 +1348,9 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
 	if rate <= 0 {
 		rate = 1000
 	}
-	if this.lastHotSize == 0 {
-		// automatically lower the sampling rate to increase the chance of caching hot data
-		rate = rate / 10
-	}
 	if rands.Int(0, rate) == 0 {
 		var memoryStorage = this.memoryStorage
-		var hitErr = this.list.IncreaseHit(hash)
-		if hitErr != nil {
-			// this error can be ignored
-			remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
-		}
-
 		// add to the hot map
 		// skip files whose cached body is too large
 		if memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*sizes.M {
@@ -1376,6 +1366,15 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
 			}
 		}
 		this.hotMapLocker.Unlock()
+
+		// only repeated hits increase the hit count
+		if ok {
+			var hitErr = this.list.IncreaseHit(hash)
+			if hitErr != nil {
+				// this error can be ignored
+				remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
+			}
+		}
 	}
 }
 }
diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go
index 65d9678..ce2cca0 100644
--- a/internal/caches/storage_memory.go
+++ b/internal/caches/storage_memory.go
@@ -12,7 +12,6 @@ import (
 	"github.com/TeaOSLab/EdgeNode/internal/zero"
 	"github.com/cespare/xxhash"
 	"github.com/iwind/TeaGo/types"
-	"github.com/shirou/gopsutil/v3/load"
 	"math"
 	"runtime"
 	"strconv"
@@ -32,6 +31,8 @@ type MemoryItem struct {
 	IsPrepared  bool
 	WriteOffset int64
+
+	isReferring bool // whether the item is currently referenced by other objects
 }
 
 func (this *MemoryItem) IsExpired() bool {
@@ -123,7 +124,12 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool)
 	// read from valuesMap
 	this.locker.RLock()
-	item := this.valuesMap[hash]
+	var item = this.valuesMap[hash]
+
+	if item != nil {
+		// mark the item as referenced so its buffer is not recycled on flush
+		item.isReferring = true
+	}
+
 	if item == nil || !item.IsDone {
 		this.locker.RUnlock()
 		return nil, ErrNotFound
@@ -477,33 +483,20 @@ func (this *MemoryStorage) purgeLoop() {
 
 // start the flush task
 func (this *MemoryStorage) startFlush() {
 	var statCount = 0
-	var writeDelayMS float64 = 0
 	for key := range this.dirtyChan {
 		statCount++
 		if statCount == 100 {
 			statCount = 0
-
-			// delay some time to reduce load if needed
-			if !fsutils.DiskIsFast() {
-				loadStat, err := load.Avg()
-				if err == nil && loadStat != nil {
-					if loadStat.Load1 > 10 {
-						writeDelayMS = 100
-					} else if loadStat.Load1 > 5 {
-						writeDelayMS = 50
-					} else {
-						writeDelayMS = 0
-					}
-				}
-			}
 		}
 		this.flushItem(key)
-		if writeDelayMS > 0 {
-			time.Sleep(time.Duration(writeDelayMS) * time.Millisecond)
+		if fsutils.IsInExtremelyHighLoad {
+			time.Sleep(1 * time.Second)
+		} else if fsutils.IsInHighLoad {
+			time.Sleep(100 * time.Millisecond)
 		}
 	}
 }
@@ -522,6 +515,11 @@ func (this *MemoryStorage) flushItem(key string) {
 	// remove from memory, and make sure this always runs
 	defer func() {
 		_ = this.Delete(key)
+
+		// reuse the memory, provided it is no longer referenced
+		if ok && item.IsDone && !item.isReferring && len(item.BodyValue) > 0 {
+			SharedFragmentMemoryPool.Put(item.BodyValue)
+		}
 	}()
 
 	if !ok {
diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go
index d2623df..b92539f 100644
--- a/internal/caches/writer_memory.go
+++ b/internal/caches/writer_memory.go
@@ -33,7 +33,16 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64,
 		Status: status,
 	}
 	if expectedBodySize > 0 {
-		valueItem.BodyValue = make([]byte, expectedBodySize)
+		bodyBytes, ok := SharedFragmentMemoryPool.Get(expectedBodySize) // try to reuse pooled memory
+		if ok {
+			valueItem.BodyValue = bodyBytes
+		} else {
+			if expectedBodySize >= minMemoryFragmentPoolItemSize {
+				SharedFragmentMemoryPool.IncreaseNew()
+			}
+			valueItem.BodyValue = make([]byte, expectedBodySize)
+		}
+		valueItem.IsPrepared = true
 	}
 	var w = &MemoryWriter{