From 7b75c508c6ccd8f1372d8e73003bbb2fc9a3fc06 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sat, 27 Apr 2024 17:29:12 +0800 Subject: [PATCH] =?UTF-8?q?bfs=EF=BC=9A=E5=AE=9E=E7=8E=B0maxOpenFiles?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/utils/bfs/blocks_file.go | 22 ++- internal/utils/bfs/file_reader.go | 19 +- internal/utils/bfs/file_reader_test.go | 31 +++ internal/utils/bfs/fs.go | 178 ++++++++++++++++-- internal/utils/bfs/fs_options.go | 2 +- internal/utils/bfs/fs_test.go | 73 +++++++ internal/utils/bfs/meta_file.go | 18 +- internal/utils/bfs/meta_file_test.go | 24 +++ ...{thread_limiters.go => threads_limiter.go} | 5 +- 9 files changed, 318 insertions(+), 54 deletions(-) rename internal/utils/bfs/{thread_limiters.go => threads_limiter.go} (75%) diff --git a/internal/utils/bfs/blocks_file.go b/internal/utils/bfs/blocks_file.go index af8f73c..0fe8690 100644 --- a/internal/utils/bfs/blocks_file.go +++ b/internal/utils/bfs/blocks_file.go @@ -189,12 +189,17 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe // 先尝试从Pool中获取 select { case reader := <-this.readerPool: + if reader == nil { + return nil, ErrClosed + } reader.Reset(header) return reader, nil default: } + AckReadThread() fp, err := os.Open(this.fp.Name()) + ReleaseReadThread() if err != nil { return nil, err } @@ -270,19 +275,22 @@ func (this *BlocksFile) RemoveAll() error { _ = this.mFile.RemoveAll() this.closeReaderPool() + _ = this.fp.Close() return os.Remove(this.fp.Name()) } +// Close 关闭当前文件 func (this *BlocksFile) Close() error { this.mu.Lock() defer this.mu.Unlock() - err := this.sync(true) - if err != nil { - return err + if this.isClosed { + return nil } + _ = this.sync(true) + this.isClosed = true _ = this.mFile.Close() @@ -292,6 +300,10 @@ func (this *BlocksFile) Close() error { return this.fp.Close() } +func (this *BlocksFile) TestReaderPool() chan *FileReader { + return this.readerPool +} + func (this *BlocksFile) checkStatus() error { if this.isClosed { return ErrClosed @@ -332,7 +344,9 @@ func (this *BlocksFile) closeReaderPool() { for { select { case reader := <-this.readerPool: - _ = reader.Free() + if reader != nil { + _ = reader.Free() + } default: return } diff --git a/internal/utils/bfs/file_reader.go b/internal/utils/bfs/file_reader.go index de1719c..94c53ff 100644 --- a/internal/utils/bfs/file_reader.go +++ b/internal/utils/bfs/file_reader.go @@ -15,7 +15,6 @@ type FileReader struct { fileHeader *FileHeader pos int64 - bPos int64 } func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader { @@ -23,7 +22,6 @@ func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *File bFile: bFile, fp: fp, fileHeader: fileHeader, - bPos: -1, } } @@ -64,20 +62,8 @@ func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) { } AckReadThread() - defer ReleaseReadThread() - - if bFrom == this.bPos { // read continuous - n, err = this.fp.Read(b[:bufLen]) - } else { // read from offset - _, err = this.fp.Seek(bFrom, io.SeekStart) - if err != nil { - return - } - n, err = this.fp.Read(b[:bufLen]) - } - if n > 0 { - this.bPos = bFrom + int64(n) - } + n, err = this.fp.ReadAt(b[:bufLen], bFrom) + ReleaseReadThread() return } @@ -85,7 +71,6 @@ func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) { func (this *FileReader) Reset(fileHeader *FileHeader) { this.fileHeader = fileHeader this.pos = 0 - this.bPos = -1 } func (this *FileReader) Close() error { diff --git a/internal/utils/bfs/file_reader_test.go b/internal/utils/bfs/file_reader_test.go index 44c08de..86da4a1 100644 --- a/internal/utils/bfs/file_reader_test.go +++ b/internal/utils/bfs/file_reader_test.go @@ -204,3 +204,34 @@ func TestFileReader_ReadAt(t *testing.T) { } } } + +func TestFileReader_Pool(t *testing.T) { + bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if openErr != nil { + if os.IsNotExist(openErr) { + t.Log(openErr) + return + } + t.Fatal(openErr) + } + + for i := 0; i < 10; i++ { + reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) + if err != nil { + if os.IsNotExist(err) { + continue + } + t.Fatal(err) + } + + go func() { + err = reader.Close() + if err != nil { + t.Log(err) + } + }() + } + + time.Sleep(100 * time.Millisecond) + t.Log(len(bFile.TestReaderPool())) +} diff --git a/internal/utils/bfs/fs.go b/internal/utils/bfs/fs.go index 323912a..1383421 100644 --- a/internal/utils/bfs/fs.go +++ b/internal/utils/bfs/fs.go @@ -5,17 +5,29 @@ package bfs import ( "errors" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist" + "github.com/TeaOSLab/EdgeNode/internal/zero" "log" + "runtime" "sync" "time" ) -// FS 文件系统管理 +func IsEnabled() bool { + return runtime.GOARCH == "amd64" || runtime.GOARCH == "arm64" +} + +// FS 文件系统对象 type FS struct { dir string opt *FSOptions - bMap map[string]*BlocksFile // name => *BlocksFile + bMap map[string]*BlocksFile // name => *BlocksFile + bList *linkedlist.List[string] // [bName] + bItemMap map[string]*linkedlist.Item[string] + closingBMap map[string]zero.Zero // filename => Zero + closingBChan chan *BlocksFile + mu *sync.RWMutex isClosed bool @@ -24,8 +36,17 @@ type FS struct { locker *fsutils.Locker } +// OpenFS 打开文件系统 func OpenFS(dir string, options *FSOptions) (*FS, error) { - options.EnsureDefaults() + if !IsEnabled() { + return nil, errors.New("the fs only works under 64 bit system") + } + + if options == nil { + options = DefaultFSOptions + } else { + options.EnsureDefaults() + } var locker = fsutils.NewLocker(dir + "/fs") err := locker.Lock() @@ -34,29 +55,48 @@ func OpenFS(dir string, options *FSOptions) (*FS, error) { } var fs = &FS{ - dir: dir, - bMap: map[string]*BlocksFile{}, - mu: &sync.RWMutex{}, - opt: options, - syncTicker: time.NewTicker(1 * time.Second), - locker: locker, + dir: dir, + bMap: map[string]*BlocksFile{}, + bList: linkedlist.NewList[string](), + bItemMap: map[string]*linkedlist.Item[string]{}, + closingBMap: map[string]zero.Zero{}, + closingBChan: make(chan *BlocksFile, 32), + mu: &sync.RWMutex{}, + opt: options, + syncTicker: time.NewTicker(1 * time.Second), + locker: locker, } go fs.init() return fs, nil } func (this *FS) init() { - // sync in background - for range this.syncTicker.C { - this.syncLoop() - } + go func() { + // sync in background + for range this.syncTicker.C { + this.syncLoop() + } + }() + + go func() { + for { + this.processClosingBFiles() + } + }() } +// OpenFileWriter 打开文件写入器 func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) { + if this.isClosed { + return nil, errors.New("the fs closed") + } + if isPartial && bodySize <= 0 { return nil, errors.New("invalid body size for partial content") } + // TODO 限制同一个hash同时只能有一个Writer + bFile, err := this.openBFileForHashWriting(hash) if err != nil { return nil, err @@ -64,7 +104,12 @@ func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*Fi return bFile.OpenFileWriter(hash, bodySize, isPartial) } +// OpenFileReader 打开文件读取器 func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) { + if this.isClosed { + return nil, errors.New("the fs closed") + } + bFile, err := this.openBFileForHashReading(hash) if err != nil { return nil, err @@ -73,6 +118,10 @@ func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) } func (this *FS) ExistFile(hash string) (bool, error) { + if this.isClosed { + return false, errors.New("the fs closed") + } + bFile, err := this.openBFileForHashReading(hash) if err != nil { return false, err @@ -81,6 +130,10 @@ func (this *FS) ExistFile(hash string) (bool, error) { } func (this *FS) RemoveFile(hash string) error { + if this.isClosed { + return errors.New("the fs closed") + } + bFile, err := this.openBFileForHashWriting(hash) if err != nil { return err @@ -89,8 +142,15 @@ func (this *FS) RemoveFile(hash string) error { } func (this *FS) Close() error { + if this.isClosed { + return nil + } + this.isClosed = true + close(this.closingBChan) + this.syncTicker.Stop() + var lastErr error this.mu.Lock() for _, bFile := range this.bMap { @@ -109,6 +169,14 @@ func (this *FS) Close() error { return lastErr } +func (this *FS) TestBMap() map[string]*BlocksFile { + return this.bMap +} + +func (this *FS) TestBList() *linkedlist.List[string] { + return this.bList +} + func (this *FS) bPathForHash(hash string) (path string, bName string, err error) { err = CheckHashErr(hash) if err != nil { @@ -170,6 +238,15 @@ func (this *FS) openBFileForHashWriting(hash string) (*BlocksFile, error) { bFile, ok := this.bMap[bName] this.mu.RUnlock() if ok { + // 调整当前BFile所在位置 + this.mu.Lock() + item, itemOk := this.bItemMap[bName] + if itemOk { + this.bList.Remove(item) + this.bList.Push(item) + } + this.mu.Unlock() + return bFile, nil } @@ -191,6 +268,15 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) { bFile, ok := this.bMap[bName] this.mu.RUnlock() if ok { + // 调整当前BFile所在位置 + this.mu.Lock() + item, itemOk := this.bItemMap[bName] + if itemOk { + this.bList.Remove(item) + this.bList.Push(item) + } + this.mu.Unlock() + return bFile, nil } @@ -198,6 +284,28 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) { } func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { + // check closing queue + this.mu.RLock() + _, isClosing := this.closingBMap[bPath] + this.mu.RUnlock() + if isClosing { + var maxWaits = 30_000 + for { + this.mu.RLock() + _, isClosing = this.closingBMap[bPath] + this.mu.RUnlock() + if !isClosing { + break + } + time.Sleep(1 * time.Millisecond) + maxWaits-- + + if maxWaits < 0 { + return nil, errors.New("open blocks file timeout") + } + } + } + this.mu.Lock() defer this.mu.Unlock() @@ -214,5 +322,49 @@ func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { return nil, err } this.bMap[bName] = bFile + + // 加入到列表中 + var item = linkedlist.NewItem(bName) + this.bList.Push(item) + this.bItemMap[bName] = item + + // 检查是否超出maxOpenFiles + if this.bList.Len() > this.opt.MaxOpenFiles { + var firstBName = this.bList.Shift().Value + var firstBFile = this.bMap[firstBName] + delete(this.bMap, firstBName) + delete(this.bItemMap, firstBName) + + this.closingBMap[firstBFile.Filename()] = zero.Zero{} + + // MUST run in goroutine + go func() { + // 因为 closingBChan 可能已经关闭 + defer func() { + recover() + }() + + this.closingBChan <- firstBFile + }() + } + return bFile, nil } + +// 处理关闭中的 BFile 们 +func (this *FS) processClosingBFiles() { + if this.isClosed { + return + } + + var bFile = <-this.closingBChan + if bFile == nil { + return + } + + _ = bFile.Close() + + this.mu.Lock() + delete(this.closingBMap, bFile.Filename()) + this.mu.Unlock() +} diff --git a/internal/utils/bfs/fs_options.go b/internal/utils/bfs/fs_options.go index 7e295e1..1d8339c 100644 --- a/internal/utils/bfs/fs_options.go +++ b/internal/utils/bfs/fs_options.go @@ -5,7 +5,7 @@ package bfs import "time" type FSOptions struct { - MaxOpenFiles int // TODO 需要实现 + MaxOpenFiles int BytesPerSync int64 SyncTimeout time.Duration MaxSyncFiles int diff --git a/internal/utils/bfs/fs_test.go b/internal/utils/bfs/fs_test.go index 706a21d..a9b9345 100644 --- a/internal/utils/bfs/fs_test.go +++ b/internal/utils/bfs/fs_test.go @@ -5,10 +5,14 @@ package bfs_test import ( "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/types" "io" + "os" "testing" ) @@ -118,3 +122,72 @@ func TestFS_RemoveFile(t *testing.T) { } t.Log("exist:", exist) } + +func TestFS_OpenFileWriter_Close(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", &bfs.FSOptions{ + MaxOpenFiles: 4 << 10, + }) + if openErr != nil { + t.Fatal(openErr) + } + defer func() { + _ = fs.Close() + }() + + var count = 10 + if testutils.IsSingleTesting() { + count = 1000 + } + + for i := 0; i < count; i++ { + //t.Log("open", i) + writer, err := fs.OpenFileWriter(bfs.Hash(types.String(i)), -1, false) + if err != nil { + t.Fatal(err) + } + _ = writer.Close() + } + + t.Log(len(fs.TestBMap()), "block files, pid:", os.Getpid()) + + var p = func() { + var bNames []string + fs.TestBList().Range(func(item *linkedlist.Item[string]) (goNext bool) { + bNames = append(bNames, item.Value) + return true + }) + + if len(bNames) != len(fs.TestBMap()) { + t.Fatal("len(bNames)!=len(bMap)") + } + + t.Log("["+types.String(len(bNames))+"]", bNames) + } + + p() + + { + writer, err := fs.OpenFileWriter(bfs.Hash(types.String(10)), -1, false) + if err != nil { + t.Fatal(err) + } + _ = writer.Close() + } + + p() + + // testing closing + for i := 0; i < 3; i++ { + writer, err := fs.OpenFileWriter(bfs.Hash(types.String(0)), -1, false) + if err != nil { + t.Fatal(err) + } + _ = writer.Close() + } + + p() +} diff --git a/internal/utils/bfs/meta_file.go b/internal/utils/bfs/meta_file.go index d1f3ff1..cc7d06d 100644 --- a/internal/utils/bfs/meta_file.go +++ b/internal/utils/bfs/meta_file.go @@ -369,24 +369,8 @@ func (this *MetaFile) decodeHeader(data []byte) (*FileHeader, error) { _ = gzReader.Close() }() - var resultBuf = bytes.NewBuffer(nil) - - var buf = make([]byte, 4096) - for { - n, readErr := gzReader.Read(buf) - if n > 0 { - resultBuf.Write(buf[:n]) - } - if readErr != nil { - if readErr == io.EOF { - break - } - return nil, readErr - } - } - var header = &FileHeader{} - err = json.Unmarshal(resultBuf.Bytes(), header) + err = json.NewDecoder(gzReader).Decode(header) if err != nil { return nil, err } diff --git a/internal/utils/bfs/meta_file_test.go b/internal/utils/bfs/meta_file_test.go index dfe2a08..16d9e86 100644 --- a/internal/utils/bfs/meta_file_test.go +++ b/internal/utils/bfs/meta_file_test.go @@ -5,9 +5,11 @@ package bfs_test import ( "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/iwind/TeaGo/logs" "sync" "testing" + "time" ) func TestNewMetaFile(t *testing.T) { @@ -24,6 +26,28 @@ func TestNewMetaFile(t *testing.T) { //logs.PrintAsJSON(mFile.Headers(), t) } +func TestNewMetaFile_Large(t *testing.T) { + var count = 2 + + if testutils.IsSingleTesting() { + count = 100 + } + + var before = time.Now() + for i := 0; i < count; i++ { + mFile, err := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{}) + if err != nil { + if bfs.IsNotExist(err) { + continue + } + t.Fatal(err) + } + _ = mFile.Close() + } + var costMs = time.Since(before).Seconds() * 1000 + t.Logf("cost: %.2fms, qps: %.2fms/file", costMs, costMs/float64(count)) +} + func TestMetaFile_WriteMeta(t *testing.T) { mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) if err != nil { diff --git a/internal/utils/bfs/thread_limiters.go b/internal/utils/bfs/threads_limiter.go similarity index 75% rename from internal/utils/bfs/thread_limiters.go rename to internal/utils/bfs/threads_limiter.go index d770c52..93f291a 100644 --- a/internal/utils/bfs/thread_limiters.go +++ b/internal/utils/bfs/threads_limiter.go @@ -5,8 +5,9 @@ package bfs import "github.com/TeaOSLab/EdgeNode/internal/zero" // TODO 使用atomic代替channel?需要使用基准测试对比性能 -var readThreadsLimiter = make(chan zero.Zero, 16) -var writeThreadsLimiter = make(chan zero.Zero, 16) +// TODO 线程数可以根据硬盘数量动态调整? +var readThreadsLimiter = make(chan zero.Zero, 8) +var writeThreadsLimiter = make(chan zero.Zero, 8) func AckReadThread() { readThreadsLimiter <- zero.Zero{}