Files
EdgeNode/internal/caches/list_file.go

648 lines
14 KiB
Go
Raw Normal View History

2021-05-19 12:07:35 +08:00
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"database/sql"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
2021-06-14 19:55:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
2021-06-13 17:37:57 +08:00
"github.com/iwind/TeaGo/lists"
2021-12-17 11:54:27 +08:00
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
2021-05-19 12:07:35 +08:00
_ "github.com/mattn/go-sqlite3"
"os"
2021-12-16 17:27:21 +08:00
"strings"
2021-05-19 12:07:35 +08:00
"sync/atomic"
"time"
)
// FileList 文件缓存列表管理
type FileList struct {
dir string
db *sql.DB
total int64
onAdd func(item *Item)
onRemove func(item *Item)
2021-06-13 17:37:57 +08:00
// cacheItems
2021-06-13 17:37:57 +08:00
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
deleteByHashStmt *sql.Stmt // 根据hash删除数据
statStmt *sql.Stmt // 统计
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
// hits
insertHitStmt *sql.Stmt // 写入数据
increaseHitStmt *sql.Stmt // 增加点击量
deleteHitByHashStmt *sql.Stmt // 根据hash删除数据
lfuHitsStmt *sql.Stmt // 读取老的数据
2021-06-13 17:37:57 +08:00
oldTables []string
itemsTableName string
hitsTableName string
2021-06-13 17:51:04 +08:00
isClosed bool
isReady bool
memoryCache *ttlcache.Cache
2021-05-19 12:07:35 +08:00
}
func NewFileList(dir string) ListInterface {
return &FileList{
dir: dir,
memoryCache: ttlcache.NewCache(),
}
2021-05-19 12:07:35 +08:00
}
// Init prepares the list for use. In order, it:
//   - creates the cache directory if it is missing;
//   - opens <dir>/index.db, recreating it when the items table cannot be read;
//   - drops obsolete tables and creates the current tables and indexes;
//   - adds the staleAt column to older databases;
//   - loads the item count;
//   - prepares the statements used by the other methods.
//
// Methods guarded by isReady do nothing until Init has returned nil.
func (this *FileList) Init() error {
	// Make sure the cache directory exists.
	_, err := os.Stat(this.dir)
	if err != nil {
		err = os.MkdirAll(this.dir, 0777)
		if err != nil {
			return err
		}
		remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
	}

	this.itemsTableName = "cacheItems_v2"
	this.hitsTableName = "hits"

	var dir = this.dir
	if dir == "/" {
		// Avoid "file://index.db", which sqlite would parse as having an authority part.
		dir = ""
	}
	var dbPath = dir + "/index.db"
	var dsn = "file:" + dbPath + "?cache=shared&mode=rwc&_journal_mode=WAL"
	remotelogs.Println("CACHE", "loading database '"+dbPath+"'")
	db, err := sql.Open("sqlite3", dsn)
	if err != nil {
		return err
	}

	// If the items table cannot be read, discard the database and start over.
	_, err = db.Exec(`SELECT * FROM "` + this.itemsTableName + `" LIMIT 1`)
	if err != nil {
		remotelogs.Println("CACHE", "rebuilding database '"+dbPath+"'")
		_ = db.Close()
		// Remove the WAL side files as well, so a leftover -wal/-shm can never be
		// paired with the freshly created database file. Missing files are fine.
		for _, path := range []string{dbPath, dbPath + "-wal", dbPath + "-shm"} {
			_ = os.Remove(path)
		}
		db, err = sql.Open("sqlite3", dsn)
		if err != nil {
			return err
		}
	}

	// A single connection serializes all access to the index database.
	db.SetMaxOpenConns(1)
	this.db = db

	// Drop tables left over from earlier schema versions.
	this.oldTables = []string{
		"cacheItems",
	}
	err = this.removeOldTables()
	if err != nil {
		remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
	}

	// TODO VACUUM takes too long, so the database is not compacted for now:
	// _, err = db.Exec("VACUUM")

	// Create tables and indexes.
	err = this.initTables(db, 1)
	if err != nil {
		return err
	}

	// Older databases may not have the staleAt column yet.
	err = this.checkStaleAtField()
	if err != nil {
		return err
	}

	// Load the item count once; Add, Remove and CleanAll keep it current afterwards.
	row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
	if row.Err() != nil {
		return row.Err()
	}
	var total int64
	err = row.Scan(&total)
	if err != nil {
		return err
	}
	this.total = total

	// Prepare the frequently used statements, checking every Prepare error.
	var statements = []struct {
		target **sql.Stmt
		query  string
	}{
		// items table
		{&this.existsByHashStmt, `SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`},
		{&this.insertStmt, `INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`},
		{&this.selectByHashStmt, `SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`},
		{&this.deleteByHashStmt, `DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`},
		{&this.statStmt, `SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`},
		{&this.purgeStmt, `SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`},
		{&this.deleteAllStmt, `DELETE FROM "` + this.itemsTableName + `"`},
		// hits table
		{&this.insertHitStmt, `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`},
		{&this.increaseHitStmt, `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`},
		{&this.deleteHitByHashStmt, `DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`},
		{&this.lfuHitsStmt, `SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`},
	}
	for _, s := range statements {
		*s.target, err = this.db.Prepare(s.query)
		if err != nil {
			return err
		}
	}

	this.isReady = true
	return nil
}
func (this *FileList) Reset() error {
2021-06-15 10:47:40 +08:00
// 不做任何事情
2021-05-19 12:07:35 +08:00
return nil
}
// Add stores item in the index under hash and creates its hit counter. It then
// increments the item count and calls onAdd.
//
// An item whose StaleAt is 0 gets StaleAt = ExpiredAt; item is modified in
// place. Add does nothing until Init has succeeded.
func (this *FileList) Add(hash string, item *Item) error {
	if !this.isReady {
		return nil
	}

	if item.StaleAt == 0 {
		item.StaleAt = item.ExpiredAt
	}

	_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
	if err != nil {
		return err
	}

	var week = timeutil.Format("YW")
	_, err = this.insertHitStmt.Exec(hash, week)
	if err != nil {
		// The hits table has a unique index on "hash", so a hits row left over
		// for this hash (one with no matching item) makes the plain INSERT fail
		// even though the item row above was stored. Drop the stale counter and
		// retry once; the retry's result decides the outcome.
		_, _ = this.deleteHitByHashStmt.Exec(hash)
		_, err = this.insertHitStmt.Exec(hash, week)
		if err != nil {
			return err
		}
	}

	atomic.AddInt64(&this.total, 1)
	if this.onAdd != nil {
		this.onAdd(item)
	}
	return nil
}
// Exist reports whether an item with the given hash exists and has not yet
// expired.
//
// Positive answers are remembered in memoryCache until the item's expiredAt,
// so repeated lookups skip the database. A row that cannot be scanned is
// reported as absent without an error.
func (this *FileList) Exist(hash string) (bool, error) {
	if !this.isReady {
		return false, nil
	}
	if cached := this.memoryCache.Read(hash); cached != nil {
		return true, nil
	}

	rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
	if err != nil {
		return false, err
	}
	defer func() {
		_ = rows.Close()
	}()

	if !rows.Next() {
		return false, nil
	}
	var expiredAt int64
	if rows.Scan(&expiredAt) != nil {
		return false, nil
	}
	this.memoryCache.Write(hash, 1, expiredAt)
	return true, nil
}
2021-06-13 17:37:57 +08:00
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if !this.isReady {
2021-06-13 17:51:04 +08:00
return nil
}
2021-05-19 12:07:35 +08:00
if len(prefix) == 0 {
2021-06-13 17:37:57 +08:00
return nil
2021-05-19 12:07:35 +08:00
}
2021-08-26 15:48:09 +08:00
defer func() {
this.memoryCache.Clean()
}()
2021-06-13 17:37:57 +08:00
var count = int64(10000)
2021-12-17 11:54:27 +08:00
var staleLife = 600 // TODO 需要可以设置
2021-06-13 17:37:57 +08:00
for {
2021-12-17 11:54:27 +08:00
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, utils.UnixTime()+int64(staleLife), utils.UnixTime(), prefix)
2021-05-19 12:07:35 +08:00
if err != nil {
2021-06-13 17:37:57 +08:00
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
2021-05-19 12:07:35 +08:00
}
}
}
// Remove deletes the item with the given hash, and its hit counter, from the
// index. It also drops any cached Exist answer for the hash, decrements the
// item count and calls onRemove with the deleted item's key and sizes.
//
// Removing a hash that has no item row is not an error; any hit counter left
// behind for that hash is still deleted.
func (this *FileList) Remove(hash string) error {
	if !this.isReady {
		return nil
	}

	// Forget the cached Exist() answer first.
	this.memoryCache.Delete(hash)

	row := this.selectByHashStmt.QueryRow(hash)
	if row.Err() != nil {
		return row.Err()
	}
	var item = &Item{Type: ItemTypeFile}
	err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
	if err == sql.ErrNoRows {
		// No item row, but a hits row may still exist: IncreaseHit upserts a
		// counter for any hash. PurgeLFU picks its candidates from the hits
		// table, so an orphaned counter would be selected again on every call.
		_, err = this.deleteHitByHashStmt.Exec(hash)
		return err
	}
	if err != nil {
		return err
	}

	_, err = this.deleteByHashStmt.Exec(hash)
	if err != nil {
		return err
	}
	_, err = this.deleteHitByHashStmt.Exec(hash)
	if err != nil {
		return err
	}

	atomic.AddInt64(&this.total, -1)
	if this.onRemove != nil {
		this.onRemove(item)
	}
	return nil
}
// Purge deletes items whose staleAt deadline has passed.
//
// count caps how many rows one call examines, which keeps each purge short;
// values <= 0 fall back to 1000. callback is invoked with each hash after it
// has been removed from the index.
//
// Purge returns the number of purgeable hashes found by this call. It stops
// at the first error and then returns 0 together with that error.
func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) {
	if !this.isReady {
		return 0, nil
	}

	if count <= 0 {
		count = 1000
	}

	rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
	if err != nil {
		return 0, err
	}
	var hashes []string
	for rows.Next() {
		var hash string
		err = rows.Scan(&hash)
		if err != nil {
			_ = rows.Close()
			return 0, err
		}
		hashes = append(hashes, hash)
	}
	// Surface errors that ended the iteration early instead of silently
	// purging a truncated list.
	err = rows.Err()
	// Close explicitly rather than with defer: Init limits the pool to one
	// connection, and Remove below needs it while rows would still hold it.
	_ = rows.Close()
	if err != nil {
		return 0, err
	}

	// Removal happens outside the rows loop for the same reason (avoids a deadlock).
	for _, hash := range hashes {
		err = this.Remove(hash)
		if err != nil {
			return 0, err
		}
		err = callback(hash)
		if err != nil {
			return 0, err
		}
	}
	return len(hashes), nil
}
func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error {
if !this.isReady {
return nil
}
if count <= 0 {
return nil
}
rows, err := this.lfuHitsStmt.Query(count)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
hashStrings := []string{}
var countFound = 0
2021-05-19 12:07:35 +08:00
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
2021-06-16 16:10:02 +08:00
_ = rows.Close()
2021-05-19 12:07:35 +08:00
return err
}
hashStrings = append(hashStrings, hash)
countFound++
2021-05-19 12:07:35 +08:00
}
2021-06-16 16:10:02 +08:00
_ = rows.Close() // 不能使用defer防止读写冲突
2021-05-19 12:07:35 +08:00
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return err
}
err = callback(hash)
if err != nil {
return err
}
}
return nil
}
func (this *FileList) CleanAll() error {
if !this.isReady {
2021-06-13 17:51:04 +08:00
return nil
}
this.memoryCache.Clean()
2021-06-13 17:37:57 +08:00
_, err := this.deleteAllStmt.Exec()
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
atomic.StoreInt64(&this.total, 0)
return nil
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
if !this.isReady {
2021-06-13 17:51:04 +08:00
return &Stat{}, nil
}
2021-05-19 12:07:35 +08:00
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.statStmt.QueryRow()
2021-05-19 12:07:35 +08:00
if row.Err() != nil {
return nil, row.Err()
}
stat := &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
return stat, nil
}
// Count 总数量
// 常用的方法,所以避免直接查询数据库
func (this *FileList) Count() (int64, error) {
return atomic.LoadInt64(&this.total), nil
}
// IncreaseHit records one hit for hash in the current week (timeutil "YW").
//
// A new hash starts with week2Hits=1. For an existing hash whose stored week
// differs, week2Hits moves into week1Hits and the counter restarts at 1.
// IncreaseHit does nothing until Init has succeeded, and nothing after Close.
func (this *FileList) IncreaseHit(hash string) error {
	if !this.isReady {
		// increaseHitStmt is prepared only by Init; Exec on a nil *sql.Stmt panics.
		return nil
	}
	var week = timeutil.Format("YW")
	_, err := this.increaseHitStmt.Exec(hash, week, week, week, week)
	return err
}
2021-05-19 12:07:35 +08:00
// OnAdd 添加事件
func (this *FileList) OnAdd(f func(item *Item)) {
this.onAdd = f
}
// OnRemove 删除事件
func (this *FileList) OnRemove(f func(item *Item)) {
this.onRemove = f
}
// Close marks the list as closed and not ready, destroys the in-memory cache,
// closes every prepared statement and finally the database.
//
// It is safe to call after a failed Init: statements that were never prepared
// are skipped.
func (this *FileList) Close() error {
	this.isClosed = true
	this.isReady = false

	this.memoryCache.Destroy()

	if this.db == nil {
		return nil
	}

	var statements = []*sql.Stmt{
		this.existsByHashStmt,
		this.insertStmt,
		this.selectByHashStmt,
		this.deleteByHashStmt,
		this.statStmt,
		this.purgeStmt,
		this.deleteAllStmt,
		this.insertHitStmt,
		this.increaseHitStmt,
		this.deleteHitByHashStmt,
		this.lfuHitsStmt,
	}
	for _, stmt := range statements {
		// nil means Init stopped before preparing it; (*sql.Stmt).Close panics on nil.
		if stmt != nil {
			_ = stmt.Close()
		}
	}
	return this.db.Close()
}
// initTables creates the items and hits tables and their indexes if they do
// not exist yet.
//
// When a table's CREATE batch fails, that table is dropped and the whole
// creation is retried. times counts attempts, starting at 1, so at most three
// attempts are made before the CREATE error is returned.
//
// Items columns:
//   - expiredAt: expiry time, used to decide whether an item has expired;
//   - staleAt: latest time the item may still be kept, used by Purge.
//
// NOTE(review): for an items table created before "staleAt" existed, CREATE
// TABLE IF NOT EXISTS is a no-op, but the "staleAt" index below still names
// the column. Init runs this before checkStaleAtField; confirm whether SQLite
// rejects the index (sending this table down the drop/rebuild path) before
// the migration can run.
func (this *FileList) initTables(db *sql.DB, times int) error {
	var tables = []struct {
		name string
		ddl  string
	}{
		{
			name: this.itemsTableName,
			ddl: `CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
  "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
  "hash" varchar(32),
  "key" varchar(1024),
  "headerSize" integer DEFAULT 0,
  "bodySize" integer DEFAULT 0,
  "metaSize" integer DEFAULT 0,
  "expiredAt" integer DEFAULT 0,
  "staleAt" integer DEFAULT 0,
  "createdAt" integer DEFAULT 0,
  "host" varchar(128),
  "serverId" integer
);

CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
  "createdAt" ASC
);

CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
  "expiredAt" ASC
);

CREATE INDEX IF NOT EXISTS "staleAt"
ON "` + this.itemsTableName + `" (
  "staleAt" ASC
);

CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
  "hash" ASC
);

CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
  "serverId" ASC
);
`,
		},
		{
			name: this.hitsTableName,
			ddl: `CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
  "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
  "hash" varchar(32),
  "week1Hits" integer DEFAULT 0,
  "week2Hits" integer DEFAULT 0,
  "week" varchar(6)
);

CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash"
ON "` + this.hitsTableName + `" (
  "hash" ASC
);
`,
		},
	}

	for _, table := range tables {
		_, err := db.Exec(table.ddl)
		if err == nil {
			continue
		}
		// Try dropping the broken table and building everything again.
		if times < 3 {
			_, dropErr := db.Exec(`DROP TABLE "` + table.name + `"`)
			if dropErr == nil {
				return this.initTables(db, times+1)
			}
		}
		return err
	}
	return nil
}
// 删除过期不用的表格
2021-06-13 17:37:57 +08:00
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
goman.New(func() {
2021-06-13 17:37:57 +08:00
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
})
2021-06-13 17:37:57 +08:00
}
}
return nil
}
// checkStaleAtField migrates an items table created before the "staleAt"
// column existed. It adds the column and initializes it from expiredAt.
// Tables that already have the column are left untouched.
func (this *FileList) checkStaleAtField() error {
	// Look the column up in the schema, rather than selecting it and matching
	// the error text.
	rows, err := this.db.Query(`SELECT "name" FROM pragma_table_info(?)`, this.itemsTableName)
	if err != nil {
		return err
	}
	var hasStaleAt = false
	for rows.Next() {
		var name string
		err = rows.Scan(&name)
		if err != nil {
			_ = rows.Close()
			return err
		}
		// SQLite column names are case-insensitive.
		if strings.EqualFold(name, "staleAt") {
			hasStaleAt = true
		}
	}
	err = rows.Err()
	// Close before the statements below: Init allows only one open connection.
	_ = rows.Close()
	if err != nil {
		return err
	}
	if hasStaleAt {
		return nil
	}

	_, err = this.db.Exec(`ALTER TABLE "` + this.itemsTableName + `" ADD COLUMN staleAt integer DEFAULT 0`)
	if err != nil {
		return err
	}
	_, err = this.db.Exec(`UPDATE "` + this.itemsTableName + `" SET staleAt=expiredAt`)
	if err != nil {
		return err
	}
	return nil
}