diff --git a/internal/utils/bfs/blocks_file.go b/internal/utils/bfs/blocks_file.go index d73b17e..af8f73c 100644 --- a/internal/utils/bfs/blocks_file.go +++ b/internal/utils/bfs/blocks_file.go @@ -33,6 +33,8 @@ type BlocksFile struct { writtenBytes int64 syncAt time.Time + + readerPool chan *FileReader } func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) { @@ -52,7 +54,9 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi return nil, fmt.Errorf("load '%s' failed: %w", mFilename, err) } + AckReadThread() _, err = fp.Seek(0, io.SeekEnd) + ReleaseReadThread() if err != nil { _ = fp.Close() _ = mFile.Close() @@ -60,11 +64,12 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi } return &BlocksFile{ - fp: fp, - mFile: mFile, - mu: mu, - opt: options, - syncAt: time.Now(), + fp: fp, + mFile: mFile, + mu: mu, + opt: options, + syncAt: time.Now(), + readerPool: make(chan *FileReader, 32), }, nil } @@ -112,7 +117,9 @@ func (this *BlocksFile) Write(hash string, blockType BlockType, b []byte, origin return } + AckWriteThread() n, err = this.fp.Write(b) + ReleaseWriteThread() if err == nil { if n > 0 { @@ -156,8 +163,6 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe return nil, err } - // TODO 需要设置单个BFile文件的maxOpenFiles - this.mu.RLock() err = this.checkStatus() this.mu.RUnlock() @@ -181,6 +186,14 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe return nil, os.ErrNotExist } + // 先尝试从Pool中获取 + select { + case reader := <-this.readerPool: + reader.Reset(header) + return reader, nil + default: + } + fp, err := os.Open(this.fp.Name()) if err != nil { return nil, err @@ -188,6 +201,15 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe return NewFileReader(this, fp, header), nil } +func (this *BlocksFile) CloseFileReader(reader *FileReader) error { + select { + case this.readerPool <- reader: + return nil + default: + return reader.Free() + } +} + func (this *BlocksFile) ExistFile(fileHash string) bool { err := CheckHashErr(fileHash) if err != nil { @@ -246,6 +268,8 @@ func (this *BlocksFile) RemoveAll() error { this.isClosed = true _ = this.mFile.RemoveAll() + + this.closeReaderPool() _ = this.fp.Close() return os.Remove(this.fp.Name()) } @@ -263,6 +287,8 @@ func (this *BlocksFile) Close() error { _ = this.mFile.Close() + this.closeReaderPool() + return this.fp.Close() } @@ -286,7 +312,9 @@ func (this *BlocksFile) sync(force bool) error { this.writtenBytes = 0 + AckWriteThread() err := this.fp.Sync() + ReleaseWriteThread() if err != nil { return err } @@ -299,3 +327,14 @@ func (this *BlocksFile) sync(force bool) error { return nil } + +func (this *BlocksFile) closeReaderPool() { + for { + select { + case reader := <-this.readerPool: + _ = reader.Free() + default: + return + } + } +} diff --git a/internal/utils/bfs/blocks_file_options.go b/internal/utils/bfs/blocks_file_options.go index eba4cf1..40c603a 100644 --- a/internal/utils/bfs/blocks_file_options.go +++ b/internal/utils/bfs/blocks_file_options.go @@ -4,7 +4,6 @@ package bfs type BlockFileOptions struct { BytesPerSync int64 - MaxOpenFiles int // TODO 需要实现,主要用于OpenFileReader } func (this *BlockFileOptions) EnsureDefaults() { diff --git a/internal/utils/bfs/file_reader.go b/internal/utils/bfs/file_reader.go index bed3de5..de1719c 100644 --- a/internal/utils/bfs/file_reader.go +++ b/internal/utils/bfs/file_reader.go @@ -10,11 +10,12 @@ import ( ) type FileReader struct { - bFile *BlocksFile - fp *os.File - fileHeader *FileHeader + bFile *BlocksFile + fp *os.File - pos int64 + fileHeader *FileHeader + pos int64 + bPos int64 } func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader { @@ -22,6 +23,7 @@ func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *File bFile: bFile, fp: fp, fileHeader: fileHeader, + bPos: -1, } } @@ -61,11 +63,35 @@ func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) { bufLen = int(bTo - bFrom) } - n, err = this.fp.ReadAt(b[:bufLen], bFrom) + AckReadThread() + defer ReleaseReadThread() + + if bFrom == this.bPos { // read continuous + n, err = this.fp.Read(b[:bufLen]) + } else { // read from offset + _, err = this.fp.Seek(bFrom, io.SeekStart) + if err != nil { + return + } + n, err = this.fp.Read(b[:bufLen]) + } + if n > 0 { + this.bPos = bFrom + int64(n) + } return } +func (this *FileReader) Reset(fileHeader *FileHeader) { + this.fileHeader = fileHeader + this.pos = 0 + this.bPos = -1 +} + func (this *FileReader) Close() error { + return this.bFile.CloseFileReader(this) +} + +func (this *FileReader) Free() error { return this.fp.Close() } diff --git a/internal/utils/bfs/meta_file.go b/internal/utils/bfs/meta_file.go index 81c58c7..d1f3ff1 100644 --- a/internal/utils/bfs/meta_file.go +++ b/internal/utils/bfs/meta_file.go @@ -52,7 +52,9 @@ func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) { } func (this *MetaFile) load() error { + AckReadThread() _, err := this.fp.Seek(0, io.SeekStart) + ReleaseReadThread() if err != nil { return err } @@ -62,7 +64,9 @@ func (this *MetaFile) load() error { var buf = make([]byte, 4<<10) var blockBytes []byte for { + AckReadThread() n, readErr := this.fp.Read(buf) + ReleaseReadThread() if n > 0 { blockBytes = append(blockBytes, buf[:n]...) for len(blockBytes) > 4 { @@ -180,12 +184,17 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64) this.mu.Lock() defer this.mu.Unlock() + AckReadThread() _, err = this.fp.Seek(0, io.SeekEnd) + ReleaseReadThread() if err != nil { return err } + AckWriteThread() _, err = this.fp.Write(blockBytes) + ReleaseWriteThread() + this.isModified = true return err } @@ -205,7 +214,9 @@ func (this *MetaFile) RemoveFile(hash string) error { return err } + AckWriteThread() _, err = this.fp.Write(blockBytes) + ReleaseWriteThread() if err != nil { return err } @@ -263,17 +274,23 @@ func (this *MetaFile) Compact() error { buf.Write(blockBytes) } + AckWriteThread() err := this.fp.Truncate(int64(buf.Len())) + ReleaseWriteThread() if err != nil { return err } + AckReadThread() _, err = this.fp.Seek(0, io.SeekStart) + ReleaseReadThread() if err != nil { return err } + AckWriteThread() _, err = this.fp.Write(buf.Bytes()) + ReleaseWriteThread() this.isModified = true return err } @@ -283,7 +300,9 @@ func (this *MetaFile) SyncUnsafe() error { return nil } + AckWriteThread() err := this.fp.Sync() + ReleaseWriteThread() if err != nil { return err } diff --git a/internal/utils/bfs/thread_limiters.go b/internal/utils/bfs/thread_limiters.go new file mode 100644 index 0000000..d770c52 --- /dev/null +++ b/internal/utils/bfs/thread_limiters.go @@ -0,0 +1,25 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package bfs + +import "github.com/TeaOSLab/EdgeNode/internal/zero" + +// TODO 使用atomic代替channel?需要使用基准测试对比性能 +var readThreadsLimiter = make(chan zero.Zero, 16) +var writeThreadsLimiter = make(chan zero.Zero, 16) + +func AckReadThread() { + readThreadsLimiter <- zero.Zero{} +} + +func ReleaseReadThread() { + <-readThreadsLimiter +} + +func AckWriteThread() { + writeThreadsLimiter <- zero.Zero{} +} + +func ReleaseWriteThread() { + <-writeThreadsLimiter +}