From 7f1f8f59f471275840ea9d20122df191a8b623c6 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sat, 8 Jul 2023 18:52:57 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=93=E5=AD=98=E7=AD=96=E7=95=A5=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=E2=80=9C=E5=AE=B9=E7=BA=B3Key=E6=95=B0=E9=87=8F?= =?UTF-8?q?=E2=80=9D=E9=80=89=E9=A1=B9=EF=BC=9B=E7=BC=93=E5=AD=98=E5=8D=A0?= =?UTF-8?q?=E7=94=A8=E7=A9=BA=E9=97=B4=E7=BB=9F=E8=AE=A1=E6=94=B9=E6=88=90?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E7=BC=93=E5=AD=98=E7=9B=AE=E5=BD=95=E6=89=80?= =?UTF-8?q?=E5=9C=A8=E6=96=87=E4=BB=B6=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/manager_test.go | 17 +++---- internal/caches/storage_file.go | 84 +++++++++++-------------------- internal/caches/storage_memory.go | 9 +--- internal/utils/fs/stat.go | 81 +++++++++++++++++++++++++++++ internal/utils/fs/stat_test.go | 69 +++++++++++++++++++++++++ 5 files changed, 187 insertions(+), 73 deletions(-) create mode 100644 internal/utils/fs/stat.go create mode 100644 internal/utils/fs/stat_test.go diff --git a/internal/caches/manager_test.go b/internal/caches/manager_test.go index 6df8d41..d151d99 100644 --- a/internal/caches/manager_test.go +++ b/internal/caches/manager_test.go @@ -52,9 +52,8 @@ func TestManager_UpdatePolicies(t *testing.T) { }, }, { - Id: 2, - Type: serverconfigs.CachePolicyStorageFile, - MaxKeys: 1, + Id: 2, + Type: serverconfigs.CachePolicyStorageFile, Options: map[string]interface{}{ "dir": Tea.Root + "/caches", }, @@ -95,9 +94,9 @@ func TestManager_ChangePolicy_Memory(t *testing.T) { func TestManager_ChangePolicy_File(t *testing.T) { var policies = []*serverconfigs.HTTPCachePolicy{ { - Id: 1, - Type: serverconfigs.CachePolicyStorageFile, - Options: map[string]interface{}{ + Id: 1, + Type: serverconfigs.CachePolicyStorageFile, + Options: map[string]interface{}{ "dir": Tea.Root + "/data/cache-index/p1", }, Capacity: &shared.SizeCapacity{Count: 1, Unit: shared.SizeCapacityUnitGB}, @@ -106,9 +105,9 @@ func TestManager_ChangePolicy_File(t *testing.T) { SharedManager.UpdatePolicies(policies) SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{ { - Id: 1, - Type: serverconfigs.CachePolicyStorageFile, - Options: map[string]interface{}{ + Id: 1, + Type: serverconfigs.CachePolicyStorageFile, + Options: map[string]interface{}{ "dir": Tea.Root + "/data/cache-index/p1", }, Capacity: &shared.SizeCapacity{Count: 2, Unit: shared.SizeCapacityUnitGB}, diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index ece9f73..438992a 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -14,6 +14,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets" "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" "github.com/TeaOSLab/EdgeNode/internal/zero" @@ -21,9 +22,6 @@ import ( "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" - "golang.org/x/sys/unix" - "golang.org/x/text/language" - "golang.org/x/text/message" "math" "os" "path/filepath" @@ -32,7 +30,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "syscall" "time" ) @@ -53,12 +50,12 @@ const ( ) const ( - FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值) - HotItemSize = 1024 // 热点数据数量 - HotItemLifeSeconds int64 = 3600 // 热点数据生命周期 - FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸 - FileTmpSuffix = ".tmp" - MinDiskSpace = 5 << 30 // 当前磁盘最小剩余空间 + FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值) + HotItemSize = 1024 // 热点数据数量 + HotItemLifeSeconds int64 = 3600 // 热点数据生命周期 + FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸 + FileTmpSuffix = ".tmp" + MinDiskSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间 ) var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool @@ -77,7 +74,6 @@ type FileStorage struct { policy *serverconfigs.HTTPCachePolicy options *serverconfigs.HTTPFileCacheStorage // 二级缓存 memoryStorage *MemoryStorage // 一级缓存 - totalSize int64 list ListInterface locker sync.RWMutex @@ -92,7 +88,6 @@ type FileStorage struct { openFileCache *OpenFileCache - mainDir string mainDiskIsFull bool subDirs []*FileDir @@ -255,19 +250,6 @@ func (this *FileStorage) Init() error { } list.(*FileList).SetOldDir(dir + "/p" + types.String(this.policy.Id)) this.list = list - stat, err := list.Stat(func(hash string) bool { - return true - }) - if err != nil { - return err - } - this.totalSize = stat.Size - this.list.OnAdd(func(item *Item) { - atomic.AddInt64(&this.totalSize, item.TotalSize()) - }) - this.list.OnRemove(func(item *Item) { - atomic.AddInt64(&this.totalSize, -item.TotalSize()) - }) // 检查目录是否存在 _, err = os.Stat(dir) @@ -284,19 +266,17 @@ func (this *FileStorage) Init() error { defer func() { // 统计 - var count = stat.Count - var size = stat.Size - + var totalSize = this.TotalDiskSize() var cost = time.Since(before).Seconds() * 1000 - sizeMB := strconv.FormatInt(size, 10) + " Bytes" - if size > 1*sizes.G { - sizeMB = fmt.Sprintf("%.3f G", float64(size)/float64(sizes.G)) - } else if size > 1*sizes.M { - sizeMB = fmt.Sprintf("%.3f M", float64(size)/float64(sizes.M)) - } else if size > 1*sizes.K { - sizeMB = fmt.Sprintf("%.3f K", float64(size)/float64(sizes.K)) + var sizeMB = types.String(totalSize) + " Bytes" + if totalSize > 1*sizes.G { + sizeMB = fmt.Sprintf("%.3f G", float64(totalSize)/float64(sizes.G)) + } else if totalSize > 1*sizes.M { + sizeMB = fmt.Sprintf("%.3f M", float64(totalSize)/float64(sizes.M)) + } else if totalSize > 1*sizes.K { + sizeMB = fmt.Sprintf("%.3f K", float64(totalSize)/float64(sizes.K)) } - remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB) + remotelogs.Println("CACHE", "init policy "+types.String(this.policy.Id)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, size: "+sizeMB) }() // 初始化list @@ -480,17 +460,10 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea } }() - // 检查是否超出最大值 - count, err := this.list.Count() - if err != nil { - return nil, err - } - if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys { - return nil, NewCapacityError("write file cache failed: too many keys in cache storage") - } + // 检查是否超出容量 var capacityBytes = this.diskCapacityBytes() - if capacityBytes > 0 && capacityBytes <= this.totalSize { - return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10)) + if capacityBytes > 0 && capacityBytes <= this.TotalDiskSize() { + return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + types.String(this.TotalDiskSize()) + " bytes, capacity: " + types.String(capacityBytes)) } var hash = stringutil.Md5(key) @@ -924,7 +897,11 @@ func (this *FileStorage) Stop() { // TotalDiskSize 消耗的磁盘尺寸 func (this *FileStorage) TotalDiskSize() int64 { - return atomic.LoadInt64(&this.totalSize) + stat, err := fsutils.StatCache(this.options.Dir) + if err == nil { + return int64(stat.UsedSize()) + } + return 0 } // TotalMemorySize 内存尺寸 @@ -1364,7 +1341,6 @@ func (this *FileStorage) createMemoryStorage() error { Name: this.policy.Name, Description: this.policy.Description, Capacity: this.options.MemoryPolicy.Capacity, - MaxKeys: this.policy.MaxKeys, MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改 Type: serverconfigs.CachePolicyStorageMemory, Options: this.policy.Options, @@ -1440,20 +1416,16 @@ func (this *FileStorage) runMemoryStorageSafety(f func(memoryStorage *MemoryStor // 检查磁盘剩余空间 func (this *FileStorage) checkDiskSpace() { if this.options != nil && len(this.options.Dir) > 0 { - var stat unix.Statfs_t - err := unix.Statfs(this.options.Dir, &stat) + stat, err := fsutils.Stat(this.options.Dir) if err == nil { - var availableBytes = stat.Bavail * uint64(stat.Bsize) - this.mainDiskIsFull = availableBytes < MinDiskSpace + this.mainDiskIsFull = stat.AvailableSize() < MinDiskSpace } } var subDirs = this.subDirs // copy slice for _, subDir := range subDirs { - var stat unix.Statfs_t - err := unix.Statfs(subDir.Path, &stat) + stat, err := fsutils.Stat(subDir.Path) if err == nil { - var availableBytes = stat.Bavail * uint64(stat.Bsize) - subDir.IsFull = availableBytes < MinDiskSpace + subDir.IsFull = stat.AvailableSize() < MinDiskSpace } } } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 080e846..6bbe960 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -200,13 +200,6 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h } // 检查是否超出最大值 - totalKeys, err := this.list.Count() - if err != nil { - return nil, err - } - if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys { - return nil, NewCapacityError("write memory cache failed: too many keys in cache storage") - } capacityBytes := this.memoryCapacityBytes() if bodySize < 0 { bodySize = 0 @@ -216,7 +209,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h } // 先删除 - err = this.deleteWithoutLocker(key) + err := this.deleteWithoutLocker(key) if err != nil { return nil, err } diff --git a/internal/utils/fs/stat.go b/internal/utils/fs/stat.go new file mode 100644 index 0000000..9eaac30 --- /dev/null +++ b/internal/utils/fs/stat.go @@ -0,0 +1,81 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "golang.org/x/sys/unix" + "sync" +) + +// Stat device contains the path +func Stat(path string) (*StatResult, error) { + var stat = &unix.Statfs_t{} + err := unix.Statfs(path, stat) + if err != nil { + return nil, err + } + return NewStatResult(stat), nil +} + +var locker = &sync.RWMutex{} +var cacheMap = map[string]*StatResult{} // path => StatResult + +const cacheLife = 3 // seconds + +// StatCache stat device with cache +func StatCache(path string) (*StatResult, error) { + locker.RLock() + stat, ok := cacheMap[path] + if ok && stat.updatedAt >= fasttime.Now().Unix()-cacheLife { + locker.RUnlock() + return stat, nil + } + locker.RUnlock() + + locker.Lock() + defer locker.Unlock() + + stat, err := Stat(path) + if err != nil { + return nil, err + } + + cacheMap[path] = stat + return stat, nil +} + +type StatResult struct { + rawStat *unix.Statfs_t + blockSize uint64 + + updatedAt int64 +} + +func NewStatResult(rawStat *unix.Statfs_t) *StatResult { + var blockSize = rawStat.Bsize + if blockSize < 0 { + blockSize = 0 + } + + return &StatResult{ + rawStat: rawStat, + blockSize: uint64(blockSize), + updatedAt: fasttime.Now().Unix(), + } +} + +func (this *StatResult) AvailableSize() uint64 { + return this.rawStat.Bavail * this.blockSize +} + +func (this *StatResult) TotalSize() uint64 { + return this.rawStat.Blocks * this.blockSize +} + +func (this *StatResult) UsedSize() uint64 { + if this.rawStat.Bavail <= this.rawStat.Blocks { + return (this.rawStat.Blocks - this.rawStat.Bavail) * this.blockSize + } + return 0 +} diff --git a/internal/utils/fs/stat_test.go b/internal/utils/fs/stat_test.go new file mode 100644 index 0000000..e9f36b0 --- /dev/null +++ b/internal/utils/fs/stat_test.go @@ -0,0 +1,69 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils_test + +import ( + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "sync" + "testing" + "time" +) + +func TestStat(t *testing.T) { + stat, err := fsutils.Stat("/usr/local") + if err != nil { + t.Fatal(err) + } + t.Log("available:", stat.AvailableSize()/(1<<30), "total:", stat.TotalSize()/(1<<30), "used:", stat.UsedSize()/(1<<30)) +} + +func TestStatCache(t *testing.T) { + for i := 0; i < 10; i++ { + stat, err := fsutils.StatCache("/usr/local") + if err != nil { + t.Fatal(err) + } + t.Log("available:", stat.AvailableSize()/(1<<30), "total:", stat.TotalSize()/(1<<30), "used:", stat.UsedSize()/(1<<30)) + } +} + +func TestConcurrent(t *testing.T) { + var before = time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + var count = 10000 + var wg = sync.WaitGroup{} + wg.Add(count) + for i := 0; i < count; i++ { + go func() { + defer wg.Done() + + _, _ = fsutils.Stat("/usr/local") + }() + } + wg.Wait() +} + +func BenchmarkStat(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := fsutils.Stat("/usr/local") + if err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkStatCache(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := fsutils.StatCache("/usr/local") + if err != nil { + b.Fatal(err) + } + } + }) +}