缓存策略移除“容纳Key数量”选项;缓存占用空间统计改成统计缓存目录所在文件系统

This commit is contained in:
GoEdgeLab
2023-07-08 18:52:57 +08:00
parent ad108fbd2a
commit 7f1f8f59f4
5 changed files with 187 additions and 73 deletions

View File

@@ -52,9 +52,8 @@ func TestManager_UpdatePolicies(t *testing.T) {
},
},
{
Id: 2,
Type: serverconfigs.CachePolicyStorageFile,
MaxKeys: 1,
Id: 2,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
},
@@ -95,9 +94,9 @@ func TestManager_ChangePolicy_Memory(t *testing.T) {
func TestManager_ChangePolicy_File(t *testing.T) {
var policies = []*serverconfigs.HTTPCachePolicy{
{
Id: 1,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
Id: 1,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/data/cache-index/p1",
},
Capacity: &shared.SizeCapacity{Count: 1, Unit: shared.SizeCapacityUnitGB},
@@ -106,9 +105,9 @@ func TestManager_ChangePolicy_File(t *testing.T) {
SharedManager.UpdatePolicies(policies)
SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{
{
Id: 1,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
Id: 1,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/data/cache-index/p1",
},
Capacity: &shared.SizeCapacity{Count: 2, Unit: shared.SizeCapacityUnitGB},

View File

@@ -14,6 +14,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/TeaOSLab/EdgeNode/internal/zero"
@@ -21,9 +22,6 @@ import (
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
stringutil "github.com/iwind/TeaGo/utils/string"
"golang.org/x/sys/unix"
"golang.org/x/text/language"
"golang.org/x/text/message"
"math"
"os"
"path/filepath"
@@ -32,7 +30,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
@@ -53,12 +50,12 @@ const (
)
const (
FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值)
HotItemSize = 1024 // 热点数据数量
HotItemLifeSeconds int64 = 3600 // 热点数据生命周期
FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸
FileTmpSuffix = ".tmp"
MinDiskSpace = 5 << 30 // 当前磁盘最小剩余空间
FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值)
HotItemSize = 1024 // 热点数据数量
HotItemLifeSeconds int64 = 3600 // 热点数据生命周期
FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸
FileTmpSuffix = ".tmp"
MinDiskSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间
)
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
@@ -77,7 +74,6 @@ type FileStorage struct {
policy *serverconfigs.HTTPCachePolicy
options *serverconfigs.HTTPFileCacheStorage // 二级缓存
memoryStorage *MemoryStorage // 一级缓存
totalSize int64
list ListInterface
locker sync.RWMutex
@@ -92,7 +88,6 @@ type FileStorage struct {
openFileCache *OpenFileCache
mainDir string
mainDiskIsFull bool
subDirs []*FileDir
@@ -255,19 +250,6 @@ func (this *FileStorage) Init() error {
}
list.(*FileList).SetOldDir(dir + "/p" + types.String(this.policy.Id))
this.list = list
stat, err := list.Stat(func(hash string) bool {
return true
})
if err != nil {
return err
}
this.totalSize = stat.Size
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
// 检查目录是否存在
_, err = os.Stat(dir)
@@ -284,19 +266,17 @@ func (this *FileStorage) Init() error {
defer func() {
// 统计
var count = stat.Count
var size = stat.Size
var totalSize = this.TotalDiskSize()
var cost = time.Since(before).Seconds() * 1000
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
if size > 1*sizes.G {
sizeMB = fmt.Sprintf("%.3f G", float64(size)/float64(sizes.G))
} else if size > 1*sizes.M {
sizeMB = fmt.Sprintf("%.3f M", float64(size)/float64(sizes.M))
} else if size > 1*sizes.K {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/float64(sizes.K))
var sizeMB = types.String(totalSize) + " Bytes"
if totalSize > 1*sizes.G {
sizeMB = fmt.Sprintf("%.3f G", float64(totalSize)/float64(sizes.G))
} else if totalSize > 1*sizes.M {
sizeMB = fmt.Sprintf("%.3f M", float64(totalSize)/float64(sizes.M))
} else if totalSize > 1*sizes.K {
sizeMB = fmt.Sprintf("%.3f K", float64(totalSize)/float64(sizes.K))
}
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
remotelogs.Println("CACHE", "init policy "+types.String(this.policy.Id)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, size: "+sizeMB)
}()
// 初始化list
@@ -480,17 +460,10 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
}
}()
// 检查是否超出最大值
count, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
return nil, NewCapacityError("write file cache failed: too many keys in cache storage")
}
// 检查是否超出容量
var capacityBytes = this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
if capacityBytes > 0 && capacityBytes <= this.TotalDiskSize() {
return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + types.String(this.TotalDiskSize()) + " bytes, capacity: " + types.String(capacityBytes))
}
var hash = stringutil.Md5(key)
@@ -924,7 +897,11 @@ func (this *FileStorage) Stop() {
// TotalDiskSize 消耗的磁盘尺寸
func (this *FileStorage) TotalDiskSize() int64 {
return atomic.LoadInt64(&this.totalSize)
stat, err := fsutils.StatCache(this.options.Dir)
if err == nil {
return int64(stat.UsedSize())
}
return 0
}
// TotalMemorySize 内存尺寸
@@ -1364,7 +1341,6 @@ func (this *FileStorage) createMemoryStorage() error {
Name: this.policy.Name,
Description: this.policy.Description,
Capacity: this.options.MemoryPolicy.Capacity,
MaxKeys: this.policy.MaxKeys,
MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改
Type: serverconfigs.CachePolicyStorageMemory,
Options: this.policy.Options,
@@ -1440,20 +1416,16 @@ func (this *FileStorage) runMemoryStorageSafety(f func(memoryStorage *MemoryStor
// 检查磁盘剩余空间
func (this *FileStorage) checkDiskSpace() {
if this.options != nil && len(this.options.Dir) > 0 {
var stat unix.Statfs_t
err := unix.Statfs(this.options.Dir, &stat)
stat, err := fsutils.Stat(this.options.Dir)
if err == nil {
var availableBytes = stat.Bavail * uint64(stat.Bsize)
this.mainDiskIsFull = availableBytes < MinDiskSpace
this.mainDiskIsFull = stat.AvailableSize() < MinDiskSpace
}
}
var subDirs = this.subDirs // copy slice
for _, subDir := range subDirs {
var stat unix.Statfs_t
err := unix.Statfs(subDir.Path, &stat)
stat, err := fsutils.Stat(subDir.Path)
if err == nil {
var availableBytes = stat.Bavail * uint64(stat.Bsize)
subDir.IsFull = availableBytes < MinDiskSpace
subDir.IsFull = stat.AvailableSize() < MinDiskSpace
}
}
}

View File

@@ -200,13 +200,6 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
}
// 检查是否超出最大值
totalKeys, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys {
return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
}
capacityBytes := this.memoryCapacityBytes()
if bodySize < 0 {
bodySize = 0
@@ -216,7 +209,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
}
// 先删除
err = this.deleteWithoutLocker(key)
err := this.deleteWithoutLocker(key)
if err != nil {
return nil, err
}

81
internal/utils/fs/stat.go Normal file
View File

@@ -0,0 +1,81 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"golang.org/x/sys/unix"
"sync"
)
// Stat device contains the path
func Stat(path string) (*StatResult, error) {
var stat = &unix.Statfs_t{}
err := unix.Statfs(path, stat)
if err != nil {
return nil, err
}
return NewStatResult(stat), nil
}
var locker = &sync.RWMutex{}
var cacheMap = map[string]*StatResult{} // path => StatResult
const cacheLife = 3 // seconds
// StatCache stat device with cache
func StatCache(path string) (*StatResult, error) {
locker.RLock()
stat, ok := cacheMap[path]
if ok && stat.updatedAt >= fasttime.Now().Unix()-cacheLife {
locker.RUnlock()
return stat, nil
}
locker.RUnlock()
locker.Lock()
defer locker.Unlock()
stat, err := Stat(path)
if err != nil {
return nil, err
}
cacheMap[path] = stat
return stat, nil
}
type StatResult struct {
rawStat *unix.Statfs_t
blockSize uint64
updatedAt int64
}
func NewStatResult(rawStat *unix.Statfs_t) *StatResult {
var blockSize = rawStat.Bsize
if blockSize < 0 {
blockSize = 0
}
return &StatResult{
rawStat: rawStat,
blockSize: uint64(blockSize),
updatedAt: fasttime.Now().Unix(),
}
}
func (this *StatResult) AvailableSize() uint64 {
return this.rawStat.Bavail * this.blockSize
}
func (this *StatResult) TotalSize() uint64 {
return this.rawStat.Blocks * this.blockSize
}
func (this *StatResult) UsedSize() uint64 {
if this.rawStat.Bavail <= this.rawStat.Blocks {
return (this.rawStat.Blocks - this.rawStat.Bavail) * this.blockSize
}
return 0
}

View File

@@ -0,0 +1,69 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"sync"
"testing"
"time"
)
func TestStat(t *testing.T) {
stat, err := fsutils.Stat("/usr/local")
if err != nil {
t.Fatal(err)
}
t.Log("available:", stat.AvailableSize()/(1<<30), "total:", stat.TotalSize()/(1<<30), "used:", stat.UsedSize()/(1<<30))
}
func TestStatCache(t *testing.T) {
for i := 0; i < 10; i++ {
stat, err := fsutils.StatCache("/usr/local")
if err != nil {
t.Fatal(err)
}
t.Log("available:", stat.AvailableSize()/(1<<30), "total:", stat.TotalSize()/(1<<30), "used:", stat.UsedSize()/(1<<30))
}
}
func TestConcurrent(t *testing.T) {
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
var count = 10000
var wg = sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
go func() {
defer wg.Done()
_, _ = fsutils.Stat("/usr/local")
}()
}
wg.Wait()
}
func BenchmarkStat(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := fsutils.Stat("/usr/local")
if err != nil {
b.Fatal(err)
}
}
})
}
func BenchmarkStatCache(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := fsutils.StatCache("/usr/local")
if err != nil {
b.Fatal(err)
}
}
})
}