From 468af65edd8ce5b627c900560ffd71ec5b76c357 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 13 Jun 2021 17:37:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=96=87=E4=BB=B6=E7=BC=93?= =?UTF-8?q?=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/item.go | 2 + internal/caches/list_file.go | 209 +++++++++++++----- internal/caches/list_file_test.go | 90 +++++++- .../caches/{list.go => list_interface.go} | 15 +- internal/caches/list_memory.go | 12 +- internal/caches/list_memory_test.go | 12 +- internal/caches/storage_file.go | 46 ++-- internal/caches/storage_memory.go | 17 +- internal/caches/writer_memory.go | 2 +- internal/nodes/http_request.go | 3 +- internal/nodes/http_writer.go | 2 + 11 files changed, 309 insertions(+), 101 deletions(-) rename internal/caches/{list.go => list_interface.go} (63%) diff --git a/internal/caches/item.go b/internal/caches/item.go index ab2f9f7..e86af0c 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -18,6 +18,8 @@ type Item struct { HeaderSize int64 `json:"headerSize"` BodySize int64 `json:"bodySize"` MetaSize int64 `json:"metaSize"` + Host string `json:"host"` // 主机名 + ServerId int64 `json:"serverId"` // 服务ID } func (this *Item) IsExpired() bool { diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index dcad6f5..318814b 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -5,8 +5,10 @@ package caches import ( "database/sql" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/iwind/TeaGo/lists" _ "github.com/mattn/go-sqlite3" "os" + "strconv" "sync/atomic" "time" ) @@ -19,6 +21,17 @@ type FileList struct { onAdd func(item *Item) onRemove func(item *Item) + + existsByHashStmt *sql.Stmt // 根据hash检查是否存在 + insertStmt *sql.Stmt // 写入数据 + selectByHashStmt *sql.Stmt // 使用hash查询数据 + deleteByHashStmt *sql.Stmt // 根据hash删除数据 + statStmt *sql.Stmt // 统计 + purgeStmt *sql.Stmt // 清理 + deleteAllStmt *sql.Stmt // 删除所有数据 + + oldTables []string + itemsTableName string } func NewFileList(dir string) ListInterface { @@ -36,51 +49,72 @@ func (this *FileList) Init() error { remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'") } - db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc") + this.itemsTableName = "cacheItems_v2" + + db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc&_journal_mode=WAL") if err != nil { return err } db.SetMaxOpenConns(1) + this.db = db - _, err = db.Exec("VACUUM") + // 清除旧表 + this.oldTables = []string{ + "cacheItems", + } + err = this.removeOldTables() + if err != nil { + remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error()) + } + + // TODO 耗时过长,暂时不整理数据库 + /**_, err = db.Exec("VACUUM") if err != nil { return err - } + }**/ // 创建 // TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容 // 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确 - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "cacheItems" ( + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "hash" varchar(32), "key" varchar(1024), "headerSize" integer DEFAULT 0, "bodySize" integer DEFAULT 0, "metaSize" integer DEFAULT 0, "expiredAt" integer DEFAULT 0, - "accessedAt" integer DEFAULT 0 + "accessedAt" integer DEFAULT 0, + "host" varchar(128), + "serverId" integer +); + +CREATE INDEX IF NOT EXISTS "accessedAt" +ON "` + this.itemsTableName + `" ( + "accessedAt" ASC +); + +CREATE INDEX IF NOT EXISTS "expiredAt" +ON "` + this.itemsTableName + `" ( + "expiredAt" ASC ); CREATE UNIQUE INDEX IF NOT EXISTS "hash" -ON "cacheItems" ( - "hash" +ON "` + this.itemsTableName + `" ( + "hash" ASC ); -CREATE INDEX IF NOT EXISTS "expiredAt" -ON "cacheItems" ( - "expiredAt" -); -CREATE INDEX IF NOT EXISTS "accessedAt" -ON "cacheItems" ( - "accessedAt" + +CREATE INDEX IF NOT EXISTS "serverId" +ON "` + this.itemsTableName + `" ( + "serverId" ASC ); `) if err != nil { return err } - this.db = db - // 读取总数量 - row := this.db.QueryRow("SELECT COUNT(*) FROM cacheItems") + row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`) if row.Err() != nil { return row.Err() } @@ -91,6 +125,42 @@ ON "cacheItems" ( } this.total = total + // 常用语句 + this.existsByHashStmt, err = this.db.Prepare(`SELECT "bodySize" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`) + if err != nil { + return err + } + + this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "host", "serverId") VALUES (?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return err + } + + this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`) + if err != nil { + return err + } + + this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`) + if err != nil { + return err + } + + this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `" WHERE expiredAt>?`) + if err != nil { + return err + } + + this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE expiredAt<=? LIMIT ?`) + if err != nil { + return err + } + + this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`) + if err != nil { + return err + } + return nil } @@ -100,7 +170,7 @@ func (this *FileList) Reset() error { } func (this *FileList) Add(hash string, item *Item) error { - _, err := this.db.Exec(`INSERT INTO cacheItems ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt") VALUES (?, ?, ?, ?, ?, ?)`, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt) + _, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.Host, item.ServerId) if err != nil { return err } @@ -114,54 +184,43 @@ func (this *FileList) Add(hash string, item *Item) error { } func (this *FileList) Exist(hash string) (bool, error) { - row := this.db.QueryRow(`SELECT "bodySize" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash) - if row == nil { - return false, nil - } - if row.Err() != nil { - return false, row.Err() - } - var bodySize int - err := row.Scan(&bodySize) + rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix()) if err != nil { - if err == sql.ErrNoRows { - return false, nil - } return false, err } - return true, nil -} - -// FindKeysWithPrefix 根据前缀进行查找 -func (this *FileList) FindKeysWithPrefix(prefix string) (keys []string, err error) { - if len(prefix) == 0 { - return - } - - // TODO 需要优化上千万结果的情况 - - rows, err := this.db.Query(`SELECT "key" FROM cacheItems WHERE INSTR("key", ?)==1 LIMIT 100000`, prefix) - if err != nil { - return nil, err - } defer func() { _ = rows.Close() }() + if rows.Next() { + return true, nil + } + return false, nil +} - for rows.Next() { - var key string - err = rows.Scan(&key) - if err != nil { - return nil, err - } - keys = append(keys, key) +// CleanPrefix 清理某个前缀的缓存数据 +func (this *FileList) CleanPrefix(prefix string) error { + if len(prefix) == 0 { + return nil } - return + var count = int64(10000) + for { + result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0 WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND INSTR("key", ?)==1 LIMIT `+strconv.FormatInt(count, 10)+`)`, prefix) + if err != nil { + return err + } + affectedRows, err := result.RowsAffected() + if err != nil { + return err + } + if affectedRows < count { + return nil + } + } } func (this *FileList) Remove(hash string) error { - row := this.db.QueryRow(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash) + row := this.selectByHashStmt.QueryRow(hash) if row.Err() != nil { return row.Err() } @@ -175,7 +234,7 @@ func (this *FileList) Remove(hash string) error { return err } - _, err = this.db.Exec(`DELETE FROM cacheItems WHERE "hash"=?`, hash) + _, err = this.deleteByHashStmt.Exec(hash) if err != nil { return err } @@ -197,7 +256,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error { count = 1000 } - rows, err := this.db.Query(`SELECT "hash" FROM cacheItems WHERE expiredAt<=? LIMIT ?`, time.Now().Unix(), count) + rows, err := this.purgeStmt.Query(time.Now().Unix(), count) if err != nil { return err } @@ -232,7 +291,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error { } func (this *FileList) CleanAll() error { - _, err := this.db.Exec("DELETE FROM cacheItems") + _, err := this.deleteAllStmt.Exec() if err != nil { return err } @@ -242,7 +301,7 @@ func (this *FileList) CleanAll() error { func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) { // 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些 - row := this.db.QueryRow("SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM cacheItems") + row := this.statStmt.QueryRow(time.Now().Unix()) if row.Err() != nil { return nil, row.Err() } @@ -270,3 +329,37 @@ func (this *FileList) OnAdd(f func(item *Item)) { func (this *FileList) OnRemove(f func(item *Item)) { this.onRemove = f } + +func (this *FileList) Close() error { + if this.db != nil { + return this.db.Close() + } + return nil +} + +func (this *FileList) removeOldTables() error { + rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + for rows.Next() { + var name string + err = rows.Scan(&name) + if err != nil { + return err + } + if lists.ContainsString(this.oldTables, name) { + // 异步执行 + go func() { + remotelogs.Println("CACHE", "remove old table '"+name+"' ...") + _, _ = this.db.Exec(`DROP TABLE "` + name + `"`) + remotelogs.Println("CACHE", "remove old table '"+name+"' done") + }() + } + + } + return nil +} diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go index 8923d7a..c71b231 100644 --- a/internal/caches/list_file_test.go +++ b/internal/caches/list_file_test.go @@ -4,8 +4,10 @@ package caches import ( "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/rands" stringutil "github.com/iwind/TeaGo/utils/string" "strconv" + "sync" "testing" "time" ) @@ -31,6 +33,8 @@ func TestFileList_Add(t *testing.T) { HeaderSize: 1, MetaSize: 2, BodySize: 3, + Host: "teaos.cn", + ServerId: 1, }) if err != nil { t.Fatal(err) @@ -44,11 +48,12 @@ func TestFileList_Add_Many(t *testing.T) { if err != nil { t.Fatal(err) } - for i := 0; i < 100_0000; i++ { + before := time.Now() + for i := 0; i < 2000_0000; i++ { u := "http://edge.teaos.cn/123456" + strconv.Itoa(i) - err = list.Add(stringutil.Md5(u), &Item{ + _ = list.Add(stringutil.Md5(u), &Item{ Key: u, - ExpiredAt: time.Now().Unix(), + ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1, MetaSize: 2, BodySize: 3, @@ -56,6 +61,10 @@ func TestFileList_Add_Many(t *testing.T) { if err != nil { t.Fatal(err) } + if i > 0 && i%10_000 == 0 { + t.Log(i, int(10000/time.Since(before).Seconds()), "qps") + before = time.Now() + } } t.Log("ok") } @@ -66,6 +75,10 @@ func TestFileList_Exist(t *testing.T) { if err != nil { t.Fatal(err) } + before := time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() { exists, err := list.Exist(stringutil.Md5("123456")) if err != nil { @@ -74,7 +87,7 @@ func TestFileList_Exist(t *testing.T) { t.Log("exists:", exists) } { - exists, err := list.Exist(stringutil.Md5("654321")) + exists, err := list.Exist(stringutil.Md5("http://edge.teaos.cn/1234561")) if err != nil { t.Fatal(err) } @@ -82,18 +95,70 @@ func TestFileList_Exist(t *testing.T) { } } -func TestFileList_FindKeysWithPrefix(t *testing.T) { +func TestFileList_Exist_Many_DB(t *testing.T) { + // 测试在多个数据库下的性能 + var listSlice = []ListInterface{} + for i := 1; i <= 10; i++ { + list := NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i)) + err := list.Init() + if err != nil { + t.Fatal(err) + } + listSlice = append(listSlice, list) + } + + var wg = sync.WaitGroup{} + var threads = 8 + wg.Add(threads) + + var count = 200_000 + var countLocker sync.Mutex + var tasks = make(chan int, count) + for i := 0; i < count; i++ { + tasks <- i + } + + var hash = stringutil.Md5("http://edge.teaos.cn/1234561") + + before := time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + for i := 0; i < threads; i++ { + go func() { + defer wg.Done() + + for { + select { + case <-tasks: + countLocker.Lock() + count-- + countLocker.Unlock() + + var list = listSlice[rands.Int(0, len(listSlice)-1)] + _, _ = list.Exist(hash) + default: + return + } + } + }() + } + wg.Wait() + t.Log("left:", count) +} + +func TestFileList_CleanPrefix(t *testing.T) { list := NewFileList(Tea.Root + "/data") err := list.Init() if err != nil { t.Fatal(err) } before := time.Now() - keys, err := list.FindKeysWithPrefix("1234") + err = list.CleanPrefix("1234") if err != nil { t.Fatal(err) } - t.Log("keys:", keys) t.Log(time.Since(before).Seconds()*1000, "ms") } @@ -170,3 +235,14 @@ func TestFileList_CleanAll(t *testing.T) { t.Log("ok") t.Log(list.Count()) } + +func BenchmarkFileList_Exist(b *testing.B) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + b.Fatal(err) + } + for i := 0; i < b.N; i++ { + _, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f") + } +} diff --git a/internal/caches/list.go b/internal/caches/list_interface.go similarity index 63% rename from internal/caches/list.go rename to internal/caches/list_interface.go index 8f56f38..1ee91bf 100644 --- a/internal/caches/list.go +++ b/internal/caches/list_interface.go @@ -3,23 +3,31 @@ package caches type ListInterface interface { + // Init 初始化 Init() error + // Reset 重置数据 Reset() error + // Add 添加内容 Add(hash string, item *Item) error + // Exist 检查内容是否存在 Exist(hash string) (bool, error) - // FindKeysWithPrefix 根据前缀进行查找 - FindKeysWithPrefix(prefix string) (keys []string, err error) + // CleanPrefix 清除某个前缀的缓存 + CleanPrefix(prefix string) error + // Remove 删除内容 Remove(hash string) error + // Purge 清理过期数据 Purge(count int, callback func(hash string) error) error + // CleanAll 清除所有缓存 CleanAll() error + // Stat 统计 Stat(check func(hash string) bool) (*Stat, error) // Count 总数量 @@ -30,4 +38,7 @@ type ListInterface interface { // OnRemove 删除事件 OnRemove(f func(item *Item)) + + // Close 关闭 + Close() error } diff --git a/internal/caches/list_memory.go b/internal/caches/list_memory.go index 00241a6..ffbf74f 100644 --- a/internal/caches/list_memory.go +++ b/internal/caches/list_memory.go @@ -92,8 +92,8 @@ func (this *MemoryList) Exist(hash string) (bool, error) { return !item.IsExpired(), nil } -// FindKeysWithPrefix 根据前缀进行查找 -func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err error) { +// CleanPrefix 根据前缀进行清除 +func (this *MemoryList) CleanPrefix(prefix string) error { this.locker.RLock() defer this.locker.RUnlock() @@ -101,11 +101,11 @@ func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err er for _, itemMap := range this.itemMaps { for _, item := range itemMap { if strings.HasPrefix(item.Key, prefix) { - keys = append(keys, item.Key) + item.ExpiredAt = 0 } } } - return + return nil } func (this *MemoryList) Remove(hash string) error { @@ -225,6 +225,10 @@ func (this *MemoryList) OnRemove(f func(item *Item)) { this.onRemove = f } +func (this *MemoryList) Close() error { + return nil +} + func (this *MemoryList) print(t *testing.T) { this.locker.Lock() for _, itemMap := range this.itemMaps { diff --git a/internal/caches/list_memory_test.go b/internal/caches/list_memory_test.go index daafe30..91bffb0 100644 --- a/internal/caches/list_memory_test.go +++ b/internal/caches/list_memory_test.go @@ -134,7 +134,7 @@ func TestMemoryList_Stat(t *testing.T) { t.Log(result) } -func TestMemoryList_FindKeysWithPrefix(t *testing.T) { +func TestMemoryList_CleanPrefix(t *testing.T) { list := NewMemoryList() _ = list.Init() before := time.Now() @@ -142,7 +142,7 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) { key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html" _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ Key: key, - ExpiredAt: 0, + ExpiredAt: time.Now().Unix() + 3600, BodySize: 0, HeaderSize: 0, }) @@ -150,11 +150,15 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") before = time.Now() - keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/50") + err := list.CleanPrefix("http://www.teaos.cn/hello/10") if err != nil { t.Fatal(err) } - t.Log(len(keys)) + + logs.Println(list.Stat(func(hash string) bool { + return true + })) + t.Log(time.Since(before).Seconds()*1000, "ms") } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 94dc6e6..56ceb7d 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -191,9 +191,10 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { } } - _, path := this.keyPath(key) + hash, path := this.keyPath(key) // TODO 尝试使用mmap加快读取速度 + var isOk = false fp, err := os.OpenFile(path, os.O_RDONLY, 0444) if err != nil { if !os.IsNotExist(err) { @@ -201,6 +202,21 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { } return nil, ErrNotFound } + defer func() { + if !isOk { + _ = fp.Close() + _ = os.Remove(path) + } + }() + + // 检查文件记录是否已过期 + exists, err := this.list.Exist(hash) + if err != nil { + return nil, err + } + if !exists { + return nil, ErrNotFound + } reader := NewFileReader(fp) if err != nil { @@ -210,6 +226,8 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { if err != nil { return nil, err } + + isOk = true return reader, nil } @@ -520,33 +538,18 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { // 目录 if urlType == "dir" { - resultKeys := []string{} for _, key := range keys { - subKeys, err := this.list.FindKeysWithPrefix(key) + err := this.list.CleanPrefix(key) if err != nil { return err } - resultKeys = append(resultKeys, subKeys...) } - keys = resultKeys } // 文件 for _, key := range keys { hash, path := this.keyPath(key) - exists, err := this.list.Exist(hash) - if err != nil { - return err - } - if !exists { - err := os.Remove(path) - if err != nil && !os.IsNotExist(err) { - return err - } - continue - } - - err = os.Remove(path) + err := os.Remove(path) if err != nil && !os.IsNotExist(err) { return err } @@ -572,6 +575,8 @@ func (this *FileStorage) Stop() { if this.ticker != nil { this.ticker.Stop() } + + _ = this.list.Close() } // TotalDiskSize 消耗的磁盘尺寸 @@ -742,7 +747,7 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) { // 清理任务 func (this *FileStorage) purgeLoop() { - _ = this.list.Purge(1000, func(hash string) error { + err := this.list.Purge(1000, func(hash string) error { path := this.hashPath(hash) err := os.Remove(path) if err != nil && !os.IsNotExist(err) { @@ -750,6 +755,9 @@ func (this *FileStorage) purgeLoop() { } return nil }) + if err != nil { + remotelogs.Warn("CACHE", "purge file storage failed: " + err.Error()) + } } func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 922429e..1ad57b0 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -3,8 +3,10 @@ package caches import ( "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/cespare/xxhash" + "runtime" "strconv" "sync" "sync/atomic" @@ -182,15 +184,12 @@ func (this *MemoryStorage) CleanAll() error { func (this *MemoryStorage) Purge(keys []string, urlType string) error { // 目录 if urlType == "dir" { - resultKeys := []string{} for _, key := range keys { - subKeys, err := this.list.FindKeysWithPrefix(key) + err := this.list.CleanPrefix(key) if err != nil { return err } - resultKeys = append(resultKeys, subKeys...) } - keys = resultKeys } for _, key := range keys { @@ -205,13 +204,21 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error { // Stop 停止缓存策略 func (this *MemoryStorage) Stop() { this.locker.Lock() - defer this.locker.Unlock() this.valuesMap = map[uint64]*MemoryItem{} + this.writingKeyMap = map[string]bool{} _ = this.list.Reset() if this.ticker != nil { this.ticker.Stop() } + + _ = this.list.Close() + + this.locker.Unlock() + + runtime.GC() + + remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'") } // Policy 获取当前存储的Policy diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 9a10f1a..9de33f1 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -82,7 +82,7 @@ func (this *MemoryWriter) Close() error { func (this *MemoryWriter) Discard() error { // 需要在Locker之外 defer this.endFunc() - + this.locker.Lock() delete(this.m, this.hash) this.locker.Unlock() diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index f5ff76d..e199435 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -11,6 +11,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/types" "golang.org/x/net/http2" + "io" "net" "net/http" "net/url" @@ -1153,7 +1154,7 @@ func (this *HTTPRequest) canIgnore(err error) bool { } // 客户端主动取消 - if err == context.Canceled { + if err == context.Canceled || err == io.ErrShortWrite { return true } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 1034eb4..a2cd3b9 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -231,6 +231,8 @@ func (this *HTTPWriter) Close() { ExpiredAt: this.cacheWriter.ExpiredAt(), HeaderSize: this.cacheWriter.HeaderSize(), BodySize: this.cacheWriter.BodySize(), + Host: this.req.Host, + ServerId: this.req.Server.Id, }) } } else {