Files
EdgeNode/internal/caches/list_file.go

488 lines
9.2 KiB
Go
Raw Normal View History

2021-05-19 12:07:35 +08:00
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"database/sql"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
2022-03-15 18:32:39 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
2021-12-17 11:54:27 +08:00
"github.com/iwind/TeaGo/types"
2021-05-19 12:07:35 +08:00
_ "github.com/mattn/go-sqlite3"
"os"
2021-05-19 12:07:35 +08:00
"sync/atomic"
"time"
)
2022-03-16 16:20:53 +08:00
// CountFileDB is the number of database files the cache item index is split
// across; an item's database is chosen from the hash of its key.
const CountFileDB = 20
2022-03-15 18:32:39 +08:00
2021-05-19 12:07:35 +08:00
// FileList 文件缓存列表管理
type FileList struct {
2022-03-15 18:32:39 +08:00
dir string
dbList [CountFileDB]*FileListDB
total int64
2021-05-19 12:07:35 +08:00
onAdd func(item *Item)
onRemove func(item *Item)
2021-06-13 17:37:57 +08:00
memoryCache *ttlcache.Cache
2022-03-15 18:32:39 +08:00
// 老数据库地址
oldDir string
2021-05-19 12:07:35 +08:00
}
func NewFileList(dir string) ListInterface {
return &FileList{
dir: dir,
memoryCache: ttlcache.NewCache(),
}
2021-05-19 12:07:35 +08:00
}
2022-03-15 18:32:39 +08:00
// SetOldDir records the directory of the old-version database; Init uses it
// to upgrade old data in the background.
func (this *FileList) SetOldDir(oldDir string) {
	this.oldDir = oldDir
}
2021-05-19 12:07:35 +08:00
func (this *FileList) Init() error {
// 检查目录是否存在
_, err := os.Stat(this.dir)
if err != nil {
err = os.MkdirAll(this.dir, 0777)
if err != nil {
return err
}
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
var dir = this.dir
if dir == "/" {
// 防止sqlite提示authority错误
dir = ""
}
2021-05-19 12:07:35 +08:00
2022-03-15 18:32:39 +08:00
remotelogs.Println("CACHE", "loading database from '"+dir+"' ...")
for i := 0; i < CountFileDB; i++ {
var db = NewFileListDB()
err = db.Open(dir + "/db-" + types.String(i) + ".db")
if err != nil {
return err
}
2021-06-13 17:37:57 +08:00
2022-03-15 18:32:39 +08:00
err = db.Init()
if err != nil {
return err
}
2021-05-19 12:07:35 +08:00
2022-03-15 18:32:39 +08:00
this.dbList[i] = db
2021-12-16 17:27:21 +08:00
}
2021-05-19 12:07:35 +08:00
// 读取总数量
2022-03-15 18:32:39 +08:00
this.total = 0
for _, db := range this.dbList {
this.total += db.total
}
2022-03-15 18:32:39 +08:00
// 升级老版本数据库
goman.New(func() {
this.upgradeOldDB()
})
2021-05-19 12:07:35 +08:00
return nil
}
func (this *FileList) Reset() error {
2021-06-15 10:47:40 +08:00
// 不做任何事情
2021-05-19 12:07:35 +08:00
return nil
}
func (this *FileList) Add(hash string, item *Item) error {
2022-03-15 18:32:39 +08:00
var db = this.getDB(hash)
2021-06-13 17:51:04 +08:00
2022-03-15 18:32:39 +08:00
if !db.IsReady() {
return nil
2021-12-16 17:27:21 +08:00
}
err := db.AddAsync(hash, item)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
2022-03-15 18:32:39 +08:00
atomic.AddInt64(&this.total, 1)
// 这里不增加点击量,以减少对数据库的操作次数
2022-01-11 16:02:41 +08:00
this.memoryCache.Write(hash, 1, item.ExpiredAt)
2021-05-19 12:07:35 +08:00
if this.onAdd != nil {
this.onAdd(item)
}
return nil
}
func (this *FileList) Exist(hash string) (bool, error) {
2022-03-15 18:32:39 +08:00
var db = this.getDB(hash)
if !db.IsReady() {
2021-06-13 17:51:04 +08:00
return false, nil
}
2022-03-20 16:20:40 +08:00
var item = this.memoryCache.Read(hash)
if item != nil {
return true, nil
}
2022-03-20 16:20:40 +08:00
var row = db.existsByHashStmt.QueryRow(hash, time.Now().Unix())
if row.Err() != nil {
return false, nil
}
var expiredAt int64
err := row.Scan(&expiredAt)
2021-05-19 12:07:35 +08:00
if err != nil {
2022-03-20 16:20:40 +08:00
if err == sql.ErrNoRows {
err = nil
2022-03-16 16:20:53 +08:00
}
2021-05-19 12:07:35 +08:00
return false, err
}
2022-03-20 16:20:40 +08:00
this.memoryCache.Write(hash, 1, expiredAt)
return true, nil
2021-05-19 12:07:35 +08:00
}
2021-06-13 17:37:57 +08:00
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
2021-05-19 12:07:35 +08:00
if len(prefix) == 0 {
2021-06-13 17:37:57 +08:00
return nil
2021-05-19 12:07:35 +08:00
}
2021-08-26 15:48:09 +08:00
defer func() {
this.memoryCache.Clean()
}()
2022-03-15 18:32:39 +08:00
for _, db := range this.dbList {
err := db.CleanPrefix(prefix)
2021-05-19 12:07:35 +08:00
if err != nil {
2021-06-13 17:37:57 +08:00
return err
}
2021-05-19 12:07:35 +08:00
}
2022-03-15 18:32:39 +08:00
return nil
2021-05-19 12:07:35 +08:00
}
func (this *FileList) Remove(hash string) error {
2022-03-15 18:32:39 +08:00
_, err := this.remove(hash)
return err
2021-05-19 12:07:35 +08:00
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) {
2022-03-15 18:32:39 +08:00
count /= CountFileDB
2021-05-19 12:07:35 +08:00
if count <= 0 {
2022-03-15 18:32:39 +08:00
count = 100
2021-05-19 12:07:35 +08:00
}
var countFound = 0
2022-03-15 18:32:39 +08:00
for _, db := range this.dbList {
hashStrings, err := db.ListExpiredItems(count)
if err != nil {
2022-03-15 18:32:39 +08:00
return 0, nil
}
2022-03-15 18:32:39 +08:00
countFound += len(hashStrings)
2022-03-15 18:32:39 +08:00
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return 0, err
}
2022-03-15 18:32:39 +08:00
err = callback(hash)
if err != nil {
return 0, err
}
}
}
return countFound, nil
}
func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error {
2022-03-15 18:32:39 +08:00
count /= CountFileDB
if count <= 0 {
2022-03-15 18:32:39 +08:00
count = 100
2021-05-19 12:07:35 +08:00
}
2022-03-15 18:32:39 +08:00
for _, db := range this.dbList {
hashStrings, err := db.ListLFUItems(count)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
2022-03-15 18:32:39 +08:00
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
notFound, err := this.remove(hash)
if err != nil {
return err
}
if notFound {
_, err = db.deleteHitByHashStmt.Exec(hash)
if err != nil {
return db.WrapError(err)
2022-03-15 18:32:39 +08:00
}
}
2021-05-19 12:07:35 +08:00
2022-03-15 18:32:39 +08:00
err = callback(hash)
if err != nil {
return err
}
2021-05-19 12:07:35 +08:00
}
}
return nil
}
func (this *FileList) CleanAll() error {
2022-03-15 18:32:39 +08:00
defer this.memoryCache.Clean()
2021-06-13 17:51:04 +08:00
2022-03-15 18:32:39 +08:00
for _, db := range this.dbList {
err := db.CleanAll()
if err != nil {
return err
}
2021-05-19 12:07:35 +08:00
}
2022-03-15 18:32:39 +08:00
2021-05-19 12:07:35 +08:00
atomic.StoreInt64(&this.total, 0)
2022-03-15 18:32:39 +08:00
2021-05-19 12:07:35 +08:00
return nil
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
2022-03-15 18:32:39 +08:00
var result = &Stat{}
2021-06-13 17:51:04 +08:00
2022-03-15 18:32:39 +08:00
for _, db := range this.dbList {
if !db.IsReady() {
return &Stat{}, nil
}
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
_ = check
var row = db.statStmt.QueryRow()
2022-03-15 18:32:39 +08:00
if row.Err() != nil {
return nil, row.Err()
}
var stat = &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
result.Count += stat.Count
result.Size += stat.Size
result.ValueSize += stat.ValueSize
2021-05-19 12:07:35 +08:00
}
2022-03-15 18:32:39 +08:00
return result, nil
2021-05-19 12:07:35 +08:00
}
// Count returns the total number of items.
// This method is called frequently, so it reads the in-memory counter
// instead of querying the databases.
func (this *FileList) Count() (int64, error) {
	var current = atomic.LoadInt64(&this.total)
	return current, nil
}
// IncreaseHit 增加点击量
func (this *FileList) IncreaseHit(hash string) error {
2022-03-15 18:32:39 +08:00
var db = this.getDB(hash)
if !db.IsReady() {
return nil
}
return db.IncreaseHit(hash)
}
2021-05-19 12:07:35 +08:00
// OnAdd sets the callback that Add invokes after an item has been added.
// Any previously set callback is replaced.
func (this *FileList) OnAdd(f func(item *Item)) {
	this.onAdd = f
}
// OnRemove sets the callback that is invoked after an item has been removed.
// Any previously set callback is replaced.
func (this *FileList) OnRemove(f func(item *Item)) {
	this.onRemove = f
}
2021-06-13 17:37:57 +08:00
func (this *FileList) Close() error {
this.memoryCache.Destroy()
2022-03-15 18:32:39 +08:00
for _, db := range this.dbList {
if db != nil {
_ = db.Close()
}
2021-06-13 17:37:57 +08:00
}
2022-03-15 18:32:39 +08:00
2021-06-13 17:37:57 +08:00
return nil
}
2022-03-15 18:32:39 +08:00
func (this *FileList) GetDBIndex(hash string) uint64 {
2022-03-16 17:06:26 +08:00
return fnv.HashString(hash) % CountFileDB
2022-03-15 18:32:39 +08:00
}
2022-03-15 18:32:39 +08:00
func (this *FileList) getDB(hash string) *FileListDB {
2022-03-16 17:06:26 +08:00
return this.dbList[fnv.HashString(hash)%CountFileDB]
2022-03-15 18:32:39 +08:00
}
2022-03-15 18:32:39 +08:00
func (this *FileList) remove(hash string) (notFound bool, err error) {
var db = this.getDB(hash)
if !db.IsReady() {
return false, nil
}
// 从缓存中删除
this.memoryCache.Delete(hash)
var row = db.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
if row.Err() == sql.ErrNoRows {
return true, nil
2021-06-17 21:13:21 +08:00
}
2022-03-15 18:32:39 +08:00
return false, row.Err()
}
2021-06-17 21:13:21 +08:00
2022-03-15 18:32:39 +08:00
var item = &Item{Type: ItemTypeFile}
err = row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
if err == sql.ErrNoRows {
return true, nil
}
2022-03-15 18:32:39 +08:00
return false, err
}
err = db.DeleteAsync(hash)
2022-03-15 18:32:39 +08:00
if err != nil {
return false, db.WrapError(err)
2022-03-15 18:32:39 +08:00
}
2022-03-15 18:32:39 +08:00
atomic.AddInt64(&this.total, -1)
_, err = db.deleteHitByHashStmt.Exec(hash)
if err != nil {
return false, db.WrapError(err)
2021-06-17 21:13:21 +08:00
}
2022-03-15 18:32:39 +08:00
if this.onRemove != nil {
this.onRemove(item)
}
return false, nil
}
// 升级老版本数据库
func (this *FileList) upgradeOldDB() {
if len(this.oldDir) == 0 {
return
}
_ = this.UpgradeV3(this.oldDir, false)
2021-06-17 21:13:21 +08:00
}
2022-03-15 18:32:39 +08:00
func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
// index.db
var indexDBPath = oldDir + "/index.db"
_, err := os.Stat(indexDBPath)
if err != nil {
return nil
}
remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...")
defer func() {
_ = os.Remove(indexDBPath)
remotelogs.Println("CACHE", "upgrading local database finished")
2022-03-15 18:32:39 +08:00
}()
db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
2021-06-13 17:37:57 +08:00
if err != nil {
return err
}
2022-03-15 18:32:39 +08:00
2021-06-13 17:37:57 +08:00
defer func() {
2022-03-15 18:32:39 +08:00
_ = db.Close()
2021-06-13 17:37:57 +08:00
}()
2022-03-15 18:32:39 +08:00
var isFinished = false
var offset = 0
var count = 10000
for {
if isFinished {
break
}
err = func() error {
defer func() {
offset += count
}()
2022-03-16 16:20:53 +08:00
rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" ORDER BY "id" ASC LIMIT ?, ?`, offset, count)
2022-03-15 18:32:39 +08:00
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var hash = ""
var key = ""
var headerSize int64
var bodySize int64
var metaSize int64
var expiredAt int64
var staleAt int64
var createdAt int64
var host string
var serverId int64
isFinished = true
for rows.Next() {
isFinished = false
err = rows.Scan(&hash, &key, &headerSize, &bodySize, &metaSize, &expiredAt, &staleAt, &createdAt, &host, &serverId)
if err != nil {
if brokenOnError {
return err
}
return nil
}
err = this.Add(hash, &Item{
Type: ItemTypeFile,
Key: key,
ExpiredAt: expiredAt,
StaleAt: staleAt,
HeaderSize: headerSize,
BodySize: bodySize,
MetaSize: metaSize,
Host: host,
ServerId: serverId,
Week1Hits: 0,
Week2Hits: 0,
Week: 0,
})
if err != nil {
if brokenOnError {
return err
}
}
}
return nil
}()
2021-06-13 17:37:57 +08:00
if err != nil {
return err
}
2022-03-15 18:32:39 +08:00
time.Sleep(1 * time.Second)
2021-06-13 17:37:57 +08:00
}
2022-03-15 18:32:39 +08:00
2021-06-13 17:37:57 +08:00
return nil
}