From 00cce8572fbb024f9adc25d9f2a55944570b840a Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sat, 19 Nov 2022 17:23:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=87=8F=E5=B0=91=E6=96=87=E4=BB=B6=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E5=86=99=E5=85=A5=E6=AC=A1=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/reader_file_test.go | 4 +- internal/caches/storage_file.go | 35 ++++++++++++------ internal/caches/storage_file_test.go | 14 +++---- internal/caches/storage_interface.go | 4 +- internal/caches/storage_memory.go | 18 ++++----- internal/caches/storage_memory_test.go | 22 +++++------ internal/caches/writer_file.go | 51 ++++++++++++++++---------- internal/nodes/api_stream.go | 2 +- internal/nodes/http_writer.go | 16 ++++++-- 9 files changed, 101 insertions(+), 65 deletions(-) diff --git a/internal/caches/reader_file_test.go b/internal/caches/reader_file_test.go index 9b1b6ed..19816a9 100644 --- a/internal/caches/reader_file_test.go +++ b/internal/caches/reader_file_test.go @@ -19,7 +19,7 @@ func TestFileReader(t *testing.T) { if err != nil { t.Fatal(err) } - _, path := storage.keyPath("my-key") + _, path, _ := storage.keyPath("my-key") fp, err := os.Open(path) if err != nil { @@ -105,7 +105,7 @@ func TestFileReader_Range(t *testing.T) { } _ = writer.Close()**/ - _, path := storage.keyPath("my-number") + _, path, _ := storage.keyPath("my-number") fp, err := os.Open(path) if err != nil { diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index bf7dac7..7dccc00 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -49,8 +49,7 @@ const ( SizeBodyLength = 8 OffsetBodyLength = OffsetHeaderLength + SizeHeaderLength - SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength - OffsetKey = SizeMeta + SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength ) const ( @@ -415,16 +414,16 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, } // OpenWriter 打开缓存文件等待写入 -func (this *FileStorage) OpenWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) { - return this.openWriter(key, expiresAt, status, size, maxSize, isPartial, false) +func (this *FileStorage) OpenWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64, maxSize int64, isPartial bool) (Writer, error) { + return this.openWriter(key, expiresAt, status, headerSize, bodySize, maxSize, isPartial, false) } // OpenFlushWriter 打开从其他媒介直接刷入的写入器 -func (this *FileStorage) OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) { - return this.openWriter(key, expiresAt, status, -1, -1, false, true) +func (this *FileStorage) OpenFlushWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64) (Writer, error) { + return this.openWriter(key, expiresAt, status, headerSize, bodySize, -1, false, true) } -func (this *FileStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool, isFlushing bool) (Writer, error) { +func (this *FileStorage) openWriter(key string, expiredAt int64, status int, headerSize int, bodySize int64, maxSize int64, isPartial bool, isFlushing bool) (Writer, error) { // 是否正在退出 if teaconst.IsQuiting { return nil, ErrWritingUnavailable @@ -442,8 +441,8 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz maxMemorySize = maxSize } var memoryStorage = this.memoryStorage - if !isFlushing && !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) { - writer, err := memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false) + if !isFlushing && !isPartial && memoryStorage != nil && ((bodySize > 0 && bodySize < maxMemorySize) || bodySize < 0) { + writer, err := memoryStorage.OpenWriter(key, expiredAt, status, headerSize, bodySize, maxMemorySize, false) if err == nil { return writer, nil } @@ -608,6 +607,8 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz return nil, ErrFileIsWriting } + var metaBodySize int64 = -1 + var metaHeaderSize int = -1 if isNewCreated { // 写入meta // 从v0.5.8开始不再在meta中写入Key @@ -620,6 +621,18 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz } copy(metaBytes[OffsetStatus:], strconv.Itoa(status)) + // 写入Header Length + if headerSize > 0 { + binary.BigEndian.PutUint32(metaBytes[OffsetHeaderLength:], uint32(headerSize)) + metaHeaderSize = headerSize + } + + // 写入Body Length + if bodySize > 0 { + binary.BigEndian.PutUint64(metaBytes[OffsetBodyLength:], uint64(bodySize)) + metaBodySize = bodySize + } + _, err = writer.Write(metaBytes) if err != nil { return nil, err @@ -642,7 +655,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz sharedWritingFileKeyLocker.Unlock() }), nil } else { - return NewFileWriter(this, writer, key, expiredAt, -1, func() { + return NewFileWriter(this, writer, key, expiredAt, metaHeaderSize, metaBodySize, -1, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) if len(sharedWritingFileKeyMap) == 0 { @@ -1140,7 +1153,7 @@ func (this *FileStorage) hotLoop() { expiresAt = bestExpiresAt } - writer, err := memoryStorage.openWriter(item.Key, expiresAt, reader.Status(), reader.BodySize(), -1, false) + writer, err := memoryStorage.openWriter(item.Key, expiresAt, reader.Status(), types.Int(reader.HeaderSize()), reader.BodySize(), -1, false) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error()) diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index 77ee176..f538c62 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) { header := []byte("Header") body := []byte("This is Body") - writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, false) + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -100,7 +100,7 @@ func TestFileStorage_OpenWriter_Partial(t *testing.T) { t.Fatal(err) } - writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, true) + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, -1, true) if err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) { t.Log(time.Since(now).Seconds()*1000, "ms") }() - writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, -1, false) + writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -212,7 +212,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, -1, -1, false) if err != nil { if err != ErrFileIsWriting { t.Error(err) @@ -267,7 +267,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, -1, -1, false) if err != nil { if err != ErrFileIsWriting { t.Error(err) @@ -522,7 +522,7 @@ func TestFileStorage_DecodeFile(t *testing.T) { if err != nil { t.Fatal(err) } - _, path := storage.keyPath("my-key") + _, path, _ := storage.keyPath("my-key") t.Log(path) } @@ -569,6 +569,6 @@ func BenchmarkFileStorage_KeyPath(b *testing.B) { } for i := 0; i < b.N; i++ { - _, _ = storage.keyPath(strconv.Itoa(i)) + _, _, _ = storage.keyPath(strconv.Itoa(i)) } } diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index ef37092..bcbd1ec 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -14,10 +14,10 @@ type StorageInterface interface { // OpenWriter 打开缓存写入器等待写入 // size 和 maxSize 可能为-1 - OpenWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) + OpenWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64, maxSize int64, isPartial bool) (Writer, error) // OpenFlushWriter 打开从其他媒介直接刷入的写入器 - OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) + OpenFlushWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64) (Writer, error) // Delete 删除某个键值对应的缓存 Delete(key string) error diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index f0acc6c..c5a0af7 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -149,7 +149,7 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool) } // OpenWriter 打开缓存写入器等待写入 -func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) { +func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, headerSize int, bodySize int64, maxSize int64, isPartial bool) (Writer, error) { if this.ignoreKeys.Has(key) { return nil, ErrEntityTooLarge } @@ -158,15 +158,15 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, s if isPartial { return nil, ErrFileIsWriting } - return this.openWriter(key, expiredAt, status, size, maxSize, true) + return this.openWriter(key, expiredAt, status, headerSize, bodySize, maxSize, true) } // OpenFlushWriter 打开从其他媒介直接刷入的写入器 -func (this *MemoryStorage) OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) { - return this.openWriter(key, expiresAt, status, -1, -1, true) +func (this *MemoryStorage) OpenFlushWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64) (Writer, error) { + return this.openWriter(key, expiresAt, status, headerSize, bodySize, -1, true) } -func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) { +func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64, maxSize int64, isDirty bool) (Writer, error) { // 待写入队列是否已满 if isDirty && this.parentStorage != nil && @@ -207,10 +207,10 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, s return nil, NewCapacityError("write memory cache failed: too many keys in cache storage") } capacityBytes := this.memoryCapacityBytes() - if size < 0 { - size = 0 + if bodySize < 0 { + bodySize = 0 } - if capacityBytes > 0 && capacityBytes <= this.totalSize+size { + if capacityBytes > 0 && capacityBytes <= this.totalSize+bodySize { return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") } @@ -481,7 +481,7 @@ func (this *MemoryStorage) flushItem(key string) { return } - writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status) + writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status, len(item.HeaderValue), int64(len(item.BodyValue))) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error()) diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 21eee0d..c2d1630 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -16,7 +16,7 @@ import ( func TestMemoryStorage_OpenWriter(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -63,7 +63,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) { } } - writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false) + writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -104,7 +104,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) { func TestMemoryStorage_Delete(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) { - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -112,7 +112,7 @@ func TestMemoryStorage_Delete(t *testing.T) { t.Log(len(storage.valuesMap)) } { - writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -127,7 +127,7 @@ func TestMemoryStorage_Stat(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -140,7 +140,7 @@ func TestMemoryStorage_Stat(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -164,7 +164,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -176,7 +176,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -199,7 +199,7 @@ func TestMemoryStorage_Purge(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -211,7 +211,7 @@ func TestMemoryStorage_Purge(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } @@ -242,7 +242,7 @@ func TestMemoryStorage_Expire(t *testing.T) { for i := 0; i < 1000; i++ { expiredAt := time.Now().Unix() + int64(rands.Int(0, 60)) key := "abc" + strconv.Itoa(i) - writer, err := storage.OpenWriter(key, expiredAt, 200, -1, -1, false) + writer, err := storage.OpenWriter(key, expiredAt, 200, -1, -1, -1, false) if err != nil { t.Fatal(err) } diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 76b156c..c93a697 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -11,25 +11,32 @@ import ( ) type FileWriter struct { - storage StorageInterface - rawWriter *os.File - key string - headerSize int64 - bodySize int64 - expiredAt int64 - maxSize int64 - endFunc func() - once sync.Once + storage StorageInterface + rawWriter *os.File + key string + + metaHeaderSize int + headerSize int64 + + metaBodySize int64 // 写入前的内容长度 + bodySize int64 + + expiredAt int64 + maxSize int64 + endFunc func() + once sync.Once } -func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, maxSize int64, endFunc func()) *FileWriter { +func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter { return &FileWriter{ - storage: storage, - key: key, - rawWriter: rawWriter, - expiredAt: expiredAt, - maxSize: maxSize, - endFunc: endFunc, + storage: storage, + key: key, + rawWriter: rawWriter, + expiredAt: expiredAt, + maxSize: maxSize, + endFunc: endFunc, + metaHeaderSize: metaHeaderSize, + metaBodySize: metaBodySize, } } @@ -45,7 +52,10 @@ func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { // WriteHeaderLength 写入Header长度数据 func (this *FileWriter) WriteHeaderLength(headerLength int) error { - bytes4 := make([]byte, 4) + if this.metaHeaderSize > 0 && this.metaHeaderSize == headerLength { + return nil + } + var bytes4 = make([]byte, 4) binary.BigEndian.PutUint32(bytes4, uint32(headerLength)) _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength, io.SeekStart) if err != nil { @@ -88,7 +98,10 @@ func (this *FileWriter) WriteAt(offset int64, data []byte) error { // WriteBodyLength 写入Body长度数据 func (this *FileWriter) WriteBodyLength(bodyLength int64) error { - bytes8 := make([]byte, 8) + if this.metaBodySize >= 0 && bodyLength == this.metaBodySize { + return nil + } + var bytes8 = make([]byte, 8) binary.BigEndian.PutUint64(bytes8, uint64(bodyLength)) _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength, io.SeekStart) if err != nil { @@ -109,7 +122,7 @@ func (this *FileWriter) Close() error { this.endFunc() }) - path := this.rawWriter.Name() + var path = this.rawWriter.Name() err := this.WriteHeaderLength(types.Int(this.headerSize)) if err != nil { diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 8fa45c7..f53df00 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -178,7 +178,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + msg.LifeSeconds - writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)), -1, false) + writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, -1, int64(len(msg.Value)), -1, false) if err != nil { this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) return err diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 6d9a93c..07a905b 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -303,7 +303,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { if this.isPartial { cacheKey += caches.SuffixPartial } - cacheWriter, err := storage.OpenWriter(cacheKey, expiresAt, this.StatusCode(), size, cacheRef.MaxSizeBytes(), this.isPartial) + cacheWriter, err := storage.OpenWriter(cacheKey, expiresAt, this.StatusCode(), this.calculateHeaderLength(), size, cacheRef.MaxSizeBytes(), this.isPartial) if err != nil { if err == caches.ErrEntityTooLarge && addStatusHeader { this.Header().Set("X-Cache", "BYPASS, entity too large") @@ -638,7 +638,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { cacheKey += this.cacheReaderSuffix } - compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), -1, cacheRef.MaxSizeBytes(), false) + compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), this.calculateHeaderLength(), -1, cacheRef.MaxSizeBytes(), false) if err != nil { return } @@ -964,7 +964,7 @@ func (this *HTTPWriter) finishWebP() { expiredAt = this.cacheWriter.ExpiredAt() } - webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, -1, false) + webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, -1, -1, false) if webpCacheWriter != nil { // 写入Header for k, v := range this.Header() { @@ -1179,3 +1179,13 @@ func (this *HTTPWriter) finishRequest() { _ = this.rawReader.Close() } } + +// 计算Header长度 +func (this *HTTPWriter) calculateHeaderLength() (result int) { + for k, v := range this.Header() { + for _, v1 := range v { + result += len(k) + 1 /**:**/ + len(v1) + 1 /**\n**/ + } + } + return +}