mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-08 19:40:25 +08:00
提升缓存效率
This commit is contained in:
@@ -3,6 +3,7 @@
|
|||||||
package caches
|
package caches
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||||
@@ -15,7 +16,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const CountFileDB = 10
|
const CountFileDB = 20
|
||||||
|
|
||||||
// FileList 文件缓存列表管理
|
// FileList 文件缓存列表管理
|
||||||
type FileList struct {
|
type FileList struct {
|
||||||
@@ -131,8 +132,13 @@ func (this *FileList) Exist(hash string) (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := db.existsByHashStmt.Query(hash, time.Now().Unix())
|
ctx, cancelCtx := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||||
|
rows, err := db.existsByHashStmt.QueryContext(ctx, hash, time.Now().Unix())
|
||||||
|
cancelCtx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == context.DeadlineExceeded {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -423,7 +429,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
|
|||||||
offset += count
|
offset += count
|
||||||
}()
|
}()
|
||||||
|
|
||||||
rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" LIMIT ?, ?`, offset, count)
|
rows, err := db.Query(`SELECT "hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "createdAt", "host", "serverId" FROM "cacheItems_v3" ORDER BY "id" ASC LIMIT ?, ?`, offset, count)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,11 +10,14 @@ import (
|
|||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileListDB struct {
|
type FileListDB struct {
|
||||||
db *dbs.DB
|
readDB *dbs.DB
|
||||||
|
writeDB *dbs.DB
|
||||||
|
|
||||||
itemsTableName string
|
itemsTableName string
|
||||||
hitsTableName string
|
hitsTableName string
|
||||||
@@ -46,12 +49,13 @@ func NewFileListDB() *FileListDB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileListDB) Open(dbPath string) error {
|
func (this *FileListDB) Open(dbPath string) error {
|
||||||
db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
// write db
|
||||||
|
writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=32000&_secure_delete=FAST")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.New("open database failed: " + err.Error())
|
return errors.New("open write database failed: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
db.SetMaxOpenConns(1)
|
writeDB.SetMaxOpenConns(1)
|
||||||
|
|
||||||
// TODO 耗时过长,暂时不整理数据库
|
// TODO 耗时过长,暂时不整理数据库
|
||||||
/**_, err = db.Exec("VACUUM")
|
/**_, err = db.Exec("VACUUM")
|
||||||
@@ -59,10 +63,24 @@ func (this *FileListDB) Open(dbPath string) error {
|
|||||||
return err
|
return err
|
||||||
}**/
|
}**/
|
||||||
|
|
||||||
this.db = dbs.NewDB(db)
|
this.writeDB = dbs.NewDB(writeDB)
|
||||||
|
|
||||||
if teaconst.EnableDBStat {
|
if teaconst.EnableDBStat {
|
||||||
this.db.EnableStat(true)
|
this.writeDB.EnableStat(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// read db
|
||||||
|
readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=32000")
|
||||||
|
if err != nil {
|
||||||
|
return errors.New("open read database failed: " + err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
readDB.SetMaxOpenConns(runtime.NumCPU())
|
||||||
|
|
||||||
|
this.readDB = dbs.NewDB(readDB)
|
||||||
|
|
||||||
|
if teaconst.EnableDBStat {
|
||||||
|
this.readDB.EnableStat(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -79,7 +97,7 @@ func (this *FileListDB) Init() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 读取总数量
|
// 读取总数量
|
||||||
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
|
row := this.readDB.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
|
||||||
if row.Err() != nil {
|
if row.Err() != nil {
|
||||||
return row.Err()
|
return row.Err()
|
||||||
}
|
}
|
||||||
@@ -91,56 +109,56 @@ func (this *FileListDB) Init() error {
|
|||||||
this.total = total
|
this.total = total
|
||||||
|
|
||||||
// 常用语句
|
// 常用语句
|
||||||
this.existsByHashStmt, err = this.db.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
|
this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
this.insertStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
|
this.selectByHashStmt, err = this.readDB.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
|
this.deleteByHashStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`)
|
this.statStmt, err = this.readDB.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `"`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`)
|
this.purgeStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE staleAt<=? LIMIT ?`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
|
this.deleteAllStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.listOlderItemsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" ORDER BY "id" ASC LIMIT ?`)
|
this.listOlderItemsStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" ORDER BY "id" ASC LIMIT ?`)
|
||||||
|
|
||||||
this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`)
|
this.insertHitStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`)
|
||||||
|
|
||||||
this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`)
|
this.increaseHitStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`)
|
this.deleteHitByHashStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`)
|
this.lfuHitsStmt, err = this.readDB.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -241,7 +259,7 @@ func (this *FileListDB) CleanPrefix(prefix string) error {
|
|||||||
var staleLife = 600 // TODO 需要可以设置
|
var staleLife = 600 // TODO 需要可以设置
|
||||||
var unixTime = utils.UnixTime() // 只删除当前的,不删除新的
|
var unixTime = utils.UnixTime() // 只删除当前的,不删除新的
|
||||||
for {
|
for {
|
||||||
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix)
|
result, err := this.writeDB.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0,staleAt=? WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND createdAt<=? AND INSTR("key", ?)=1 LIMIT `+types.String(count)+`)`, unixTime+int64(staleLife), unixTime, prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -272,24 +290,64 @@ func (this *FileListDB) Close() error {
|
|||||||
this.isClosed = true
|
this.isClosed = true
|
||||||
this.isReady = false
|
this.isReady = false
|
||||||
|
|
||||||
if this.db != nil {
|
if this.existsByHashStmt != nil {
|
||||||
_ = this.existsByHashStmt.Close()
|
_ = this.existsByHashStmt.Close()
|
||||||
_ = this.insertStmt.Close()
|
|
||||||
_ = this.selectByHashStmt.Close()
|
|
||||||
_ = this.deleteByHashStmt.Close()
|
|
||||||
_ = this.statStmt.Close()
|
|
||||||
_ = this.purgeStmt.Close()
|
|
||||||
_ = this.deleteAllStmt.Close()
|
|
||||||
_ = this.listOlderItemsStmt.Close()
|
|
||||||
|
|
||||||
_ = this.insertHitStmt.Close()
|
|
||||||
_ = this.increaseHitStmt.Close()
|
|
||||||
_ = this.deleteHitByHashStmt.Close()
|
|
||||||
_ = this.lfuHitsStmt.Close()
|
|
||||||
|
|
||||||
return this.db.Close()
|
|
||||||
}
|
}
|
||||||
|
if this.insertStmt != nil {
|
||||||
|
_ = this.insertStmt.Close()
|
||||||
|
}
|
||||||
|
if this.selectByHashStmt != nil {
|
||||||
|
_ = this.selectByHashStmt.Close()
|
||||||
|
}
|
||||||
|
if this.deleteByHashStmt != nil {
|
||||||
|
_ = this.deleteByHashStmt.Close()
|
||||||
|
}
|
||||||
|
if this.statStmt != nil {
|
||||||
|
_ = this.statStmt.Close()
|
||||||
|
}
|
||||||
|
if this.purgeStmt != nil {
|
||||||
|
_ = this.purgeStmt.Close()
|
||||||
|
}
|
||||||
|
if this.deleteAllStmt != nil {
|
||||||
|
_ = this.deleteAllStmt.Close()
|
||||||
|
}
|
||||||
|
if this.listOlderItemsStmt != nil {
|
||||||
|
_ = this.listOlderItemsStmt.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
if this.insertHitStmt != nil {
|
||||||
|
_ = this.insertHitStmt.Close()
|
||||||
|
}
|
||||||
|
if this.increaseHitStmt != nil {
|
||||||
|
_ = this.increaseHitStmt.Close()
|
||||||
|
}
|
||||||
|
if this.deleteHitByHashStmt != nil {
|
||||||
|
_ = this.deleteHitByHashStmt.Close()
|
||||||
|
}
|
||||||
|
if this.lfuHitsStmt != nil {
|
||||||
|
_ = this.lfuHitsStmt.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
var errStrings []string
|
||||||
|
|
||||||
|
if this.readDB != nil {
|
||||||
|
err := this.readDB.Close()
|
||||||
|
if err != nil {
|
||||||
|
errStrings = append(errStrings, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if this.writeDB != nil {
|
||||||
|
err := this.writeDB.Close()
|
||||||
|
if err != nil {
|
||||||
|
errStrings = append(errStrings, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errStrings) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
|
return errors.New("close database failed: " + strings.Join(errStrings, ", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化
|
// 初始化
|
||||||
@@ -297,7 +355,7 @@ func (this *FileListDB) initTables(times int) error {
|
|||||||
{
|
{
|
||||||
// expiredAt - 过期时间,用来判断有无过期
|
// expiredAt - 过期时间,用来判断有无过期
|
||||||
// staleAt - 陈旧最大时间,用来清理缓存
|
// staleAt - 陈旧最大时间,用来清理缓存
|
||||||
_, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
|
_, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
|
||||||
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||||
"hash" varchar(32),
|
"hash" varchar(32),
|
||||||
"key" varchar(1024),
|
"key" varchar(1024),
|
||||||
@@ -341,7 +399,7 @@ ON "` + this.itemsTableName + `" (
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// 尝试删除重建
|
// 尝试删除重建
|
||||||
if times < 3 {
|
if times < 3 {
|
||||||
_, dropErr := this.db.Exec(`DROP TABLE "` + this.itemsTableName + `"`)
|
_, dropErr := this.writeDB.Exec(`DROP TABLE "` + this.itemsTableName + `"`)
|
||||||
if dropErr == nil {
|
if dropErr == nil {
|
||||||
return this.initTables(times + 1)
|
return this.initTables(times + 1)
|
||||||
}
|
}
|
||||||
@@ -353,7 +411,7 @@ ON "` + this.itemsTableName + `" (
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
_, err := this.db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
|
_, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
|
||||||
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||||
"hash" varchar(32),
|
"hash" varchar(32),
|
||||||
"week1Hits" integer DEFAULT 0,
|
"week1Hits" integer DEFAULT 0,
|
||||||
@@ -369,7 +427,7 @@ ON "` + this.hitsTableName + `" (
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// 尝试删除重建
|
// 尝试删除重建
|
||||||
if times < 3 {
|
if times < 3 {
|
||||||
_, dropErr := this.db.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
|
_, dropErr := this.writeDB.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
|
||||||
if dropErr == nil {
|
if dropErr == nil {
|
||||||
return this.initTables(times + 1)
|
return this.initTables(times + 1)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ func TestFileList_Add(t *testing.T) {
|
|||||||
t.Log("db index:", list.GetDBIndex(hash))
|
t.Log("db index:", list.GetDBIndex(hash))
|
||||||
err = list.Add(hash, &caches.Item{
|
err = list.Add(hash, &caches.Item{
|
||||||
Key: "123456",
|
Key: "123456",
|
||||||
ExpiredAt: time.Now().Unix(),
|
ExpiredAt: time.Now().Unix() + 1,
|
||||||
HeaderSize: 1,
|
HeaderSize: 1,
|
||||||
MetaSize: 2,
|
MetaSize: 2,
|
||||||
BodySize: 3,
|
BodySize: 3,
|
||||||
@@ -53,6 +53,8 @@ func TestFileList_Add(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.Log(list.Exist(hash))
|
||||||
|
|
||||||
t.Log("ok")
|
t.Log("ok")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -205,12 +205,12 @@ func (this *FileStorage) Init() error {
|
|||||||
return errors.New("[CACHE]cache storage dir can not be empty")
|
return errors.New("[CACHE]cache storage dir can not be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
var list = NewFileList(Tea.Root + "/data/cache-index/p" + types.String(this.policy.Id))
|
var list = NewFileList(dir + "/p" + types.String(this.policy.Id) + "/.indexes")
|
||||||
err = list.Init()
|
err = list.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
list.(*FileList).SetOldDir(this.options.Dir + "/p" + types.String(this.policy.Id))
|
list.(*FileList).SetOldDir(dir + "/p" + types.String(this.policy.Id))
|
||||||
this.list = list
|
this.list = list
|
||||||
stat, err := list.Stat(func(hash string) bool {
|
stat, err := list.Stat(func(hash string) bool {
|
||||||
return true
|
return true
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/cespare/xxhash"
|
"github.com/cespare/xxhash"
|
||||||
"github.com/iwind/TeaGo/rands"
|
"github.com/iwind/TeaGo/rands"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
|
"github.com/shirou/gopsutil/v3/load"
|
||||||
"math"
|
"math"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -93,17 +94,12 @@ func (this *MemoryStorage) Init() error {
|
|||||||
|
|
||||||
// 启动定时Flush memory to disk任务
|
// 启动定时Flush memory to disk任务
|
||||||
if this.parentStorage != nil {
|
if this.parentStorage != nil {
|
||||||
var threads = runtime.NumCPU()
|
// TODO 应该根据磁盘性能决定线程数
|
||||||
if threads == 0 {
|
var threads = 1
|
||||||
threads = 1
|
|
||||||
} else if threads > 8 {
|
|
||||||
threads = 8
|
|
||||||
}
|
|
||||||
for i := 0; i < threads; i++ {
|
for i := 0; i < threads; i++ {
|
||||||
goman.New(func() {
|
goman.New(func() {
|
||||||
for hash := range this.dirtyChan {
|
this.startFlush()
|
||||||
this.flushItem(hash)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -169,8 +165,7 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, s
|
|||||||
if isDirty &&
|
if isDirty &&
|
||||||
this.parentStorage != nil &&
|
this.parentStorage != nil &&
|
||||||
this.dirtyQueueSize > 0 &&
|
this.dirtyQueueSize > 0 &&
|
||||||
len(this.dirtyChan) == this.dirtyQueueSize &&
|
len(this.dirtyChan) == this.dirtyQueueSize { // 缓存时间过长
|
||||||
(expiredAt <= 0 || expiredAt > time.Now().Unix()+7200) { // 缓存时间过长
|
|
||||||
return nil, ErrWritingQueueFull
|
return nil, ErrWritingQueueFull
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -420,7 +415,40 @@ func (this *MemoryStorage) purgeLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush任务
|
// 开始Flush任务
|
||||||
|
func (this *MemoryStorage) startFlush() {
|
||||||
|
var statCount = 0
|
||||||
|
var writeDelayMS float64 = 0
|
||||||
|
|
||||||
|
for hash := range this.dirtyChan {
|
||||||
|
statCount++
|
||||||
|
|
||||||
|
if statCount == 100 {
|
||||||
|
statCount = 0
|
||||||
|
|
||||||
|
loadStat, err := load.Avg()
|
||||||
|
if err == nil && loadStat != nil {
|
||||||
|
if loadStat.Load1 > 10 {
|
||||||
|
writeDelayMS = 100
|
||||||
|
} else if loadStat.Load1 > 3 {
|
||||||
|
writeDelayMS = 50
|
||||||
|
} else if loadStat.Load1 > 2 {
|
||||||
|
writeDelayMS = 10
|
||||||
|
} else {
|
||||||
|
writeDelayMS = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.flushItem(hash)
|
||||||
|
|
||||||
|
if writeDelayMS > 0 {
|
||||||
|
time.Sleep(time.Duration(writeDelayMS) * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 单次Flush任务
|
||||||
func (this *MemoryStorage) flushItem(key string) {
|
func (this *MemoryStorage) flushItem(key string) {
|
||||||
if this.parentStorage == nil {
|
if this.parentStorage == nil {
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -25,12 +25,12 @@ func init() {
|
|||||||
for _, stat := range SharedQueryStatManager.TopN(10) {
|
for _, stat := range SharedQueryStatManager.TopN(10) {
|
||||||
var avg = stat.CostTotal / float64(stat.Calls)
|
var avg = stat.CostTotal / float64(stat.Calls)
|
||||||
var query = stat.Query
|
var query = stat.Query
|
||||||
if len(query) > 100 {
|
if len(query) > 128 {
|
||||||
query = query[:100]
|
query = query[:128]
|
||||||
}
|
}
|
||||||
stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, query))
|
stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, query))
|
||||||
}
|
}
|
||||||
logs.Println("====DB STATS====\n" + strings.Join(stats, "\n"))
|
logs.Println("\n========== DB STATS ==========\n" + strings.Join(stats, "\n") + "\n=============================")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user