内容压缩算法使用Pool管理

This commit is contained in:
GoEdgeLab
2022-03-20 00:05:47 +08:00
parent af7a5d6e26
commit d16245eb7a
25 changed files with 625 additions and 157 deletions

View File

@@ -2,7 +2,14 @@
package compressions
import "io"
type Reader interface {
Read(p []byte) (n int, err error)
Reset(reader io.Reader) error
RawClose() error
Close() error
SetPool(pool *ReaderPool)
ResetFinish()
}

View File

@@ -0,0 +1,26 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
type BaseReader struct {
pool *ReaderPool
isFinished bool
}
func (this *BaseReader) SetPool(pool *ReaderPool) {
this.pool = pool
}
func (this *BaseReader) Finish(obj Reader) error {
err := obj.RawClose()
if err == nil && this.pool != nil && !this.isFinished {
this.pool.Put(obj)
}
this.isFinished = true
return err
}
func (this *BaseReader) ResetFinish() {
this.isFinished = false
}

View File

@@ -9,10 +9,16 @@ import (
)
type BrotliReader struct {
BaseReader
reader *brotli.Reader
}
func NewBrotliReader(reader io.Reader) (Reader, error) {
return sharedBrotliReaderPool.Get(reader)
}
func newBrotliReader(reader io.Reader) (Reader, error) {
return &BrotliReader{reader: brotli.NewReader(reader)}, nil
}
@@ -24,6 +30,14 @@ func (this *BrotliReader) Read(p []byte) (n int, err error) {
return
}
func (this *BrotliReader) Close() error {
func (this *BrotliReader) Reset(reader io.Reader) error {
return this.reader.Reset(reader)
}
func (this *BrotliReader) RawClose() error {
return nil
}
func (this *BrotliReader) Close() error {
return this.Finish(this)
}

View File

@@ -0,0 +1,51 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"io"
"testing"
)
func TestBrotliReader(t *testing.T) {
for _, testString := range []string{"Hello", "World", "Ni", "Hao"} {
t.Log("===", testString, "===")
var buf = &bytes.Buffer{}
writer, err := compressions.NewBrotliWriter(buf, 5)
if err != nil {
t.Fatal(err)
}
_, err = writer.Write([]byte(testString))
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
reader, err := compressions.NewBrotliReader(buf)
if err != nil {
t.Fatal(err)
}
var data = make([]byte, 4096)
for {
n, err := reader.Read(data)
if n > 0 {
t.Log(string(data[:n]))
}
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
err = reader.Close()
if err != nil {
t.Fatal(err)
}
}
}

View File

@@ -8,10 +8,16 @@ import (
)
type DeflateReader struct {
BaseReader
reader io.ReadCloser
}
func NewDeflateReader(reader io.Reader) (Reader, error) {
return sharedDeflateReaderPool.Get(reader)
}
func newDeflateReader(reader io.Reader) (Reader, error) {
return &DeflateReader{reader: flate.NewReader(reader)}, nil
}
@@ -19,6 +25,15 @@ func (this *DeflateReader) Read(p []byte) (n int, err error) {
return this.reader.Read(p)
}
func (this *DeflateReader) Close() error {
func (this *DeflateReader) Reset(reader io.Reader) error {
this.reader = flate.NewReader(reader)
return nil
}
func (this *DeflateReader) RawClose() error {
return this.reader.Close()
}
func (this *DeflateReader) Close() error {
return this.Finish(this)
}

View File

@@ -0,0 +1,51 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"io"
"testing"
)
func TestDeflateReader(t *testing.T) {
for _, testString := range []string{"Hello", "World", "Ni", "Hao"} {
t.Log("===", testString, "===")
var buf = &bytes.Buffer{}
writer, err := compressions.NewDeflateWriter(buf, 5)
if err != nil {
t.Fatal(err)
}
_, err = writer.Write([]byte(testString))
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
reader, err := compressions.NewDeflateReader(buf)
if err != nil {
t.Fatal(err)
}
var data = make([]byte, 4096)
for {
n, err := reader.Read(data)
if n > 0 {
t.Log(string(data[:n]))
}
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
}
err = reader.Close()
if err != nil {
t.Fatal(err)
}
}
}

View File

@@ -8,10 +8,16 @@ import (
)
type GzipReader struct {
BaseReader
reader *gzip.Reader
}
func NewGzipReader(reader io.Reader) (Reader, error) {
return sharedGzipReaderPool.Get(reader)
}
func newGzipReader(reader io.Reader) (Reader, error) {
r, err := gzip.NewReader(reader)
if err != nil {
return nil, err
@@ -25,6 +31,14 @@ func (this *GzipReader) Read(p []byte) (n int, err error) {
return this.reader.Read(p)
}
func (this *GzipReader) Close() error {
func (this *GzipReader) Reset(reader io.Reader) error {
return this.reader.Reset(reader)
}
func (this *GzipReader) RawClose() error {
return this.reader.Close()
}
func (this *GzipReader) Close() error {
return this.Finish(this)
}

View File

@@ -1,68 +1,106 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
package compressions_test
import (
"bytes"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"io"
"os"
"strings"
"testing"
)
func TestGzipReader(t *testing.T) {
fp, err := os.Open("/Users/WorkSpace/EdgeProject/EdgeCache/p43/36/7e/367e02720713fe05b66573a1d69b4f0a.cache")
if err != nil {
// not fatal
t.Log(err)
return
}
defer func() {
_ = fp.Close()
}()
var buf = make([]byte, 32*1024)
cacheReader := caches.NewFileReader(fp)
err = cacheReader.Init()
for _, testString := range []string{"Hello", "World", "Ni", "Hao"} {
t.Log("===", testString, "===")
var buf = &bytes.Buffer{}
writer, err := compressions.NewGzipWriter(buf, 5)
if err != nil {
t.Fatal(err)
}
var headerBuf = []byte{}
err = cacheReader.ReadHeader(buf, func(n int) (goNext bool, err error) {
headerBuf = append(headerBuf, buf[:n]...)
for {
nIndex := bytes.Index(headerBuf, []byte{'\n'})
if nIndex >= 0 {
row := headerBuf[:nIndex]
spaceIndex := bytes.Index(row, []byte{':'})
if spaceIndex <= 0 {
return false, errors.New("invalid header '" + string(row) + "'")
_, err = writer.Write([]byte(testString))
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
headerBuf = headerBuf[nIndex+1:]
} else {
reader, err := compressions.NewGzipReader(buf)
if err != nil {
t.Fatal(err)
}
var data = make([]byte, 4096)
for {
n, err := reader.Read(data)
if n > 0 {
t.Log(string(data[:n]))
}
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
return true, nil
})
reader, err := NewGzipReader(cacheReader)
}
err = reader.Close()
if err != nil {
t.Fatal(err)
}
for {
n, err := reader.Read(buf)
if err != nil {
if err != io.EOF {
t.Fatal(err)
} else {
break
}
}
t.Log(string(buf[:n]))
_ = n
}
}
func BenchmarkGzipReader(b *testing.B) {
var randomData = func() []byte {
var b = strings.Builder{}
for i := 0; i < 1024; i++ {
b.WriteString(types.String(rands.Int64() % 10))
}
return []byte(b.String())
}
var buf = &bytes.Buffer{}
writer, err := compressions.NewGzipWriter(buf, 5)
if err != nil {
b.Fatal(err)
}
_, err = writer.Write(randomData())
if err != nil {
b.Fatal(err)
}
err = writer.Close()
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var newBytes = make([]byte, buf.Len())
copy(newBytes, buf.Bytes())
reader, err := compressions.NewGzipReader(bytes.NewReader(newBytes))
if err != nil {
b.Fatal(err)
}
var data = make([]byte, 4096)
for {
n, err := reader.Read(data)
if n > 0 {
_ = data[:n]
}
if err != nil {
if err == io.EOF {
break
}
b.Fatal(err)
}
}
err = reader.Close()
if err != nil {
b.Fatal(err)
}
}
}

View File

@@ -0,0 +1,56 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"io"
)
type ReaderPool struct {
c chan Reader
newFunc func(reader io.Reader) (Reader, error)
}
func NewReaderPool(maxSize int, newFunc func(reader io.Reader) (Reader, error)) *ReaderPool {
if maxSize <= 0 {
maxSize = 1024
}
return &ReaderPool{
c: make(chan Reader, maxSize),
newFunc: newFunc,
}
}
func (this *ReaderPool) Get(parentReader io.Reader) (Reader, error) {
select {
case reader := <-this.c:
err := reader.Reset(parentReader)
if err != nil {
// create new
reader, err = this.newFunc(parentReader)
if err != nil {
return nil, err
}
reader.SetPool(this)
return reader, nil
}
reader.ResetFinish()
return reader, nil
default:
// create new
reader, err := this.newFunc(parentReader)
if err != nil {
return nil, err
}
reader.SetPool(this)
return reader, nil
}
}
func (this *ReaderPool) Put(reader Reader) {
select {
case this.c <- reader:
default:
}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io"
)
var sharedBrotliReaderPool *ReaderPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedBrotliReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
return newBrotliReader(reader)
})
}

View File

@@ -0,0 +1,20 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io"
)
var sharedDeflateReaderPool *ReaderPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedDeflateReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
return newDeflateReader(reader)
})
}

View File

@@ -0,0 +1,20 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io"
)
var sharedGzipReaderPool *ReaderPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedGzipReaderPool = NewReaderPool(maxSize, func(reader io.Reader) (Reader, error) {
return newGzipReader(reader)
})
}

View File

@@ -37,7 +37,6 @@ func NewReader(reader io.Reader, contentEncoding ContentEncoding) (Reader, error
}
// NewWriter 获取Writer
// TODO 考虑重用Writer
func NewWriter(writer io.Writer, compressType serverconfigs.HTTPCompressionType, level int) (Writer, error) {
switch compressType {
case serverconfigs.HTTPCompressionTypeGzip:

View File

@@ -2,9 +2,16 @@
package compressions
import "io"
type Writer interface {
Write(p []byte) (int, error)
Flush() error
Reset(writer io.Writer)
RawClose() error
Close() error
Level() int
SetPool(pool *WriterPool)
ResetFinish()
}

View File

@@ -0,0 +1,26 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
type BaseWriter struct {
pool *WriterPool
isFinished bool
}
func (this *BaseWriter) SetPool(pool *WriterPool) {
this.pool = pool
}
func (this *BaseWriter) Finish(obj Writer) error {
err := obj.RawClose()
if err == nil && this.pool != nil && !this.isFinished {
this.pool.Put(obj)
}
this.isFinished = true
return err
}
func (this *BaseWriter) ResetFinish() {
this.isFinished = false
}

View File

@@ -8,11 +8,17 @@ import (
)
type BrotliWriter struct {
BaseWriter
writer *brotli.Writer
level int
}
func NewBrotliWriter(writer io.Writer, level int) (Writer, error) {
return sharedBrotliWriterPool.Get(writer, level)
}
func newBrotliWriter(writer io.Writer, level int) (*BrotliWriter, error) {
if level <= 0 {
level = brotli.BestSpeed
} else if level > brotli.BestCompression {
@@ -35,10 +41,18 @@ func (this *BrotliWriter) Flush() error {
return this.writer.Flush()
}
func (this *BrotliWriter) Close() error {
func (this *BrotliWriter) Reset(newWriter io.Writer) {
this.writer.Reset(newWriter)
}
func (this *BrotliWriter) RawClose() error {
return this.writer.Close()
}
func (this *BrotliWriter) Close() error {
return this.Finish(this)
}
func (this *BrotliWriter) Level() int {
return this.level
}

View File

@@ -8,11 +8,17 @@ import (
)
type DeflateWriter struct {
BaseWriter
writer *flate.Writer
level int
}
func NewDeflateWriter(writer io.Writer, level int) (Writer, error) {
return sharedDeflateWriterPool.Get(writer, level)
}
func newDeflateWriter(writer io.Writer, level int) (Writer, error) {
if level <= 0 {
level = flate.BestSpeed
} else if level > flate.BestCompression {
@@ -38,10 +44,18 @@ func (this *DeflateWriter) Flush() error {
return this.writer.Flush()
}
func (this *DeflateWriter) Close() error {
func (this *DeflateWriter) Reset(writer io.Writer) {
this.writer.Reset(writer)
}
func (this *DeflateWriter) RawClose() error {
return this.writer.Close()
}
func (this *DeflateWriter) Close() error {
return this.Finish(this)
}
func (this *DeflateWriter) Level() int {
return this.level
}

View File

@@ -0,0 +1,36 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"strings"
"testing"
)
func BenchmarkDeflateWriter_Write(b *testing.B) {
var data = []byte(strings.Repeat("A", 1024))
for i := 0; i < b.N; i++ {
var buf = &bytes.Buffer{}
writer, err := compressions.NewDeflateWriter(buf, 5)
if err != nil {
b.Fatal(err)
}
for j := 0; j < 100; j++ {
_, err = writer.Write(data)
if err != nil {
b.Fatal(err)
}
/**err = writer.Flush()
if err != nil {
b.Fatal(err)
}**/
}
_ = writer.Close()
}
}

View File

@@ -1,51 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"bytes"
"io"
)
type EncodingWriter struct {
contentEncoding ContentEncoding
writer Writer
buf *bytes.Buffer
}
func NewEncodingWriter(contentEncoding ContentEncoding, writer Writer) (Writer, error) {
return &EncodingWriter{
contentEncoding: contentEncoding,
writer: writer,
buf: &bytes.Buffer{},
}, nil
}
func (this *EncodingWriter) Write(p []byte) (int, error) {
return this.buf.Write(p)
}
func (this *EncodingWriter) Flush() error {
return this.writer.Flush()
}
func (this *EncodingWriter) Close() error {
reader, err := NewReader(this.buf, this.contentEncoding)
if err != nil {
_ = this.writer.Close()
return err
}
_, err = io.Copy(this.writer, reader)
if err != nil {
_ = reader.Close()
_ = this.writer.Close()
return err
}
_ = reader.Close()
return this.writer.Close()
}
func (this *EncodingWriter) Level() int {
return this.writer.Level()
}

View File

@@ -1,47 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"bytes"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"testing"
)
func TestNewEncodingWriter(t *testing.T) {
var buf = &bytes.Buffer{}
subWriter, err := NewWriter(buf, serverconfigs.HTTPCompressionTypeGzip, 5)
if err != nil {
t.Fatal(err)
}
writer, err := NewEncodingWriter(ContentEncodingGzip, subWriter)
if err != nil {
t.Fatal(err)
}
gzipBuf := &bytes.Buffer{}
gzipWriter, err := NewGzipWriter(gzipBuf, 5)
if err != nil {
t.Fatal(err)
}
_, err = gzipWriter.Write([]byte("Hello"))
if err != nil {
t.Fatal(err)
}
_, err = gzipWriter.Write([]byte("World"))
if err != nil {
t.Fatal(err)
}
_ = gzipWriter.Close()
_, err = writer.Write(gzipBuf.Bytes())
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
t.Log(buf.String())
}

View File

@@ -8,11 +8,17 @@ import (
)
type GzipWriter struct {
BaseWriter
writer *gzip.Writer
level int
}
func NewGzipWriter(writer io.Writer, level int) (Writer, error) {
return sharedGzipWriterPool.Get(writer, level)
}
func newGzipWriter(writer io.Writer, level int) (Writer, error) {
if level <= 0 {
level = gzip.BestSpeed
} else if level > gzip.BestCompression {
@@ -38,10 +44,18 @@ func (this *GzipWriter) Flush() error {
return this.writer.Flush()
}
func (this *GzipWriter) Close() error {
func (this *GzipWriter) Reset(writer io.Writer) {
this.writer.Reset(writer)
}
func (this *GzipWriter) RawClose() error {
return this.writer.Close()
}
func (this *GzipWriter) Close() error {
return this.Finish(this)
}
func (this *GzipWriter) Level() int {
return this.level
}

View File

@@ -0,0 +1,61 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"io"
)
type WriterPool struct {
m map[int]chan Writer // level => chan Writer
newFunc func(writer io.Writer, level int) (Writer, error)
}
func NewWriterPool(maxSize int, maxLevel int, newFunc func(writer io.Writer, level int) (Writer, error)) *WriterPool {
if maxSize <= 0 {
maxSize = 1024
}
var m = map[int]chan Writer{}
for i := 0; i <= maxLevel; i++ {
m[i] = make(chan Writer, maxSize)
}
return &WriterPool{
m: m,
newFunc: newFunc,
}
}
func (this *WriterPool) Get(parentWriter io.Writer, level int) (Writer, error) {
c, ok := this.m[level]
if !ok {
c = this.m[0]
}
select {
case writer := <-c:
writer.Reset(parentWriter)
writer.ResetFinish()
return writer, nil
default:
writer, err := this.newFunc(parentWriter, level)
if err != nil {
return nil, err
}
writer.SetPool(this)
return writer, nil
}
}
func (this *WriterPool) Put(writer Writer) {
var level = writer.Level()
c, ok := this.m[level]
if !ok {
c = this.m[0]
}
select {
case c <- writer:
default:
}
}

View File

@@ -0,0 +1,21 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/andybalholm/brotli"
"io"
)
var sharedBrotliWriterPool *WriterPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedBrotliWriterPool = NewWriterPool(maxSize, brotli.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newBrotliWriter(writer, level)
})
}

View File

@@ -0,0 +1,21 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"compress/flate"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io"
)
var sharedDeflateWriterPool *WriterPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedDeflateWriterPool = NewWriterPool(maxSize, flate.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newDeflateWriter(writer, level)
})
}

View File

@@ -0,0 +1,21 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"compress/gzip"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io"
)
var sharedGzipWriterPool *WriterPool
func init() {
var maxSize = utils.SystemMemoryGB() * 256
if maxSize == 0 {
maxSize = 256
}
sharedGzipWriterPool = NewWriterPool(maxSize, gzip.BestCompression, func(writer io.Writer, level int) (Writer, error) {
return newGzipWriter(writer, level)
})
}