缓存数据库升级时从老的数据库转移数据

This commit is contained in:
GoEdgeLab
2021-12-19 18:55:54 +08:00
parent d662c96cfd
commit 78cefd85e5

View File

@@ -4,6 +4,7 @@ package caches
import (
"database/sql"
"errors"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
@@ -13,7 +14,6 @@ import (
timeutil "github.com/iwind/TeaGo/utils/time"
_ "github.com/mattn/go-sqlite3"
"os"
"strings"
"sync/atomic"
"time"
)
@@ -70,7 +70,7 @@ func (this *FileList) Init() error {
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
this.itemsTableName = "cacheItems_v2"
this.itemsTableName = "cacheItems_v3"
this.hitsTableName = "hits"
var dir = this.dir
@@ -82,37 +82,13 @@ func (this *FileList) Init() error {
remotelogs.Println("CACHE", "loading database '"+dbPath+"'")
db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL")
if err != nil {
return err
}
// 检查数据库
_, err = db.Exec(`SELECT * FROM "` + this.itemsTableName + `" LIMIT 1`)
if err != nil {
// 删除重建
remotelogs.Println("CACHE", "rebuilding database '"+dbPath+"'")
_ = db.Close()
this.isClosed = false
_ = os.Remove(dbPath)
db, err = sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL")
if err != nil {
return err
}
return errors.New("open database failed: " + err.Error())
}
db.SetMaxOpenConns(1)
this.db = db
// 清除旧表
this.oldTables = []string{
"cacheItems",
}
err = this.removeOldTables()
if err != nil {
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
@@ -122,13 +98,18 @@ func (this *FileList) Init() error {
// 创建
err = this.initTables(db, 1)
if err != nil {
return err
return errors.New("init tables failed: " + err.Error())
}
// 检查staleAt字段
err = this.checkStaleAtField()
// 清除旧表
// 这个一定要在initTables()之后,因为老的数据需要转移
this.oldTables = []string{
"cacheItems",
"cacheItems_v2",
}
err = this.removeOldTables()
if err != nil {
return err
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
}
// 读取总数量
@@ -508,6 +489,13 @@ func (this *FileList) Close() error {
// 初始化
func (this *FileList) initTables(db *sql.DB, times int) error {
// 检查是否存在
_, err := db.Exec(`SELECT id FROM "` + this.itemsTableName + `" LIMIT 1`)
var notFound = false
if err != nil {
notFound = true
}
{
// expiredAt - 过期时间,用来判断有无过期
// staleAt - 陈旧最大时间,用来清理缓存
@@ -550,6 +538,7 @@ ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
@@ -564,6 +553,19 @@ ON "` + this.itemsTableName + `" (
}
}
// 如果数据为空,从老数据中加载数据
if notFound {
// v2 => v3
remotelogs.Println("CACHE", "transferring old data from v2 to v3 ...")
result, err := db.Exec(`INSERT INTO "` + this.itemsTableName + `" ("id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "staleAt") SELECT "id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "expiredAt"+600 FROM cacheItems_v2`)
if err != nil {
remotelogs.Println("CACHE", "transfer old data from v2 to v3 failed: "+err.Error())
} else {
count, _ := result.RowsAffected()
remotelogs.Println("CACHE", "transfer old data from v2 to v3 finished, "+types.String(count)+" rows transferred")
}
}
{
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -622,26 +624,3 @@ func (this *FileList) removeOldTables() error {
}
return nil
}
func (this *FileList) checkStaleAtField() error {
rows, err := this.db.Query(`SELECT staleAt FROM "` + this.itemsTableName + `"`)
if err != nil {
if strings.Contains(err.Error(), "no such column: staleAt") { // 暂时没有更好的判断方法
_, err = this.db.Exec(`ALTER TABLE "` + this.itemsTableName + `" ADD COLUMN staleAt integer DEFAULT 0`)
if err != nil {
return err
}
_, err = this.db.Exec(`UPDATE "` + this.itemsTableName + `" SET staleAt=expiredAt`)
if err != nil {
return err
}
} else {
return err
}
} else {
_ = rows.Close()
}
return nil
}