From b3aa8dfeedf81ee2c738756cbf20a0245dfc4635 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 28 Apr 2024 10:06:29 +0800 Subject: [PATCH] =?UTF-8?q?bfs=EF=BC=9A=E5=AF=B9FileHeader=E7=9A=84?= =?UTF-8?q?=E5=8E=8B=E7=BC=A9=E5=92=8C=E8=A7=A3=E5=8E=8B=E4=BD=BF=E7=94=A8?= =?UTF-8?q?Pool=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/utils/bfs/file_header.go | 85 ++++++++--- internal/utils/bfs/file_header_lazy.go | 23 ++- internal/utils/bfs/file_header_lazy_test.go | 87 +++++++++++ internal/utils/bfs/file_header_test.go | 155 ++++++++++++++++++++ internal/utils/bfs/gzip_reader_pool.go | 66 +++++++++ internal/utils/bfs/gzip_writer_pool.go | 63 ++++++++ internal/utils/bfs/meta_file.go | 37 +---- internal/utils/percpu/proc_id.go | 19 +++ 8 files changed, 470 insertions(+), 65 deletions(-) create mode 100644 internal/utils/bfs/file_header_lazy_test.go create mode 100644 internal/utils/bfs/gzip_reader_pool.go create mode 100644 internal/utils/bfs/gzip_writer_pool.go create mode 100644 internal/utils/percpu/proc_id.go diff --git a/internal/utils/bfs/file_header.go b/internal/utils/bfs/file_header.go index 18911fc..1b5f71c 100644 --- a/internal/utils/bfs/file_header.go +++ b/internal/utils/bfs/file_header.go @@ -3,6 +3,8 @@ package bfs import ( + "encoding/json" + "github.com/TeaOSLab/EdgeNode/internal/utils" "sort" ) @@ -20,6 +22,35 @@ type FileHeader struct { IsWriting bool `json:"11,omitempty"` } +func (this *FileHeader) BlockAt(offset int64) (blockInfo BlockInfo, ok bool) { + var l = len(this.BodyBlocks) + if l == 1 { + if this.BodyBlocks[0].Contains(offset) { + return this.BodyBlocks[0], true + } + return + } + + sort.Search(l, func(i int) bool { + if this.BodyBlocks[i].Contains(offset) { + blockInfo = this.BodyBlocks[i] + ok = true + return true + } + return this.BodyBlocks[i].OriginOffsetFrom > offset + }) + + return +} + +func (this *FileHeader) MaxOffset() int64 { + var l = len(this.BodyBlocks) + if l > 0 { + return this.BodyBlocks[l-1].OriginOffsetTo + } + return 0 +} + func (this *FileHeader) Compact() { // TODO 合并相邻的headerBlocks和bodyBlocks(必须是对应的BFile offset也要相邻) @@ -65,31 +96,37 @@ func (this *FileHeader) Clone() *FileHeader { } } -func (this *FileHeader) BlockAt(offset int64) (blockInfo BlockInfo, ok bool) { - var l = len(this.BodyBlocks) - if l == 1 { - if this.BodyBlocks[0].Contains(offset) { - return this.BodyBlocks[0], true - } - return +func (this *FileHeader) Encode(hash string) ([]byte, error) { + headerJSON, err := json.Marshal(this) + if err != nil { + return nil, err } - sort.Search(l, func(i int) bool { - if this.BodyBlocks[i].Contains(offset) { - blockInfo = this.BodyBlocks[i] - ok = true - return true - } - return this.BodyBlocks[i].OriginOffsetFrom > offset - }) - - return -} - -func (this *FileHeader) MaxOffset() int64 { - var l = len(this.BodyBlocks) - if l > 0 { - return this.BodyBlocks[l-1].OriginOffsetTo + // we do not compress data which size is less than 100 bytes + if len(headerJSON) < 100 { + return EncodeMetaBlock(MetaActionNew, hash, append([]byte("json:"), headerJSON...)) } - return 0 + + var buf = utils.SharedBufferPool.Get() + defer utils.SharedBufferPool.Put(buf) + + compressor, err := SharedCompressPool.Get(buf) + if err != nil { + return nil, err + } + + _, err = compressor.Write(headerJSON) + if err != nil { + _ = compressor.Close() + SharedCompressPool.Put(compressor) + return nil, err + } + + err = compressor.Close() + SharedCompressPool.Put(compressor) + if err != nil { + return nil, err + } + + return EncodeMetaBlock(MetaActionNew, hash, buf.Bytes()) } diff --git a/internal/utils/bfs/file_header_lazy.go b/internal/utils/bfs/file_header_lazy.go index b88d422..2499ada 100644 --- a/internal/utils/bfs/file_header_lazy.go +++ b/internal/utils/bfs/file_header_lazy.go @@ -5,7 +5,6 @@ package bfs import ( "bytes" "encoding/json" - "github.com/klauspost/compress/gzip" ) // LazyFileHeader load file header lazily to save memory @@ -31,18 +30,30 @@ func (this *LazyFileHeader) FileHeaderUnsafe() (*FileHeader, error) { return this.fileHeader, nil } - // TODO 使用pool管理gzip - gzReader, err := gzip.NewReader(bytes.NewBuffer(this.rawData)) + var jsonPrefix = []byte("json:") + + var header = &FileHeader{} + + // json + if bytes.HasPrefix(this.rawData, jsonPrefix) { + err := json.Unmarshal(this.rawData[len(jsonPrefix):], header) + if err != nil { + return nil, err + } + return header, nil + } + + decompressor, err := SharedDecompressPool.Get(bytes.NewBuffer(this.rawData)) if err != nil { return nil, err } defer func() { - _ = gzReader.Close() + _ = decompressor.Close() + SharedDecompressPool.Put(decompressor) }() - var header = &FileHeader{} - err = json.NewDecoder(gzReader).Decode(header) + err = json.NewDecoder(decompressor).Decode(header) if err != nil { return nil, err } diff --git a/internal/utils/bfs/file_header_lazy_test.go b/internal/utils/bfs/file_header_lazy_test.go new file mode 100644 index 0000000..ad000bd --- /dev/null +++ b/internal/utils/bfs/file_header_lazy_test.go @@ -0,0 +1,87 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "runtime" + "testing" +) + +func TestNewLazyFileHeaderFromData(t *testing.T) { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodyBlocks: []bfs.BlockInfo{ + { + BFileOffsetFrom: 0, + BFileOffsetTo: 1 << 20, + }, + }, + } + blockBytes, err := header.Encode(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } + + _, _, rawData, err := bfs.DecodeMetaBlock(blockBytes) + if err != nil { + t.Fatal(err) + } + + var lazyHeader = bfs.NewLazyFileHeaderFromData(rawData) + newHeader, err := lazyHeader.FileHeaderUnsafe() + if err != nil { + t.Fatal(err) + } + + t.Log(newHeader) +} + +func BenchmarkLazyFileHeader_Decode(b *testing.B) { + runtime.GOMAXPROCS(12) + + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodyBlocks: []bfs.BlockInfo{}, + } + var offset int64 + for { + var end = offset + 16<<10 + if end > 1<<20 { + break + } + + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + BFileOffsetFrom: offset, + BFileOffsetTo: end, + }) + + offset = end + } + + var hash = bfs.Hash("123456") + + blockBytes, err := header.Encode(hash) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, _, rawData, decodeErr := bfs.DecodeMetaBlock(blockBytes) + if decodeErr != nil { + b.Fatal(decodeErr) + } + + var lazyHeader = bfs.NewLazyFileHeaderFromData(rawData) + _, decodeErr = lazyHeader.FileHeaderUnsafe() + if decodeErr != nil { + b.Fatal(decodeErr) + } + } + }) +} diff --git a/internal/utils/bfs/file_header_test.go b/internal/utils/bfs/file_header_test.go index f7e8c66..f3df501 100644 --- a/internal/utils/bfs/file_header_test.go +++ b/internal/utils/bfs/file_header_test.go @@ -3,9 +3,13 @@ package bfs_test import ( + "encoding/json" "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/iwind/TeaGo/assert" "github.com/iwind/TeaGo/logs" + "math/rand" + "runtime" "testing" ) @@ -176,6 +180,120 @@ func TestFileHeader_Clone(t *testing.T) { a.IsTrue(header.BodyBlocks[0].OriginOffsetFrom != clonedHeader.BodyBlocks[0].OriginOffsetFrom) } +func TestFileHeader_Encode(t *testing.T) { + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + ModifiedAt: fasttime.Now().Unix(), + ExpiresAt: fasttime.Now().Unix() + 3600, + BodySize: 1 << 20, + HeaderSize: 1 << 10, + BodyBlocks: []bfs.BlockInfo{ + { + BFileOffsetFrom: 1 << 10, + BFileOffsetTo: 1 << 20, + }, + }, + } + data, err := header.Encode(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } + jsonBytes, _ := json.Marshal(header) + t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes") + + _, _, _, err = bfs.DecodeMetaBlock(data) + if err != nil { + t.Fatal(err) + } + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodyBlocks: []bfs.BlockInfo{}, + } + var offset int64 + for { + var end = offset + 16<<10 + if end > 256<<10 { + break + } + + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + BFileOffsetFrom: offset, + BFileOffsetTo: end, + }) + + offset = end + } + data, err := header.Encode(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } + jsonBytes, _ := json.Marshal(header) + t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes") + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodyBlocks: []bfs.BlockInfo{}, + } + var offset int64 + for { + var end = offset + 16<<10 + if end > 512<<10 { + break + } + + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + BFileOffsetFrom: offset, + BFileOffsetTo: end, + }) + + offset = end + } + data, err := header.Encode(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } + jsonBytes, _ := json.Marshal(header) + t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes") + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodyBlocks: []bfs.BlockInfo{}, + } + var offset int64 + for { + var end = offset + 16<<10 + if end > 1<<20 { + break + } + + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + BFileOffsetFrom: offset, + BFileOffsetTo: end, + }) + + offset = end + } + data, err := header.Encode(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } + jsonBytes, _ := json.Marshal(header) + t.Log(len(header.BodyBlocks), "blocks", len(data), "bytes", "json:", len(jsonBytes), "bytes") + } +} + func BenchmarkFileHeader_Compact(b *testing.B) { for i := 0; i < b.N; i++ { var header = &bfs.FileHeader{ @@ -197,3 +315,40 @@ func BenchmarkFileHeader_Compact(b *testing.B) { header.Compact() } } + +func BenchmarkFileHeader_Encode(b *testing.B) { + runtime.GOMAXPROCS(12) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + ModifiedAt: rand.Int63(), + BodySize: rand.Int63(), + BodyBlocks: []bfs.BlockInfo{}, + } + var offset int64 + for { + var end = offset + 16<<10 + if end > 2<<20 { + break + } + + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + BFileOffsetFrom: offset + int64(rand.Int()%1000000), + BFileOffsetTo: end + int64(rand.Int()%1000000), + }) + + offset = end + } + + var hash = bfs.Hash("123456") + + _, err := header.Encode(hash) + if err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/internal/utils/bfs/gzip_reader_pool.go b/internal/utils/bfs/gzip_reader_pool.go new file mode 100644 index 0000000..794cba9 --- /dev/null +++ b/internal/utils/bfs/gzip_reader_pool.go @@ -0,0 +1,66 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/percpu" + "github.com/klauspost/compress/gzip" + "io" + "runtime" +) + +var SharedDecompressPool = NewGzipReaderPool() + +type GzipReaderPool struct { + c chan *gzip.Reader + cList []chan *gzip.Reader +} + +func NewGzipReaderPool() *GzipReaderPool { + const poolSize = 16 + + var countProcs = runtime.GOMAXPROCS(0) + if countProcs <= 0 { + countProcs = runtime.NumCPU() + } + countProcs *= 4 + + var cList []chan *gzip.Reader + for i := 0; i < countProcs; i++ { + cList = append(cList, make(chan *gzip.Reader, poolSize)) + } + + return &GzipReaderPool{ + c: make(chan *gzip.Reader, poolSize), + cList: cList, + } +} + +func (this *GzipReaderPool) Get(rawReader io.Reader) (*gzip.Reader, error) { + select { + case w := <-this.getC(): + err := w.Reset(rawReader) + if err != nil { + return nil, err + } + return w, nil + default: + return gzip.NewReader(rawReader) + } +} + +func (this *GzipReaderPool) Put(reader *gzip.Reader) { + select { + case this.getC() <- reader: + default: + // 不需要close,因为已经在使用的时候调用了 + } +} + +func (this *GzipReaderPool) getC() chan *gzip.Reader { + var procId = percpu.GetProcId() + if procId < len(this.cList) { + return this.cList[procId] + } + return this.c +} diff --git a/internal/utils/bfs/gzip_writer_pool.go b/internal/utils/bfs/gzip_writer_pool.go new file mode 100644 index 0000000..9835437 --- /dev/null +++ b/internal/utils/bfs/gzip_writer_pool.go @@ -0,0 +1,63 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/percpu" + "github.com/klauspost/compress/gzip" + "io" + "runtime" +) + +var SharedCompressPool = NewGzipWriterPool() + +type GzipWriterPool struct { + c chan *gzip.Writer + cList []chan *gzip.Writer +} + +func NewGzipWriterPool() *GzipWriterPool { + const poolSize = 16 + + var countProcs = runtime.GOMAXPROCS(0) + if countProcs <= 0 { + countProcs = runtime.NumCPU() + } + countProcs *= 4 + + var cList []chan *gzip.Writer + for i := 0; i < countProcs; i++ { + cList = append(cList, make(chan *gzip.Writer, poolSize)) + } + + return &GzipWriterPool{ + c: make(chan *gzip.Writer, poolSize), + cList: cList, + } +} + +func (this *GzipWriterPool) Get(rawWriter io.Writer) (*gzip.Writer, error) { + select { + case w := <-this.getC(): + w.Reset(rawWriter) + return w, nil + default: + return gzip.NewWriterLevel(rawWriter, gzip.BestSpeed) + } +} + +func (this *GzipWriterPool) Put(writer *gzip.Writer) { + select { + case this.getC() <- writer: + default: + // 不需要close,因为已经在使用的时候调用了 + } +} + +func (this *GzipWriterPool) getC() chan *gzip.Writer { + var procId = percpu.GetProcId() + if procId < len(this.cList) { + return this.cList[procId] + } + return this.c +} diff --git a/internal/utils/bfs/meta_file.go b/internal/utils/bfs/meta_file.go index 69894a8..1e8b05a 100644 --- a/internal/utils/bfs/meta_file.go +++ b/internal/utils/bfs/meta_file.go @@ -5,11 +5,8 @@ package bfs import ( "bytes" "encoding/binary" - "encoding/json" - "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/TeaOSLab/EdgeNode/internal/zero" - "github.com/klauspost/compress/gzip" "io" "os" "sync" @@ -189,7 +186,7 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64) header.BodySize = bodySize header.Compact() - blockBytes, err := this.encodeFileHeader(hash, header) + blockBytes, err := header.Encode(hash) if err != nil { return err } @@ -315,7 +312,7 @@ func (this *MetaFile) Compact() error { return err } - blockBytes, err := this.encodeFileHeader(hash, header) + blockBytes, err := header.Encode(hash) if err != nil { return err } @@ -381,33 +378,3 @@ func (this *MetaFile) RemoveAll() error { _ = this.fp.Close() return os.Remove(this.fp.Name()) } - -// encode file header to data bytes -func (this *MetaFile) encodeFileHeader(hash string, header *FileHeader) ([]byte, error) { - headerJSON, err := json.Marshal(header) - if err != nil { - return nil, err - } - - var buf = utils.SharedBufferPool.Get() - defer utils.SharedBufferPool.Put(buf) - - // TODO 考虑使用gzip pool - gzWriter, err := gzip.NewWriterLevel(buf, gzip.BestSpeed) - if err != nil { - return nil, err - } - - _, err = gzWriter.Write(headerJSON) - if err != nil { - _ = gzWriter.Close() - return nil, err - } - - err = gzWriter.Close() - if err != nil { - return nil, err - } - - return EncodeMetaBlock(MetaActionNew, hash, buf.Bytes()) -} diff --git a/internal/utils/percpu/proc_id.go b/internal/utils/percpu/proc_id.go new file mode 100644 index 0000000..d0cc5fa --- /dev/null +++ b/internal/utils/percpu/proc_id.go @@ -0,0 +1,19 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package percpu + +import ( + _ "unsafe" +) + +//go:linkname runtime_procPin runtime.procPin +func runtime_procPin() int + +//go:linkname runtime_procUnpin runtime.procUnpin +func runtime_procUnpin() int + +func GetProcId() int { + var pid = runtime_procPin() + runtime_procUnpin() + return pid +}