bfs:实现maxOpenFiles

This commit is contained in:
GoEdgeLab
2024-04-27 17:29:12 +08:00
parent 337799cad7
commit 7b75c508c6
9 changed files with 318 additions and 54 deletions

View File

@@ -5,17 +5,29 @@ package bfs
import (
"errors"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"log"
"runtime"
"sync"
"time"
)
// FS 文件系统管理
func IsEnabled() bool {
return runtime.GOARCH == "amd64" || runtime.GOARCH == "arm64"
}
// FS 文件系统对象
type FS struct {
dir string
opt *FSOptions
bMap map[string]*BlocksFile // name => *BlocksFile
bMap map[string]*BlocksFile // name => *BlocksFile
bList *linkedlist.List[string] // [bName]
bItemMap map[string]*linkedlist.Item[string]
closingBMap map[string]zero.Zero // filename => Zero
closingBChan chan *BlocksFile
mu *sync.RWMutex
isClosed bool
@@ -24,8 +36,17 @@ type FS struct {
locker *fsutils.Locker
}
// OpenFS 打开文件系统
func OpenFS(dir string, options *FSOptions) (*FS, error) {
options.EnsureDefaults()
if !IsEnabled() {
return nil, errors.New("the fs only works under 64 bit system")
}
if options == nil {
options = DefaultFSOptions
} else {
options.EnsureDefaults()
}
var locker = fsutils.NewLocker(dir + "/fs")
err := locker.Lock()
@@ -34,29 +55,48 @@ func OpenFS(dir string, options *FSOptions) (*FS, error) {
}
var fs = &FS{
dir: dir,
bMap: map[string]*BlocksFile{},
mu: &sync.RWMutex{},
opt: options,
syncTicker: time.NewTicker(1 * time.Second),
locker: locker,
dir: dir,
bMap: map[string]*BlocksFile{},
bList: linkedlist.NewList[string](),
bItemMap: map[string]*linkedlist.Item[string]{},
closingBMap: map[string]zero.Zero{},
closingBChan: make(chan *BlocksFile, 32),
mu: &sync.RWMutex{},
opt: options,
syncTicker: time.NewTicker(1 * time.Second),
locker: locker,
}
go fs.init()
return fs, nil
}
func (this *FS) init() {
// sync in background
for range this.syncTicker.C {
this.syncLoop()
}
go func() {
// sync in background
for range this.syncTicker.C {
this.syncLoop()
}
}()
go func() {
for {
this.processClosingBFiles()
}
}()
}
// OpenFileWriter 打开文件写入器
func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*FileWriter, error) {
if this.isClosed {
return nil, errors.New("the fs closed")
}
if isPartial && bodySize <= 0 {
return nil, errors.New("invalid body size for partial content")
}
// TODO 限制同一个hash同时只能有一个Writer
bFile, err := this.openBFileForHashWriting(hash)
if err != nil {
return nil, err
@@ -64,7 +104,12 @@ func (this *FS) OpenFileWriter(hash string, bodySize int64, isPartial bool) (*Fi
return bFile.OpenFileWriter(hash, bodySize, isPartial)
}
// OpenFileReader 打开文件读取器
func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error) {
if this.isClosed {
return nil, errors.New("the fs closed")
}
bFile, err := this.openBFileForHashReading(hash)
if err != nil {
return nil, err
@@ -73,6 +118,10 @@ func (this *FS) OpenFileReader(hash string, isPartial bool) (*FileReader, error)
}
func (this *FS) ExistFile(hash string) (bool, error) {
if this.isClosed {
return false, errors.New("the fs closed")
}
bFile, err := this.openBFileForHashReading(hash)
if err != nil {
return false, err
@@ -81,6 +130,10 @@ func (this *FS) ExistFile(hash string) (bool, error) {
}
func (this *FS) RemoveFile(hash string) error {
if this.isClosed {
return errors.New("the fs closed")
}
bFile, err := this.openBFileForHashWriting(hash)
if err != nil {
return err
@@ -89,8 +142,15 @@ func (this *FS) RemoveFile(hash string) error {
}
func (this *FS) Close() error {
if this.isClosed {
return nil
}
this.isClosed = true
close(this.closingBChan)
this.syncTicker.Stop()
var lastErr error
this.mu.Lock()
for _, bFile := range this.bMap {
@@ -109,6 +169,14 @@ func (this *FS) Close() error {
return lastErr
}
func (this *FS) TestBMap() map[string]*BlocksFile {
return this.bMap
}
func (this *FS) TestBList() *linkedlist.List[string] {
return this.bList
}
func (this *FS) bPathForHash(hash string) (path string, bName string, err error) {
err = CheckHashErr(hash)
if err != nil {
@@ -170,6 +238,15 @@ func (this *FS) openBFileForHashWriting(hash string) (*BlocksFile, error) {
bFile, ok := this.bMap[bName]
this.mu.RUnlock()
if ok {
// 调整当前BFile所在位置
this.mu.Lock()
item, itemOk := this.bItemMap[bName]
if itemOk {
this.bList.Remove(item)
this.bList.Push(item)
}
this.mu.Unlock()
return bFile, nil
}
@@ -191,6 +268,15 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) {
bFile, ok := this.bMap[bName]
this.mu.RUnlock()
if ok {
// 调整当前BFile所在位置
this.mu.Lock()
item, itemOk := this.bItemMap[bName]
if itemOk {
this.bList.Remove(item)
this.bList.Push(item)
}
this.mu.Unlock()
return bFile, nil
}
@@ -198,6 +284,28 @@ func (this *FS) openBFileForHashReading(hash string) (*BlocksFile, error) {
}
func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
// check closing queue
this.mu.RLock()
_, isClosing := this.closingBMap[bPath]
this.mu.RUnlock()
if isClosing {
var maxWaits = 30_000
for {
this.mu.RLock()
_, isClosing = this.closingBMap[bPath]
this.mu.RUnlock()
if !isClosing {
break
}
time.Sleep(1 * time.Millisecond)
maxWaits--
if maxWaits < 0 {
return nil, errors.New("open blocks file timeout")
}
}
}
this.mu.Lock()
defer this.mu.Unlock()
@@ -214,5 +322,49 @@ func (this *FS) openBFile(bPath string, bName string) (*BlocksFile, error) {
return nil, err
}
this.bMap[bName] = bFile
// 加入到列表中
var item = linkedlist.NewItem(bName)
this.bList.Push(item)
this.bItemMap[bName] = item
// 检查是否超出maxOpenFiles
if this.bList.Len() > this.opt.MaxOpenFiles {
var firstBName = this.bList.Shift().Value
var firstBFile = this.bMap[firstBName]
delete(this.bMap, firstBName)
delete(this.bItemMap, firstBName)
this.closingBMap[firstBFile.Filename()] = zero.Zero{}
// MUST run in goroutine
go func() {
// 因为 closingBChan 可能已经关闭
defer func() {
recover()
}()
this.closingBChan <- firstBFile
}()
}
return bFile, nil
}
// 处理关闭中的 BFile 们
func (this *FS) processClosingBFiles() {
if this.isClosed {
return
}
var bFile = <-this.closingBChan
if bFile == nil {
return
}
_ = bFile.Close()
this.mu.Lock()
delete(this.closingBMap, bFile.Filename())
this.mu.Unlock()
}