bfs:同一个Hash同时只能有一个Writer,避免多线程读冲突

This commit is contained in:
GoEdgeLab
2024-04-27 18:27:49 +08:00
parent 7b75c508c6
commit 501f9b6712
5 changed files with 62 additions and 14 deletions

View File

@@ -5,6 +5,7 @@ package bfs
import (
"errors"
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"io"
"os"
"path/filepath"
@@ -31,8 +32,9 @@ type BlocksFile struct {
mu *sync.RWMutex
writtenBytes int64
syncAt time.Time
writtenBytes int64
writingFileMap map[string]zero.Zero // hash => Zero
syncAt time.Time
readerPool chan *FileReader
}
@@ -64,12 +66,13 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi
}
return &BlocksFile{
fp: fp,
mFile: mFile,
mu: mu,
opt: options,
syncAt: time.Now(),
readerPool: make(chan *FileReader, 32),
fp: fp,
mFile: mFile,
mu: mu,
opt: options,
syncAt: time.Now(),
readerPool: make(chan *FileReader, 32),
writingFileMap: map[string]zero.Zero{},
}, nil
}
@@ -102,8 +105,6 @@ func (this *BlocksFile) Write(hash string, blockType BlockType, b []byte, origin
return
}
// TODO 实现 originOffset
this.mu.Lock()
defer this.mu.Unlock()
@@ -144,11 +145,16 @@ func (this *BlocksFile) OpenFileWriter(fileHash string, bodySize int64, isPartia
return nil, err
}
// TODO 限制对同一个Hash同时只能有一个Writer
this.mu.Lock()
defer this.mu.Unlock()
_, isWriting := this.writingFileMap[fileHash]
if isWriting {
err = ErrFileIsWriting
return
}
this.writingFileMap[fileHash] = zero.Zero{}
err = this.checkStatus()
if err != nil {
return
@@ -304,6 +310,12 @@ func (this *BlocksFile) TestReaderPool() chan *FileReader {
return this.readerPool
}
func (this *BlocksFile) removeWritingFile(hash string) {
this.mu.Lock()
delete(this.writingFileMap, hash)
this.mu.Unlock()
}
func (this *BlocksFile) checkStatus() error {
if this.isClosed {
return ErrClosed

View File

@@ -8,6 +8,32 @@ import (
"testing"
)
func TestBlocksFile_OpenFileWriter_SameHash(t *testing.T) {
bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if openErr != nil {
if os.IsNotExist(openErr) {
return
}
t.Fatal(openErr)
}
{
writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false)
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
}
{
writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false)
if err != nil {
t.Fatal(err)
}
_ = writer.Close()
}
}
func TestBlocksFile_RemoveAll(t *testing.T) {
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
@@ -16,6 +42,9 @@ func TestBlocksFile_RemoveAll(t *testing.T) {
}
t.Fatal(err)
}
defer func() {
_ = bFile.Close()
}()
err = bFile.RemoveAll()
if err != nil {

View File

@@ -79,6 +79,10 @@ func (this *FileWriter) WriteBodyAt(b []byte, offset int64) (n int, err error) {
}
func (this *FileWriter) Close() error {
defer func() {
this.bFile.removeWritingFile(this.hash)
}()
if !this.isPartial && !this.hasMeta {
return errors.New("no meta found")
}

View File

@@ -95,8 +95,6 @@ func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*Fi
return nil, errors.New("invalid body size for partial content")
}
// TODO 限制同一个hash同时只能有一个Writer
bFile, err := this.openBFileForHashWriting(hash)
if err != nil {
return nil, err

View File

@@ -233,6 +233,11 @@ func (this *MetaFile) FileHeader(hash string) (header *FileHeader, ok bool) {
return
}
func (this *MetaFile) FileHeaderUnsafe(hash string) (header *FileHeader, ok bool) {
header, ok = this.headerMap[hash]
return
}
func (this *MetaFile) CloneFileHeader(hash string) (header *FileHeader, ok bool) {
this.mu.RLock()
defer this.mu.RUnlock()