优化内容压缩

* 取消用户设置的压缩级别,现在压缩级别通过系统自动设置
* Pool中的对象命中100万次时自动销毁,避免内存泄漏
* 降低Pool中的对象数量,避免占用太多内存
* 根据系统CPU线程数自动计算压缩级别,避免消耗太多CPU
* zstd限制解码的最大Window
* zstd使用低内存模式
This commit is contained in:
GoEdgeLab
2024-04-16 11:32:38 +08:00
parent 4f66f55152
commit 005e27a63c
21 changed files with 125 additions and 82 deletions

View File

@@ -9,6 +9,7 @@ type Reader interface {
Reset(reader io.Reader) error
RawClose() error
Close() error
IncreaseHit() uint32
SetPool(pool *ReaderPool)
ResetFinish()

View File

@@ -2,10 +2,13 @@
package compressions
import "sync/atomic"
type BaseReader struct {
pool *ReaderPool
isFinished bool
hits uint32
}
func (this *BaseReader) SetPool(pool *ReaderPool) {
@@ -13,8 +16,11 @@ func (this *BaseReader) SetPool(pool *ReaderPool) {
}
func (this *BaseReader) Finish(obj Reader) error {
if this.isFinished {
return nil
}
err := obj.RawClose()
if err == nil && this.pool != nil && !this.isFinished {
if err == nil && this.pool != nil {
this.pool.Put(obj)
}
this.isFinished = true
@@ -24,3 +30,7 @@ func (this *BaseReader) Finish(obj Reader) error {
func (this *BaseReader) ResetFinish() {
this.isFinished = false
}
func (this *BaseReader) IncreaseHit() uint32 {
return atomic.AddUint32(&this.hits, 1)
}

View File

@@ -6,6 +6,8 @@ import (
"io"
)
const maxReadHits = 1 << 20
type ReaderPool struct {
c chan Reader
newFunc func(reader io.Reader) (Reader, error)
@@ -49,6 +51,11 @@ func (this *ReaderPool) Get(parentReader io.Reader) (Reader, error) {
}
func (this *ReaderPool) Put(reader Reader) {
if reader.IncreaseHit() > maxReadHits {
// do nothing to discard it
return
}
select {
case this.c <- reader:
default:

View File

@@ -4,7 +4,6 @@ package compressions
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"io"
)
@@ -15,11 +14,8 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedBrotliReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
sharedBrotliReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) {
return newBrotliReader(reader)
})
}

View File

@@ -4,7 +4,6 @@ package compressions
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"io"
)
@@ -15,11 +14,7 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedDeflateReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
sharedDeflateReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) {
return newDeflateReader(reader)
})
}

View File

@@ -4,7 +4,6 @@ package compressions
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"io"
)
@@ -15,11 +14,7 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedGzipReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
sharedGzipReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) {
return newGzipReader(reader)
})
}

View File

@@ -4,7 +4,6 @@ package compressions
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"io"
)
@@ -15,11 +14,7 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedZSTDReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
sharedZSTDReaderPool = NewReaderPool(CalculatePoolSize(), func(reader io.Reader) (Reader, error) {
return newZSTDReader(reader)
})
}

View File

@@ -18,7 +18,7 @@ func NewZSTDReader(reader io.Reader) (Reader, error) {
}
func newZSTDReader(reader io.Reader) (Reader, error) {
r, err := zstd.NewReader(reader)
r, err := zstd.NewReader(reader, zstd.WithDecoderMaxWindow(256<<20))
if err != nil {
return nil, err
}

View File

@@ -5,8 +5,10 @@ package compressions
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"io"
"net/http"
"runtime"
)
type ContentEncoding = string
@@ -88,3 +90,31 @@ func WrapHTTPResponse(resp *http.Response) {
resp.Header.Del("Content-Length")
resp.Body = reader
}
// 系统CPU线程数
var countCPU = runtime.NumCPU()
// GenerateCompressLevel 根据系统资源自动生成压缩级别
func GenerateCompressLevel(minLevel int, maxLevel int) (level int) {
if countCPU < 16 {
return minLevel
}
if countCPU < 32 {
return min(3, maxLevel)
}
return min(5, maxLevel)
}
// CalculatePoolSize 计算Pool尺寸
func CalculatePoolSize() int {
var maxSize = memutils.SystemMemoryGB() * 64
if maxSize == 0 {
maxSize = 128
}
if maxSize > 4096 {
maxSize = 4096
}
return maxSize
}

View File

@@ -0,0 +1,27 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package compressions_test
import (
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestGenerateCompressLevel(t *testing.T) {
var a = assert.NewAssertion(t)
t.Log(compressions.GenerateCompressLevel(0, 10))
t.Log(compressions.GenerateCompressLevel(1, 10))
t.Log(compressions.GenerateCompressLevel(1, 4))
{
var level = compressions.GenerateCompressLevel(1, 2)
t.Log(level)
a.IsTrue(level >= 1 && level <= 2)
}
}
func TestCalculatePoolSize(t *testing.T) {
t.Log(compressions.CalculatePoolSize())
}

View File

@@ -11,6 +11,7 @@ type Writer interface {
RawClose() error
Close() error
Level() int
IncreaseHit() uint32
SetPool(pool *WriterPool)
ResetFinish()

View File

@@ -2,10 +2,16 @@
package compressions
import (
"sync/atomic"
)
type BaseWriter struct {
pool *WriterPool
isFinished bool
hits uint32
}
func (this *BaseWriter) SetPool(pool *WriterPool) {
@@ -13,8 +19,11 @@ func (this *BaseWriter) SetPool(pool *WriterPool) {
}
func (this *BaseWriter) Finish(obj Writer) error {
if this.isFinished {
return nil
}
err := obj.RawClose()
if err == nil && this.pool != nil && !this.isFinished {
if err == nil && this.pool != nil {
this.pool.Put(obj)
}
this.isFinished = true
@@ -24,3 +33,7 @@ func (this *BaseWriter) Finish(obj Writer) error {
func (this *BaseWriter) ResetFinish() {
this.isFinished = false
}
func (this *BaseWriter) IncreaseHit() uint32 {
return atomic.AddUint32(&this.hits, 1)
}

View File

@@ -19,12 +19,8 @@ func NewBrotliWriter(writer io.Writer, level int) (Writer, error) {
return sharedBrotliWriterPool.Get(writer, level)
}
func newBrotliWriter(writer io.Writer, level int) (*BrotliWriter, error) {
if level <= 0 {
level = brotli.BestSpeed
} else if level > brotli.BestCompression {
level = brotli.BestCompression
}
func newBrotliWriter(writer io.Writer) (*BrotliWriter, error) {
var level = GenerateCompressLevel(brotli.BestSpeed, brotli.BestCompression)
return &BrotliWriter{
writer: brotli.NewWriterOptions(writer, brotli.WriterOptions{
Quality: level,

View File

@@ -18,12 +18,8 @@ func NewDeflateWriter(writer io.Writer, level int) (Writer, error) {
return sharedDeflateWriterPool.Get(writer, level)
}
func newDeflateWriter(writer io.Writer, level int) (Writer, error) {
if level <= 0 {
level = flate.BestSpeed
} else if level > flate.BestCompression {
level = flate.BestCompression
}
func newDeflateWriter(writer io.Writer) (Writer, error) {
var level = GenerateCompressLevel(flate.BestSpeed, flate.BestCompression)
flateWriter, err := flate.NewWriter(writer, level)
if err != nil {

View File

@@ -18,12 +18,8 @@ func NewGzipWriter(writer io.Writer, level int) (Writer, error) {
return sharedGzipWriterPool.Get(writer, level)
}
func newGzipWriter(writer io.Writer, level int) (Writer, error) {
if level <= 0 {
level = gzip.BestSpeed
} else if level > gzip.BestCompression {
level = gzip.BestCompression
}
func newGzipWriter(writer io.Writer) (Writer, error) {
var level = GenerateCompressLevel(gzip.BestSpeed, gzip.BestCompression)
gzipWriter, err := gzip.NewWriterLevel(writer, level)
if err != nil {

View File

@@ -6,6 +6,8 @@ import (
"io"
)
const maxWriterHits = 1 << 20
type WriterPool struct {
m map[int]chan Writer // level => chan Writer
newFunc func(writer io.Writer, level int) (Writer, error)
@@ -49,6 +51,11 @@ func (this *WriterPool) Get(parentWriter io.Writer, level int) (Writer, error) {
}
func (this *WriterPool) Put(writer Writer) {
if writer.IncreaseHit() > maxWriterHits {
// do nothing to discard it
return
}
var level = writer.Level()
c, ok := this.m[level]
if !ok {

View File

@@ -4,7 +4,6 @@ package compressions
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"github.com/andybalholm/brotli"
"io"
)
@@ -16,11 +15,7 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedBrotliWriterPool = NewWriterPool(maxSize, brotli.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newBrotliWriter(writer, level)
sharedBrotliWriterPool = NewWriterPool(CalculatePoolSize(), brotli.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newBrotliWriter(writer)
})
}

View File

@@ -5,7 +5,6 @@ package compressions
import (
"compress/flate"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"io"
)
@@ -16,11 +15,7 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedDeflateWriterPool = NewWriterPool(maxSize, flate.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newDeflateWriter(writer, level)
sharedDeflateWriterPool = NewWriterPool(CalculatePoolSize(), flate.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newDeflateWriter(writer)
})
}

View File

@@ -5,7 +5,6 @@ package compressions
import (
"compress/gzip"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"io"
)
@@ -16,11 +15,8 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedGzipWriterPool = NewWriterPool(maxSize, gzip.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newGzipWriter(writer, level)
sharedGzipWriterPool = NewWriterPool(CalculatePoolSize(), gzip.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newGzipWriter(writer)
})
}

View File

@@ -4,7 +4,6 @@ package compressions
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"github.com/klauspost/compress/zstd"
"io"
)
@@ -16,11 +15,7 @@ func init() {
return
}
var maxSize = memutils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedZSTDWriterPool = NewWriterPool(maxSize, int(zstd.SpeedBestCompression), func(writer io.Writer, level int) (Writer, error) {
return newZSTDWriter(writer, level)
sharedZSTDWriterPool = NewWriterPool(CalculatePoolSize(), int(zstd.SpeedBestCompression), func(writer io.Writer, level int) (Writer, error) {
return newZSTDWriter(writer)
})
}

View File

@@ -18,21 +18,18 @@ func NewZSTDWriter(writer io.Writer, level int) (Writer, error) {
return sharedZSTDWriterPool.Get(writer, level)
}
func newZSTDWriter(writer io.Writer, level int) (Writer, error) {
if level < 0 {
level = 0
}
func newZSTDWriter(writer io.Writer) (Writer, error) {
var level = 1
var zstdLevel = zstd.SpeedFastest
var zstdLevel = zstd.EncoderLevelFromZstd(level)
zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstdLevel))
zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstdLevel), zstd.WithWindowSize(16<<10), zstd.WithLowerEncoderMem(true))
if err != nil {
return nil, err
}
return &ZSTDWriter{
writer: zstdWriter,
level: level,
writer: zstdWriter,
level: level,
}, nil
}