diff --git a/internal/caches/errors.go b/internal/caches/errors.go index 82363a3..42c10f4 100644 --- a/internal/caches/errors.go +++ b/internal/caches/errors.go @@ -6,11 +6,12 @@ import "errors" // 常用的几个错误 var ( - ErrNotFound = errors.New("cache not found") - ErrFileIsWriting = errors.New("the file is writing") - ErrInvalidRange = errors.New("invalid range") - ErrEntityTooLarge = errors.New("entity too large") - ErrWritingUnavaible = errors.New("writing unavailable") + ErrNotFound = errors.New("cache not found") + ErrFileIsWriting = errors.New("the file is writing") + ErrInvalidRange = errors.New("invalid range") + ErrEntityTooLarge = errors.New("entity too large") + ErrWritingUnavailable = errors.New("writing unavailable") + ErrWritingQueueFull = errors.New("writing queue full") ) // CapacityError 容量错误 @@ -32,7 +33,10 @@ func CanIgnoreErr(err error) bool { if err == nil { return true } - if err == ErrFileIsWriting || err == ErrEntityTooLarge || err == ErrWritingUnavaible { + if err == ErrFileIsWriting || + err == ErrEntityTooLarge || + err == ErrWritingUnavailable || + err == ErrWritingQueueFull { return true } _, ok := err.(*CapacityError) diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index 7713eee..ed853be 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -4,54 +4,32 @@ package caches import ( "database/sql" - "errors" - teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/ttlcache" - "github.com/TeaOSLab/EdgeNode/internal/utils" - "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" - "github.com/iwind/TeaGo/lists" + "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" "github.com/iwind/TeaGo/types" - timeutil "github.com/iwind/TeaGo/utils/time" _ "github.com/mattn/go-sqlite3" "os" "sync/atomic" "time" ) +const CountFileDB = 10 + // FileList 文件缓存列表管理 type FileList struct { - dir string - db *dbs.DB - total int64 + dir string + dbList [CountFileDB]*FileListDB + total int64 onAdd func(item *Item) onRemove func(item *Item) - // cacheItems - existsByHashStmt *dbs.Stmt // 根据hash检查是否存在 - insertStmt *dbs.Stmt // 写入数据 - selectByHashStmt *dbs.Stmt // 使用hash查询数据 - deleteByHashStmt *dbs.Stmt // 根据hash删除数据 - statStmt *dbs.Stmt // 统计 - purgeStmt *dbs.Stmt // 清理 - deleteAllStmt *dbs.Stmt // 删除所有数据 - - // hits - insertHitStmt *dbs.Stmt // 写入数据 - increaseHitStmt *dbs.Stmt // 增加点击量 - deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据 - lfuHitsStmt *dbs.Stmt // 读取老的数据 - - oldTables []string - itemsTableName string - hitsTableName string - - isClosed bool - isReady bool - memoryCache *ttlcache.Cache + + // 老数据库地址 + oldDir string } func NewFileList(dir string) ListInterface { @@ -61,6 +39,10 @@ func NewFileList(dir string) ListInterface { } } +func (this *FileList) SetOldDir(oldDir string) { + this.oldDir = oldDir +} + func (this *FileList) Init() error { // 检查目录是否存在 _, err := os.Stat(this.dir) @@ -72,118 +54,38 @@ func (this *FileList) Init() error { remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'") } - this.itemsTableName = "cacheItems_v3" - this.hitsTableName = "hits" - var dir = this.dir if dir == "/" { // 防止sqlite提示authority错误 dir = "" } - var dbPath = dir + "/index.db" - remotelogs.Println("CACHE", "loading database '"+dbPath+"'") - db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=16000") - if err != nil { - return errors.New("open database failed: " + err.Error()) - } - db.SetMaxOpenConns(1) + remotelogs.Println("CACHE", "loading database from '"+dir+"' ...") + for i := 0; i < CountFileDB; i++ { + var db = NewFileListDB() + err = db.Open(dir + "/db-" + types.String(i) + ".db") + if err != nil { + return err + } - this.db = dbs.NewDB(db) + err = db.Init() + if err != nil { + return err + } - if teaconst.EnableDBStat { - this.db.EnableStat(true) - } - - // TODO 耗时过长,暂时不整理数据库 - /**_, err = db.Exec("VACUUM") - if err != nil { - return err - }**/ - - // 创建 - err = this.initTables(db, 1) - if err != nil { - return errors.New("init tables failed: " + err.Error()) - } - - // 清除旧表 - // 这个一定要在initTables()之后,因为老的数据需要转移 - this.oldTables = []string{ - "cacheItems", - "cacheItems_v2", - } - err = this.removeOldTables() - if err != nil { - remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error()) + this.dbList[i] = db } // 读取总数量 - row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`) - if row.Err() != nil { - return row.Err() - } - var total int64 - err = row.Scan(&total) - if err != nil { - return err - } - this.total = total - - // 常用语句 - this.existsByHashStmt, err = this.db.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`) - if err != nil { - return err + this.total = 0 + for _, db := range this.dbList { + this.total += db.total } - this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) - if err != nil { - return err - } - - this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`) - if err != nil { - return err - } - - this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`) - if err != nil { - return err - } - - this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`) - if err != nil { - return err - } - - this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`) - if err != nil { - return err - } - - this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`) - if err != nil { - return err - } - - this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`) - - this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`) - if err != nil { - return err - } - - this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`) - if err != nil { - return err - } - - this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`) - if err != nil { - return err - } - - this.isReady = true + // 升级老版本数据库 + goman.New(func() { + this.upgradeOldDB() + }) return nil } @@ -194,24 +96,22 @@ func (this *FileList) Reset() error { } func (this *FileList) Add(hash string, item *Item) error { - if !this.isReady { + var db = this.getDB(hash) + + if !db.IsReady() { return nil } - if item.StaleAt == 0 { - item.StaleAt = item.ExpiredAt - } - - // 放入队列 - _, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime()) + err := db.Add(hash, item) if err != nil { return err } + atomic.AddInt64(&this.total, 1) + // 这里不增加点击量,以减少对数据库的操作次数 this.memoryCache.Write(hash, 1, item.ExpiredAt) - atomic.AddInt64(&this.total, 1) if this.onAdd != nil { this.onAdd(item) @@ -220,7 +120,9 @@ func (this *FileList) Add(hash string, item *Item) error { } func (this *FileList) Exist(hash string) (bool, error) { - if !this.isReady { + var db = this.getDB(hash) + + if !db.IsReady() { return false, nil } @@ -229,7 +131,7 @@ func (this *FileList) Exist(hash string) (bool, error) { return true, nil } - rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix()) + rows, err := db.existsByHashStmt.Query(hash, time.Now().Unix()) if err != nil { return false, err } @@ -250,10 +152,6 @@ func (this *FileList) Exist(hash string) (bool, error) { // CleanPrefix 清理某个前缀的缓存数据 func (this *FileList) CleanPrefix(prefix string) error { - if !this.isReady { - return nil - } - if len(prefix) == 0 { return nil } @@ -262,106 +160,48 @@ func (this *FileList) CleanPrefix(prefix string) error { this.memoryCache.Clean() }() - var count = int64(10000) - var staleLife = 600 // TODO 需要可以设置 - var unixTime = utils.UnixTime() // 只删除当前的,不删除新的 - for { - result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix) + for _, db := range this.dbList { + err := db.CleanPrefix(prefix) if err != nil { return err } - affectedRows, err := result.RowsAffected() - if err != nil { - return err - } - if affectedRows < count { - return nil - } } + return nil } func (this *FileList) Remove(hash string) error { - if !this.isReady { - return nil - } - - // 从缓存中删除 - this.memoryCache.Delete(hash) - - row := this.selectByHashStmt.QueryRow(hash) - if row.Err() != nil { - return row.Err() - } - - var item = &Item{Type: ItemTypeFile} - err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt) - if err != nil { - if err == sql.ErrNoRows { - return nil - } - return err - } - - _, err = this.deleteByHashStmt.Exec(hash) - if err != nil { - return err - } - - _, err = this.deleteHitByHashStmt.Exec(hash) - if err != nil { - return err - } - - atomic.AddInt64(&this.total, -1) - - if this.onRemove != nil { - this.onRemove(item) - } - - return nil + _, err := this.remove(hash) + return err } // Purge 清理过期的缓存 // count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间 // callback 每次发现过期key的调用 func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) { - if !this.isReady { - return 0, nil - } - + count /= CountFileDB if count <= 0 { - count = 1000 + count = 100 } - rows, err := this.purgeStmt.Query(time.Now().Unix(), count) - if err != nil { - return 0, err - } - - hashStrings := []string{} var countFound = 0 - for rows.Next() { - var hash string - err = rows.Scan(&hash) + for _, db := range this.dbList { + hashStrings, err := db.ListExpiredItems(count) if err != nil { - _ = rows.Close() - return 0, err + return 0, nil } - hashStrings = append(hashStrings, hash) - countFound++ - } - _ = rows.Close() // 不能使用defer,防止读写冲突 + countFound += len(hashStrings) - // 不在 rows.Next() 循环中操作是为了避免死锁 - for _, hash := range hashStrings { - err = this.Remove(hash) - if err != nil { - return 0, err - } + // 不在 rows.Next() 循环中操作是为了避免死锁 + for _, hash := range hashStrings { + err = this.Remove(hash) + if err != nil { + return 0, err + } - err = callback(hash) - if err != nil { - return 0, err + err = callback(hash) + if err != nil { + return 0, err + } } } @@ -369,80 +209,80 @@ func (this *FileList) Purge(count int, callback func(hash string) error) (int, e } func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error { - if !this.isReady { - return nil - } - + count /= CountFileDB if count <= 0 { - return nil + count = 100 } - rows, err := this.lfuHitsStmt.Query(count) - if err != nil { - return err - } - - hashStrings := []string{} - var countFound = 0 - for rows.Next() { - var hash string - err = rows.Scan(&hash) - if err != nil { - _ = rows.Close() - return err - } - hashStrings = append(hashStrings, hash) - countFound++ - } - _ = rows.Close() // 不能使用defer,防止读写冲突 - - // 不在 rows.Next() 循环中操作是为了避免死锁 - for _, hash := range hashStrings { - err = this.Remove(hash) + for _, db := range this.dbList { + hashStrings, err := db.ListLFUItems(count) if err != nil { return err } - err = callback(hash) - if err != nil { - return err + // 不在 rows.Next() 循环中操作是为了避免死锁 + for _, hash := range hashStrings { + notFound, err := this.remove(hash) + if err != nil { + return err + } + if notFound { + _, err = db.deleteHitByHashStmt.Exec(hash) + if err != nil { + return err + } + } + + err = callback(hash) + if err != nil { + return err + } } } return nil } func (this *FileList) CleanAll() error { - if !this.isReady { - return nil + defer this.memoryCache.Clean() + + for _, db := range this.dbList { + err := db.CleanAll() + if err != nil { + return err + } } - this.memoryCache.Clean() - - _, err := this.deleteAllStmt.Exec() - if err != nil { - return err - } atomic.StoreInt64(&this.total, 0) + return nil } func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) { - if !this.isReady { - return &Stat{}, nil + var result = &Stat{} + + for _, db := range this.dbList { + if !db.IsReady() { + return &Stat{}, nil + } + + // 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些 + _ = check + + row := db.statStmt.QueryRow() + if row.Err() != nil { + return nil, row.Err() + } + var stat = &Stat{} + err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize) + if err != nil { + return nil, err + } + result.Count += stat.Count + result.Size += stat.Size + result.ValueSize += stat.ValueSize } - // 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些 - row := this.statStmt.QueryRow() - if row.Err() != nil { - return nil, row.Err() - } - stat := &Stat{} - err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize) - if err != nil { - return nil, err - } - - return stat, nil + return result, nil } // Count 总数量 @@ -453,9 +293,13 @@ func (this *FileList) Count() (int64, error) { // IncreaseHit 增加点击量 func (this *FileList) IncreaseHit(hash string) error { - var week = timeutil.Format("YW") - _, err := this.increaseHitStmt.Exec(hash, week, week, week, week) - return err + var db = this.getDB(hash) + + if !db.IsReady() { + return nil + } + + return db.IncreaseHit(hash) } // OnAdd 添加事件 @@ -469,163 +313,176 @@ func (this *FileList) OnRemove(f func(item *Item)) { } func (this *FileList) Close() error { - this.isClosed = true - this.isReady = false - this.memoryCache.Destroy() - if this.db != nil { - _ = this.existsByHashStmt.Close() - _ = this.insertStmt.Close() - _ = this.selectByHashStmt.Close() - _ = this.deleteByHashStmt.Close() - _ = this.statStmt.Close() - _ = this.purgeStmt.Close() - _ = this.deleteAllStmt.Close() - - _ = this.insertHitStmt.Close() - _ = this.increaseHitStmt.Close() - _ = this.deleteHitByHashStmt.Close() - _ = this.lfuHitsStmt.Close() - - return this.db.Close() + for _, db := range this.dbList { + if db != nil { + _ = db.Close() + } } + return nil } -// 初始化 -func (this *FileList) initTables(db *sql.DB, times int) error { - // 检查是否存在 - _, err := db.Exec(`SELECT id FROM "` + this.itemsTableName + `" LIMIT 1`) - var notFound = false +func (this *FileList) GetDBIndex(hash string) uint64 { + return fnv.Hash(hash) % CountFileDB +} + +func (this *FileList) getDB(hash string) *FileListDB { + return this.dbList[fnv.Hash(hash)%CountFileDB] +} + +func (this *FileList) remove(hash string) (notFound bool, err error) { + var db = this.getDB(hash) + + if !db.IsReady() { + return false, nil + } + + // 从缓存中删除 + this.memoryCache.Delete(hash) + + var row = db.selectByHashStmt.QueryRow(hash) + if row.Err() != nil { + if row.Err() == sql.ErrNoRows { + return true, nil + } + return false, row.Err() + } + + var item = &Item{Type: ItemTypeFile} + err = row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt) if err != nil { - notFound = true - } - - { - // expiredAt - 过期时间,用来判断有无过期 - // staleAt - 陈旧最大时间,用来清理缓存 - _, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( - "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, - "hash" varchar(32), - "key" varchar(1024), - "tag" varchar(64), - "headerSize" integer DEFAULT 0, - "bodySize" integer DEFAULT 0, - "metaSize" integer DEFAULT 0, - "expiredAt" integer DEFAULT 0, - "staleAt" integer DEFAULT 0, - "createdAt" integer DEFAULT 0, - "host" varchar(128), - "serverId" integer -); - -CREATE INDEX IF NOT EXISTS "createdAt" -ON "` + this.itemsTableName + `" ( - "createdAt" ASC -); - -CREATE INDEX IF NOT EXISTS "expiredAt" -ON "` + this.itemsTableName + `" ( - "expiredAt" ASC -); - -CREATE INDEX IF NOT EXISTS "staleAt" -ON "` + this.itemsTableName + `" ( - "staleAt" ASC -); - -CREATE UNIQUE INDEX IF NOT EXISTS "hash" -ON "` + this.itemsTableName + `" ( - "hash" ASC -); - -CREATE INDEX IF NOT EXISTS "serverId" -ON "` + this.itemsTableName + `" ( - "serverId" ASC -); -`) - - if err != nil { - // 尝试删除重建 - if times < 3 { - _, dropErr := db.Exec(`DROP TABLE "` + this.itemsTableName + `"`) - if dropErr == nil { - return this.initTables(db, times+1) - } - return err - } - - return err + if err == sql.ErrNoRows { + return true, nil } + return false, err } - // 如果数据为空,从老数据中加载数据 - if notFound { - // v2 => v3 - remotelogs.Println("CACHE", "transferring old data from v2 to v3 ...") - result, err := db.Exec(`INSERT INTO "` + this.itemsTableName + `" ("id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "staleAt") SELECT "id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "expiredAt"+600 FROM cacheItems_v2`) - if err == nil { - count, _ := result.RowsAffected() - remotelogs.Println("CACHE", "transfer old data from v2 to v3 finished, "+types.String(count)+" rows transferred") - } + _, err = db.deleteByHashStmt.Exec(hash) + if err != nil { + return false, err } - { - _, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" ( - "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, - "hash" varchar(32), - "week1Hits" integer DEFAULT 0, - "week2Hits" integer DEFAULT 0, - "week" varchar(6) -); + atomic.AddInt64(&this.total, -1) -CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash" -ON "` + this.hitsTableName + `" ( - "hash" ASC -); -`) - if err != nil { - // 尝试删除重建 - if times < 3 { - _, dropErr := db.Exec(`DROP TABLE "` + this.hitsTableName + `"`) - if dropErr == nil { - return this.initTables(db, times+1) - } - return err - } - - return err - } + _, err = db.deleteHitByHashStmt.Exec(hash) + if err != nil { + return false, err } - return nil + if this.onRemove != nil { + this.onRemove(item) + } + + return false, nil } -// 删除过期不用的表格 -func (this *FileList) removeOldTables() error { - rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`) +// 升级老版本数据库 +func (this *FileList) upgradeOldDB() { + if len(this.oldDir) == 0 { + return + } + _ = this.UpgradeV3(this.oldDir, false) +} + +func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error { + // index.db + var indexDBPath = oldDir + "/index.db" + _, err := os.Stat(indexDBPath) + if err != nil { + return nil + } + remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...") + + defer func() { + _ = os.Remove(indexDBPath) + }() + + db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") if err != nil { return err } + defer func() { - _ = rows.Close() + _ = db.Close() }() - for rows.Next() { - var name string - err = rows.Scan(&name) + + var isFinished = false + var offset = 0 + var count = 10000 + + for { + if isFinished { + break + } + + err = func() error { + defer func() { + offset += count + }() + + rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" LIMIT ?, ?`, offset, count) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + var hash = "" + var key = "" + var headerSize int64 + var bodySize int64 + var metaSize int64 + var expiredAt int64 + var staleAt int64 + var createdAt int64 + var host string + var serverId int64 + + isFinished = true + + for rows.Next() { + isFinished = false + + err = rows.Scan(&hash, &key, &headerSize, &bodySize, &metaSize, &expiredAt, &staleAt, &createdAt, &host, &serverId) + if err != nil { + if brokenOnError { + return err + } + return nil + } + + err = this.Add(hash, &Item{ + Type: ItemTypeFile, + Key: key, + ExpiredAt: expiredAt, + StaleAt: staleAt, + HeaderSize: headerSize, + BodySize: bodySize, + MetaSize: metaSize, + Host: host, + ServerId: serverId, + Week1Hits: 0, + Week2Hits: 0, + Week: 0, + }) + if err != nil { + if brokenOnError { + return err + } + } + } + + return nil + }() if err != nil { return err } - if lists.ContainsString(this.oldTables, name) { - // 异步执行 - goman.New(func() { - remotelogs.Println("CACHE", "remove old table '"+name+"' ...") - _, _ = this.db.Exec(`DROP TABLE "` + name + `"`) - remotelogs.Println("CACHE", "remove old table '"+name+"' done") - }) - } + time.Sleep(1 * time.Second) } + return nil } diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go new file mode 100644 index 0000000..96841c1 --- /dev/null +++ b/internal/caches/list_file_db.go @@ -0,0 +1,426 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import ( + "database/sql" + "errors" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" + "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" + "time" +) + +type FileListDB struct { + db *dbs.DB + + itemsTableName string + hitsTableName string + + total int64 + + isClosed bool + isReady bool + + // cacheItems + existsByHashStmt *dbs.Stmt // 根据hash检查是否存在 + insertStmt *dbs.Stmt // 写入数据 + selectByHashStmt *dbs.Stmt // 使用hash查询数据 + deleteByHashStmt *dbs.Stmt // 根据hash删除数据 + statStmt *dbs.Stmt // 统计 + purgeStmt *dbs.Stmt // 清理 + deleteAllStmt *dbs.Stmt // 删除所有数据 + listOlderItemsStmt *dbs.Stmt // 读取较早存储的缓存 + + // hits + insertHitStmt *dbs.Stmt // 写入数据 + increaseHitStmt *dbs.Stmt // 增加点击量 + deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据 + lfuHitsStmt *dbs.Stmt // 读取老的数据 +} + +func NewFileListDB() *FileListDB { + return &FileListDB{} +} + +func (this *FileListDB) Open(dbPath string) error { + db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") + if err != nil { + return errors.New("open database failed: " + err.Error()) + } + + db.SetMaxOpenConns(1) + + // TODO 耗时过长,暂时不整理数据库 + /**_, err = db.Exec("VACUUM") + if err != nil { + return err + }**/ + + this.db = dbs.NewDB(db) + + if teaconst.EnableDBStat { + this.db.EnableStat(true) + } + + return nil +} + +func (this *FileListDB) Init() error { + this.itemsTableName = "cacheItems" + this.hitsTableName = "hits" + + // 创建 + var err = this.initTables(1) + if err != nil { + return errors.New("init tables failed: " + err.Error()) + } + + // 读取总数量 + row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`) + if row.Err() != nil { + return row.Err() + } + var total int64 + err = row.Scan(&total) + if err != nil { + return err + } + this.total = total + + // 常用语句 + this.existsByHashStmt, err = this.db.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`) + if err != nil { + return err + } + + this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return err + } + + this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`) + if err != nil { + return err + } + + this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`) + if err != nil { + return err + } + + this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`) + if err != nil { + return err + } + + this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`) + if err != nil { + return err + } + + this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`) + if err != nil { + return err + } + + this.listOlderItemsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" ORDER BY "id" ASC LIMIT ?`) + + this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`) + + this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`) + if err != nil { + return err + } + + this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`) + if err != nil { + return err + } + + this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`) + if err != nil { + return err + } + + this.isReady = true + + return nil +} + +func (this *FileListDB) IsReady() bool { + return this.isReady +} + +func (this *FileListDB) Total() int64 { + return this.total +} + +func (this *FileListDB) Add(hash string, item *Item) error { + if item.StaleAt == 0 { + item.StaleAt = item.ExpiredAt + } + + // 放入队列 + _, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime()) + if err != nil { + return err + } + + return nil +} + +func (this *FileListDB) ListExpiredItems(count int) (hashList []string, err error) { + if !this.isReady { + return nil, nil + } + + if count <= 0 { + count = 100 + } + + rows, err := this.purgeStmt.Query(time.Now().Unix(), count) + if err != nil { + return nil, err + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var hash string + err = rows.Scan(&hash) + if err != nil { + return nil, err + } + hashList = append(hashList, hash) + } + return hashList, nil +} + +func (this *FileListDB) ListLFUItems(count int) (hashList []string, err error) { + if !this.isReady { + return nil, nil + } + + if count <= 0 { + count = 100 + } + + hashList, err = this.listLFUItems(count) + if err != nil { + return + } + + if len(hashList) > count/2 { + return + } + + // 不足补齐 + olderHashList, err := this.listOlderItems(count - len(hashList)) + if err != nil { + return nil, err + } + hashList = append(hashList, olderHashList...) + return +} + +func (this *FileListDB) IncreaseHit(hash string) error { + var week = timeutil.Format("YW") + _, err := this.increaseHitStmt.Exec(hash, week, week, week, week) + return err +} + +func (this *FileListDB) CleanPrefix(prefix string) error { + if !this.isReady { + return nil + } + var count = int64(10000) + var staleLife = 600 // TODO 需要可以设置 + var unixTime = utils.UnixTime() // 只删除当前的,不删除新的 + for { + result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix) + if err != nil { + return err + } + affectedRows, err := result.RowsAffected() + if err != nil { + return err + } + if affectedRows < count { + return nil + } + } +} + +func (this *FileListDB) CleanAll() error { + if !this.isReady { + return nil + } + + _, err := this.deleteAllStmt.Exec() + if err != nil { + return err + } + + return nil +} + +func (this *FileListDB) Close() error { + this.isClosed = true + this.isReady = false + + if this.db != nil { + _ = this.existsByHashStmt.Close() + _ = this.insertStmt.Close() + _ = this.selectByHashStmt.Close() + _ = this.deleteByHashStmt.Close() + _ = this.statStmt.Close() + _ = this.purgeStmt.Close() + _ = this.deleteAllStmt.Close() + _ = this.listOlderItemsStmt.Close() + + _ = this.insertHitStmt.Close() + _ = this.increaseHitStmt.Close() + _ = this.deleteHitByHashStmt.Close() + _ = this.lfuHitsStmt.Close() + + return this.db.Close() + } + return nil +} + +// 初始化 +func (this *FileListDB) initTables(times int) error { + { + // expiredAt - 过期时间,用来判断有无过期 + // staleAt - 陈旧最大时间,用来清理缓存 + _, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "hash" varchar(32), + "key" varchar(1024), + "tag" varchar(64), + "headerSize" integer DEFAULT 0, + "bodySize" integer DEFAULT 0, + "metaSize" integer DEFAULT 0, + "expiredAt" integer DEFAULT 0, + "staleAt" integer DEFAULT 0, + "createdAt" integer DEFAULT 0, + "host" varchar(128), + "serverId" integer +); + +CREATE INDEX IF NOT EXISTS "createdAt" +ON "` + this.itemsTableName + `" ( + "createdAt" ASC +); + +CREATE INDEX IF NOT EXISTS "expiredAt" +ON "` + this.itemsTableName + `" ( + "expiredAt" ASC +); + +CREATE INDEX IF NOT EXISTS "staleAt" +ON "` + this.itemsTableName + `" ( + "staleAt" ASC +); + +CREATE UNIQUE INDEX IF NOT EXISTS "hash" +ON "` + this.itemsTableName + `" ( + "hash" ASC +); + +CREATE INDEX IF NOT EXISTS "serverId" +ON "` + this.itemsTableName + `" ( + "serverId" ASC +); +`) + + if err != nil { + // 尝试删除重建 + if times < 3 { + _, dropErr := this.db.Exec(`DROP TABLE "` + this.itemsTableName + `"`) + if dropErr == nil { + return this.initTables(times + 1) + } + return err + } + + return err + } + } + + { + _, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "hash" varchar(32), + "week1Hits" integer DEFAULT 0, + "week2Hits" integer DEFAULT 0, + "week" varchar(6) +); + +CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash" +ON "` + this.hitsTableName + `" ( + "hash" ASC +); +`) + if err != nil { + // 尝试删除重建 + if times < 3 { + _, dropErr := this.db.Exec(`DROP TABLE "` + this.hitsTableName + `"`) + if dropErr == nil { + return this.initTables(times + 1) + } + return err + } + + return err + } + } + + return nil +} + +func (this *FileListDB) listLFUItems(count int) (hashList []string, err error) { + rows, err := this.lfuHitsStmt.Query(count) + if err != nil { + return nil, err + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var hash string + err = rows.Scan(&hash) + if err != nil { + return nil, err + } + hashList = append(hashList, hash) + } + + return hashList, nil +} + +func (this *FileListDB) listOlderItems(count int) (hashList []string, err error) { + rows, err := this.listOlderItemsStmt.Query(count) + if err != nil { + return nil, err + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var hash string + err = rows.Scan(&hash) + if err != nil { + return nil, err + } + hashList = append(hashList, hash) + } + + return hashList, nil +} diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go index 8628b8d..277b982 100644 --- a/internal/caches/list_file_test.go +++ b/internal/caches/list_file_test.go @@ -1,8 +1,9 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -package caches +package caches_test import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/rands" @@ -15,21 +16,31 @@ import ( ) func TestFileList_Init(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() if err != nil { t.Fatal(err) } + defer func() { + _ = list.Close() + }() t.Log("ok") } func TestFileList_Add(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1").(*caches.FileList) err := list.Init() if err != nil { t.Fatal(err) } - err = list.Add(stringutil.Md5("123456"), &Item{ + + defer func() { + _ = list.Close() + }() + + var hash = stringutil.Md5("123456") + t.Log("db index:", list.GetDBIndex(hash)) + err = list.Add(hash, &caches.Item{ Key: "123456", ExpiredAt: time.Now().Unix(), HeaderSize: 1, @@ -41,19 +52,25 @@ func TestFileList_Add(t *testing.T) { if err != nil { t.Fatal(err) } + t.Log("ok") } func TestFileList_Add_Many(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() if err != nil { t.Fatal(err) } + + defer func() { + _ = list.Close() + }() + before := time.Now() - for i := 0; i < 2000_0000; i++ { + for i := 0; i < 100_000; i++ { u := "https://edge.teaos.cn/123456" + strconv.Itoa(i) - _ = list.Add(stringutil.Md5(u), &Item{ + _ = list.Add(stringutil.Md5(u), &caches.Item{ Key: u, ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1, @@ -72,36 +89,46 @@ func TestFileList_Add_Many(t *testing.T) { } func TestFileList_Exist(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1").(*caches.FileList) err := list.Init() if err != nil { t.Fatal(err) } - before := time.Now() + + defer func() { + _ = list.Close() + }() + + total, _ := list.Count() + t.Log("total:", total) + + var before = time.Now() defer func() { t.Log(time.Since(before).Seconds()*1000, "ms") }() { - exists, err := list.Exist(stringutil.Md5("123456")) + var hash = stringutil.Md5("123456") + exists, err := list.Exist(hash) if err != nil { t.Fatal(err) } - t.Log("exists:", exists) + t.Log(hash, "exists:", exists) } { - exists, err := list.Exist(stringutil.Md5("http://edge.teaos.cn/1234561")) + var hash = stringutil.Md5("http://edge.teaos.cn/1234561") + exists, err := list.Exist(hash) if err != nil { t.Fatal(err) } - t.Log("exists:", exists) + t.Log(hash, "exists:", exists) } } func TestFileList_Exist_Many_DB(t *testing.T) { // 测试在多个数据库下的性能 - var listSlice = []ListInterface{} + var listSlice = []caches.ListInterface{} for i := 1; i <= 10; i++ { - list := NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i)) + list := caches.NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i)) err := list.Init() if err != nil { t.Fatal(err) @@ -151,13 +178,18 @@ func TestFileList_Exist_Many_DB(t *testing.T) { } func TestFileList_CleanPrefix(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() if err != nil { t.Fatal(err) } + + defer func() { + _ = list.Close() + }() + before := time.Now() - err = list.CleanPrefix("1234") + err = list.CleanPrefix("123") if err != nil { t.Fatal(err) } @@ -165,12 +197,17 @@ func TestFileList_CleanPrefix(t *testing.T) { } func TestFileList_Remove(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1").(*caches.FileList) err := list.Init() if err != nil { t.Fatal(err) } - list.OnRemove(func(item *Item) { + + defer func() { + _ = list.Close() + }() + + list.OnRemove(func(item *caches.Item) { t.Logf("remove %#v", item) }) err = list.Remove(stringutil.Md5("123456")) @@ -178,30 +215,71 @@ func TestFileList_Remove(t *testing.T) { t.Fatal(err) } t.Log("ok") + + t.Log("===count===") + t.Log(list.Count()) } func TestFileList_Purge(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() if err != nil { t.Fatal(err) } - _, err = list.Purge(2, func(hash string) error { + defer func() { + _ = list.Close() + }() + + var count = 0 + _, err = list.Purge(caches.CountFileDB*2, func(hash string) error { t.Log(hash) + count++ return nil }) if err != nil { t.Fatal(err) } - t.Log("ok") + t.Log("ok, purged", count, "items") } -func TestFileList_Stat(t *testing.T) { - list := NewFileList(Tea.Root + "/data") +func TestFileList_PurgeLFU(t *testing.T) { + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() if err != nil { t.Fatal(err) } + defer func() { + _ = list.Close() + }() + + err = list.IncreaseHit(stringutil.Md5("123456")) + if err != nil { + t.Fatal(err) + } + + var count = 0 + err = list.PurgeLFU(caches.CountFileDB*2, func(hash string) error { + t.Log(hash) + count++ + return nil + }) + if err != nil { + t.Fatal(err) + } + t.Log("ok, purged", count, "items") +} + +func TestFileList_Stat(t *testing.T) { + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") + err := list.Init() + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = list.Close() + }() + stat, err := list.Stat(nil) if err != nil { t.Fatal(err) @@ -210,7 +288,7 @@ func TestFileList_Stat(t *testing.T) { } func TestFileList_Count(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + list := caches.NewFileList(Tea.Root + "/data") err := list.Init() if err != nil { t.Fatal(err) @@ -225,7 +303,7 @@ func TestFileList_Count(t *testing.T) { } func TestFileList_CleanAll(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + list := caches.NewFileList(Tea.Root + "/data") err := list.Init() if err != nil { t.Fatal(err) @@ -238,58 +316,17 @@ func TestFileList_CleanAll(t *testing.T) { t.Log(list.Count()) } -func TestFileList_Conflict(t *testing.T) { - list := NewFileList(Tea.Root + "/data").(*FileList) - err := list.Init() - if err != nil { - t.Fatal(err) - } - - rows, err := list.purgeStmt.Query(time.Now().Unix(), 1000) - if err != nil { - t.Fatal(err) - } - go func() { - time.Sleep(5 * time.Second) - _ = rows.Close() - }() - - t.Log("before exists") - t.Log(list.Exist("123456")) - t.Log("after exists") -} - -func TestFileList_IIF(t *testing.T) { - list := NewFileList(Tea.Root + "/data").(*FileList) - err := list.Init() - if err != nil { - t.Fatal(err) - } - - rows, err := list.db.Query("SELECT IIF(0, 2, 3)") - if err != nil { - t.Fatal(err) - } - defer func() { - _ = rows.Close() - }() - - if rows.Next() { - var result int - err = rows.Scan(&result) - if err != nil { - t.Fatal(err) - } - t.Log("result:", result) - } -} - func TestFileList_IncreaseHit(t *testing.T) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() if err != nil { t.Fatal(err) } + + defer func() { + _ = list.Close() + }() + var before = time.Now() defer func() { t.Log(time.Since(before).Seconds()*1000, "ms") @@ -303,13 +340,32 @@ func TestFileList_IncreaseHit(t *testing.T) { t.Log("ok") } +func TestFileList_UpgradeV3(t *testing.T) { + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p43").(*caches.FileList) + err := list.Init() + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = list.Close() + }() + + err = list.UpgradeV3("/Users/WorkSpace/EdgeProject/EdgeCache/p43", false) + if err != nil { + t.Log(err) + return + } + t.Log("ok") +} + func BenchmarkFileList_Exist(b *testing.B) { - list := NewFileList(Tea.Root + "/data") + var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") err := list.Init() if err != nil { b.Fatal(err) } for i := 0; i < b.N; i++ { - _, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f") + _, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f" + types.String(i)) } } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 60d279d..1abbcff 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -122,11 +122,12 @@ func (this *FileStorage) Init() error { return errors.New("[CACHE]cache storage dir can not be empty") } - list := NewFileList(dir + "/p" + strconv.FormatInt(this.policy.Id, 10)) + var list = NewFileList(Tea.Root + "/data/cache-index/p" + types.String(this.policy.Id)) err = list.Init() if err != nil { return err } + list.(*FileList).SetOldDir(this.cacheConfig.Dir + "/p" + types.String(this.policy.Id)) this.list = list stat, err := list.Stat(func(hash string) bool { return true @@ -317,7 +318,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) { // 是否正在退出 if teaconst.IsQuiting { - return nil, ErrWritingUnavaible + return nil, ErrWritingUnavailable } // 是否已忽略 @@ -336,6 +337,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz if err == nil { return writer, nil } + + // 如果队列满了,则等待 + if err == ErrWritingQueueFull { + return nil, err + } } // 是否正在写入 diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index beb3ae2..9c6a6b5 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets" + "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/cespare/xxhash" "github.com/iwind/TeaGo/rands" @@ -42,7 +43,8 @@ type MemoryStorage struct { valuesMap map[uint64]*MemoryItem // hash => item - dirtyChan chan string // hash chan + dirtyChan chan string // hash chan + dirtyQueueSize int purgeTicker *utils.Ticker @@ -54,22 +56,25 @@ type MemoryStorage struct { func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage { var dirtyChan chan string + var queueSize = policy.MemoryAutoFlushQueueSize + if parentStorage != nil { - var queueSize = policy.MemoryAutoFlushQueueSize if queueSize <= 0 { - queueSize = 2048 + queueSize = 2048 + int(policy.CapacityBytes()/sizes.G)*2048 } + dirtyChan = make(chan string, queueSize) } return &MemoryStorage{ - parentStorage: parentStorage, - policy: policy, - list: NewMemoryList(), - locker: &sync.RWMutex{}, - valuesMap: map[uint64]*MemoryItem{}, - dirtyChan: dirtyChan, - writingKeyMap: map[string]zero.Zero{}, - ignoreKeys: setutils.NewFixedSet(32768), + parentStorage: parentStorage, + policy: policy, + list: NewMemoryList(), + locker: &sync.RWMutex{}, + valuesMap: map[uint64]*MemoryItem{}, + dirtyChan: dirtyChan, + dirtyQueueSize: queueSize, + writingKeyMap: map[string]zero.Zero{}, + ignoreKeys: setutils.NewFixedSet(32768), } } @@ -101,11 +106,19 @@ func (this *MemoryStorage) Init() error { // 启动定时Flush memory to disk任务 if this.parentStorage != nil { - goman.New(func() { - for hash := range this.dirtyChan { - this.flushItem(hash) - } - }) + var threads = runtime.NumCPU() + if threads == 0 { + threads = 1 + } else if threads > 8 { + threads = 8 + } + for i := 0; i < threads; i++ { + goman.New(func() { + for hash := range this.dirtyChan { + this.flushItem(hash) + } + }) + } } return nil @@ -165,6 +178,15 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, s } func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) { + // 待写入队列是否已满 + if isDirty && + this.parentStorage != nil && + this.dirtyQueueSize > 0 && + len(this.dirtyChan) == this.dirtyQueueSize && + (expiredAt <= 0 || expiredAt > time.Now().Unix()+7200) { // 缓存时间过长 + return nil, ErrWritingQueueFull + } + this.locker.Lock() defer this.locker.Unlock() diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 58ebbcf..bbeec22 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -604,9 +604,8 @@ func (this *Node) listenSignals() { signal.Notify(queue, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT) goman.New(func() { for range queue { - events.Notify(events.EventTerminated) time.Sleep(100 * time.Millisecond) - os.Exit(0) + utils.Exit() return } }) @@ -650,7 +649,7 @@ func (this *Node) listenSock() error { // 退出主进程 events.Notify(events.EventQuit) - os.Exit(0) + utils.Exit() case "quit": _ = cmd.ReplyOk() _ = this.sock.Close() @@ -662,7 +661,7 @@ func (this *Node) listenSock() error { for { countActiveConnections := sharedListenerManager.TotalActiveConnections() if countActiveConnections <= 0 { - os.Exit(0) + utils.Exit() return } time.Sleep(1 * time.Second) diff --git a/internal/nodes/upgrade_manager.go b/internal/nodes/upgrade_manager.go index f26409d..7b08dcb 100644 --- a/internal/nodes/upgrade_manager.go +++ b/internal/nodes/upgrade_manager.go @@ -228,7 +228,7 @@ func (this *UpgradeManager) unzip(zipPath string) error { func (this *UpgradeManager) restart() error { // 重新启动 if DaemonIsOn && DaemonPid == os.Getppid() { - os.Exit(0) // TODO 试着更优雅重启 + utils.Exit() // TODO 试着更优雅重启 } else { exe, err := os.Executable() if err != nil { @@ -238,6 +238,9 @@ func (this *UpgradeManager) restart() error { // quit events.Notify(events.EventQuit) + // terminated + events.Notify(events.EventTerminated) + // 启动 cmd := exec.Command(exe, "start") err = cmd.Start() diff --git a/internal/utils/dbs/query_stat_manager.go b/internal/utils/dbs/query_stat_manager.go index 0498807..b1fb612 100644 --- a/internal/utils/dbs/query_stat_manager.go +++ b/internal/utils/dbs/query_stat_manager.go @@ -24,7 +24,11 @@ func init() { var stats = []string{} for _, stat := range SharedQueryStatManager.TopN(10) { var avg = stat.CostTotal / float64(stat.Calls) - stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, stat.Query)) + var query = stat.Query + if len(query) > 100 { + query = query[:100] + } + stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, query)) } logs.Println("====DB STATS====\n" + strings.Join(stats, "\n")) } diff --git a/internal/utils/exit.go b/internal/utils/exit.go new file mode 100644 index 0000000..61c3a37 --- /dev/null +++ b/internal/utils/exit.go @@ -0,0 +1,13 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package utils + +import ( + "github.com/TeaOSLab/EdgeNode/internal/events" + "os" +) + +func Exit() { + events.Notify(events.EventTerminated) + os.Exit(0) +} diff --git a/internal/utils/fnv/hash.go b/internal/utils/fnv/hash.go new file mode 100644 index 0000000..644f9e4 --- /dev/null +++ b/internal/utils/fnv/hash.go @@ -0,0 +1,19 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package fnv + +const ( + offset64 uint64 = 14695981039346656037 + prime64 uint64 = 1099511628211 +) + +// Hash +// 非unique Hash +func Hash(key string) uint64 { + var hash = offset64 + for _, b := range key { + hash ^= uint64(b) + hash *= prime64 + } + return hash +} diff --git a/internal/utils/fnv/hash_test.go b/internal/utils/fnv/hash_test.go new file mode 100644 index 0000000..524778f --- /dev/null +++ b/internal/utils/fnv/hash_test.go @@ -0,0 +1,16 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package fnv_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" + "github.com/iwind/TeaGo/types" + "testing" +) + +func TestHash(t *testing.T) { + for _, key := range []string{"costarring", "liquid", "hello"} { + var h = fnv.Hash(key) + t.Log(key + " => " + types.String(h)) + } +} diff --git a/internal/utils/sizes/sizes.go b/internal/utils/sizes/sizes.go index d804dee..c0bb3ff 100644 --- a/internal/utils/sizes/sizes.go +++ b/internal/utils/sizes/sizes.go @@ -3,7 +3,7 @@ package sizes const ( - K = 1024 + K int64 = 1024 M = 1024 * K G = 1024 * M T = 1024 * G