bfs:弹出BFile的时候确保没有正在被使用

This commit is contained in:
刘祥超
2024-04-27 20:55:04 +08:00
parent 6226e31cdc
commit 962cbde4e9
5 changed files with 113 additions and 23 deletions

View File

@@ -11,6 +11,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
)
@@ -28,7 +29,8 @@ type BlocksFile struct {
fp *os.File
mFile *MetaFile
isClosed bool
isClosing bool
isClosed bool
mu *sync.RWMutex
@@ -36,7 +38,8 @@ type BlocksFile struct {
writingFileMap map[string]zero.Zero // hash => Zero
syncAt time.Time
readerPool chan *FileReader
readerPool chan *FileReader
countReadingFiles int32
}
func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) {
@@ -199,6 +202,7 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return nil, ErrClosed
}
reader.Reset(header)
atomic.AddInt32(&this.countReadingFiles, 1)
return reader, nil
default:
}
@@ -209,10 +213,14 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
if err != nil {
return nil, err
}
atomic.AddInt32(&this.countReadingFiles, 1)
return NewFileReader(this, fp, header), nil
}
func (this *BlocksFile) CloseFileReader(reader *FileReader) error {
atomic.AddInt32(&this.countReadingFiles, -1)
select {
case this.readerPool <- reader:
return nil
@@ -286,6 +294,19 @@ func (this *BlocksFile) RemoveAll() error {
return os.Remove(this.fp.Name())
}
// CanClose 检查是否可以关闭
func (this *BlocksFile) CanClose() bool {
this.mu.Lock()
defer this.mu.Unlock()
if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countReadingFiles) > 0 {
return false
}
this.isClosing = true
return true
}
// Close 关闭当前文件
func (this *BlocksFile) Close() error {
this.mu.Lock()
@@ -317,7 +338,7 @@ func (this *BlocksFile) removeWritingFile(hash string) {
}
func (this *BlocksFile) checkStatus() error {
if this.isClosed {
if this.isClosed || this.isClosing {
return ErrClosed
}
return nil