From a31548d26e83a45cee8ca46889007b4eafdcd3e1 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Fri, 15 Sep 2023 18:14:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0edge-node=20cache.garbage?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E7=94=A8=E4=BA=8E=E6=B8=85=E7=90=86=E5=9E=83?= =?UTF-8?q?=E5=9C=BE=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/edge-node/main.go | 42 ++++++- internal/caches/list_file.go | 15 ++- internal/caches/list_file_db.go | 57 +++++---- internal/caches/manager.go | 16 +++ internal/caches/reader_partial_file.go | 2 +- internal/caches/storage_file.go | 156 +++++++++++++++++++------ internal/caches/storage_file_test.go | 26 +++++ internal/caches/utils_partial.go | 4 +- internal/caches/writer_partial_file.go | 2 +- internal/nodes/node.go | 30 +++++ 10 files changed, 284 insertions(+), 66 deletions(-) diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index 0cd0563..0bc22ab 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "sort" + "strings" "time" ) @@ -30,7 +31,7 @@ func main() { Version(teaconst.Version). Product(teaconst.ProductName). Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|reload|service|daemon|pprof|accesslog|uninstall]"). - Usage(teaconst.ProcessName + " [trackers|goman|conns|gc|bandwidth|disk]"). + Usage(teaconst.ProcessName + " [trackers|goman|conns|gc|bandwidth|disk|cache.garbage]"). Usage(teaconst.ProcessName + " [ip.drop|ip.reject|ip.remove|ip.close] IP") app.On("start:before", func() { @@ -464,6 +465,45 @@ func main() { fmt.Println("Usage: edge-node disk [speed]") } }) + app.On("cache.garbage", func() { + fmt.Println("scanning ...") + + var shouldDelete bool + for _, arg := range os.Args { + if strings.TrimLeft(arg, "-") == "delete" { + shouldDelete = true + } + } + + var sock = gosock.NewTmpSock(teaconst.ProcessName) + reply, err := sock.Send(&gosock.Command{ + Code: "cache.garbage", + Params: map[string]any{"delete": shouldDelete}, + }) + if err != nil { + fmt.Println("[ERROR]" + err.Error()) + return + } + + var params = maps.NewMap(reply.Params) + if params.GetBool("isOk") { + var count = params.GetInt("count") + fmt.Println("found", count, "bad caches") + + if count > 0 { + fmt.Println("======") + var sampleFiles = params.GetSlice("sampleFiles") + for _, file := range sampleFiles { + fmt.Println(types.String(file)) + } + if count > len(sampleFiles) { + fmt.Println("... more files") + } + } + } else { + fmt.Println("[ERROR]" + params.GetString("error")) + } + }) app.Run(func() { var node = nodes.NewNode() node.Start() diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index 7f5e6ea..b2c887e 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -4,6 +4,7 @@ package caches import ( "database/sql" + "errors" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/ttlcache" @@ -134,7 +135,7 @@ func (this *FileList) Exist(hash string) (bool, error) { var expiredAt int64 err := row.Scan(&expiredAt) if err != nil { - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { err = nil } return false, err @@ -143,6 +144,18 @@ func (this *FileList) Exist(hash string) (bool, error) { return true, nil } +func (this *FileList) ExistQuick(hash string) (isReady bool, found bool) { + var db = this.GetDB(hash) + + if !db.IsReady() || !db.HashMapIsLoaded() { + return + } + + isReady = true + found = db.hashMap.Exist(hash) + return +} + // CleanPrefix 清理某个前缀的缓存数据 func (this *FileList) CleanPrefix(prefix string) error { if len(prefix) == 0 { diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index 19de8b9..c2a599c 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -35,8 +35,9 @@ type FileListDB struct { itemsTableName string - isClosed bool - isReady bool + isClosed bool // 是否已关闭 + isReady bool // 是否已完成初始化 + hashMapIsLoaded bool // Hash是否已加载 // cacheItems existsByHashStmt *dbs.Stmt // 根据hash检查是否存在 @@ -196,26 +197,7 @@ func (this *FileListDB) Init() error { this.isReady = true // 加载HashMap - go func() { - err := this.hashMap.Load(this) - if err != nil { - remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")") - - // 自动修复错误 - // TODO 将来希望能尽可能恢复以往数据库中的内容 - if strings.Contains(err.Error(), "database is closed") || strings.Contains(err.Error(), "database disk image is malformed") { - _ = this.Close() - this.deleteDB() - remotelogs.Println("LIST_FILE_DB", "recreating the database (file:"+this.dbPath+") ...") - err = this.Open(this.dbPath) - if err != nil { - remotelogs.Error("LIST_FILE_DB", "recreate the database failed: "+err.Error()+" (file:"+this.dbPath+")") - } else { - _ = this.Init() - } - } - } - }() + go this.loadHashMap() return nil } @@ -521,6 +503,10 @@ func (this *FileListDB) WrapError(err error) error { return fmt.Errorf("%w (file: %s)", err, this.dbPath) } +func (this *FileListDB) HashMapIsLoaded() bool { + return this.hashMapIsLoaded +} + // 初始化 func (this *FileListDB) initTables(times int) error { { @@ -633,3 +619,30 @@ func (this *FileListDB) deleteDB() { _ = os.Remove(this.dbPath + "-shm") _ = os.Remove(this.dbPath + "-wal") } + +// 加载Hash列表 +func (this *FileListDB) loadHashMap() { + this.hashMapIsLoaded = false + + err := this.hashMap.Load(this) + if err != nil { + remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")") + + // 自动修复错误 + // TODO 将来希望能尽可能恢复以往数据库中的内容 + if strings.Contains(err.Error(), "database is closed") || strings.Contains(err.Error(), "database disk image is malformed") { + _ = this.Close() + this.deleteDB() + remotelogs.Println("LIST_FILE_DB", "recreating the database (file:"+this.dbPath+") ...") + err = this.Open(this.dbPath) + if err != nil { + remotelogs.Error("LIST_FILE_DB", "recreate the database failed: "+err.Error()+" (file:"+this.dbPath+")") + } else { + _ = this.Init() + } + } + return + } + + this.hashMapIsLoaded = true +} diff --git a/internal/caches/manager.go b/internal/caches/manager.go index b5b5714..f186295 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -256,3 +256,19 @@ func (this *Manager) FindAllStorages() []StorageInterface { } return storages } + +// ScanGarbageCaches 清理目录中“失联”的缓存文件 +func (this *Manager) ScanGarbageCaches(callback func(path string) error) error { + var storages = this.FindAllStorages() + for _, storage := range storages { + fileStorage, ok := storage.(*FileStorage) + if !ok { + continue + } + err := fileStorage.ScanGarbageCaches(callback) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/caches/reader_partial_file.go b/internal/caches/reader_partial_file.go index 6f03c63..fa47895 100644 --- a/internal/caches/reader_partial_file.go +++ b/internal/caches/reader_partial_file.go @@ -20,7 +20,7 @@ type PartialFileReader struct { func NewPartialFileReader(fp *os.File) *PartialFileReader { return &PartialFileReader{ FileReader: NewFileReader(fp), - rangePath: partialRangesFilePath(fp.Name()), + rangePath: PartialRangesFilePath(fp.Name()), } } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 5c48843..d481f3e 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -14,6 +14,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets" "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" @@ -59,6 +60,7 @@ const ( FileTmpSuffix = ".tmp" DefaultMinDiskFreeSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间 DefaultStaleCacheSeconds = 1200 // 过时缓存留存时间 + HashKeyLength = 32 ) var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool @@ -931,7 +933,7 @@ func (this *FileStorage) keyPath(key string) (hash string, path string, diskIsFu // 获取Hash对应的文件路径 func (this *FileStorage) hashPath(hash string) (path string, diskIsFull bool) { - if len(hash) != 32 { + if len(hash) != HashKeyLength { return "", false } var dir string @@ -1081,45 +1083,43 @@ func (this *FileStorage) purgeLoop() { maxCount = 10000 } - for { - maxLoops-- - if maxLoops <= 0 { - break - } - - var total, _ = this.list.Count() - if total <= 0 { - break - } - - // 开始清理 - var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100)) - if count <= 0 { - break - } - - // 限制单次清理的条数,防止占用太多系统资源 - if count > maxCount { - count = maxCount - } - - remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count)) - err := this.list.PurgeLFU(count, func(hash string) error { - path, _ := this.hashPath(hash) - err := this.removeCacheFile(path) - if err != nil && !os.IsNotExist(err) { - remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) + var total, _ = this.list.Count() + if total > 0 { + for { + maxLoops-- + if maxLoops <= 0 { + break } - return nil - }) - if err != nil { - remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error()) - } + // 开始清理 + var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100)) + if count <= 0 { + break + } - // 检查硬盘空间状态 - if !this.hasFullDisk() { - break + // 限制单次清理的条数,防止占用太多系统资源 + if count > maxCount { + count = maxCount + } + + remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count)) + err := this.list.PurgeLFU(count, func(hash string) error { + path, _ := this.hashPath(hash) + err := this.removeCacheFile(path) + if err != nil && !os.IsNotExist(err) { + remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) + } + + return nil + }) + if err != nil { + remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error()) + } + + // 检查硬盘空间状态 + if !this.hasFullDisk() { + break + } } } } @@ -1354,7 +1354,7 @@ func (this *FileStorage) removeCacheFile(path string) error { err = nil // 删除Partial相关 - var partialPath = partialRangesFilePath(path) + var partialPath = PartialRangesFilePath(path) if openFileCache != nil { openFileCache.Close(partialPath) } @@ -1476,6 +1476,7 @@ func (this *FileStorage) checkDiskSpace() { } } +// 检查是否有已满的磁盘分区 func (this *FileStorage) hasFullDisk() bool { this.checkDiskSpace() @@ -1521,6 +1522,85 @@ func (this *FileStorage) subDir(hash string) (dirPath string, dirIsFull bool) { return subDir.Path + suffix, subDir.IsFull } +// ScanGarbageCaches 清理目录中“失联”的缓存文件 +// “失联”为不在HashMap中的文件 +func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) error { + var mainDir = this.options.Dir + var allDirs = []string{mainDir} + var subDirs = this.subDirs // copy + for _, subDir := range subDirs { + allDirs = append(allDirs, subDir.Path) + } + + for _, subDir := range allDirs { + var dir0 = subDir + "/p" + types.String(this.policy.Id) + dir1Matches, err := filepath.Glob(dir0 + "/*") + if err != nil { + // ignore error + continue + } + + for _, dir1 := range dir1Matches { + if len(filepath.Base(dir1)) != 2 { + continue + } + + dir2Matches, err := filepath.Glob(dir1 + "/*") + if err != nil { + // ignore error + continue + } + for _, dir2 := range dir2Matches { + if len(filepath.Base(dir2)) != 2 { + continue + } + + fileMatches, err := filepath.Glob(dir2 + "/*.cache") + if err != nil { + // ignore error + continue + } + + for _, file := range fileMatches { + var filename = filepath.Base(file) + var hash = strings.TrimSuffix(filename, ".cache") + if len(hash) != HashKeyLength { + continue + } + + isReady, found := this.list.(*FileList).ExistQuick(hash) + if !isReady { + continue + } + + if found { + continue + } + + // 检查文件正在被写入 + stat, err := os.Stat(file) + if err != nil { + continue + } + if fasttime.Now().Unix()-stat.ModTime().Unix() < 300 /** 5 minutes **/ { + continue + } + + if fileCallback != nil { + err = fileCallback(file) + if err != nil { + return err + } + } + } + } + } + } + + return nil +} + +// 计算字节数字代号 func (this *FileStorage) charCode(r byte) uint8 { if r >= '0' && r <= '9' { return r - '0' diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index a66c977..6a79b52 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/logs" @@ -577,6 +578,31 @@ func TestFileStorage_RemoveCacheFile(t *testing.T) { t.Log(storage.removeCacheFile("/Users/WorkSpace/EdgeProject/EdgeCache/p43/15/7e/157eba0dfc6dfb6fbbf20b1f9e584674.cache")) } +func TestFileStorage_ScanGarbageCaches(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + var storage = NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 43, + Options: map[string]any{"dir": "/Users/WorkSpace/EdgeProject/EdgeCache"}, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + + err = storage.ScanGarbageCaches(func(path string) { + t.Log(path) + }, func(path string) error { + t.Log(path, PartialRangesFilePath(path)) + return nil + }) + if err != nil { + t.Fatal(err) + } +} + func BenchmarkFileStorage_Read(b *testing.B) { runtime.GOMAXPROCS(1) diff --git a/internal/caches/utils_partial.go b/internal/caches/utils_partial.go index 70f7f10..607a17d 100644 --- a/internal/caches/utils_partial.go +++ b/internal/caches/utils_partial.go @@ -4,8 +4,8 @@ package caches import "strings" -// 获取 ranges 文件路径 -func partialRangesFilePath(path string) string { +// PartialRangesFilePath 获取 ranges 文件路径 +func PartialRangesFilePath(path string) string { // ranges路径 var dotIndex = strings.LastIndex(path, ".") var rangePath string diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index 0b07394..fd02a7d 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -43,7 +43,7 @@ func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaH isPartial: isPartial, bodyOffset: bodyOffset, ranges: ranges, - rangePath: partialRangesFilePath(rawWriter.Name()), + rangePath: PartialRangesFilePath(rawWriter.Name()), metaHeaderSize: metaHeaderSize, metaBodySize: metaBodySize, } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 0c911b5..3b4a60a 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -808,6 +808,36 @@ func (this *Node) listenSock() error { _ = cmd.Reply(&gosock.Command{Params: maps.Map{ "stats": m, }}) + case "cache.garbage": + var shouldDelete = maps.NewMap(cmd.Params).GetBool("delete") + + var count = 0 + var sampleFiles = []string{} + err := caches.SharedManager.ScanGarbageCaches(func(path string) error { + count++ + if len(sampleFiles) < 10 { + sampleFiles = append(sampleFiles, path) + } + + if shouldDelete { + _ = os.Remove(path) // .cache + _ = os.Remove(caches.PartialRangesFilePath(path)) // @range.cache + } + + return nil + }) + if err != nil { + _ = cmd.Reply(&gosock.Command{Params: maps.Map{ + "isOk": false, + "error": err.Error(), + }}) + } else { + _ = cmd.Reply(&gosock.Command{Params: maps.Map{ + "isOk": true, + "count": count, + "sampleFiles": sampleFiles, + }}) + } } })