mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	删除不需要的代码
This commit is contained in:
		@@ -1,375 +0,0 @@
 | 
				
			|||||||
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
package caches
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 | 
					 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
					 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
 | 
					 | 
				
			||||||
	memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
 | 
					 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"sync/atomic"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
const (
 | 
					 | 
				
			||||||
	enableFragmentPool                  = false
 | 
					 | 
				
			||||||
	minMemoryFragmentPoolItemSize       = 8 << 10
 | 
					 | 
				
			||||||
	maxMemoryFragmentPoolItemSize       = 128 << 20
 | 
					 | 
				
			||||||
	maxItemsInMemoryFragmentPoolBucket  = 1024
 | 
					 | 
				
			||||||
	memoryFragmentPoolBucketSegmentSize = 512 << 10
 | 
					 | 
				
			||||||
	maxMemoryFragmentPoolItemAgeSeconds = 60
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
var SharedFragmentMemoryPool *MemoryFragmentPool
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func init() {
 | 
					 | 
				
			||||||
	if !teaconst.IsMain {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	SharedFragmentMemoryPool = NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	goman.New(func() {
 | 
					 | 
				
			||||||
		var ticker = time.NewTicker(200 * time.Millisecond)
 | 
					 | 
				
			||||||
		for range ticker.C {
 | 
					 | 
				
			||||||
			for i := 0; i < 10; i++ { // skip N empty buckets
 | 
					 | 
				
			||||||
				var isEmpty = SharedFragmentMemoryPool.GCNextBucket()
 | 
					 | 
				
			||||||
				if !isEmpty {
 | 
					 | 
				
			||||||
					break
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type MemoryFragmentPoolItem struct {
 | 
					 | 
				
			||||||
	Bytes []byte
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	size      int64
 | 
					 | 
				
			||||||
	createdAt int64
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	Refs int32
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPoolItem) IsExpired() bool {
 | 
					 | 
				
			||||||
	return this.createdAt < fasttime.Now().Unix()-maxMemoryFragmentPoolItemAgeSeconds
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPoolItem) Reset() {
 | 
					 | 
				
			||||||
	this.Bytes = nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPoolItem) IsAvailable() bool {
 | 
					 | 
				
			||||||
	return atomic.AddInt32(&this.Refs, 1) == 1
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// MemoryFragmentPool memory fragments management
 | 
					 | 
				
			||||||
type MemoryFragmentPool struct {
 | 
					 | 
				
			||||||
	bucketMaps    []map[uint64]*MemoryFragmentPoolItem // [  { id => Zero }, ... ]
 | 
					 | 
				
			||||||
	countBuckets  int
 | 
					 | 
				
			||||||
	gcBucketIndex int
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	mu sync.RWMutex
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	id          uint64
 | 
					 | 
				
			||||||
	totalMemory int64
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	isOk     bool
 | 
					 | 
				
			||||||
	capacity int64
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	debugMode bool
 | 
					 | 
				
			||||||
	countGet  uint64
 | 
					 | 
				
			||||||
	countNew  uint64
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewMemoryFragmentPool create new fragment memory pool
 | 
					 | 
				
			||||||
func NewMemoryFragmentPool() *MemoryFragmentPool {
 | 
					 | 
				
			||||||
	var pool = &MemoryFragmentPool{}
 | 
					 | 
				
			||||||
	pool.init()
 | 
					 | 
				
			||||||
	return pool
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) init() {
 | 
					 | 
				
			||||||
	var capacity = int64(memutils.SystemMemoryGB()) << 30 / 16
 | 
					 | 
				
			||||||
	if capacity > 256<<20 {
 | 
					 | 
				
			||||||
		this.isOk = true
 | 
					 | 
				
			||||||
		this.capacity = capacity
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		this.bucketMaps = []map[uint64]*MemoryFragmentPoolItem{}
 | 
					 | 
				
			||||||
		for i := 0; i < maxMemoryFragmentPoolItemSize/memoryFragmentPoolBucketSegmentSize+1; i++ {
 | 
					 | 
				
			||||||
			this.bucketMaps = append(this.bucketMaps, map[uint64]*MemoryFragmentPoolItem{})
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		this.countBuckets = len(this.bucketMaps)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// print statistics for debug
 | 
					 | 
				
			||||||
	if len(os.Getenv("GOEDGE_DEBUG_MEMORY_FRAGMENT_POOL")) > 0 {
 | 
					 | 
				
			||||||
		this.debugMode = true
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		go func() {
 | 
					 | 
				
			||||||
			var maxRounds = 10_000
 | 
					 | 
				
			||||||
			var ticker = time.NewTicker(10 * time.Second)
 | 
					 | 
				
			||||||
			for range ticker.C {
 | 
					 | 
				
			||||||
				logs.Println("reused:", this.countGet, "created:", this.countNew, "fragments:", this.Len(), "memory:", this.totalMemory>>20, "MB")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				maxRounds--
 | 
					 | 
				
			||||||
				if maxRounds <= 0 {
 | 
					 | 
				
			||||||
					break
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Get try to get a bytes object
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) Get(expectSize int64) (resultBytes []byte, ok bool) {
 | 
					 | 
				
			||||||
	if !this.isOk {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if expectSize <= 0 {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// DO NOT check min segment size
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	this.mu.RLock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var bucketIndex = this.bucketIndexForSize(expectSize)
 | 
					 | 
				
			||||||
	var resultItemId uint64
 | 
					 | 
				
			||||||
	const maxSearchingBuckets = 20
 | 
					 | 
				
			||||||
	for i := bucketIndex; i <= bucketIndex+maxSearchingBuckets; i++ {
 | 
					 | 
				
			||||||
		resultBytes, resultItemId, ok = this.findItemInMap(this.bucketMaps[i], expectSize)
 | 
					 | 
				
			||||||
		if ok {
 | 
					 | 
				
			||||||
			this.mu.RUnlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// remove from bucket
 | 
					 | 
				
			||||||
			this.mu.Lock()
 | 
					 | 
				
			||||||
			delete(this.bucketMaps[i], resultItemId)
 | 
					 | 
				
			||||||
			this.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if i >= this.countBuckets {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	this.mu.RUnlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Put a bytes object to specified bucket
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) Put(data []byte) (ok bool) {
 | 
					 | 
				
			||||||
	if !this.isOk {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var l = int64(cap(data)) // MUST be 'cap' instead of 'len'
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if l < minMemoryFragmentPoolItemSize || l > maxMemoryFragmentPoolItemSize {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if atomic.LoadInt64(&this.totalMemory) >= this.capacity {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var itemId = atomic.AddUint64(&this.id, 1)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	this.mu.Lock()
 | 
					 | 
				
			||||||
	defer this.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var bucketMap = this.bucketMaps[this.bucketIndexForSize(l)]
 | 
					 | 
				
			||||||
	if len(bucketMap) >= maxItemsInMemoryFragmentPoolBucket {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	atomic.AddInt64(&this.totalMemory, l)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	bucketMap[itemId] = &MemoryFragmentPoolItem{
 | 
					 | 
				
			||||||
		Bytes:     data,
 | 
					 | 
				
			||||||
		size:      l,
 | 
					 | 
				
			||||||
		createdAt: fasttime.Now().Unix(),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return true
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GC fully GC
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) GC() {
 | 
					 | 
				
			||||||
	if !this.isOk {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var totalMemory = atomic.LoadInt64(&this.totalMemory)
 | 
					 | 
				
			||||||
	if totalMemory < this.capacity {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	this.mu.Lock()
 | 
					 | 
				
			||||||
	defer this.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var garbageSize = totalMemory * 1 / 10 // 10%
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// remove expired
 | 
					 | 
				
			||||||
	for _, bucketMap := range this.bucketMaps {
 | 
					 | 
				
			||||||
		for itemId, item := range bucketMap {
 | 
					 | 
				
			||||||
			if item.IsExpired() {
 | 
					 | 
				
			||||||
				delete(bucketMap, itemId)
 | 
					 | 
				
			||||||
				item.Reset()
 | 
					 | 
				
			||||||
				atomic.AddInt64(&this.totalMemory, -item.size)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				garbageSize -= item.size
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// remove others
 | 
					 | 
				
			||||||
	if garbageSize > 0 {
 | 
					 | 
				
			||||||
		for _, bucketMap := range this.bucketMaps {
 | 
					 | 
				
			||||||
			for itemId, item := range bucketMap {
 | 
					 | 
				
			||||||
				delete(bucketMap, itemId)
 | 
					 | 
				
			||||||
				item.Reset()
 | 
					 | 
				
			||||||
				atomic.AddInt64(&this.totalMemory, -item.size)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				garbageSize -= item.size
 | 
					 | 
				
			||||||
				if garbageSize <= 0 {
 | 
					 | 
				
			||||||
					break
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GCNextBucket gc one bucket
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) GCNextBucket() (isEmpty bool) {
 | 
					 | 
				
			||||||
	if !this.isOk {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var itemIds = []uint64{}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// find
 | 
					 | 
				
			||||||
	this.mu.RLock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var bucketIndex = this.gcBucketIndex
 | 
					 | 
				
			||||||
	var bucketMap = this.bucketMaps[bucketIndex]
 | 
					 | 
				
			||||||
	isEmpty = len(bucketMap) == 0
 | 
					 | 
				
			||||||
	if isEmpty {
 | 
					 | 
				
			||||||
		this.mu.RUnlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// move to next bucket index
 | 
					 | 
				
			||||||
		bucketIndex++
 | 
					 | 
				
			||||||
		if bucketIndex >= this.countBuckets {
 | 
					 | 
				
			||||||
			bucketIndex = 0
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		this.gcBucketIndex = bucketIndex
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for itemId, item := range bucketMap {
 | 
					 | 
				
			||||||
		if item.IsExpired() {
 | 
					 | 
				
			||||||
			itemIds = append(itemIds, itemId)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	this.mu.RUnlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// remove
 | 
					 | 
				
			||||||
	if len(itemIds) > 0 {
 | 
					 | 
				
			||||||
		this.mu.Lock()
 | 
					 | 
				
			||||||
		for _, itemId := range itemIds {
 | 
					 | 
				
			||||||
			item, ok := bucketMap[itemId]
 | 
					 | 
				
			||||||
			if !ok {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			if !item.IsAvailable() {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			delete(bucketMap, itemId)
 | 
					 | 
				
			||||||
			item.Reset()
 | 
					 | 
				
			||||||
			atomic.AddInt64(&this.totalMemory, -item.size)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		this.mu.Unlock()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// move to next bucket index
 | 
					 | 
				
			||||||
	bucketIndex++
 | 
					 | 
				
			||||||
	if bucketIndex >= this.countBuckets {
 | 
					 | 
				
			||||||
		bucketIndex = 0
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	this.gcBucketIndex = bucketIndex
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) SetCapacity(capacity int64) {
 | 
					 | 
				
			||||||
	this.capacity = capacity
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) TotalSize() int64 {
 | 
					 | 
				
			||||||
	return atomic.LoadInt64(&this.totalMemory)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) Len() int {
 | 
					 | 
				
			||||||
	this.mu.Lock()
 | 
					 | 
				
			||||||
	defer this.mu.Unlock()
 | 
					 | 
				
			||||||
	var count = 0
 | 
					 | 
				
			||||||
	for _, bucketMap := range this.bucketMaps {
 | 
					 | 
				
			||||||
		count += len(bucketMap)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return count
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) IncreaseNew() {
 | 
					 | 
				
			||||||
	if this.isOk && this.debugMode {
 | 
					 | 
				
			||||||
		atomic.AddUint64(&this.countNew, 1)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) bucketIndexForSize(size int64) int {
 | 
					 | 
				
			||||||
	return int(size / memoryFragmentPoolBucketSegmentSize)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *MemoryFragmentPool) findItemInMap(bucketMap map[uint64]*MemoryFragmentPoolItem, expectSize int64) (resultBytes []byte, resultItemId uint64, ok bool) {
 | 
					 | 
				
			||||||
	if len(bucketMap) == 0 {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for itemId, item := range bucketMap {
 | 
					 | 
				
			||||||
		if item.size >= expectSize {
 | 
					 | 
				
			||||||
			// check if is referred
 | 
					 | 
				
			||||||
			if !item.IsAvailable() {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// return result
 | 
					 | 
				
			||||||
			if item.size != expectSize {
 | 
					 | 
				
			||||||
				resultBytes = item.Bytes[:expectSize]
 | 
					 | 
				
			||||||
			} else {
 | 
					 | 
				
			||||||
				resultBytes = item.Bytes
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// reset old item
 | 
					 | 
				
			||||||
			item.Reset()
 | 
					 | 
				
			||||||
			atomic.AddInt64(&this.totalMemory, -item.size)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			resultItemId = itemId
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			if this.debugMode {
 | 
					 | 
				
			||||||
				atomic.AddUint64(&this.countGet, 1)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			ok = true
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@@ -1,313 +0,0 @@
 | 
				
			|||||||
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
package caches_test
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"bytes"
 | 
					 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
					 | 
				
			||||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
 | 
					 | 
				
			||||||
	"github.com/iwind/TeaGo/assert"
 | 
					 | 
				
			||||||
	"github.com/iwind/TeaGo/rands"
 | 
					 | 
				
			||||||
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
					 | 
				
			||||||
	"runtime"
 | 
					 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"sync/atomic"
 | 
					 | 
				
			||||||
	"testing"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestNewMemoryFragmentPool(t *testing.T) {
 | 
					 | 
				
			||||||
	var a = assert.NewAssertion(t)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	for i := 0; i < 3000; i++ {
 | 
					 | 
				
			||||||
		ok := pool.Put(make([]byte, 2<<20))
 | 
					 | 
				
			||||||
		if !ok {
 | 
					 | 
				
			||||||
			t.Log("finished at", i)
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	t.Log(pool.TotalSize()>>20, "MB", pool.Len(), "items")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		r, ok := pool.Get(1 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(ok)
 | 
					 | 
				
			||||||
		a.IsTrue(len(r) == 1<<20)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		r, ok := pool.Get(2 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(ok)
 | 
					 | 
				
			||||||
		a.IsTrue(len(r) == 2<<20)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		r, ok := pool.Get(4 << 20)
 | 
					 | 
				
			||||||
		a.IsFalse(ok)
 | 
					 | 
				
			||||||
		a.IsTrue(len(r) == 0)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	t.Log(pool.TotalSize()>>20, "MB", pool.Len(), "items")
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestNewMemoryFragmentPool_LargeBucket(t *testing.T) {
 | 
					 | 
				
			||||||
	var a = assert.NewAssertion(t)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 128<<20+1))
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 0)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 128<<20))
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 1)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		pool.Get(118 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 0)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 128<<20))
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 1)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		pool.Get(110 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 1)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestMemoryFragmentPool_Get_Exactly(t *testing.T) {
 | 
					 | 
				
			||||||
	var a = assert.NewAssertion(t)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 129<<20))
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 0)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 4<<20))
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 1)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Get(4 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 0)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestMemoryFragmentPool_Get_Round(t *testing.T) {
 | 
					 | 
				
			||||||
	var a = assert.NewAssertion(t)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 8<<20))
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 8<<20))
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 8<<20))
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 3)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		resultBytes, ok := pool.Get(3 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 2)
 | 
					 | 
				
			||||||
		if ok {
 | 
					 | 
				
			||||||
			pool.Put(resultBytes)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Get(2 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 2)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		pool.Get(1 << 20)
 | 
					 | 
				
			||||||
		a.IsTrue(pool.Len() == 1)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestMemoryFragmentPool_GC(t *testing.T) {
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	pool.SetCapacity(32 << 20)
 | 
					 | 
				
			||||||
	for i := 0; i < 16; i++ {
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 4<<20))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	var before = time.Now()
 | 
					 | 
				
			||||||
	pool.GC()
 | 
					 | 
				
			||||||
	t.Log(time.Since(before).Seconds()*1000, "ms")
 | 
					 | 
				
			||||||
	t.Log(pool.Len())
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestMemoryFragmentPool_Memory(t *testing.T) {
 | 
					 | 
				
			||||||
	if !testutils.IsSingleTesting() {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	testutils.StartMemoryStats(t, func() {
 | 
					 | 
				
			||||||
		t.Log(pool.Len(), "items")
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var sampleData = bytes.Repeat([]byte{'A'}, 16<<20)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var countNew = 0
 | 
					 | 
				
			||||||
	for i := 0; i < 1000; i++ {
 | 
					 | 
				
			||||||
		cacheData, ok := pool.Get(16 << 20)
 | 
					 | 
				
			||||||
		if ok {
 | 
					 | 
				
			||||||
			copy(cacheData, sampleData)
 | 
					 | 
				
			||||||
			pool.Put(cacheData)
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			countNew++
 | 
					 | 
				
			||||||
			var data = make([]byte, 16<<20)
 | 
					 | 
				
			||||||
			copy(data, sampleData)
 | 
					 | 
				
			||||||
			pool.Put(data)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	t.Log("count new:", countNew)
 | 
					 | 
				
			||||||
	t.Log("count remains:", pool.Len())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	time.Sleep(10 * time.Minute)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestMemoryFragmentPool_GCNextBucket(t *testing.T) {
 | 
					 | 
				
			||||||
	if !testutils.IsSingleTesting() {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	for i := 0; i < 1000; i++ {
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, rands.Int(0, 100)<<20))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var lastLen int
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		pool.GCNextBucket()
 | 
					 | 
				
			||||||
		var currentLen = pool.Len()
 | 
					 | 
				
			||||||
		if lastLen == currentLen {
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		lastLen = currentLen
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		t.Log(currentLen, "items", pool.TotalSize(), "bytes", timeutil.Format("H:i:s"))
 | 
					 | 
				
			||||||
		time.Sleep(100 * time.Millisecond)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		if currentLen == 0 {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestMemoryFragmentPoolItem(t *testing.T) {
 | 
					 | 
				
			||||||
	var a = assert.NewAssertion(t)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var m = map[int]*caches.MemoryFragmentPoolItem{}
 | 
					 | 
				
			||||||
	m[1] = &caches.MemoryFragmentPoolItem{
 | 
					 | 
				
			||||||
		Refs: 0,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	var item = m[1]
 | 
					 | 
				
			||||||
	a.IsTrue(item.Refs == 0)
 | 
					 | 
				
			||||||
	a.IsTrue(atomic.AddInt32(&item.Refs, 1) == 1)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for _, item2 := range m {
 | 
					 | 
				
			||||||
		t.Log(item2)
 | 
					 | 
				
			||||||
		a.IsTrue(atomic.AddInt32(&item2.Refs, 1) == 2)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	t.Log(m)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func BenchmarkMemoryFragmentPool_Get_HIT(b *testing.B) {
 | 
					 | 
				
			||||||
	runtime.GOMAXPROCS(4)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	for i := 0; i < 3000; i++ {
 | 
					 | 
				
			||||||
		ok := pool.Put(make([]byte, 2<<20))
 | 
					 | 
				
			||||||
		if !ok {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.ResetTimer()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.RunParallel(func(pb *testing.PB) {
 | 
					 | 
				
			||||||
		for pb.Next() {
 | 
					 | 
				
			||||||
			data, ok := pool.Get(2 << 20)
 | 
					 | 
				
			||||||
			if ok {
 | 
					 | 
				
			||||||
				pool.Put(data)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func BenchmarkMemoryFragmentPool_Get_TOTALLY_MISSING(b *testing.B) {
 | 
					 | 
				
			||||||
	runtime.GOMAXPROCS(4)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	for i := 0; i < 3000; i++ {
 | 
					 | 
				
			||||||
		ok := pool.Put(make([]byte, 2<<20+100))
 | 
					 | 
				
			||||||
		if !ok {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.ResetTimer()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.RunParallel(func(pb *testing.PB) {
 | 
					 | 
				
			||||||
		for pb.Next() {
 | 
					 | 
				
			||||||
			data, ok := pool.Get(2<<20 + 200)
 | 
					 | 
				
			||||||
			if ok {
 | 
					 | 
				
			||||||
				pool.Put(data)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func BenchmarkMemoryPool_Get_HIT_MISSING(b *testing.B) {
 | 
					 | 
				
			||||||
	runtime.GOMAXPROCS(4)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	for i := 0; i < 3000; i++ {
 | 
					 | 
				
			||||||
		ok := pool.Put(make([]byte, rands.Int(2, 32)<<20))
 | 
					 | 
				
			||||||
		if !ok {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.ResetTimer()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.RunParallel(func(pb *testing.PB) {
 | 
					 | 
				
			||||||
		for pb.Next() {
 | 
					 | 
				
			||||||
			data, ok := pool.Get(4 << 20)
 | 
					 | 
				
			||||||
			if ok {
 | 
					 | 
				
			||||||
				pool.Put(data)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func BenchmarkMemoryFragmentPool_GC(b *testing.B) {
 | 
					 | 
				
			||||||
	runtime.GOMAXPROCS(4)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var pool = caches.NewMemoryFragmentPool()
 | 
					 | 
				
			||||||
	pool.SetCapacity(1 << 30)
 | 
					 | 
				
			||||||
	for i := 0; i < 2_000; i++ {
 | 
					 | 
				
			||||||
		pool.Put(make([]byte, 1<<20))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	var mu = sync.Mutex{}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.ResetTimer()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	b.RunParallel(func(pb *testing.PB) {
 | 
					 | 
				
			||||||
		for pb.Next() {
 | 
					 | 
				
			||||||
			mu.Lock()
 | 
					 | 
				
			||||||
			for i := 0; i < 100; i++ {
 | 
					 | 
				
			||||||
				pool.GCNextBucket()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			mu.Unlock()
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@@ -526,11 +526,6 @@ func (this *MemoryStorage) flushItem(key string) {
 | 
				
			|||||||
	// 从内存中移除,并确保无论如何都会执行
 | 
						// 从内存中移除,并确保无论如何都会执行
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
		_ = this.Delete(key)
 | 
							_ = this.Delete(key)
 | 
				
			||||||
 | 
					 | 
				
			||||||
		// 重用内存,前提是确保内存不再被引用
 | 
					 | 
				
			||||||
		if enableFragmentPool && ok && item.IsDone && !item.isReferring && len(item.BodyValue) > 0 {
 | 
					 | 
				
			||||||
			SharedFragmentMemoryPool.Put(item.BodyValue)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -32,27 +32,11 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64,
 | 
				
			|||||||
		ModifiedAt: fasttime.Now().Unix(),
 | 
							ModifiedAt: fasttime.Now().Unix(),
 | 
				
			||||||
		Status:     status,
 | 
							Status:     status,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if enableFragmentPool &&
 | 
					 | 
				
			||||||
		expectedBodySize > 0 &&
 | 
					 | 
				
			||||||
		expectedBodySize <= maxMemoryFragmentPoolItemSize {
 | 
					 | 
				
			||||||
		bodyBytes, ok := SharedFragmentMemoryPool.Get(expectedBodySize) // try to reuse memory
 | 
					 | 
				
			||||||
		if ok {
 | 
					 | 
				
			||||||
			valueItem.BodyValue = bodyBytes
 | 
					 | 
				
			||||||
			valueItem.IsPrepared = true
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			if expectedBodySize <= (16 << 20) {
 | 
					 | 
				
			||||||
				var allocSize = (expectedBodySize/16384 + 1) * 16384
 | 
					 | 
				
			||||||
				valueItem.BodyValue = make([]byte, allocSize)[:expectedBodySize]
 | 
					 | 
				
			||||||
				valueItem.IsPrepared = true
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
				SharedFragmentMemoryPool.IncreaseNew()
 | 
						if expectedBodySize > 0 {
 | 
				
			||||||
			}
 | 
							valueItem.BodyValue = make([]byte, 0, expectedBodySize)
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		if expectedBodySize > 0 {
 | 
					 | 
				
			||||||
			valueItem.BodyValue = make([]byte, 0, expectedBodySize)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var w = &MemoryWriter{
 | 
						var w = &MemoryWriter{
 | 
				
			||||||
		storage:          memoryStorage,
 | 
							storage:          memoryStorage,
 | 
				
			||||||
		key:              key,
 | 
							key:              key,
 | 
				
			||||||
@@ -172,13 +156,6 @@ func (this *MemoryWriter) Discard() error {
 | 
				
			|||||||
	this.storage.locker.Lock()
 | 
						this.storage.locker.Lock()
 | 
				
			||||||
	delete(this.storage.valuesMap, this.hash)
 | 
						delete(this.storage.valuesMap, this.hash)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if enableFragmentPool &&
 | 
					 | 
				
			||||||
		this.item != nil &&
 | 
					 | 
				
			||||||
		!this.item.isReferring &&
 | 
					 | 
				
			||||||
		cap(this.item.BodyValue) >= minMemoryFragmentPoolItemSize {
 | 
					 | 
				
			||||||
		SharedFragmentMemoryPool.Put(this.item.BodyValue)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	this.storage.locker.Unlock()
 | 
						this.storage.locker.Unlock()
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user