mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-12-03 06:50:24 +08:00
优化MMAP相关功能
This commit is contained in:
@@ -78,7 +78,7 @@ func (this *KVFileList) Add(hash string, item *Item) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Exist 检查内容是否存在
|
// Exist 检查内容是否存在
|
||||||
func (this *KVFileList) Exist(hash string) (bool, error) {
|
func (this *KVFileList) Exist(hash string) (bool, int64, error) {
|
||||||
return this.getStore(hash).ExistItem(hash)
|
return this.getStore(hash).ExistItem(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -90,23 +90,23 @@ func (this *KVListFileStore) AddItem(hash string, item *Item) error {
|
|||||||
return this.itemsTable.Set(hash, item)
|
return this.itemsTable.Set(hash, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) ExistItem(hash string) (bool, error) {
|
func (this *KVListFileStore) ExistItem(hash string) (bool, int64, error) {
|
||||||
if !this.isReady() {
|
if !this.isReady() {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
item, err := this.itemsTable.Get(hash)
|
item, err := this.itemsTable.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if kvstore.IsNotFound(err) {
|
if kvstore.IsNotFound(err) {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
return false, err
|
return false, -1, err
|
||||||
}
|
}
|
||||||
if item == nil {
|
if item == nil {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return item.ExpiresAt >= fasttime.Now().Unix(), nil
|
return item.ExpiresAt > fasttime.Now().Unix(), item.HeaderSize + item.BodySize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) {
|
func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) {
|
||||||
|
|||||||
@@ -159,7 +159,7 @@ func TestKVFileList_Exist(t *testing.T) {
|
|||||||
stringutil.Md5("123456"),
|
stringutil.Md5("123456"),
|
||||||
stringutil.Md5("654321"),
|
stringutil.Md5("654321"),
|
||||||
} {
|
} {
|
||||||
b, err := list.Exist(hash)
|
b, _, err := list.Exist(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -322,7 +322,7 @@ func BenchmarkKVFileList_Exist(b *testing.B) {
|
|||||||
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
_, existErr := list.Exist(stringutil.Md5(strconv.Itoa(rand.Int() % 2_000_000)))
|
_, _, existErr := list.Exist(stringutil.Md5(strconv.Itoa(rand.Int() % 2_000_000)))
|
||||||
if existErr != nil {
|
if existErr != nil {
|
||||||
b.Fatal(existErr)
|
b.Fatal(existErr)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -130,26 +130,26 @@ func (this *SQLiteFileList) Add(hash string, item *Item) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *SQLiteFileList) Exist(hash string) (bool, error) {
|
func (this *SQLiteFileList) Exist(hash string) (bool, int64, error) {
|
||||||
var db = this.GetDB(hash)
|
var db = this.GetDB(hash)
|
||||||
|
|
||||||
if !db.IsReady() {
|
if !db.IsReady() {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 如果Hash列表里不存在,那么必然不存在
|
// 如果Hash列表里不存在,那么必然不存在
|
||||||
if !db.hashMap.Exist(hash) {
|
if !db.hashMap.Exist(hash) {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var item = this.memoryCache.Read(hash)
|
var item = this.memoryCache.Read(hash)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
return true, nil
|
return true, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var row = db.existsByHashStmt.QueryRow(hash, time.Now().Unix())
|
var row = db.existsByHashStmt.QueryRow(hash, time.Now().Unix())
|
||||||
if row.Err() != nil {
|
if row.Err() != nil {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var expiredAt int64
|
var expiredAt int64
|
||||||
@@ -158,15 +158,15 @@ func (this *SQLiteFileList) Exist(hash string) (bool, error) {
|
|||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
return false, err
|
return false, -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if expiredAt < fasttime.Now().Unix() {
|
if expiredAt <= fasttime.Now().Unix() {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
this.memoryCache.Write(hash, zero.Zero{}, this.maxExpiresAtForMemoryCache(expiredAt))
|
this.memoryCache.Write(hash, zero.Zero{}, this.maxExpiresAtForMemoryCache(expiredAt))
|
||||||
return true, nil
|
return true, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *SQLiteFileList) ExistQuick(hash string) (isReady bool, found bool) {
|
func (this *SQLiteFileList) ExistQuick(hash string) (isReady bool, found bool) {
|
||||||
|
|||||||
@@ -140,7 +140,7 @@ func TestFileList_Exist(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
{
|
{
|
||||||
var hash = stringutil.Md5("123456")
|
var hash = stringutil.Md5("123456")
|
||||||
exists, err := list.Exist(hash)
|
exists, _, err := list.Exist(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -148,7 +148,7 @@ func TestFileList_Exist(t *testing.T) {
|
|||||||
}
|
}
|
||||||
{
|
{
|
||||||
var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
|
var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
|
||||||
exists, err := list.Exist(hash)
|
exists, _, err := list.Exist(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -208,7 +208,7 @@ func TestFileList_Exist_Many_DB(t *testing.T) {
|
|||||||
countLocker.Unlock()
|
countLocker.Unlock()
|
||||||
|
|
||||||
var list = listSlice[rands.Int(0, len(listSlice)-1)]
|
var list = listSlice[rands.Int(0, len(listSlice)-1)]
|
||||||
_, _ = list.Exist(hash)
|
_, _, _ = list.Exist(hash)
|
||||||
default:
|
default:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -442,6 +442,6 @@ func BenchmarkFileList_Exist(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f" + types.String(i))
|
_, _, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f" + types.String(i))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ type ListInterface interface {
|
|||||||
Add(hash string, item *Item) error
|
Add(hash string, item *Item) error
|
||||||
|
|
||||||
// Exist 检查内容是否存在
|
// Exist 检查内容是否存在
|
||||||
Exist(hash string) (bool, error)
|
Exist(hash string) (ok bool, size int64, err error)
|
||||||
|
|
||||||
// CleanPrefix 清除某个前缀的缓存
|
// CleanPrefix 清除某个前缀的缓存
|
||||||
CleanPrefix(prefix string) error
|
CleanPrefix(prefix string) error
|
||||||
|
|||||||
@@ -89,21 +89,21 @@ func (this *MemoryList) Add(hash string, item *Item) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MemoryList) Exist(hash string) (bool, error) {
|
func (this *MemoryList) Exist(hash string) (bool, int64, error) {
|
||||||
this.locker.RLock()
|
this.locker.RLock()
|
||||||
defer this.locker.RUnlock()
|
defer this.locker.RUnlock()
|
||||||
|
|
||||||
prefix := this.prefix(hash)
|
prefix := this.prefix(hash)
|
||||||
itemMap, ok := this.itemMaps[prefix]
|
itemMap, ok := this.itemMaps[prefix]
|
||||||
if !ok {
|
if !ok {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
item, ok := itemMap[hash]
|
item, ok := itemMap[hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
return false, nil
|
return false, -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return !item.IsExpired(), nil
|
return !item.IsExpired(), -1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CleanPrefix 根据前缀进行清除
|
// CleanPrefix 根据前缀进行清除
|
||||||
|
|||||||
@@ -317,7 +317,7 @@ func BenchmarkMemoryList(b *testing.B) {
|
|||||||
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
_, _ = list.Exist(types.String("a" + types.String(rands.Int(1, 10000))))
|
_, _, _ = list.Exist(types.String("a" + types.String(rands.Int(1, 10000))))
|
||||||
_ = list.Add("a"+types.String(rands.Int(1, 100000)), &caches.Item{})
|
_ = list.Add("a"+types.String(rands.Int(1, 100000)), &caches.Item{})
|
||||||
_, _ = list.Purge(1000, func(hash string) error {
|
_, _ = list.Purge(1000, func(hash string) error {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -6,23 +6,12 @@ package caches
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func IsValidForMMAP(fp *os.File) (ok bool, stat os.FileInfo) {
|
|
||||||
// stub
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
type MMAPFileReader struct {
|
type MMAPFileReader struct {
|
||||||
FileReader
|
FileReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMMAPFileReader(fp *os.File, stat os.FileInfo) (*MMAPFileReader, error) {
|
|
||||||
// stub
|
|
||||||
return &MMAPFileReader{}, errors.New("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *MMAPFileReader) CopyBodyTo(writer io.Writer) (int, error) {
|
func (this *MMAPFileReader) CopyBodyTo(writer io.Writer) (int, error) {
|
||||||
// stub
|
// stub
|
||||||
return 0, errors.New("not implemented")
|
return 0, errors.New("not implemented")
|
||||||
|
|||||||
@@ -304,7 +304,12 @@ func (this *FileStorage) Init() error {
|
|||||||
} else if totalSize > 1*sizes.K {
|
} else if totalSize > 1*sizes.K {
|
||||||
sizeMB = fmt.Sprintf("%.3f K", float64(totalSize)/float64(sizes.K))
|
sizeMB = fmt.Sprintf("%.3f K", float64(totalSize)/float64(sizes.K))
|
||||||
}
|
}
|
||||||
remotelogs.Println("CACHE", "init policy "+types.String(this.policy.Id)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, size: "+sizeMB)
|
|
||||||
|
var mmapTag = "disabled"
|
||||||
|
if this.options.EnableMMAP {
|
||||||
|
mmapTag = "enabled"
|
||||||
|
}
|
||||||
|
remotelogs.Println("CACHE", "init policy "+types.String(this.policy.Id)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, size: "+sizeMB+", mmap: "+mmapTag)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 初始化list
|
// 初始化list
|
||||||
@@ -360,14 +365,27 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
hash, path, _ := this.keyPath(key)
|
hash, path, _ := this.keyPath(key)
|
||||||
|
|
||||||
// 检查文件记录是否已过期
|
// 检查文件记录是否已过期
|
||||||
|
var estimatedSize int64
|
||||||
if !useStale {
|
if !useStale {
|
||||||
exists, err := this.list.Exist(hash)
|
exists, filesize, err := this.list.Exist(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
|
estimatedSize = filesize
|
||||||
|
}
|
||||||
|
|
||||||
|
// 尝试通过MMAP读取
|
||||||
|
if estimatedSize > 0 {
|
||||||
|
reader, err := this.tryMMAPReader(isPartial, estimatedSize, path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if reader != nil {
|
||||||
|
return reader, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var isOk = false
|
var isOk = false
|
||||||
@@ -404,22 +422,10 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
partialFileReader.openFileCache = openFileCache
|
partialFileReader.openFileCache = openFileCache
|
||||||
reader = partialFileReader
|
reader = partialFileReader
|
||||||
} else {
|
} else {
|
||||||
var options = this.options // copy
|
var fileReader = NewFileReader(fp)
|
||||||
if options != nil && options.EnableMMAP {
|
fileReader.openFile = openFile
|
||||||
if isValid, stat := IsValidForMMAP(fp); isValid {
|
fileReader.openFileCache = openFileCache
|
||||||
reader, err = NewMMAPFileReader(fp, stat)
|
reader = fileReader
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if reader == nil {
|
|
||||||
var fileReader = NewFileReader(fp)
|
|
||||||
fileReader.openFile = openFile
|
|
||||||
fileReader.openFileCache = openFileCache
|
|
||||||
reader = fileReader
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = reader.Init()
|
err = reader.Init()
|
||||||
@@ -562,7 +568,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
|||||||
var partialRanges *PartialRanges
|
var partialRanges *PartialRanges
|
||||||
if isPartial {
|
if isPartial {
|
||||||
// 数据库中是否存在
|
// 数据库中是否存在
|
||||||
existsCacheItem, _ := this.list.Exist(hash)
|
existsCacheItem, _, _ := this.list.Exist(hash)
|
||||||
if existsCacheItem {
|
if existsCacheItem {
|
||||||
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
|
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|||||||
9
internal/caches/storage_file_ext.go
Normal file
9
internal/caches/storage_file_ext.go
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
//go:build !plus
|
||||||
|
|
||||||
|
package caches
|
||||||
|
|
||||||
|
func (this *FileStorage) tryMMAPReader(isPartial bool, estimatedSize int64, path string) (Reader, error) {
|
||||||
|
// stub
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
@@ -128,7 +128,7 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool)
|
|||||||
var hash = this.hash(key)
|
var hash = this.hash(key)
|
||||||
|
|
||||||
// check if exists in list
|
// check if exists in list
|
||||||
exists, _ := this.list.Exist(types.String(hash))
|
exists, _, _ := this.list.Exist(types.String(hash))
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
@@ -212,7 +212,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
|
|||||||
item, ok := this.valuesMap[hash]
|
item, ok := this.valuesMap[hash]
|
||||||
if ok && !item.IsExpired() {
|
if ok && !item.IsExpired() {
|
||||||
var hashString = types.String(hash)
|
var hashString = types.String(hash)
|
||||||
exists, _ := this.list.Exist(hashString)
|
exists, _, _ := this.list.Exist(hashString)
|
||||||
if !exists {
|
if !exists {
|
||||||
// remove from values map
|
// remove from values map
|
||||||
delete(this.valuesMap, hash)
|
delete(this.valuesMap, hash)
|
||||||
@@ -546,7 +546,7 @@ func (this *MemoryStorage) flushItem(key string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 检查是否在列表中,防止未加入列表时就开始flush
|
// 检查是否在列表中,防止未加入列表时就开始flush
|
||||||
isInList, err := this.list.Exist(types.String(hash))
|
isInList, _, err := this.list.Exist(types.String(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
remotelogs.Error("CACHE", "flush items failed: "+err.Error())
|
remotelogs.Error("CACHE", "flush items failed: "+err.Error())
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user