diff --git a/internal/caches/open_file_pool_test.go b/internal/caches/open_file_pool_test.go index 7930789..a1460d8 100644 --- a/internal/caches/open_file_pool_test.go +++ b/internal/caches/open_file_pool_test.go @@ -11,7 +11,7 @@ func TestOpenFilePool_Get(t *testing.T) { var pool = caches.NewOpenFilePool("a") t.Log(pool.Filename()) t.Log(pool.Get()) - t.Log(pool.Put(caches.NewOpenFile(nil, nil))) + t.Log(pool.Put(caches.NewOpenFile(nil, nil, []byte{}))) t.Log(pool.Get()) t.Log(pool.Get()) } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 651d9ca..d0e721a 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -325,10 +325,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool) } // OpenWriter 打开缓存文件等待写入 -func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { +func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) { // 先尝试内存缓存 - if this.memoryStorage != nil { - writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status) + // 我们限定仅小文件优先存在内存中 + if this.memoryStorage != nil && size > 0 && size < 32*1024*1024 { + writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size) if err == nil { return writer, nil } @@ -904,7 +905,7 @@ func (this *FileStorage) hotLoop() { continue } - writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, false) + writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), false) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error()) diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index d794df2..e647360 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) { header := []byte("Header") body := []byte("This is Body") - writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200) + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1) if err != nil { t.Fatal(err) } @@ -104,7 +104,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) { t.Log(time.Since(now).Seconds()*1000, "ms") }() - writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200) + writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1) if err != nil { t.Fatal(err) } @@ -177,7 +177,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1) if err != nil { if err != ErrFileIsWriting { t.Fatal(err) @@ -229,7 +229,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1) if err != nil { if err != ErrFileIsWriting { t.Fatal(err) diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index de10c3c..81e298e 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -13,7 +13,7 @@ type StorageInterface interface { OpenReader(key string, useStale bool) (reader Reader, err error) // OpenWriter 打开缓存写入器等待写入 - OpenWriter(key string, expiredAt int64, status int) (Writer, error) + OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) // Delete 删除某个键值对应的缓存 Delete(key string) error diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 9d7df80..ce7fd9e 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -145,11 +145,11 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool) (Reader, error) } // OpenWriter 打开缓存写入器等待写入 -func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { - return this.openWriter(key, expiredAt, status, true) +func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) { + return this.openWriter(key, expiredAt, status, size, true) } -func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, isDirty bool) (Writer, error) { +func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, isDirty bool) (Writer, error) { this.locker.Lock() defer this.locker.Unlock() @@ -182,7 +182,10 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, i return nil, NewCapacityError("write memory cache failed: too many keys in cache storage") } capacityBytes := this.memoryCapacityBytes() - if capacityBytes > 0 && capacityBytes <= this.totalSize { + if size < 0 { + size = 0 + } + if capacityBytes > 0 && capacityBytes <= this.totalSize+size { return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") } @@ -384,7 +387,7 @@ func (this *MemoryStorage) flushItem(key string) { return } - writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status) + writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error()) diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 03185a2..2aae411 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -15,7 +15,7 @@ import ( func TestMemoryStorage_OpenWriter(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1) if err != nil { t.Fatal(err) } @@ -62,7 +62,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) { } } - writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200) + writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1) if err != nil { t.Fatal(err) } @@ -103,7 +103,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) { func TestMemoryStorage_Delete(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) { - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1) if err != nil { t.Fatal(err) } @@ -111,7 +111,7 @@ func TestMemoryStorage_Delete(t *testing.T) { t.Log(len(storage.valuesMap)) } { - writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200) + writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1) if err != nil { t.Fatal(err) } @@ -126,7 +126,7 @@ func TestMemoryStorage_Stat(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1) if err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestMemoryStorage_Stat(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1) if err != nil { t.Fatal(err) } @@ -163,7 +163,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1) if err != nil { t.Fatal(err) } @@ -175,7 +175,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1) if err != nil { t.Fatal(err) } @@ -198,7 +198,7 @@ func TestMemoryStorage_Purge(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1) if err != nil { t.Fatal(err) } @@ -210,7 +210,7 @@ func TestMemoryStorage_Purge(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1) if err != nil { t.Fatal(err) } @@ -241,7 +241,7 @@ func TestMemoryStorage_Expire(t *testing.T) { for i := 0; i < 1000; i++ { expiredAt := time.Now().Unix() + int64(rands.Int(0, 60)) key := "abc" + strconv.Itoa(i) - writer, err := storage.OpenWriter(key, expiredAt, 200) + writer, err := storage.OpenWriter(key, expiredAt, 200, -1) if err != nil { t.Fatal(err) } diff --git a/internal/caches/writer_compression.go b/internal/caches/writer_compression.go deleted file mode 100644 index 27cffb5..0000000 --- a/internal/caches/writer_compression.go +++ /dev/null @@ -1,76 +0,0 @@ -package caches - -import ( - "github.com/TeaOSLab/EdgeNode/internal/compressions" -) - -type compressionWriter struct { - rawWriter Writer - writer compressions.Writer - key string - expiredAt int64 -} - -func NewCompressionWriter(gw Writer, cpWriter compressions.Writer, key string, expiredAt int64) Writer { - return &compressionWriter{ - rawWriter: gw, - writer: cpWriter, - key: key, - expiredAt: expiredAt, - } -} - -func (this *compressionWriter) WriteHeader(data []byte) (n int, err error) { - return this.writer.Write(data) -} - -// WriteHeaderLength 写入Header长度数据 -func (this *compressionWriter) WriteHeaderLength(headerLength int) error { - return nil -} - -// WriteBodyLength 写入Body长度数据 -func (this *compressionWriter) WriteBodyLength(bodyLength int64) error { - return nil -} - -func (this *compressionWriter) Write(data []byte) (n int, err error) { - return this.writer.Write(data) -} - -func (this *compressionWriter) Close() error { - err := this.writer.Close() - if err != nil { - return err - } - return this.rawWriter.Close() -} - -func (this *compressionWriter) Discard() error { - err := this.writer.Close() - if err != nil { - return err - } - return this.rawWriter.Discard() -} - -func (this *compressionWriter) Key() string { - return this.key -} - -func (this *compressionWriter) ExpiredAt() int64 { - return this.expiredAt -} - -func (this *compressionWriter) HeaderSize() int64 { - return this.rawWriter.HeaderSize() -} - -func (this *compressionWriter) BodySize() int64 { - return this.rawWriter.BodySize() -} - -// ItemType 内容类型 -func (this *compressionWriter) ItemType() ItemType { - return this.rawWriter.ItemType() -} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 04c5b4a..b86d92b 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -182,7 +182,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + msg.LifeSeconds - writer, err := storage.OpenWriter(msg.Key, expiredAt, 200) + writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value))) if err != nil { this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) return err @@ -462,7 +462,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + 8600 - writer, err := storage.OpenWriter(key, expiredAt, 200) // TODO 可以设置缓存过期时间 + writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength) // TODO 可以设置缓存过期时间 if err != nil { locker.Lock() errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error()) diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index b63ee81..ff90132 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -21,7 +21,7 @@ import ( func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { this.cacheCanTryStale = false - cachePolicy := this.ReqServer.HTTPCachePolicy + var cachePolicy = this.ReqServer.HTTPCachePolicy if cachePolicy == nil || !cachePolicy.IsOn { return } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 90fb313..a38dac7 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -243,7 +243,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { var expiredAt = utils.UnixTime() + life var cacheKey = this.req.cacheKey - cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode()) + cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size) if err != nil { if !caches.CanIgnoreErr(err) { remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) @@ -537,7 +537,7 @@ func (this *HTTPWriter) Close() { if this.cacheWriter != nil { var cacheKey = this.cacheWriter.Key() + webpSuffix - webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode()) + webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode(), -1) if webpCacheWriter != nil { // 写入Header for k, v := range this.Header() { @@ -660,7 +660,7 @@ func (this *HTTPWriter) Close() { // 对比Content-Length var contentLengthString = this.Header().Get("Content-Length") if len(contentLengthString) > 0 { - contentLength := types.Int64(contentLengthString) + var contentLength = types.Int64(contentLengthString) if contentLength != this.cacheWriter.BodySize() { this.isOk = false _ = this.cacheWriter.Discard()