diff --git a/internal/utils/bfs/blocks_file.go b/internal/utils/bfs/blocks_file.go index 0fe8690..9a12ba6 100644 --- a/internal/utils/bfs/blocks_file.go +++ b/internal/utils/bfs/blocks_file.go @@ -5,6 +5,7 @@ package bfs import ( "errors" "fmt" + "github.com/TeaOSLab/EdgeNode/internal/zero" "io" "os" "path/filepath" @@ -31,8 +32,9 @@ type BlocksFile struct { mu *sync.RWMutex - writtenBytes int64 - syncAt time.Time + writtenBytes int64 + writingFileMap map[string]zero.Zero // hash => Zero + syncAt time.Time readerPool chan *FileReader } @@ -64,12 +66,13 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi } return &BlocksFile{ - fp: fp, - mFile: mFile, - mu: mu, - opt: options, - syncAt: time.Now(), - readerPool: make(chan *FileReader, 32), + fp: fp, + mFile: mFile, + mu: mu, + opt: options, + syncAt: time.Now(), + readerPool: make(chan *FileReader, 32), + writingFileMap: map[string]zero.Zero{}, }, nil } @@ -102,8 +105,6 @@ func (this *BlocksFile) Write(hash string, blockType BlockType, b []byte, origin return } - // TODO 实现 originOffset - this.mu.Lock() defer this.mu.Unlock() @@ -144,11 +145,16 @@ func (this *BlocksFile) OpenFileWriter(fileHash string, bodySize int64, isPartia return nil, err } - // TODO 限制对同一个Hash同时只能有一个Writer - this.mu.Lock() defer this.mu.Unlock() + _, isWriting := this.writingFileMap[fileHash] + if isWriting { + err = ErrFileIsWriting + return + } + this.writingFileMap[fileHash] = zero.Zero{} + err = this.checkStatus() if err != nil { return @@ -304,6 +310,12 @@ func (this *BlocksFile) TestReaderPool() chan *FileReader { return this.readerPool } +func (this *BlocksFile) removeWritingFile(hash string) { + this.mu.Lock() + delete(this.writingFileMap, hash) + this.mu.Unlock() +} + func (this *BlocksFile) checkStatus() error { if this.isClosed { return ErrClosed diff --git a/internal/utils/bfs/blocks_file_test.go b/internal/utils/bfs/blocks_file_test.go index 5dd85bb..3480880 100644 --- a/internal/utils/bfs/blocks_file_test.go +++ b/internal/utils/bfs/blocks_file_test.go @@ -8,6 +8,32 @@ import ( "testing" ) +func TestBlocksFile_OpenFileWriter_SameHash(t *testing.T) { + bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if openErr != nil { + if os.IsNotExist(openErr) { + return + } + t.Fatal(openErr) + } + + { + writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false) + if err != nil { + t.Fatal(err) + } + _ = writer.Close() + } + + { + writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false) + if err != nil { + t.Fatal(err) + } + _ = writer.Close() + } +} + func TestBlocksFile_RemoveAll(t *testing.T) { bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { @@ -16,6 +42,9 @@ func TestBlocksFile_RemoveAll(t *testing.T) { } t.Fatal(err) } + defer func() { + _ = bFile.Close() + }() err = bFile.RemoveAll() if err != nil { diff --git a/internal/utils/bfs/file_writer.go b/internal/utils/bfs/file_writer.go index 1ff6c6f..38dc7b2 100644 --- a/internal/utils/bfs/file_writer.go +++ b/internal/utils/bfs/file_writer.go @@ -79,6 +79,10 @@ func (this *FileWriter) WriteBodyAt(b []byte, offset int64) (n int, err error) { } func (this *FileWriter) Close() error { + defer func() { + this.bFile.removeWritingFile(this.hash) + }() + if !this.isPartial && !this.hasMeta { return errors.New("no meta found") } diff --git a/internal/utils/bfs/fs.go b/internal/utils/bfs/fs.go index 1383421..01a1cc4 100644 --- a/internal/utils/bfs/fs.go +++ b/internal/utils/bfs/fs.go @@ -95,8 +95,6 @@ func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*Fi return nil, errors.New("invalid body size for partial content") } - // TODO 限制同一个hash同时只能有一个Writer - bFile, err := this.openBFileForHashWriting(hash) if err != nil { return nil, err diff --git a/internal/utils/bfs/meta_file.go b/internal/utils/bfs/meta_file.go index cc7d06d..e474219 100644 --- a/internal/utils/bfs/meta_file.go +++ b/internal/utils/bfs/meta_file.go @@ -233,6 +233,11 @@ func (this *MetaFile) FileHeader(hash string) (header *FileHeader, ok bool) { return } +func (this *MetaFile) FileHeaderUnsafe(hash string) (header *FileHeader, ok bool) { + header, ok = this.headerMap[hash] + return +} + func (this *MetaFile) CloneFileHeader(hash string) (header *FileHeader, ok bool) { this.mu.RLock() defer this.mu.RUnlock()