增加edge-node cache.garbage命令用于清理垃圾缓存

This commit is contained in:
GoEdgeLab
2023-09-15 18:14:58 +08:00
parent bbb0c68fb0
commit a31548d26e
10 changed files with 284 additions and 66 deletions

View File

@@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"time"
)
@@ -30,7 +31,7 @@ func main() {
Version(teaconst.Version).
Product(teaconst.ProductName).
Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|reload|service|daemon|pprof|accesslog|uninstall]").
Usage(teaconst.ProcessName + " [trackers|goman|conns|gc|bandwidth|disk]").
Usage(teaconst.ProcessName + " [trackers|goman|conns|gc|bandwidth|disk|cache.garbage]").
Usage(teaconst.ProcessName + " [ip.drop|ip.reject|ip.remove|ip.close] IP")
app.On("start:before", func() {
@@ -464,6 +465,45 @@ func main() {
fmt.Println("Usage: edge-node disk [speed]")
}
})
app.On("cache.garbage", func() {
fmt.Println("scanning ...")
var shouldDelete bool
for _, arg := range os.Args {
if strings.TrimLeft(arg, "-") == "delete" {
shouldDelete = true
}
}
var sock = gosock.NewTmpSock(teaconst.ProcessName)
reply, err := sock.Send(&gosock.Command{
Code: "cache.garbage",
Params: map[string]any{"delete": shouldDelete},
})
if err != nil {
fmt.Println("[ERROR]" + err.Error())
return
}
var params = maps.NewMap(reply.Params)
if params.GetBool("isOk") {
var count = params.GetInt("count")
fmt.Println("found", count, "bad caches")
if count > 0 {
fmt.Println("======")
var sampleFiles = params.GetSlice("sampleFiles")
for _, file := range sampleFiles {
fmt.Println(types.String(file))
}
if count > len(sampleFiles) {
fmt.Println("... more files")
}
}
} else {
fmt.Println("[ERROR]" + params.GetString("error"))
}
})
app.Run(func() {
var node = nodes.NewNode()
node.Start()

View File

@@ -4,6 +4,7 @@ package caches
import (
"database/sql"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
@@ -134,7 +135,7 @@ func (this *FileList) Exist(hash string) (bool, error) {
var expiredAt int64
err := row.Scan(&expiredAt)
if err != nil {
if err == sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
err = nil
}
return false, err
@@ -143,6 +144,18 @@ func (this *FileList) Exist(hash string) (bool, error) {
return true, nil
}
func (this *FileList) ExistQuick(hash string) (isReady bool, found bool) {
var db = this.GetDB(hash)
if !db.IsReady() || !db.HashMapIsLoaded() {
return
}
isReady = true
found = db.hashMap.Exist(hash)
return
}
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if len(prefix) == 0 {

View File

@@ -35,8 +35,9 @@ type FileListDB struct {
itemsTableName string
isClosed bool
isReady bool
isClosed bool // 是否已关闭
isReady bool // 是否已完成初始化
hashMapIsLoaded bool // Hash是否已加载
// cacheItems
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
@@ -196,26 +197,7 @@ func (this *FileListDB) Init() error {
this.isReady = true
// 加载HashMap
go func() {
err := this.hashMap.Load(this)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")")
// 自动修复错误
// TODO 将来希望能尽可能恢复以往数据库中的内容
if strings.Contains(err.Error(), "database is closed") || strings.Contains(err.Error(), "database disk image is malformed") {
_ = this.Close()
this.deleteDB()
remotelogs.Println("LIST_FILE_DB", "recreating the database (file:"+this.dbPath+") ...")
err = this.Open(this.dbPath)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "recreate the database failed: "+err.Error()+" (file:"+this.dbPath+")")
} else {
_ = this.Init()
}
}
}
}()
go this.loadHashMap()
return nil
}
@@ -521,6 +503,10 @@ func (this *FileListDB) WrapError(err error) error {
return fmt.Errorf("%w (file: %s)", err, this.dbPath)
}
func (this *FileListDB) HashMapIsLoaded() bool {
return this.hashMapIsLoaded
}
// 初始化
func (this *FileListDB) initTables(times int) error {
{
@@ -633,3 +619,30 @@ func (this *FileListDB) deleteDB() {
_ = os.Remove(this.dbPath + "-shm")
_ = os.Remove(this.dbPath + "-wal")
}
// 加载Hash列表
func (this *FileListDB) loadHashMap() {
this.hashMapIsLoaded = false
err := this.hashMap.Load(this)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")")
// 自动修复错误
// TODO 将来希望能尽可能恢复以往数据库中的内容
if strings.Contains(err.Error(), "database is closed") || strings.Contains(err.Error(), "database disk image is malformed") {
_ = this.Close()
this.deleteDB()
remotelogs.Println("LIST_FILE_DB", "recreating the database (file:"+this.dbPath+") ...")
err = this.Open(this.dbPath)
if err != nil {
remotelogs.Error("LIST_FILE_DB", "recreate the database failed: "+err.Error()+" (file:"+this.dbPath+")")
} else {
_ = this.Init()
}
}
return
}
this.hashMapIsLoaded = true
}

View File

@@ -256,3 +256,19 @@ func (this *Manager) FindAllStorages() []StorageInterface {
}
return storages
}
// ScanGarbageCaches 清理目录中“失联”的缓存文件
func (this *Manager) ScanGarbageCaches(callback func(path string) error) error {
var storages = this.FindAllStorages()
for _, storage := range storages {
fileStorage, ok := storage.(*FileStorage)
if !ok {
continue
}
err := fileStorage.ScanGarbageCaches(callback)
if err != nil {
return err
}
}
return nil
}

View File

@@ -20,7 +20,7 @@ type PartialFileReader struct {
func NewPartialFileReader(fp *os.File) *PartialFileReader {
return &PartialFileReader{
FileReader: NewFileReader(fp),
rangePath: partialRangesFilePath(fp.Name()),
rangePath: PartialRangesFilePath(fp.Name()),
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
@@ -59,6 +60,7 @@ const (
FileTmpSuffix = ".tmp"
DefaultMinDiskFreeSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间
DefaultStaleCacheSeconds = 1200 // 过时缓存留存时间
HashKeyLength = 32
)
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
@@ -931,7 +933,7 @@ func (this *FileStorage) keyPath(key string) (hash string, path string, diskIsFu
// 获取Hash对应的文件路径
func (this *FileStorage) hashPath(hash string) (path string, diskIsFull bool) {
if len(hash) != 32 {
if len(hash) != HashKeyLength {
return "", false
}
var dir string
@@ -1081,45 +1083,43 @@ func (this *FileStorage) purgeLoop() {
maxCount = 10000
}
for {
maxLoops--
if maxLoops <= 0 {
break
}
var total, _ = this.list.Count()
if total <= 0 {
break
}
// 开始清理
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count <= 0 {
break
}
// 限制单次清理的条数,防止占用太多系统资源
if count > maxCount {
count = maxCount
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
err := this.list.PurgeLFU(count, func(hash string) error {
path, _ := this.hashPath(hash)
err := this.removeCacheFile(path)
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
var total, _ = this.list.Count()
if total > 0 {
for {
maxLoops--
if maxLoops <= 0 {
break
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error())
}
// 开始清理
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count <= 0 {
break
}
// 检查硬盘空间状态
if !this.hasFullDisk() {
break
// 限制单次清理的条数,防止占用太多系统资源
if count > maxCount {
count = maxCount
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
err := this.list.PurgeLFU(count, func(hash string) error {
path, _ := this.hashPath(hash)
err := this.removeCacheFile(path)
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error())
}
// 检查硬盘空间状态
if !this.hasFullDisk() {
break
}
}
}
}
@@ -1354,7 +1354,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
err = nil
// 删除Partial相关
var partialPath = partialRangesFilePath(path)
var partialPath = PartialRangesFilePath(path)
if openFileCache != nil {
openFileCache.Close(partialPath)
}
@@ -1476,6 +1476,7 @@ func (this *FileStorage) checkDiskSpace() {
}
}
// 检查是否有已满的磁盘分区
func (this *FileStorage) hasFullDisk() bool {
this.checkDiskSpace()
@@ -1521,6 +1522,85 @@ func (this *FileStorage) subDir(hash string) (dirPath string, dirIsFull bool) {
return subDir.Path + suffix, subDir.IsFull
}
// ScanGarbageCaches 清理目录中“失联”的缓存文件
// “失联”为不在HashMap中的文件
func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) error {
var mainDir = this.options.Dir
var allDirs = []string{mainDir}
var subDirs = this.subDirs // copy
for _, subDir := range subDirs {
allDirs = append(allDirs, subDir.Path)
}
for _, subDir := range allDirs {
var dir0 = subDir + "/p" + types.String(this.policy.Id)
dir1Matches, err := filepath.Glob(dir0 + "/*")
if err != nil {
// ignore error
continue
}
for _, dir1 := range dir1Matches {
if len(filepath.Base(dir1)) != 2 {
continue
}
dir2Matches, err := filepath.Glob(dir1 + "/*")
if err != nil {
// ignore error
continue
}
for _, dir2 := range dir2Matches {
if len(filepath.Base(dir2)) != 2 {
continue
}
fileMatches, err := filepath.Glob(dir2 + "/*.cache")
if err != nil {
// ignore error
continue
}
for _, file := range fileMatches {
var filename = filepath.Base(file)
var hash = strings.TrimSuffix(filename, ".cache")
if len(hash) != HashKeyLength {
continue
}
isReady, found := this.list.(*FileList).ExistQuick(hash)
if !isReady {
continue
}
if found {
continue
}
// 检查文件正在被写入
stat, err := os.Stat(file)
if err != nil {
continue
}
if fasttime.Now().Unix()-stat.ModTime().Unix() < 300 /** 5 minutes **/ {
continue
}
if fileCallback != nil {
err = fileCallback(file)
if err != nil {
return err
}
}
}
}
}
}
return nil
}
// 计算字节数字代号
func (this *FileStorage) charCode(r byte) uint8 {
if r >= '0' && r <= '9' {
return r - '0'

View File

@@ -5,6 +5,7 @@ import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
@@ -577,6 +578,31 @@ func TestFileStorage_RemoveCacheFile(t *testing.T) {
t.Log(storage.removeCacheFile("/Users/WorkSpace/EdgeProject/EdgeCache/p43/15/7e/157eba0dfc6dfb6fbbf20b1f9e584674.cache"))
}
func TestFileStorage_ScanGarbageCaches(t *testing.T) {
if !testutils.IsSingleTesting() {
return
}
var storage = NewFileStorage(&serverconfigs.HTTPCachePolicy{
Id: 43,
Options: map[string]any{"dir": "/Users/WorkSpace/EdgeProject/EdgeCache"},
})
err := storage.Init()
if err != nil {
t.Fatal(err)
}
err = storage.ScanGarbageCaches(func(path string) {
t.Log(path)
}, func(path string) error {
t.Log(path, PartialRangesFilePath(path))
return nil
})
if err != nil {
t.Fatal(err)
}
}
func BenchmarkFileStorage_Read(b *testing.B) {
runtime.GOMAXPROCS(1)

View File

@@ -4,8 +4,8 @@ package caches
import "strings"
// 获取 ranges 文件路径
func partialRangesFilePath(path string) string {
// PartialRangesFilePath 获取 ranges 文件路径
func PartialRangesFilePath(path string) string {
// ranges路径
var dotIndex = strings.LastIndex(path, ".")
var rangePath string

View File

@@ -43,7 +43,7 @@ func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaH
isPartial: isPartial,
bodyOffset: bodyOffset,
ranges: ranges,
rangePath: partialRangesFilePath(rawWriter.Name()),
rangePath: PartialRangesFilePath(rawWriter.Name()),
metaHeaderSize: metaHeaderSize,
metaBodySize: metaBodySize,
}

View File

@@ -808,6 +808,36 @@ func (this *Node) listenSock() error {
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
"stats": m,
}})
case "cache.garbage":
var shouldDelete = maps.NewMap(cmd.Params).GetBool("delete")
var count = 0
var sampleFiles = []string{}
err := caches.SharedManager.ScanGarbageCaches(func(path string) error {
count++
if len(sampleFiles) < 10 {
sampleFiles = append(sampleFiles, path)
}
if shouldDelete {
_ = os.Remove(path) // .cache
_ = os.Remove(caches.PartialRangesFilePath(path)) // @range.cache
}
return nil
})
if err != nil {
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
"isOk": false,
"error": err.Error(),
}})
} else {
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
"isOk": true,
"count": count,
"sampleFiles": sampleFiles,
}})
}
}
})