From 6cc70dc8e592996723a6420fcd0df8248f22e6ef Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sat, 19 Nov 2022 15:55:05 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=9B=E4=B8=80=E6=AD=A5=E6=8F=90=E5=8D=87?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E7=BC=93=E5=AD=98=E5=86=99=E5=85=A5=E9=80=9F?= =?UTF-8?q?=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/storage_file.go | 20 ++++--------- internal/nodes/http_writer.go | 26 +++++++++++++++-- internal/utils/buffer_pool.go | 44 ++++++++++++---------------- internal/utils/buffer_pool_test.go | 47 ++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 43 deletions(-) create mode 100644 internal/utils/buffer_pool_test.go diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index e1db536..bf7dac7 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -567,6 +567,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz var before = time.Now() writer, err := os.OpenFile(tmpPath, flags, 0666) if err != nil { + // TODO 检查在各个系统中的稳定性 if os.IsNotExist(err) { _ = os.MkdirAll(dir, 0777) @@ -608,8 +609,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz } if isNewCreated { - // 写入过期时间 - var metaBytes = make([]byte, SizeMeta+len(key)) + // 写入meta + // 从v0.5.8开始不再在meta中写入Key + var metaBytes = make([]byte, SizeMeta) binary.BigEndian.PutUint32(metaBytes[OffsetExpiresAt:], uint32(expiredAt)) // 写入状态码 @@ -618,18 +620,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz } copy(metaBytes[OffsetStatus:], strconv.Itoa(status)) - // 写入URL长度 - binary.BigEndian.PutUint32(metaBytes[OffsetURLLength:], uint32(len(key))) - - // 写入Header Length - binary.BigEndian.PutUint32(metaBytes[OffsetHeaderLength:], uint32(0)) - - // 写入Body Length - binary.BigEndian.PutUint64(metaBytes[OffsetBodyLength:], uint64(0)) - - // 写入URL - copy(metaBytes[OffsetKey:], key) - _, err = writer.Write(metaBytes) if err != nil { return nil, err @@ -679,7 +669,7 @@ func (this *FileStorage) AddToList(item *Item) { } item.MetaSize = SizeMeta + 128 - hash := stringutil.Md5(item.Key) + var hash = stringutil.Md5(item.Key) err := this.list.Add(hash, item) if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") { remotelogs.Error("CACHE", "add to list failed: "+err.Error()) diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 90ff349..6d9a93c 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -324,13 +324,16 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { } // 写入Header + var headerBuf = utils.SharedBufferPool.Get() for k, v := range this.Header() { for _, v1 := range v { if this.isPartial && k == "Content-Type" && strings.Contains(v1, "multipart/byteranges") { continue } - _, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n")) + _, err = headerBuf.Write([]byte(k + ":" + v1 + "\n")) if err != nil { + utils.SharedBufferPool.Put(headerBuf) + remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) _ = this.cacheWriter.Discard() this.cacheWriter = nil @@ -338,6 +341,14 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { } } } + _, err = cacheWriter.WriteHeader(headerBuf.Bytes()) + utils.SharedBufferPool.Put(headerBuf) + if err != nil { + remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) + _ = this.cacheWriter.Discard() + this.cacheWriter = nil + return + } if this.isPartial { // content-range @@ -633,10 +644,12 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { } // 写入Header + var headerBuffer = utils.SharedBufferPool.Get() for k, v := range this.Header() { for _, v1 := range v { - _, err = compressionCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n")) + _, err = headerBuffer.Write([]byte(k + ":" + v1 + "\n")) if err != nil { + utils.SharedBufferPool.Put(headerBuffer) remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error()) _ = compressionCacheWriter.Discard() compressionCacheWriter = nil @@ -645,6 +658,15 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { } } + _, err = compressionCacheWriter.WriteHeader(headerBuffer.Bytes()) + utils.SharedBufferPool.Put(headerBuffer) + if err != nil { + remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error()) + _ = compressionCacheWriter.Discard() + compressionCacheWriter = nil + return + } + if compressionCacheWriter != nil { this.compressionCacheWriter = compressionCacheWriter var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter) diff --git a/internal/utils/buffer_pool.go b/internal/utils/buffer_pool.go index f08614c..547e208 100644 --- a/internal/utils/buffer_pool.go +++ b/internal/utils/buffer_pool.go @@ -2,47 +2,39 @@ package utils -import "bytes" +import ( + "bytes" + "sync" +) + +var SharedBufferPool = NewBufferPool() // BufferPool pool for get byte slice type BufferPool struct { - c chan *bytes.Buffer + rawPool *sync.Pool } // NewBufferPool 创建新对象 -func NewBufferPool(maxSize int) *BufferPool { - if maxSize <= 0 { - maxSize = 1024 - } - pool := &BufferPool{ - c: make(chan *bytes.Buffer, maxSize), +func NewBufferPool() *BufferPool { + var pool = &BufferPool{} + pool.rawPool = &sync.Pool{ + New: func() any { + return &bytes.Buffer{} + }, } return pool } // Get 获取一个新的Buffer func (this *BufferPool) Get() (b *bytes.Buffer) { - select { - case b = <-this.c: - b.Reset() - default: - b = &bytes.Buffer{} + var buffer = this.rawPool.Get().(*bytes.Buffer) + if buffer.Len() > 0 { + buffer.Reset() } - return + return buffer } // Put 放回一个使用过的byte slice func (this *BufferPool) Put(b *bytes.Buffer) { - b.Reset() - - select { - case this.c <- b: - default: - // 已达最大容量,则抛弃 - } -} - -// Size 当前的数量 -func (this *BufferPool) Size() int { - return len(this.c) + this.rawPool.Put(b) } diff --git a/internal/utils/buffer_pool_test.go b/internal/utils/buffer_pool_test.go new file mode 100644 index 0000000..918424e --- /dev/null +++ b/internal/utils/buffer_pool_test.go @@ -0,0 +1,47 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package utils_test + +import ( + "bytes" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "strings" + "testing" +) + +func TestNewBufferPool(t *testing.T) { + var pool = utils.NewBufferPool() + var b = pool.Get() + b.WriteString("Hello, World") + t.Log(b.String()) + + pool.Put(b) + t.Log(b.String()) + + b = pool.Get() + t.Log(b.String()) +} + +func BenchmarkNewBufferPool1(b *testing.B) { + var data = []byte(strings.Repeat("Hello", 1024)) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buffer = &bytes.Buffer{} + buffer.Write(data) + } + }) +} + +func BenchmarkNewBufferPool2(b *testing.B) { + var pool = utils.NewBufferPool() + var data = []byte(strings.Repeat("Hello", 1024)) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buffer = pool.Get() + buffer.Write(data) + pool.Put(buffer) + } + }) +}