mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-02 22:10:25 +08:00
bfs:实现FileHeader的lazy load
This commit is contained in:
56
internal/utils/bfs/file_header_lazy.go
Normal file
56
internal/utils/bfs/file_header_lazy.go
Normal file
@@ -0,0 +1,56 @@
|
||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package bfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"github.com/klauspost/compress/gzip"
|
||||
)
|
||||
|
||||
// LazyFileHeader load file header lazily to save memory
|
||||
type LazyFileHeader struct {
|
||||
rawData []byte
|
||||
fileHeader *FileHeader
|
||||
}
|
||||
|
||||
func NewLazyFileHeaderFromData(rawData []byte) *LazyFileHeader {
|
||||
return &LazyFileHeader{
|
||||
rawData: rawData,
|
||||
}
|
||||
}
|
||||
|
||||
func NewLazyFileHeader(fileHeader *FileHeader) *LazyFileHeader {
|
||||
return &LazyFileHeader{
|
||||
fileHeader: fileHeader,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *LazyFileHeader) FileHeaderUnsafe() (*FileHeader, error) {
|
||||
if this.fileHeader != nil {
|
||||
return this.fileHeader, nil
|
||||
}
|
||||
|
||||
// TODO 使用pool管理gzip
|
||||
gzReader, err := gzip.NewReader(bytes.NewBuffer(this.rawData))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = gzReader.Close()
|
||||
}()
|
||||
|
||||
var header = &FileHeader{}
|
||||
err = json.NewDecoder(gzReader).Decode(header)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
header.IsWriting = false
|
||||
|
||||
this.fileHeader = header
|
||||
this.rawData = nil
|
||||
|
||||
return header, nil
|
||||
}
|
||||
@@ -2,7 +2,11 @@
|
||||
|
||||
package bfs
|
||||
|
||||
import "time"
|
||||
import (
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FSOptions struct {
|
||||
MaxOpenFiles int
|
||||
@@ -13,10 +17,19 @@ type FSOptions struct {
|
||||
|
||||
func (this *FSOptions) EnsureDefaults() {
|
||||
if this.MaxOpenFiles <= 0 {
|
||||
this.MaxOpenFiles = 4 << 10
|
||||
// 根据内存计算最大打开文件数
|
||||
var maxOpenFiles = memutils.SystemMemoryGB() * 64
|
||||
if maxOpenFiles > (4 << 10) {
|
||||
maxOpenFiles = 4 << 10
|
||||
}
|
||||
this.MaxOpenFiles = maxOpenFiles
|
||||
}
|
||||
if this.BytesPerSync <= 0 {
|
||||
this.BytesPerSync = 1 << 20 // TODO 根据硬盘实际写入速度进行调整
|
||||
if fsutils.DiskIsFast() {
|
||||
this.BytesPerSync = 1 << 20 // TODO 根据硬盘实际写入速度进行调整
|
||||
} else {
|
||||
this.BytesPerSync = 512 << 10
|
||||
}
|
||||
}
|
||||
if this.SyncTimeout <= 0 {
|
||||
this.SyncTimeout = 1 * time.Second
|
||||
@@ -27,8 +40,8 @@ func (this *FSOptions) EnsureDefaults() {
|
||||
}
|
||||
|
||||
var DefaultFSOptions = &FSOptions{
|
||||
MaxOpenFiles: 4 << 10,
|
||||
BytesPerSync: 1 << 20, // TODO 根据硬盘实际写入速度进行调整
|
||||
MaxOpenFiles: 1 << 10,
|
||||
BytesPerSync: 512 << 10,
|
||||
SyncTimeout: 1 * time.Second,
|
||||
MaxSyncFiles: 32,
|
||||
}
|
||||
|
||||
@@ -41,7 +41,11 @@ func DecodeMetaBlock(blockBytes []byte) (action MetaAction, hash string, data []
|
||||
hash = string(blockBytes[5 : 5+HashLen])
|
||||
|
||||
if action == MetaActionNew {
|
||||
data = blockBytes[dataOffset:]
|
||||
var rawData = blockBytes[dataOffset:]
|
||||
if len(rawData) > 0 {
|
||||
data = make([]byte, len(rawData))
|
||||
copy(data, rawData)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -21,11 +21,11 @@ const Version1 = 1
|
||||
type MetaFile struct {
|
||||
fp *os.File
|
||||
filename string
|
||||
headerMap map[string]*FileHeader // hash => *FileHeader
|
||||
mu *sync.RWMutex // TODO 考虑单独一个,不要和bFile共享?
|
||||
headerMap map[string]*LazyFileHeader // hash => *LazyFileHeader
|
||||
mu *sync.RWMutex // TODO 考虑单独一个,不要和bFile共享?
|
||||
|
||||
isModified bool
|
||||
modifiedHashMap map[string]zero.Zero
|
||||
modifiedHashMap map[string]zero.Zero // hash => Zero
|
||||
}
|
||||
|
||||
func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) {
|
||||
@@ -37,7 +37,7 @@ func OpenMetaFile(filename string, mu *sync.RWMutex) (*MetaFile, error) {
|
||||
var mFile = &MetaFile{
|
||||
filename: filename,
|
||||
fp: fp,
|
||||
headerMap: map[string]*FileHeader{},
|
||||
headerMap: map[string]*LazyFileHeader{},
|
||||
mu: mu,
|
||||
modifiedHashMap: map[string]zero.Zero{},
|
||||
}
|
||||
@@ -59,7 +59,7 @@ func (this *MetaFile) load() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO 考虑文件最后一行未写完整的情形
|
||||
// TODO 检查文件是否完整
|
||||
|
||||
var buf = make([]byte, 4<<10)
|
||||
var blockBytes []byte
|
||||
@@ -82,11 +82,7 @@ func (this *MetaFile) load() error {
|
||||
|
||||
switch action {
|
||||
case MetaActionNew:
|
||||
header, decodeHeaderErr := this.decodeHeader(data)
|
||||
if decodeHeaderErr != nil {
|
||||
return decodeHeaderErr
|
||||
}
|
||||
this.headerMap[hash] = header
|
||||
this.headerMap[hash] = NewLazyFileHeaderFromData(data)
|
||||
case MetaActionRemove:
|
||||
delete(this.headerMap, hash)
|
||||
}
|
||||
@@ -106,16 +102,17 @@ func (this *MetaFile) load() error {
|
||||
}
|
||||
|
||||
func (this *MetaFile) WriteMeta(hash string, status int, expiresAt int64, expectedFileSize int64) error {
|
||||
|
||||
this.mu.Lock()
|
||||
defer this.mu.Unlock()
|
||||
|
||||
this.headerMap[hash] = &FileHeader{
|
||||
this.headerMap[hash] = NewLazyFileHeader(&FileHeader{
|
||||
Version: Version1,
|
||||
ExpiresAt: expiresAt,
|
||||
Status: status,
|
||||
ExpiredBodySize: expectedFileSize,
|
||||
IsWriting: true,
|
||||
}
|
||||
})
|
||||
|
||||
this.modifiedHashMap[hash] = zero.Zero{}
|
||||
|
||||
@@ -123,11 +120,16 @@ func (this *MetaFile) WriteMeta(hash string, status int, expiresAt int64, expect
|
||||
}
|
||||
|
||||
func (this *MetaFile) WriteHeaderBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64) error {
|
||||
header, ok := this.headerMap[hash]
|
||||
lazyHeader, ok := this.headerMap[hash]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
header, err := lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO 合并相邻block
|
||||
header.HeaderBlocks = append(header.HeaderBlocks, BlockInfo{
|
||||
BFileOffsetFrom: bOffsetFrom,
|
||||
@@ -140,11 +142,16 @@ func (this *MetaFile) WriteHeaderBlockUnsafe(hash string, bOffsetFrom int64, bOf
|
||||
}
|
||||
|
||||
func (this *MetaFile) WriteBodyBlockUnsafe(hash string, bOffsetFrom int64, bOffsetTo int64, originOffsetFrom int64, originOffsetTo int64) error {
|
||||
header, ok := this.headerMap[hash]
|
||||
lazyHeader, ok := this.headerMap[hash]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
header, err := lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO 合并相邻block
|
||||
header.BodyBlocks = append(header.BodyBlocks, BlockInfo{
|
||||
OriginOffsetFrom: originOffsetFrom,
|
||||
@@ -162,21 +169,27 @@ func (this *MetaFile) WriteClose(hash string, headerSize int64, bodySize int64)
|
||||
// TODO 考虑单个hash多次重复调用的情况
|
||||
|
||||
this.mu.Lock()
|
||||
header, ok := this.headerMap[hash]
|
||||
if ok {
|
||||
// TODO 检查bodySize和expectedBodySize是否一致,如果不一致则从headerMap中删除
|
||||
|
||||
header.ModifiedAt = fasttime.Now().Unix()
|
||||
header.HeaderSize = headerSize
|
||||
header.BodySize = bodySize
|
||||
header.Compact()
|
||||
}
|
||||
this.mu.Unlock()
|
||||
lazyHeader, ok := this.headerMap[hash]
|
||||
if !ok {
|
||||
this.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
blockBytes, err := this.encodeHeader(hash, header)
|
||||
header, err := lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.mu.Unlock()
|
||||
|
||||
// TODO 检查bodySize和expectedBodySize是否一致,如果不一致则从headerMap中删除
|
||||
|
||||
header.ModifiedAt = fasttime.Now().Unix()
|
||||
header.HeaderSize = headerSize
|
||||
header.BodySize = bodySize
|
||||
header.Compact()
|
||||
|
||||
blockBytes, err := this.encodeFileHeader(hash, header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -229,28 +242,53 @@ func (this *MetaFile) RemoveFile(hash string) error {
|
||||
func (this *MetaFile) FileHeader(hash string) (header *FileHeader, ok bool) {
|
||||
this.mu.RLock()
|
||||
defer this.mu.RUnlock()
|
||||
header, ok = this.headerMap[hash]
|
||||
|
||||
lazyHeader, ok := this.headerMap[hash]
|
||||
|
||||
if ok {
|
||||
var err error
|
||||
header, err = lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *MetaFile) FileHeaderUnsafe(hash string) (header *FileHeader, ok bool) {
|
||||
header, ok = this.headerMap[hash]
|
||||
lazyHeader, ok := this.headerMap[hash]
|
||||
|
||||
if ok {
|
||||
var err error
|
||||
header, err = lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (this *MetaFile) CloneFileHeader(hash string) (header *FileHeader, ok bool) {
|
||||
this.mu.RLock()
|
||||
defer this.mu.RUnlock()
|
||||
header, ok = this.headerMap[hash]
|
||||
lazyHeader, ok := this.headerMap[hash]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
header, err = lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
ok = false
|
||||
return
|
||||
}
|
||||
|
||||
header = header.Clone()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *MetaFile) FileHeaders() map[string]*FileHeader {
|
||||
func (this *MetaFile) FileHeaders() map[string]*LazyFileHeader {
|
||||
this.mu.RLock()
|
||||
defer this.mu.RUnlock()
|
||||
return this.headerMap
|
||||
@@ -271,8 +309,13 @@ func (this *MetaFile) Compact() error {
|
||||
defer this.mu.Unlock()
|
||||
|
||||
var buf = bytes.NewBuffer(nil)
|
||||
for hash, header := range this.headerMap {
|
||||
blockBytes, err := this.encodeHeader(hash, header)
|
||||
for hash, lazyHeader := range this.headerMap {
|
||||
header, err := lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blockBytes, err := this.encodeFileHeader(hash, header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -313,8 +356,12 @@ func (this *MetaFile) SyncUnsafe() error {
|
||||
}
|
||||
|
||||
for hash := range this.modifiedHashMap {
|
||||
header, ok := this.headerMap[hash]
|
||||
lazyHeader, ok := this.headerMap[hash]
|
||||
if ok {
|
||||
header, decodeErr := lazyHeader.FileHeaderUnsafe()
|
||||
if decodeErr != nil {
|
||||
return decodeErr
|
||||
}
|
||||
header.IsWriting = false
|
||||
}
|
||||
}
|
||||
@@ -335,7 +382,8 @@ func (this *MetaFile) RemoveAll() error {
|
||||
return os.Remove(this.fp.Name())
|
||||
}
|
||||
|
||||
func (this *MetaFile) encodeHeader(hash string, header *FileHeader) ([]byte, error) {
|
||||
// encode file header to data bytes
|
||||
func (this *MetaFile) encodeFileHeader(hash string, header *FileHeader) ([]byte, error) {
|
||||
headerJSON, err := json.Marshal(header)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -363,23 +411,3 @@ func (this *MetaFile) encodeHeader(hash string, header *FileHeader) ([]byte, err
|
||||
|
||||
return EncodeMetaBlock(MetaActionNew, hash, buf.Bytes())
|
||||
}
|
||||
|
||||
func (this *MetaFile) decodeHeader(data []byte) (*FileHeader, error) {
|
||||
gzReader, err := gzip.NewReader(bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = gzReader.Close()
|
||||
}()
|
||||
|
||||
var header = &FileHeader{}
|
||||
err = json.NewDecoder(gzReader).Decode(header)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
header.IsWriting = false
|
||||
return header, nil
|
||||
}
|
||||
|
||||
@@ -48,6 +48,52 @@ func TestNewMetaFile_Large(t *testing.T) {
|
||||
t.Logf("cost: %.2fms, qps: %.2fms/file", costMs, costMs/float64(count))
|
||||
}
|
||||
|
||||
func TestNewMetaFile_Memory(t *testing.T) {
|
||||
var count = 2
|
||||
|
||||
if testutils.IsSingleTesting() {
|
||||
count = 100
|
||||
}
|
||||
|
||||
var stat1 = testutils.ReadMemoryStat()
|
||||
|
||||
var mFiles []*bfs.MetaFile
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
mFile, err := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
|
||||
if err != nil {
|
||||
if bfs.IsNotExist(err) {
|
||||
continue
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_ = mFile.Close()
|
||||
mFiles = append(mFiles, mFile)
|
||||
}
|
||||
|
||||
var stat2 = testutils.ReadMemoryStat()
|
||||
t.Log((stat2.HeapInuse-stat1.HeapInuse)>>20, "MiB")
|
||||
}
|
||||
|
||||
func TestMetaFile_FileHeaders(t *testing.T) {
|
||||
mFile, openErr := bfs.OpenMetaFile("testdata/test2.m", &sync.RWMutex{})
|
||||
if openErr != nil {
|
||||
if bfs.IsNotExist(openErr) {
|
||||
return
|
||||
}
|
||||
t.Fatal(openErr)
|
||||
}
|
||||
_ = mFile.Close()
|
||||
for hash, lazyHeader := range mFile.FileHeaders() {
|
||||
header, err := lazyHeader.FileHeaderUnsafe()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(hash, header.ModifiedAt, header.BodySize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetaFile_WriteMeta(t *testing.T) {
|
||||
mFile, err := bfs.OpenMetaFile("testdata/test.m", &sync.RWMutex{})
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user