From fe52046fbac826bf5d47bf39fc929120547b2a7b Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 28 Apr 2024 19:02:59 +0800 Subject: [PATCH] =?UTF-8?q?bfs=EF=BC=9A=E4=BF=AE=E5=A4=8D=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E8=AF=BB=E6=8F=90=E7=A4=BAErrClosed=E9=83=BD=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/utils/bfs/blocks_file.go | 47 +++++++++----- internal/utils/bfs/fs.go | 103 +++++++++++++++++++++--------- 2 files changed, 104 insertions(+), 46 deletions(-) diff --git a/internal/utils/bfs/blocks_file.go b/internal/utils/bfs/blocks_file.go index d2884c4..4375573 100644 --- a/internal/utils/bfs/blocks_file.go +++ b/internal/utils/bfs/blocks_file.go @@ -38,8 +38,8 @@ type BlocksFile struct { writingFileMap map[string]zero.Zero // hash => Zero syncAt time.Time - readerPool chan *FileReader - countReadingFiles int32 + readerPool chan *FileReader + countRefs int32 } func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) { @@ -202,7 +202,7 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe return nil, ErrClosed } reader.Reset(header) - atomic.AddInt32(&this.countReadingFiles, 1) + atomic.AddInt32(&this.countRefs, 1) return reader, nil default: } @@ -214,12 +214,12 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe return nil, err } - atomic.AddInt32(&this.countReadingFiles, 1) + atomic.AddInt32(&this.countRefs, 1) return NewFileReader(this, fp, header), nil } func (this *BlocksFile) CloseFileReader(reader *FileReader) error { - atomic.AddInt32(&this.countReadingFiles, -1) + defer atomic.AddInt32(&this.countRefs, -1) select { case this.readerPool <- reader: @@ -296,10 +296,10 @@ func (this *BlocksFile) RemoveAll() error { // CanClose 检查是否可以关闭 func (this *BlocksFile) CanClose() bool { - this.mu.Lock() - defer this.mu.Unlock() + this.mu.RLock() + defer this.mu.RUnlock() - if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countReadingFiles) > 0 { + if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countRefs) > 0 { return false } @@ -327,6 +327,19 @@ func (this *BlocksFile) Close() error { return this.fp.Close() } +// IsClosing 判断当前文件是否正在关闭或者已关闭 +func (this *BlocksFile) IsClosing() bool { + return this.isClosed || this.isClosing +} + +func (this *BlocksFile) IncrRef() { + atomic.AddInt32(&this.countRefs, 1) +} + +func (this *BlocksFile) DecrRef() { + atomic.AddInt32(&this.countRefs, -1) +} + func (this *BlocksFile) TestReaderPool() chan *FileReader { return this.readerPool } @@ -339,7 +352,7 @@ func (this *BlocksFile) removeWritingFile(hash string) { func (this *BlocksFile) checkStatus() error { if this.isClosed || this.isClosing { - return ErrClosed + return fmt.Errorf("check status failed: %w", ErrClosed) } return nil } @@ -355,15 +368,17 @@ func (this *BlocksFile) sync(force bool) error { } } - this.writtenBytes = 0 - - AckWriteThread() - err := this.fp.Sync() - ReleaseWriteThread() - if err != nil { - return err + if this.writtenBytes > 0 { + AckWriteThread() + err := this.fp.Sync() + ReleaseWriteThread() + if err != nil { + return err + } } + this.writtenBytes = 0 + this.syncAt = time.Now() if force { diff --git a/internal/utils/bfs/fs.go b/internal/utils/bfs/fs.go index 934bb66..15ed096 100644 --- a/internal/utils/bfs/fs.go +++ b/internal/utils/bfs/fs.go @@ -4,6 +4,7 @@ package bfs import ( "errors" + "github.com/TeaOSLab/EdgeNode/internal/goman" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist" "github.com/TeaOSLab/EdgeNode/internal/zero" @@ -151,11 +152,19 @@ func (this *FS) Close() error { var lastErr error this.mu.Lock() - for _, bFile := range this.bMap { - err := bFile.Close() - if err != nil { - lastErr = err + if len(this.bMap) > 0 { + var g = goman.NewTaskGroup() + for _, bFile := range this.bMap { + var bFileCopy = bFile + g.Run(func() { + err := bFileCopy.Close() + if err != nil { + lastErr = err + } + }) } + + g.Wait() } this.mu.Unlock() @@ -213,8 +222,17 @@ func (this *FS) syncLoop() { this.mu.RUnlock() for _, bFile := range bFiles { + if bFile.IsClosing() { + continue + } + err := bFile.ForceSync() if err != nil { + // check again + if bFile.IsClosing() { + continue + } + // TODO 可以在options自定义一个logger log.Println("BFS", "sync failed: "+err.Error()) } @@ -262,46 +280,34 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) { return nil, err } - this.mu.RLock() + err = this.waitBFile(bPath) + if err != nil { + return nil, err + } + + this.mu.Lock() bFile, ok := this.bMap[bName] - this.mu.RUnlock() if ok { // 调整当前BFile所在位置 - this.mu.Lock() item, itemOk := this.bItemMap[bName] if itemOk { this.bList.Remove(item) this.bList.Push(item) } this.mu.Unlock() - return bFile, nil } + this.mu.Unlock() + return this.openBFile(bPath, bName) } func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { // check closing queue - this.mu.RLock() - _, isClosing := this.closingBMap[bPath] - this.mu.RUnlock() - if isClosing { - var maxWaits = 30_000 - for { - this.mu.RLock() - _, isClosing = this.closingBMap[bPath] - this.mu.RUnlock() - if !isClosing { - break - } - time.Sleep(1 * time.Millisecond) - maxWaits-- - - if maxWaits < 0 { - return nil, errors.New("open blocks file timeout") - } - } + err := this.waitBFile(bPath) + if err != nil { + return nil, err } this.mu.Lock() @@ -313,12 +319,18 @@ func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { return bFile, nil } - bFile, err := OpenBlocksFile(bPath, &BlockFileOptions{ + // TODO 不要把 OpenBlocksFile 放入到 mu 中? + bFile, err = OpenBlocksFile(bPath, &BlockFileOptions{ BytesPerSync: this.opt.BytesPerSync, }) if err != nil { return nil, err } + + // 防止被关闭 + bFile.IncrRef() + defer bFile.DecrRef() + this.bMap[bName] = bFile // 加入到列表中 @@ -352,21 +364,26 @@ func (this *FS) processClosingBFiles() { this.mu.Unlock() } +// 弹出超出BFile数量限制的BFile func (this *FS) shiftOpenFiles() { - var count = this.bList.Len() - this.opt.MaxOpenFiles + var l = this.bList.Len() + var count = l - this.opt.MaxOpenFiles if count <= 0 { return } var bNames []string + var searchCount int this.bList.Range(func(item *linkedlist.Item[string]) (goNext bool) { + searchCount++ + var bName = item.Value var bFile = this.bMap[bName] if bFile.CanClose() { bNames = append(bNames, bName) count-- } - return count > 0 + return count > 0 && searchCount < 8 && searchCount < l-8 }) for _, bName := range bNames { @@ -392,3 +409,29 @@ func (this *FS) shiftOpenFiles() { }(bFile) } } + +func (this *FS) waitBFile(bPath string) error { + this.mu.RLock() + _, isClosing := this.closingBMap[bPath] + this.mu.RUnlock() + if !isClosing { + return nil + } + + var maxWaits = 30_000 + for { + this.mu.RLock() + _, isClosing = this.closingBMap[bPath] + this.mu.RUnlock() + if !isClosing { + break + } + time.Sleep(1 * time.Millisecond) + maxWaits-- + + if maxWaits < 0 { + return errors.New("open blocks file timeout") + } + } + return nil +}