bfs:修复并发读提示ErrClosed都问题

This commit is contained in:
刘祥超
2024-04-28 19:02:59 +08:00
parent 0dbbc12eb7
commit 1cede74db3
2 changed files with 104 additions and 46 deletions

View File

@@ -38,8 +38,8 @@ type BlocksFile struct {
writingFileMap map[string]zero.Zero // hash => Zero writingFileMap map[string]zero.Zero // hash => Zero
syncAt time.Time syncAt time.Time
readerPool chan *FileReader readerPool chan *FileReader
countReadingFiles int32 countRefs int32
} }
func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) { func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) {
@@ -202,7 +202,7 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return nil, ErrClosed return nil, ErrClosed
} }
reader.Reset(header) reader.Reset(header)
atomic.AddInt32(&this.countReadingFiles, 1) atomic.AddInt32(&this.countRefs, 1)
return reader, nil return reader, nil
default: default:
} }
@@ -214,12 +214,12 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return nil, err return nil, err
} }
atomic.AddInt32(&this.countReadingFiles, 1) atomic.AddInt32(&this.countRefs, 1)
return NewFileReader(this, fp, header), nil return NewFileReader(this, fp, header), nil
} }
func (this *BlocksFile) CloseFileReader(reader *FileReader) error { func (this *BlocksFile) CloseFileReader(reader *FileReader) error {
atomic.AddInt32(&this.countReadingFiles, -1) defer atomic.AddInt32(&this.countRefs, -1)
select { select {
case this.readerPool <- reader: case this.readerPool <- reader:
@@ -296,10 +296,10 @@ func (this *BlocksFile) RemoveAll() error {
// CanClose 检查是否可以关闭 // CanClose 检查是否可以关闭
func (this *BlocksFile) CanClose() bool { func (this *BlocksFile) CanClose() bool {
this.mu.Lock() this.mu.RLock()
defer this.mu.Unlock() defer this.mu.RUnlock()
if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countReadingFiles) > 0 { if len(this.writingFileMap) > 0 || atomic.LoadInt32(&this.countRefs) > 0 {
return false return false
} }
@@ -327,6 +327,19 @@ func (this *BlocksFile) Close() error {
return this.fp.Close() return this.fp.Close()
} }
// IsClosing 判断当前文件是否正在关闭或者已关闭
func (this *BlocksFile) IsClosing() bool {
return this.isClosed || this.isClosing
}
func (this *BlocksFile) IncrRef() {
atomic.AddInt32(&this.countRefs, 1)
}
func (this *BlocksFile) DecrRef() {
atomic.AddInt32(&this.countRefs, -1)
}
func (this *BlocksFile) TestReaderPool() chan *FileReader { func (this *BlocksFile) TestReaderPool() chan *FileReader {
return this.readerPool return this.readerPool
} }
@@ -339,7 +352,7 @@ func (this *BlocksFile) removeWritingFile(hash string) {
func (this *BlocksFile) checkStatus() error { func (this *BlocksFile) checkStatus() error {
if this.isClosed || this.isClosing { if this.isClosed || this.isClosing {
return ErrClosed return fmt.Errorf("check status failed: %w", ErrClosed)
} }
return nil return nil
} }
@@ -355,15 +368,17 @@ func (this *BlocksFile) sync(force bool) error {
} }
} }
this.writtenBytes = 0 if this.writtenBytes > 0 {
AckWriteThread()
AckWriteThread() err := this.fp.Sync()
err := this.fp.Sync() ReleaseWriteThread()
ReleaseWriteThread() if err != nil {
if err != nil { return err
return err }
} }
this.writtenBytes = 0
this.syncAt = time.Now() this.syncAt = time.Now()
if force { if force {

View File

@@ -4,6 +4,7 @@ package bfs
import ( import (
"errors" "errors"
"github.com/TeaOSLab/EdgeNode/internal/goman"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist" "github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
"github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/TeaOSLab/EdgeNode/internal/zero"
@@ -151,11 +152,19 @@ func (this *FS) Close() error {
var lastErr error var lastErr error
this.mu.Lock() this.mu.Lock()
for _, bFile := range this.bMap { if len(this.bMap) > 0 {
err := bFile.Close() var g = goman.NewTaskGroup()
if err != nil { for _, bFile := range this.bMap {
lastErr = err var bFileCopy = bFile
g.Run(func() {
err := bFileCopy.Close()
if err != nil {
lastErr = err
}
})
} }
g.Wait()
} }
this.mu.Unlock() this.mu.Unlock()
@@ -213,8 +222,17 @@ func (this *FS) syncLoop() {
this.mu.RUnlock() this.mu.RUnlock()
for _, bFile := range bFiles { for _, bFile := range bFiles {
if bFile.IsClosing() {
continue
}
err := bFile.ForceSync() err := bFile.ForceSync()
if err != nil { if err != nil {
// check again
if bFile.IsClosing() {
continue
}
// TODO 可以在options自定义一个logger // TODO 可以在options自定义一个logger
log.Println("BFS", "sync failed: "+err.Error()) log.Println("BFS", "sync failed: "+err.Error())
} }
@@ -262,46 +280,34 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) {
return nil, err return nil, err
} }
this.mu.RLock() err = this.waitBFile(bPath)
if err != nil {
return nil, err
}
this.mu.Lock()
bFile, ok := this.bMap[bName] bFile, ok := this.bMap[bName]
this.mu.RUnlock()
if ok { if ok {
// 调整当前BFile所在位置 // 调整当前BFile所在位置
this.mu.Lock()
item, itemOk := this.bItemMap[bName] item, itemOk := this.bItemMap[bName]
if itemOk { if itemOk {
this.bList.Remove(item) this.bList.Remove(item)
this.bList.Push(item) this.bList.Push(item)
} }
this.mu.Unlock() this.mu.Unlock()
return bFile, nil return bFile, nil
} }
this.mu.Unlock()
return this.openBFile(bPath, bName) return this.openBFile(bPath, bName)
} }
func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
// check closing queue // check closing queue
this.mu.RLock() err := this.waitBFile(bPath)
_, isClosing := this.closingBMap[bPath] if err != nil {
this.mu.RUnlock() return nil, err
if isClosing {
var maxWaits = 30_000
for {
this.mu.RLock()
_, isClosing = this.closingBMap[bPath]
this.mu.RUnlock()
if !isClosing {
break
}
time.Sleep(1 * time.Millisecond)
maxWaits--
if maxWaits < 0 {
return nil, errors.New("open blocks file timeout")
}
}
} }
this.mu.Lock() this.mu.Lock()
@@ -313,12 +319,18 @@ func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
return bFile, nil return bFile, nil
} }
bFile, err := OpenBlocksFile(bPath, &BlockFileOptions{ // TODO 不要把 OpenBlocksFile 放入到 mu 中?
bFile, err = OpenBlocksFile(bPath, &BlockFileOptions{
BytesPerSync: this.opt.BytesPerSync, BytesPerSync: this.opt.BytesPerSync,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 防止被关闭
bFile.IncrRef()
defer bFile.DecrRef()
this.bMap[bName] = bFile this.bMap[bName] = bFile
// 加入到列表中 // 加入到列表中
@@ -352,21 +364,26 @@ func (this *FS) processClosingBFiles() {
this.mu.Unlock() this.mu.Unlock()
} }
// 弹出超出BFile数量限制的BFile
func (this *FS) shiftOpenFiles() { func (this *FS) shiftOpenFiles() {
var count = this.bList.Len() - this.opt.MaxOpenFiles var l = this.bList.Len()
var count = l - this.opt.MaxOpenFiles
if count <= 0 { if count <= 0 {
return return
} }
var bNames []string var bNames []string
var searchCount int
this.bList.Range(func(item *linkedlist.Item[string]) (goNext bool) { this.bList.Range(func(item *linkedlist.Item[string]) (goNext bool) {
searchCount++
var bName = item.Value var bName = item.Value
var bFile = this.bMap[bName] var bFile = this.bMap[bName]
if bFile.CanClose() { if bFile.CanClose() {
bNames = append(bNames, bName) bNames = append(bNames, bName)
count-- count--
} }
return count > 0 return count > 0 && searchCount < 8 && searchCount < l-8
}) })
for _, bName := range bNames { for _, bName := range bNames {
@@ -392,3 +409,29 @@ func (this *FS) shiftOpenFiles() {
}(bFile) }(bFile)
} }
} }
func (this *FS) waitBFile(bPath string) error {
this.mu.RLock()
_, isClosing := this.closingBMap[bPath]
this.mu.RUnlock()
if !isClosing {
return nil
}
var maxWaits = 30_000
for {
this.mu.RLock()
_, isClosing = this.closingBMap[bPath]
this.mu.RUnlock()
if !isClosing {
break
}
time.Sleep(1 * time.Millisecond)
maxWaits--
if maxWaits < 0 {
return errors.New("open blocks file timeout")
}
}
return nil
}