bfs commit (exprimental)

This commit is contained in:
GoEdgeLab
2024-04-26 18:44:29 +08:00
parent ebdcf8c5e1
commit 65fbafb712
10 changed files with 250 additions and 89 deletions

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
@@ -45,7 +46,7 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi
var mu = &sync.RWMutex{}
var mFilename = strings.TrimSuffix(bFilename, BFileExt) + MFileExt
mFile, err := NewMetaFile(mFilename, mu)
mFile, err := OpenMetaFile(mFilename, mu)
if err != nil {
_ = fp.Close()
return nil, fmt.Errorf("load '%s' failed: %w", mFilename, err)
@@ -67,12 +68,23 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi
}, nil
}
func NewBlocksFile(filename string, options *BlockFileOptions) (*BlocksFile, error) {
func OpenBlocksFile(filename string, options *BlockFileOptions) (*BlocksFile, error) {
// TODO 考虑是否使用flock锁定防止多进程写冲突
fp, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("open blocks file failed: %w", err)
if os.IsNotExist(err) {
var dir = filepath.Dir(filename)
_ = os.MkdirAll(dir, 0777)
// try again
fp, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666)
}
if err != nil {
return nil, fmt.Errorf("open blocks file failed: %w", err)
}
}
return NewBlocksFileWithRawFile(fp, options)
}
@@ -154,7 +166,7 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
}
// 是否存在
header, ok := this.mFile.CloneHeader(fileHash)
header, ok := this.mFile.CloneFileHeader(fileHash)
if !ok {
return nil, os.ErrNotExist
}
@@ -176,14 +188,22 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return NewFileReader(this, fp, header), nil
}
func (this *BlocksFile) ExistFile(fileHash string) bool {
err := CheckHashErr(fileHash)
if err != nil {
return false
}
return this.mFile.ExistFile(fileHash)
}
func (this *BlocksFile) RemoveFile(fileHash string) error {
err := CheckHashErr(fileHash)
if err != nil {
return err
}
// TODO 需要实现
return nil
return this.mFile.RemoveFile(fileHash)
}
func (this *BlocksFile) Sync() error {

View File

@@ -9,7 +9,7 @@ import (
)
func TestBlocksFile_RemoveAll(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
return

View File

@@ -2,7 +2,10 @@
package bfs
import "errors"
import (
"errors"
"os"
)
var ErrClosed = errors.New("the file closed")
var ErrInvalidHash = errors.New("invalid hash")
@@ -11,3 +14,7 @@ var ErrFileIsWriting = errors.New("the file is writing")
func IsWritingErr(err error) bool {
return err != nil && errors.Is(err, ErrFileIsWriting)
}
func IsNotExist(err error) bool {
return err != nil && os.IsNotExist(err)
}

View File

@@ -10,21 +10,25 @@ import (
)
type FileReader struct {
bFile *BlocksFile
fp *os.File
header *FileHeader
bFile *BlocksFile
fp *os.File
fileHeader *FileHeader
pos int64
}
func NewFileReader(bFile *BlocksFile, fp *os.File, header *FileHeader) *FileReader {
func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader {
return &FileReader{
bFile: bFile,
fp: fp,
header: header,
bFile: bFile,
fp: fp,
fileHeader: fileHeader,
}
}
func (this *FileReader) FileHeader() *FileHeader {
return this.fileHeader
}
func (this *FileReader) Read(b []byte) (n int, err error) {
n, err = this.ReadAt(b, this.pos)
this.pos += int64(n)
@@ -33,12 +37,12 @@ func (this *FileReader) Read(b []byte) (n int, err error) {
}
func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) {
if offset >= this.header.MaxOffset() {
if offset >= this.fileHeader.MaxOffset() {
err = io.EOF
return
}
blockInfo, ok := this.header.BlockAt(offset)
blockInfo, ok := this.fileHeader.BlockAt(offset)
if !ok {
err = errors.New("could not find block at '" + types.String(offset) + "'")
return

View File

@@ -6,18 +6,23 @@ import (
"fmt"
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"io"
"os"
"testing"
"time"
)
func TestFileReader_Read_SmallBuf(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
@@ -41,13 +46,21 @@ func TestFileReader_Read_SmallBuf(t *testing.T) {
}
func TestFileReader_Read_LargeBuff(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
@@ -71,13 +84,21 @@ func TestFileReader_Read_LargeBuff(t *testing.T) {
}
func TestFileReader_Read_LargeFile(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456@LARGE"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
@@ -104,13 +125,21 @@ func TestFileReader_Read_LargeFile(t *testing.T) {
}
func TestFileReader_ReadAt(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if os.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}

View File

@@ -14,7 +14,7 @@ import (
)
func TestNewFileWriter(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}
@@ -58,7 +58,7 @@ func TestNewFileWriter(t *testing.T) {
}
func TestNewFileWriter_LargeFile(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}
@@ -105,7 +105,7 @@ func TestNewFileWriter_LargeFile(t *testing.T) {
}
func TestFileWriter_WriteBodyAt(t *testing.T) {
bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
bFile, err := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
if err != nil {
t.Fatal(err)
}

View File

@@ -5,12 +5,11 @@ package bfs
import (
"errors"
"log"
"os"
"path/filepath"
"sync"
"time"
)
// FS 文件系统管理
type FS struct {
dir string
opt *FSOptions
@@ -44,68 +43,39 @@ func (this *FS) init() {
}
func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) {
err := CheckHashErr(hash)
if err != nil {
return nil, err
}
if isPartial && bodySize <= 0 {
return nil, errors.New("invalid body size for partial content")
}
bPath, bName, err := this.bPathForHash(hash)
bFile, err := this.openBFileForWriting(hash)
if err != nil {
return nil, err
}
// check directory
// TODO 需要改成提示找不到文件的时候再检查
_, err = os.Stat(filepath.Dir(bPath))
if err != nil && os.IsNotExist(err) {
_ = os.MkdirAll(filepath.Dir(bPath), 0777)
}
this.mu.Lock()
defer this.mu.Unlock()
bFile, ok := this.bMap[bName]
if ok {
return bFile.OpenFileWriter(hash, bodySize, isPartial)
}
bFile, err = NewBlocksFile(bPath, &BlockFileOptions{
BytesPerSync: this.opt.BytesPerSync,
})
if err != nil {
return nil, err
}
this.bMap[bName] = bFile
return bFile.OpenFileWriter(hash, bodySize, isPartial)
}
func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) {
err := CheckHashErr(hash)
bFile, err := this.openBFileForReading(hash)
if err != nil {
return nil, err
}
return bFile.OpenFileReader(hash, isPartial)
}
_, bName, err := this.bPathForHash(hash)
func (this *FS) ExistFile(hash string) (bool, error) {
bFile, err := this.openBFileForReading(hash)
if err != nil {
return nil, err
return false, err
}
this.mu.Lock()
defer this.mu.Unlock()
bFile, ok := this.bMap[bName]
if ok {
return bFile.OpenFileReader(hash, isPartial)
}
return nil, os.ErrNotExist
return bFile.ExistFile(hash), nil
}
func (this *FS) RemoveFile(hash string) error {
// TODO 需要实现
return nil
bFile, err := this.openBFileForWriting(hash)
if err != nil {
return err
}
return bFile.RemoveFile(hash)
}
func (this *FS) Close() error {
@@ -168,3 +138,65 @@ func (this *FS) syncLoop() {
}
}
}
func (this *FS) openBFileForWriting(hash string) (*BlocksFile, error) {
err := CheckHashErr(hash)
if err != nil {
return nil, err
}
bPath, bName, err := this.bPathForHash(hash)
if err != nil {
return nil, err
}
this.mu.RLock()
bFile, ok := this.bMap[bName]
this.mu.RUnlock()
if ok {
return bFile, nil
}
return this.openBFile(bPath, bName)
}
func (this *FS) openBFileForReading(hash string) (*BlocksFile, error) {
err := CheckHashErr(hash)
if err != nil {
return nil, err
}
bPath, bName, err := this.bPathForHash(hash)
if err != nil {
return nil, err
}
this.mu.RLock()
bFile, ok := this.bMap[bName]
this.mu.RUnlock()
if ok {
return bFile, nil
}
return this.openBFile(bPath, bName)
}
func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
this.mu.Lock()
defer this.mu.Unlock()
// lookup again
bFile, ok := this.bMap[bName]
if ok {
return bFile, nil
}
bFile, err := OpenBlocksFile(bPath, &BlockFileOptions{
BytesPerSync: this.opt.BytesPerSync,
})
if err != nil {
return nil, err
}
this.bMap[bName] = bFile
return bFile, nil
}

View File

@@ -4,11 +4,12 @@ package bfs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
"io"
"testing"
"time"
)
func TestFS_OpenFileWriter(t *testing.T) {
@@ -18,7 +19,12 @@ func TestFS_OpenFileWriter(t *testing.T) {
}()
{
writer, err := fs.OpenFileWriter(bfs.Hash("123456"), 100, true)
writer, err := fs.OpenFileWriter(bfs.Hash("123456"), -1, false)
if err != nil {
t.Fatal(err)
}
err = writer.WriteMeta(200, fasttime.Now().Unix()+3600, -1)
if err != nil {
t.Fatal(err)
}
@@ -27,10 +33,15 @@ func TestFS_OpenFileWriter(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
}
{
writer, err := fs.OpenFileWriter(bfs.Hash("123456"), 100, true)
writer, err := fs.OpenFileWriter(bfs.Hash("654321"), 100, true)
if err != nil {
t.Fatal(err)
}
@@ -40,8 +51,58 @@ func TestFS_OpenFileWriter(t *testing.T) {
t.Fatal(err)
}
}
if testutils.IsSingleTesting() {
time.Sleep(2 * time.Second)
}
}
func TestFS_OpenFileReader(t *testing.T) {
var fs = bfs.NewFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions)
defer func() {
_ = fs.Close()
}()
reader, err := fs.OpenFileReader(bfs.Hash("123456"), false)
if err != nil {
if bfs.IsNotExist(err) {
t.Log(err)
return
}
t.Fatal(err)
}
data, err := io.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
t.Log(string(data))
logs.PrintAsJSON(reader.FileHeader(), t)
}
func TestFS_ExistFile(t *testing.T) {
var fs = bfs.NewFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions)
defer func() {
_ = fs.Close()
}()
exist, err := fs.ExistFile(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("exist:", exist)
}
func TestFS_RemoveFile(t *testing.T) {
var fs = bfs.NewFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions)
defer func() {
_ = fs.Close()
}()
var hash = bfs.Hash("123456")
err := fs.RemoveFile(hash)
if err != nil {
t.Fatal(err)
}
exist, err := fs.ExistFile(bfs.Hash("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("exist:", exist)
}

View File

@@ -28,7 +28,7 @@ type MetaFile struct {
modifiedHashMap map[string]zero.Zero
}
func NewMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) {
func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) {
fp, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
@@ -160,7 +160,6 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64)
this.mu.Lock()
header, ok := this.headerMap[hash]
if ok {
// TODO 计算headerSize, bodySize
// TODO 检查bodySize和expectedBodySize是否一致如果不一致则从headerMap中删除
header.ModifiedAt = fasttime.Now().Unix()
@@ -186,7 +185,6 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64)
return err
}
// TODO 考虑自动sync的机制
_, err = this.fp.Write(blockBytes)
this.isModified = true
return err
@@ -217,14 +215,14 @@ func (this *MetaFile) RemoveFile(hash string) error {
return nil
}
func (this *MetaFile) Header(hash string) (header *FileHeader, ok bool) {
func (this *MetaFile) FileHeader(hash string) (header *FileHeader, ok bool) {
this.mu.RLock()
defer this.mu.RUnlock()
header, ok = this.headerMap[hash]
return
}
func (this *MetaFile) CloneHeader(hash string) (header *FileHeader, ok bool) {
func (this *MetaFile) CloneFileHeader(hash string) (header *FileHeader, ok bool) {
this.mu.RLock()
defer this.mu.RUnlock()
header, ok = this.headerMap[hash]
@@ -236,12 +234,20 @@ func (this *MetaFile) CloneHeader(hash string) (header *FileHeader, ok bool) {
return
}
func (this *MetaFile) Headers() map[string]*FileHeader {
func (this *MetaFile) FileHeaders() map[string]*FileHeader {
this.mu.RLock()
defer this.mu.RUnlock()
return this.headerMap
}
func (this *MetaFile) ExistFile(hash string) bool {
this.mu.RLock()
defer this.mu.RUnlock()
_, ok := this.headerMap[hash]
return ok
}
// Compact the meta file
// TODO 考虑自动Compact的时机脏数据比例
func (this *MetaFile) Compact() error {
@@ -294,6 +300,7 @@ func (this *MetaFile) SyncUnsafe() error {
return nil
}
// Close 关闭当前文件
func (this *MetaFile) Close() error {
return this.fp.Close()
}
@@ -365,5 +372,6 @@ func (this *MetaFile) decodeHeader(data []byte) (*FileHeader, error) {
return nil, err
}
header.IsWriting = false
return header, nil
}

View File

@@ -11,7 +11,7 @@ import (
)
func TestNewMetaFile(t *testing.T) {
mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{})
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
@@ -19,13 +19,13 @@ func TestNewMetaFile(t *testing.T) {
_ = mFile.Close()
}()
var header, _ = mFile.Header(bfs.Hash("123456"))
var header, _ = mFile.FileHeader(bfs.Hash("123456"))
logs.PrintAsJSON(header, t)
//logs.PrintAsJSON(mFile.Headers(), t)
}
func TestMetaFile_WriteMeta(t *testing.T) {
mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{})
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
@@ -63,7 +63,7 @@ func TestMetaFile_WriteMeta(t *testing.T) {
}
func TestMetaFile_Write(t *testing.T) {
mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{})
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
@@ -85,7 +85,7 @@ func TestMetaFile_Write(t *testing.T) {
}
func TestMetaFile_RemoveFile(t *testing.T) {
mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{})
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
@@ -100,7 +100,7 @@ func TestMetaFile_RemoveFile(t *testing.T) {
}
func TestMetaFile_Compact(t *testing.T) {
mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{})
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}
@@ -115,7 +115,7 @@ func TestMetaFile_Compact(t *testing.T) {
}
func TestMetaFile_RemoveAll(t *testing.T) {
mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{})
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil {
t.Fatal(err)
}