diff --git a/internal/utils/bfs/blocks_file.go b/internal/utils/bfs/blocks_file.go index 9a12ba6..d2884c4 100644 --- a/internal/utils/bfs/blocks_file.go +++ b/internal/utils/bfs/blocks_file.go @@ -11,6 +11,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" ) @@ -28,7 +29,8 @@ type BlocksFile struct { fp *os.File mFile *MetaFile - isClosed bool + isClosing bool + isClosed bool mu *sync.RWMutex @@ -36,7 +38,8 @@ type BlocksFile struct { writingFileMap map[string]zero.Zero // hash => Zero syncAt time.Time - readerPool chan *FileReader + readerPool chan *FileReader + countReadingFiles int32 } func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) { @@ -199,6 +202,7 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe return nil, ErrClosed } reader.Reset(header) + atomic.AddInt32(&this.countReadingFiles, 1) return reader, nil default: } @@ -209,10 +213,14 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe if err != nil { return nil, err } + + atomic.AddInt32(&this.countReadingFiles, 1) return NewFileReader(this, fp, header), nil } func (this *BlocksFile) CloseFileReader(reader *FileReader) error { + atomic.AddInt32(&this.countReadingFiles, -1) + select { case this.readerPool <- reader: return nil @@ -286,6 +294,19 @@ func (this *BlocksFile) RemoveAll() error { return os.Remove(this.fp.Name()) } +// CanClose 检查是否可以关闭 +func (this *BlocksFile) CanClose() bool { + this.mu.Lock() + defer this.mu.Unlock() + + if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countReadingFiles) > 0 { + return false + } + + this.isClosing = true + return true +} + // Close 关闭当前文件 func (this *BlocksFile) Close() error { this.mu.Lock() @@ -317,7 +338,7 @@ func (this *BlocksFile) removeWritingFile(hash string) { } func (this *BlocksFile) checkStatus() error { - if this.isClosed { + if this.isClosed || this.isClosing { return ErrClosed } return nil diff --git a/internal/utils/bfs/blocks_file_test.go b/internal/utils/bfs/blocks_file_test.go index 3480880..706d907 100644 --- a/internal/utils/bfs/blocks_file_test.go +++ b/internal/utils/bfs/blocks_file_test.go @@ -4,10 +4,43 @@ package bfs_test import ( "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/iwind/TeaGo/assert" "os" "testing" ) +func TestBlocksFile_CanClose(t *testing.T) { + var a = assert.NewAssertion(t) + + bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if openErr != nil { + if os.IsNotExist(openErr) { + return + } + t.Fatal(openErr) + } + + reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) + if err != nil { + t.Fatal(err) + } + + a.IsTrue(!bFile.CanClose()) + + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + // duplicated close + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + a.IsTrue(bFile.CanClose()) +} + func TestBlocksFile_OpenFileWriter_SameHash(t *testing.T) { bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if openErr != nil { diff --git a/internal/utils/bfs/file_reader.go b/internal/utils/bfs/file_reader.go index 94c53ff..95e436f 100644 --- a/internal/utils/bfs/file_reader.go +++ b/internal/utils/bfs/file_reader.go @@ -15,6 +15,8 @@ type FileReader struct { fileHeader *FileHeader pos int64 + + isClosed bool } func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader { @@ -74,6 +76,10 @@ func (this *FileReader) Reset(fileHeader *FileHeader) { } func (this *FileReader) Close() error { + if this.isClosed { + return nil + } + this.isClosed = true return this.bFile.CloseFileReader(this) } diff --git a/internal/utils/bfs/fs.go b/internal/utils/bfs/fs.go index 01a1cc4..934bb66 100644 --- a/internal/utils/bfs/fs.go +++ b/internal/utils/bfs/fs.go @@ -328,22 +328,7 @@ func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { // 检查是否超出maxOpenFiles if this.bList.Len() > this.opt.MaxOpenFiles { - var firstBName = this.bList.Shift().Value - var firstBFile = this.bMap[firstBName] - delete(this.bMap, firstBName) - delete(this.bItemMap, firstBName) - - this.closingBMap[firstBFile.Filename()] = zero.Zero{} - - // MUST run in goroutine - go func() { - // 因为 closingBChan 可能已经关闭 - defer func() { - recover() - }() - - this.closingBChan <- firstBFile - }() + this.shiftOpenFiles() } return bFile, nil @@ -366,3 +351,44 @@ func (this *FS) processClosingBFiles() { delete(this.closingBMap, bFile.Filename()) this.mu.Unlock() } + +func (this *FS) shiftOpenFiles() { + var count = this.bList.Len() - this.opt.MaxOpenFiles + if count <= 0 { + return + } + + var bNames []string + this.bList.Range(func(item *linkedlist.Item[string]) (goNext bool) { + var bName = item.Value + var bFile = this.bMap[bName] + if bFile.CanClose() { + bNames = append(bNames, bName) + count-- + } + return count > 0 + }) + + for _, bName := range bNames { + var bFile = this.bMap[bName] + var item = this.bItemMap[bName] + + // clean + delete(this.bMap, bName) + delete(this.bItemMap, bName) + this.bList.Remove(item) + + // add to closing queue + this.closingBMap[bFile.Filename()] = zero.Zero{} + + // MUST run in goroutine + go func(bFile *BlocksFile) { + // 因为 closingBChan 可能已经关闭 + defer func() { + recover() + }() + + this.closingBChan <- bFile + }(bFile) + } +} diff --git a/internal/utils/bfs/fs_test.go b/internal/utils/bfs/fs_test.go index a9b9345..938ed96 100644 --- a/internal/utils/bfs/fs_test.go +++ b/internal/utils/bfs/fs_test.go @@ -129,7 +129,7 @@ func TestFS_OpenFileWriter_Close(t *testing.T) { } fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", &bfs.FSOptions{ - MaxOpenFiles: 4 << 10, + MaxOpenFiles: 99, }) if openErr != nil { t.Fatal(openErr) @@ -138,9 +138,9 @@ func TestFS_OpenFileWriter_Close(t *testing.T) { _ = fs.Close() }() - var count = 10 + var count = 2 if testutils.IsSingleTesting() { - count = 1000 + count = 100 } for i := 0; i < count; i++ { @@ -165,7 +165,11 @@ func TestFS_OpenFileWriter_Close(t *testing.T) { t.Fatal("len(bNames)!=len(bMap)") } - t.Log("["+types.String(len(bNames))+"]", bNames) + if len(bNames) < 10 { + t.Log("["+types.String(len(bNames))+"]", bNames) + } else { + t.Log("["+types.String(len(bNames))+"]", bNames[:10], "...") + } } p()