From a90baa69c76af12e1c8bde40d8f1be1850011f37 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 16 Mar 2022 16:20:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E5=8D=87=E7=BC=93=E5=AD=98=E6=95=88?= =?UTF-8?q?=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/list_file.go | 12 +- internal/caches/list_file_db.go | 138 ++++++++++++++++------- internal/caches/list_file_test.go | 4 +- internal/caches/storage_file.go | 4 +- internal/caches/storage_memory.go | 52 +++++++-- internal/utils/dbs/query_stat_manager.go | 6 +- 6 files changed, 155 insertions(+), 61 deletions(-) diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index 1750208..8b1386c 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -3,6 +3,7 @@ package caches import ( + "context" "database/sql" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" @@ -15,7 +16,7 @@ import ( "time" ) -const CountFileDB = 10 +const CountFileDB = 20 // FileList 文件缓存列表管理 type FileList struct { @@ -131,8 +132,13 @@ func (this *FileList) Exist(hash string) (bool, error) { return true, nil } - rows, err := db.existsByHashStmt.Query(hash, time.Now().Unix()) + ctx, cancelCtx := context.WithTimeout(context.Background(), 500*time.Millisecond) + rows, err := db.existsByHashStmt.QueryContext(ctx, hash, time.Now().Unix()) + cancelCtx() if err != nil { + if err == context.DeadlineExceeded { + return false, nil + } return false, err } defer func() { @@ -423,7 +429,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error { offset += count }() - rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" LIMIT ?, ?`, offset, count) + rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" ORDER BY "id" ASC LIMIT ?, ?`, offset, count) if err != nil { return err } diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index 96841c1..e5dc3a7 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -10,11 +10,14 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" + "runtime" + "strings" "time" ) type FileListDB struct { - db *dbs.DB + readDB *dbs.DB + writeDB *dbs.DB itemsTableName string hitsTableName string @@ -46,12 +49,13 @@ func NewFileListDB() *FileListDB { } func (this *FileListDB) Open(dbPath string) error { - db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") + // write db + writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=32000&_secure_delete=FAST") if err != nil { - return errors.New("open database failed: " + err.Error()) + return errors.New("open write database failed: " + err.Error()) } - db.SetMaxOpenConns(1) + writeDB.SetMaxOpenConns(1) // TODO 耗时过长,暂时不整理数据库 /**_, err = db.Exec("VACUUM") @@ -59,10 +63,24 @@ func (this *FileListDB) Open(dbPath string) error { return err }**/ - this.db = dbs.NewDB(db) + this.writeDB = dbs.NewDB(writeDB) if teaconst.EnableDBStat { - this.db.EnableStat(true) + this.writeDB.EnableStat(true) + } + + // read db + readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=32000") + if err != nil { + return errors.New("open read database failed: " + err.Error()) + } + + readDB.SetMaxOpenConns(runtime.NumCPU()) + + this.readDB = dbs.NewDB(readDB) + + if teaconst.EnableDBStat { + this.readDB.EnableStat(true) } return nil @@ -79,7 +97,7 @@ func (this *FileListDB) Init() error { } // 读取总数量 - row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`) + row := this.readDB.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`) if row.Err() != nil { return row.Err() } @@ -91,56 +109,56 @@ func (this *FileListDB) Init() error { this.total = total // 常用语句 - this.existsByHashStmt, err = this.db.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`) + this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`) if err != nil { return err } - this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + this.insertStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) if err != nil { return err } - this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`) + this.selectByHashStmt, err = this.readDB.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`) if err != nil { return err } - this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`) + this.deleteByHashStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`) if err != nil { return err } - this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`) + this.statStmt, err = this.readDB.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`) if err != nil { return err } - this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`) + this.purgeStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`) if err != nil { return err } - this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`) + this.deleteAllStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.itemsTableName + `"`) if err != nil { return err } - this.listOlderItemsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" ORDER BY "id" ASC LIMIT ?`) + this.listOlderItemsStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" ORDER BY "id" ASC LIMIT ?`) - this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`) + this.insertHitStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`) - this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`) + this.increaseHitStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`) if err != nil { return err } - this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`) + this.deleteHitByHashStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`) if err != nil { return err } - this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`) + this.lfuHitsStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`) if err != nil { return err } @@ -241,7 +259,7 @@ func (this *FileListDB) CleanPrefix(prefix string) error { var staleLife = 600 // TODO 需要可以设置 var unixTime = utils.UnixTime() // 只删除当前的,不删除新的 for { - result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix) + result, err := this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix) if err != nil { return err } @@ -272,24 +290,64 @@ func (this *FileListDB) Close() error { this.isClosed = true this.isReady = false - if this.db != nil { + if this.existsByHashStmt != nil { _ = this.existsByHashStmt.Close() - _ = this.insertStmt.Close() - _ = this.selectByHashStmt.Close() - _ = this.deleteByHashStmt.Close() - _ = this.statStmt.Close() - _ = this.purgeStmt.Close() - _ = this.deleteAllStmt.Close() - _ = this.listOlderItemsStmt.Close() - - _ = this.insertHitStmt.Close() - _ = this.increaseHitStmt.Close() - _ = this.deleteHitByHashStmt.Close() - _ = this.lfuHitsStmt.Close() - - return this.db.Close() } - return nil + if this.insertStmt != nil { + _ = this.insertStmt.Close() + } + if this.selectByHashStmt != nil { + _ = this.selectByHashStmt.Close() + } + if this.deleteByHashStmt != nil { + _ = this.deleteByHashStmt.Close() + } + if this.statStmt != nil { + _ = this.statStmt.Close() + } + if this.purgeStmt != nil { + _ = this.purgeStmt.Close() + } + if this.deleteAllStmt != nil { + _ = this.deleteAllStmt.Close() + } + if this.listOlderItemsStmt != nil { + _ = this.listOlderItemsStmt.Close() + } + + if this.insertHitStmt != nil { + _ = this.insertHitStmt.Close() + } + if this.increaseHitStmt != nil { + _ = this.increaseHitStmt.Close() + } + if this.deleteHitByHashStmt != nil { + _ = this.deleteHitByHashStmt.Close() + } + if this.lfuHitsStmt != nil { + _ = this.lfuHitsStmt.Close() + } + + var errStrings []string + + if this.readDB != nil { + err := this.readDB.Close() + if err != nil { + errStrings = append(errStrings, err.Error()) + } + } + + if this.writeDB != nil { + err := this.writeDB.Close() + if err != nil { + errStrings = append(errStrings, err.Error()) + } + } + + if len(errStrings) == 0 { + return nil + } + return errors.New("close database failed: " + strings.Join(errStrings, ", ")) } // 初始化 @@ -297,7 +355,7 @@ func (this *FileListDB) initTables(times int) error { { // expiredAt - 过期时间,用来判断有无过期 // staleAt - 陈旧最大时间,用来清理缓存 - _, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( + _, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "hash" varchar(32), "key" varchar(1024), @@ -341,7 +399,7 @@ ON "` + this.itemsTableName + `" ( if err != nil { // 尝试删除重建 if times < 3 { - _, dropErr := this.db.Exec(`DROP TABLE "` + this.itemsTableName + `"`) + _, dropErr := this.writeDB.Exec(`DROP TABLE "` + this.itemsTableName + `"`) if dropErr == nil { return this.initTables(times + 1) } @@ -353,7 +411,7 @@ ON "` + this.itemsTableName + `" ( } { - _, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" ( + _, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "hash" varchar(32), "week1Hits" integer DEFAULT 0, @@ -369,7 +427,7 @@ ON "` + this.hitsTableName + `" ( if err != nil { // 尝试删除重建 if times < 3 { - _, dropErr := this.db.Exec(`DROP TABLE "` + this.hitsTableName + `"`) + _, dropErr := this.writeDB.Exec(`DROP TABLE "` + this.hitsTableName + `"`) if dropErr == nil { return this.initTables(times + 1) } diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go index 277b982..15f3526 100644 --- a/internal/caches/list_file_test.go +++ b/internal/caches/list_file_test.go @@ -42,7 +42,7 @@ func TestFileList_Add(t *testing.T) { t.Log("db index:", list.GetDBIndex(hash)) err = list.Add(hash, &caches.Item{ Key: "123456", - ExpiredAt: time.Now().Unix(), + ExpiredAt: time.Now().Unix() + 1, HeaderSize: 1, MetaSize: 2, BodySize: 3, @@ -53,6 +53,8 @@ func TestFileList_Add(t *testing.T) { t.Fatal(err) } + t.Log(list.Exist(hash)) + t.Log("ok") } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 705b411..6c13c1b 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -205,12 +205,12 @@ func (this *FileStorage) Init() error { return errors.New("[CACHE]cache storage dir can not be empty") } - var list = NewFileList(Tea.Root + "/data/cache-index/p" + types.String(this.policy.Id)) + var list = NewFileList(dir + "/p" + types.String(this.policy.Id) + "/.indexes") err = list.Init() if err != nil { return err } - list.(*FileList).SetOldDir(this.options.Dir + "/p" + types.String(this.policy.Id)) + list.(*FileList).SetOldDir(dir + "/p" + types.String(this.policy.Id)) this.list = list stat, err := list.Stat(func(hash string) bool { return true diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 7a78036..de30e28 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -13,6 +13,7 @@ import ( "github.com/cespare/xxhash" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" + "github.com/shirou/gopsutil/v3/load" "math" "runtime" "strconv" @@ -93,17 +94,12 @@ func (this *MemoryStorage) Init() error { // 启动定时Flush memory to disk任务 if this.parentStorage != nil { - var threads = runtime.NumCPU() - if threads == 0 { - threads = 1 - } else if threads > 8 { - threads = 8 - } + // TODO 应该根据磁盘性能决定线程数 + var threads = 1 + for i := 0; i < threads; i++ { goman.New(func() { - for hash := range this.dirtyChan { - this.flushItem(hash) - } + this.startFlush() }) } } @@ -169,8 +165,7 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, s if isDirty && this.parentStorage != nil && this.dirtyQueueSize > 0 && - len(this.dirtyChan) == this.dirtyQueueSize && - (expiredAt <= 0 || expiredAt > time.Now().Unix()+7200) { // 缓存时间过长 + len(this.dirtyChan) == this.dirtyQueueSize { // 缓存时间过长 return nil, ErrWritingQueueFull } @@ -420,7 +415,40 @@ func (this *MemoryStorage) purgeLoop() { } } -// Flush任务 +// 开始Flush任务 +func (this *MemoryStorage) startFlush() { + var statCount = 0 + var writeDelayMS float64 = 0 + + for hash := range this.dirtyChan { + statCount++ + + if statCount == 100 { + statCount = 0 + + loadStat, err := load.Avg() + if err == nil && loadStat != nil { + if loadStat.Load1 > 10 { + writeDelayMS = 100 + } else if loadStat.Load1 > 3 { + writeDelayMS = 50 + } else if loadStat.Load1 > 2 { + writeDelayMS = 10 + } else { + writeDelayMS = 0 + } + } + } + + this.flushItem(hash) + + if writeDelayMS > 0 { + time.Sleep(time.Duration(writeDelayMS) * time.Millisecond) + } + } +} + +// 单次Flush任务 func (this *MemoryStorage) flushItem(key string) { if this.parentStorage == nil { return diff --git a/internal/utils/dbs/query_stat_manager.go b/internal/utils/dbs/query_stat_manager.go index b1fb612..8ecbd29 100644 --- a/internal/utils/dbs/query_stat_manager.go +++ b/internal/utils/dbs/query_stat_manager.go @@ -25,12 +25,12 @@ func init() { for _, stat := range SharedQueryStatManager.TopN(10) { var avg = stat.CostTotal / float64(stat.Calls) var query = stat.Query - if len(query) > 100 { - query = query[:100] + if len(query) > 128 { + query = query[:128] } stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, query)) } - logs.Println("====DB STATS====\n" + strings.Join(stats, "\n")) + logs.Println("\n========== DB STATS ==========\n" + strings.Join(stats, "\n") + "\n=============================") } }) }