diff --git a/internal/compressions/reader.go b/internal/compressions/reader.go index aef2701..3d9f790 100644 --- a/internal/compressions/reader.go +++ b/internal/compressions/reader.go @@ -9,6 +9,7 @@ type Reader interface { Reset(reader io.Reader) error RawClose() error Close() error + IncreaseHit() uint32 SetPool(pool *ReaderPool) ResetFinish() diff --git a/internal/compressions/reader_base.go b/internal/compressions/reader_base.go index 327215f..0e566e9 100644 --- a/internal/compressions/reader_base.go +++ b/internal/compressions/reader_base.go @@ -2,10 +2,13 @@ package compressions +import "sync/atomic" + type BaseReader struct { pool *ReaderPool isFinished bool + hits uint32 } func (this *BaseReader) SetPool(pool *ReaderPool) { @@ -13,8 +16,11 @@ func (this *BaseReader) SetPool(pool *ReaderPool) { } func (this *BaseReader) Finish(obj Reader) error { + if this.isFinished { + return nil + } err := obj.RawClose() - if err == nil && this.pool != nil && !this.isFinished { + if err == nil && this.pool != nil { this.pool.Put(obj) } this.isFinished = true @@ -24,3 +30,7 @@ func (this *BaseReader) Finish(obj Reader) error { func (this *BaseReader) ResetFinish() { this.isFinished = false } + +func (this *BaseReader) IncreaseHit() uint32 { + return atomic.AddUint32(&this.hits, 1) +} diff --git a/internal/compressions/reader_pool.go b/internal/compressions/reader_pool.go index f19c811..7dbe0d0 100644 --- a/internal/compressions/reader_pool.go +++ b/internal/compressions/reader_pool.go @@ -6,6 +6,8 @@ import ( "io" ) +const maxReadHits = 1 << 20 + type ReaderPool struct { c chan Reader newFunc func(reader io.Reader) (Reader, error) @@ -49,6 +51,11 @@ func (this *ReaderPool) Get(parentReader io.Reader) (Reader, error) { } func (this *ReaderPool) Put(reader Reader) { + if reader.IncreaseHit() > maxReadHits { + // do nothing to discard it + return + } + select { case this.c <- reader: default: diff --git a/internal/compressions/reader_pool_brotli.go b/internal/compressions/reader_pool_brotli.go index 282bde2..583d81d 100644 --- a/internal/compressions/reader_pool_brotli.go +++ b/internal/compressions/reader_pool_brotli.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "io" ) @@ -15,11 +14,8 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedBrotliReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) { + + sharedBrotliReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) { return newBrotliReader(reader) }) } diff --git a/internal/compressions/reader_pool_deflate.go b/internal/compressions/reader_pool_deflate.go index 03b3bfa..16036ee 100644 --- a/internal/compressions/reader_pool_deflate.go +++ b/internal/compressions/reader_pool_deflate.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "io" ) @@ -15,11 +14,7 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedDeflateReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) { + sharedDeflateReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) { return newDeflateReader(reader) }) } diff --git a/internal/compressions/reader_pool_gzip.go b/internal/compressions/reader_pool_gzip.go index 197a4f0..0b76c37 100644 --- a/internal/compressions/reader_pool_gzip.go +++ b/internal/compressions/reader_pool_gzip.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "io" ) @@ -15,11 +14,7 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedGzipReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) { + sharedGzipReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) { return newGzipReader(reader) }) } diff --git a/internal/compressions/reader_pool_zstd.go b/internal/compressions/reader_pool_zstd.go index c2931ca..74372f3 100644 --- a/internal/compressions/reader_pool_zstd.go +++ b/internal/compressions/reader_pool_zstd.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "io" ) @@ -15,11 +14,7 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedZSTDReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) { + sharedZSTDReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) { return newZSTDReader(reader) }) } diff --git a/internal/compressions/reader_zstd.go b/internal/compressions/reader_zstd.go index b76c43e..f5eb292 100644 --- a/internal/compressions/reader_zstd.go +++ b/internal/compressions/reader_zstd.go @@ -18,7 +18,7 @@ func NewZSTDReader(reader io.Reader) (Reader, error) { } func newZSTDReader(reader io.Reader) (Reader, error) { - r, err := zstd.NewReader(reader) + r, err := zstd.NewReader(reader, zstd.WithDecoderMaxWindow(256<<20)) if err != nil { return nil, err } diff --git a/internal/compressions/utils.go b/internal/compressions/utils.go index 74719c2..b4c10e0 100644 --- a/internal/compressions/utils.go +++ b/internal/compressions/utils.go @@ -5,8 +5,10 @@ package compressions import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "io" "net/http" + "runtime" ) type ContentEncoding = string @@ -88,3 +90,31 @@ func WrapHTTPResponse(resp *http.Response) { resp.Header.Del("Content-Length") resp.Body = reader } + +// 系统CPU线程数 +var countCPU = runtime.NumCPU() + +// GenerateCompressLevel 根据系统资源自动生成压缩级别 +func GenerateCompressLevel(minLevel int, maxLevel int) (level int) { + if countCPU < 16 { + return minLevel + } + + if countCPU < 32 { + return min(3, maxLevel) + } + + return min(5, maxLevel) +} + +// CalculatePoolSize 计算Pool尺寸 +func CalculatePoolSize() int { + var maxSize = memutils.SystemMemoryGB() * 64 + if maxSize == 0 { + maxSize = 128 + } + if maxSize > 4096 { + maxSize = 4096 + } + return maxSize +} diff --git a/internal/compressions/utils_test.go b/internal/compressions/utils_test.go new file mode 100644 index 0000000..7241866 --- /dev/null +++ b/internal/compressions/utils_test.go @@ -0,0 +1,27 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package compressions_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/compressions" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestGenerateCompressLevel(t *testing.T) { + var a = assert.NewAssertion(t) + + t.Log(compressions.GenerateCompressLevel(0, 10)) + t.Log(compressions.GenerateCompressLevel(1, 10)) + t.Log(compressions.GenerateCompressLevel(1, 4)) + + { + var level = compressions.GenerateCompressLevel(1, 2) + t.Log(level) + a.IsTrue(level >= 1 && level <= 2) + } +} + +func TestCalculatePoolSize(t *testing.T) { + t.Log(compressions.CalculatePoolSize()) +} diff --git a/internal/compressions/writer.go b/internal/compressions/writer.go index fafcc3e..d0a0e79 100644 --- a/internal/compressions/writer.go +++ b/internal/compressions/writer.go @@ -11,6 +11,7 @@ type Writer interface { RawClose() error Close() error Level() int + IncreaseHit() uint32 SetPool(pool *WriterPool) ResetFinish() diff --git a/internal/compressions/writer_base.go b/internal/compressions/writer_base.go index 67bd99c..be27fe0 100644 --- a/internal/compressions/writer_base.go +++ b/internal/compressions/writer_base.go @@ -2,10 +2,16 @@ package compressions +import ( + "sync/atomic" +) + type BaseWriter struct { pool *WriterPool isFinished bool + + hits uint32 } func (this *BaseWriter) SetPool(pool *WriterPool) { @@ -13,8 +19,11 @@ func (this *BaseWriter) SetPool(pool *WriterPool) { } func (this *BaseWriter) Finish(obj Writer) error { + if this.isFinished { + return nil + } err := obj.RawClose() - if err == nil && this.pool != nil && !this.isFinished { + if err == nil && this.pool != nil { this.pool.Put(obj) } this.isFinished = true @@ -24,3 +33,7 @@ func (this *BaseWriter) Finish(obj Writer) error { func (this *BaseWriter) ResetFinish() { this.isFinished = false } + +func (this *BaseWriter) IncreaseHit() uint32 { + return atomic.AddUint32(&this.hits, 1) +} diff --git a/internal/compressions/writer_brotli.go b/internal/compressions/writer_brotli.go index 79c8ab0..dbd5839 100644 --- a/internal/compressions/writer_brotli.go +++ b/internal/compressions/writer_brotli.go @@ -19,12 +19,8 @@ func NewBrotliWriter(writer io.Writer, level int) (Writer, error) { return sharedBrotliWriterPool.Get(writer, level) } -func newBrotliWriter(writer io.Writer, level int) (*BrotliWriter, error) { - if level <= 0 { - level = brotli.BestSpeed - } else if level > brotli.BestCompression { - level = brotli.BestCompression - } +func newBrotliWriter(writer io.Writer) (*BrotliWriter, error) { + var level = GenerateCompressLevel(brotli.BestSpeed, brotli.BestCompression) return &BrotliWriter{ writer: brotli.NewWriterOptions(writer, brotli.WriterOptions{ Quality: level, diff --git a/internal/compressions/writer_deflate.go b/internal/compressions/writer_deflate.go index 0a16e76..edaa80d 100644 --- a/internal/compressions/writer_deflate.go +++ b/internal/compressions/writer_deflate.go @@ -18,12 +18,8 @@ func NewDeflateWriter(writer io.Writer, level int) (Writer, error) { return sharedDeflateWriterPool.Get(writer, level) } -func newDeflateWriter(writer io.Writer, level int) (Writer, error) { - if level <= 0 { - level = flate.BestSpeed - } else if level > flate.BestCompression { - level = flate.BestCompression - } +func newDeflateWriter(writer io.Writer) (Writer, error) { + var level = GenerateCompressLevel(flate.BestSpeed, flate.BestCompression) flateWriter, err := flate.NewWriter(writer, level) if err != nil { diff --git a/internal/compressions/writer_gzip.go b/internal/compressions/writer_gzip.go index 12f2bdf..8b89934 100644 --- a/internal/compressions/writer_gzip.go +++ b/internal/compressions/writer_gzip.go @@ -18,12 +18,8 @@ func NewGzipWriter(writer io.Writer, level int) (Writer, error) { return sharedGzipWriterPool.Get(writer, level) } -func newGzipWriter(writer io.Writer, level int) (Writer, error) { - if level <= 0 { - level = gzip.BestSpeed - } else if level > gzip.BestCompression { - level = gzip.BestCompression - } +func newGzipWriter(writer io.Writer) (Writer, error) { + var level = GenerateCompressLevel(gzip.BestSpeed, gzip.BestCompression) gzipWriter, err := gzip.NewWriterLevel(writer, level) if err != nil { diff --git a/internal/compressions/writer_pool.go b/internal/compressions/writer_pool.go index cdc0367..d388c65 100644 --- a/internal/compressions/writer_pool.go +++ b/internal/compressions/writer_pool.go @@ -6,6 +6,8 @@ import ( "io" ) +const maxWriterHits = 1 << 20 + type WriterPool struct { m map[int]chan Writer // level => chan Writer newFunc func(writer io.Writer, level int) (Writer, error) @@ -49,6 +51,11 @@ func (this *WriterPool) Get(parentWriter io.Writer, level int) (Writer, error) { } func (this *WriterPool) Put(writer Writer) { + if writer.IncreaseHit() > maxWriterHits { + // do nothing to discard it + return + } + var level = writer.Level() c, ok := this.m[level] if !ok { diff --git a/internal/compressions/writer_pool_brotli.go b/internal/compressions/writer_pool_brotli.go index 30de41f..1e500d0 100644 --- a/internal/compressions/writer_pool_brotli.go +++ b/internal/compressions/writer_pool_brotli.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "github.com/andybalholm/brotli" "io" ) @@ -16,11 +15,7 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedBrotliWriterPool = NewWriterPool(maxSize, brotli.BestCompression, func(writer io.Writer, level int) (Writer, error) { - return newBrotliWriter(writer, level) + sharedBrotliWriterPool = NewWriterPool(CalculatePoolSize(), brotli.BestCompression, func(writer io.Writer, level int) (Writer, error) { + return newBrotliWriter(writer) }) } diff --git a/internal/compressions/writer_pool_deflate.go b/internal/compressions/writer_pool_deflate.go index c91181c..cc8308f 100644 --- a/internal/compressions/writer_pool_deflate.go +++ b/internal/compressions/writer_pool_deflate.go @@ -5,7 +5,6 @@ package compressions import ( "compress/flate" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "io" ) @@ -16,11 +15,7 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedDeflateWriterPool = NewWriterPool(maxSize, flate.BestCompression, func(writer io.Writer, level int) (Writer, error) { - return newDeflateWriter(writer, level) + sharedDeflateWriterPool = NewWriterPool(CalculatePoolSize(), flate.BestCompression, func(writer io.Writer, level int) (Writer, error) { + return newDeflateWriter(writer) }) } diff --git a/internal/compressions/writer_pool_gzip.go b/internal/compressions/writer_pool_gzip.go index f1a8cef..09f52a0 100644 --- a/internal/compressions/writer_pool_gzip.go +++ b/internal/compressions/writer_pool_gzip.go @@ -5,7 +5,6 @@ package compressions import ( "compress/gzip" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "io" ) @@ -16,11 +15,8 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedGzipWriterPool = NewWriterPool(maxSize, gzip.BestCompression, func(writer io.Writer, level int) (Writer, error) { - return newGzipWriter(writer, level) + + sharedGzipWriterPool = NewWriterPool(CalculatePoolSize(), gzip.BestCompression, func(writer io.Writer, level int) (Writer, error) { + return newGzipWriter(writer) }) } diff --git a/internal/compressions/writer_pool_zstd.go b/internal/compressions/writer_pool_zstd.go index 42f80b1..b685410 100644 --- a/internal/compressions/writer_pool_zstd.go +++ b/internal/compressions/writer_pool_zstd.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "github.com/klauspost/compress/zstd" "io" ) @@ -16,11 +15,7 @@ func init() { return } - var maxSize = memutils.SystemMemoryGB() * 256 - if maxSize == 0 { - maxSize = 256 - } - sharedZSTDWriterPool = NewWriterPool(maxSize, int(zstd.SpeedBestCompression), func(writer io.Writer, level int) (Writer, error) { - return newZSTDWriter(writer, level) + sharedZSTDWriterPool = NewWriterPool(CalculatePoolSize(), int(zstd.SpeedBestCompression), func(writer io.Writer, level int) (Writer, error) { + return newZSTDWriter(writer) }) } diff --git a/internal/compressions/writer_zstd.go b/internal/compressions/writer_zstd.go index d14fdc8..313bb2b 100644 --- a/internal/compressions/writer_zstd.go +++ b/internal/compressions/writer_zstd.go @@ -18,21 +18,18 @@ func NewZSTDWriter(writer io.Writer, level int) (Writer, error) { return sharedZSTDWriterPool.Get(writer, level) } -func newZSTDWriter(writer io.Writer, level int) (Writer, error) { - if level < 0 { - level = 0 - } +func newZSTDWriter(writer io.Writer) (Writer, error) { + var level = 1 + var zstdLevel = zstd.SpeedFastest - var zstdLevel = zstd.EncoderLevelFromZstd(level) - - zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstdLevel)) + zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstdLevel), zstd.WithWindowSize(16<<10), zstd.WithLowerEncoderMem(true)) if err != nil { return nil, err } return &ZSTDWriter{ - writer: zstdWriter, - level: level, + writer: zstdWriter, + level: level, }, nil }