From c6e41c3d10400022255e7eb751e6a7b8a68a1326 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Mon, 27 Jun 2022 22:40:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81ZSTD=E5=8E=8B=E7=BC=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/compressions/reader_pool_zstd.go | 20 ++++ internal/compressions/reader_zstd.go | 45 +++++++++ internal/compressions/reader_zstd_test.go | 106 ++++++++++++++++++++++ internal/compressions/utils.go | 12 ++- internal/compressions/writer_pool_zstd.go | 21 +++++ internal/compressions/writer_zstd.go | 57 ++++++++++++ internal/compressions/writer_zstd_test.go | 82 +++++++++++++++++ internal/nodes/http_writer.go | 6 +- 8 files changed, 345 insertions(+), 4 deletions(-) create mode 100644 internal/compressions/reader_pool_zstd.go create mode 100644 internal/compressions/reader_zstd.go create mode 100644 internal/compressions/reader_zstd_test.go create mode 100644 internal/compressions/writer_pool_zstd.go create mode 100644 internal/compressions/writer_zstd.go create mode 100644 internal/compressions/writer_zstd_test.go diff --git a/internal/compressions/reader_pool_zstd.go b/internal/compressions/reader_pool_zstd.go new file mode 100644 index 0000000..baae79b --- /dev/null +++ b/internal/compressions/reader_pool_zstd.go @@ -0,0 +1,20 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package compressions + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils" + "io" +) + +var sharedZSTDReaderPool *ReaderPool + +func init() { + var maxSize = utils.SystemMemoryGB() * 256 + if maxSize == 0 { + maxSize = 256 + } + sharedZSTDReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) { + return newZSTDReader(reader) + }) +} diff --git a/internal/compressions/reader_zstd.go b/internal/compressions/reader_zstd.go new file mode 100644 index 0000000..b76c43e --- /dev/null +++ b/internal/compressions/reader_zstd.go @@ -0,0 +1,45 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package compressions + +import ( + "github.com/klauspost/compress/zstd" + "io" +) + +type ZSTDReader struct { + BaseReader + + reader *zstd.Decoder +} + +func NewZSTDReader(reader io.Reader) (Reader, error) { + return sharedZSTDReaderPool.Get(reader) +} + +func newZSTDReader(reader io.Reader) (Reader, error) { + r, err := zstd.NewReader(reader) + if err != nil { + return nil, err + } + return &ZSTDReader{ + reader: r, + }, nil +} + +func (this *ZSTDReader) Read(p []byte) (n int, err error) { + return this.reader.Read(p) +} + +func (this *ZSTDReader) Reset(reader io.Reader) error { + return this.reader.Reset(reader) +} + +func (this *ZSTDReader) RawClose() error { + this.reader.Close() + return nil +} + +func (this *ZSTDReader) Close() error { + return this.Finish(this) +} diff --git a/internal/compressions/reader_zstd_test.go b/internal/compressions/reader_zstd_test.go new file mode 100644 index 0000000..a7e00c3 --- /dev/null +++ b/internal/compressions/reader_zstd_test.go @@ -0,0 +1,106 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package compressions_test + +import ( + "bytes" + "github.com/TeaOSLab/EdgeNode/internal/compressions" + "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" + "io" + "strings" + "testing" +) + +func TestZSTDReader(t *testing.T) { + for _, testString := range []string{"Hello", "World", "Ni", "Hao"} { + t.Log("===", testString, "===") + var buf = &bytes.Buffer{} + writer, err := compressions.NewZSTDWriter(buf, 5) + if err != nil { + t.Fatal(err) + } + _, err = writer.Write([]byte(testString)) + if err != nil { + t.Fatal(err) + } + err = writer.Close() + if err != nil { + t.Fatal(err) + } + + reader, err := compressions.NewZSTDReader(buf) + if err != nil { + t.Fatal(err) + } + var data = make([]byte, 4096) + for { + n, err := reader.Read(data) + if n > 0 { + t.Log(string(data[:n])) + } + if err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + } +} + +func BenchmarkZSTDReader(b *testing.B) { + var randomData = func() []byte { + var b = strings.Builder{} + for i := 0; i < 1024; i++ { + b.WriteString(types.String(rands.Int64() % 10)) + } + return []byte(b.String()) + } + + var buf = &bytes.Buffer{} + writer, err := compressions.NewZSTDWriter(buf, 5) + if err != nil { + b.Fatal(err) + } + _, err = writer.Write(randomData()) + if err != nil { + b.Fatal(err) + } + err = writer.Close() + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var newBytes = make([]byte, buf.Len()) + copy(newBytes, buf.Bytes()) + reader, err := compressions.NewZSTDReader(bytes.NewReader(newBytes)) + if err != nil { + b.Fatal(err) + } + var data = make([]byte, 4096) + for { + n, err := reader.Read(data) + if n > 0 { + _ = data[:n] + } + if err != nil { + if err == io.EOF { + break + } + b.Fatal(err) + } + } + err = reader.Close() + if err != nil { + b.Fatal(err) + } + } +} diff --git a/internal/compressions/utils.go b/internal/compressions/utils.go index e469fab..c466c91 100644 --- a/internal/compressions/utils.go +++ b/internal/compressions/utils.go @@ -14,13 +14,19 @@ const ( ContentEncodingBr ContentEncoding = "br" ContentEncodingGzip ContentEncoding = "gzip" ContentEncodingDeflate ContentEncoding = "deflate" + ContentEncodingZSTD ContentEncoding = "zstd" ) var ErrNotSupportedContentEncoding = errors.New("not supported content encoding") // AllEncodings 当前支持的所有编码 func AllEncodings() []ContentEncoding { - return []ContentEncoding{ContentEncodingBr, ContentEncodingGzip, ContentEncodingDeflate} + return []ContentEncoding{ + ContentEncodingBr, + ContentEncodingGzip, + ContentEncodingZSTD, + ContentEncodingDeflate, + } } // NewReader 获取Reader @@ -32,6 +38,8 @@ func NewReader(reader io.Reader, contentEncoding ContentEncoding) (Reader, error return NewGzipReader(reader) case ContentEncodingDeflate: return NewDeflateReader(reader) + case ContentEncodingZSTD: + return NewZSTDReader(reader) } return nil, ErrNotSupportedContentEncoding } @@ -45,6 +53,8 @@ func NewWriter(writer io.Writer, compressType serverconfigs.HTTPCompressionType, return NewDeflateWriter(writer, level) case serverconfigs.HTTPCompressionTypeBrotli: return NewBrotliWriter(writer, level) + case serverconfigs.HTTPCompressionTypeZSTD: + return NewZSTDWriter(writer, level) } return nil, errors.New("invalid compression type '" + compressType + "'") } diff --git a/internal/compressions/writer_pool_zstd.go b/internal/compressions/writer_pool_zstd.go new file mode 100644 index 0000000..4212da8 --- /dev/null +++ b/internal/compressions/writer_pool_zstd.go @@ -0,0 +1,21 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package compressions + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/klauspost/compress/zstd" + "io" +) + +var sharedZSTDWriterPool *WriterPool + +func init() { + var maxSize = utils.SystemMemoryGB() * 256 + if maxSize == 0 { + maxSize = 256 + } + sharedZSTDWriterPool = NewWriterPool(maxSize, int(zstd.SpeedBestCompression), func(writer io.Writer, level int) (Writer, error) { + return newZSTDWriter(writer, level) + }) +} diff --git a/internal/compressions/writer_zstd.go b/internal/compressions/writer_zstd.go new file mode 100644 index 0000000..942e0b8 --- /dev/null +++ b/internal/compressions/writer_zstd.go @@ -0,0 +1,57 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package compressions + +import ( + "github.com/klauspost/compress/zstd" + "io" +) + +type ZSTDWriter struct { + BaseWriter + + writer *zstd.Encoder + level int +} + +func NewZSTDWriter(writer io.Writer, level int) (Writer, error) { + return sharedZSTDWriterPool.Get(writer, level) +} + +func newZSTDWriter(writer io.Writer, level int) (Writer, error) { + var zstdLevel = zstd.EncoderLevelFromZstd(level) + + zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstdLevel)) + if err != nil { + return nil, err + } + + return &ZSTDWriter{ + writer: zstdWriter, + level: level, + }, nil +} + +func (this *ZSTDWriter) Write(p []byte) (int, error) { + return this.writer.Write(p) +} + +func (this *ZSTDWriter) Flush() error { + return this.writer.Flush() +} + +func (this *ZSTDWriter) Reset(writer io.Writer) { + this.writer.Reset(writer) +} + +func (this *ZSTDWriter) RawClose() error { + return this.writer.Close() +} + +func (this *ZSTDWriter) Close() error { + return this.Finish(this) +} + +func (this *ZSTDWriter) Level() int { + return this.level +} diff --git a/internal/compressions/writer_zstd_test.go b/internal/compressions/writer_zstd_test.go new file mode 100644 index 0000000..7f87a95 --- /dev/null +++ b/internal/compressions/writer_zstd_test.go @@ -0,0 +1,82 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package compressions_test + +import ( + "bytes" + "github.com/TeaOSLab/EdgeNode/internal/compressions" + "strings" + "testing" +) + +func TestNewZSTDWriter(t *testing.T) { + var buf = &bytes.Buffer{} + writer, err := compressions.NewZSTDWriter(buf, 10) + if err != nil { + t.Fatal(err) + } + var originData = []byte(strings.Repeat("Hello", 1024)) + _, err = writer.Write(originData) + if err != nil { + t.Fatal(err) + } + err = writer.Close() + if err != nil { + t.Fatal(err) + } + t.Log("origin data:", len(originData), "result:", buf.Len()) +} + +func BenchmarkZSTDWriter_Write(b *testing.B) { + var data = []byte(strings.Repeat("A", 1024)) + + for i := 0; i < b.N; i++ { + var buf = &bytes.Buffer{} + writer, err := compressions.NewZSTDWriter(buf, 5) + if err != nil { + b.Fatal(err) + } + + for j := 0; j < 100; j++ { + _, err = writer.Write(data) + if err != nil { + b.Fatal(err) + } + + /**err = writer.Flush() + if err != nil { + b.Fatal(err) + }**/ + } + + _ = writer.Close() + } +} + +func BenchmarkZSTDWriter_Write_Parallel(b *testing.B) { + var data = []byte(strings.Repeat("A", 1024)) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf = &bytes.Buffer{} + writer, err := compressions.NewZSTDWriter(buf, 5) + if err != nil { + b.Fatal(err) + } + + for j := 0; j < 100; j++ { + _, err = writer.Write(data) + if err != nil { + b.Fatal(err) + } + + /**err = writer.Flush() + if err != nil { + b.Fatal(err) + }**/ + } + + _ = writer.Close() + } + }) +} diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 7b5e4fb..c0dd1f5 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -509,7 +509,7 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) { var contentEncoding = this.GetHeader("Content-Encoding") switch contentEncoding { - case "gzip", "deflate", "br": + case "gzip", "deflate", "br", "zstd": reader, err := compressions.NewReader(resp.Body, contentEncoding) if err != nil { return @@ -547,7 +547,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { var contentEncoding = this.GetHeader("Content-Encoding") if this.compressionConfig == nil || !this.compressionConfig.IsOn { - if lists.ContainsString([]string{"gzip", "deflate", "br"}, contentEncoding) && !httpAcceptEncoding(acceptEncodings, contentEncoding) { + if lists.ContainsString([]string{"gzip", "deflate", "br", "zstd"}, contentEncoding) && !httpAcceptEncoding(acceptEncodings, contentEncoding) { reader, err := compressions.NewReader(resp.Body, contentEncoding) if err != nil { return @@ -564,7 +564,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { } // 如果已经有编码则不处理 - if len(contentEncoding) > 0 && (!this.compressionConfig.DecompressData || !lists.ContainsString([]string{"gzip", "deflate", "br"}, contentEncoding)) { + if len(contentEncoding) > 0 && (!this.compressionConfig.DecompressData || !lists.ContainsString([]string{"gzip", "deflate", "br", "zstd"}, contentEncoding)) { return }