diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index a91053c..d3e8528 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -30,6 +30,8 @@ type MemoryItem struct { Status int IsDone bool ModifiedAt int64 + + TotalSize int64 } func (this *MemoryItem) IsExpired() bool { @@ -50,7 +52,7 @@ type MemoryStorage struct { purgeTicker *utils.Ticker - totalSize int64 + usedSize int64 writingKeyMap map[string]zero.Zero // key => bool ignoreKeys *setutils.FixedSet @@ -84,13 +86,6 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage Stora func (this *MemoryStorage) Init() error { _ = this.list.Init() - this.list.OnAdd(func(item *Item) { - atomic.AddInt64(&this.totalSize, item.TotalSize()) - }) - this.list.OnRemove(func(item *Item) { - atomic.AddInt64(&this.totalSize, -item.TotalSize()) - }) - this.initPurgeTicker() // 启动定时Flush memory to disk任务 @@ -205,12 +200,12 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h } // 检查是否超出最大值 - capacityBytes := this.memoryCapacityBytes() + var capacityBytes = this.memoryCapacityBytes() if bodySize < 0 { bodySize = 0 } - if capacityBytes > 0 && capacityBytes <= this.totalSize+bodySize { - return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") + if capacityBytes > 0 && capacityBytes <= atomic.LoadInt64(&this.usedSize)+bodySize { + return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.usedSize, 10) + " bytes") } // 先删除 @@ -220,10 +215,17 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h } isWriting = true - return NewMemoryWriter(this, key, expiresAt, status, isDirty, maxSize, func() { + return NewMemoryWriter(this, key, expiresAt, status, isDirty, maxSize, func(valueItem *MemoryItem) { this.locker.Lock() delete(this.writingKeyMap, key) this.locker.Unlock() + + if valueItem != nil && valueItem.IsDone { + valueItem.TotalSize = int64(len(valueItem.HeaderValue) + len(valueItem.BodyValue) + len(key) + 256 /** meta size **/) + + atomic.AddInt64(&this.usedSize, valueItem.TotalSize) + runtime.SetFinalizer(valueItem, this.valueItemFinalizer) + } }), nil } @@ -252,7 +254,7 @@ func (this *MemoryStorage) CleanAll() error { this.locker.Lock() this.valuesMap = map[uint64]*MemoryItem{} _ = this.list.Reset() - atomic.StoreInt64(&this.totalSize, 0) + atomic.StoreInt64(&this.usedSize, 0) this.locker.Unlock() return nil } @@ -363,6 +365,11 @@ func (this *MemoryStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePol // AddToList 将缓存添加到列表 func (this *MemoryStorage) AddToList(item *Item) { + // skip added item + if item.MetaSize > 0 { + return + } + item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/ var hash = types.String(this.hash(item.Key)) @@ -380,7 +387,7 @@ func (this *MemoryStorage) TotalDiskSize() int64 { // TotalMemorySize 内存尺寸 func (this *MemoryStorage) TotalMemorySize() int64 { - return atomic.LoadInt64(&this.totalSize) + return atomic.LoadInt64(&this.usedSize) } // IgnoreKey 忽略某个Key,即不缓存某个Key @@ -563,17 +570,29 @@ func (this *MemoryStorage) memoryCapacityBytes() int64 { if this.policy == nil { return 0 } - c1 := int64(0) - if this.policy.Capacity != nil { - c1 = this.policy.Capacity.Bytes() - } + if SharedManager.MaxMemoryCapacity != nil { - c2 := SharedManager.MaxMemoryCapacity.Bytes() - if c2 > 0 { - return c2 + var capacityBytes = SharedManager.MaxMemoryCapacity.Bytes() + if capacityBytes > 0 { + return capacityBytes } } - return c1 + + var capacity = this.policy.Capacity // copy + if capacity != nil { + var capacityBytes = capacity.Bytes() + if capacityBytes > 0 { + return capacityBytes + } + } + + // half of the system memory + var memoryGB = utils.SystemMemoryGB() + if memoryGB < 1 { + memoryGB = 1 + } + + return int64(memoryGB) << 30 / 2 } func (this *MemoryStorage) deleteWithoutLocker(key string) error { @@ -604,3 +623,7 @@ func (this *MemoryStorage) initPurgeTicker() { } }) } + +func (this *MemoryStorage) valueItemFinalizer(valueItem *MemoryItem) { + atomic.AddInt64(&this.usedSize, -valueItem.TotalSize) +} diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index d1d7a86..3ad813d 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -20,12 +20,12 @@ type MemoryWriter struct { hash uint64 item *MemoryItem - endFunc func() + endFunc func(valueItem *MemoryItem) once sync.Once } -func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, maxSize int64, endFunc func()) *MemoryWriter { - w := &MemoryWriter{ +func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, maxSize int64, endFunc func(valueItem *MemoryItem)) *MemoryWriter { + var w = &MemoryWriter{ storage: memoryStorage, key: key, expiredAt: expiredAt, @@ -39,6 +39,7 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, maxSize: maxSize, endFunc: endFunc, } + w.hash = w.calculateHash(key) return w @@ -87,7 +88,7 @@ func (this *MemoryWriter) BodySize() int64 { func (this *MemoryWriter) Close() error { // 需要在Locker之外 defer this.once.Do(func() { - this.endFunc() + this.endFunc(this.item) }) if this.item == nil { @@ -102,7 +103,6 @@ func (this *MemoryWriter) Close() error { select { case this.storage.dirtyChan <- this.key: default: - } } } @@ -115,7 +115,7 @@ func (this *MemoryWriter) Close() error { func (this *MemoryWriter) Discard() error { // 需要在Locker之外 defer this.once.Do(func() { - this.endFunc() + this.endFunc(nil) }) this.storage.locker.Lock() diff --git a/internal/utils/fs/status.go b/internal/utils/fs/status.go index 58af938..c60be56 100644 --- a/internal/utils/fs/status.go +++ b/internal/utils/fs/status.go @@ -4,6 +4,7 @@ package fsutils import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/iwind/TeaGo/Tea" "sync/atomic" "time" ) @@ -67,6 +68,10 @@ func DiskIsFast() bool { } func DiskIsExtremelyFast() bool { + // 在开发环境下返回false,以便于测试 + if Tea.IsTesting() { + return false + } return DiskSpeed == SpeedExtremelyFast }