From b3c55f1c404352896e95c63bdd98fbc3dfe00c34 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 15 Nov 2022 20:42:25 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BE=B9=E7=BC=98=E8=8A=82=E7=82=B9=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=AE=BE=E7=BD=AE=E5=A4=9A=E4=B8=AA=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/file_dir.go | 11 ++ internal/caches/manager.go | 9 +- internal/caches/storage_file.go | 218 +++++++++++++++++++++++--------- internal/nodes/node.go | 15 ++- 4 files changed, 185 insertions(+), 68 deletions(-) create mode 100644 internal/caches/file_dir.go diff --git a/internal/caches/file_dir.go b/internal/caches/file_dir.go new file mode 100644 index 0000000..907aac4 --- /dev/null +++ b/internal/caches/file_dir.go @@ -0,0 +1,11 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" + +type FileDir struct { + Path string + Capacity *shared.SizeCapacity + IsFull bool +} diff --git a/internal/caches/manager.go b/internal/caches/manager.go index 56539e9..75b68f9 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -24,7 +24,8 @@ func init() { type Manager struct { // 全局配置 MaxDiskCapacity *shared.SizeCapacity - DiskDir string + MainDiskDir string + SubDiskDirs []*serverconfigs.CacheDir MaxMemoryCapacity *shared.SizeCapacity policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy @@ -47,12 +48,10 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy this.locker.Lock() defer this.locker.Unlock() - newPolicyIds := []int64{} + var newPolicyIds = []int64{} for _, policy := range newPolicies { // 使用节点单独的缓存目录 - if len(this.DiskDir) > 0 { - policy.UpdateDiskDir(this.DiskDir) - } + policy.UpdateDiskDir(this.MainDiskDir, this.SubDiskDirs) newPolicyIds = append(newPolicyIds, policy.Id) } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 60dc9d9..d691418 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -93,7 +93,10 @@ type FileStorage struct { openFileCache *OpenFileCache - diskIsFull bool + mainDir string + mainDiskIsFull bool + + subDirs []*FileDir } func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { @@ -157,6 +160,16 @@ func (this *FileStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) return } + var subDirs = []*FileDir{} + for _, subDir := range newOptions.SubDirs { + subDirs = append(subDirs, &FileDir{ + Path: subDir.Path, + Capacity: subDir.Capacity, + IsFull: false, + }) + } + this.checkDiskSpace() + err = newOptions.Init() if err != nil { remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: init options failed: "+err.Error()) @@ -219,6 +232,19 @@ func (this *FileStorage) Init() error { this.options.Dir = filepath.Clean(this.options.Dir) var dir = this.options.Dir + var subDirs = []*FileDir{} + for _, subDir := range this.options.SubDirs { + subDirs = append(subDirs, &FileDir{ + Path: subDir.Path, + Capacity: subDir.Capacity, + IsFull: false, + }) + } + this.subDirs = subDirs + if len(subDirs) > 0 { + this.checkDiskSpace() + } + if len(dir) == 0 { return errors.New("[CACHE]cache storage dir can not be empty") } @@ -321,7 +347,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, } } - hash, path := this.keyPath(key) + hash, path, _ := this.keyPath(key) // 检查文件记录是否已过期 if !useStale { @@ -404,11 +430,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz return nil, ErrWritingUnavailable } - // 当前磁盘可用容量是否严重不足 - if this.diskIsFull { - return nil, NewCapacityError("the disk is full") - } - // 是否已忽略 if this.ignoreKeys.Has(key) { return nil, ErrEntityTooLarge @@ -475,8 +496,12 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz var hash = stringutil.Md5(key) + dir, diskIsFull := this.subDir(hash) + if diskIsFull { + return nil, NewCapacityError("the disk is full") + } + // TODO 可以只stat一次 - var dir = this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] _, err = os.Stat(dir) if err != nil { if !os.IsNotExist(err) { @@ -680,7 +705,7 @@ func (this *FileStorage) Delete(key string) error { _ = memoryStorage.Delete(key) }) - hash, path := this.keyPath(key) + hash, path, _ := this.keyPath(key) err := this.list.Remove(hash) if err != nil { return err @@ -720,56 +745,67 @@ func (this *FileStorage) CleanAll() error { // 删除缓存和目录 // 不能直接删除子目录,比较危险 - dir := this.dir() - fp, err := os.Open(dir) - if err != nil { - return err - } - defer func() { - _ = fp.Close() - }() - stat, err := fp.Stat() - if err != nil { - return err + var rootDirs = []string{this.options.Dir} + var subDirs = this.subDirs // copy slice + if len(subDirs) > 0 { + for _, subDir := range subDirs { + rootDirs = append(rootDirs, subDir.Path) + } } - if !stat.IsDir() { - return nil - } - - // 改成待删除 - subDirs, err := fp.Readdir(-1) - if err != nil { - return err - } - for _, info := range subDirs { - subDir := info.Name() - - // 检查目录名 - ok, err := regexp.MatchString(`^[0-9a-f]{2}$`, subDir) + for _, rootDir := range rootDirs { + var dir = rootDir + "/p" + types.String(this.policy.Id) + fp, err := os.Open(dir) if err != nil { return err } - if !ok { - continue - } + defer func() { + _ = fp.Close() + }() - // 修改目录名 - tmpDir := dir + "/" + subDir + "-deleted" - err = os.Rename(dir+"/"+subDir, tmpDir) + stat, err := fp.Stat() if err != nil { return err } - } - // 重新遍历待删除 - goman.New(func() { - err = this.cleanDeletedDirs(dir) - if err != nil { - remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error()) + if !stat.IsDir() { + return nil } - }) + + // 改成待删除 + subDirs, err := fp.Readdir(-1) + if err != nil { + return err + } + for _, info := range subDirs { + subDir := info.Name() + + // 检查目录名 + ok, err := regexp.MatchString(`^[0-9a-f]{2}$`, subDir) + if err != nil { + return err + } + if !ok { + continue + } + + // 修改目录名 + tmpDir := dir + "/" + subDir + "-deleted" + err = os.Rename(dir+"/"+subDir, tmpDir) + if err != nil { + return err + } + } + + // 重新遍历待删除 + goman.New(func() { + err = this.cleanDeletedDirs(dir) + if err != nil { + remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error()) + } + }) + } return nil } @@ -802,7 +838,7 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { // URL for _, key := range keys { - hash, path := this.keyPath(key) + hash, path, _ := this.keyPath(key) err := this.removeCacheFile(path) if err != nil && !os.IsNotExist(err) { return err @@ -873,25 +909,22 @@ func (this *FileStorage) CanSendfile() bool { return this.options.EnableSendfile } -// 绝对路径 -func (this *FileStorage) dir() string { - return this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" -} - // 获取Key对应的文件路径 -func (this *FileStorage) keyPath(key string) (hash string, path string) { +func (this *FileStorage) keyPath(key string) (hash string, path string, diskIsFull bool) { hash = stringutil.Md5(key) - dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + var dir string + dir, diskIsFull = this.subDir(hash) path = dir + "/" + hash + ".cache" return } // 获取Hash对应的文件路径 -func (this *FileStorage) hashPath(hash string) (path string) { +func (this *FileStorage) hashPath(hash string) (path string, diskIsFull bool) { if len(hash) != 32 { - return "" + return "", false } - dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + var dir string + dir, diskIsFull = this.subDir(hash) path = dir + "/" + hash + ".cache" return } @@ -949,6 +982,7 @@ func (this *FileStorage) initList() error { } // 清理任务 +// TODO purge每个分区 func (this *FileStorage) purgeLoop() { // 检查磁盘剩余空间 this.checkDiskSpace() @@ -960,7 +994,19 @@ func (this *FileStorage) purgeLoop() { if lfuFreePercent <= 0 { lfuFreePercent = 5 } - if this.diskIsFull { + + var hasFullDisk = this.mainDiskIsFull + if !hasFullDisk { + var subDirs = this.subDirs // copy slice + for _, subDir := range subDirs { + if subDir.IsFull { + hasFullDisk = true + break + } + } + } + + if hasFullDisk { startLFU = true } else { var usedPercent = float32(this.TotalDiskSize()*100) / float32(capacityBytes) @@ -993,7 +1039,7 @@ func (this *FileStorage) purgeLoop() { } for i := 0; i < times; i++ { countFound, err := this.list.Purge(purgeCount, func(hash string) error { - path := this.hashPath(hash) + path, _ := this.hashPath(hash) err := this.removeCacheFile(path) if err != nil && !os.IsNotExist(err) { remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) @@ -1027,7 +1073,7 @@ func (this *FileStorage) purgeLoop() { remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count)) err := this.list.PurgeLFU(count, func(hash string) error { - path := this.hashPath(hash) + path, _ := this.hashPath(hash) err := this.removeCacheFile(path) if err != nil && !os.IsNotExist(err) { remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) @@ -1354,7 +1400,55 @@ func (this *FileStorage) checkDiskSpace() { err := unix.Statfs(this.options.Dir, &stat) if err == nil { var availableBytes = stat.Bavail * uint64(stat.Bsize) - this.diskIsFull = availableBytes < MinDiskSpace + this.mainDiskIsFull = availableBytes < MinDiskSpace + } + } + var subDirs = this.subDirs // copy slice + for _, subDir := range subDirs { + var stat unix.Statfs_t + err := unix.Statfs(subDir.Path, &stat) + if err == nil { + var availableBytes = stat.Bavail * uint64(stat.Bsize) + subDir.IsFull = availableBytes < MinDiskSpace } } } + +// 获取目录 +func (this *FileStorage) subDir(hash string) (dirPath string, dirIsFull bool) { + var suffix = "/p" + types.String(this.policy.Id) + "/" + hash[:2] + "/" + hash[2:4] + + if len(hash) < 4 { + return this.options.Dir + suffix, this.mainDiskIsFull + } + + var subDirs = this.subDirs // copy slice + var countSubDirs = len(subDirs) + if countSubDirs == 0 { + return this.options.Dir + suffix, this.mainDiskIsFull + } + + countSubDirs++ // add main dir + + // 最多只支持16个目录 + if countSubDirs > 16 { + countSubDirs = 16 + } + + var dirIndex = this.charCode(hash[0]) % uint8(countSubDirs) + if dirIndex == 0 { + return this.options.Dir + suffix, this.mainDiskIsFull + } + var subDir = subDirs[dirIndex-1] + return subDir.Path + suffix, subDir.IsFull +} + +func (this *FileStorage) charCode(r byte) uint8 { + if r >= '0' && r <= '9' { + return r - '0' + } + if r >= 'a' && r <= 'z' { + return r - 'a' + 10 + } + return 0 +} diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 0845c0f..dd08d31 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -37,6 +37,7 @@ import ( "os" "os/exec" "os/signal" + "path/filepath" "runtime" "runtime/debug" "sort" @@ -985,7 +986,19 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig) { // 缓存策略 caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity - caches.SharedManager.DiskDir = config.CacheDiskDir + caches.SharedManager.MainDiskDir = config.CacheDiskDir + + var subDirs = config.CacheDiskSubDirs + for _, subDir := range subDirs { + subDir.Path = filepath.Clean(subDir.Path) + } + if len(subDirs) > 0 { + sort.Slice(subDirs, func(i, j int) bool { + return subDirs[i].Path < subDirs[j].Path + }) + } + caches.SharedManager.SubDiskDirs = subDirs + if len(config.HTTPCachePolicies) > 0 { caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies) } else {