diff --git a/internal/compressions/errors.go b/internal/compressions/errors.go new file mode 100644 index 0000000..4e0a328 --- /dev/null +++ b/internal/compressions/errors.go @@ -0,0 +1,14 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package compressions + +import "errors" + +var ErrIsBusy = errors.New("the system is busy for compression") + +func CanIgnore(err error) bool { + if err == nil { + return true + } + return errors.Is(err, ErrIsBusy) +} diff --git a/internal/compressions/writer_pool.go b/internal/compressions/writer_pool.go index d388c65..0848af5 100644 --- a/internal/compressions/writer_pool.go +++ b/internal/compressions/writer_pool.go @@ -3,40 +3,58 @@ package compressions import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" "io" + "time" ) const maxWriterHits = 1 << 20 +var isBusy = false + +func init() { + if !teaconst.IsMain { + return + } + + goman.New(func() { + var ticker = time.NewTicker(100 * time.Millisecond) + for range ticker.C { + if isBusy { + isBusy = false + } + } + }) +} + +func IsBusy() bool { + return isBusy +} + type WriterPool struct { - m map[int]chan Writer // level => chan Writer + c chan Writer // level => chan Writer newFunc func(writer io.Writer, level int) (Writer, error) } -func NewWriterPool(maxSize int, maxLevel int, newFunc func(writer io.Writer, level int) (Writer, error)) *WriterPool { +func NewWriterPool(maxSize int, newFunc func(writer io.Writer, level int) (Writer, error)) *WriterPool { if maxSize <= 0 { maxSize = 1024 } - var m = map[int]chan Writer{} - for i := 0; i <= maxLevel; i++ { - m[i] = make(chan Writer, maxSize) - } - return &WriterPool{ - m: m, + c: make(chan Writer, maxSize), newFunc: newFunc, } } func (this *WriterPool) Get(parentWriter io.Writer, level int) (Writer, error) { - c, ok := this.m[level] - if !ok { - c = this.m[0] + if isBusy { + return nil, ErrIsBusy } select { - case writer := <-c: + case writer := <-this.c: writer.Reset(parentWriter) writer.ResetFinish() return writer, nil @@ -56,13 +74,9 @@ func (this *WriterPool) Put(writer Writer) { return } - var level = writer.Level() - c, ok := this.m[level] - if !ok { - c = this.m[0] - } select { - case c <- writer: + case this.c <- writer: default: + isBusy = true } } diff --git a/internal/compressions/writer_pool_brotli.go b/internal/compressions/writer_pool_brotli.go index 1e500d0..9feaaa7 100644 --- a/internal/compressions/writer_pool_brotli.go +++ b/internal/compressions/writer_pool_brotli.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - "github.com/andybalholm/brotli" "io" ) @@ -15,7 +14,7 @@ func init() { return } - sharedBrotliWriterPool = NewWriterPool(CalculatePoolSize(), brotli.BestCompression, func(writer io.Writer, level int) (Writer, error) { + sharedBrotliWriterPool = NewWriterPool(CalculatePoolSize(), func(writer io.Writer, level int) (Writer, error) { return newBrotliWriter(writer) }) } diff --git a/internal/compressions/writer_pool_deflate.go b/internal/compressions/writer_pool_deflate.go index cc8308f..2183060 100644 --- a/internal/compressions/writer_pool_deflate.go +++ b/internal/compressions/writer_pool_deflate.go @@ -3,7 +3,6 @@ package compressions import ( - "compress/flate" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "io" ) @@ -15,7 +14,7 @@ func init() { return } - sharedDeflateWriterPool = NewWriterPool(CalculatePoolSize(), flate.BestCompression, func(writer io.Writer, level int) (Writer, error) { + sharedDeflateWriterPool = NewWriterPool(CalculatePoolSize(), func(writer io.Writer, level int) (Writer, error) { return newDeflateWriter(writer) }) } diff --git a/internal/compressions/writer_pool_gzip.go b/internal/compressions/writer_pool_gzip.go index 09f52a0..a23bd04 100644 --- a/internal/compressions/writer_pool_gzip.go +++ b/internal/compressions/writer_pool_gzip.go @@ -3,7 +3,6 @@ package compressions import ( - "compress/gzip" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "io" ) @@ -15,8 +14,7 @@ func init() { return } - - sharedGzipWriterPool = NewWriterPool(CalculatePoolSize(), gzip.BestCompression, func(writer io.Writer, level int) (Writer, error) { + sharedGzipWriterPool = NewWriterPool(CalculatePoolSize(), func(writer io.Writer, level int) (Writer, error) { return newGzipWriter(writer) }) } diff --git a/internal/compressions/writer_pool_zstd.go b/internal/compressions/writer_pool_zstd.go index b685410..41225e0 100644 --- a/internal/compressions/writer_pool_zstd.go +++ b/internal/compressions/writer_pool_zstd.go @@ -4,7 +4,6 @@ package compressions import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - "github.com/klauspost/compress/zstd" "io" ) @@ -15,7 +14,7 @@ func init() { return } - sharedZSTDWriterPool = NewWriterPool(CalculatePoolSize(), int(zstd.SpeedBestCompression), func(writer io.Writer, level int) (Writer, error) { + sharedZSTDWriterPool = NewWriterPool(CalculatePoolSize(), func(writer io.Writer, level int) (Writer, error) { return newZSTDWriter(writer) }) } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index ac12403..9ab2637 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -614,6 +614,11 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { return } + // 检查是否正繁忙 + if compressions.IsBusy() { + return + } + // 分区内容不压缩,防止读取失败 if !this.compressionConfig.EnablePartialContent && this.StatusCode() == http.StatusPartialContent { return @@ -733,7 +738,9 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { // compression writer compressionWriter, err := compressions.NewWriter(this.writer, compressionType, int(this.compressionConfig.Level)) if err != nil { - remotelogs.Error("HTTP_WRITER", err.Error()) + if !compressions.CanIgnore(err) { + remotelogs.Error("HTTP_WRITER", "open compress writer failed: "+err.Error()) + } header.Del("Content-Encoding") if this.compressionCacheWriter != nil { _ = this.compressionCacheWriter.Discard()