优化MMAP相关功能

This commit is contained in:
GoEdgeLab
2024-04-04 08:28:14 +08:00
parent 4c30c28b4c
commit dcd8a0e020
12 changed files with 65 additions and 61 deletions

View File

@@ -78,7 +78,7 @@ func (this *KVFileList) Add(hash string, item *Item) error {
} }
// Exist 检查内容是否存在 // Exist 检查内容是否存在
func (this *KVFileList) Exist(hash string) (bool, error) { func (this *KVFileList) Exist(hash string) (bool, int64, error) {
return this.getStore(hash).ExistItem(hash) return this.getStore(hash).ExistItem(hash)
} }

View File

@@ -90,23 +90,23 @@ func (this *KVListFileStore) AddItem(hash string, item *Item) error {
return this.itemsTable.Set(hash, item) return this.itemsTable.Set(hash, item)
} }
func (this *KVListFileStore) ExistItem(hash string) (bool, error) { func (this *KVListFileStore) ExistItem(hash string) (bool, int64, error) {
if !this.isReady() { if !this.isReady() {
return false, nil return false, -1, nil
} }
item, err := this.itemsTable.Get(hash) item, err := this.itemsTable.Get(hash)
if err != nil { if err != nil {
if kvstore.IsNotFound(err) { if kvstore.IsNotFound(err) {
return false, nil return false, -1, nil
} }
return false, err return false, -1, err
} }
if item == nil { if item == nil {
return false, nil return false, -1, nil
} }
return item.ExpiresAt >= fasttime.Now().Unix(), nil return item.ExpiresAt > fasttime.Now().Unix(), item.HeaderSize + item.BodySize, nil
} }
func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) { func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) {

View File

@@ -159,7 +159,7 @@ func TestKVFileList_Exist(t *testing.T) {
stringutil.Md5("123456"), stringutil.Md5("123456"),
stringutil.Md5("654321"), stringutil.Md5("654321"),
} { } {
b, err := list.Exist(hash) b, _, err := list.Exist(hash)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -322,7 +322,7 @@ func BenchmarkKVFileList_Exist(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
_, existErr := list.Exist(stringutil.Md5(strconv.Itoa(rand.Int() % 2_000_000))) _, _, existErr := list.Exist(stringutil.Md5(strconv.Itoa(rand.Int() % 2_000_000)))
if existErr != nil { if existErr != nil {
b.Fatal(existErr) b.Fatal(existErr)
} }

View File

@@ -130,26 +130,26 @@ func (this *SQLiteFileList) Add(hash string, item *Item) error {
return nil return nil
} }
func (this *SQLiteFileList) Exist(hash string) (bool, error) { func (this *SQLiteFileList) Exist(hash string) (bool, int64, error) {
var db = this.GetDB(hash) var db = this.GetDB(hash)
if !db.IsReady() { if !db.IsReady() {
return false, nil return false, -1, nil
} }
// 如果Hash列表里不存在那么必然不存在 // 如果Hash列表里不存在那么必然不存在
if !db.hashMap.Exist(hash) { if !db.hashMap.Exist(hash) {
return false, nil return false, -1, nil
} }
var item = this.memoryCache.Read(hash) var item = this.memoryCache.Read(hash)
if item != nil { if item != nil {
return true, nil return true, -1, nil
} }
var row = db.existsByHashStmt.QueryRow(hash, time.Now().Unix()) var row = db.existsByHashStmt.QueryRow(hash, time.Now().Unix())
if row.Err() != nil { if row.Err() != nil {
return false, nil return false, -1, nil
} }
var expiredAt int64 var expiredAt int64
@@ -158,15 +158,15 @@ func (this *SQLiteFileList) Exist(hash string) (bool, error) {
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
err = nil err = nil
} }
return false, err return false, -1, err
} }
if expiredAt < fasttime.Now().Unix() { if expiredAt <= fasttime.Now().Unix() {
return false, nil return false, -1, nil
} }
this.memoryCache.Write(hash, zero.Zero{}, this.maxExpiresAtForMemoryCache(expiredAt)) this.memoryCache.Write(hash, zero.Zero{}, this.maxExpiresAtForMemoryCache(expiredAt))
return true, nil return true, -1, nil
} }
func (this *SQLiteFileList) ExistQuick(hash string) (isReady bool, found bool) { func (this *SQLiteFileList) ExistQuick(hash string) (isReady bool, found bool) {

View File

@@ -140,7 +140,7 @@ func TestFileList_Exist(t *testing.T) {
}() }()
{ {
var hash = stringutil.Md5("123456") var hash = stringutil.Md5("123456")
exists, err := list.Exist(hash) exists, _, err := list.Exist(hash)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -148,7 +148,7 @@ func TestFileList_Exist(t *testing.T) {
} }
{ {
var hash = stringutil.Md5("http://edge.teaos.cn/1234561") var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
exists, err := list.Exist(hash) exists, _, err := list.Exist(hash)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -208,7 +208,7 @@ func TestFileList_Exist_Many_DB(t *testing.T) {
countLocker.Unlock() countLocker.Unlock()
var list = listSlice[rands.Int(0, len(listSlice)-1)] var list = listSlice[rands.Int(0, len(listSlice)-1)]
_, _ = list.Exist(hash) _, _, _ = list.Exist(hash)
default: default:
return return
} }
@@ -442,6 +442,6 @@ func BenchmarkFileList_Exist(b *testing.B) {
} }
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f" + types.String(i)) _, _, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f" + types.String(i))
} }
} }

View File

@@ -13,7 +13,7 @@ type ListInterface interface {
Add(hash string, item *Item) error Add(hash string, item *Item) error
// Exist 检查内容是否存在 // Exist 检查内容是否存在
Exist(hash string) (bool, error) Exist(hash string) (ok bool, size int64, err error)
// CleanPrefix 清除某个前缀的缓存 // CleanPrefix 清除某个前缀的缓存
CleanPrefix(prefix string) error CleanPrefix(prefix string) error

View File

@@ -89,21 +89,21 @@ func (this *MemoryList) Add(hash string, item *Item) error {
return nil return nil
} }
func (this *MemoryList) Exist(hash string) (bool, error) { func (this *MemoryList) Exist(hash string) (bool, int64, error) {
this.locker.RLock() this.locker.RLock()
defer this.locker.RUnlock() defer this.locker.RUnlock()
prefix := this.prefix(hash) prefix := this.prefix(hash)
itemMap, ok := this.itemMaps[prefix] itemMap, ok := this.itemMaps[prefix]
if !ok { if !ok {
return false, nil return false, -1, nil
} }
item, ok := itemMap[hash] item, ok := itemMap[hash]
if !ok { if !ok {
return false, nil return false, -1, nil
} }
return !item.IsExpired(), nil return !item.IsExpired(), -1, nil
} }
// CleanPrefix 根据前缀进行清除 // CleanPrefix 根据前缀进行清除

View File

@@ -317,7 +317,7 @@ func BenchmarkMemoryList(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
_, _ = list.Exist(types.String("a" + types.String(rands.Int(1, 10000)))) _, _, _ = list.Exist(types.String("a" + types.String(rands.Int(1, 10000))))
_ = list.Add("a"+types.String(rands.Int(1, 100000)), &caches.Item{}) _ = list.Add("a"+types.String(rands.Int(1, 100000)), &caches.Item{})
_, _ = list.Purge(1000, func(hash string) error { _, _ = list.Purge(1000, func(hash string) error {
return nil return nil

View File

@@ -6,23 +6,12 @@ package caches
import ( import (
"errors" "errors"
"io" "io"
"os"
) )
func IsValidForMMAP(fp *os.File) (ok bool, stat os.FileInfo) {
// stub
return
}
type MMAPFileReader struct { type MMAPFileReader struct {
FileReader FileReader
} }
func NewMMAPFileReader(fp *os.File, stat os.FileInfo) (*MMAPFileReader, error) {
// stub
return &MMAPFileReader{}, errors.New("not implemented")
}
func (this *MMAPFileReader) CopyBodyTo(writer io.Writer) (int, error) { func (this *MMAPFileReader) CopyBodyTo(writer io.Writer) (int, error) {
// stub // stub
return 0, errors.New("not implemented") return 0, errors.New("not implemented")

View File

@@ -304,7 +304,12 @@ func (this *FileStorage) Init() error {
} else if totalSize > 1*sizes.K { } else if totalSize > 1*sizes.K {
sizeMB = fmt.Sprintf("%.3f K", float64(totalSize)/float64(sizes.K)) sizeMB = fmt.Sprintf("%.3f K", float64(totalSize)/float64(sizes.K))
} }
remotelogs.Println("CACHE", "init policy "+types.String(this.policy.Id)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, size: "+sizeMB)
var mmapTag = "disabled"
if this.options.EnableMMAP {
mmapTag = "enabled"
}
remotelogs.Println("CACHE", "init policy "+types.String(this.policy.Id)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, size: "+sizeMB+", mmap: "+mmapTag)
}() }()
// 初始化list // 初始化list
@@ -360,14 +365,27 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
hash, path, _ := this.keyPath(key) hash, path, _ := this.keyPath(key)
// 检查文件记录是否已过期 // 检查文件记录是否已过期
var estimatedSize int64
if !useStale { if !useStale {
exists, err := this.list.Exist(hash) exists, filesize, err := this.list.Exist(hash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !exists { if !exists {
return nil, ErrNotFound return nil, ErrNotFound
} }
estimatedSize = filesize
}
// 尝试通过MMAP读取
if estimatedSize > 0 {
reader, err := this.tryMMAPReader(isPartial, estimatedSize, path)
if err != nil {
return nil, err
}
if reader != nil {
return reader, nil
}
} }
var isOk = false var isOk = false
@@ -404,22 +422,10 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
partialFileReader.openFileCache = openFileCache partialFileReader.openFileCache = openFileCache
reader = partialFileReader reader = partialFileReader
} else { } else {
var options = this.options // copy var fileReader = NewFileReader(fp)
if options != nil && options.EnableMMAP { fileReader.openFile = openFile
if isValid, stat := IsValidForMMAP(fp); isValid { fileReader.openFileCache = openFileCache
reader, err = NewMMAPFileReader(fp, stat) reader = fileReader
if err != nil {
return nil, err
}
}
}
if reader == nil {
var fileReader = NewFileReader(fp)
fileReader.openFile = openFile
fileReader.openFileCache = openFileCache
reader = fileReader
}
} }
err = reader.Init() err = reader.Init()
@@ -562,7 +568,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
var partialRanges *PartialRanges var partialRanges *PartialRanges
if isPartial { if isPartial {
// 数据库中是否存在 // 数据库中是否存在
existsCacheItem, _ := this.list.Exist(hash) existsCacheItem, _, _ := this.list.Exist(hash)
if existsCacheItem { if existsCacheItem {
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444) readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
if err == nil { if err == nil {

View File

@@ -0,0 +1,9 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !plus
package caches
func (this *FileStorage) tryMMAPReader(isPartial bool, estimatedSize int64, path string) (Reader, error) {
// stub
return nil, nil
}

View File

@@ -128,7 +128,7 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool)
var hash = this.hash(key) var hash = this.hash(key)
// check if exists in list // check if exists in list
exists, _ := this.list.Exist(types.String(hash)) exists, _, _ := this.list.Exist(types.String(hash))
if !exists { if !exists {
return nil, ErrNotFound return nil, ErrNotFound
} }
@@ -212,7 +212,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
item, ok := this.valuesMap[hash] item, ok := this.valuesMap[hash]
if ok && !item.IsExpired() { if ok && !item.IsExpired() {
var hashString = types.String(hash) var hashString = types.String(hash)
exists, _ := this.list.Exist(hashString) exists, _, _ := this.list.Exist(hashString)
if !exists { if !exists {
// remove from values map // remove from values map
delete(this.valuesMap, hash) delete(this.valuesMap, hash)
@@ -546,7 +546,7 @@ func (this *MemoryStorage) flushItem(key string) {
} }
// 检查是否在列表中防止未加入列表时就开始flush // 检查是否在列表中防止未加入列表时就开始flush
isInList, err := this.list.Exist(types.String(hash)) isInList, _, err := this.list.Exist(types.String(hash))
if err != nil { if err != nil {
remotelogs.Error("CACHE", "flush items failed: "+err.Error()) remotelogs.Error("CACHE", "flush items failed: "+err.Error())
return return