优化文件缓存

This commit is contained in:
GoEdgeLab
2021-06-13 17:37:57 +08:00
parent 117bcaaf0a
commit 468af65edd
11 changed files with 309 additions and 101 deletions

View File

@@ -18,6 +18,8 @@ type Item struct {
HeaderSize int64 `json:"headerSize"` HeaderSize int64 `json:"headerSize"`
BodySize int64 `json:"bodySize"` BodySize int64 `json:"bodySize"`
MetaSize int64 `json:"metaSize"` MetaSize int64 `json:"metaSize"`
Host string `json:"host"` // 主机名
ServerId int64 `json:"serverId"` // 服务ID
} }
func (this *Item) IsExpired() bool { func (this *Item) IsExpired() bool {

View File

@@ -5,8 +5,10 @@ package caches
import ( import (
"database/sql" "database/sql"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/lists"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"os" "os"
"strconv"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@@ -19,6 +21,17 @@ type FileList struct {
onAdd func(item *Item) onAdd func(item *Item)
onRemove func(item *Item) onRemove func(item *Item)
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
deleteByHashStmt *sql.Stmt // 根据hash删除数据
statStmt *sql.Stmt // 统计
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
oldTables []string
itemsTableName string
} }
func NewFileList(dir string) ListInterface { func NewFileList(dir string) ListInterface {
@@ -36,51 +49,72 @@ func (this *FileList) Init() error {
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'") remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
} }
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc") this.itemsTableName = "cacheItems_v2"
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc&_journal_mode=WAL")
if err != nil { if err != nil {
return err return err
} }
db.SetMaxOpenConns(1) db.SetMaxOpenConns(1)
this.db = db
_, err = db.Exec("VACUUM") // 清除旧表
this.oldTables = []string{
"cacheItems",
}
err = this.removeOldTables()
if err != nil {
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil { if err != nil {
return err return err
} }**/
// 创建 // 创建
// TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容 // TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容
// 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确 // 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "cacheItems" ( _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32), "hash" varchar(32),
"key" varchar(1024), "key" varchar(1024),
"headerSize" integer DEFAULT 0, "headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0, "bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0, "metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0, "expiredAt" integer DEFAULT 0,
"accessedAt" integer DEFAULT 0 "accessedAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "accessedAt"
ON "` + this.itemsTableName + `" (
"accessedAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
); );
CREATE UNIQUE INDEX IF NOT EXISTS "hash" CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "cacheItems" ( ON "` + this.itemsTableName + `" (
"hash" "hash" ASC
); );
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "cacheItems" ( CREATE INDEX IF NOT EXISTS "serverId"
"expiredAt" ON "` + this.itemsTableName + `" (
); "serverId" ASC
CREATE INDEX IF NOT EXISTS "accessedAt"
ON "cacheItems" (
"accessedAt"
); );
`) `)
if err != nil { if err != nil {
return err return err
} }
this.db = db
// 读取总数量 // 读取总数量
row := this.db.QueryRow("SELECT COUNT(*) FROM cacheItems") row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil { if row.Err() != nil {
return row.Err() return row.Err()
} }
@@ -91,6 +125,42 @@ ON "cacheItems" (
} }
this.total = total this.total = total
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "bodySize" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "host", "serverId") VALUES (?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `" WHERE expiredAt>?`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE expiredAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
return nil return nil
} }
@@ -100,7 +170,7 @@ func (this *FileList) Reset() error {
} }
func (this *FileList) Add(hash string, item *Item) error { func (this *FileList) Add(hash string, item *Item) error {
_, err := this.db.Exec(`INSERT INTO cacheItems ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt") VALUES (?, ?, ?, ?, ?, ?)`, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt) _, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.Host, item.ServerId)
if err != nil { if err != nil {
return err return err
} }
@@ -114,54 +184,43 @@ func (this *FileList) Add(hash string, item *Item) error {
} }
func (this *FileList) Exist(hash string) (bool, error) { func (this *FileList) Exist(hash string) (bool, error) {
row := this.db.QueryRow(`SELECT "bodySize" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash) rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
if row == nil {
return false, nil
}
if row.Err() != nil {
return false, row.Err()
}
var bodySize int
err := row.Scan(&bodySize)
if err != nil { if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err return false, err
} }
return true, nil
}
// FindKeysWithPrefix 根据前缀进行查找
func (this *FileList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
if len(prefix) == 0 {
return
}
// TODO 需要优化上千万结果的情况
rows, err := this.db.Query(`SELECT "key" FROM cacheItems WHERE INSTR("key", ?)==1 LIMIT 100000`, prefix)
if err != nil {
return nil, err
}
defer func() { defer func() {
_ = rows.Close() _ = rows.Close()
}() }()
if rows.Next() {
return true, nil
}
return false, nil
}
for rows.Next() { // CleanPrefix 清理某个前缀的缓存数据
var key string func (this *FileList) CleanPrefix(prefix string) error {
err = rows.Scan(&key) if len(prefix) == 0 {
return nil
}
var count = int64(10000)
for {
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0 WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND INSTR("key", ?)==1 LIMIT `+strconv.FormatInt(count, 10)+`)`, prefix)
if err != nil { if err != nil {
return nil, err return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
} }
keys = append(keys, key)
} }
return
} }
func (this *FileList) Remove(hash string) error { func (this *FileList) Remove(hash string) error {
row := this.db.QueryRow(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash) row := this.selectByHashStmt.QueryRow(hash)
if row.Err() != nil { if row.Err() != nil {
return row.Err() return row.Err()
} }
@@ -175,7 +234,7 @@ func (this *FileList) Remove(hash string) error {
return err return err
} }
_, err = this.db.Exec(`DELETE FROM cacheItems WHERE "hash"=?`, hash) _, err = this.deleteByHashStmt.Exec(hash)
if err != nil { if err != nil {
return err return err
} }
@@ -197,7 +256,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
count = 1000 count = 1000
} }
rows, err := this.db.Query(`SELECT "hash" FROM cacheItems WHERE expiredAt<=? LIMIT ?`, time.Now().Unix(), count) rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil { if err != nil {
return err return err
} }
@@ -232,7 +291,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
} }
func (this *FileList) CleanAll() error { func (this *FileList) CleanAll() error {
_, err := this.db.Exec("DELETE FROM cacheItems") _, err := this.deleteAllStmt.Exec()
if err != nil { if err != nil {
return err return err
} }
@@ -242,7 +301,7 @@ func (this *FileList) CleanAll() error {
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) { func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些 // 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.db.QueryRow("SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM cacheItems") row := this.statStmt.QueryRow(time.Now().Unix())
if row.Err() != nil { if row.Err() != nil {
return nil, row.Err() return nil, row.Err()
} }
@@ -270,3 +329,37 @@ func (this *FileList) OnAdd(f func(item *Item)) {
func (this *FileList) OnRemove(f func(item *Item)) { func (this *FileList) OnRemove(f func(item *Item)) {
this.onRemove = f this.onRemove = f
} }
func (this *FileList) Close() error {
if this.db != nil {
return this.db.Close()
}
return nil
}
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
go func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
}()
}
}
return nil
}

View File

@@ -4,8 +4,10 @@ package caches
import ( import (
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
stringutil "github.com/iwind/TeaGo/utils/string" stringutil "github.com/iwind/TeaGo/utils/string"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
) )
@@ -31,6 +33,8 @@ func TestFileList_Add(t *testing.T) {
HeaderSize: 1, HeaderSize: 1,
MetaSize: 2, MetaSize: 2,
BodySize: 3, BodySize: 3,
Host: "teaos.cn",
ServerId: 1,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@@ -44,11 +48,12 @@ func TestFileList_Add_Many(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
for i := 0; i < 100_0000; i++ { before := time.Now()
for i := 0; i < 2000_0000; i++ {
u := "http://edge.teaos.cn/123456" + strconv.Itoa(i) u := "http://edge.teaos.cn/123456" + strconv.Itoa(i)
err = list.Add(stringutil.Md5(u), &Item{ _ = list.Add(stringutil.Md5(u), &Item{
Key: u, Key: u,
ExpiredAt: time.Now().Unix(), ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1, HeaderSize: 1,
MetaSize: 2, MetaSize: 2,
BodySize: 3, BodySize: 3,
@@ -56,6 +61,10 @@ func TestFileList_Add_Many(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if i > 0 && i%10_000 == 0 {
t.Log(i, int(10000/time.Since(before).Seconds()), "qps")
before = time.Now()
}
} }
t.Log("ok") t.Log("ok")
} }
@@ -66,6 +75,10 @@ func TestFileList_Exist(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
{ {
exists, err := list.Exist(stringutil.Md5("123456")) exists, err := list.Exist(stringutil.Md5("123456"))
if err != nil { if err != nil {
@@ -74,7 +87,7 @@ func TestFileList_Exist(t *testing.T) {
t.Log("exists:", exists) t.Log("exists:", exists)
} }
{ {
exists, err := list.Exist(stringutil.Md5("654321")) exists, err := list.Exist(stringutil.Md5("http://edge.teaos.cn/1234561"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -82,18 +95,70 @@ func TestFileList_Exist(t *testing.T) {
} }
} }
func TestFileList_FindKeysWithPrefix(t *testing.T) { func TestFileList_Exist_Many_DB(t *testing.T) {
// 测试在多个数据库下的性能
var listSlice = []ListInterface{}
for i := 1; i <= 10; i++ {
list := NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i))
err := list.Init()
if err != nil {
t.Fatal(err)
}
listSlice = append(listSlice, list)
}
var wg = sync.WaitGroup{}
var threads = 8
wg.Add(threads)
var count = 200_000
var countLocker sync.Mutex
var tasks = make(chan int, count)
for i := 0; i < count; i++ {
tasks <- i
}
var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
for i := 0; i < threads; i++ {
go func() {
defer wg.Done()
for {
select {
case <-tasks:
countLocker.Lock()
count--
countLocker.Unlock()
var list = listSlice[rands.Int(0, len(listSlice)-1)]
_, _ = list.Exist(hash)
default:
return
}
}
}()
}
wg.Wait()
t.Log("left:", count)
}
func TestFileList_CleanPrefix(t *testing.T) {
list := NewFileList(Tea.Root + "/data") list := NewFileList(Tea.Root + "/data")
err := list.Init() err := list.Init()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
before := time.Now() before := time.Now()
keys, err := list.FindKeysWithPrefix("1234") err = list.CleanPrefix("1234")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log("keys:", keys)
t.Log(time.Since(before).Seconds()*1000, "ms") t.Log(time.Since(before).Seconds()*1000, "ms")
} }
@@ -170,3 +235,14 @@ func TestFileList_CleanAll(t *testing.T) {
t.Log("ok") t.Log("ok")
t.Log(list.Count()) t.Log(list.Count())
} }
func BenchmarkFileList_Exist(b *testing.B) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
b.Fatal(err)
}
for i := 0; i < b.N; i++ {
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f")
}
}

View File

@@ -3,23 +3,31 @@
package caches package caches
type ListInterface interface { type ListInterface interface {
// Init 初始化
Init() error Init() error
// Reset 重置数据
Reset() error Reset() error
// Add 添加内容
Add(hash string, item *Item) error Add(hash string, item *Item) error
// Exist 检查内容是否存在
Exist(hash string) (bool, error) Exist(hash string) (bool, error)
// FindKeysWithPrefix 根据前缀进行查找 // CleanPrefix 清除某个前缀的缓存
FindKeysWithPrefix(prefix string) (keys []string, err error) CleanPrefix(prefix string) error
// Remove 删除内容
Remove(hash string) error Remove(hash string) error
// Purge 清理过期数据
Purge(count int, callback func(hash string) error) error Purge(count int, callback func(hash string) error) error
// CleanAll 清除所有缓存
CleanAll() error CleanAll() error
// Stat 统计
Stat(check func(hash string) bool) (*Stat, error) Stat(check func(hash string) bool) (*Stat, error)
// Count 总数量 // Count 总数量
@@ -30,4 +38,7 @@ type ListInterface interface {
// OnRemove 删除事件 // OnRemove 删除事件
OnRemove(f func(item *Item)) OnRemove(f func(item *Item))
// Close 关闭
Close() error
} }

View File

@@ -92,8 +92,8 @@ func (this *MemoryList) Exist(hash string) (bool, error) {
return !item.IsExpired(), nil return !item.IsExpired(), nil
} }
// FindKeysWithPrefix 根据前缀进行查找 // CleanPrefix 根据前缀进行清除
func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err error) { func (this *MemoryList) CleanPrefix(prefix string) error {
this.locker.RLock() this.locker.RLock()
defer this.locker.RUnlock() defer this.locker.RUnlock()
@@ -101,11 +101,11 @@ func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err er
for _, itemMap := range this.itemMaps { for _, itemMap := range this.itemMaps {
for _, item := range itemMap { for _, item := range itemMap {
if strings.HasPrefix(item.Key, prefix) { if strings.HasPrefix(item.Key, prefix) {
keys = append(keys, item.Key) item.ExpiredAt = 0
} }
} }
} }
return return nil
} }
func (this *MemoryList) Remove(hash string) error { func (this *MemoryList) Remove(hash string) error {
@@ -225,6 +225,10 @@ func (this *MemoryList) OnRemove(f func(item *Item)) {
this.onRemove = f this.onRemove = f
} }
func (this *MemoryList) Close() error {
return nil
}
func (this *MemoryList) print(t *testing.T) { func (this *MemoryList) print(t *testing.T) {
this.locker.Lock() this.locker.Lock()
for _, itemMap := range this.itemMaps { for _, itemMap := range this.itemMaps {

View File

@@ -134,7 +134,7 @@ func TestMemoryList_Stat(t *testing.T) {
t.Log(result) t.Log(result)
} }
func TestMemoryList_FindKeysWithPrefix(t *testing.T) { func TestMemoryList_CleanPrefix(t *testing.T) {
list := NewMemoryList() list := NewMemoryList()
_ = list.Init() _ = list.Init()
before := time.Now() before := time.Now()
@@ -142,7 +142,7 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) {
key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html" key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key, Key: key,
ExpiredAt: 0, ExpiredAt: time.Now().Unix() + 3600,
BodySize: 0, BodySize: 0,
HeaderSize: 0, HeaderSize: 0,
}) })
@@ -150,11 +150,15 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms") t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now() before = time.Now()
keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/50") err := list.CleanPrefix("http://www.teaos.cn/hello/10")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log(len(keys))
logs.Println(list.Stat(func(hash string) bool {
return true
}))
t.Log(time.Since(before).Seconds()*1000, "ms") t.Log(time.Since(before).Seconds()*1000, "ms")
} }

View File

@@ -191,9 +191,10 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
} }
} }
_, path := this.keyPath(key) hash, path := this.keyPath(key)
// TODO 尝试使用mmap加快读取速度 // TODO 尝试使用mmap加快读取速度
var isOk = false
fp, err := os.OpenFile(path, os.O_RDONLY, 0444) fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
if err != nil { if err != nil {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
@@ -201,6 +202,21 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
} }
return nil, ErrNotFound return nil, ErrNotFound
} }
defer func() {
if !isOk {
_ = fp.Close()
_ = os.Remove(path)
}
}()
// 检查文件记录是否已过期
exists, err := this.list.Exist(hash)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound
}
reader := NewFileReader(fp) reader := NewFileReader(fp)
if err != nil { if err != nil {
@@ -210,6 +226,8 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
isOk = true
return reader, nil return reader, nil
} }
@@ -520,36 +538,21 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
// 目录 // 目录
if urlType == "dir" { if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys { for _, key := range keys {
subKeys, err := this.list.FindKeysWithPrefix(key) err := this.list.CleanPrefix(key)
if err != nil { if err != nil {
return err return err
} }
resultKeys = append(resultKeys, subKeys...)
} }
keys = resultKeys
} }
// 文件 // 文件
for _, key := range keys { for _, key := range keys {
hash, path := this.keyPath(key) hash, path := this.keyPath(key)
exists, err := this.list.Exist(hash)
if err != nil {
return err
}
if !exists {
err := os.Remove(path) err := os.Remove(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return err return err
} }
continue
}
err = os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
err = this.list.Remove(hash) err = this.list.Remove(hash)
if err != nil { if err != nil {
return err return err
@@ -572,6 +575,8 @@ func (this *FileStorage) Stop() {
if this.ticker != nil { if this.ticker != nil {
this.ticker.Stop() this.ticker.Stop()
} }
_ = this.list.Close()
} }
// TotalDiskSize 消耗的磁盘尺寸 // TotalDiskSize 消耗的磁盘尺寸
@@ -742,7 +747,7 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
// 清理任务 // 清理任务
func (this *FileStorage) purgeLoop() { func (this *FileStorage) purgeLoop() {
_ = this.list.Purge(1000, func(hash string) error { err := this.list.Purge(1000, func(hash string) error {
path := this.hashPath(hash) path := this.hashPath(hash)
err := os.Remove(path) err := os.Remove(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
@@ -750,6 +755,9 @@ func (this *FileStorage) purgeLoop() {
} }
return nil return nil
}) })
if err != nil {
remotelogs.Warn("CACHE", "purge file storage failed: " + err.Error())
}
} }
func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {

View File

@@ -3,8 +3,10 @@ package caches
import ( import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
"runtime"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -182,15 +184,12 @@ func (this *MemoryStorage) CleanAll() error {
func (this *MemoryStorage) Purge(keys []string, urlType string) error { func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录 // 目录
if urlType == "dir" { if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys { for _, key := range keys {
subKeys, err := this.list.FindKeysWithPrefix(key) err := this.list.CleanPrefix(key)
if err != nil { if err != nil {
return err return err
} }
resultKeys = append(resultKeys, subKeys...)
} }
keys = resultKeys
} }
for _, key := range keys { for _, key := range keys {
@@ -205,13 +204,21 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// Stop 停止缓存策略 // Stop 停止缓存策略
func (this *MemoryStorage) Stop() { func (this *MemoryStorage) Stop() {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock()
this.valuesMap = map[uint64]*MemoryItem{} this.valuesMap = map[uint64]*MemoryItem{}
this.writingKeyMap = map[string]bool{}
_ = this.list.Reset() _ = this.list.Reset()
if this.ticker != nil { if this.ticker != nil {
this.ticker.Stop() this.ticker.Stop()
} }
_ = this.list.Close()
this.locker.Unlock()
runtime.GC()
remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'")
} }
// Policy 获取当前存储的Policy // Policy 获取当前存储的Policy

View File

@@ -11,6 +11,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"io"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@@ -1153,7 +1154,7 @@ func (this *HTTPRequest) canIgnore(err error) bool {
} }
// 客户端主动取消 // 客户端主动取消
if err == context.Canceled { if err == context.Canceled || err == io.ErrShortWrite {
return true return true
} }

View File

@@ -231,6 +231,8 @@ func (this *HTTPWriter) Close() {
ExpiredAt: this.cacheWriter.ExpiredAt(), ExpiredAt: this.cacheWriter.ExpiredAt(),
HeaderSize: this.cacheWriter.HeaderSize(), HeaderSize: this.cacheWriter.HeaderSize(),
BodySize: this.cacheWriter.BodySize(), BodySize: this.cacheWriter.BodySize(),
Host: this.req.Host,
ServerId: this.req.Server.Id,
}) })
} }
} else { } else {