支持ZSTD压缩

This commit is contained in:
GoEdgeLab
2022-06-27 22:40:36 +08:00
parent ff6bcb1a06
commit c6e41c3d10
8 changed files with 345 additions and 4 deletions

View File

@@ -0,0 +1,20 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io"
)
var sharedZSTDReaderPool *ReaderPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedZSTDReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
return newZSTDReader(reader)
})
}

View File

@@ -0,0 +1,45 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/klauspost/compress/zstd"
"io"
)
type ZSTDReader struct {
BaseReader
reader *zstd.Decoder
}
func NewZSTDReader(reader io.Reader) (Reader, error) {
return sharedZSTDReaderPool.Get(reader)
}
func newZSTDReader(reader io.Reader) (Reader, error) {
r, err := zstd.NewReader(reader)
if err != nil {
return nil, err
}
return &ZSTDReader{
reader: r,
}, nil
}
func (this *ZSTDReader) Read(p []byte) (n int, err error) {
return this.reader.Read(p)
}
func (this *ZSTDReader) Reset(reader io.Reader) error {
return this.reader.Reset(reader)
}
func (this *ZSTDReader) RawClose() error {
this.reader.Close()
return nil
}
func (this *ZSTDReader) Close() error {
return this.Finish(this)
}

View File

@@ -0,0 +1,106 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"io"
"strings"
"testing"
)
func TestZSTDReader(t *testing.T) {
for _, testString := range []string{"Hello", "World", "Ni", "Hao"} {
t.Log("===", testString, "===")
var buf = &bytes.Buffer{}
writer, err := compressions.NewZSTDWriter(buf, 5)
if err != nil {
t.Fatal(err)
}
_, err = writer.Write([]byte(testString))
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
reader, err := compressions.NewZSTDReader(buf)
if err != nil {
t.Fatal(err)
}
var data = make([]byte, 4096)
for {
n, err := reader.Read(data)
if n > 0 {
t.Log(string(data[:n]))
}
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
err = reader.Close()
if err != nil {
t.Fatal(err)
}
}
}
func BenchmarkZSTDReader(b *testing.B) {
var randomData = func() []byte {
var b = strings.Builder{}
for i := 0; i < 1024; i++ {
b.WriteString(types.String(rands.Int64() % 10))
}
return []byte(b.String())
}
var buf = &bytes.Buffer{}
writer, err := compressions.NewZSTDWriter(buf, 5)
if err != nil {
b.Fatal(err)
}
_, err = writer.Write(randomData())
if err != nil {
b.Fatal(err)
}
err = writer.Close()
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var newBytes = make([]byte, buf.Len())
copy(newBytes, buf.Bytes())
reader, err := compressions.NewZSTDReader(bytes.NewReader(newBytes))
if err != nil {
b.Fatal(err)
}
var data = make([]byte, 4096)
for {
n, err := reader.Read(data)
if n > 0 {
_ = data[:n]
}
if err != nil {
if err == io.EOF {
break
}
b.Fatal(err)
}
}
err = reader.Close()
if err != nil {
b.Fatal(err)
}
}
}

View File

@@ -14,13 +14,19 @@ const (
ContentEncodingBr ContentEncoding = "br"
ContentEncodingGzip ContentEncoding = "gzip"
ContentEncodingDeflate ContentEncoding = "deflate"
ContentEncodingZSTD ContentEncoding = "zstd"
)
var ErrNotSupportedContentEncoding = errors.New("not supported content encoding")
// AllEncodings 当前支持的所有编码
func AllEncodings() []ContentEncoding {
return []ContentEncoding{ContentEncodingBr, ContentEncodingGzip, ContentEncodingDeflate}
return []ContentEncoding{
ContentEncodingBr,
ContentEncodingGzip,
ContentEncodingZSTD,
ContentEncodingDeflate,
}
}
// NewReader 获取Reader
@@ -32,6 +38,8 @@ func NewReader(reader io.Reader, contentEncoding ContentEncoding) (Reader, error
return NewGzipReader(reader)
case ContentEncodingDeflate:
return NewDeflateReader(reader)
case ContentEncodingZSTD:
return NewZSTDReader(reader)
}
return nil, ErrNotSupportedContentEncoding
}
@@ -45,6 +53,8 @@ func NewWriter(writer io.Writer, compressType serverconfigs.HTTPCompressionType,
return NewDeflateWriter(writer, level)
case serverconfigs.HTTPCompressionTypeBrotli:
return NewBrotliWriter(writer, level)
case serverconfigs.HTTPCompressionTypeZSTD:
return NewZSTDWriter(writer, level)
}
return nil, errors.New("invalid compression type '" + compressType + "'")
}

View File

@@ -0,0 +1,21 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/klauspost/compress/zstd"
"io"
)
var sharedZSTDWriterPool *WriterPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedZSTDWriterPool = NewWriterPool(maxSize, int(zstd.SpeedBestCompression), func(writer io.Writer, level int) (Writer, error) {
return newZSTDWriter(writer, level)
})
}

View File

@@ -0,0 +1,57 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/klauspost/compress/zstd"
"io"
)
type ZSTDWriter struct {
BaseWriter
writer *zstd.Encoder
level int
}
func NewZSTDWriter(writer io.Writer, level int) (Writer, error) {
return sharedZSTDWriterPool.Get(writer, level)
}
func newZSTDWriter(writer io.Writer, level int) (Writer, error) {
var zstdLevel = zstd.EncoderLevelFromZstd(level)
zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstdLevel))
if err != nil {
return nil, err
}
return &ZSTDWriter{
writer: zstdWriter,
level: level,
}, nil
}
func (this *ZSTDWriter) Write(p []byte) (int, error) {
return this.writer.Write(p)
}
func (this *ZSTDWriter) Flush() error {
return this.writer.Flush()
}
func (this *ZSTDWriter) Reset(writer io.Writer) {
this.writer.Reset(writer)
}
func (this *ZSTDWriter) RawClose() error {
return this.writer.Close()
}
func (this *ZSTDWriter) Close() error {
return this.Finish(this)
}
func (this *ZSTDWriter) Level() int {
return this.level
}

View File

@@ -0,0 +1,82 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"strings"
"testing"
)
func TestNewZSTDWriter(t *testing.T) {
var buf = &bytes.Buffer{}
writer, err := compressions.NewZSTDWriter(buf, 10)
if err != nil {
t.Fatal(err)
}
var originData = []byte(strings.Repeat("Hello", 1024))
_, err = writer.Write(originData)
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
t.Log("origin data:", len(originData), "result:", buf.Len())
}
func BenchmarkZSTDWriter_Write(b *testing.B) {
var data = []byte(strings.Repeat("A", 1024))
for i := 0; i < b.N; i++ {
var buf = &bytes.Buffer{}
writer, err := compressions.NewZSTDWriter(buf, 5)
if err != nil {
b.Fatal(err)
}
for j := 0; j < 100; j++ {
_, err = writer.Write(data)
if err != nil {
b.Fatal(err)
}
/**err = writer.Flush()
if err != nil {
b.Fatal(err)
}**/
}
_ = writer.Close()
}
}
func BenchmarkZSTDWriter_Write_Parallel(b *testing.B) {
var data = []byte(strings.Repeat("A", 1024))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buf = &bytes.Buffer{}
writer, err := compressions.NewZSTDWriter(buf, 5)
if err != nil {
b.Fatal(err)
}
for j := 0; j < 100; j++ {
_, err = writer.Write(data)
if err != nil {
b.Fatal(err)
}
/**err = writer.Flush()
if err != nil {
b.Fatal(err)
}**/
}
_ = writer.Close()
}
})
}

View File

@@ -509,7 +509,7 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) {
var contentEncoding = this.GetHeader("Content-Encoding")
switch contentEncoding {
case "gzip", "deflate", "br":
case "gzip", "deflate", "br", "zstd":
reader, err := compressions.NewReader(resp.Body, contentEncoding)
if err != nil {
return
@@ -547,7 +547,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
var contentEncoding = this.GetHeader("Content-Encoding")
if this.compressionConfig == nil || !this.compressionConfig.IsOn {
if lists.ContainsString([]string{"gzip", "deflate", "br"}, contentEncoding) && !httpAcceptEncoding(acceptEncodings, contentEncoding) {
if lists.ContainsString([]string{"gzip", "deflate", "br", "zstd"}, contentEncoding) && !httpAcceptEncoding(acceptEncodings, contentEncoding) {
reader, err := compressions.NewReader(resp.Body, contentEncoding)
if err != nil {
return
@@ -564,7 +564,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
}
// 如果已经有编码则不处理
if len(contentEncoding) > 0 && (!this.compressionConfig.DecompressData || !lists.ContainsString([]string{"gzip", "deflate", "br"}, contentEncoding)) {
if len(contentEncoding) > 0 && (!this.compressionConfig.DecompressData || !lists.ContainsString([]string{"gzip", "deflate", "br", "zstd"}, contentEncoding)) {
return
}