diff --git a/internal/utils/bfs/blocks_file.go b/internal/utils/bfs/blocks_file.go index 0bff537..d73b17e 100644 --- a/internal/utils/bfs/blocks_file.go +++ b/internal/utils/bfs/blocks_file.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "path/filepath" "strings" "sync" "time" @@ -45,7 +46,7 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi var mu = &sync.RWMutex{} var mFilename = strings.TrimSuffix(bFilename, BFileExt) + MFileExt - mFile, err := NewMetaFile(mFilename, mu) + mFile, err := OpenMetaFile(mFilename, mu) if err != nil { _ = fp.Close() return nil, fmt.Errorf("load '%s' failed: %w", mFilename, err) @@ -67,12 +68,23 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi }, nil } -func NewBlocksFile(filename string, options *BlockFileOptions) (*BlocksFile, error) { +func OpenBlocksFile(filename string, options *BlockFileOptions) (*BlocksFile, error) { // TODO 考虑是否使用flock锁定,防止多进程写冲突 fp, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { - return nil, fmt.Errorf("open blocks file failed: %w", err) + if os.IsNotExist(err) { + var dir = filepath.Dir(filename) + _ = os.MkdirAll(dir, 0777) + + // try again + fp, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666) + } + + if err != nil { + return nil, fmt.Errorf("open blocks file failed: %w", err) + } } + return NewBlocksFileWithRawFile(fp, options) } @@ -154,7 +166,7 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe } // 是否存在 - header, ok := this.mFile.CloneHeader(fileHash) + header, ok := this.mFile.CloneFileHeader(fileHash) if !ok { return nil, os.ErrNotExist } @@ -176,14 +188,22 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe return NewFileReader(this, fp, header), nil } +func (this *BlocksFile) ExistFile(fileHash string) bool { + err := CheckHashErr(fileHash) + if err != nil { + return false + } + + return this.mFile.ExistFile(fileHash) +} + func (this *BlocksFile) RemoveFile(fileHash string) error { err := CheckHashErr(fileHash) if err != nil { return err } - // TODO 需要实现 - return nil + return this.mFile.RemoveFile(fileHash) } func (this *BlocksFile) Sync() error { diff --git a/internal/utils/bfs/blocks_file_test.go b/internal/utils/bfs/blocks_file_test.go index eaa2930..5dd85bb 100644 --- a/internal/utils/bfs/blocks_file_test.go +++ b/internal/utils/bfs/blocks_file_test.go @@ -9,7 +9,7 @@ import ( ) func TestBlocksFile_RemoveAll(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { if os.IsNotExist(err) { return diff --git a/internal/utils/bfs/errors.go b/internal/utils/bfs/errors.go index e9d5b61..ef61d7d 100644 --- a/internal/utils/bfs/errors.go +++ b/internal/utils/bfs/errors.go @@ -2,7 +2,10 @@ package bfs -import "errors" +import ( + "errors" + "os" +) var ErrClosed = errors.New("the file closed") var ErrInvalidHash = errors.New("invalid hash") @@ -11,3 +14,7 @@ var ErrFileIsWriting = errors.New("the file is writing") func IsWritingErr(err error) bool { return err != nil && errors.Is(err, ErrFileIsWriting) } + +func IsNotExist(err error) bool { + return err != nil && os.IsNotExist(err) +} diff --git a/internal/utils/bfs/file_reader.go b/internal/utils/bfs/file_reader.go index 3bbdd2c..bed3de5 100644 --- a/internal/utils/bfs/file_reader.go +++ b/internal/utils/bfs/file_reader.go @@ -10,21 +10,25 @@ import ( ) type FileReader struct { - bFile *BlocksFile - fp *os.File - header *FileHeader + bFile *BlocksFile + fp *os.File + fileHeader *FileHeader pos int64 } -func NewFileReader(bFile *BlocksFile, fp *os.File, header *FileHeader) *FileReader { +func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader { return &FileReader{ - bFile: bFile, - fp: fp, - header: header, + bFile: bFile, + fp: fp, + fileHeader: fileHeader, } } +func (this *FileReader) FileHeader() *FileHeader { + return this.fileHeader +} + func (this *FileReader) Read(b []byte) (n int, err error) { n, err = this.ReadAt(b, this.pos) this.pos += int64(n) @@ -33,12 +37,12 @@ func (this *FileReader) Read(b []byte) (n int, err error) { } func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) { - if offset >= this.header.MaxOffset() { + if offset >= this.fileHeader.MaxOffset() { err = io.EOF return } - blockInfo, ok := this.header.BlockAt(offset) + blockInfo, ok := this.fileHeader.BlockAt(offset) if !ok { err = errors.New("could not find block at '" + types.String(offset) + "'") return diff --git a/internal/utils/bfs/file_reader_test.go b/internal/utils/bfs/file_reader_test.go index c2f00c2..44c08de 100644 --- a/internal/utils/bfs/file_reader_test.go +++ b/internal/utils/bfs/file_reader_test.go @@ -6,18 +6,23 @@ import ( "fmt" "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" "io" + "os" "testing" "time" ) func TestFileReader_Read_SmallBuf(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { t.Fatal(err) } reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) if err != nil { + if os.IsNotExist(err) { + t.Log(err) + return + } t.Fatal(err) } @@ -41,13 +46,21 @@ func TestFileReader_Read_SmallBuf(t *testing.T) { } func TestFileReader_Read_LargeBuff(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { + if os.IsNotExist(err) { + t.Log(err) + return + } t.Fatal(err) } reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) if err != nil { + if os.IsNotExist(err) { + t.Log(err) + return + } t.Fatal(err) } @@ -71,13 +84,21 @@ func TestFileReader_Read_LargeBuff(t *testing.T) { } func TestFileReader_Read_LargeFile(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { + if os.IsNotExist(err) { + t.Log(err) + return + } t.Fatal(err) } reader, err := bFile.OpenFileReader(bfs.Hash("123456@LARGE"), false) if err != nil { + if os.IsNotExist(err) { + t.Log(err) + return + } t.Fatal(err) } @@ -104,13 +125,21 @@ func TestFileReader_Read_LargeFile(t *testing.T) { } func TestFileReader_ReadAt(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { + if os.IsNotExist(err) { + t.Log(err) + return + } t.Fatal(err) } reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) if err != nil { + if os.IsNotExist(err) { + t.Log(err) + return + } t.Fatal(err) } diff --git a/internal/utils/bfs/file_writer_test.go b/internal/utils/bfs/file_writer_test.go index 12a1f03..dfea51c 100644 --- a/internal/utils/bfs/file_writer_test.go +++ b/internal/utils/bfs/file_writer_test.go @@ -14,7 +14,7 @@ import ( ) func TestNewFileWriter(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { t.Fatal(err) } @@ -58,7 +58,7 @@ func TestNewFileWriter(t *testing.T) { } func TestNewFileWriter_LargeFile(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { t.Fatal(err) } @@ -105,7 +105,7 @@ func TestNewFileWriter_LargeFile(t *testing.T) { } func TestFileWriter_WriteBodyAt(t *testing.T) { - bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) if err != nil { t.Fatal(err) } diff --git a/internal/utils/bfs/fs.go b/internal/utils/bfs/fs.go index 2e1bfc6..cd1ad49 100644 --- a/internal/utils/bfs/fs.go +++ b/internal/utils/bfs/fs.go @@ -5,12 +5,11 @@ package bfs import ( "errors" "log" - "os" - "path/filepath" "sync" "time" ) +// FS 文件系统管理 type FS struct { dir string opt *FSOptions @@ -44,68 +43,39 @@ func (this *FS) init() { } func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) { - err := CheckHashErr(hash) - if err != nil { - return nil, err - } - if isPartial && bodySize <= 0 { return nil, errors.New("invalid body size for partial content") } - bPath, bName, err := this.bPathForHash(hash) + bFile, err := this.openBFileForWriting(hash) if err != nil { return nil, err } - - // check directory - // TODO 需要改成提示找不到文件的时候再检查 - _, err = os.Stat(filepath.Dir(bPath)) - if err != nil && os.IsNotExist(err) { - _ = os.MkdirAll(filepath.Dir(bPath), 0777) - } - - this.mu.Lock() - defer this.mu.Unlock() - bFile, ok := this.bMap[bName] - if ok { - return bFile.OpenFileWriter(hash, bodySize, isPartial) - } - - bFile, err = NewBlocksFile(bPath, &BlockFileOptions{ - BytesPerSync: this.opt.BytesPerSync, - }) - if err != nil { - return nil, err - } - this.bMap[bName] = bFile return bFile.OpenFileWriter(hash, bodySize, isPartial) } func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) { - err := CheckHashErr(hash) + bFile, err := this.openBFileForReading(hash) if err != nil { return nil, err } + return bFile.OpenFileReader(hash, isPartial) +} - _, bName, err := this.bPathForHash(hash) +func (this *FS) ExistFile(hash string) (bool, error) { + bFile, err := this.openBFileForReading(hash) if err != nil { - return nil, err + return false, err } - - this.mu.Lock() - defer this.mu.Unlock() - bFile, ok := this.bMap[bName] - if ok { - return bFile.OpenFileReader(hash, isPartial) - } - - return nil, os.ErrNotExist + return bFile.ExistFile(hash), nil } func (this *FS) RemoveFile(hash string) error { - // TODO 需要实现 - return nil + bFile, err := this.openBFileForWriting(hash) + if err != nil { + return err + } + return bFile.RemoveFile(hash) } func (this *FS) Close() error { @@ -168,3 +138,65 @@ func (this *FS) syncLoop() { } } } + +func (this *FS) openBFileForWriting(hash string) (*BlocksFile, error) { + err := CheckHashErr(hash) + if err != nil { + return nil, err + } + + bPath, bName, err := this.bPathForHash(hash) + if err != nil { + return nil, err + } + + this.mu.RLock() + bFile, ok := this.bMap[bName] + this.mu.RUnlock() + if ok { + return bFile, nil + } + + return this.openBFile(bPath, bName) +} + +func (this *FS) openBFileForReading(hash string) (*BlocksFile, error) { + err := CheckHashErr(hash) + if err != nil { + return nil, err + } + + bPath, bName, err := this.bPathForHash(hash) + if err != nil { + return nil, err + } + + this.mu.RLock() + bFile, ok := this.bMap[bName] + this.mu.RUnlock() + if ok { + return bFile, nil + } + + return this.openBFile(bPath, bName) +} + +func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { + this.mu.Lock() + defer this.mu.Unlock() + + // lookup again + bFile, ok := this.bMap[bName] + if ok { + return bFile, nil + } + + bFile, err := OpenBlocksFile(bPath, &BlockFileOptions{ + BytesPerSync: this.opt.BytesPerSync, + }) + if err != nil { + return nil, err + } + this.bMap[bName] = bFile + return bFile, nil +} diff --git a/internal/utils/bfs/fs_test.go b/internal/utils/bfs/fs_test.go index 24fc053..271b455 100644 --- a/internal/utils/bfs/fs_test.go +++ b/internal/utils/bfs/fs_test.go @@ -4,11 +4,12 @@ package bfs_test import ( "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" - "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/logs" + "io" "testing" - "time" ) func TestFS_OpenFileWriter(t *testing.T) { @@ -18,7 +19,12 @@ func TestFS_OpenFileWriter(t *testing.T) { }() { - writer, err := fs.OpenFileWriter(bfs.Hash("123456"), 100, true) + writer, err := fs.OpenFileWriter(bfs.Hash("123456"), -1, false) + if err != nil { + t.Fatal(err) + } + + err = writer.WriteMeta(200, fasttime.Now().Unix()+3600, -1) if err != nil { t.Fatal(err) } @@ -27,10 +33,15 @@ func TestFS_OpenFileWriter(t *testing.T) { if err != nil { t.Fatal(err) } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } } { - writer, err := fs.OpenFileWriter(bfs.Hash("123456"), 100, true) + writer, err := fs.OpenFileWriter(bfs.Hash("654321"), 100, true) if err != nil { t.Fatal(err) } @@ -40,8 +51,58 @@ func TestFS_OpenFileWriter(t *testing.T) { t.Fatal(err) } } - - if testutils.IsSingleTesting() { - time.Sleep(2 * time.Second) - } +} + +func TestFS_OpenFileReader(t *testing.T) { + var fs = bfs.NewFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions) + defer func() { + _ = fs.Close() + }() + + reader, err := fs.OpenFileReader(bfs.Hash("123456"), false) + if err != nil { + if bfs.IsNotExist(err) { + t.Log(err) + return + } + t.Fatal(err) + } + data, err := io.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + t.Log(string(data)) + logs.PrintAsJSON(reader.FileHeader(), t) +} + +func TestFS_ExistFile(t *testing.T) { + var fs = bfs.NewFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions) + defer func() { + _ = fs.Close() + }() + + exist, err := fs.ExistFile(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } + t.Log("exist:", exist) +} + +func TestFS_RemoveFile(t *testing.T) { + var fs = bfs.NewFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions) + defer func() { + _ = fs.Close() + }() + + var hash = bfs.Hash("123456") + err := fs.RemoveFile(hash) + if err != nil { + t.Fatal(err) + } + + exist, err := fs.ExistFile(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } + t.Log("exist:", exist) } diff --git a/internal/utils/bfs/meta_file.go b/internal/utils/bfs/meta_file.go index c9bb1f4..81c58c7 100644 --- a/internal/utils/bfs/meta_file.go +++ b/internal/utils/bfs/meta_file.go @@ -28,7 +28,7 @@ type MetaFile struct { modifiedHashMap map[string]zero.Zero } -func NewMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) { +func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) { fp, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0666) if err != nil { return nil, err @@ -160,7 +160,6 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64) this.mu.Lock() header, ok := this.headerMap[hash] if ok { - // TODO 计算headerSize, bodySize // TODO 检查bodySize和expectedBodySize是否一致,如果不一致则从headerMap中删除 header.ModifiedAt = fasttime.Now().Unix() @@ -186,7 +185,6 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64) return err } - // TODO 考虑自动sync的机制 _, err = this.fp.Write(blockBytes) this.isModified = true return err @@ -217,14 +215,14 @@ func (this *MetaFile) RemoveFile(hash string) error { return nil } -func (this *MetaFile) Header(hash string) (header *FileHeader, ok bool) { +func (this *MetaFile) FileHeader(hash string) (header *FileHeader, ok bool) { this.mu.RLock() defer this.mu.RUnlock() header, ok = this.headerMap[hash] return } -func (this *MetaFile) CloneHeader(hash string) (header *FileHeader, ok bool) { +func (this *MetaFile) CloneFileHeader(hash string) (header *FileHeader, ok bool) { this.mu.RLock() defer this.mu.RUnlock() header, ok = this.headerMap[hash] @@ -236,12 +234,20 @@ func (this *MetaFile) CloneHeader(hash string) (header *FileHeader, ok bool) { return } -func (this *MetaFile) Headers() map[string]*FileHeader { +func (this *MetaFile) FileHeaders() map[string]*FileHeader { this.mu.RLock() defer this.mu.RUnlock() return this.headerMap } +func (this *MetaFile) ExistFile(hash string) bool { + this.mu.RLock() + defer this.mu.RUnlock() + + _, ok := this.headerMap[hash] + return ok +} + // Compact the meta file // TODO 考虑自动Compact的时机(脏数据比例?) func (this *MetaFile) Compact() error { @@ -294,6 +300,7 @@ func (this *MetaFile) SyncUnsafe() error { return nil } +// Close 关闭当前文件 func (this *MetaFile) Close() error { return this.fp.Close() } @@ -365,5 +372,6 @@ func (this *MetaFile) decodeHeader(data []byte) (*FileHeader, error) { return nil, err } + header.IsWriting = false return header, nil } diff --git a/internal/utils/bfs/meta_file_test.go b/internal/utils/bfs/meta_file_test.go index dbcd2e7..dfe2a08 100644 --- a/internal/utils/bfs/meta_file_test.go +++ b/internal/utils/bfs/meta_file_test.go @@ -11,7 +11,7 @@ import ( ) func TestNewMetaFile(t *testing.T) { - mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) if err != nil { t.Fatal(err) } @@ -19,13 +19,13 @@ func TestNewMetaFile(t *testing.T) { _ = mFile.Close() }() - var header, _ = mFile.Header(bfs.Hash("123456")) + var header, _ = mFile.FileHeader(bfs.Hash("123456")) logs.PrintAsJSON(header, t) //logs.PrintAsJSON(mFile.Headers(), t) } func TestMetaFile_WriteMeta(t *testing.T) { - mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) if err != nil { t.Fatal(err) } @@ -63,7 +63,7 @@ func TestMetaFile_WriteMeta(t *testing.T) { } func TestMetaFile_Write(t *testing.T) { - mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) if err != nil { t.Fatal(err) } @@ -85,7 +85,7 @@ func TestMetaFile_Write(t *testing.T) { } func TestMetaFile_RemoveFile(t *testing.T) { - mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) if err != nil { t.Fatal(err) } @@ -100,7 +100,7 @@ func TestMetaFile_RemoveFile(t *testing.T) { } func TestMetaFile_Compact(t *testing.T) { - mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) if err != nil { t.Fatal(err) } @@ -115,7 +115,7 @@ func TestMetaFile_Compact(t *testing.T) { } func TestMetaFile_RemoveAll(t *testing.T) { - mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) if err != nil { t.Fatal(err) }