mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			496 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			496 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
						|
 | 
						|
package caches
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"database/sql"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
 | 
						|
	"github.com/iwind/TeaGo/types"
 | 
						|
	_ "github.com/mattn/go-sqlite3"
 | 
						|
	"os"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
const CountFileDB = 20
 | 
						|
 | 
						|
// FileList 文件缓存列表管理
 | 
						|
type FileList struct {
 | 
						|
	dir    string
 | 
						|
	dbList [CountFileDB]*FileListDB
 | 
						|
	total  int64
 | 
						|
 | 
						|
	onAdd    func(item *Item)
 | 
						|
	onRemove func(item *Item)
 | 
						|
 | 
						|
	memoryCache *ttlcache.Cache
 | 
						|
 | 
						|
	// 老数据库地址
 | 
						|
	oldDir string
 | 
						|
}
 | 
						|
 | 
						|
func NewFileList(dir string) ListInterface {
 | 
						|
	return &FileList{
 | 
						|
		dir:         dir,
 | 
						|
		memoryCache: ttlcache.NewCache(),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) SetOldDir(oldDir string) {
 | 
						|
	this.oldDir = oldDir
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) Init() error {
 | 
						|
	// 检查目录是否存在
 | 
						|
	_, err := os.Stat(this.dir)
 | 
						|
	if err != nil {
 | 
						|
		err = os.MkdirAll(this.dir, 0777)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
 | 
						|
	}
 | 
						|
 | 
						|
	var dir = this.dir
 | 
						|
	if dir == "/" {
 | 
						|
		// 防止sqlite提示authority错误
 | 
						|
		dir = ""
 | 
						|
	}
 | 
						|
 | 
						|
	remotelogs.Println("CACHE", "loading database from '"+dir+"' ...")
 | 
						|
	for i := 0; i < CountFileDB; i++ {
 | 
						|
		var db = NewFileListDB()
 | 
						|
		err = db.Open(dir + "/db-" + types.String(i) + ".db")
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		err = db.Init()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		this.dbList[i] = db
 | 
						|
	}
 | 
						|
 | 
						|
	// 读取总数量
 | 
						|
	this.total = 0
 | 
						|
	for _, db := range this.dbList {
 | 
						|
		this.total += db.total
 | 
						|
	}
 | 
						|
 | 
						|
	// 升级老版本数据库
 | 
						|
	goman.New(func() {
 | 
						|
		this.upgradeOldDB()
 | 
						|
	})
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) Reset() error {
 | 
						|
	// 不做任何事情
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) Add(hash string, item *Item) error {
 | 
						|
	var db = this.getDB(hash)
 | 
						|
 | 
						|
	if !db.IsReady() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	err := db.Add(hash, item)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	atomic.AddInt64(&this.total, 1)
 | 
						|
 | 
						|
	// 这里不增加点击量,以减少对数据库的操作次数
 | 
						|
 | 
						|
	this.memoryCache.Write(hash, 1, item.ExpiredAt)
 | 
						|
 | 
						|
	if this.onAdd != nil {
 | 
						|
		this.onAdd(item)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) Exist(hash string) (bool, error) {
 | 
						|
	var db = this.getDB(hash)
 | 
						|
 | 
						|
	if !db.IsReady() {
 | 
						|
		return false, nil
 | 
						|
	}
 | 
						|
 | 
						|
	item := this.memoryCache.Read(hash)
 | 
						|
	if item != nil {
 | 
						|
		return true, nil
 | 
						|
	}
 | 
						|
 | 
						|
	ctx, cancelCtx := context.WithTimeout(context.Background(), 500*time.Millisecond)
 | 
						|
	rows, err := db.existsByHashStmt.QueryContext(ctx, hash, time.Now().Unix())
 | 
						|
	cancelCtx()
 | 
						|
	if err != nil {
 | 
						|
		if err == context.DeadlineExceeded {
 | 
						|
			return false, nil
 | 
						|
		}
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
	defer func() {
 | 
						|
		_ = rows.Close()
 | 
						|
	}()
 | 
						|
	if rows.Next() {
 | 
						|
		var expiredAt int64
 | 
						|
		err = rows.Scan(&expiredAt)
 | 
						|
		if err != nil {
 | 
						|
			return false, nil
 | 
						|
		}
 | 
						|
		this.memoryCache.Write(hash, 1, expiredAt)
 | 
						|
		return true, nil
 | 
						|
	}
 | 
						|
	return false, nil
 | 
						|
}
 | 
						|
 | 
						|
// CleanPrefix 清理某个前缀的缓存数据
 | 
						|
func (this *FileList) CleanPrefix(prefix string) error {
 | 
						|
	if len(prefix) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		this.memoryCache.Clean()
 | 
						|
	}()
 | 
						|
 | 
						|
	for _, db := range this.dbList {
 | 
						|
		err := db.CleanPrefix(prefix)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) Remove(hash string) error {
 | 
						|
	_, err := this.remove(hash)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Purge 清理过期的缓存
 | 
						|
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
 | 
						|
// callback 每次发现过期key的调用
 | 
						|
func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) {
 | 
						|
	count /= CountFileDB
 | 
						|
	if count <= 0 {
 | 
						|
		count = 100
 | 
						|
	}
 | 
						|
 | 
						|
	var countFound = 0
 | 
						|
	for _, db := range this.dbList {
 | 
						|
		hashStrings, err := db.ListExpiredItems(count)
 | 
						|
		if err != nil {
 | 
						|
			return 0, nil
 | 
						|
		}
 | 
						|
		countFound += len(hashStrings)
 | 
						|
 | 
						|
		// 不在 rows.Next() 循环中操作是为了避免死锁
 | 
						|
		for _, hash := range hashStrings {
 | 
						|
			err = this.Remove(hash)
 | 
						|
			if err != nil {
 | 
						|
				return 0, err
 | 
						|
			}
 | 
						|
 | 
						|
			err = callback(hash)
 | 
						|
			if err != nil {
 | 
						|
				return 0, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return countFound, nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error {
 | 
						|
	count /= CountFileDB
 | 
						|
	if count <= 0 {
 | 
						|
		count = 100
 | 
						|
	}
 | 
						|
 | 
						|
	for _, db := range this.dbList {
 | 
						|
		hashStrings, err := db.ListLFUItems(count)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		// 不在 rows.Next() 循环中操作是为了避免死锁
 | 
						|
		for _, hash := range hashStrings {
 | 
						|
			notFound, err := this.remove(hash)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			if notFound {
 | 
						|
				_, err = db.deleteHitByHashStmt.Exec(hash)
 | 
						|
				if err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			err = callback(hash)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) CleanAll() error {
 | 
						|
	defer this.memoryCache.Clean()
 | 
						|
 | 
						|
	for _, db := range this.dbList {
 | 
						|
		err := db.CleanAll()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	atomic.StoreInt64(&this.total, 0)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
 | 
						|
	var result = &Stat{}
 | 
						|
 | 
						|
	for _, db := range this.dbList {
 | 
						|
		if !db.IsReady() {
 | 
						|
			return &Stat{}, nil
 | 
						|
		}
 | 
						|
 | 
						|
		// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
 | 
						|
		_ = check
 | 
						|
 | 
						|
		row := db.statStmt.QueryRow()
 | 
						|
		if row.Err() != nil {
 | 
						|
			return nil, row.Err()
 | 
						|
		}
 | 
						|
		var stat = &Stat{}
 | 
						|
		err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		result.Count += stat.Count
 | 
						|
		result.Size += stat.Size
 | 
						|
		result.ValueSize += stat.ValueSize
 | 
						|
	}
 | 
						|
 | 
						|
	return result, nil
 | 
						|
}
 | 
						|
 | 
						|
// Count 总数量
 | 
						|
// 常用的方法,所以避免直接查询数据库
 | 
						|
func (this *FileList) Count() (int64, error) {
 | 
						|
	return atomic.LoadInt64(&this.total), nil
 | 
						|
}
 | 
						|
 | 
						|
// IncreaseHit 增加点击量
 | 
						|
func (this *FileList) IncreaseHit(hash string) error {
 | 
						|
	var db = this.getDB(hash)
 | 
						|
 | 
						|
	if !db.IsReady() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	return db.IncreaseHit(hash)
 | 
						|
}
 | 
						|
 | 
						|
// OnAdd 添加事件
 | 
						|
func (this *FileList) OnAdd(f func(item *Item)) {
 | 
						|
	this.onAdd = f
 | 
						|
}
 | 
						|
 | 
						|
// OnRemove 删除事件
 | 
						|
func (this *FileList) OnRemove(f func(item *Item)) {
 | 
						|
	this.onRemove = f
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) Close() error {
 | 
						|
	this.memoryCache.Destroy()
 | 
						|
 | 
						|
	for _, db := range this.dbList {
 | 
						|
		if db != nil {
 | 
						|
			_ = db.Close()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) GetDBIndex(hash string) uint64 {
 | 
						|
	return fnv.HashString(hash) % CountFileDB
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) getDB(hash string) *FileListDB {
 | 
						|
	return this.dbList[fnv.HashString(hash)%CountFileDB]
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) remove(hash string) (notFound bool, err error) {
 | 
						|
	var db = this.getDB(hash)
 | 
						|
 | 
						|
	if !db.IsReady() {
 | 
						|
		return false, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// 从缓存中删除
 | 
						|
	this.memoryCache.Delete(hash)
 | 
						|
 | 
						|
	var row = db.selectByHashStmt.QueryRow(hash)
 | 
						|
	if row.Err() != nil {
 | 
						|
		if row.Err() == sql.ErrNoRows {
 | 
						|
			return true, nil
 | 
						|
		}
 | 
						|
		return false, row.Err()
 | 
						|
	}
 | 
						|
 | 
						|
	var item = &Item{Type: ItemTypeFile}
 | 
						|
	err = row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
 | 
						|
	if err != nil {
 | 
						|
		if err == sql.ErrNoRows {
 | 
						|
			return true, nil
 | 
						|
		}
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
 | 
						|
	_, err = db.deleteByHashStmt.Exec(hash)
 | 
						|
	if err != nil {
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
 | 
						|
	atomic.AddInt64(&this.total, -1)
 | 
						|
 | 
						|
	_, err = db.deleteHitByHashStmt.Exec(hash)
 | 
						|
	if err != nil {
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
 | 
						|
	if this.onRemove != nil {
 | 
						|
		this.onRemove(item)
 | 
						|
	}
 | 
						|
 | 
						|
	return false, nil
 | 
						|
}
 | 
						|
 | 
						|
// 升级老版本数据库
 | 
						|
func (this *FileList) upgradeOldDB() {
 | 
						|
	if len(this.oldDir) == 0 {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	_ = this.UpgradeV3(this.oldDir, false)
 | 
						|
}
 | 
						|
 | 
						|
func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
 | 
						|
	// index.db
 | 
						|
	var indexDBPath = oldDir + "/index.db"
 | 
						|
	_, err := os.Stat(indexDBPath)
 | 
						|
	if err != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...")
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		_ = os.Remove(indexDBPath)
 | 
						|
		remotelogs.Println("CACHE", "upgrading local database finished")
 | 
						|
	}()
 | 
						|
 | 
						|
	db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		_ = db.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	var isFinished = false
 | 
						|
	var offset = 0
 | 
						|
	var count = 10000
 | 
						|
 | 
						|
	for {
 | 
						|
		if isFinished {
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		err = func() error {
 | 
						|
			defer func() {
 | 
						|
				offset += count
 | 
						|
			}()
 | 
						|
 | 
						|
			rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" ORDER BY "id" ASC LIMIT ?, ?`, offset, count)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			defer func() {
 | 
						|
				_ = rows.Close()
 | 
						|
			}()
 | 
						|
 | 
						|
			var hash = ""
 | 
						|
			var key = ""
 | 
						|
			var headerSize int64
 | 
						|
			var bodySize int64
 | 
						|
			var metaSize int64
 | 
						|
			var expiredAt int64
 | 
						|
			var staleAt int64
 | 
						|
			var createdAt int64
 | 
						|
			var host string
 | 
						|
			var serverId int64
 | 
						|
 | 
						|
			isFinished = true
 | 
						|
 | 
						|
			for rows.Next() {
 | 
						|
				isFinished = false
 | 
						|
 | 
						|
				err = rows.Scan(&hash, &key, &headerSize, &bodySize, &metaSize, &expiredAt, &staleAt, &createdAt, &host, &serverId)
 | 
						|
				if err != nil {
 | 
						|
					if brokenOnError {
 | 
						|
						return err
 | 
						|
					}
 | 
						|
					return nil
 | 
						|
				}
 | 
						|
 | 
						|
				err = this.Add(hash, &Item{
 | 
						|
					Type:       ItemTypeFile,
 | 
						|
					Key:        key,
 | 
						|
					ExpiredAt:  expiredAt,
 | 
						|
					StaleAt:    staleAt,
 | 
						|
					HeaderSize: headerSize,
 | 
						|
					BodySize:   bodySize,
 | 
						|
					MetaSize:   metaSize,
 | 
						|
					Host:       host,
 | 
						|
					ServerId:   serverId,
 | 
						|
					Week1Hits:  0,
 | 
						|
					Week2Hits:  0,
 | 
						|
					Week:       0,
 | 
						|
				})
 | 
						|
				if err != nil {
 | 
						|
					if brokenOnError {
 | 
						|
						return err
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			return nil
 | 
						|
		}()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		time.Sleep(1 * time.Second)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |