From 96db004fb2ce1bcceedaae9e2eebd272b6e6fba5 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 6 Mar 2022 17:18:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E5=9D=97=E4=BC=A0=E8=BE=93=E5=86=85?= =?UTF-8?q?=E5=AE=B9=E5=8F=AF=E4=BB=A5=E5=86=99=E5=85=A5=E5=88=B0=E5=86=85?= =?UTF-8?q?=E5=AD=98=E4=B8=AD/=E5=88=86=E5=9D=97=E4=BC=A0=E8=BE=93?= =?UTF-8?q?=E5=86=85=E5=AE=B9=E5=8F=AF=E4=BB=A5=E5=88=A4=E6=96=AD=E6=9C=80?= =?UTF-8?q?=E5=A4=A7=E5=B0=BA=E5=AF=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/errors.go | 9 ++-- internal/caches/storage_file.go | 36 ++++++++++++--- internal/caches/storage_file_test.go | 10 ++-- internal/caches/storage_interface.go | 6 ++- internal/caches/storage_memory.go | 25 ++++++++-- internal/caches/storage_memory_test.go | 22 ++++----- internal/caches/writer_file.go | 15 +++++- internal/caches/writer_memory.go | 12 ++++- internal/nodes/api_stream.go | 4 +- internal/nodes/http_writer.go | 12 +++-- internal/utils/sets/set_fixed.go | 64 ++++++++++++++++++++++++++ internal/utils/sets/set_fixed_test.go | 57 +++++++++++++++++++++++ internal/utils/sizes/sizes.go | 10 ++++ internal/utils/sizes/sizes_test.go | 17 +++++++ 14 files changed, 259 insertions(+), 40 deletions(-) create mode 100644 internal/utils/sets/set_fixed.go create mode 100644 internal/utils/sets/set_fixed_test.go create mode 100644 internal/utils/sizes/sizes.go create mode 100644 internal/utils/sizes/sizes_test.go diff --git a/internal/caches/errors.go b/internal/caches/errors.go index 6a158e2..41f9c26 100644 --- a/internal/caches/errors.go +++ b/internal/caches/errors.go @@ -6,9 +6,10 @@ import "errors" // 常用的几个错误 var ( - ErrNotFound = errors.New("cache not found") - ErrFileIsWriting = errors.New("the file is writing") - ErrInvalidRange = errors.New("invalid range") + ErrNotFound = errors.New("cache not found") + ErrFileIsWriting = errors.New("the file is writing") + ErrInvalidRange = errors.New("invalid range") + ErrEntityTooLarge = errors.New("entity too large") ) // CapacityError 容量错误 @@ -30,7 +31,7 @@ func CanIgnoreErr(err error) bool { if err == nil { return true } - if err == ErrFileIsWriting { + if err == ErrFileIsWriting || err == ErrEntityTooLarge { return true } _, ok := err.(*CapacityError) diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 7166073..283bc10 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -12,6 +12,8 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" + setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets" + "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/logs" @@ -44,7 +46,8 @@ const ( ) const ( - HotItemSize = 1024 + HotItemSize = 1024 // 热点数据数量 + FileToMemoryMaxSize int64 = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸 ) var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool @@ -68,6 +71,8 @@ type FileStorage struct { lastHotSize int hotTicker *utils.Ticker + ignoreKeys *setutils.FixedSet + openFileCache *OpenFileCache } @@ -76,6 +81,7 @@ func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { policy: policy, hotMap: map[string]*HotItem{}, lastHotSize: -1, + ignoreKeys: setutils.NewFixedSet(32768), } } @@ -297,7 +303,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, // 增加点击量 // 1/1000采样 - if !isPartial && allowMemory { + if !isPartial && allowMemory && reader.BodySize() < FileToMemoryMaxSize { this.increaseHit(key, hash, reader) } @@ -306,11 +312,20 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, } // OpenWriter 打开缓存文件等待写入 -func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) { +func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) { + // 是否已忽略 + if this.ignoreKeys.Has(key) { + return nil, ErrEntityTooLarge + } + // 先尝试内存缓存 // 我们限定仅小文件优先存在内存中 - if !isPartial && this.memoryStorage != nil && size > 0 && size < 32*1024*1024 { - writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, false) + var maxMemorySize = FileToMemoryMaxSize + if maxSize > maxMemorySize { + maxMemorySize = maxSize + } + if !isPartial && this.memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) { + writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false) if err == nil { return writer, nil } @@ -499,7 +514,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz sharedWritingFileKeyLocker.Unlock() }), nil } else { - return NewFileWriter(writer, key, expiredAt, func() { + return NewFileWriter(this, writer, key, expiredAt, -1, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) sharedWritingFileKeyLocker.Unlock() @@ -689,6 +704,8 @@ func (this *FileStorage) Stop() { if this.openFileCache != nil { this.openFileCache.CloseAll() } + + this.ignoreKeys.Reset() } // TotalDiskSize 消耗的磁盘尺寸 @@ -704,6 +721,11 @@ func (this *FileStorage) TotalMemorySize() int64 { return this.memoryStorage.TotalMemorySize() } +// IgnoreKey 忽略某个Key,即不缓存某个Key +func (this *FileStorage) IgnoreKey(key string) { + this.ignoreKeys.Push(key) +} + // 绝对路径 func (this *FileStorage) dir() string { return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" @@ -929,7 +951,7 @@ func (this *FileStorage) hotLoop() { continue } - writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), false) + writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), -1, false) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error()) diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index 7cd24d3..376d44c 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) { header := []byte("Header") body := []byte("This is Body") - writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, false) + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -100,7 +100,7 @@ func TestFileStorage_OpenWriter_Partial(t *testing.T) { t.Fatal(err) } - writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, true) + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, true) if err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) { t.Log(time.Since(now).Seconds()*1000, "ms") }() - writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, false) + writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -212,7 +212,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, false) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, -1, false) if err != nil { if err != ErrFileIsWriting { t.Error(err) @@ -267,7 +267,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, false) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, -1, false) if err != nil { if err != ErrFileIsWriting { t.Error(err) diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index 98d4f0b..01a3fb6 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -13,7 +13,8 @@ type StorageInterface interface { OpenReader(key string, useStale bool, isPartial bool) (reader Reader, err error) // OpenWriter 打开缓存写入器等待写入 - OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) + // size 和 maxSize 可能为-1 + OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) // Delete 删除某个键值对应的缓存 Delete(key string) error @@ -41,4 +42,7 @@ type StorageInterface interface { // AddToList 将缓存添加到列表 AddToList(item *Item) + + // IgnoreKey 忽略某个Key,即不缓存某个Key + IgnoreKey(key string) } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 5b98339..aed851e 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" + setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/cespare/xxhash" "github.com/iwind/TeaGo/rands" @@ -46,6 +47,8 @@ type MemoryStorage struct { totalSize int64 writingKeyMap map[string]zero.Zero // key => bool + + ignoreKeys *setutils.FixedSet } func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage { @@ -65,6 +68,7 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage Stora valuesMap: map[uint64]*MemoryItem{}, dirtyChan: dirtyChan, writingKeyMap: map[string]zero.Zero{}, + ignoreKeys: setutils.NewFixedSet(32768), } } @@ -145,15 +149,19 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool) } // OpenWriter 打开缓存写入器等待写入 -func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) { +func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) { + if this.ignoreKeys.Has(key) { + return nil, ErrEntityTooLarge + } + // TODO 内存缓存暂时不支持分块内容存储 if isPartial { return nil, ErrFileIsWriting } - return this.openWriter(key, expiredAt, status, size, true) + return this.openWriter(key, expiredAt, status, size, maxSize, true) } -func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, isDirty bool) (Writer, error) { +func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) { this.locker.Lock() defer this.locker.Unlock() @@ -200,7 +208,7 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, s } isWriting = true - return NewMemoryWriter(this, key, expiredAt, status, isDirty, func() { + return NewMemoryWriter(this, key, expiredAt, status, isDirty, maxSize, func() { this.locker.Lock() delete(this.writingKeyMap, key) this.locker.Unlock() @@ -277,6 +285,8 @@ func (this *MemoryStorage) Stop() { this.locker.Unlock() + this.ignoreKeys.Reset() + // 回收内存 runtime.GC() @@ -305,6 +315,11 @@ func (this *MemoryStorage) TotalMemorySize() int64 { return atomic.LoadInt64(&this.totalSize) } +// IgnoreKey 忽略某个Key,即不缓存某个Key +func (this *MemoryStorage) IgnoreKey(key string) { + this.ignoreKeys.Push(key) +} + // 计算Key Hash func (this *MemoryStorage) hash(key string) uint64 { return xxhash.Sum64String(key) @@ -391,7 +406,7 @@ func (this *MemoryStorage) flushItem(key string) { return } - writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, false) + writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, -1, false) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error()) diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 38c5c1d..27110fe 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -15,7 +15,7 @@ import ( func TestMemoryStorage_OpenWriter(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -62,7 +62,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) { } } - writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) + writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -103,7 +103,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) { func TestMemoryStorage_Delete(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) { - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -111,7 +111,7 @@ func TestMemoryStorage_Delete(t *testing.T) { t.Log(len(storage.valuesMap)) } { - writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, false) + writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -126,7 +126,7 @@ func TestMemoryStorage_Stat(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestMemoryStorage_Stat(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -163,7 +163,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -175,7 +175,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -198,7 +198,7 @@ func TestMemoryStorage_Purge(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -210,7 +210,7 @@ func TestMemoryStorage_Purge(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false) if err != nil { t.Fatal(err) } @@ -241,7 +241,7 @@ func TestMemoryStorage_Expire(t *testing.T) { for i := 0; i < 1000; i++ { expiredAt := time.Now().Unix() + int64(rands.Int(0, 60)) key := "abc" + strconv.Itoa(i) - writer, err := storage.OpenWriter(key, expiredAt, 200, -1, false) + writer, err := storage.OpenWriter(key, expiredAt, 200, -1, -1, false) if err != nil { t.Fatal(err) } diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 5a34c68..8c49496 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -11,20 +11,24 @@ import ( ) type FileWriter struct { + storage StorageInterface rawWriter *os.File key string headerSize int64 bodySize int64 expiredAt int64 + maxSize int64 endFunc func() once sync.Once } -func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, endFunc func()) *FileWriter { +func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, maxSize int64, endFunc func()) *FileWriter { return &FileWriter{ + storage: storage, key: key, rawWriter: rawWriter, expiredAt: expiredAt, + maxSize: maxSize, endFunc: endFunc, } } @@ -60,6 +64,15 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error { func (this *FileWriter) Write(data []byte) (n int, err error) { n, err = this.rawWriter.Write(data) this.bodySize += int64(n) + + if this.maxSize > 0 && this.bodySize > this.maxSize { + err = ErrEntityTooLarge + + if this.storage != nil { + this.storage.IgnoreKey(this.key) + } + } + if err != nil { _ = this.Discard() } diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 9b675f8..2a1c630 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -16,6 +16,7 @@ type MemoryWriter struct { bodySize int64 status int isDirty bool + maxSize int64 hash uint64 item *MemoryItem @@ -23,7 +24,7 @@ type MemoryWriter struct { once sync.Once } -func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, endFunc func()) *MemoryWriter { +func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, maxSize int64, endFunc func()) *MemoryWriter { w := &MemoryWriter{ storage: memoryStorage, key: key, @@ -35,6 +36,7 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, }, status: status, isDirty: isDirty, + maxSize: maxSize, endFunc: endFunc, } w.hash = w.calculateHash(key) @@ -53,6 +55,14 @@ func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { func (this *MemoryWriter) Write(data []byte) (n int, err error) { this.bodySize += int64(len(data)) this.item.BodyValue = append(this.item.BodyValue, data...) + + // 检查尺寸 + if this.maxSize > 0 && this.bodySize > this.maxSize { + err = ErrEntityTooLarge + this.storage.IgnoreKey(this.key) + return len(data), err + } + return len(data), nil } diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 069da6a..cf174b8 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -183,7 +183,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + msg.LifeSeconds - writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)), false) + writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)), -1, false) if err != nil { this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) return err @@ -472,7 +472,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + 8600 - writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength, false) // TODO 可以设置缓存过期时间 + writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength, -1, false) // TODO 可以设置缓存过期时间 if err != nil { locker.Lock() errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error()) diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 5b7e710..ae83891 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -286,8 +286,12 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { if this.isPartial { cacheKey += caches.SuffixPartial } - cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, this.isPartial) + cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, cacheRef.MaxSizeBytes(), this.isPartial) if err != nil { + if err == caches.ErrEntityTooLarge && addStatusHeader { + this.Header().Set("X-Cache", "BYPASS, entity too large") + } + if !caches.CanIgnoreErr(err) { remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) } @@ -556,8 +560,10 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { // compression cache writer // 只有在本身内容已经缓存的情况下才会写入缓存,防止同时写入缓存导致IO负载升高 + var cacheRef = this.req.cacheRef if !this.isPartial && this.cacheStorage != nil && + cacheRef != nil && (this.cacheReader != nil || (this.cacheStorage.Policy().SyncCompressionCache && this.cacheWriter != nil)) && !this.webpIsEncoding { var cacheKey = "" @@ -575,7 +581,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { cacheKey += this.cacheReaderSuffix } - compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), -1, false) + compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), -1, cacheRef.MaxSizeBytes(), false) if err != nil { return } @@ -788,7 +794,7 @@ func (this *HTTPWriter) Close() { expiredAt = this.cacheWriter.ExpiredAt() } - webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, false) + webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, -1, false) if webpCacheWriter != nil { // 写入Header for k, v := range this.Header() { diff --git a/internal/utils/sets/set_fixed.go b/internal/utils/sets/set_fixed.go new file mode 100644 index 0000000..9a1e189 --- /dev/null +++ b/internal/utils/sets/set_fixed.go @@ -0,0 +1,64 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package setutils + +import ( + "github.com/TeaOSLab/EdgeNode/internal/zero" + "sync" +) + +type FixedSet struct { + maxSize int + locker sync.RWMutex + + m map[interface{}]zero.Zero + keys []interface{} +} + +func NewFixedSet(maxSize int) *FixedSet { + if maxSize <= 0 { + maxSize = 1024 + } + return &FixedSet{ + maxSize: maxSize, + m: map[interface{}]zero.Zero{}, + } +} + +func (this *FixedSet) Push(item interface{}) { + this.locker.Lock() + _, ok := this.m[item] + if !ok { + // 是否已满 + if len(this.keys) == this.maxSize { + var firstKey = this.keys[0] + this.keys = this.keys[1:] + delete(this.m, firstKey) + } + + this.m[item] = zero.New() + this.keys = append(this.keys, item) + } + this.locker.Unlock() +} + +func (this *FixedSet) Has(item interface{}) bool { + this.locker.RLock() + defer this.locker.RUnlock() + + _, ok := this.m[item] + return ok +} + +func (this *FixedSet) Size() int { + this.locker.RLock() + defer this.locker.RUnlock() + return len(this.keys) +} + +func (this *FixedSet) Reset() { + this.locker.Lock() + this.m = map[interface{}]zero.Zero{} + this.keys = nil + this.locker.Unlock() +} diff --git a/internal/utils/sets/set_fixed_test.go b/internal/utils/sets/set_fixed_test.go new file mode 100644 index 0000000..b3f7d2a --- /dev/null +++ b/internal/utils/sets/set_fixed_test.go @@ -0,0 +1,57 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package setutils_test + +import ( + setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestNewFixedSet(t *testing.T) { + var a = assert.NewAssertion(t) + + { + var set = setutils.NewFixedSet(0) + set.Push(1) + set.Push(2) + set.Push(2) + a.IsTrue(set.Size() == 2) + a.IsTrue(set.Has(1)) + a.IsTrue(set.Has(2)) + } + + { + var set = setutils.NewFixedSet(1) + set.Push(1) + set.Push(2) + set.Push(3) + a.IsTrue(set.Size() == 1) + a.IsFalse(set.Has(1)) + a.IsTrue(set.Has(3)) + a.IsFalse(set.Has(4)) + } +} + +func TestFixedSet_Reset(t *testing.T) { + var a = assert.NewAssertion(t) + + var set = setutils.NewFixedSet(3) + set.Push(1) + set.Push(2) + set.Push(3) + set.Reset() + a.IsTrue(set.Size() == 0) +} + +func BenchmarkFixedSet_Has(b *testing.B) { + var count = 100_000 + var set = setutils.NewFixedSet(count) + for i := 0; i < count; i++ { + set.Push(i) + } + + for i := 0; i < b.N; i++ { + set.Has(i) + } +} diff --git a/internal/utils/sizes/sizes.go b/internal/utils/sizes/sizes.go new file mode 100644 index 0000000..d804dee --- /dev/null +++ b/internal/utils/sizes/sizes.go @@ -0,0 +1,10 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package sizes + +const ( + K = 1024 + M = 1024 * K + G = 1024 * M + T = 1024 * G +) diff --git a/internal/utils/sizes/sizes_test.go b/internal/utils/sizes/sizes_test.go new file mode 100644 index 0000000..35334f5 --- /dev/null +++ b/internal/utils/sizes/sizes_test.go @@ -0,0 +1,17 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package sizes_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestSizes(t *testing.T) { + var a = assert.NewAssertion(t) + a.IsTrue(sizes.K == 1024) + a.IsTrue(sizes.M == 1024*1024) + a.IsTrue(sizes.G == 1024*1024*1024) + a.IsTrue(sizes.T == 1024*1024*1024*1024) +}