From fd165b86ec49efcbe47b83ed51ac8fdcbe54ed37 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Mon, 2 Oct 2023 15:20:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=86=85=E5=AD=98=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E9=98=9F=E5=88=97=E9=95=BF=E5=BA=A6=EF=BC=8C=E7=A1=AE?= =?UTF-8?q?=E4=BF=9D=E4=B8=8D=E4=BC=9A=E4=BA=A7=E7=94=9F=E4=B8=8D=E5=9C=A8?= =?UTF-8?q?=E9=98=9F=E5=88=97=E9=87=8C=E7=9A=84=E7=BC=93=E5=AD=98=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/storage_file.go | 2 +- internal/caches/storage_memory.go | 17 +++++++++-------- internal/caches/writer_memory.go | 16 ++++++++++++---- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 1b09b11..51e6abf 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -436,7 +436,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea } // 如果队列满了,则等待 - if err == ErrWritingQueueFull { + if errors.Is(err, ErrWritingQueueFull) { return nil, err } } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index bdff1d8..e750d3b 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -9,7 +9,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets" - "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/cespare/xxhash" "github.com/iwind/TeaGo/types" @@ -67,7 +66,7 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage Stora if parentStorage != nil { if queueSize <= 0 { - queueSize = 2048 + int(policy.CapacityBytes()/sizes.G)*2048 + queueSize = utils.SystemMemoryGB() * 100_000 } dirtyChan = make(chan string, queueSize) @@ -166,7 +165,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h if isDirty && this.parentStorage != nil && this.dirtyQueueSize > 0 && - len(this.dirtyChan) == this.dirtyQueueSize { // 缓存时间过长 + len(this.dirtyChan) >= this.dirtyQueueSize-int(fsutils.DiskMaxWrites) /** delta **/ { // 缓存时间过长 return nil, ErrWritingQueueFull } @@ -202,7 +201,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h } } - // 检查是否超出最大值 + // 检查是否超出容量最大值 var capacityBytes = this.memoryCapacityBytes() if bodySize < 0 { bodySize = 0 @@ -226,8 +225,8 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h if valueItem != nil { valueItem.TotalSize = int64(len(valueItem.HeaderValue) + len(valueItem.BodyValue) + len(key) + 256 /** meta size **/) - atomic.AddInt64(&this.usedSize, valueItem.TotalSize) runtime.SetFinalizer(valueItem, this.valueItemFinalizer) + atomic.AddInt64(&this.usedSize, valueItem.TotalSize) } }), nil } @@ -516,6 +515,11 @@ func (this *MemoryStorage) flushItem(key string) { item, ok := this.valuesMap[hash] this.locker.RUnlock() + // 从内存中移除,并确保无论如何都会执行 + defer func() { + _ = this.Delete(key) + }() + if !ok { return } @@ -564,9 +568,6 @@ func (this *MemoryStorage) flushItem(key string) { HeaderSize: writer.HeaderSize(), BodySize: writer.BodySize(), }) - - // 从内存中移除 - _ = this.Delete(key) } func (this *MemoryStorage) memoryCapacityBytes() int64 { diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 63d4e82..4e6c257 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -2,9 +2,9 @@ package caches import ( "errors" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/cespare/xxhash" "sync" - "time" ) type MemoryWriter struct { @@ -29,7 +29,7 @@ type MemoryWriter struct { func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, expectedBodySize int64, maxSize int64, endFunc func(valueItem *MemoryItem)) *MemoryWriter { var valueItem = &MemoryItem{ ExpiresAt: expiredAt, - ModifiedAt: time.Now().Unix(), + ModifiedAt: fasttime.Now().Unix(), Status: status, } if expectedBodySize > 0 { @@ -120,18 +120,26 @@ func (this *MemoryWriter) Close() error { this.storage.locker.Lock() this.item.IsDone = true - this.storage.valuesMap[this.hash] = this.item + var err error if this.isDirty { if this.storage.parentStorage != nil { select { case this.storage.dirtyChan <- this.key: + this.storage.valuesMap[this.hash] = this.item default: + // do not add value map + err = ErrWritingQueueFull } + } else { + err = ErrWritingQueueFull } + } else { + this.storage.valuesMap[this.hash] = this.item } + this.storage.locker.Unlock() - return nil + return err } // Discard 丢弃