Files
EdgeNode/internal/caches/list_file.go

424 lines
9.1 KiB
Go
Raw Normal View History

2021-05-19 12:07:35 +08:00
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"database/sql"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2021-06-14 19:55:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
2021-06-13 17:37:57 +08:00
"github.com/iwind/TeaGo/lists"
2021-05-19 12:07:35 +08:00
_ "github.com/mattn/go-sqlite3"
"os"
2021-06-13 17:37:57 +08:00
"strconv"
2021-05-19 12:07:35 +08:00
"sync/atomic"
"time"
)
// FileList 文件缓存列表管理
type FileList struct {
dir string
db *sql.DB
total int64
onAdd func(item *Item)
onRemove func(item *Item)
2021-06-13 17:37:57 +08:00
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
deleteByHashStmt *sql.Stmt // 根据hash删除数据
statStmt *sql.Stmt // 统计
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
oldTables []string
itemsTableName string
2021-06-13 17:51:04 +08:00
isClosed bool
2021-05-19 12:07:35 +08:00
}
// NewFileList returns a FileList bound to dir. The list is not usable until
// Init has been called on it.
func NewFileList(dir string) ListInterface {
	list := &FileList{}
	list.dir = dir
	return list
}
func (this *FileList) Init() error {
// 检查目录是否存在
_, err := os.Stat(this.dir)
if err != nil {
err = os.MkdirAll(this.dir, 0777)
if err != nil {
return err
}
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
2021-06-13 17:37:57 +08:00
this.itemsTableName = "cacheItems_v2"
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc&_journal_mode=WAL")
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
db.SetMaxOpenConns(1)
2021-06-13 17:37:57 +08:00
this.db = db
2021-05-19 12:07:35 +08:00
2021-06-13 17:37:57 +08:00
// 清除旧表
this.oldTables = []string{
"cacheItems",
}
err = this.removeOldTables()
2021-05-19 12:07:35 +08:00
if err != nil {
2021-06-13 17:37:57 +08:00
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
2021-05-19 12:07:35 +08:00
}
2021-06-13 17:37:57 +08:00
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
}**/
2021-05-19 12:07:35 +08:00
// 创建
2021-06-17 21:13:21 +08:00
err = this.initTables(db, 1)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
// 读取总数量
2021-06-13 17:37:57 +08:00
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
2021-05-19 12:07:35 +08:00
if row.Err() != nil {
return row.Err()
}
var total int64
err = row.Scan(&total)
if err != nil {
return err
}
this.total = total
2021-06-13 17:37:57 +08:00
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "bodySize" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
}
2021-06-14 19:55:06 +08:00
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`)
2021-06-13 17:37:57 +08:00
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `" WHERE expiredAt>?`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE expiredAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
2021-05-19 12:07:35 +08:00
return nil
}
func (this *FileList) Reset() error {
2021-06-15 10:47:40 +08:00
// 不做任何事情
2021-05-19 12:07:35 +08:00
return nil
}
func (this *FileList) Add(hash string, item *Item) error {
2021-06-13 17:51:04 +08:00
if this.isClosed {
return nil
}
2021-06-14 19:55:06 +08:00
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.Host, item.ServerId, utils.UnixTime())
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
atomic.AddInt64(&this.total, 1)
if this.onAdd != nil {
this.onAdd(item)
}
return nil
}
func (this *FileList) Exist(hash string) (bool, error) {
2021-06-13 17:51:04 +08:00
if this.isClosed {
return false, nil
}
2021-06-13 17:37:57 +08:00
rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
2021-05-19 12:07:35 +08:00
if err != nil {
return false, err
}
2021-06-13 17:37:57 +08:00
defer func() {
_ = rows.Close()
}()
if rows.Next() {
return true, nil
}
return false, nil
2021-05-19 12:07:35 +08:00
}
2021-06-13 17:37:57 +08:00
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
2021-06-13 17:51:04 +08:00
if this.isClosed {
return nil
}
2021-05-19 12:07:35 +08:00
if len(prefix) == 0 {
2021-06-13 17:37:57 +08:00
return nil
2021-05-19 12:07:35 +08:00
}
2021-06-13 17:37:57 +08:00
var count = int64(10000)
for {
2021-06-14 19:55:06 +08:00
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0 WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)==1 LIMIT `+strconv.FormatInt(count, 10)+`)`, utils.UnixTime(), prefix)
2021-05-19 12:07:35 +08:00
if err != nil {
2021-06-13 17:37:57 +08:00
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
2021-05-19 12:07:35 +08:00
}
}
}
func (this *FileList) Remove(hash string) error {
2021-06-13 17:51:04 +08:00
if this.isClosed {
return nil
}
2021-06-13 17:37:57 +08:00
row := this.selectByHashStmt.QueryRow(hash)
2021-05-19 12:07:35 +08:00
if row.Err() != nil {
return row.Err()
}
var item = &Item{Type: ItemTypeFile}
err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
2021-06-13 17:37:57 +08:00
_, err = this.deleteByHashStmt.Exec(hash)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
atomic.AddInt64(&this.total, -1)
if this.onRemove != nil {
this.onRemove(item)
}
return nil
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) error {
2021-06-13 17:51:04 +08:00
if this.isClosed {
return nil
}
2021-05-19 12:07:35 +08:00
if count <= 0 {
count = 1000
}
2021-06-13 17:37:57 +08:00
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
hashStrings := []string{}
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
2021-06-16 16:10:02 +08:00
_ = rows.Close()
2021-05-19 12:07:35 +08:00
return err
}
hashStrings = append(hashStrings, hash)
}
2021-06-16 16:10:02 +08:00
_ = rows.Close() // 不能使用defer防止读写冲突
2021-05-19 12:07:35 +08:00
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return err
}
err = callback(hash)
if err != nil {
return err
}
}
return nil
}
func (this *FileList) CleanAll() error {
2021-06-13 17:51:04 +08:00
if this.isClosed {
return nil
}
2021-06-13 17:37:57 +08:00
_, err := this.deleteAllStmt.Exec()
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
atomic.StoreInt64(&this.total, 0)
return nil
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
2021-06-13 17:51:04 +08:00
if this.isClosed {
return &Stat{}, nil
}
2021-05-19 12:07:35 +08:00
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
2021-06-13 17:37:57 +08:00
row := this.statStmt.QueryRow(time.Now().Unix())
2021-05-19 12:07:35 +08:00
if row.Err() != nil {
return nil, row.Err()
}
stat := &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
return stat, nil
}
// Count returns the total number of items. It is called frequently, so the
// value comes from the in-memory counter instead of a database query. The
// returned error is always nil.
func (this *FileList) Count() (int64, error) {
	current := atomic.LoadInt64(&this.total)
	return current, nil
}
// OnAdd registers f as the hook that Add invokes after a successful insert.
// Any previously registered hook is replaced.
func (this *FileList) OnAdd(f func(item *Item)) {
	this.onAdd = f
}
// OnRemove registers f as the hook that Remove invokes after a successful
// delete. Any previously registered hook is replaced.
func (this *FileList) OnRemove(f func(item *Item)) {
	this.onRemove = f
}
2021-06-13 17:37:57 +08:00
func (this *FileList) Close() error {
2021-06-13 17:51:04 +08:00
this.isClosed = true
2021-06-13 17:37:57 +08:00
if this.db != nil {
2021-06-13 17:51:04 +08:00
_ = this.existsByHashStmt.Close()
_ = this.insertStmt.Close()
_ = this.selectByHashStmt.Close()
_ = this.deleteByHashStmt.Close()
_ = this.statStmt.Close()
_ = this.purgeStmt.Close()
_ = this.deleteAllStmt.Close()
2021-06-13 17:37:57 +08:00
return this.db.Close()
}
return nil
}
2021-06-17 21:13:21 +08:00
// initTables creates the items table and its indexes if they do not exist.
//
// times is the number of the current attempt (Init passes 1). When creation
// fails and times is below 3, the items table is dropped and creation is
// retried with times+1. If the drop itself fails, or the attempts are used
// up, the error from the failed creation is returned.
func (this *FileList) initTables(db *sql.DB, times int) error {
	schema := `CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"key" varchar(1024),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"createdAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
"createdAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`

	_, createErr := db.Exec(schema)
	if createErr == nil {
		return nil
	}

	// Out of attempts: report the creation failure.
	if times >= 3 {
		return createErr
	}

	// Try to drop the table and rebuild it from scratch.
	if _, dropErr := db.Exec(`DROP TABLE "` + this.itemsTableName + `"`); dropErr != nil {
		return createErr
	}
	return this.initTables(db, times+1)
}
// 删除过期不用的表格
2021-06-13 17:37:57 +08:00
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
go func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
}()
}
}
return nil
}