From 6226e31cdc29eeedc26a5cc2fabb8c1611f4f730 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?=
Date: Sat, 27 Apr 2024 20:11:50 +0800
Subject: [PATCH] bfs: implement lazy loading of FileHeader
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 internal/utils/bfs/file_header_lazy.go |  56 +++++++++++
 internal/utils/bfs/fs_options.go       |  23 ++++-
 internal/utils/bfs/meta_block.go       |   6 +-
 internal/utils/bfs/meta_file.go        | 134 +++++++++++++++----------
 internal/utils/bfs/meta_file_test.go   |  46 +++++++++
 5 files changed, 206 insertions(+), 59 deletions(-)
 create mode 100644 internal/utils/bfs/file_header_lazy.go

diff --git a/internal/utils/bfs/file_header_lazy.go b/internal/utils/bfs/file_header_lazy.go
new file mode 100644
index 0000000..b88d422
--- /dev/null
+++ b/internal/utils/bfs/file_header_lazy.go
@@ -0,0 +1,56 @@
+// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
+
+package bfs
+
+import (
+    "bytes"
+    "encoding/json"
+    "github.com/klauspost/compress/gzip"
+)
+
+// LazyFileHeader loads the file header lazily to save memory
+type LazyFileHeader struct {
+    rawData    []byte
+    fileHeader *FileHeader
+}
+
+func NewLazyFileHeaderFromData(rawData []byte) *LazyFileHeader {
+    return &LazyFileHeader{
+        rawData: rawData,
+    }
+}
+
+func NewLazyFileHeader(fileHeader *FileHeader) *LazyFileHeader {
+    return &LazyFileHeader{
+        fileHeader: fileHeader,
+    }
+}
+
+func (this *LazyFileHeader) FileHeaderUnsafe() (*FileHeader, error) {
+    if this.fileHeader != nil {
+        return this.fileHeader, nil
+    }
+
+    // TODO manage gzip readers with a pool
+    gzReader, err := gzip.NewReader(bytes.NewBuffer(this.rawData))
+    if err != nil {
+        return nil, err
+    }
+
+    defer func() {
+        _ = gzReader.Close()
+    }()
+
+    var header = &FileHeader{}
+    err = json.NewDecoder(gzReader).Decode(header)
+    if err != nil {
+        return nil, err
+    }
+
+    header.IsWriting = false
+
+    this.fileHeader = header
+    this.rawData = nil
+
+    return header, nil
+}
diff --git a/internal/utils/bfs/fs_options.go b/internal/utils/bfs/fs_options.go
index 1d8339c..598a39e 100644
--- a/internal/utils/bfs/fs_options.go
+++ b/internal/utils/bfs/fs_options.go
@@ -2,7 +2,11 @@
 
 package bfs
 
-import "time"
+import (
+    fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
+    memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
+    "time"
+)
 
 type FSOptions struct {
     MaxOpenFiles int
@@ -13,10 +17,19 @@
 
 func (this *FSOptions) EnsureDefaults() {
     if this.MaxOpenFiles <= 0 {
-        this.MaxOpenFiles = 4 << 10
+        // calculate the maximum number of open files based on system memory
+        var maxOpenFiles = memutils.SystemMemoryGB() * 64
+        if maxOpenFiles > (4 << 10) {
+            maxOpenFiles = 4 << 10
+        }
+        this.MaxOpenFiles = maxOpenFiles
     }
     if this.BytesPerSync <= 0 {
-        this.BytesPerSync = 1 << 20 // TODO adjust based on the actual disk write speed
+        if fsutils.DiskIsFast() {
+            this.BytesPerSync = 1 << 20 // TODO adjust based on the actual disk write speed
+        } else {
+            this.BytesPerSync = 512 << 10
+        }
     }
     if this.SyncTimeout <= 0 {
         this.SyncTimeout = 1 * time.Second
@@ -27,8 +40,8 @@
 }
 
 var DefaultFSOptions = &FSOptions{
-    MaxOpenFiles: 4 << 10,
-    BytesPerSync: 1 << 20, // TODO adjust based on the actual disk write speed
+    MaxOpenFiles: 1 << 10,
+    BytesPerSync: 512 << 10,
     SyncTimeout:  1 * time.Second,
     MaxSyncFiles: 32,
 }
diff --git a/internal/utils/bfs/meta_block.go b/internal/utils/bfs/meta_block.go
index 0096ffa..00f9e94 100644
--- a/internal/utils/bfs/meta_block.go
+++ b/internal/utils/bfs/meta_block.go
@@ -41,7 +41,11 @@ func DecodeMetaBlock(blockBytes []byte) (action MetaAction, hash string, data []
     hash = string(blockBytes[5 : 5+HashLen])
 
     if action == MetaActionNew {
-        data = blockBytes[dataOffset:]
+        var rawData = blockBytes[dataOffset:]
+        if len(rawData) > 0 {
+            data = make([]byte, len(rawData))
+            copy(data, rawData)
+        }
     }
 
     return
diff --git a/internal/utils/bfs/meta_file.go b/internal/utils/bfs/meta_file.go
index e474219..69894a8 100644
--- a/internal/utils/bfs/meta_file.go
+++ b/internal/utils/bfs/meta_file.go
@@ -21,11 +21,11 @@ const Version1 = 1
 type MetaFile struct {
     fp       *os.File
     filename string
-    headerMap map[string]*FileHeader // hash => *FileHeader
-    mu        *sync.RWMutex          // TODO consider a dedicated mutex instead of sharing with bFile?
+    headerMap map[string]*LazyFileHeader // hash => *LazyFileHeader
+    mu        *sync.RWMutex              // TODO consider a dedicated mutex instead of sharing with bFile?
 
     isModified bool
-    modifiedHashMap map[string]zero.Zero
+    modifiedHashMap map[string]zero.Zero // hash => Zero
 }
 
 func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) {
@@ -37,7 +37,7 @@
     var mFile = &MetaFile{
         filename: filename,
         fp:       fp,
-        headerMap: map[string]*FileHeader{},
+        headerMap: map[string]*LazyFileHeader{},
         mu:       mu,
         modifiedHashMap: map[string]zero.Zero{},
     }
@@ -59,7 +59,7 @@ func (this *MetaFile) load() error {
         return err
     }
 
-    // TODO handle the case where the last line of the file is incomplete
+    // TODO check whether the file is complete
     var buf = make([]byte, 4<<10)
     var blockBytes []byte
 
@@ -82,11 +82,7 @@
         switch action {
         case MetaActionNew:
-            header, decodeHeaderErr := this.decodeHeader(data)
-            if decodeHeaderErr != nil {
-                return decodeHeaderErr
-            }
-            this.headerMap[hash] = header
+            this.headerMap[hash] = NewLazyFileHeaderFromData(data)
         case MetaActionRemove:
             delete(this.headerMap, hash)
         }
 
@@ -106,16 +102,17 @@ func (this *MetaFile) WriteMeta(hash string, status int, expiresAt int64, expectedFileSize int64) error {
+    this.mu.Lock()
     defer this.mu.Unlock()
 
-    this.headerMap[hash] = &FileHeader{
+    this.headerMap[hash] = NewLazyFileHeader(&FileHeader{
         Version:         Version1,
         ExpiresAt:       expiresAt,
         Status:          status,
         ExpiredBodySize: expectedFileSize,
         IsWriting:       true,
-    }
+    })
 
     this.modifiedHashMap[hash] = zero.Zero{}
 
@@ -123,11 +120,16 @@
 }
 
 func (this *MetaFile) WriteHeaderBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64) error {
-    header, ok := this.headerMap[hash]
+    lazyHeader, ok := this.headerMap[hash]
     if !ok {
         return nil
     }
 
+    header, err := lazyHeader.FileHeaderUnsafe()
+    if err != nil {
+        return err
+    }
+
     // TODO merge adjacent blocks
     header.HeaderBlocks = append(header.HeaderBlocks, BlockInfo{
         BFileOffsetFrom: bOffsetFrom,
@@ -140,11 +142,16 @@
 }
 
 func (this *MetaFile) WriteBodyBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64, originOffsetFrom int64, originOffsetTo int64) error {
-    header, ok := this.headerMap[hash]
+    lazyHeader, ok := this.headerMap[hash]
     if !ok {
         return nil
     }
 
+    header, err := lazyHeader.FileHeaderUnsafe()
+    if err != nil {
+        return err
+    }
+
     // TODO merge adjacent blocks
     header.BodyBlocks = append(header.BodyBlocks, BlockInfo{
         OriginOffsetFrom: originOffsetFrom,
@@ -162,21 +169,27 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64)
     // TODO handle repeated calls for the same hash
     this.mu.Lock()
-    header, ok := this.headerMap[hash]
-    if ok {
-        // TODO check whether bodySize matches expectedBodySize; if not, remove the entry from headerMap
-
-        header.ModifiedAt = fasttime.Now().Unix()
-        header.HeaderSize = headerSize
-        header.BodySize = bodySize
-        header.Compact()
-    }
-    this.mu.Unlock()
+    lazyHeader, ok := this.headerMap[hash]
 
     if !ok {
+        this.mu.Unlock()
         return nil
     }
 
-    blockBytes, err := this.encodeHeader(hash, header)
+    header, err := lazyHeader.FileHeaderUnsafe()
+    if err != nil {
+        return err
+    }
+
+    this.mu.Unlock()
+
+    // TODO check whether bodySize matches expectedBodySize; if not, remove the entry from headerMap
+
+    header.ModifiedAt = fasttime.Now().Unix()
+    header.HeaderSize = headerSize
+    header.BodySize = bodySize
+    header.Compact()
+
+    blockBytes, err := this.encodeFileHeader(hash, header)
     if err != nil {
         return err
     }
@@ -229,28 +242,53 @@ func (this *MetaFile) RemoveFile(hash string) error {
 
 func (this *MetaFile) FileHeader(hash string) (header *FileHeader, ok bool) {
     this.mu.RLock()
     defer this.mu.RUnlock()
-    header, ok = this.headerMap[hash]
+
+    lazyHeader, ok := this.headerMap[hash]
+
+    if ok {
+        var err error
+        header, err = lazyHeader.FileHeaderUnsafe()
+        if err != nil {
+            ok = false
+        }
+    }
     return
 }
 
 func (this *MetaFile) FileHeaderUnsafe(hash string) (header *FileHeader, ok bool) {
-    header, ok = this.headerMap[hash]
+    lazyHeader, ok := this.headerMap[hash]
+
+    if ok {
+        var err error
+        header, err = lazyHeader.FileHeaderUnsafe()
+        if err != nil {
+            ok = false
+        }
+    }
+
     return
 }
 
 func (this *MetaFile) CloneFileHeader(hash string) (header *FileHeader, ok bool) {
     this.mu.RLock()
     defer this.mu.RUnlock()
-    header, ok = this.headerMap[hash]
+    lazyHeader, ok := this.headerMap[hash]
     if !ok {
         return
     }
+
+    var err error
+    header, err = lazyHeader.FileHeaderUnsafe()
+    if err != nil {
+        ok = false
+        return
+    }
+
     header = header.Clone()
     return
 }
 
-func (this *MetaFile) FileHeaders() map[string]*FileHeader {
+func (this *MetaFile) FileHeaders() map[string]*LazyFileHeader {
     this.mu.RLock()
     defer this.mu.RUnlock()
     return this.headerMap
@@ -271,8 +309,13 @@
     defer this.mu.Unlock()
 
     var buf = bytes.NewBuffer(nil)
-    for hash, header := range this.headerMap {
-        blockBytes, err := this.encodeHeader(hash, header)
+    for hash, lazyHeader := range this.headerMap {
+        header, err := lazyHeader.FileHeaderUnsafe()
+        if err != nil {
+            return err
+        }
+
+        blockBytes, err := this.encodeFileHeader(hash, header)
         if err != nil {
             return err
         }
@@ -313,8 +356,12 @@
     }
 
     for hash := range this.modifiedHashMap {
-        header, ok := this.headerMap[hash]
+        lazyHeader, ok := this.headerMap[hash]
         if ok {
+            header, decodeErr := lazyHeader.FileHeaderUnsafe()
+            if decodeErr != nil {
+                return decodeErr
+            }
             header.IsWriting = false
         }
     }
@@ -335,7 +382,8 @@
     return os.Remove(this.fp.Name())
 }
 
-func (this *MetaFile) encodeHeader(hash string, header *FileHeader) ([]byte, error) {
+// encode file header to data bytes
+func (this *MetaFile) encodeFileHeader(hash string, header *FileHeader) ([]byte, error) {
     headerJSON, err := json.Marshal(header)
     if err != nil {
         return nil, err
@@ -363,23 +411,3 @@
 
     return EncodeMetaBlock(MetaActionNew, hash, buf.Bytes())
 }
-
-func (this *MetaFile) decodeHeader(data []byte) (*FileHeader, error) {
-    gzReader, err := gzip.NewReader(bytes.NewBuffer(data))
-    if err != nil {
-        return nil, err
-    }
-
-    defer func() {
-        _ = gzReader.Close()
-    }()
-
-    var header = &FileHeader{}
-    err = json.NewDecoder(gzReader).Decode(header)
-    if err != nil {
-        return nil, err
-    }
-
-    header.IsWriting = false
-    return header, nil
-}
diff --git a/internal/utils/bfs/meta_file_test.go b/internal/utils/bfs/meta_file_test.go
index 16d9e86..0803922 100644
--- a/internal/utils/bfs/meta_file_test.go
+++ b/internal/utils/bfs/meta_file_test.go
@@ -48,6 +48,52 @@ func TestNewMetaFile_Large(t *testing.T) {
     t.Logf("cost: %.2fms, qps: %.2fms/file", costMs, costMs/float64(count))
 }
 
+func TestNewMetaFile_Memory(t *testing.T) {
+    var count = 2
+
+    if testutils.IsSingleTesting() {
+        count = 100
+    }
+
+    var stat1 = testutils.ReadMemoryStat()
+
+    var mFiles []*bfs.MetaFile
+
+    for i := 0; i < count; i++ {
+        mFile, err := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
+        if err != nil {
+            if bfs.IsNotExist(err) {
+                continue
+            }
+            t.Fatal(err)
+        }
+
+        _ = mFile.Close()
+        mFiles = append(mFiles, mFile)
+    }
+
+    var stat2 = testutils.ReadMemoryStat()
+    t.Log((stat2.HeapInuse-stat1.HeapInuse)>>20, "MiB")
+}
+
+func TestMetaFile_FileHeaders(t *testing.T) {
+    mFile, openErr := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
+    if openErr != nil {
+        if bfs.IsNotExist(openErr) {
+            return
+        }
+        t.Fatal(openErr)
+    }
+    _ = mFile.Close()
+    for hash, lazyHeader := range mFile.FileHeaders() {
+        header, err := lazyHeader.FileHeaderUnsafe()
+        if err != nil {
+            t.Fatal(err)
+        }
+        t.Log(hash, header.ModifiedAt, header.BodySize)
+    }
+}
+
 func TestMetaFile_WriteMeta(t *testing.T) {
     mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
     if err != nil {
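
A possible follow-up for the "TODO manage gzip readers with a pool" left in LazyFileHeader.FileHeaderUnsafe is to reuse decompressors through a sync.Pool instead of calling gzip.NewReader for every lazily decoded header. The sketch below is illustrative only and is not part of this patch; gzReaderPool and decodeFileHeaderPooled are hypothetical names, not identifiers that exist in the repository.

package bfs

import (
    "bytes"
    "encoding/json"
    "sync"

    "github.com/klauspost/compress/gzip"
)

// gzReaderPool is a hypothetical pool of gzip readers; reusing readers
// avoids re-allocating decompressor state on every lazy header decode.
var gzReaderPool = sync.Pool{}

// decodeFileHeaderPooled sketches how FileHeaderUnsafe could reuse pooled
// gzip readers instead of constructing a new one per call.
func decodeFileHeaderPooled(rawData []byte) (*FileHeader, error) {
    var src = bytes.NewReader(rawData)

    var gzReader *gzip.Reader
    if v := gzReaderPool.Get(); v != nil {
        gzReader = v.(*gzip.Reader)
        // Reset rewires the pooled reader to the new compressed input.
        if err := gzReader.Reset(src); err != nil {
            return nil, err
        }
    } else {
        var err error
        gzReader, err = gzip.NewReader(src)
        if err != nil {
            return nil, err
        }
    }
    defer func() {
        _ = gzReader.Close()
        gzReaderPool.Put(gzReader)
    }()

    var header = &FileHeader{}
    if err := json.NewDecoder(gzReader).Decode(header); err != nil {
        return nil, err
    }
    header.IsWriting = false
    return header, nil
}

Resetting a pooled reader avoids rebuilding the inflate state for every header, which matters when a meta file holds many entries; the trade-off is that pooled readers keep their internal buffers alive between uses.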