bfs:实现maxOpenFiles

This commit is contained in:
刘祥超
2024-04-27 17:29:12 +08:00
parent 04007bf8f1
commit 801f5d4525
9 changed files with 318 additions and 54 deletions

View File

@@ -189,12 +189,17 @@ func (this *BlocksFile) OpenFileReader(fileHash string, isPartial bool) (*FileRe
// 先尝试从Pool中获取 // 先尝试从Pool中获取
select { select {
case reader := <-this.readerPool: case reader := <-this.readerPool:
if reader == nil {
return nil, ErrClosed
}
reader.Reset(header) reader.Reset(header)
return reader, nil return reader, nil
default: default:
} }
AckReadThread()
fp, err := os.Open(this.fp.Name()) fp, err := os.Open(this.fp.Name())
ReleaseReadThread()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -270,19 +275,22 @@ func (this *BlocksFile) RemoveAll() error {
_ = this.mFile.RemoveAll() _ = this.mFile.RemoveAll()
this.closeReaderPool() this.closeReaderPool()
_ = this.fp.Close() _ = this.fp.Close()
return os.Remove(this.fp.Name()) return os.Remove(this.fp.Name())
} }
// Close 关闭当前文件
func (this *BlocksFile) Close() error { func (this *BlocksFile) Close() error {
this.mu.Lock() this.mu.Lock()
defer this.mu.Unlock() defer this.mu.Unlock()
err := this.sync(true) if this.isClosed {
if err != nil { return nil
return err
} }
_ = this.sync(true)
this.isClosed = true this.isClosed = true
_ = this.mFile.Close() _ = this.mFile.Close()
@@ -292,6 +300,10 @@ func (this *BlocksFile) Close() error {
return this.fp.Close() return this.fp.Close()
} }
// TestReaderPool returns the channel this blocks file uses as its pool of
// reusable file readers. The channel itself is returned, not a copy.
// NOTE(review): the Test prefix suggests this accessor is meant for tests only — confirm.
func (this *BlocksFile) TestReaderPool() chan *FileReader {
return this.readerPool
}
func (this *BlocksFile) checkStatus() error { func (this *BlocksFile) checkStatus() error {
if this.isClosed { if this.isClosed {
return ErrClosed return ErrClosed
@@ -332,7 +344,9 @@ func (this *BlocksFile) closeReaderPool() {
for { for {
select { select {
case reader := <-this.readerPool: case reader := <-this.readerPool:
if reader != nil {
_ = reader.Free() _ = reader.Free()
}
default: default:
return return
} }

View File

@@ -15,7 +15,6 @@ type FileReader struct {
fileHeader *FileHeader fileHeader *FileHeader
pos int64 pos int64
bPos int64
} }
func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader { func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *FileReader {
@@ -23,7 +22,6 @@ func NewFileReader(bFile *BlocksFile, fp *os.File, fileHeader *FileHeader) *File
bFile: bFile, bFile: bFile,
fp: fp, fp: fp,
fileHeader: fileHeader, fileHeader: fileHeader,
bPos: -1,
} }
} }
@@ -64,20 +62,8 @@ func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) {
} }
AckReadThread() AckReadThread()
defer ReleaseReadThread() n, err = this.fp.ReadAt(b[:bufLen], bFrom)
ReleaseReadThread()
if bFrom == this.bPos { // read continuous
n, err = this.fp.Read(b[:bufLen])
} else { // read from offset
_, err = this.fp.Seek(bFrom, io.SeekStart)
if err != nil {
return
}
n, err = this.fp.Read(b[:bufLen])
}
if n > 0 {
this.bPos = bFrom + int64(n)
}
return return
} }
@@ -85,7 +71,6 @@ func (this *FileReader) ReadAt(b []byte, offset int64) (n int, err error) {
func (this *FileReader) Reset(fileHeader *FileHeader) { func (this *FileReader) Reset(fileHeader *FileHeader) {
this.fileHeader = fileHeader this.fileHeader = fileHeader
this.pos = 0 this.pos = 0
this.bPos = -1
} }
func (this *FileReader) Close() error { func (this *FileReader) Close() error {

View File

@@ -204,3 +204,34 @@ func TestFileReader_ReadAt(t *testing.T) {
} }
} }
} }
// TestFileReader_Pool opens several readers on the same blocks file, closes
// each of them in its own goroutine and then logs how many readers ended up
// in the reader pool.
//
// The test is a no-op when the fixture "testdata/test.b" does not exist.
func TestFileReader_Pool(t *testing.T) {
	bFile, openErr := bfs.OpenBlocksFile("testdata/test.b", bfs.DefaultBlockFileOptions)
	if openErr != nil {
		if os.IsNotExist(openErr) {
			t.Log(openErr)
			return
		}
		t.Fatal(openErr)
	}
	defer func() {
		// best effort: the test only inspects the pool, a close error is not relevant here
		_ = bFile.Close()
	}()

	const count = 10

	// Buffered so that a closing goroutine never blocks, even if the test
	// bails out early.
	var closeErrs = make(chan error, count)
	var opened = 0

	for i := 0; i < count; i++ {
		reader, err := bFile.OpenFileReader(bfs.Hash("123456"), false)
		if err != nil {
			if os.IsNotExist(err) {
				continue
			}
			t.Fatal(err)
		}

		opened++
		go func() {
			// Report through the channel instead of calling t.Log here:
			// logging from a goroutine after the test has returned panics.
			closeErrs <- reader.Close()
		}()
	}

	// Wait for every Close() to finish instead of sleeping for a fixed time.
	var timeout = time.After(5 * time.Second)
	for i := 0; i < opened; i++ {
		select {
		case closeErr := <-closeErrs:
			if closeErr != nil {
				t.Log(closeErr)
			}
		case <-timeout:
			t.Fatal("timeout while waiting for readers to be closed")
		}
	}

	t.Log(len(bFile.TestReaderPool()))
}

View File

@@ -5,17 +5,29 @@ package bfs
import ( import (
"errors" "errors"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"log" "log"
"runtime"
"sync" "sync"
"time" "time"
) )
// IsEnabled reports whether the blocks file system may be used on the
// current platform. Only the amd64 and arm64 architectures are accepted.
func IsEnabled() bool {
	return runtime.GOARCH == "amd64" || runtime.GOARCH == "arm64"
}
// FS 文件系统对象
type FS struct { type FS struct {
dir string dir string
opt *FSOptions opt *FSOptions
bMap map[string]*BlocksFile // name => *BlocksFile bMap map[string]*BlocksFile // name => *BlocksFile
bList *linkedlist.List[string] // [bName]
bItemMap map[string]*linkedlist.Item[string]
closingBMap map[string]zero.Zero // filename => Zero
closingBChan chan *BlocksFile
mu *sync.RWMutex mu *sync.RWMutex
isClosed bool isClosed bool
@@ -24,8 +36,17 @@ type FS struct {
locker *fsutils.Locker locker *fsutils.Locker
} }
// OpenFS 打开文件系统
func OpenFS(dir string, options *FSOptions) (*FS, error) { func OpenFS(dir string, options *FSOptions) (*FS, error) {
if !IsEnabled() {
return nil, errors.New("the fs only works under 64 bit system")
}
if options == nil {
options = DefaultFSOptions
} else {
options.EnsureDefaults() options.EnsureDefaults()
}
var locker = fsutils.NewLocker(dir + "/fs") var locker = fsutils.NewLocker(dir + "/fs")
err := locker.Lock() err := locker.Lock()
@@ -36,6 +57,10 @@ func OpenFS(dir string, options *FSOptions) (*FS, error) {
var fs = &FS{ var fs = &FS{
dir: dir, dir: dir,
bMap: map[string]*BlocksFile{}, bMap: map[string]*BlocksFile{},
bList: linkedlist.NewList[string](),
bItemMap: map[string]*linkedlist.Item[string]{},
closingBMap: map[string]zero.Zero{},
closingBChan: make(chan *BlocksFile, 32),
mu: &sync.RWMutex{}, mu: &sync.RWMutex{},
opt: options, opt: options,
syncTicker: time.NewTicker(1 * time.Second), syncTicker: time.NewTicker(1 * time.Second),
@@ -46,17 +71,32 @@ func OpenFS(dir string, options *FSOptions) (*FS, error) {
} }
func (this *FS) init() { func (this *FS) init() {
go func() {
// sync in background // sync in background
for range this.syncTicker.C { for range this.syncTicker.C {
this.syncLoop() this.syncLoop()
} }
}()
go func() {
for {
this.processClosingBFiles()
}
}()
} }
// OpenFileWriter 打开文件写入器
func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) { func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) {
if this.isClosed {
return nil, errors.New("the fs closed")
}
if isPartial && bodySize <= 0 { if isPartial && bodySize <= 0 {
return nil, errors.New("invalid body size for partial content") return nil, errors.New("invalid body size for partial content")
} }
// TODO 限制同一个hash同时只能有一个Writer
bFile, err := this.openBFileForHashWriting(hash) bFile, err := this.openBFileForHashWriting(hash)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -64,7 +104,12 @@ func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*Fi
return bFile.OpenFileWriter(hash, bodySize, isPartial) return bFile.OpenFileWriter(hash, bodySize, isPartial)
} }
// OpenFileReader 打开文件读取器
func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) { func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) {
if this.isClosed {
return nil, errors.New("the fs closed")
}
bFile, err := this.openBFileForHashReading(hash) bFile, err := this.openBFileForHashReading(hash)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -73,6 +118,10 @@ func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error)
} }
func (this *FS) ExistFile(hash string) (bool, error) { func (this *FS) ExistFile(hash string) (bool, error) {
if this.isClosed {
return false, errors.New("the fs closed")
}
bFile, err := this.openBFileForHashReading(hash) bFile, err := this.openBFileForHashReading(hash)
if err != nil { if err != nil {
return false, err return false, err
@@ -81,6 +130,10 @@ func (this *FS) ExistFile(hash string) (bool, error) {
} }
func (this *FS) RemoveFile(hash string) error { func (this *FS) RemoveFile(hash string) error {
if this.isClosed {
return errors.New("the fs closed")
}
bFile, err := this.openBFileForHashWriting(hash) bFile, err := this.openBFileForHashWriting(hash)
if err != nil { if err != nil {
return err return err
@@ -89,8 +142,15 @@ func (this *FS) RemoveFile(hash string) error {
} }
func (this *FS) Close() error { func (this *FS) Close() error {
if this.isClosed {
return nil
}
this.isClosed = true this.isClosed = true
close(this.closingBChan)
this.syncTicker.Stop()
var lastErr error var lastErr error
this.mu.Lock() this.mu.Lock()
for _, bFile := range this.bMap { for _, bFile := range this.bMap {
@@ -109,6 +169,14 @@ func (this *FS) Close() error {
return lastErr return lastErr
} }
// TestBMap returns the internal map of opened blocks files, keyed by blocks
// file name. The map itself is returned (not a copy) and mu is not acquired.
// NOTE(review): the Test prefix suggests this accessor is meant for tests only — confirm.
func (this *FS) TestBMap() map[string]*BlocksFile {
return this.bMap
}
// TestBList returns the internal linked list of blocks file names. The list
// itself is returned (not a copy) and mu is not acquired.
// NOTE(review): the Test prefix suggests this accessor is meant for tests only — confirm.
func (this *FS) TestBList() *linkedlist.List[string] {
return this.bList
}
func (this *FS) bPathForHash(hash string) (path string, bName string, err error) { func (this *FS) bPathForHash(hash string) (path string, bName string, err error) {
err = CheckHashErr(hash) err = CheckHashErr(hash)
if err != nil { if err != nil {
@@ -170,6 +238,15 @@ func (this *FS) openBFileForHashWriting(hash string) (*BlocksFile, error) {
bFile, ok := this.bMap[bName] bFile, ok := this.bMap[bName]
this.mu.RUnlock() this.mu.RUnlock()
if ok { if ok {
// 调整当前BFile所在位置
this.mu.Lock()
item, itemOk := this.bItemMap[bName]
if itemOk {
this.bList.Remove(item)
this.bList.Push(item)
}
this.mu.Unlock()
return bFile, nil return bFile, nil
} }
@@ -191,6 +268,15 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) {
bFile, ok := this.bMap[bName] bFile, ok := this.bMap[bName]
this.mu.RUnlock() this.mu.RUnlock()
if ok { if ok {
// 调整当前BFile所在位置
this.mu.Lock()
item, itemOk := this.bItemMap[bName]
if itemOk {
this.bList.Remove(item)
this.bList.Push(item)
}
this.mu.Unlock()
return bFile, nil return bFile, nil
} }
@@ -198,6 +284,28 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) {
} }
func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) { func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
// check closing queue
this.mu.RLock()
_, isClosing := this.closingBMap[bPath]
this.mu.RUnlock()
if isClosing {
var maxWaits = 30_000
for {
this.mu.RLock()
_, isClosing = this.closingBMap[bPath]
this.mu.RUnlock()
if !isClosing {
break
}
time.Sleep(1 * time.Millisecond)
maxWaits--
if maxWaits < 0 {
return nil, errors.New("open blocks file timeout")
}
}
}
this.mu.Lock() this.mu.Lock()
defer this.mu.Unlock() defer this.mu.Unlock()
@@ -214,5 +322,49 @@ func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
return nil, err return nil, err
} }
this.bMap[bName] = bFile this.bMap[bName] = bFile
// 加入到列表中
var item = linkedlist.NewItem(bName)
this.bList.Push(item)
this.bItemMap[bName] = item
// 检查是否超出maxOpenFiles
if this.bList.Len() > this.opt.MaxOpenFiles {
var firstBName = this.bList.Shift().Value
var firstBFile = this.bMap[firstBName]
delete(this.bMap, firstBName)
delete(this.bItemMap, firstBName)
this.closingBMap[firstBFile.Filename()] = zero.Zero{}
// MUST run in goroutine
go func() {
// 因为 closingBChan 可能已经关闭
defer func() {
recover()
}()
this.closingBChan <- firstBFile
}()
}
return bFile, nil return bFile, nil
} }
// processClosingBFiles handles a single entry of the closing queue: it waits
// for the next blocks file on closingBChan, closes it and removes its
// filename from closingBMap. It returns right away when the FS is already
// closed, and does nothing when a nil value is received from the channel.
func (this *FS) processClosingBFiles() {
	if this.isClosed {
		return
	}

	closingFile := <-this.closingBChan
	if closingFile != nil {
		// the close error is deliberately ignored
		_ = closingFile.Close()

		// the file is no longer "closing"
		this.mu.Lock()
		delete(this.closingBMap, closingFile.Filename())
		this.mu.Unlock()
	}
}

View File

@@ -5,7 +5,7 @@ package bfs
import "time" import "time"
type FSOptions struct { type FSOptions struct {
MaxOpenFiles int // TODO 需要实现 MaxOpenFiles int
BytesPerSync int64 BytesPerSync int64
SyncTimeout time.Duration SyncTimeout time.Duration
MaxSyncFiles int MaxSyncFiles int

View File

@@ -5,10 +5,14 @@ package bfs_test
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs" "github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap" _ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"io" "io"
"os"
"testing" "testing"
) )
@@ -118,3 +122,72 @@ func TestFS_RemoveFile(t *testing.T) {
} }
t.Log("exist:", exist) t.Log("exist:", exist)
} }
// TestFS_OpenFileWriter_Close opens and closes many file writers on one FS
// and, at several points, checks that the list of blocks file names has the
// same length as the map of blocks files.
//
// It only runs in single testing mode.
func TestFS_OpenFileWriter_Close(t *testing.T) {
	if !testutils.IsSingleTesting() {
		return
	}

	var options = &bfs.FSOptions{
		MaxOpenFiles: 4 << 10,
	}
	fs, openErr := bfs.OpenFS(Tea.Root+"/data/bfs/test", options)
	if openErr != nil {
		t.Fatal(openErr)
	}
	defer func() {
		_ = fs.Close()
	}()

	// openAndClose opens a writer for the hash derived from index and
	// closes it again immediately.
	var openAndClose = func(index int) {
		writer, err := fs.OpenFileWriter(bfs.Hash(types.String(index)), -1, false)
		if err != nil {
			t.Fatal(err)
		}
		_ = writer.Close()
	}

	// dumpList collects the names held by the list, verifies the list and
	// the map agree on their size, and logs the names.
	var dumpList = func() {
		var names []string
		fs.TestBList().Range(func(item *linkedlist.Item[string]) (goNext bool) {
			names = append(names, item.Value)
			return true
		})

		if len(fs.TestBMap()) != len(names) {
			t.Fatal("len(bNames)!=len(bMap)")
		}

		t.Log("["+types.String(len(names))+"]", names)
	}

	var total = 10
	if testutils.IsSingleTesting() {
		total = 1000
	}
	for index := 0; index < total; index++ {
		openAndClose(index)
	}

	t.Log(len(fs.TestBMap()), "block files, pid:", os.Getpid())

	dumpList()

	openAndClose(10)
	dumpList()

	// testing closing
	for round := 0; round < 3; round++ {
		openAndClose(0)
	}
	dumpList()
}

View File

@@ -369,24 +369,8 @@ func (this *MetaFile) decodeHeader(data []byte) (*FileHeader, error) {
_ = gzReader.Close() _ = gzReader.Close()
}() }()
var resultBuf = bytes.NewBuffer(nil)
var buf = make([]byte, 4096)
for {
n, readErr := gzReader.Read(buf)
if n > 0 {
resultBuf.Write(buf[:n])
}
if readErr != nil {
if readErr == io.EOF {
break
}
return nil, readErr
}
}
var header = &FileHeader{} var header = &FileHeader{}
err = json.Unmarshal(resultBuf.Bytes(), header) err = json.NewDecoder(gzReader).Decode(header)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -5,9 +5,11 @@ package bfs_test
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/utils/bfs" "github.com/TeaOSLab/EdgeNode/internal/utils/bfs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"sync" "sync"
"testing" "testing"
"time"
) )
func TestNewMetaFile(t *testing.T) { func TestNewMetaFile(t *testing.T) {
@@ -24,6 +26,28 @@ func TestNewMetaFile(t *testing.T) {
//logs.PrintAsJSON(mFile.Headers(), t) //logs.PrintAsJSON(mFile.Headers(), t)
} }
// TestNewMetaFile_Large repeatedly opens and closes the meta file
// "testdata/test2.m" and logs the total and the average time per open.
//
// The file is opened 100 times in single testing mode and twice otherwise.
// The test is skipped when the fixture does not exist, since timing failed
// opens would produce a meaningless result.
func TestNewMetaFile_Large(t *testing.T) {
	var count = 2
	if testutils.IsSingleTesting() {
		count = 100
	}

	var before = time.Now()
	for i := 0; i < count; i++ {
		mFile, err := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
		if err != nil {
			if bfs.IsNotExist(err) {
				t.Skip("skipped: ", err)
			}
			t.Fatal(err)
		}
		_ = mFile.Close()
	}

	var costMs = time.Since(before).Seconds() * 1000
	// the second figure is the average latency per file, not a QPS value
	t.Logf("cost: %.2fms, avg: %.2fms/file", costMs, costMs/float64(count))
}
func TestMetaFile_WriteMeta(t *testing.T) { func TestMetaFile_WriteMeta(t *testing.T) {
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{}) mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
if err != nil { if err != nil {

View File

@@ -5,8 +5,9 @@ package bfs
import "github.com/TeaOSLab/EdgeNode/internal/zero" import "github.com/TeaOSLab/EdgeNode/internal/zero"
// TODO 使用atomic代替channel需要使用基准测试对比性能 // TODO 使用atomic代替channel需要使用基准测试对比性能
var readThreadsLimiter = make(chan zero.Zero, 16) // TODO 线程数可以根据硬盘数量动态调整?
var writeThreadsLimiter = make(chan zero.Zero, 16) var readThreadsLimiter = make(chan zero.Zero, 8)
var writeThreadsLimiter = make(chan zero.Zero, 8)
func AckReadThread() { func AckReadThread() {
readThreadsLimiter <- zero.Zero{} readThreadsLimiter <- zero.Zero{}