diff --git a/internal/utils/bfs/.gitignore b/internal/utils/bfs/.gitignore new file mode 100644 index 0000000..09d8579 --- /dev/null +++ b/internal/utils/bfs/.gitignore @@ -0,0 +1,2 @@ +DESIGN.md +test.* diff --git a/internal/utils/bfs/block_info.go b/internal/utils/bfs/block_info.go new file mode 100644 index 0000000..9c2d4f2 --- /dev/null +++ b/internal/utils/bfs/block_info.go @@ -0,0 +1,15 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +type BlockInfo struct { + OriginOffsetFrom int64 `json:"1,omitempty"` + OriginOffsetTo int64 `json:"2,omitempty"` + + BFileOffsetFrom int64 `json:"3,omitempty"` + BFileOffsetTo int64 `json:"4,omitempty"` +} + +func (this BlockInfo) Contains(offset int64) bool { + return this.OriginOffsetFrom <= offset && this.OriginOffsetTo > /** MUST be gt, NOT gte **/ offset +} diff --git a/internal/utils/bfs/blocks_file.go b/internal/utils/bfs/blocks_file.go new file mode 100644 index 0000000..0bff537 --- /dev/null +++ b/internal/utils/bfs/blocks_file.go @@ -0,0 +1,281 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "errors" + "fmt" + "io" + "os" + "strings" + "sync" + "time" +) + +const BFileExt = ".b" + +type BlockType string + +const ( + BlockTypeHeader BlockType = "header" + BlockTypeBody BlockType = "body" +) + +type BlocksFile struct { + opt *BlockFileOptions + fp *os.File + mFile *MetaFile + + isClosed bool + + mu *sync.RWMutex + + writtenBytes int64 + syncAt time.Time +} + +func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) { + options.EnsureDefaults() + + var bFilename = fp.Name() + if !strings.HasSuffix(bFilename, BFileExt) { + return nil, errors.New("filename '" + bFilename + "' must has a '" + BFileExt + "' extension") + } + + var mu = &sync.RWMutex{} + + var mFilename = strings.TrimSuffix(bFilename, BFileExt) + MFileExt + mFile, err := NewMetaFile(mFilename, mu) + if err != nil { + _ = fp.Close() + return nil, fmt.Errorf("load '%s' failed: %w", mFilename, err) + } + + _, err = fp.Seek(0, io.SeekEnd) + if err != nil { + _ = fp.Close() + _ = mFile.Close() + return nil, err + } + + return &BlocksFile{ + fp: fp, + mFile: mFile, + mu: mu, + opt: options, + syncAt: time.Now(), + }, nil +} + +func NewBlocksFile(filename string, options *BlockFileOptions) (*BlocksFile, error) { + // TODO 考虑是否使用flock锁定,防止多进程写冲突 + fp, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return nil, fmt.Errorf("open blocks file failed: %w", err) + } + return NewBlocksFileWithRawFile(fp, options) +} + +func (this *BlocksFile) Filename() string { + return this.fp.Name() +} + +func (this *BlocksFile) Write(hash string, blockType BlockType, b []byte, originOffset int64) (n int, err error) { + if len(b) == 0 { + return + } + + // TODO 实现 originOffset + + this.mu.Lock() + defer this.mu.Unlock() + + posBefore, err := this.currentPos() + if err != nil { + return 0, err + } + + err = this.checkStatus() + if err != nil { + return + } + + n, err = this.fp.Write(b) + + if err == nil { + if n > 0 { + this.writtenBytes += int64(n) + } + + if blockType == BlockTypeHeader { + err = this.mFile.WriteHeaderBlockUnsafe(hash, posBefore, posBefore+int64(n)) + } else if blockType == BlockTypeBody { + err = this.mFile.WriteBodyBlockUnsafe(hash, posBefore, posBefore+int64(n), originOffset, originOffset+int64(n)) + } else { + err = errors.New("invalid block type '" + string(blockType) + "'") + } + } + + return +} + +func (this *BlocksFile) OpenFileWriter(fileHash string, bodySize int64, isPartial bool) (writer *FileWriter, err error) { + err = CheckHashErr(fileHash) + if err != nil { + return nil, err + } + + // TODO 限制对同一个Hash同时只能有一个Writer + + this.mu.Lock() + defer this.mu.Unlock() + + err = this.checkStatus() + if err != nil { + return + } + + return NewFileWriter(this, fileHash, bodySize, isPartial) +} + +func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileReader, error) { + err := CheckHashErr(fileHash) + if err != nil { + return nil, err + } + + // TODO 需要设置单个BFile文件的maxOpenFiles + + this.mu.RLock() + err = this.checkStatus() + this.mu.RUnlock() + if err != nil { + return nil, err + } + + // 是否存在 + header, ok := this.mFile.CloneHeader(fileHash) + if !ok { + return nil, os.ErrNotExist + } + + // TODO 对于partial content,需要传入ranges,用来判断是否有交集 + + if header.IsWriting { + return nil, ErrFileIsWriting + } + + if !isPartial && !header.IsCompleted { + return nil, os.ErrNotExist + } + + fp, err := os.Open(this.fp.Name()) + if err != nil { + return nil, err + } + return NewFileReader(this, fp, header), nil +} + +func (this *BlocksFile) RemoveFile(fileHash string) error { + err := CheckHashErr(fileHash) + if err != nil { + return err + } + + // TODO 需要实现 + return nil +} + +func (this *BlocksFile) Sync() error { + this.mu.Lock() + defer this.mu.Unlock() + + err := this.checkStatus() + if err != nil { + return err + } + + return this.sync(false) +} + +func (this *BlocksFile) ForceSync() error { + this.mu.Lock() + defer this.mu.Unlock() + + err := this.checkStatus() + if err != nil { + return err + } + + return this.sync(true) +} + +func (this *BlocksFile) SyncAt() time.Time { + return this.syncAt +} + +func (this *BlocksFile) Compact() error { + // TODO 需要实现 + return nil +} + +func (this *BlocksFile) RemoveAll() error { + this.mu.Lock() + defer this.mu.Unlock() + + this.isClosed = true + + _ = this.mFile.RemoveAll() + _ = this.fp.Close() + return os.Remove(this.fp.Name()) +} + +func (this *BlocksFile) Close() error { + this.mu.Lock() + defer this.mu.Unlock() + + err := this.sync(true) + if err != nil { + return err + } + + this.isClosed = true + + _ = this.mFile.Close() + + return this.fp.Close() +} + +func (this *BlocksFile) checkStatus() error { + if this.isClosed { + return ErrClosed + } + return nil +} + +func (this *BlocksFile) currentPos() (int64, error) { + return this.fp.Seek(0, io.SeekCurrent) +} + +func (this *BlocksFile) sync(force bool) error { + if !force { + if this.writtenBytes < this.opt.BytesPerSync { + return nil + } + } + + this.writtenBytes = 0 + + err := this.fp.Sync() + if err != nil { + return err + } + + this.syncAt = time.Now() + + if force { + return this.mFile.SyncUnsafe() + } + + return nil +} diff --git a/internal/utils/bfs/blocks_file_options.go b/internal/utils/bfs/blocks_file_options.go new file mode 100644 index 0000000..eba4cf1 --- /dev/null +++ b/internal/utils/bfs/blocks_file_options.go @@ -0,0 +1,18 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +type BlockFileOptions struct { + BytesPerSync int64 + MaxOpenFiles int // TODO 需要实现,主要用于OpenFileReader +} + +func (this *BlockFileOptions) EnsureDefaults() { + if this.BytesPerSync <= 0 { + this.BytesPerSync = 1 << 20 + } +} + +var DefaultBlockFileOptions = &BlockFileOptions{ + BytesPerSync: 1 << 20, +} diff --git a/internal/utils/bfs/blocks_file_test.go b/internal/utils/bfs/blocks_file_test.go new file mode 100644 index 0000000..eaa2930 --- /dev/null +++ b/internal/utils/bfs/blocks_file_test.go @@ -0,0 +1,24 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "os" + "testing" +) + +func TestBlocksFile_RemoveAll(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + if os.IsNotExist(err) { + return + } + t.Fatal(err) + } + + err = bFile.RemoveAll() + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/utils/bfs/errors.go b/internal/utils/bfs/errors.go new file mode 100644 index 0000000..e9d5b61 --- /dev/null +++ b/internal/utils/bfs/errors.go @@ -0,0 +1,13 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import "errors" + +var ErrClosed = errors.New("the file closed") +var ErrInvalidHash = errors.New("invalid hash") +var ErrFileIsWriting = errors.New("the file is writing") + +func IsWritingErr(err error) bool { + return err != nil && errors.Is(err, ErrFileIsWriting) +} diff --git a/internal/utils/bfs/file_header.go b/internal/utils/bfs/file_header.go new file mode 100644 index 0000000..18911fc --- /dev/null +++ b/internal/utils/bfs/file_header.go @@ -0,0 +1,95 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "sort" +) + +type FileHeader struct { + Version int `json:"1,omitempty"` + ModifiedAt int64 `json:"2,omitempty"` + ExpiresAt int64 `json:"3,omitempty"` + Status int `json:"4,omitempty"` + HeaderSize int64 `json:"5,omitempty"` + BodySize int64 `json:"6,omitempty"` + ExpiredBodySize int64 `json:"7,omitempty"` + HeaderBlocks []BlockInfo `json:"8,omitempty"` + BodyBlocks []BlockInfo `json:"9,omitempty"` + IsCompleted bool `json:"10,omitempty"` + IsWriting bool `json:"11,omitempty"` +} + +func (this *FileHeader) Compact() { + // TODO 合并相邻的headerBlocks和bodyBlocks(必须是对应的BFile offset也要相邻) + + if len(this.BodyBlocks) > 0 { + sort.Slice(this.BodyBlocks, func(i, j int) bool { + var block1 = this.BodyBlocks[i] + var block2 = this.BodyBlocks[j] + if block1.OriginOffsetFrom == block1.OriginOffsetFrom { + return block1.OriginOffsetTo < block2.OriginOffsetTo + } + return block1.OriginOffsetFrom < block2.OriginOffsetFrom + }) + + var isCompleted = true + if this.BodyBlocks[0].OriginOffsetFrom != 0 || this.BodyBlocks[len(this.BodyBlocks)-1].OriginOffsetTo != this.BodySize { + isCompleted = false + } else { + for index, block := range this.BodyBlocks { + // 是否有不连续的 + if index > 0 && block.OriginOffsetFrom > this.BodyBlocks[index-1].OriginOffsetTo { + isCompleted = false + break + } + } + } + this.IsCompleted = isCompleted + } +} + +func (this *FileHeader) Clone() *FileHeader { + return &FileHeader{ + Version: this.Version, + ModifiedAt: this.ModifiedAt, + ExpiresAt: this.ExpiresAt, + Status: this.Status, + HeaderSize: this.HeaderSize, + BodySize: this.BodySize, + ExpiredBodySize: this.ExpiredBodySize, + HeaderBlocks: this.HeaderBlocks, + BodyBlocks: this.BodyBlocks, + IsCompleted: this.IsCompleted, + IsWriting: this.IsWriting, + } +} + +func (this *FileHeader) BlockAt(offset int64) (blockInfo BlockInfo, ok bool) { + var l = len(this.BodyBlocks) + if l == 1 { + if this.BodyBlocks[0].Contains(offset) { + return this.BodyBlocks[0], true + } + return + } + + sort.Search(l, func(i int) bool { + if this.BodyBlocks[i].Contains(offset) { + blockInfo = this.BodyBlocks[i] + ok = true + return true + } + return this.BodyBlocks[i].OriginOffsetFrom > offset + }) + + return +} + +func (this *FileHeader) MaxOffset() int64 { + var l = len(this.BodyBlocks) + if l > 0 { + return this.BodyBlocks[l-1].OriginOffsetTo + } + return 0 +} diff --git a/internal/utils/bfs/file_header_test.go b/internal/utils/bfs/file_header_test.go new file mode 100644 index 0000000..f7e8c66 --- /dev/null +++ b/internal/utils/bfs/file_header_test.go @@ -0,0 +1,199 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/iwind/TeaGo/assert" + "github.com/iwind/TeaGo/logs" + "testing" +) + +func TestFileHeader_Compact(t *testing.T) { + var a = assert.NewAssertion(t) + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodySize: 100, + BodyBlocks: []bfs.BlockInfo{ + { + OriginOffsetFrom: 0, + OriginOffsetTo: 100, + }, + }, + } + header.Compact() + a.IsTrue(header.IsCompleted) + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodySize: 200, + BodyBlocks: []bfs.BlockInfo{ + { + OriginOffsetFrom: 100, + OriginOffsetTo: 200, + }, + { + OriginOffsetFrom: 0, + OriginOffsetTo: 100, + }, + }, + } + header.Compact() + a.IsTrue(header.IsCompleted) + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodySize: 200, + BodyBlocks: []bfs.BlockInfo{ + { + OriginOffsetFrom: 10, + OriginOffsetTo: 99, + }, + { + OriginOffsetFrom: 110, + OriginOffsetTo: 200, + }, + { + OriginOffsetFrom: 88, + OriginOffsetTo: 120, + }, + { + OriginOffsetFrom: 0, + OriginOffsetTo: 100, + }, + }, + } + header.Compact() + a.IsTrue(header.IsCompleted) + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodySize: 100, + BodyBlocks: []bfs.BlockInfo{ + { + OriginOffsetFrom: 10, + OriginOffsetTo: 100, + }, + { + OriginOffsetFrom: 100, + OriginOffsetTo: 200, + }, + }, + } + header.Compact() + a.IsFalse(header.IsCompleted) + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodySize: 200, + BodyBlocks: []bfs.BlockInfo{ + { + OriginOffsetFrom: 0, + OriginOffsetTo: 100, + }, + { + OriginOffsetFrom: 100, + OriginOffsetTo: 199, + }, + }, + } + header.Compact() + a.IsFalse(header.IsCompleted) + } + + { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodySize: 200, + BodyBlocks: []bfs.BlockInfo{ + { + OriginOffsetFrom: 0, + OriginOffsetTo: 100, + }, + { + OriginOffsetFrom: 101, + OriginOffsetTo: 200, + }, + }, + } + header.Compact() + a.IsFalse(header.IsCompleted) + } +} + +func TestFileHeader_Clone(t *testing.T) { + var a = assert.NewAssertion(t) + + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodyBlocks: []bfs.BlockInfo{ + { + BFileOffsetFrom: 0, + BFileOffsetTo: 100, + }, + }, + } + + var clonedHeader = header.Clone() + t.Log("=== cloned header ===") + logs.PrintAsJSON(clonedHeader, t) + a.IsTrue(len(clonedHeader.BodyBlocks) == 1) + + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + BFileOffsetFrom: 100, + BFileOffsetTo: 200, + }) + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + BFileOffsetFrom: 300, + BFileOffsetTo: 400, + }) + + clonedHeader.BodyBlocks[0].OriginOffsetFrom = 100000000 + + t.Log("=== after changed ===") + logs.PrintAsJSON(clonedHeader, t) + a.IsTrue(len(clonedHeader.BodyBlocks) == 1) + + t.Log("=== original header ===") + logs.PrintAsJSON(header, t) + a.IsTrue(header.BodyBlocks[0].OriginOffsetFrom != clonedHeader.BodyBlocks[0].OriginOffsetFrom) +} + +func BenchmarkFileHeader_Compact(b *testing.B) { + for i := 0; i < b.N; i++ { + var header = &bfs.FileHeader{ + Version: 1, + Status: 200, + BodySize: 200, + BodyBlocks: nil, + } + + for j := 0; j < 100; j++ { + header.BodyBlocks = append(header.BodyBlocks, bfs.BlockInfo{ + OriginOffsetFrom: int64(j * 100), + OriginOffsetTo: int64(j * 200), + BFileOffsetFrom: 0, + BFileOffsetTo: 0, + }) + } + + header.Compact() + } +} diff --git a/internal/utils/bfs/file_reader.go b/internal/utils/bfs/file_reader.go new file mode 100644 index 0000000..3bbdd2c --- /dev/null +++ b/internal/utils/bfs/file_reader.go @@ -0,0 +1,67 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "errors" + "github.com/iwind/TeaGo/types" + "io" + "os" +) + +type FileReader struct { + bFile *BlocksFile + fp *os.File + header *FileHeader + + pos int64 +} + +func NewFileReader(bFile *BlocksFile, fp *os.File, header *FileHeader) *FileReader { + return &FileReader{ + bFile: bFile, + fp: fp, + header: header, + } +} + +func (this *FileReader) Read(b []byte) (n int, err error) { + n, err = this.ReadAt(b, this.pos) + this.pos += int64(n) + + return +} + +func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) { + if offset >= this.header.MaxOffset() { + err = io.EOF + return + } + + blockInfo, ok := this.header.BlockAt(offset) + if !ok { + err = errors.New("could not find block at '" + types.String(offset) + "'") + return + } + + var delta = offset - blockInfo.OriginOffsetFrom + var bFrom = blockInfo.BFileOffsetFrom + delta + var bTo = blockInfo.BFileOffsetTo + if bFrom > bTo { + err = errors.New("invalid block information") + return + } + + var bufLen = len(b) + if int64(bufLen) > bTo-bFrom { + bufLen = int(bTo - bFrom) + } + + n, err = this.fp.ReadAt(b[:bufLen], bFrom) + + return +} + +func (this *FileReader) Close() error { + return this.fp.Close() +} diff --git a/internal/utils/bfs/file_reader_test.go b/internal/utils/bfs/file_reader_test.go new file mode 100644 index 0000000..c2f00c2 --- /dev/null +++ b/internal/utils/bfs/file_reader_test.go @@ -0,0 +1,177 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "fmt" + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "io" + "testing" + "time" +) + +func TestFileReader_Read_SmallBuf(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + t.Fatal(err) + } + + reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = reader.Close() + }() + + var buf = make([]byte, 3) + for { + n, readErr := reader.Read(buf) + if n > 0 { + t.Log(string(buf[:n])) + } + if readErr != nil { + if readErr == io.EOF { + break + } + t.Fatal(readErr) + } + } +} + +func TestFileReader_Read_LargeBuff(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + t.Fatal(err) + } + + reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = reader.Close() + }() + + var buf = make([]byte, 128) + for { + n, readErr := reader.Read(buf) + if n > 0 { + t.Log(string(buf[:n])) + } + if readErr != nil { + if readErr == io.EOF { + break + } + t.Fatal(readErr) + } + } +} + +func TestFileReader_Read_LargeFile(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + t.Fatal(err) + } + + reader, err := bFile.OpenFileReader(bfs.Hash("123456@LARGE"), false) + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = reader.Close() + }() + + var buf = make([]byte, 16<<10) + var totalSize int64 + var before = time.Now() + for { + n, readErr := reader.Read(buf) + if n > 0 { + totalSize += int64(n) + } + if readErr != nil { + if readErr == io.EOF { + break + } + t.Fatal(readErr) + } + } + t.Log("totalSize:", totalSize>>20, "MiB", "cost:", fmt.Sprintf("%.4fms", time.Since(before).Seconds()*1000)) +} + +func TestFileReader_ReadAt(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + t.Fatal(err) + } + + reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false) + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = reader.Close() + }() + + { + var buf = make([]byte, 3) + n, readErr := reader.ReadAt(buf, 0) + if n > 0 { + t.Log(string(buf[:n])) + } + if readErr != nil && readErr != io.EOF { + t.Fatal(readErr) + } + } + + { + var buf = make([]byte, 3) + n, readErr := reader.ReadAt(buf, 3) + if n > 0 { + t.Log(string(buf[:n])) + } + if readErr != nil && readErr != io.EOF { + t.Fatal(readErr) + } + } + + { + var buf = make([]byte, 11) + n, readErr := reader.ReadAt(buf, 3) + if n > 0 { + t.Log(string(buf[:n])) + } + if readErr != nil && readErr != io.EOF { + t.Fatal(readErr) + } + } + + { + var buf = make([]byte, 3) + n, readErr := reader.ReadAt(buf, 11) + if n > 0 { + t.Log(string(buf[:n])) + } + if readErr != nil && readErr != io.EOF { + t.Fatal(readErr) + } + } + + { + var buf = make([]byte, 3) + n, readErr := reader.ReadAt(buf, 1000) + if n > 0 { + t.Log(string(buf[:n])) + } else { + t.Log("EOF") + } + if readErr != nil && readErr != io.EOF { + t.Fatal(readErr) + } + } +} diff --git a/internal/utils/bfs/file_writer.go b/internal/utils/bfs/file_writer.go new file mode 100644 index 0000000..1ff6c6f --- /dev/null +++ b/internal/utils/bfs/file_writer.go @@ -0,0 +1,108 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import "errors" + +// FileWriter file writer +// not thread-safe +type FileWriter struct { + bFile *BlocksFile + hasMeta bool + hash string + + bodySize int64 + originOffset int64 + + realHeaderSize int64 + realBodySize int64 + isPartial bool +} + +func NewFileWriter(bFile *BlocksFile, hash string, bodySize int64, isPartial bool) (*FileWriter, error) { + if isPartial && bodySize <= 0 { + return nil, errors.New("invalid body size for partial content") + } + + return &FileWriter{ + bFile: bFile, + hash: hash, + bodySize: bodySize, + isPartial: isPartial, + }, nil +} + +func (this *FileWriter) WriteMeta(status int, expiresAt int64, expectedFileSize int64) error { + this.hasMeta = true + return this.bFile.mFile.WriteMeta(this.hash, status, expiresAt, expectedFileSize) +} + +func (this *FileWriter) WriteHeader(b []byte) (n int, err error) { + if !this.isPartial && !this.hasMeta { + err = errors.New("no meta found") + return + } + + n, err = this.bFile.Write(this.hash, BlockTypeHeader, b, -1) + this.realHeaderSize += int64(n) + return +} + +func (this *FileWriter) WriteBody(b []byte) (n int, err error) { + if !this.isPartial && !this.hasMeta { + err = errors.New("no meta found") + return + } + + n, err = this.bFile.Write(this.hash, BlockTypeBody, b, this.originOffset) + this.originOffset += int64(n) + this.realBodySize += int64(n) + return +} + +func (this *FileWriter) WriteBodyAt(b []byte, offset int64) (n int, err error) { + if !this.hasMeta { + err = errors.New("no meta found") + return + } + + if !this.isPartial { + err = errors.New("can not write body at specified offset: it is not a partial file") + return + } + + // still 'Write()' NOT 'WriteAt()' + this.originOffset = offset + n, err = this.bFile.Write(this.hash, BlockTypeBody, b, offset) + this.originOffset += int64(n) + return +} + +func (this *FileWriter) Close() error { + if !this.isPartial && !this.hasMeta { + return errors.New("no meta found") + } + + if this.isPartial { + if this.originOffset > this.bodySize { + return errors.New("unexpected body size") + } + this.realBodySize = this.bodySize + } else { + if this.bodySize > 0 && this.bodySize != this.realBodySize { + return errors.New("unexpected body size") + } + } + + err := this.bFile.mFile.WriteClose(this.hash, this.realHeaderSize, this.realBodySize) + if err != nil { + return err + } + + return this.bFile.Sync() +} + +func (this *FileWriter) Discard() error { + // TODO 需要测试 + return this.bFile.mFile.RemoveFile(this.hash) +} diff --git a/internal/utils/bfs/file_writer_test.go b/internal/utils/bfs/file_writer_test.go new file mode 100644 index 0000000..12a1f03 --- /dev/null +++ b/internal/utils/bfs/file_writer_test.go @@ -0,0 +1,134 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "bytes" + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/iwind/TeaGo/logs" + "net/http" + "testing" + "time" +) + +func TestNewFileWriter(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + t.Fatal(err) + } + + defer func() { + if !testutils.IsSingleTesting() { + _ = bFile.RemoveAll() + } else { + _ = bFile.Close() + } + }() + + writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), -1, false) + if err != nil { + t.Fatal(err) + } + + err = writer.WriteMeta(http.StatusOK, fasttime.Now().Unix()+3600, -1) + if err != nil { + t.Fatal(err) + } + + _, err = writer.WriteHeader([]byte("Content-Type: text/html; charset=utf-8")) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + n, writeErr := writer.WriteBody([]byte("Hello,World")) + if writeErr != nil { + t.Fatal(writeErr) + } + + t.Log("wrote:", n, "bytes") + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestNewFileWriter_LargeFile(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + t.Fatal(err) + } + + defer func() { + if !testutils.IsSingleTesting() { + _ = bFile.RemoveAll() + } else { + _ = bFile.Close() + } + }() + + writer, err := bFile.OpenFileWriter(bfs.Hash("123456@LARGE"), -1, false) + if err != nil { + t.Fatal(err) + } + + err = writer.WriteMeta(http.StatusOK, fasttime.Now().Unix()+86400, -1) + if err != nil { + t.Fatal(err) + } + + var countBlocks = 1 << 10 + if !testutils.IsSingleTesting() { + countBlocks = 2 + } + + var data = bytes.Repeat([]byte{'A'}, 16<<10) + + var before = time.Now() + for i := 0; i < countBlocks; i++ { + _, err = writer.WriteBody(data) + if err != nil { + t.Fatal(err) + } + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } + + logs.Println("cost:", time.Since(before).Seconds()*1000, "ms") +} + +func TestFileWriter_WriteBodyAt(t *testing.T) { + bFile, err := bfs.NewBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions) + if err != nil { + t.Fatal(err) + } + + defer func() { + if !testutils.IsSingleTesting() { + _ = bFile.RemoveAll() + } else { + _ = bFile.Close() + } + }() + + writer, err := bFile.OpenFileWriter(bfs.Hash("123456"), 1<<20, true) + if err != nil { + t.Fatal(err) + } + + { + n, writeErr := writer.WriteBodyAt([]byte("Hello,World"), 1024) + if writeErr != nil { + t.Fatal(writeErr) + } + + t.Log("wrote:", n, "bytes") + } +} diff --git a/internal/utils/bfs/fs.go b/internal/utils/bfs/fs.go new file mode 100644 index 0000000..2e1bfc6 --- /dev/null +++ b/internal/utils/bfs/fs.go @@ -0,0 +1,170 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "errors" + "log" + "os" + "path/filepath" + "sync" + "time" +) + +type FS struct { + dir string + opt *FSOptions + + bMap map[string]*BlocksFile // name => *BlocksFile + mu *sync.RWMutex + isClosed bool + + syncTicker *time.Ticker +} + +func NewFS(dir string, options *FSOptions) *FS { + options.EnsureDefaults() + + var fs = &FS{ + dir: dir, + bMap: map[string]*BlocksFile{}, + mu: &sync.RWMutex{}, + opt: options, + syncTicker: time.NewTicker(1 * time.Second), + } + go fs.init() + return fs +} + +func (this *FS) init() { + // sync in background + for range this.syncTicker.C { + this.syncLoop() + } +} + +func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) { + err := CheckHashErr(hash) + if err != nil { + return nil, err + } + + if isPartial && bodySize <= 0 { + return nil, errors.New("invalid body size for partial content") + } + + bPath, bName, err := this.bPathForHash(hash) + if err != nil { + return nil, err + } + + // check directory + // TODO 需要改成提示找不到文件的时候再检查 + _, err = os.Stat(filepath.Dir(bPath)) + if err != nil && os.IsNotExist(err) { + _ = os.MkdirAll(filepath.Dir(bPath), 0777) + } + + this.mu.Lock() + defer this.mu.Unlock() + bFile, ok := this.bMap[bName] + if ok { + return bFile.OpenFileWriter(hash, bodySize, isPartial) + } + + bFile, err = NewBlocksFile(bPath, &BlockFileOptions{ + BytesPerSync: this.opt.BytesPerSync, + }) + if err != nil { + return nil, err + } + this.bMap[bName] = bFile + return bFile.OpenFileWriter(hash, bodySize, isPartial) +} + +func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) { + err := CheckHashErr(hash) + if err != nil { + return nil, err + } + + _, bName, err := this.bPathForHash(hash) + if err != nil { + return nil, err + } + + this.mu.Lock() + defer this.mu.Unlock() + bFile, ok := this.bMap[bName] + if ok { + return bFile.OpenFileReader(hash, isPartial) + } + + return nil, os.ErrNotExist +} + +func (this *FS) RemoveFile(hash string) error { + // TODO 需要实现 + return nil +} + +func (this *FS) Close() error { + this.isClosed = true + + var lastErr error + this.mu.Lock() + for _, bFile := range this.bMap { + err := bFile.Close() + if err != nil { + lastErr = err + } + } + this.mu.Unlock() + return lastErr +} + +func (this *FS) bPathForHash(hash string) (path string, bName string, err error) { + err = CheckHashErr(hash) + if err != nil { + return "", "", err + } + + return this.dir + "/" + hash[:2] + "/" + hash[2:4] + BFileExt, hash[:4], nil +} + +func (this *FS) syncLoop() { + if this.isClosed { + return + } + + if this.opt.SyncTimeout <= 0 { + return + } + + var maxSyncFiles = this.opt.MaxSyncFiles + if maxSyncFiles <= 0 { + maxSyncFiles = 32 + } + + var bFiles []*BlocksFile + + this.mu.RLock() + for _, bFile := range this.bMap { + if time.Since(bFile.SyncAt()) > this.opt.SyncTimeout { + bFiles = append(bFiles, bFile) + maxSyncFiles-- + if maxSyncFiles <= 0 { + break + } + } + } + this.mu.RUnlock() + + for _, bFile := range bFiles { + err := bFile.ForceSync() + if err != nil { + // TODO 可以在options自定义一个logger + log.Println("BFS", "sync failed: "+err.Error()) + } + } +} diff --git a/internal/utils/bfs/fs_options.go b/internal/utils/bfs/fs_options.go new file mode 100644 index 0000000..7e295e1 --- /dev/null +++ b/internal/utils/bfs/fs_options.go @@ -0,0 +1,34 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import "time" + +type FSOptions struct { + MaxOpenFiles int // TODO 需要实现 + BytesPerSync int64 + SyncTimeout time.Duration + MaxSyncFiles int +} + +func (this *FSOptions) EnsureDefaults() { + if this.MaxOpenFiles <= 0 { + this.MaxOpenFiles = 4 << 10 + } + if this.BytesPerSync <= 0 { + this.BytesPerSync = 1 << 20 // TODO 根据硬盘实际写入速度进行调整 + } + if this.SyncTimeout <= 0 { + this.SyncTimeout = 1 * time.Second + } + if this.MaxSyncFiles <= 0 { + this.MaxSyncFiles = 32 + } +} + +var DefaultFSOptions = &FSOptions{ + MaxOpenFiles: 4 << 10, + BytesPerSync: 1 << 20, // TODO 根据硬盘实际写入速度进行调整 + SyncTimeout: 1 * time.Second, + MaxSyncFiles: 32, +} diff --git a/internal/utils/bfs/fs_test.go b/internal/utils/bfs/fs_test.go new file mode 100644 index 0000000..24fc053 --- /dev/null +++ b/internal/utils/bfs/fs_test.go @@ -0,0 +1,47 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/iwind/TeaGo/Tea" + _ "github.com/iwind/TeaGo/bootstrap" + "testing" + "time" +) + +func TestFS_OpenFileWriter(t *testing.T) { + var fs = bfs.NewFS(Tea.Root+"/data/bfs/test", bfs.DefaultFSOptions) + defer func() { + _ = fs.Close() + }() + + { + writer, err := fs.OpenFileWriter(bfs.Hash("123456"), 100, true) + if err != nil { + t.Fatal(err) + } + + _, err = writer.WriteBody([]byte("Hello, World")) + if err != nil { + t.Fatal(err) + } + } + + { + writer, err := fs.OpenFileWriter(bfs.Hash("123456"), 100, true) + if err != nil { + t.Fatal(err) + } + + _, err = writer.WriteBody([]byte("Hello, World")) + if err != nil { + t.Fatal(err) + } + } + + if testutils.IsSingleTesting() { + time.Sleep(2 * time.Second) + } +} diff --git a/internal/utils/bfs/hash.go b/internal/utils/bfs/hash.go new file mode 100644 index 0000000..6f1d4d4 --- /dev/null +++ b/internal/utils/bfs/hash.go @@ -0,0 +1,36 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "fmt" + stringutil "github.com/iwind/TeaGo/utils/string" +) + +var HashLen = 32 + +// CheckHash check hash string format +func CheckHash(hash string) bool { + if len(hash) != HashLen { + return false + } + + for _, b := range hash { + if !((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) { + return false + } + } + + return true +} + +func CheckHashErr(hash string) error { + if CheckHash(hash) { + return nil + } + return fmt.Errorf("check hash '%s' failed: %w", hash, ErrInvalidHash) +} + +func Hash(s string) string { + return stringutil.Md5(s) +} diff --git a/internal/utils/bfs/hash_test.go b/internal/utils/bfs/hash_test.go new file mode 100644 index 0000000..8c01c69 --- /dev/null +++ b/internal/utils/bfs/hash_test.go @@ -0,0 +1,27 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/iwind/TeaGo/assert" + "math/rand" + "strconv" + "strings" + "testing" +) + +func TestCheckHash(t *testing.T) { + var a = assert.NewAssertion(t) + + a.IsFalse(bfs.CheckHash("123456")) + a.IsFalse(bfs.CheckHash(strings.Repeat("A", 32))) + a.IsTrue(bfs.CheckHash(strings.Repeat("a", 32))) + a.IsTrue(bfs.CheckHash(bfs.Hash("123456"))) +} + +func BenchmarkCheckHashErr(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = bfs.CheckHash(bfs.Hash(strconv.Itoa(rand.Int()))) + } +} diff --git a/internal/utils/bfs/meta_block.go b/internal/utils/bfs/meta_block.go new file mode 100644 index 0000000..0096ffa --- /dev/null +++ b/internal/utils/bfs/meta_block.go @@ -0,0 +1,48 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "encoding/binary" + "errors" +) + +type MetaAction = byte + +const ( + MetaActionNew MetaAction = '+' + MetaActionRemove MetaAction = '-' +) + +func EncodeMetaBlock(action MetaAction, hash string, data []byte) ([]byte, error) { + var hl = len(hash) + if hl != HashLen { + return nil, errors.New("invalid hash length") + } + + var l = 1 /** Action **/ + hl /** Hash **/ + len(data) + + var b = make([]byte, 4 /** Len **/ +l) + binary.BigEndian.PutUint32(b, uint32(l)) + b[4] = action + copy(b[5:], hash) + copy(b[5+hl:], data) + return b, nil +} + +func DecodeMetaBlock(blockBytes []byte) (action MetaAction, hash string, data []byte, err error) { + var dataOffset = 4 /** Len **/ + HashLen + 1 /** Action **/ + if len(blockBytes) < dataOffset { + err = errors.New("decode failed: invalid block data") + return + } + + action = blockBytes[4] + hash = string(blockBytes[5 : 5+HashLen]) + + if action == MetaActionNew { + data = blockBytes[dataOffset:] + } + + return +} diff --git a/internal/utils/bfs/meta_block_test.go b/internal/utils/bfs/meta_block_test.go new file mode 100644 index 0000000..9284b14 --- /dev/null +++ b/internal/utils/bfs/meta_block_test.go @@ -0,0 +1,52 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "bytes" + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestMetaBlock(t *testing.T) { + var a = assert.NewAssertion(t) + + { + var srcHash = bfs.Hash("a") + b, err := bfs.EncodeMetaBlock(bfs.MetaActionNew, srcHash, []byte{1, 2, 3}) + if err != nil { + t.Fatal(err) + } + t.Log(b) + + { + action, hash, data, decodeErr := bfs.DecodeMetaBlock(b) + if decodeErr != nil { + t.Fatal(err) + } + a.IsTrue(action == bfs.MetaActionNew) + a.IsTrue(hash == srcHash) + a.IsTrue(bytes.Equal(data, []byte{1, 2, 3})) + } + } + + { + var srcHash = bfs.Hash("bcd") + + b, err := bfs.EncodeMetaBlock(bfs.MetaActionRemove, srcHash, []byte{1, 2, 3}) + if err != nil { + t.Fatal(err) + } + t.Log(b) + { + action, hash, data, decodeErr := bfs.DecodeMetaBlock(b) + if decodeErr != nil { + t.Fatal(err) + } + a.IsTrue(action == bfs.MetaActionRemove) + a.IsTrue(hash == srcHash) + a.IsTrue(len(data) == 0) + } + } +} diff --git a/internal/utils/bfs/meta_file.go b/internal/utils/bfs/meta_file.go new file mode 100644 index 0000000..c9bb1f4 --- /dev/null +++ b/internal/utils/bfs/meta_file.go @@ -0,0 +1,369 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/zero" + "github.com/klauspost/compress/gzip" + "io" + "os" + "sync" +) + +const MFileExt = ".m" +const Version1 = 1 + +type MetaFile struct { + fp *os.File + filename string + headerMap map[string]*FileHeader // hash => *FileHeader + mu *sync.RWMutex // TODO 考虑单独一个,不要和bFile共享? + + isModified bool + modifiedHashMap map[string]zero.Zero +} + +func NewMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) { + fp, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + + var mFile = &MetaFile{ + filename: filename, + fp: fp, + headerMap: map[string]*FileHeader{}, + mu: mu, + modifiedHashMap: map[string]zero.Zero{}, + } + + // 从文件中加载已有的文件头信息 + err = mFile.load() + if err != nil { + return nil, err + } + + return mFile, nil +} + +func (this *MetaFile) load() error { + _, err := this.fp.Seek(0, io.SeekStart) + if err != nil { + return err + } + + // TODO 考虑文件最后一行未写完整的情形 + + var buf = make([]byte, 4<<10) + var blockBytes []byte + for { + n, readErr := this.fp.Read(buf) + if n > 0 { + blockBytes = append(blockBytes, buf[:n]...) + for len(blockBytes) > 4 { + var l = int(binary.BigEndian.Uint32(blockBytes[:4])) + 4 /* Len **/ + if len(blockBytes) < l { + break + } + + action, hash, data, decodeErr := DecodeMetaBlock(blockBytes[:l]) + if decodeErr != nil { + return decodeErr + } + + switch action { + case MetaActionNew: + header, decodeHeaderErr := this.decodeHeader(data) + if decodeHeaderErr != nil { + return decodeHeaderErr + } + this.headerMap[hash] = header + case MetaActionRemove: + delete(this.headerMap, hash) + } + + blockBytes = blockBytes[l:] + } + } + if readErr != nil { + if readErr == io.EOF { + break + } + return readErr + } + } + + return nil +} + +func (this *MetaFile) WriteMeta(hash string, status int, expiresAt int64, expectedFileSize int64) error { + this.mu.Lock() + defer this.mu.Unlock() + + this.headerMap[hash] = &FileHeader{ + Version: Version1, + ExpiresAt: expiresAt, + Status: status, + ExpiredBodySize: expectedFileSize, + IsWriting: true, + } + + this.modifiedHashMap[hash] = zero.Zero{} + + return nil +} + +func (this *MetaFile) WriteHeaderBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64) error { + header, ok := this.headerMap[hash] + if !ok { + return nil + } + + // TODO 合并相邻block + header.HeaderBlocks = append(header.HeaderBlocks, BlockInfo{ + BFileOffsetFrom: bOffsetFrom, + BFileOffsetTo: bOffsetTo, + }) + + this.modifiedHashMap[hash] = zero.Zero{} + + return nil +} + +func (this *MetaFile) WriteBodyBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64, originOffsetFrom int64, originOffsetTo int64) error { + header, ok := this.headerMap[hash] + if !ok { + return nil + } + + // TODO 合并相邻block + header.BodyBlocks = append(header.BodyBlocks, BlockInfo{ + OriginOffsetFrom: originOffsetFrom, + OriginOffsetTo: originOffsetTo, + BFileOffsetFrom: bOffsetFrom, + BFileOffsetTo: bOffsetTo, + }) + + this.modifiedHashMap[hash] = zero.Zero{} + + return nil +} + +func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64) error { + // TODO 考虑单个hash多次重复调用的情况 + + this.mu.Lock() + header, ok := this.headerMap[hash] + if ok { + // TODO 计算headerSize, bodySize + // TODO 检查bodySize和expectedBodySize是否一致,如果不一致则从headerMap中删除 + + header.ModifiedAt = fasttime.Now().Unix() + header.HeaderSize = headerSize + header.BodySize = bodySize + header.Compact() + } + this.mu.Unlock() + if !ok { + return nil + } + + blockBytes, err := this.encodeHeader(hash, header) + if err != nil { + return err + } + + this.mu.Lock() + defer this.mu.Unlock() + + _, err = this.fp.Seek(0, io.SeekEnd) + if err != nil { + return err + } + + // TODO 考虑自动sync的机制 + _, err = this.fp.Write(blockBytes) + this.isModified = true + return err +} + +func (this *MetaFile) RemoveFile(hash string) error { + this.mu.Lock() + defer this.mu.Unlock() + + _, ok := this.headerMap[hash] + if ok { + delete(this.headerMap, hash) + } + + if ok { + blockBytes, err := EncodeMetaBlock(MetaActionRemove, hash, nil) + if err != nil { + return err + } + + _, err = this.fp.Write(blockBytes) + if err != nil { + return err + } + this.isModified = true + } + + return nil +} + +func (this *MetaFile) Header(hash string) (header *FileHeader, ok bool) { + this.mu.RLock() + defer this.mu.RUnlock() + header, ok = this.headerMap[hash] + return +} + +func (this *MetaFile) CloneHeader(hash string) (header *FileHeader, ok bool) { + this.mu.RLock() + defer this.mu.RUnlock() + header, ok = this.headerMap[hash] + if !ok { + return + } + + header = header.Clone() + return +} + +func (this *MetaFile) Headers() map[string]*FileHeader { + this.mu.RLock() + defer this.mu.RUnlock() + return this.headerMap +} + +// Compact the meta file +// TODO 考虑自动Compact的时机(脏数据比例?) +func (this *MetaFile) Compact() error { + this.mu.Lock() + defer this.mu.Unlock() + + var buf = bytes.NewBuffer(nil) + for hash, header := range this.headerMap { + blockBytes, err := this.encodeHeader(hash, header) + if err != nil { + return err + } + buf.Write(blockBytes) + } + + err := this.fp.Truncate(int64(buf.Len())) + if err != nil { + return err + } + + _, err = this.fp.Seek(0, io.SeekStart) + if err != nil { + return err + } + + _, err = this.fp.Write(buf.Bytes()) + this.isModified = true + return err +} + +func (this *MetaFile) SyncUnsafe() error { + if !this.isModified { + return nil + } + + err := this.fp.Sync() + if err != nil { + return err + } + + for hash := range this.modifiedHashMap { + header, ok := this.headerMap[hash] + if ok { + header.IsWriting = false + } + } + + this.isModified = false + this.modifiedHashMap = map[string]zero.Zero{} + return nil +} + +func (this *MetaFile) Close() error { + return this.fp.Close() +} + +// RemoveAll 删除所有数据 +func (this *MetaFile) RemoveAll() error { + _ = this.fp.Close() + return os.Remove(this.fp.Name()) +} + +func (this *MetaFile) encodeHeader(hash string, header *FileHeader) ([]byte, error) { + headerJSON, err := json.Marshal(header) + if err != nil { + return nil, err + } + + var buf = utils.SharedBufferPool.Get() + defer utils.SharedBufferPool.Put(buf) + + // TODO 考虑使用gzip pool + gzWriter, err := gzip.NewWriterLevel(buf, gzip.BestSpeed) + if err != nil { + return nil, err + } + + _, err = gzWriter.Write(headerJSON) + if err != nil { + _ = gzWriter.Close() + return nil, err + } + + err = gzWriter.Close() + if err != nil { + return nil, err + } + + return EncodeMetaBlock(MetaActionNew, hash, buf.Bytes()) +} + +func (this *MetaFile) decodeHeader(data []byte) (*FileHeader, error) { + gzReader, err := gzip.NewReader(bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + + defer func() { + _ = gzReader.Close() + }() + + var resultBuf = bytes.NewBuffer(nil) + + var buf = make([]byte, 4096) + for { + n, readErr := gzReader.Read(buf) + if n > 0 { + resultBuf.Write(buf[:n]) + } + if readErr != nil { + if readErr == io.EOF { + break + } + return nil, readErr + } + } + + var header = &FileHeader{} + err = json.Unmarshal(resultBuf.Bytes(), header) + if err != nil { + return nil, err + } + + return header, nil +} diff --git a/internal/utils/bfs/meta_file_test.go b/internal/utils/bfs/meta_file_test.go new file mode 100644 index 0000000..dbcd2e7 --- /dev/null +++ b/internal/utils/bfs/meta_file_test.go @@ -0,0 +1,126 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/bfs" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/iwind/TeaGo/logs" + "sync" + "testing" +) + +func TestNewMetaFile(t *testing.T) { + mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = mFile.Close() + }() + + var header, _ = mFile.Header(bfs.Hash("123456")) + logs.PrintAsJSON(header, t) + //logs.PrintAsJSON(mFile.Headers(), t) +} + +func TestMetaFile_WriteMeta(t *testing.T) { + mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = mFile.Close() + }() + + var hash = bfs.Hash("123456") + err = mFile.WriteMeta(hash, 200, fasttime.Now().Unix()+3600, -1) + if err != nil { + t.Fatal(err) + } + + err = mFile.WriteHeaderBlockUnsafe(hash, 123, 223) + if err != nil { + t.Fatal(err) + } + + err = mFile.WriteBodyBlockUnsafe(hash, 223, 323, 0, 100) + if err != nil { + t.Fatal(err) + } + + err = mFile.WriteBodyBlockUnsafe(hash, 323, 423, 100, 200) + if err != nil { + t.Fatal(err) + } + + err = mFile.WriteClose(hash, 100, 200) + if err != nil { + t.Fatal(err) + } + + //logs.PrintAsJSON(mFile.Header(hash), t) +} + +func TestMetaFile_Write(t *testing.T) { + mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = mFile.Close() + }() + + var hash = bfs.Hash("123456") + + err = mFile.WriteBodyBlockUnsafe(hash, 0, 100, 0, 100) + if err != nil { + t.Fatal(err) + } + + err = mFile.WriteClose(hash, 0, 100) + if err != nil { + t.Fatal(err) + } +} + +func TestMetaFile_RemoveFile(t *testing.T) { + mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = mFile.Close() + }() + + err = mFile.RemoveFile(bfs.Hash("123456")) + if err != nil { + t.Fatal(err) + } +} + +func TestMetaFile_Compact(t *testing.T) { + mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = mFile.Close() + }() + + err = mFile.Compact() + if err != nil { + t.Fatal(err) + } +} + +func TestMetaFile_RemoveAll(t *testing.T) { + mFile, err := bfs.NewMetaFile("testdata/test.m", &sync.RWMutex{}) + if err != nil { + t.Fatal(err) + } + err = mFile.RemoveAll() + if err != nil { + t.Fatal(err) + } +}