边缘节点支持设置多个缓存目录

This commit is contained in:
刘祥超
2022-11-15 20:42:25 +08:00
parent 825e46458f
commit 1bb818b5b0
4 changed files with 185 additions and 68 deletions

View File

@@ -0,0 +1,11 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches
import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
type FileDir struct {
Path string
Capacity *shared.SizeCapacity
IsFull bool
}

View File

@@ -24,7 +24,8 @@ func init() {
type Manager struct { type Manager struct {
// 全局配置 // 全局配置
MaxDiskCapacity *shared.SizeCapacity MaxDiskCapacity *shared.SizeCapacity
DiskDir string MainDiskDir string
SubDiskDirs []*serverconfigs.CacheDir
MaxMemoryCapacity *shared.SizeCapacity MaxMemoryCapacity *shared.SizeCapacity
policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy
@@ -47,12 +48,10 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
newPolicyIds := []int64{} var newPolicyIds = []int64{}
for _, policy := range newPolicies { for _, policy := range newPolicies {
// 使用节点单独的缓存目录 // 使用节点单独的缓存目录
if len(this.DiskDir) > 0 { policy.UpdateDiskDir(this.MainDiskDir, this.SubDiskDirs)
policy.UpdateDiskDir(this.DiskDir)
}
newPolicyIds = append(newPolicyIds, policy.Id) newPolicyIds = append(newPolicyIds, policy.Id)
} }

View File

@@ -93,7 +93,10 @@ type FileStorage struct {
openFileCache *OpenFileCache openFileCache *OpenFileCache
diskIsFull bool mainDir string
mainDiskIsFull bool
subDirs []*FileDir
} }
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
@@ -157,6 +160,16 @@ func (this *FileStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy)
return return
} }
var subDirs = []*FileDir{}
for _, subDir := range newOptions.SubDirs {
subDirs = append(subDirs, &FileDir{
Path: subDir.Path,
Capacity: subDir.Capacity,
IsFull: false,
})
}
this.checkDiskSpace()
err = newOptions.Init() err = newOptions.Init()
if err != nil { if err != nil {
remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: init options failed: "+err.Error()) remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: init options failed: "+err.Error())
@@ -219,6 +232,19 @@ func (this *FileStorage) Init() error {
this.options.Dir = filepath.Clean(this.options.Dir) this.options.Dir = filepath.Clean(this.options.Dir)
var dir = this.options.Dir var dir = this.options.Dir
var subDirs = []*FileDir{}
for _, subDir := range this.options.SubDirs {
subDirs = append(subDirs, &FileDir{
Path: subDir.Path,
Capacity: subDir.Capacity,
IsFull: false,
})
}
this.subDirs = subDirs
if len(subDirs) > 0 {
this.checkDiskSpace()
}
if len(dir) == 0 { if len(dir) == 0 {
return errors.New("[CACHE]cache storage dir can not be empty") return errors.New("[CACHE]cache storage dir can not be empty")
} }
@@ -321,7 +347,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
} }
} }
hash, path := this.keyPath(key) hash, path, _ := this.keyPath(key)
// 检查文件记录是否已过期 // 检查文件记录是否已过期
if !useStale { if !useStale {
@@ -404,11 +430,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
return nil, ErrWritingUnavailable return nil, ErrWritingUnavailable
} }
// 当前磁盘可用容量是否严重不足
if this.diskIsFull {
return nil, NewCapacityError("the disk is full")
}
// 是否已忽略 // 是否已忽略
if this.ignoreKeys.Has(key) { if this.ignoreKeys.Has(key) {
return nil, ErrEntityTooLarge return nil, ErrEntityTooLarge
@@ -475,8 +496,12 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
var hash = stringutil.Md5(key) var hash = stringutil.Md5(key)
dir, diskIsFull := this.subDir(hash)
if diskIsFull {
return nil, NewCapacityError("the disk is full")
}
// TODO 可以只stat一次 // TODO 可以只stat一次
var dir = this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
_, err = os.Stat(dir) _, err = os.Stat(dir)
if err != nil { if err != nil {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
@@ -680,7 +705,7 @@ func (this *FileStorage) Delete(key string) error {
_ = memoryStorage.Delete(key) _ = memoryStorage.Delete(key)
}) })
hash, path := this.keyPath(key) hash, path, _ := this.keyPath(key)
err := this.list.Remove(hash) err := this.list.Remove(hash)
if err != nil { if err != nil {
return err return err
@@ -720,56 +745,67 @@ func (this *FileStorage) CleanAll() error {
// 删除缓存和目录 // 删除缓存和目录
// 不能直接删除子目录,比较危险 // 不能直接删除子目录,比较危险
dir := this.dir()
fp, err := os.Open(dir)
if err != nil {
return err
}
defer func() {
_ = fp.Close()
}()
stat, err := fp.Stat() var rootDirs = []string{this.options.Dir}
if err != nil { var subDirs = this.subDirs // copy slice
return err if len(subDirs) > 0 {
for _, subDir := range subDirs {
rootDirs = append(rootDirs, subDir.Path)
}
} }
if !stat.IsDir() { for _, rootDir := range rootDirs {
return nil var dir = rootDir + "/p" + types.String(this.policy.Id)
} fp, err := os.Open(dir)
// 改成待删除
subDirs, err := fp.Readdir(-1)
if err != nil {
return err
}
for _, info := range subDirs {
subDir := info.Name()
// 检查目录名
ok, err := regexp.MatchString(`^[0-9a-f]{2}$`, subDir)
if err != nil { if err != nil {
return err return err
} }
if !ok { defer func() {
continue _ = fp.Close()
} }()
// 修改目录名 stat, err := fp.Stat()
tmpDir := dir + "/" + subDir + "-deleted"
err = os.Rename(dir+"/"+subDir, tmpDir)
if err != nil { if err != nil {
return err return err
} }
}
// 重新遍历待删除 if !stat.IsDir() {
goman.New(func() { return nil
err = this.cleanDeletedDirs(dir)
if err != nil {
remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error())
} }
})
// 改成待删除
subDirs, err := fp.Readdir(-1)
if err != nil {
return err
}
for _, info := range subDirs {
subDir := info.Name()
// 检查目录名
ok, err := regexp.MatchString(`^[0-9a-f]{2}$`, subDir)
if err != nil {
return err
}
if !ok {
continue
}
// 修改目录名
tmpDir := dir + "/" + subDir + "-deleted"
err = os.Rename(dir+"/"+subDir, tmpDir)
if err != nil {
return err
}
}
// 重新遍历待删除
goman.New(func() {
err = this.cleanDeletedDirs(dir)
if err != nil {
remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error())
}
})
}
return nil return nil
} }
@@ -802,7 +838,7 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
// URL // URL
for _, key := range keys { for _, key := range keys {
hash, path := this.keyPath(key) hash, path, _ := this.keyPath(key)
err := this.removeCacheFile(path) err := this.removeCacheFile(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return err return err
@@ -873,25 +909,22 @@ func (this *FileStorage) CanSendfile() bool {
return this.options.EnableSendfile return this.options.EnableSendfile
} }
// 绝对路径
func (this *FileStorage) dir() string {
return this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/"
}
// 获取Key对应的文件路径 // 获取Key对应的文件路径
func (this *FileStorage) keyPath(key string) (hash string, path string) { func (this *FileStorage) keyPath(key string) (hash string, path string, diskIsFull bool) {
hash = stringutil.Md5(key) hash = stringutil.Md5(key)
dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] var dir string
dir, diskIsFull = this.subDir(hash)
path = dir + "/" + hash + ".cache" path = dir + "/" + hash + ".cache"
return return
} }
// 获取Hash对应的文件路径 // 获取Hash对应的文件路径
func (this *FileStorage) hashPath(hash string) (path string) { func (this *FileStorage) hashPath(hash string) (path string, diskIsFull bool) {
if len(hash) != 32 { if len(hash) != 32 {
return "" return "", false
} }
dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] var dir string
dir, diskIsFull = this.subDir(hash)
path = dir + "/" + hash + ".cache" path = dir + "/" + hash + ".cache"
return return
} }
@@ -949,6 +982,7 @@ func (this *FileStorage) initList() error {
} }
// 清理任务 // 清理任务
// TODO purge每个分区
func (this *FileStorage) purgeLoop() { func (this *FileStorage) purgeLoop() {
// 检查磁盘剩余空间 // 检查磁盘剩余空间
this.checkDiskSpace() this.checkDiskSpace()
@@ -960,7 +994,19 @@ func (this *FileStorage) purgeLoop() {
if lfuFreePercent <= 0 { if lfuFreePercent <= 0 {
lfuFreePercent = 5 lfuFreePercent = 5
} }
if this.diskIsFull {
var hasFullDisk = this.mainDiskIsFull
if !hasFullDisk {
var subDirs = this.subDirs // copy slice
for _, subDir := range subDirs {
if subDir.IsFull {
hasFullDisk = true
break
}
}
}
if hasFullDisk {
startLFU = true startLFU = true
} else { } else {
var usedPercent = float32(this.TotalDiskSize()*100) / float32(capacityBytes) var usedPercent = float32(this.TotalDiskSize()*100) / float32(capacityBytes)
@@ -993,7 +1039,7 @@ func (this *FileStorage) purgeLoop() {
} }
for i := 0; i < times; i++ { for i := 0; i < times; i++ {
countFound, err := this.list.Purge(purgeCount, func(hash string) error { countFound, err := this.list.Purge(purgeCount, func(hash string) error {
path := this.hashPath(hash) path, _ := this.hashPath(hash)
err := this.removeCacheFile(path) err := this.removeCacheFile(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
@@ -1027,7 +1073,7 @@ func (this *FileStorage) purgeLoop() {
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count)) remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
err := this.list.PurgeLFU(count, func(hash string) error { err := this.list.PurgeLFU(count, func(hash string) error {
path := this.hashPath(hash) path, _ := this.hashPath(hash)
err := this.removeCacheFile(path) err := this.removeCacheFile(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
@@ -1354,7 +1400,55 @@ func (this *FileStorage) checkDiskSpace() {
err := unix.Statfs(this.options.Dir, &stat) err := unix.Statfs(this.options.Dir, &stat)
if err == nil { if err == nil {
var availableBytes = stat.Bavail * uint64(stat.Bsize) var availableBytes = stat.Bavail * uint64(stat.Bsize)
this.diskIsFull = availableBytes < MinDiskSpace this.mainDiskIsFull = availableBytes < MinDiskSpace
}
}
var subDirs = this.subDirs // copy slice
for _, subDir := range subDirs {
var stat unix.Statfs_t
err := unix.Statfs(subDir.Path, &stat)
if err == nil {
var availableBytes = stat.Bavail * uint64(stat.Bsize)
subDir.IsFull = availableBytes < MinDiskSpace
} }
} }
} }
// 获取目录
func (this *FileStorage) subDir(hash string) (dirPath string, dirIsFull bool) {
var suffix = "/p" + types.String(this.policy.Id) + "/" + hash[:2] + "/" + hash[2:4]
if len(hash) < 4 {
return this.options.Dir + suffix, this.mainDiskIsFull
}
var subDirs = this.subDirs // copy slice
var countSubDirs = len(subDirs)
if countSubDirs == 0 {
return this.options.Dir + suffix, this.mainDiskIsFull
}
countSubDirs++ // add main dir
// 最多只支持16个目录
if countSubDirs > 16 {
countSubDirs = 16
}
var dirIndex = this.charCode(hash[0]) % uint8(countSubDirs)
if dirIndex == 0 {
return this.options.Dir + suffix, this.mainDiskIsFull
}
var subDir = subDirs[dirIndex-1]
return subDir.Path + suffix, subDir.IsFull
}
func (this *FileStorage) charCode(r byte) uint8 {
if r >= '0' && r <= '9' {
return r - '0'
}
if r >= 'a' && r <= 'z' {
return r - 'a' + 10
}
return 0
}

View File

@@ -37,6 +37,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
"path/filepath"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"sort" "sort"
@@ -985,7 +986,19 @@ func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
// 缓存策略 // 缓存策略
caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity
caches.SharedManager.DiskDir = config.CacheDiskDir caches.SharedManager.MainDiskDir = config.CacheDiskDir
var subDirs = config.CacheDiskSubDirs
for _, subDir := range subDirs {
subDir.Path = filepath.Clean(subDir.Path)
}
if len(subDirs) > 0 {
sort.Slice(subDirs, func(i, j int) bool {
return subDirs[i].Path < subDirs[j].Path
})
}
caches.SharedManager.SubDiskDirs = subDirs
if len(config.HTTPCachePolicies) > 0 { if len(config.HTTPCachePolicies) > 0 {
caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies) caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies)
} else { } else {