diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index e750d3b..ba4af46 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -30,8 +30,6 @@ type MemoryItem struct { IsDone bool ModifiedAt int64 - TotalSize int64 - IsPrepared bool WriteOffset int64 } @@ -88,6 +86,13 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage Stora func (this *MemoryStorage) Init() error { _ = this.list.Init() + this.list.OnAdd(func(item *Item) { + atomic.AddInt64(&this.usedSize, item.TotalSize()) + }) + this.list.OnRemove(func(item *Item) { + atomic.AddInt64(&this.usedSize, -item.TotalSize()) + }) + this.initPurgeTicker() // 启动定时Flush memory to disk任务 @@ -221,13 +226,6 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h this.locker.Lock() delete(this.writingKeyMap, key) this.locker.Unlock() - - if valueItem != nil { - valueItem.TotalSize = int64(len(valueItem.HeaderValue) + len(valueItem.BodyValue) + len(key) + 256 /** meta size **/) - - runtime.SetFinalizer(valueItem, this.valueItemFinalizer) - atomic.AddInt64(&this.usedSize, valueItem.TotalSize) - } }), nil } @@ -409,22 +407,6 @@ func (this *MemoryStorage) hash(key string) uint64 { // 清理任务 func (this *MemoryStorage) purgeLoop() { - // 计算是否应该开启LFU清理 - var capacityBytes = this.policy.CapacityBytes() - var startLFU = false - var usedPercent = float32(this.TotalMemorySize()*100) / float32(capacityBytes) - var lfuFreePercent = this.policy.MemoryLFUFreePercent - if lfuFreePercent <= 0 { - lfuFreePercent = 5 - } - if capacityBytes > 0 { - if lfuFreePercent < 100 { - if usedPercent >= 100-lfuFreePercent { - startLFU = true - } - } - } - // 清理过期 var purgeCount = this.policy.MemoryAutoPurgeCount if purgeCount <= 0 { @@ -441,6 +423,23 @@ func (this *MemoryStorage) purgeLoop() { }) // LFU + // 计算是否应该开启LFU清理 + var capacityBytes = this.policy.CapacityBytes() + var startLFU = false + + var usedPercent = float32(this.TotalMemorySize()*100) / float32(capacityBytes) + var lfuFreePercent = this.policy.MemoryLFUFreePercent + if lfuFreePercent <= 0 { + lfuFreePercent = 5 + } + if capacityBytes > 0 { + if lfuFreePercent < 100 { + if usedPercent >= 100-lfuFreePercent { + startLFU = true + } + } + } + if startLFU { var total, _ = this.list.Count() if total > 0 { @@ -523,6 +522,7 @@ func (this *MemoryStorage) flushItem(key string) { if !ok { return } + if !item.IsDone { remotelogs.Error("CACHE", "flush items failed: open writer failed: item has not been done") return @@ -531,6 +531,16 @@ func (this *MemoryStorage) flushItem(key string) { return } + // 检查是否在列表中,防止未加入列表时就开始flush + isInList, err := this.list.Exist(types.String(hash)) + if err != nil { + remotelogs.Error("CACHE", "flush items failed: "+err.Error()) + return + } + if !isInList { + time.Sleep(1 * time.Second) + } + writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status, len(item.HeaderValue), int64(len(item.BodyValue))) if err != nil { if !CanIgnoreErr(err) { @@ -627,7 +637,3 @@ func (this *MemoryStorage) initPurgeTicker() { } }) } - -func (this *MemoryStorage) valueItemFinalizer(valueItem *MemoryItem) { - atomic.AddInt64(&this.usedSize, -valueItem.TotalSize) -} diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 4e6c257..d2623df 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -112,6 +112,7 @@ func (this *MemoryWriter) Close() error { // 需要在Locker之外 defer this.once.Do(func() { this.endFunc(this.item) + this.item = nil // free memory }) if this.item == nil { @@ -123,11 +124,14 @@ func (this *MemoryWriter) Close() error { var err error if this.isDirty { if this.storage.parentStorage != nil { + this.storage.valuesMap[this.hash] = this.item + select { case this.storage.dirtyChan <- this.key: - this.storage.valuesMap[this.hash] = this.item default: - // do not add value map + // remove from values map + delete(this.storage.valuesMap, this.hash) + err = ErrWritingQueueFull } } else { @@ -147,6 +151,7 @@ func (this *MemoryWriter) Discard() error { // 需要在Locker之外 defer this.once.Do(func() { this.endFunc(this.item) + this.item = nil // free memory }) this.storage.locker.Lock()