From c4bb92433db6beaee0d9db934e7e360ffc0881e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Fri, 5 Apr 2024 10:59:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BC=93=E5=AD=98=E4=BB=8E?= =?UTF-8?q?=E5=86=85=E5=AD=98=E5=88=B7=E6=96=B0=E5=88=B0=E7=A1=AC=E7=9B=98?= =?UTF-8?q?=E7=A8=8B=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/errors.go | 9 ++++++++ internal/caches/storage_file.go | 15 ++++++++----- internal/caches/storage_memory.go | 31 +++++++++++++++++++++----- internal/caches/storage_memory_test.go | 29 ++++++++++++++++++++++++ internal/caches/writer_memory.go | 5 ++++- internal/utils/fs/status.go | 12 +++++----- 6 files changed, 83 insertions(+), 18 deletions(-) diff --git a/internal/caches/errors.go b/internal/caches/errors.go index 4656117..31244d6 100644 --- a/internal/caches/errors.go +++ b/internal/caches/errors.go @@ -45,3 +45,12 @@ func CanIgnoreErr(err error) bool { var capacityErr *CapacityError return errors.As(err, &capacityErr) } + +func IsCapacityError(err error) bool { + if err == nil { + return false + } + + var capacityErr *CapacityError + return errors.As(err, &capacityErr) +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 88bf73d..fa2553b 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -54,10 +54,10 @@ const ( ) const ( - FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值) - HotItemSize = 1024 // 热点数据数量 - HotItemLifeSeconds int64 = 3600 // 热点数据生命周期 - FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸 + FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值) + HotItemSize = 1024 // 热点数据数量 + HotItemLifeSeconds int64 = 3600 // 热点数据生命周期 + FileToMemoryMaxSize int64 = 32 << 20 // 可以从文件写入到内存的最大文件尺寸 FileTmpSuffix = ".tmp" DefaultMinDiskFreeSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间 DefaultStaleCacheSeconds = 1200 // 过时缓存留存时间 @@ -478,7 +478,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea maxMemorySize = maxSize } var memoryStorage = this.memoryStorage - if !fsutils.DiskIsExtremelyFast() && !isFlushing && !isPartial && memoryStorage != nil && ((bodySize > 0 && bodySize < maxMemorySize) || bodySize < 0) { + if !isFlushing && !isPartial && memoryStorage != nil && ((bodySize > 0 && bodySize < maxMemorySize) || bodySize < 0) { writer, err := memoryStorage.OpenWriter(key, expiredAt, status, headerSize, bodySize, maxMemorySize, false) if err == nil { return writer, nil @@ -488,6 +488,10 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea if errors.Is(err, ErrWritingQueueFull) { return nil, err } + + if IsCapacityError(err) && bodySize > 0 && memoryStorage.totalDirtySize > (128<<20) { + return nil, err + } } // 是否正在写入 @@ -607,7 +611,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea writer, err := os.OpenFile(tmpPath, flags, 0666) fsutils.WriteEnd() if err != nil { - // TODO 检查在各个系统中的稳定性 if os.IsNotExist(err) { _ = os.MkdirAll(dir, 0777) diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 8ce8cef..beee962 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -55,7 +55,9 @@ type MemoryStorage struct { purgeTicker *utils.Ticker - usedSize int64 + usedSize int64 + totalDirtySize int64 + writingKeyMap map[string]zero.Zero // key => bool ignoreKeys *setutils.FixedSet @@ -340,6 +342,9 @@ func (this *MemoryStorage) Stop() { close(this.dirtyChan) } + this.usedSize = 0 + this.totalDirtySize = 0 + _ = this.list.Close() this.locker.Unlock() @@ -506,14 +511,20 @@ func (this *MemoryStorage) startFlush() { if fsutils.IsInExtremelyHighLoad { time.Sleep(1 * time.Second) - } else if fsutils.IsInHighLoad { - time.Sleep(100 * time.Millisecond) } } } // 单次Flush任务 -func (this *MemoryStorage) flushItem(key string) { +func (this *MemoryStorage) flushItem(fullKey string) { + sizeString, key, found := strings.Cut(fullKey, "@") + if !found { + return + } + defer func() { + atomic.AddInt64(&this.totalDirtySize, -types.Int64(sizeString)) + }() + if this.parentStorage == nil { return } @@ -547,7 +558,17 @@ func (this *MemoryStorage) flushItem(key string) { return } if !isInList { - time.Sleep(1 * time.Second) + for i := 0; i < 1000; i++ { + isInList, _, err = this.list.Exist(types.String(hash)) + if isInList { + break + } + time.Sleep(1 * time.Millisecond) + } + if !isInList { + // discard + return + } } writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status, len(item.HeaderValue), int64(len(item.BodyValue))) diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 1e9210a..89f6495 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -6,6 +6,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/rands" + "math/rand" "runtime" "runtime/debug" "strconv" @@ -381,3 +382,31 @@ func BenchmarkValuesMap(b *testing.B) { } }) } + +func BenchmarkNewMemoryStorage(b *testing.B) { + var storage = NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) + + var data = bytes.Repeat([]byte{'A'}, 1024) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + func() { + writer, err := storage.OpenWriter("abc"+strconv.Itoa(rand.Int()), time.Now().Unix()+60, 200, -1, -1, -1, false) + if err != nil { + b.Fatal(err) + } + if err != nil { + b.Fatal(err) + } + _, _ = writer.WriteHeader([]byte("Header")) + _, _ = writer.Write(data) + err = writer.Close() + if err != nil { + b.Fatal(err) + } + }() + } + }) +} diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index f578adc..2910c08 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -4,7 +4,9 @@ import ( "errors" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/cespare/xxhash" + "github.com/iwind/TeaGo/types" "sync" + "sync/atomic" ) type MemoryWriter struct { @@ -127,7 +129,8 @@ func (this *MemoryWriter) Close() error { this.storage.valuesMap[this.hash] = this.item select { - case this.storage.dirtyChan <- this.key: + case this.storage.dirtyChan <- types.String(this.bodySize) + "@" +this.key : + atomic.AddInt64(&this.storage.totalDirtySize, this.bodySize) default: // remove from values map delete(this.storage.valuesMap, this.hash) diff --git a/internal/utils/fs/status.go b/internal/utils/fs/status.go index fcfdfe1..ba84fac 100644 --- a/internal/utils/fs/status.go +++ b/internal/utils/fs/status.go @@ -129,14 +129,14 @@ func WriteEnd() { func calculateDiskMaxWrites() { switch DiskSpeed { case SpeedExtremelyFast: - DiskMaxWrites = 128 - case SpeedFast: - DiskMaxWrites = 64 - case SpeedLow: DiskMaxWrites = 32 + case SpeedFast: + DiskMaxWrites = 16 + case SpeedLow: + DiskMaxWrites = 8 case SpeedExtremelySlow: - DiskMaxWrites = 16 + DiskMaxWrites = 4 default: - DiskMaxWrites = 16 + DiskMaxWrites = 4 } }