diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index ab2a60c..ffa4add 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -96,7 +96,7 @@ func (this *FileList) Reset() error { } func (this *FileList) Add(hash string, item *Item) error { - var db = this.getDB(hash) + var db = this.GetDB(hash) if !db.IsReady() { return nil @@ -120,12 +120,17 @@ func (this *FileList) Add(hash string, item *Item) error { } func (this *FileList) Exist(hash string) (bool, error) { - var db = this.getDB(hash) + var db = this.GetDB(hash) if !db.IsReady() { return false, nil } + // 如果Hash列表里不存在,那么必然不存在 + if !db.hashMap.Exist(hash) { + return false, nil + } + var item = this.memoryCache.Read(hash) if item != nil { return true, nil @@ -225,7 +230,7 @@ func (this *FileList) PurgeLFU(count int, callback func(hash string) error) erro return err } if notFound { - _, err = db.deleteHitByHashStmt.Exec(hash) + err = db.DeleteHitAsync(hash) if err != nil { return db.WrapError(err) } @@ -291,13 +296,13 @@ func (this *FileList) Count() (int64, error) { // IncreaseHit 增加点击量 func (this *FileList) IncreaseHit(hash string) error { - var db = this.getDB(hash) + var db = this.GetDB(hash) if !db.IsReady() { return nil } - return db.IncreaseHit(hash) + return db.IncreaseHitAsync(hash) } // OnAdd 添加事件 @@ -326,17 +331,23 @@ func (this *FileList) GetDBIndex(hash string) uint64 { return fnv.HashString(hash) % CountFileDB } -func (this *FileList) getDB(hash string) *FileListDB { +func (this *FileList) GetDB(hash string) *FileListDB { return this.dbList[fnv.HashString(hash)%CountFileDB] } func (this *FileList) remove(hash string) (notFound bool, err error) { - var db = this.getDB(hash) + var db = this.GetDB(hash) if !db.IsReady() { return false, nil } + // HashMap中不存在,则确定不存在 + if !db.hashMap.Exist(hash) { + return true, nil + } + defer db.hashMap.Delete(hash) + // 从缓存中删除 this.memoryCache.Delete(hash) @@ -364,7 +375,7 @@ func (this *FileList) remove(hash string) (notFound bool, err error) { atomic.AddInt64(&this.total, -1) - _, err = db.deleteHitByHashStmt.Exec(hash) + err = db.DeleteHitAsync(hash) if err != nil { return false, db.WrapError(err) } diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index 75bec53..d767d37 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -25,6 +25,8 @@ type FileListDB struct { writeBatch *dbs.Batch + hashMap *FileListHashMap + itemsTableName string hitsTableName string @@ -41,6 +43,8 @@ type FileListDB struct { selectByHashStmt *dbs.Stmt // 使用hash查询数据 + selectHashListStmt *dbs.Stmt + deleteByHashStmt *dbs.Stmt // 根据hash删除数据 deleteByHashSQL string @@ -50,21 +54,30 @@ type FileListDB struct { listOlderItemsStmt *dbs.Stmt // 读取较早存储的缓存 // hits - insertHitStmt *dbs.Stmt // 写入数据 - increaseHitStmt *dbs.Stmt // 增加点击量 - deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据 - lfuHitsStmt *dbs.Stmt // 读取老的数据 + insertHitSQL string // 写入数据 + increaseHitSQL string // 增加点击量 + deleteHitByHashSQL string // 根据hash删除数据 + lfuHitsStmt *dbs.Stmt // 读取老的数据 } func NewFileListDB() *FileListDB { - return &FileListDB{} + return &FileListDB{ + hashMap: NewFileListHashMap(), + } } func (this *FileListDB) Open(dbPath string) error { this.dbPath = dbPath + // 动态调整Cache值 + var cacheSize = 32000 + var memoryGB = utils.SystemMemoryGB() + if memoryGB >= 8 { + cacheSize += 32000 * memoryGB / 8 + } + // write db - writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=32000&_secure_delete=FAST") + writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)+"&_secure_delete=FAST") if err != nil { return errors.New("open write database failed: " + err.Error()) } @@ -94,7 +107,7 @@ func (this *FileListDB) Open(dbPath string) error { } // read db - readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=32000") + readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)) if err != nil { return errors.New("open read database failed: " + err.Error()) } @@ -149,6 +162,8 @@ func (this *FileListDB) Init() error { return err } + this.selectHashListStmt, err = this.readDB.Prepare(`SELECT "id", "hash" FROM "` + this.itemsTableName + `" WHERE id>:id ORDER BY id ASC LIMIT 2000`) + this.deleteByHashSQL = `DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?` this.deleteByHashStmt, err = this.writeDB.Prepare(this.deleteByHashSQL) if err != nil { @@ -172,17 +187,11 @@ func (this *FileListDB) Init() error { this.listOlderItemsStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" ORDER BY "id" ASC LIMIT ?`) - this.insertHitStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`) + this.insertHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)` - this.increaseHitStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`) - if err != nil { - return err - } + this.increaseHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?` - this.deleteHitByHashStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`) - if err != nil { - return err - } + this.deleteHitByHashSQL = `DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?` this.lfuHitsStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`) if err != nil { @@ -191,6 +200,14 @@ func (this *FileListDB) Init() error { this.isReady = true + // 加载HashMap + go func() { + err := this.hashMap.Load(this) + if err != nil { + remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")") + } + }() + return nil } @@ -203,6 +220,8 @@ func (this *FileListDB) Total() int64 { } func (this *FileListDB) AddAsync(hash string, item *Item) error { + this.hashMap.Add(hash) + if item.StaleAt == 0 { item.StaleAt = item.ExpiredAt } @@ -213,6 +232,8 @@ func (this *FileListDB) AddAsync(hash string, item *Item) error { } func (this *FileListDB) AddSync(hash string, item *Item) error { + this.hashMap.Add(hash) + if item.StaleAt == 0 { item.StaleAt = item.ExpiredAt } @@ -226,11 +247,15 @@ func (this *FileListDB) AddSync(hash string, item *Item) error { } func (this *FileListDB) DeleteAsync(hash string) error { + this.hashMap.Delete(hash) + this.writeBatch.Add(this.deleteByHashSQL, hash) return nil } func (this *FileListDB) DeleteSync(hash string) error { + this.hashMap.Delete(hash) + _, err := this.deleteByHashStmt.Exec(hash) if err != nil { return err @@ -293,10 +318,34 @@ func (this *FileListDB) ListLFUItems(count int) (hashList []string, err error) { return } -func (this *FileListDB) IncreaseHit(hash string) error { +func (this *FileListDB) ListHashes(lastId int64) (hashList []string, maxId int64, err error) { + rows, err := this.selectHashListStmt.Query(lastId) + if err != nil { + return nil, 0, err + } + for rows.Next() { + var id int64 + var hash string + err = rows.Scan(&id, &hash) + if err != nil { + _ = rows.Close() + return + } + maxId = id + hashList = append(hashList, hash) + } + return +} + +func (this *FileListDB) IncreaseHitAsync(hash string) error { var week = timeutil.Format("YW") - _, err := this.increaseHitStmt.Exec(hash, week, week, week, week) - return this.WrapError(err) + this.writeBatch.Add(this.increaseHitSQL, hash, week, week, week, week) + return nil +} + +func (this *FileListDB) DeleteHitAsync(hash string) error { + this.writeBatch.Add(this.deleteHitByHashSQL, hash) + return nil } func (this *FileListDB) CleanPrefix(prefix string) error { @@ -331,6 +380,8 @@ func (this *FileListDB) CleanAll() error { return this.WrapError(err) } + this.hashMap.Clean() + return nil } @@ -347,6 +398,9 @@ func (this *FileListDB) Close() error { if this.selectByHashStmt != nil { _ = this.selectByHashStmt.Close() } + if this.selectHashListStmt != nil { + _ = this.selectHashListStmt.Close() + } if this.deleteByHashStmt != nil { _ = this.deleteByHashStmt.Close() } @@ -362,16 +416,6 @@ func (this *FileListDB) Close() error { if this.listOlderItemsStmt != nil { _ = this.listOlderItemsStmt.Close() } - - if this.insertHitStmt != nil { - _ = this.insertHitStmt.Close() - } - if this.increaseHitStmt != nil { - _ = this.increaseHitStmt.Close() - } - if this.deleteHitByHashStmt != nil { - _ = this.deleteHitByHashStmt.Close() - } if this.lfuHitsStmt != nil { _ = this.lfuHitsStmt.Close() } @@ -392,7 +436,6 @@ func (this *FileListDB) Close() error { } } - if this.writeBatch != nil { this.writeBatch.Close() } diff --git a/internal/caches/list_file_hash_map.go b/internal/caches/list_file_hash_map.go new file mode 100644 index 0000000..3016813 --- /dev/null +++ b/internal/caches/list_file_hash_map.go @@ -0,0 +1,75 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import ( + "github.com/TeaOSLab/EdgeNode/internal/zero" + "sync" +) + +// FileListHashMap +type FileListHashMap struct { + m map[string]zero.Zero + locker sync.RWMutex + isReady bool +} + +func NewFileListHashMap() *FileListHashMap { + return &FileListHashMap{ + m: map[string]zero.Zero{}, + isReady: false, + } +} + +func (this *FileListHashMap) Load(db *FileListDB) error { + var lastId int64 + for { + hashList, maxId, err := db.ListHashes(lastId) + if err != nil { + return err + } + if len(hashList) == 0 { + break + } + for _, hash := range hashList { + this.Add(hash) + } + lastId = maxId + } + + this.isReady = true + return nil +} + +func (this *FileListHashMap) Add(hash string) { + this.locker.Lock() + this.m[hash] = zero.New() + this.locker.Unlock() +} + +func (this *FileListHashMap) Delete(hash string) { + this.locker.Lock() + delete(this.m, hash) + this.locker.Unlock() +} + +func (this *FileListHashMap) Exist(hash string) bool { + if !this.isReady { + // 只有完全Ready时才能判断是否为false + return true + } + this.locker.RLock() + _, ok := this.m[hash] + this.locker.RUnlock() + return ok +} + +func (this *FileListHashMap) Clean() { + this.locker.Lock() + this.m = map[string]zero.Zero{} + this.locker.Unlock() +} + +func (this *FileListHashMap) IsReady() bool { + return this.isReady +} diff --git a/internal/caches/list_file_hash_map_test.go b/internal/caches/list_file_hash_map_test.go new file mode 100644 index 0000000..4189f2a --- /dev/null +++ b/internal/caches/list_file_hash_map_test.go @@ -0,0 +1,51 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" + stringutil "github.com/iwind/TeaGo/utils/string" + "runtime" + "testing" +) + +func TestFileListHashMap_Memory(t *testing.T) { + var stat1 = &runtime.MemStats{} + runtime.ReadMemStats(stat1) + + var m = caches.NewFileListHashMap() + + for i := 0; i < 1_000_000; i++ { + m.Add(stringutil.Md5(types.String(i))) + } + + var stat2 = &runtime.MemStats{} + runtime.ReadMemStats(stat2) + + t.Log("ready", (stat2.Alloc-stat1.Alloc)/1024/1024, "M") +} + +func TestFileListHashMap_Load(t *testing.T) { + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1").(*caches.FileList) + err := list.Init() + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = list.Close() + }() + + var m = caches.NewFileListHashMap() + err = m.Load(list.GetDB("abc")) + if err != nil { + t.Fatal(err) + } + m.Add("abc") + + for _, hash := range []string{"33347bb4441265405347816cad36a0f8", "a", "abc", "123"} { + t.Log(hash, "=>", m.Exist(hash)) + } +} diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go index 4a35588..a6de8b8 100644 --- a/internal/caches/list_file_test.go +++ b/internal/caches/list_file_test.go @@ -361,19 +361,6 @@ func TestFileList_UpgradeV3(t *testing.T) { t.Log("ok") } -func TestFileList_HashList(t *testing.T) { - var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") - err := list.Init() - if err != nil { - t.Fatal(err) - } - prefixes, err := list.(*caches.FileList).FindAllPrefixes() - if err != nil { - t.Fatal(err) - } - t.Log(len(prefixes)) -} - func BenchmarkFileList_Exist(b *testing.B) { var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() diff --git a/internal/utils/dbs/batch.go b/internal/utils/dbs/batch.go index a67f90a..b35c9ef 100644 --- a/internal/utils/dbs/batch.go +++ b/internal/utils/dbs/batch.go @@ -103,6 +103,12 @@ For: } case <-this.close: // closed + + if lastTx != nil { + _ = lastTx.Commit() + lastTx = nil + } + return } }