优化本地数据库性能

This commit is contained in:
刘祥超
2022-03-15 18:32:39 +08:00
parent ddebc0e4a8
commit 21e206061d
13 changed files with 940 additions and 515 deletions

View File

@@ -4,54 +4,32 @@ package caches
import (
"database/sql"
"errors"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
_ "github.com/mattn/go-sqlite3"
"os"
"sync/atomic"
"time"
)
const CountFileDB = 10
// FileList 文件缓存列表管理
type FileList struct {
dir string
db *dbs.DB
total int64
dir string
dbList [CountFileDB]*FileListDB
total int64
onAdd func(item *Item)
onRemove func(item *Item)
// cacheItems
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
insertStmt *dbs.Stmt // 写入数据
selectByHashStmt *dbs.Stmt // 使用hash查询数据
deleteByHashStmt *dbs.Stmt // 根据hash删除数据
statStmt *dbs.Stmt // 统计
purgeStmt *dbs.Stmt // 清理
deleteAllStmt *dbs.Stmt // 删除所有数据
// hits
insertHitStmt *dbs.Stmt // 写入数据
increaseHitStmt *dbs.Stmt // 增加点击量
deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据
lfuHitsStmt *dbs.Stmt // 读取老的数据
oldTables []string
itemsTableName string
hitsTableName string
isClosed bool
isReady bool
memoryCache *ttlcache.Cache
// 老数据库地址
oldDir string
}
func NewFileList(dir string) ListInterface {
@@ -61,6 +39,10 @@ func NewFileList(dir string) ListInterface {
}
}
func (this *FileList) SetOldDir(oldDir string) {
this.oldDir = oldDir
}
func (this *FileList) Init() error {
// 检查目录是否存在
_, err := os.Stat(this.dir)
@@ -72,118 +54,38 @@ func (this *FileList) Init() error {
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
this.itemsTableName = "cacheItems_v3"
this.hitsTableName = "hits"
var dir = this.dir
if dir == "/" {
// 防止sqlite提示authority错误
dir = ""
}
var dbPath = dir + "/index.db"
remotelogs.Println("CACHE", "loading database '"+dbPath+"'")
db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=16000")
if err != nil {
return errors.New("open database failed: " + err.Error())
}
db.SetMaxOpenConns(1)
remotelogs.Println("CACHE", "loading database from '"+dir+"' ...")
for i := 0; i < CountFileDB; i++ {
var db = NewFileListDB()
err = db.Open(dir + "/db-" + types.String(i) + ".db")
if err != nil {
return err
}
this.db = dbs.NewDB(db)
err = db.Init()
if err != nil {
return err
}
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
}**/
// 创建
err = this.initTables(db, 1)
if err != nil {
return errors.New("init tables failed: " + err.Error())
}
// 清除旧表
// 这个一定要在initTables()之后,因为老的数据需要转移
this.oldTables = []string{
"cacheItems",
"cacheItems_v2",
}
err = this.removeOldTables()
if err != nil {
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
this.dbList[i] = db
}
// 读取总数量
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil {
return row.Err()
}
var total int64
err = row.Scan(&total)
if err != nil {
return err
}
this.total = total
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
this.total = 0
for _, db := range this.dbList {
this.total += db.total
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`)
this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`)
if err != nil {
return err
}
this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`)
if err != nil {
return err
}
this.isReady = true
// 升级老版本数据库
goman.New(func() {
this.upgradeOldDB()
})
return nil
}
@@ -194,24 +96,22 @@ func (this *FileList) Reset() error {
}
func (this *FileList) Add(hash string, item *Item) error {
if !this.isReady {
var db = this.getDB(hash)
if !db.IsReady() {
return nil
}
if item.StaleAt == 0 {
item.StaleAt = item.ExpiredAt
}
// 放入队列
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
err := db.Add(hash, item)
if err != nil {
return err
}
atomic.AddInt64(&this.total, 1)
// 这里不增加点击量,以减少对数据库的操作次数
this.memoryCache.Write(hash, 1, item.ExpiredAt)
atomic.AddInt64(&this.total, 1)
if this.onAdd != nil {
this.onAdd(item)
@@ -220,7 +120,9 @@ func (this *FileList) Add(hash string, item *Item) error {
}
func (this *FileList) Exist(hash string) (bool, error) {
if !this.isReady {
var db = this.getDB(hash)
if !db.IsReady() {
return false, nil
}
@@ -229,7 +131,7 @@ func (this *FileList) Exist(hash string) (bool, error) {
return true, nil
}
rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
rows, err := db.existsByHashStmt.Query(hash, time.Now().Unix())
if err != nil {
return false, err
}
@@ -250,10 +152,6 @@ func (this *FileList) Exist(hash string) (bool, error) {
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if !this.isReady {
return nil
}
if len(prefix) == 0 {
return nil
}
@@ -262,106 +160,48 @@ func (this *FileList) CleanPrefix(prefix string) error {
this.memoryCache.Clean()
}()
var count = int64(10000)
var staleLife = 600 // TODO 需要可以设置
var unixTime = utils.UnixTime() // 只删除当前的,不删除新的
for {
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix)
for _, db := range this.dbList {
err := db.CleanPrefix(prefix)
if err != nil {
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
}
}
return nil
}
func (this *FileList) Remove(hash string) error {
if !this.isReady {
return nil
}
// 从缓存中删除
this.memoryCache.Delete(hash)
row := this.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
return row.Err()
}
var item = &Item{Type: ItemTypeFile}
err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
_, err = this.deleteByHashStmt.Exec(hash)
if err != nil {
return err
}
_, err = this.deleteHitByHashStmt.Exec(hash)
if err != nil {
return err
}
atomic.AddInt64(&this.total, -1)
if this.onRemove != nil {
this.onRemove(item)
}
return nil
_, err := this.remove(hash)
return err
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) {
if !this.isReady {
return 0, nil
}
count /= CountFileDB
if count <= 0 {
count = 1000
count = 100
}
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil {
return 0, err
}
hashStrings := []string{}
var countFound = 0
for rows.Next() {
var hash string
err = rows.Scan(&hash)
for _, db := range this.dbList {
hashStrings, err := db.ListExpiredItems(count)
if err != nil {
_ = rows.Close()
return 0, err
return 0, nil
}
hashStrings = append(hashStrings, hash)
countFound++
}
_ = rows.Close() // 不能使用defer防止读写冲突
countFound += len(hashStrings)
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return 0, err
}
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return 0, err
}
err = callback(hash)
if err != nil {
return 0, err
err = callback(hash)
if err != nil {
return 0, err
}
}
}
@@ -369,80 +209,80 @@ func (this *FileList) Purge(count int, callback func(hash string) error) (int, e
}
func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error {
if !this.isReady {
return nil
}
count /= CountFileDB
if count <= 0 {
return nil
count = 100
}
rows, err := this.lfuHitsStmt.Query(count)
if err != nil {
return err
}
hashStrings := []string{}
var countFound = 0
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
_ = rows.Close()
return err
}
hashStrings = append(hashStrings, hash)
countFound++
}
_ = rows.Close() // 不能使用defer防止读写冲突
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
for _, db := range this.dbList {
hashStrings, err := db.ListLFUItems(count)
if err != nil {
return err
}
err = callback(hash)
if err != nil {
return err
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
notFound, err := this.remove(hash)
if err != nil {
return err
}
if notFound {
_, err = db.deleteHitByHashStmt.Exec(hash)
if err != nil {
return err
}
}
err = callback(hash)
if err != nil {
return err
}
}
}
return nil
}
func (this *FileList) CleanAll() error {
if !this.isReady {
return nil
defer this.memoryCache.Clean()
for _, db := range this.dbList {
err := db.CleanAll()
if err != nil {
return err
}
}
this.memoryCache.Clean()
_, err := this.deleteAllStmt.Exec()
if err != nil {
return err
}
atomic.StoreInt64(&this.total, 0)
return nil
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
if !this.isReady {
return &Stat{}, nil
var result = &Stat{}
for _, db := range this.dbList {
if !db.IsReady() {
return &Stat{}, nil
}
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
_ = check
row := db.statStmt.QueryRow()
if row.Err() != nil {
return nil, row.Err()
}
var stat = &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
result.Count += stat.Count
result.Size += stat.Size
result.ValueSize += stat.ValueSize
}
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.statStmt.QueryRow()
if row.Err() != nil {
return nil, row.Err()
}
stat := &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
return stat, nil
return result, nil
}
// Count 总数量
@@ -453,9 +293,13 @@ func (this *FileList) Count() (int64, error) {
// IncreaseHit 增加点击量
func (this *FileList) IncreaseHit(hash string) error {
var week = timeutil.Format("YW")
_, err := this.increaseHitStmt.Exec(hash, week, week, week, week)
return err
var db = this.getDB(hash)
if !db.IsReady() {
return nil
}
return db.IncreaseHit(hash)
}
// OnAdd 添加事件
@@ -469,163 +313,176 @@ func (this *FileList) OnRemove(f func(item *Item)) {
}
func (this *FileList) Close() error {
this.isClosed = true
this.isReady = false
this.memoryCache.Destroy()
if this.db != nil {
_ = this.existsByHashStmt.Close()
_ = this.insertStmt.Close()
_ = this.selectByHashStmt.Close()
_ = this.deleteByHashStmt.Close()
_ = this.statStmt.Close()
_ = this.purgeStmt.Close()
_ = this.deleteAllStmt.Close()
_ = this.insertHitStmt.Close()
_ = this.increaseHitStmt.Close()
_ = this.deleteHitByHashStmt.Close()
_ = this.lfuHitsStmt.Close()
return this.db.Close()
for _, db := range this.dbList {
if db != nil {
_ = db.Close()
}
}
return nil
}
// 初始化
func (this *FileList) initTables(db *sql.DB, times int) error {
// 检查是否存在
_, err := db.Exec(`SELECT id FROM "` + this.itemsTableName + `" LIMIT 1`)
var notFound = false
func (this *FileList) GetDBIndex(hash string) uint64 {
return fnv.Hash(hash) % CountFileDB
}
func (this *FileList) getDB(hash string) *FileListDB {
return this.dbList[fnv.Hash(hash)%CountFileDB]
}
func (this *FileList) remove(hash string) (notFound bool, err error) {
var db = this.getDB(hash)
if !db.IsReady() {
return false, nil
}
// 从缓存中删除
this.memoryCache.Delete(hash)
var row = db.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
if row.Err() == sql.ErrNoRows {
return true, nil
}
return false, row.Err()
}
var item = &Item{Type: ItemTypeFile}
err = row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
notFound = true
}
{
// expiredAt - 过期时间,用来判断有无过期
// staleAt - 陈旧最大时间,用来清理缓存
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"key" varchar(1024),
"tag" varchar(64),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"staleAt" integer DEFAULT 0,
"createdAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
"createdAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE INDEX IF NOT EXISTS "staleAt"
ON "` + this.itemsTableName + `" (
"staleAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := db.Exec(`DROP TABLE "` + this.itemsTableName + `"`)
if dropErr == nil {
return this.initTables(db, times+1)
}
return err
}
return err
if err == sql.ErrNoRows {
return true, nil
}
return false, err
}
// 如果数据为空,从老数据中加载数据
if notFound {
// v2 => v3
remotelogs.Println("CACHE", "transferring old data from v2 to v3 ...")
result, err := db.Exec(`INSERT INTO "` + this.itemsTableName + `" ("id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "staleAt") SELECT "id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "expiredAt"+600 FROM cacheItems_v2`)
if err == nil {
count, _ := result.RowsAffected()
remotelogs.Println("CACHE", "transfer old data from v2 to v3 finished, "+types.String(count)+" rows transferred")
}
_, err = db.deleteByHashStmt.Exec(hash)
if err != nil {
return false, err
}
{
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"week1Hits" integer DEFAULT 0,
"week2Hits" integer DEFAULT 0,
"week" varchar(6)
);
atomic.AddInt64(&this.total, -1)
CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash"
ON "` + this.hitsTableName + `" (
"hash" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := db.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
if dropErr == nil {
return this.initTables(db, times+1)
}
return err
}
return err
}
_, err = db.deleteHitByHashStmt.Exec(hash)
if err != nil {
return false, err
}
return nil
if this.onRemove != nil {
this.onRemove(item)
}
return false, nil
}
// 删除过期不用的表格
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
// 升级老版本数据库
func (this *FileList) upgradeOldDB() {
if len(this.oldDir) == 0 {
return
}
_ = this.UpgradeV3(this.oldDir, false)
}
func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
// index.db
var indexDBPath = oldDir + "/index.db"
_, err := os.Stat(indexDBPath)
if err != nil {
return nil
}
remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...")
defer func() {
_ = os.Remove(indexDBPath)
}()
db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
if err != nil {
return err
}
defer func() {
_ = rows.Close()
_ = db.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
var isFinished = false
var offset = 0
var count = 10000
for {
if isFinished {
break
}
err = func() error {
defer func() {
offset += count
}()
rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" LIMIT ?, ?`, offset, count)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var hash = ""
var key = ""
var headerSize int64
var bodySize int64
var metaSize int64
var expiredAt int64
var staleAt int64
var createdAt int64
var host string
var serverId int64
isFinished = true
for rows.Next() {
isFinished = false
err = rows.Scan(&hash, &key, &headerSize, &bodySize, &metaSize, &expiredAt, &staleAt, &createdAt, &host, &serverId)
if err != nil {
if brokenOnError {
return err
}
return nil
}
err = this.Add(hash, &Item{
Type: ItemTypeFile,
Key: key,
ExpiredAt: expiredAt,
StaleAt: staleAt,
HeaderSize: headerSize,
BodySize: bodySize,
MetaSize: metaSize,
Host: host,
ServerId: serverId,
Week1Hits: 0,
Week2Hits: 0,
Week: 0,
})
if err != nil {
if brokenOnError {
return err
}
}
}
return nil
}()
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
goman.New(func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
})
}
time.Sleep(1 * time.Second)
}
return nil
}