实现stale cache读取

This commit is contained in:
刘祥超
2021-12-16 17:27:21 +08:00
parent 6bedc97c95
commit a6d711c2a0
16 changed files with 164 additions and 59 deletions

View File

@@ -22,6 +22,7 @@ type Item struct {
Type ItemType `json:"type"`
Key string `json:"key"`
ExpiredAt int64 `json:"expiredAt"`
StaleAt int64 `json:"staleAt"`
HeaderSize int64 `json:"headerSize"`
BodySize int64 `json:"bodySize"`
MetaSize int64 `json:"metaSize"`

View File

@@ -13,6 +13,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
)
@@ -104,6 +105,12 @@ func (this *FileList) Init() error {
return err
}
// 检查staleAt字段
err = this.checkStaleAtField()
if err != nil {
return err
}
// 读取总数量
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil {
@@ -122,7 +129,7 @@ func (this *FileList) Init() error {
return err
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`)
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
@@ -142,7 +149,7 @@ func (this *FileList) Init() error {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE expiredAt<=? LIMIT ?`)
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`)
if err != nil {
return err
}
@@ -182,7 +189,11 @@ func (this *FileList) Add(hash string, item *Item) error {
return nil
}
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.Host, item.ServerId, utils.UnixTime())
if item.StaleAt == 0 {
item.StaleAt = item.ExpiredAt
}
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
if err != nil {
return err
}
@@ -474,6 +485,8 @@ func (this *FileList) Close() error {
// 初始化
func (this *FileList) initTables(db *sql.DB, times int) error {
{
// expiredAt - 过期时间,用来判断有无过期
// staleAt - 陈旧最大时间,用来清理缓存
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
@@ -482,6 +495,7 @@ func (this *FileList) initTables(db *sql.DB, times int) error {
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"staleAt" integer DEFAULT 0,
"createdAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
@@ -497,6 +511,11 @@ ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE INDEX IF NOT EXISTS "staleAt"
ON "` + this.itemsTableName + `" (
"staleAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
@@ -579,3 +598,26 @@ func (this *FileList) removeOldTables() error {
}
return nil
}
func (this *FileList) checkStaleAtField() error {
rows, err := this.db.Query(`SELECT staleAt FROM "` + this.itemsTableName + `"`)
if err != nil {
if strings.Contains(err.Error(), "no such column: staleAt") { // 暂时没有更好的判断方法
_, err = this.db.Exec(`ALTER TABLE "` + this.itemsTableName + `" ADD COLUMN staleAt integer DEFAULT 0`)
if err != nil {
return err
}
_, err = this.db.Exec(`UPDATE "` + this.itemsTableName + `" SET staleAt=expiredAt`)
if err != nil {
return err
}
} else {
return err
}
} else {
_ = rows.Close()
}
return nil
}

View File

@@ -204,14 +204,19 @@ func (this *FileStorage) Init() error {
return nil
}
func (this *FileStorage) OpenReader(key string) (Reader, error) {
return this.openReader(key, true)
func (this *FileStorage) OpenReader(key string, useStale bool) (Reader, error) {
return this.openReader(key, true, useStale)
}
func (this *FileStorage) openReader(key string, allowMemory bool) (Reader, error) {
func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool) (Reader, error) {
// 使用陈旧缓存的时候,我们认为是短暂的,只需要从文件里检查即可
if useStale {
allowMemory = false
}
// 先尝试内存缓存
if allowMemory && this.memoryStorage != nil {
reader, err := this.memoryStorage.OpenReader(key)
reader, err := this.memoryStorage.OpenReader(key, useStale)
if err == nil {
return reader, err
}
@@ -219,6 +224,17 @@ func (this *FileStorage) openReader(key string, allowMemory bool) (Reader, error
hash, path := this.keyPath(key)
// 检查文件记录是否已过期
if !useStale {
exists, err := this.list.Exist(hash)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound
}
}
// TODO 尝试使用mmap加快读取速度
var isOk = false
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
@@ -235,15 +251,6 @@ func (this *FileStorage) openReader(key string, allowMemory bool) (Reader, error
}
}()
// 检查文件记录是否已过期
exists, err := this.list.Exist(hash)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound
}
reader := NewFileReader(fp)
if err != nil {
return nil, err
@@ -923,7 +930,7 @@ func (this *FileStorage) hotLoop() {
this.hotMap = map[string]*HotItem{}
this.hotMapLocker.Unlock()
// 取Top10
// 取Top10写入内存
if len(result) > 0 {
sort.Slice(result, func(i, j int) bool {
return result[i].Hits > result[j].Hits
@@ -937,7 +944,7 @@ func (this *FileStorage) hotLoop() {
var buf = make([]byte, 32*1024)
for _, item := range result[:size] {
reader, err := this.openReader(item.Key, false)
reader, err := this.openReader(item.Key, false, false)
if err != nil {
continue
}

View File

@@ -10,7 +10,7 @@ type StorageInterface interface {
Init() error
// OpenReader 读取缓存
OpenReader(key string) (Reader, error)
OpenReader(key string, useStale bool) (reader Reader, err error)
// OpenWriter 打开缓存写入器等待写入
OpenWriter(key string, expiredAt int64, status int) (Writer, error)

View File

@@ -105,7 +105,7 @@ func (this *MemoryStorage) Init() error {
}
// OpenReader 读取缓存
func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
func (this *MemoryStorage) OpenReader(key string, useStale bool) (Reader, error) {
hash := this.hash(key)
this.locker.RLock()
@@ -115,7 +115,7 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
return nil, ErrNotFound
}
if item.ExpiredAt > utils.UnixTime() {
if useStale || (item.ExpiredAt > utils.UnixTime()) {
reader := NewMemoryReader(item)
err := reader.Init()
if err != nil {