bfs:弹出BFile的时候确保没有正在被使用

This commit is contained in:
GoEdgeLab
2024-04-27 20:55:04 +08:00
parent e612ad4082
commit 33c8e4b201
5 changed files with 113 additions and 23 deletions

View File

@@ -11,6 +11,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
)
@@ -28,7 +29,8 @@ type BlocksFile struct {
fp *os.File
mFile *MetaFile
isClosed bool
isClosing bool
isClosed bool
mu *sync.RWMutex
@@ -36,7 +38,8 @@ type BlocksFile struct {
writingFileMap map[string]zero.Zero // hash => Zero
syncAt time.Time
readerPool chan *FileReader
readerPool chan *FileReader
countReadingFiles int32
}
func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) {
@@ -199,6 +202,7 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return nil, ErrClosed
}
reader.Reset(header)
atomic.AddInt32(&this.countReadingFiles, 1)
return reader, nil
default:
}
@@ -209,10 +213,14 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
if err != nil {
return nil, err
}
atomic.AddInt32(&this.countReadingFiles, 1)
return NewFileReader(this, fp, header), nil
}
func (this *BlocksFile) CloseFileReader(reader *FileReader) error {
atomic.AddInt32(&this.countReadingFiles, -1)
select {
case this.readerPool <- reader:
return nil
@@ -286,6 +294,19 @@ func (this *BlocksFile) RemoveAll() error {
return os.Remove(this.fp.Name())
}
// CanClose 检查是否可以关闭
func (this *BlocksFile) CanClose() bool {
this.mu.Lock()
defer this.mu.Unlock()
if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countReadingFiles) > 0 {
return false
}
this.isClosing = true
return true
}
// Close 关闭当前文件
func (this *BlocksFile) Close() error {
this.mu.Lock()
@@ -317,7 +338,7 @@ func (this *BlocksFile) removeWritingFile(hash string) {
}
func (this *BlocksFile) checkStatus() error {
if this.isClosed {
if this.isClosed || this.isClosing {
return ErrClosed
}
return nil

View File

@@ -4,10 +4,43 @@ package bfs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/iwind/TeaGo/assert"
"os"
"testing"
)
func TestBlocksFile_CanClose(t *testing.T) {
var a = assert.NewAssertion(t)
bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if openErr != nil {
if os.IsNotExist(openErr) {
return
}
t.Fatal(openErr)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
t.Fatal(err)
}
a.IsTrue(!bFile.CanClose())
err = reader.Close()
if err != nil {
t.Fatal(err)
}
// duplicated close
err = reader.Close()
if err != nil {
t.Fatal(err)
}
a.IsTrue(bFile.CanClose())
}
func TestBlocksFile_OpenFileWriter_SameHash(t *testing.T) {
bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if openErr != nil {

View File

@@ -15,6 +15,8 @@ type FileReader struct {
fileHeader *FileHeader
pos int64
isClosed bool
}
func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader {
@@ -74,6 +76,10 @@ func (this *FileReader) Reset(fileHeader *FileHeader) {
}
func (this *FileReader) Close() error {
if this.isClosed {
return nil
}
this.isClosed = true
return this.bFile.CloseFileReader(this)
}

View File

@@ -328,22 +328,7 @@ func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
// 检查是否超出maxOpenFiles
if this.bList.Len() > this.opt.MaxOpenFiles {
var firstBName = this.bList.Shift().Value
var firstBFile = this.bMap[firstBName]
delete(this.bMap, firstBName)
delete(this.bItemMap, firstBName)
this.closingBMap[firstBFile.Filename()] = zero.Zero{}
// MUST run in goroutine
go func() {
// 因为 closingBChan 可能已经关闭
defer func() {
recover()
}()
this.closingBChan <- firstBFile
}()
this.shiftOpenFiles()
}
return bFile, nil
@@ -366,3 +351,44 @@ func (this *FS) processClosingBFiles() {
delete(this.closingBMap, bFile.Filename())
this.mu.Unlock()
}
func (this *FS) shiftOpenFiles() {
var count = this.bList.Len() - this.opt.MaxOpenFiles
if count <= 0 {
return
}
var bNames []string
this.bList.Range(func(item *linkedlist.Item[string]) (goNext bool) {
var bName = item.Value
var bFile = this.bMap[bName]
if bFile.CanClose() {
bNames = append(bNames, bName)
count--
}
return count > 0
})
for _, bName := range bNames {
var bFile = this.bMap[bName]
var item = this.bItemMap[bName]
// clean
delete(this.bMap, bName)
delete(this.bItemMap, bName)
this.bList.Remove(item)
// add to closing queue
this.closingBMap[bFile.Filename()] = zero.Zero{}
// MUST run in goroutine
go func(bFile *BlocksFile) {
// 因为 closingBChan 可能已经关闭
defer func() {
recover()
}()
this.closingBChan <- bFile
}(bFile)
}
}

View File

@@ -129,7 +129,7 @@ func TestFS_OpenFileWriter_Close(t *testing.T) {
}
fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", &bfs.FSOptions{
MaxOpenFiles: 4 << 10,
MaxOpenFiles: 99,
})
if openErr != nil {
t.Fatal(openErr)
@@ -138,9 +138,9 @@ func TestFS_OpenFileWriter_Close(t *testing.T) {
_ = fs.Close()
}()
var count = 10
var count = 2
if testutils.IsSingleTesting() {
count = 1000
count = 100
}
for i := 0; i < count; i++ {
@@ -165,7 +165,11 @@ func TestFS_OpenFileWriter_Close(t *testing.T) {
t.Fatal("len(bNames)!=len(bMap)")
}
t.Log("["+types.String(len(bNames))+"]", bNames)
if len(bNames) < 10 {
t.Log("["+types.String(len(bNames))+"]", bNames)
} else {
t.Log("["+types.String(len(bNames))+"]", bNames[:10], "...")
}
}
p()