bfs: 增加读写线程限制

This commit is contained in:
刘祥超
2024-04-27 07:09:14 +08:00
parent 8c044faace
commit 04007bf8f1
5 changed files with 121 additions and 13 deletions

View File

@@ -33,6 +33,8 @@ type BlocksFile struct {
writtenBytes int64
syncAt time.Time
readerPool chan *FileReader
}
func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFile, error) {
@@ -52,7 +54,9 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi
return nil, fmt.Errorf("load '%s' failed: %w", mFilename, err)
}
AckReadThread()
_, err = fp.Seek(0, io.SeekEnd)
ReleaseReadThread()
if err != nil {
_ = fp.Close()
_ = mFile.Close()
@@ -60,11 +64,12 @@ func NewBlocksFileWithRawFile(fp *os.File, options *BlockFileOptions) (*BlocksFi
}
return &BlocksFile{
fp: fp,
mFile: mFile,
mu: mu,
opt: options,
syncAt: time.Now(),
fp: fp,
mFile: mFile,
mu: mu,
opt: options,
syncAt: time.Now(),
readerPool: make(chan *FileReader, 32),
}, nil
}
@@ -112,7 +117,9 @@ func (this *BlocksFile) Write(hash string, blockType BlockType, b []byte, origin
return
}
AckWriteThread()
n, err = this.fp.Write(b)
ReleaseWriteThread()
if err == nil {
if n > 0 {
@@ -156,8 +163,6 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return nil, err
}
// TODO 需要设置单个BFile文件的maxOpenFiles
this.mu.RLock()
err = this.checkStatus()
this.mu.RUnlock()
@@ -181,6 +186,14 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return nil, os.ErrNotExist
}
// 先尝试从Pool中获取
select {
case reader := <-this.readerPool:
reader.Reset(header)
return reader, nil
default:
}
fp, err := os.Open(this.fp.Name())
if err != nil {
return nil, err
@@ -188,6 +201,15 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
return NewFileReader(this, fp, header), nil
}
func (this *BlocksFile) CloseFileReader(reader *FileReader) error {
select {
case this.readerPool <- reader:
return nil
default:
return reader.Free()
}
}
func (this *BlocksFile) ExistFile(fileHash string) bool {
err := CheckHashErr(fileHash)
if err != nil {
@@ -246,6 +268,8 @@ func (this *BlocksFile) RemoveAll() error {
this.isClosed = true
_ = this.mFile.RemoveAll()
this.closeReaderPool()
_ = this.fp.Close()
return os.Remove(this.fp.Name())
}
@@ -263,6 +287,8 @@ func (this *BlocksFile) Close() error {
_ = this.mFile.Close()
this.closeReaderPool()
return this.fp.Close()
}
@@ -286,7 +312,9 @@ func (this *BlocksFile) sync(force bool) error {
this.writtenBytes = 0
AckWriteThread()
err := this.fp.Sync()
ReleaseWriteThread()
if err != nil {
return err
}
@@ -299,3 +327,14 @@ func (this *BlocksFile) sync(force bool) error {
return nil
}
func (this *BlocksFile) closeReaderPool() {
for {
select {
case reader := <-this.readerPool:
_ = reader.Free()
default:
return
}
}
}

View File

@@ -4,7 +4,6 @@ package bfs
type BlockFileOptions struct {
BytesPerSync int64
MaxOpenFiles int // TODO 需要实现主要用于OpenFileReader
}
func (this *BlockFileOptions) EnsureDefaults() {

View File

@@ -10,11 +10,12 @@ import (
)
type FileReader struct {
bFile *BlocksFile
fp *os.File
fileHeader *FileHeader
bFile *BlocksFile
fp *os.File
pos int64
fileHeader *FileHeader
pos int64
bPos int64
}
func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader {
@@ -22,6 +23,7 @@ func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *File
bFile: bFile,
fp: fp,
fileHeader: fileHeader,
bPos: -1,
}
}
@@ -61,11 +63,35 @@ func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) {
bufLen = int(bTo - bFrom)
}
n, err = this.fp.ReadAt(b[:bufLen], bFrom)
AckReadThread()
defer ReleaseReadThread()
if bFrom == this.bPos { // read continuous
n, err = this.fp.Read(b[:bufLen])
} else { // read from offset
_, err = this.fp.Seek(bFrom, io.SeekStart)
if err != nil {
return
}
n, err = this.fp.Read(b[:bufLen])
}
if n > 0 {
this.bPos = bFrom + int64(n)
}
return
}
func (this *FileReader) Reset(fileHeader *FileHeader) {
this.fileHeader = fileHeader
this.pos = 0
this.bPos = -1
}
func (this *FileReader) Close() error {
return this.bFile.CloseFileReader(this)
}
func (this *FileReader) Free() error {
return this.fp.Close()
}

View File

@@ -52,7 +52,9 @@ func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) {
}
func (this *MetaFile) load() error {
AckReadThread()
_, err := this.fp.Seek(0, io.SeekStart)
ReleaseReadThread()
if err != nil {
return err
}
@@ -62,7 +64,9 @@ func (this *MetaFile) load() error {
var buf = make([]byte, 4<<10)
var blockBytes []byte
for {
AckReadThread()
n, readErr := this.fp.Read(buf)
ReleaseReadThread()
if n > 0 {
blockBytes = append(blockBytes, buf[:n]...)
for len(blockBytes) > 4 {
@@ -180,12 +184,17 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64)
this.mu.Lock()
defer this.mu.Unlock()
AckReadThread()
_, err = this.fp.Seek(0, io.SeekEnd)
ReleaseReadThread()
if err != nil {
return err
}
AckWriteThread()
_, err = this.fp.Write(blockBytes)
ReleaseWriteThread()
this.isModified = true
return err
}
@@ -205,7 +214,9 @@ func (this *MetaFile) RemoveFile(hash string) error {
return err
}
AckWriteThread()
_, err = this.fp.Write(blockBytes)
ReleaseWriteThread()
if err != nil {
return err
}
@@ -263,17 +274,23 @@ func (this *MetaFile) Compact() error {
buf.Write(blockBytes)
}
AckWriteThread()
err := this.fp.Truncate(int64(buf.Len()))
ReleaseWriteThread()
if err != nil {
return err
}
AckReadThread()
_, err = this.fp.Seek(0, io.SeekStart)
ReleaseReadThread()
if err != nil {
return err
}
AckWriteThread()
_, err = this.fp.Write(buf.Bytes())
ReleaseWriteThread()
this.isModified = true
return err
}
@@ -283,7 +300,9 @@ func (this *MetaFile) SyncUnsafe() error {
return nil
}
AckWriteThread()
err := this.fp.Sync()
ReleaseWriteThread()
if err != nil {
return err
}

View File

@@ -0,0 +1,25 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package bfs
import "github.com/TeaOSLab/EdgeNode/internal/zero"
// TODO 使用atomic代替channel需要使用基准测试对比性能
var readThreadsLimiter = make(chan zero.Zero, 16)
var writeThreadsLimiter = make(chan zero.Zero, 16)
func AckReadThread() {
readThreadsLimiter <- zero.Zero{}
}
func ReleaseReadThread() {
<-readThreadsLimiter
}
func AckWriteThread() {
writeThreadsLimiter <- zero.Zero{}
}
func ReleaseWriteThread() {
<-writeThreadsLimiter
}