优化本地数据库性能

This commit is contained in:
GoEdgeLab
2022-03-15 18:32:39 +08:00
parent 240698ff0d
commit cbc97652a5
13 changed files with 940 additions and 515 deletions

View File

@@ -6,11 +6,12 @@ import "errors"
// 常用的几个错误
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
ErrEntityTooLarge = errors.New("entity too large")
ErrWritingUnavaible = errors.New("writing unavailable")
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
ErrEntityTooLarge = errors.New("entity too large")
ErrWritingUnavailable = errors.New("writing unavailable")
ErrWritingQueueFull = errors.New("writing queue full")
)
// CapacityError 容量错误
@@ -32,7 +33,10 @@ func CanIgnoreErr(err error) bool {
if err == nil {
return true
}
if err == ErrFileIsWriting || err == ErrEntityTooLarge || err == ErrWritingUnavaible {
if err == ErrFileIsWriting ||
err == ErrEntityTooLarge ||
err == ErrWritingUnavailable ||
err == ErrWritingQueueFull {
return true
}
_, ok := err.(*CapacityError)

View File

@@ -4,54 +4,32 @@ package caches
import (
"database/sql"
"errors"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
_ "github.com/mattn/go-sqlite3"
"os"
"sync/atomic"
"time"
)
const CountFileDB = 10
// FileList 文件缓存列表管理
type FileList struct {
dir string
db *dbs.DB
total int64
dir string
dbList [CountFileDB]*FileListDB
total int64
onAdd func(item *Item)
onRemove func(item *Item)
// cacheItems
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
insertStmt *dbs.Stmt // 写入数据
selectByHashStmt *dbs.Stmt // 使用hash查询数据
deleteByHashStmt *dbs.Stmt // 根据hash删除数据
statStmt *dbs.Stmt // 统计
purgeStmt *dbs.Stmt // 清理
deleteAllStmt *dbs.Stmt // 删除所有数据
// hits
insertHitStmt *dbs.Stmt // 写入数据
increaseHitStmt *dbs.Stmt // 增加点击量
deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据
lfuHitsStmt *dbs.Stmt // 读取老的数据
oldTables []string
itemsTableName string
hitsTableName string
isClosed bool
isReady bool
memoryCache *ttlcache.Cache
// 老数据库地址
oldDir string
}
func NewFileList(dir string) ListInterface {
@@ -61,6 +39,10 @@ func NewFileList(dir string) ListInterface {
}
}
func (this *FileList) SetOldDir(oldDir string) {
this.oldDir = oldDir
}
func (this *FileList) Init() error {
// 检查目录是否存在
_, err := os.Stat(this.dir)
@@ -72,118 +54,38 @@ func (this *FileList) Init() error {
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
this.itemsTableName = "cacheItems_v3"
this.hitsTableName = "hits"
var dir = this.dir
if dir == "/" {
// 防止sqlite提示authority错误
dir = ""
}
var dbPath = dir + "/index.db"
remotelogs.Println("CACHE", "loading database '"+dbPath+"'")
db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=16000")
if err != nil {
return errors.New("open database failed: " + err.Error())
}
db.SetMaxOpenConns(1)
remotelogs.Println("CACHE", "loading database from '"+dir+"' ...")
for i := 0; i < CountFileDB; i++ {
var db = NewFileListDB()
err = db.Open(dir + "/db-" + types.String(i) + ".db")
if err != nil {
return err
}
this.db = dbs.NewDB(db)
err = db.Init()
if err != nil {
return err
}
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
}**/
// 创建
err = this.initTables(db, 1)
if err != nil {
return errors.New("init tables failed: " + err.Error())
}
// 清除旧表
// 这个一定要在initTables()之后,因为老的数据需要转移
this.oldTables = []string{
"cacheItems",
"cacheItems_v2",
}
err = this.removeOldTables()
if err != nil {
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
this.dbList[i] = db
}
// 读取总数量
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil {
return row.Err()
}
var total int64
err = row.Scan(&total)
if err != nil {
return err
}
this.total = total
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
this.total = 0
for _, db := range this.dbList {
this.total += db.total
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`)
this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`)
if err != nil {
return err
}
this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`)
if err != nil {
return err
}
this.isReady = true
// 升级老版本数据库
goman.New(func() {
this.upgradeOldDB()
})
return nil
}
@@ -194,24 +96,22 @@ func (this *FileList) Reset() error {
}
func (this *FileList) Add(hash string, item *Item) error {
if !this.isReady {
var db = this.getDB(hash)
if !db.IsReady() {
return nil
}
if item.StaleAt == 0 {
item.StaleAt = item.ExpiredAt
}
// 放入队列
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
err := db.Add(hash, item)
if err != nil {
return err
}
atomic.AddInt64(&this.total, 1)
// 这里不增加点击量,以减少对数据库的操作次数
this.memoryCache.Write(hash, 1, item.ExpiredAt)
atomic.AddInt64(&this.total, 1)
if this.onAdd != nil {
this.onAdd(item)
@@ -220,7 +120,9 @@ func (this *FileList) Add(hash string, item *Item) error {
}
func (this *FileList) Exist(hash string) (bool, error) {
if !this.isReady {
var db = this.getDB(hash)
if !db.IsReady() {
return false, nil
}
@@ -229,7 +131,7 @@ func (this *FileList) Exist(hash string) (bool, error) {
return true, nil
}
rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
rows, err := db.existsByHashStmt.Query(hash, time.Now().Unix())
if err != nil {
return false, err
}
@@ -250,10 +152,6 @@ func (this *FileList) Exist(hash string) (bool, error) {
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if !this.isReady {
return nil
}
if len(prefix) == 0 {
return nil
}
@@ -262,106 +160,48 @@ func (this *FileList) CleanPrefix(prefix string) error {
this.memoryCache.Clean()
}()
var count = int64(10000)
var staleLife = 600 // TODO 需要可以设置
var unixTime = utils.UnixTime() // 只删除当前的,不删除新的
for {
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix)
for _, db := range this.dbList {
err := db.CleanPrefix(prefix)
if err != nil {
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
}
}
return nil
}
func (this *FileList) Remove(hash string) error {
if !this.isReady {
return nil
}
// 从缓存中删除
this.memoryCache.Delete(hash)
row := this.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
return row.Err()
}
var item = &Item{Type: ItemTypeFile}
err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
_, err = this.deleteByHashStmt.Exec(hash)
if err != nil {
return err
}
_, err = this.deleteHitByHashStmt.Exec(hash)
if err != nil {
return err
}
atomic.AddInt64(&this.total, -1)
if this.onRemove != nil {
this.onRemove(item)
}
return nil
_, err := this.remove(hash)
return err
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) {
if !this.isReady {
return 0, nil
}
count /= CountFileDB
if count <= 0 {
count = 1000
count = 100
}
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil {
return 0, err
}
hashStrings := []string{}
var countFound = 0
for rows.Next() {
var hash string
err = rows.Scan(&hash)
for _, db := range this.dbList {
hashStrings, err := db.ListExpiredItems(count)
if err != nil {
_ = rows.Close()
return 0, err
return 0, nil
}
hashStrings = append(hashStrings, hash)
countFound++
}
_ = rows.Close() // 不能使用defer防止读写冲突
countFound += len(hashStrings)
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return 0, err
}
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return 0, err
}
err = callback(hash)
if err != nil {
return 0, err
err = callback(hash)
if err != nil {
return 0, err
}
}
}
@@ -369,80 +209,80 @@ func (this *FileList) Purge(count int, callback func(hash string) error) (int, e
}
func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error {
if !this.isReady {
return nil
}
count /= CountFileDB
if count <= 0 {
return nil
count = 100
}
rows, err := this.lfuHitsStmt.Query(count)
if err != nil {
return err
}
hashStrings := []string{}
var countFound = 0
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
_ = rows.Close()
return err
}
hashStrings = append(hashStrings, hash)
countFound++
}
_ = rows.Close() // 不能使用defer防止读写冲突
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
for _, db := range this.dbList {
hashStrings, err := db.ListLFUItems(count)
if err != nil {
return err
}
err = callback(hash)
if err != nil {
return err
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
notFound, err := this.remove(hash)
if err != nil {
return err
}
if notFound {
_, err = db.deleteHitByHashStmt.Exec(hash)
if err != nil {
return err
}
}
err = callback(hash)
if err != nil {
return err
}
}
}
return nil
}
func (this *FileList) CleanAll() error {
if !this.isReady {
return nil
defer this.memoryCache.Clean()
for _, db := range this.dbList {
err := db.CleanAll()
if err != nil {
return err
}
}
this.memoryCache.Clean()
_, err := this.deleteAllStmt.Exec()
if err != nil {
return err
}
atomic.StoreInt64(&this.total, 0)
return nil
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
if !this.isReady {
return &Stat{}, nil
var result = &Stat{}
for _, db := range this.dbList {
if !db.IsReady() {
return &Stat{}, nil
}
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
_ = check
row := db.statStmt.QueryRow()
if row.Err() != nil {
return nil, row.Err()
}
var stat = &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
result.Count += stat.Count
result.Size += stat.Size
result.ValueSize += stat.ValueSize
}
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.statStmt.QueryRow()
if row.Err() != nil {
return nil, row.Err()
}
stat := &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
return stat, nil
return result, nil
}
// Count 总数量
@@ -453,9 +293,13 @@ func (this *FileList) Count() (int64, error) {
// IncreaseHit 增加点击量
func (this *FileList) IncreaseHit(hash string) error {
var week = timeutil.Format("YW")
_, err := this.increaseHitStmt.Exec(hash, week, week, week, week)
return err
var db = this.getDB(hash)
if !db.IsReady() {
return nil
}
return db.IncreaseHit(hash)
}
// OnAdd 添加事件
@@ -469,163 +313,176 @@ func (this *FileList) OnRemove(f func(item *Item)) {
}
func (this *FileList) Close() error {
this.isClosed = true
this.isReady = false
this.memoryCache.Destroy()
if this.db != nil {
_ = this.existsByHashStmt.Close()
_ = this.insertStmt.Close()
_ = this.selectByHashStmt.Close()
_ = this.deleteByHashStmt.Close()
_ = this.statStmt.Close()
_ = this.purgeStmt.Close()
_ = this.deleteAllStmt.Close()
_ = this.insertHitStmt.Close()
_ = this.increaseHitStmt.Close()
_ = this.deleteHitByHashStmt.Close()
_ = this.lfuHitsStmt.Close()
return this.db.Close()
for _, db := range this.dbList {
if db != nil {
_ = db.Close()
}
}
return nil
}
// 初始化
func (this *FileList) initTables(db *sql.DB, times int) error {
// 检查是否存在
_, err := db.Exec(`SELECT id FROM "` + this.itemsTableName + `" LIMIT 1`)
var notFound = false
func (this *FileList) GetDBIndex(hash string) uint64 {
return fnv.Hash(hash) % CountFileDB
}
func (this *FileList) getDB(hash string) *FileListDB {
return this.dbList[fnv.Hash(hash)%CountFileDB]
}
func (this *FileList) remove(hash string) (notFound bool, err error) {
var db = this.getDB(hash)
if !db.IsReady() {
return false, nil
}
// 从缓存中删除
this.memoryCache.Delete(hash)
var row = db.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
if row.Err() == sql.ErrNoRows {
return true, nil
}
return false, row.Err()
}
var item = &Item{Type: ItemTypeFile}
err = row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
notFound = true
}
{
// expiredAt - 过期时间,用来判断有无过期
// staleAt - 陈旧最大时间,用来清理缓存
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"key" varchar(1024),
"tag" varchar(64),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"staleAt" integer DEFAULT 0,
"createdAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
"createdAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE INDEX IF NOT EXISTS "staleAt"
ON "` + this.itemsTableName + `" (
"staleAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := db.Exec(`DROP TABLE "` + this.itemsTableName + `"`)
if dropErr == nil {
return this.initTables(db, times+1)
}
return err
}
return err
if err == sql.ErrNoRows {
return true, nil
}
return false, err
}
// 如果数据为空,从老数据中加载数据
if notFound {
// v2 => v3
remotelogs.Println("CACHE", "transferring old data from v2 to v3 ...")
result, err := db.Exec(`INSERT INTO "` + this.itemsTableName + `" ("id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "staleAt") SELECT "id", "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "createdAt", "host", "serverId", "expiredAt"+600 FROM cacheItems_v2`)
if err == nil {
count, _ := result.RowsAffected()
remotelogs.Println("CACHE", "transfer old data from v2 to v3 finished, "+types.String(count)+" rows transferred")
}
_, err = db.deleteByHashStmt.Exec(hash)
if err != nil {
return false, err
}
{
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"week1Hits" integer DEFAULT 0,
"week2Hits" integer DEFAULT 0,
"week" varchar(6)
);
atomic.AddInt64(&this.total, -1)
CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash"
ON "` + this.hitsTableName + `" (
"hash" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := db.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
if dropErr == nil {
return this.initTables(db, times+1)
}
return err
}
return err
}
_, err = db.deleteHitByHashStmt.Exec(hash)
if err != nil {
return false, err
}
return nil
if this.onRemove != nil {
this.onRemove(item)
}
return false, nil
}
// 删除过期不用的表格
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
// 升级老版本数据库
func (this *FileList) upgradeOldDB() {
if len(this.oldDir) == 0 {
return
}
_ = this.UpgradeV3(this.oldDir, false)
}
func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
// index.db
var indexDBPath = oldDir + "/index.db"
_, err := os.Stat(indexDBPath)
if err != nil {
return nil
}
remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...")
defer func() {
_ = os.Remove(indexDBPath)
}()
db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
if err != nil {
return err
}
defer func() {
_ = rows.Close()
_ = db.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
var isFinished = false
var offset = 0
var count = 10000
for {
if isFinished {
break
}
err = func() error {
defer func() {
offset += count
}()
rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" LIMIT ?, ?`, offset, count)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var hash = ""
var key = ""
var headerSize int64
var bodySize int64
var metaSize int64
var expiredAt int64
var staleAt int64
var createdAt int64
var host string
var serverId int64
isFinished = true
for rows.Next() {
isFinished = false
err = rows.Scan(&hash, &key, &headerSize, &bodySize, &metaSize, &expiredAt, &staleAt, &createdAt, &host, &serverId)
if err != nil {
if brokenOnError {
return err
}
return nil
}
err = this.Add(hash, &Item{
Type: ItemTypeFile,
Key: key,
ExpiredAt: expiredAt,
StaleAt: staleAt,
HeaderSize: headerSize,
BodySize: bodySize,
MetaSize: metaSize,
Host: host,
ServerId: serverId,
Week1Hits: 0,
Week2Hits: 0,
Week: 0,
})
if err != nil {
if brokenOnError {
return err
}
}
}
return nil
}()
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
goman.New(func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
})
}
time.Sleep(1 * time.Second)
}
return nil
}

View File

@@ -0,0 +1,426 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"database/sql"
"errors"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"time"
)
type FileListDB struct {
db *dbs.DB
itemsTableName string
hitsTableName string
total int64
isClosed bool
isReady bool
// cacheItems
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
insertStmt *dbs.Stmt // 写入数据
selectByHashStmt *dbs.Stmt // 使用hash查询数据
deleteByHashStmt *dbs.Stmt // 根据hash删除数据
statStmt *dbs.Stmt // 统计
purgeStmt *dbs.Stmt // 清理
deleteAllStmt *dbs.Stmt // 删除所有数据
listOlderItemsStmt *dbs.Stmt // 读取较早存储的缓存
// hits
insertHitStmt *dbs.Stmt // 写入数据
increaseHitStmt *dbs.Stmt // 增加点击量
deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据
lfuHitsStmt *dbs.Stmt // 读取老的数据
}
func NewFileListDB() *FileListDB {
return &FileListDB{}
}
func (this *FileListDB) Open(dbPath string) error {
db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
if err != nil {
return errors.New("open database failed: " + err.Error())
}
db.SetMaxOpenConns(1)
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
}**/
this.db = dbs.NewDB(db)
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
return nil
}
func (this *FileListDB) Init() error {
this.itemsTableName = "cacheItems"
this.hitsTableName = "hits"
// 创建
var err = this.initTables(1)
if err != nil {
return errors.New("init tables failed: " + err.Error())
}
// 读取总数量
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil {
return row.Err()
}
var total int64
err = row.Scan(&total)
if err != nil {
return err
}
this.total = total
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
this.listOlderItemsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" ORDER BY "id" ASC LIMIT ?`)
this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`)
this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`)
if err != nil {
return err
}
this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`)
if err != nil {
return err
}
this.isReady = true
return nil
}
func (this *FileListDB) IsReady() bool {
return this.isReady
}
func (this *FileListDB) Total() int64 {
return this.total
}
func (this *FileListDB) Add(hash string, item *Item) error {
if item.StaleAt == 0 {
item.StaleAt = item.ExpiredAt
}
// 放入队列
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
if err != nil {
return err
}
return nil
}
func (this *FileListDB) ListExpiredItems(count int) (hashList []string, err error) {
if !this.isReady {
return nil, nil
}
if count <= 0 {
count = 100
}
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil {
return nil, err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
return nil, err
}
hashList = append(hashList, hash)
}
return hashList, nil
}
func (this *FileListDB) ListLFUItems(count int) (hashList []string, err error) {
if !this.isReady {
return nil, nil
}
if count <= 0 {
count = 100
}
hashList, err = this.listLFUItems(count)
if err != nil {
return
}
if len(hashList) > count/2 {
return
}
// 不足补齐
olderHashList, err := this.listOlderItems(count - len(hashList))
if err != nil {
return nil, err
}
hashList = append(hashList, olderHashList...)
return
}
func (this *FileListDB) IncreaseHit(hash string) error {
var week = timeutil.Format("YW")
_, err := this.increaseHitStmt.Exec(hash, week, week, week, week)
return err
}
func (this *FileListDB) CleanPrefix(prefix string) error {
if !this.isReady {
return nil
}
var count = int64(10000)
var staleLife = 600 // TODO 需要可以设置
var unixTime = utils.UnixTime() // 只删除当前的,不删除新的
for {
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix)
if err != nil {
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
}
}
}
func (this *FileListDB) CleanAll() error {
if !this.isReady {
return nil
}
_, err := this.deleteAllStmt.Exec()
if err != nil {
return err
}
return nil
}
func (this *FileListDB) Close() error {
this.isClosed = true
this.isReady = false
if this.db != nil {
_ = this.existsByHashStmt.Close()
_ = this.insertStmt.Close()
_ = this.selectByHashStmt.Close()
_ = this.deleteByHashStmt.Close()
_ = this.statStmt.Close()
_ = this.purgeStmt.Close()
_ = this.deleteAllStmt.Close()
_ = this.listOlderItemsStmt.Close()
_ = this.insertHitStmt.Close()
_ = this.increaseHitStmt.Close()
_ = this.deleteHitByHashStmt.Close()
_ = this.lfuHitsStmt.Close()
return this.db.Close()
}
return nil
}
// 初始化
func (this *FileListDB) initTables(times int) error {
{
// expiredAt - 过期时间,用来判断有无过期
// staleAt - 陈旧最大时间,用来清理缓存
_, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"key" varchar(1024),
"tag" varchar(64),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"staleAt" integer DEFAULT 0,
"createdAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
"createdAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE INDEX IF NOT EXISTS "staleAt"
ON "` + this.itemsTableName + `" (
"staleAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := this.db.Exec(`DROP TABLE "` + this.itemsTableName + `"`)
if dropErr == nil {
return this.initTables(times + 1)
}
return err
}
return err
}
}
{
_, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"week1Hits" integer DEFAULT 0,
"week2Hits" integer DEFAULT 0,
"week" varchar(6)
);
CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash"
ON "` + this.hitsTableName + `" (
"hash" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := this.db.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
if dropErr == nil {
return this.initTables(times + 1)
}
return err
}
return err
}
}
return nil
}
func (this *FileListDB) listLFUItems(count int) (hashList []string, err error) {
rows, err := this.lfuHitsStmt.Query(count)
if err != nil {
return nil, err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
return nil, err
}
hashList = append(hashList, hash)
}
return hashList, nil
}
func (this *FileListDB) listOlderItems(count int) (hashList []string, err error) {
rows, err := this.listOlderItemsStmt.Query(count)
if err != nil {
return nil, err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
return nil, err
}
hashList = append(hashList, hash)
}
return hashList, nil
}

View File

@@ -1,8 +1,9 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
@@ -15,21 +16,31 @@ import (
)
func TestFileList_Init(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
t.Log("ok")
}
func TestFileList_Add(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1").(*caches.FileList)
err := list.Init()
if err != nil {
t.Fatal(err)
}
err = list.Add(stringutil.Md5("123456"), &Item{
defer func() {
_ = list.Close()
}()
var hash = stringutil.Md5("123456")
t.Log("db index:", list.GetDBIndex(hash))
err = list.Add(hash, &caches.Item{
Key: "123456",
ExpiredAt: time.Now().Unix(),
HeaderSize: 1,
@@ -41,19 +52,25 @@ func TestFileList_Add(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Add_Many(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
before := time.Now()
for i := 0; i < 2000_0000; i++ {
for i := 0; i < 100_000; i++ {
u := "https://edge.teaos.cn/123456" + strconv.Itoa(i)
_ = list.Add(stringutil.Md5(u), &Item{
_ = list.Add(stringutil.Md5(u), &caches.Item{
Key: u,
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1,
@@ -72,36 +89,46 @@ func TestFileList_Add_Many(t *testing.T) {
}
func TestFileList_Exist(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1").(*caches.FileList)
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
defer func() {
_ = list.Close()
}()
total, _ := list.Count()
t.Log("total:", total)
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
{
exists, err := list.Exist(stringutil.Md5("123456"))
var hash = stringutil.Md5("123456")
exists, err := list.Exist(hash)
if err != nil {
t.Fatal(err)
}
t.Log("exists:", exists)
t.Log(hash, "exists:", exists)
}
{
exists, err := list.Exist(stringutil.Md5("http://edge.teaos.cn/1234561"))
var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
exists, err := list.Exist(hash)
if err != nil {
t.Fatal(err)
}
t.Log("exists:", exists)
t.Log(hash, "exists:", exists)
}
}
func TestFileList_Exist_Many_DB(t *testing.T) {
// 测试在多个数据库下的性能
var listSlice = []ListInterface{}
var listSlice = []caches.ListInterface{}
for i := 1; i <= 10; i++ {
list := NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i))
list := caches.NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i))
err := list.Init()
if err != nil {
t.Fatal(err)
@@ -151,13 +178,18 @@ func TestFileList_Exist_Many_DB(t *testing.T) {
}
func TestFileList_CleanPrefix(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
before := time.Now()
err = list.CleanPrefix("1234")
err = list.CleanPrefix("123")
if err != nil {
t.Fatal(err)
}
@@ -165,12 +197,17 @@ func TestFileList_CleanPrefix(t *testing.T) {
}
func TestFileList_Remove(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1").(*caches.FileList)
err := list.Init()
if err != nil {
t.Fatal(err)
}
list.OnRemove(func(item *Item) {
defer func() {
_ = list.Close()
}()
list.OnRemove(func(item *caches.Item) {
t.Logf("remove %#v", item)
})
err = list.Remove(stringutil.Md5("123456"))
@@ -178,30 +215,71 @@ func TestFileList_Remove(t *testing.T) {
t.Fatal(err)
}
t.Log("ok")
t.Log("===count===")
t.Log(list.Count())
}
func TestFileList_Purge(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
_, err = list.Purge(2, func(hash string) error {
defer func() {
_ = list.Close()
}()
var count = 0
_, err = list.Purge(caches.CountFileDB*2, func(hash string) error {
t.Log(hash)
count++
return nil
})
if err != nil {
t.Fatal(err)
}
t.Log("ok")
t.Log("ok, purged", count, "items")
}
func TestFileList_Stat(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
func TestFileList_PurgeLFU(t *testing.T) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
err = list.IncreaseHit(stringutil.Md5("123456"))
if err != nil {
t.Fatal(err)
}
var count = 0
err = list.PurgeLFU(caches.CountFileDB*2, func(hash string) error {
t.Log(hash)
count++
return nil
})
if err != nil {
t.Fatal(err)
}
t.Log("ok, purged", count, "items")
}
func TestFileList_Stat(t *testing.T) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
stat, err := list.Stat(nil)
if err != nil {
t.Fatal(err)
@@ -210,7 +288,7 @@ func TestFileList_Stat(t *testing.T) {
}
func TestFileList_Count(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
list := caches.NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
@@ -225,7 +303,7 @@ func TestFileList_Count(t *testing.T) {
}
func TestFileList_CleanAll(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
list := caches.NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
@@ -238,58 +316,17 @@ func TestFileList_CleanAll(t *testing.T) {
t.Log(list.Count())
}
func TestFileList_Conflict(t *testing.T) {
list := NewFileList(Tea.Root + "/data").(*FileList)
err := list.Init()
if err != nil {
t.Fatal(err)
}
rows, err := list.purgeStmt.Query(time.Now().Unix(), 1000)
if err != nil {
t.Fatal(err)
}
go func() {
time.Sleep(5 * time.Second)
_ = rows.Close()
}()
t.Log("before exists")
t.Log(list.Exist("123456"))
t.Log("after exists")
}
func TestFileList_IIF(t *testing.T) {
list := NewFileList(Tea.Root + "/data").(*FileList)
err := list.Init()
if err != nil {
t.Fatal(err)
}
rows, err := list.db.Query("SELECT IIF(0, 2, 3)")
if err != nil {
t.Fatal(err)
}
defer func() {
_ = rows.Close()
}()
if rows.Next() {
var result int
err = rows.Scan(&result)
if err != nil {
t.Fatal(err)
}
t.Log("result:", result)
}
}
func TestFileList_IncreaseHit(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
@@ -303,13 +340,32 @@ func TestFileList_IncreaseHit(t *testing.T) {
t.Log("ok")
}
func TestFileList_UpgradeV3(t *testing.T) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p43").(*caches.FileList)
err := list.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
_ = list.Close()
}()
err = list.UpgradeV3("/Users/WorkSpace/EdgeProject/EdgeCache/p43", false)
if err != nil {
t.Log(err)
return
}
t.Log("ok")
}
func BenchmarkFileList_Exist(b *testing.B) {
list := NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
b.Fatal(err)
}
for i := 0; i < b.N; i++ {
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f")
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f" + types.String(i))
}
}

View File

@@ -122,11 +122,12 @@ func (this *FileStorage) Init() error {
return errors.New("[CACHE]cache storage dir can not be empty")
}
list := NewFileList(dir + "/p" + strconv.FormatInt(this.policy.Id, 10))
var list = NewFileList(Tea.Root + "/data/cache-index/p" + types.String(this.policy.Id))
err = list.Init()
if err != nil {
return err
}
list.(*FileList).SetOldDir(this.cacheConfig.Dir + "/p" + types.String(this.policy.Id))
this.list = list
stat, err := list.Stat(func(hash string) bool {
return true
@@ -317,7 +318,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) {
// 是否正在退出
if teaconst.IsQuiting {
return nil, ErrWritingUnavaible
return nil, ErrWritingUnavailable
}
// 是否已忽略
@@ -336,6 +337,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
if err == nil {
return writer, nil
}
// 如果队列满了,则等待
if err == ErrWritingQueueFull {
return nil, err
}
}
// 是否正在写入

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/rands"
@@ -42,7 +43,8 @@ type MemoryStorage struct {
valuesMap map[uint64]*MemoryItem // hash => item
dirtyChan chan string // hash chan
dirtyChan chan string // hash chan
dirtyQueueSize int
purgeTicker *utils.Ticker
@@ -54,22 +56,25 @@ type MemoryStorage struct {
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage {
var dirtyChan chan string
var queueSize = policy.MemoryAutoFlushQueueSize
if parentStorage != nil {
var queueSize = policy.MemoryAutoFlushQueueSize
if queueSize <= 0 {
queueSize = 2048
queueSize = 2048 + int(policy.CapacityBytes()/sizes.G)*2048
}
dirtyChan = make(chan string, queueSize)
}
return &MemoryStorage{
parentStorage: parentStorage,
policy: policy,
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
dirtyChan: dirtyChan,
writingKeyMap: map[string]zero.Zero{},
ignoreKeys: setutils.NewFixedSet(32768),
parentStorage: parentStorage,
policy: policy,
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
dirtyChan: dirtyChan,
dirtyQueueSize: queueSize,
writingKeyMap: map[string]zero.Zero{},
ignoreKeys: setutils.NewFixedSet(32768),
}
}
@@ -101,11 +106,19 @@ func (this *MemoryStorage) Init() error {
// 启动定时Flush memory to disk任务
if this.parentStorage != nil {
goman.New(func() {
for hash := range this.dirtyChan {
this.flushItem(hash)
}
})
var threads = runtime.NumCPU()
if threads == 0 {
threads = 1
} else if threads > 8 {
threads = 8
}
for i := 0; i < threads; i++ {
goman.New(func() {
for hash := range this.dirtyChan {
this.flushItem(hash)
}
})
}
}
return nil
@@ -165,6 +178,15 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, s
}
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) {
// 待写入队列是否已满
if isDirty &&
this.parentStorage != nil &&
this.dirtyQueueSize > 0 &&
len(this.dirtyChan) == this.dirtyQueueSize &&
(expiredAt <= 0 || expiredAt > time.Now().Unix()+7200) { // 缓存时间过长
return nil, ErrWritingQueueFull
}
this.locker.Lock()
defer this.locker.Unlock()

View File

@@ -604,9 +604,8 @@ func (this *Node) listenSignals() {
signal.Notify(queue, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT)
goman.New(func() {
for range queue {
events.Notify(events.EventTerminated)
time.Sleep(100 * time.Millisecond)
os.Exit(0)
utils.Exit()
return
}
})
@@ -650,7 +649,7 @@ func (this *Node) listenSock() error {
// 退出主进程
events.Notify(events.EventQuit)
os.Exit(0)
utils.Exit()
case "quit":
_ = cmd.ReplyOk()
_ = this.sock.Close()
@@ -662,7 +661,7 @@ func (this *Node) listenSock() error {
for {
countActiveConnections := sharedListenerManager.TotalActiveConnections()
if countActiveConnections <= 0 {
os.Exit(0)
utils.Exit()
return
}
time.Sleep(1 * time.Second)

View File

@@ -228,7 +228,7 @@ func (this *UpgradeManager) unzip(zipPath string) error {
func (this *UpgradeManager) restart() error {
// 重新启动
if DaemonIsOn && DaemonPid == os.Getppid() {
os.Exit(0) // TODO 试着更优雅重启
utils.Exit() // TODO 试着更优雅重启
} else {
exe, err := os.Executable()
if err != nil {
@@ -238,6 +238,9 @@ func (this *UpgradeManager) restart() error {
// quit
events.Notify(events.EventQuit)
// terminated
events.Notify(events.EventTerminated)
// 启动
cmd := exec.Command(exe, "start")
err = cmd.Start()

View File

@@ -24,7 +24,11 @@ func init() {
var stats = []string{}
for _, stat := range SharedQueryStatManager.TopN(10) {
var avg = stat.CostTotal / float64(stat.Calls)
stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, stat.Query))
var query = stat.Query
if len(query) > 100 {
query = query[:100]
}
stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, query))
}
logs.Println("====DB STATS====\n" + strings.Join(stats, "\n"))
}

13
internal/utils/exit.go Normal file
View File

@@ -0,0 +1,13 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package utils
import (
"github.com/TeaOSLab/EdgeNode/internal/events"
"os"
)
func Exit() {
events.Notify(events.EventTerminated)
os.Exit(0)
}

View File

@@ -0,0 +1,19 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package fnv
const (
offset64 uint64 = 14695981039346656037
prime64 uint64 = 1099511628211
)
// Hash
// 非unique Hash
func Hash(key string) uint64 {
var hash = offset64
for _, b := range key {
hash ^= uint64(b)
hash *= prime64
}
return hash
}

View File

@@ -0,0 +1,16 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package fnv_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
"github.com/iwind/TeaGo/types"
"testing"
)
func TestHash(t *testing.T) {
for _, key := range []string{"costarring", "liquid", "hello"} {
var h = fnv.Hash(key)
t.Log(key + " => " + types.String(h))
}
}

View File

@@ -3,7 +3,7 @@
package sizes
const (
K = 1024
K int64 = 1024
M = 1024 * K
G = 1024 * M
T = 1024 * G