实现open file cache

This commit is contained in:
刘祥超
2022-01-12 21:09:00 +08:00
parent 91fab59a18
commit d02f9f9a0e
12 changed files with 553 additions and 144 deletions

View File

@@ -14,12 +14,12 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
stringutil "github.com/iwind/TeaGo/utils/string"
"golang.org/x/text/language"
"golang.org/x/text/message"
"io"
"math"
"os"
"path/filepath"
@@ -67,6 +67,8 @@ type FileStorage struct {
hotMapLocker sync.Mutex
lastHotSize int
hotTicker *utils.Ticker
openFileCache *OpenFileCache
}
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
@@ -100,13 +102,13 @@ func (this *FileStorage) Init() error {
return err
}
this.cacheConfig = cacheConfig
cacheDir := cacheConfig.Dir
if !filepath.IsAbs(this.cacheConfig.Dir) {
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
}
dir := this.cacheConfig.Dir
this.cacheConfig.Dir = filepath.Clean(this.cacheConfig.Dir)
var dir = this.cacheConfig.Dir
if len(dir) == 0 {
return errors.New("[CACHE]cache storage dir can not be empty")
@@ -159,7 +161,7 @@ func (this *FileStorage) Init() error {
} else if size > 1024 {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
}
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.cacheConfig.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
}()
// 初始化list
@@ -202,6 +204,15 @@ func (this *FileStorage) Init() error {
}
}
// open file cache
if this.cacheConfig.OpenFileCache != nil && this.cacheConfig.OpenFileCache.IsOn && this.cacheConfig.OpenFileCache.Max > 0 {
this.openFileCache, err = NewOpenFileCache(this.cacheConfig.OpenFileCache.Max)
logs.Println("start open file cache")
if err != nil {
remotelogs.Error("CACHE", "open file cache failed: "+err.Error())
}
}
return nil
}
@@ -238,7 +249,18 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
// TODO 尝试使用mmap加快读取速度
var isOk = false
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
var openFile *OpenFile
if this.openFileCache != nil {
openFile = this.openFileCache.Get(path)
}
var fp *os.File
var err error
var meta []byte
if openFile != nil {
fp, meta = openFile.fp, openFile.meta
} else {
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
}
if err != nil {
if !os.IsNotExist(err) {
return nil, err
@@ -253,8 +275,10 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
}()
reader := NewFileReader(fp)
if err != nil {
return nil, err
reader.openFile = openFile
reader.openFileCache = this.openFileCache
if len(meta) > 0 {
reader.meta = meta
}
err = reader.Init()
if err != nil {
@@ -623,6 +647,8 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
// Stop 停止
func (this *FileStorage) Stop() {
events.Remove(this)
this.locker.Lock()
defer this.locker.Unlock()
@@ -640,6 +666,10 @@ func (this *FileStorage) Stop() {
}
_ = this.list.Close()
if this.openFileCache != nil {
this.openFileCache.CloseAll()
}
}
// TotalDiskSize 消耗的磁盘尺寸
@@ -702,11 +732,20 @@ func (this *FileStorage) initList() error {
}
}
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("CACHE", "quit clean timer")
var ticker = this.purgeTicker
if ticker != nil {
ticker.Stop()
{
var ticker = this.purgeTicker
if ticker != nil {
ticker.Stop()
}
}
{
var ticker = this.hotTicker
if ticker != nil {
ticker.Stop()
}
}
})
goman.New(func() {
@@ -733,98 +772,6 @@ func (this *FileStorage) initList() error {
return nil
}
// 解析文件信息
func (this *FileStorage) decodeFile(path string) (*Item, error) {
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
if err != nil {
return nil, err
}
isAllOk := false
defer func() {
_ = fp.Close()
if !isAllOk {
_ = os.Remove(path)
}
}()
item := &Item{
Type: ItemTypeFile,
MetaSize: SizeMeta,
}
bytes4 := make([]byte, 4)
// 过期时间
ok, err := this.readToBuff(fp, bytes4)
if err != nil {
return nil, err
}
if !ok {
return nil, ErrNotFound
}
item.ExpiredAt = int64(binary.BigEndian.Uint32(bytes4))
// 是否已过期
if item.ExpiredAt < time.Now().Unix() {
return nil, ErrNotFound
}
// URL Size
_, err = fp.Seek(int64(SizeExpiresAt+SizeStatus), io.SeekStart)
if err != nil {
return nil, err
}
ok, err = this.readToBuff(fp, bytes4)
if err != nil {
return nil, err
}
if !ok {
return nil, ErrNotFound
}
urlSize := binary.BigEndian.Uint32(bytes4)
// Header Size
ok, err = this.readToBuff(fp, bytes4)
if err != nil {
return nil, err
}
if !ok {
return nil, ErrNotFound
}
item.HeaderSize = int64(binary.BigEndian.Uint32(bytes4))
// Body Size
bytes8 := make([]byte, 8)
ok, err = this.readToBuff(fp, bytes8)
if err != nil {
return nil, err
}
if !ok {
return nil, ErrNotFound
}
item.BodySize = int64(binary.BigEndian.Uint64(bytes8))
// URL
if urlSize > 0 {
data := utils.BytePool1k.Get()
result, ok, err := this.readN(fp, data, int(urlSize))
utils.BytePool1k.Put(data)
if err != nil {
return nil, err
}
if !ok {
return nil, ErrNotFound
}
item.Key = string(result)
}
isAllOk = true
return item, nil
}
// 清理任务
func (this *FileStorage) purgeLoop() {
// 计算是否应该开启LFU清理
@@ -1008,34 +955,6 @@ func (this *FileStorage) hotLoop() {
}
}
func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
n, err := fp.Read(buf)
if err != nil {
return false, err
}
ok = n == len(buf)
return
}
func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byte, ok bool, err error) {
for {
n, err := fp.Read(buf)
if err != nil {
return nil, false, err
}
if n > 0 {
if n >= total {
result = append(result, buf[:total]...)
ok = true
return result, ok, nil
} else {
total -= n
result = append(result, buf[:n]...)
}
}
}
}
func (this *FileStorage) diskCapacityBytes() int64 {
c1 := this.policy.CapacityBytes()
if SharedManager.MaxDiskCapacity != nil {