文件缓存增加自动限速/提升本地缓存数据库写入和查询速度

This commit is contained in:
GoEdgeLab
2022-04-20 18:23:26 +08:00
parent c8b8071bee
commit f39c59569a
7 changed files with 147 additions and 33 deletions

View File

@@ -58,6 +58,7 @@ func (this *FileListDB) Open(dbPath string) error {
writeDB.SetMaxOpenConns(1)
// TODO 耗时过长,暂时不整理数据库
// TODO 需要根据行数来判断是否VACUUM
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
@@ -109,7 +110,7 @@ func (this *FileListDB) Init() error {
this.total = total
// 常用语句
this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
this.existsByHashStmt, err = this.readDB.Prepare(`SELECT "expiredAt" FROM "` + this.itemsTableName + `" INDEXED BY "hash" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
}
@@ -354,7 +355,7 @@ func (this *FileListDB) Close() error {
func (this *FileListDB) initTables(times int) error {
{
// expiredAt - 过期时间,用来判断有无过期
// staleAt - 陈旧最大时间,用来清理缓存
// staleAt - 过时缓存最大时间,用来清理缓存
_, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
@@ -370,15 +371,9 @@ func (this *FileListDB) initTables(times int) error {
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "createdAt"
ON "` + this.itemsTableName + `" (
"createdAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
DROP INDEX IF EXISTS "createdAt";
DROP INDEX IF EXISTS "expiredAt";
DROP INDEX IF EXISTS "serverId";
CREATE INDEX IF NOT EXISTS "staleAt"
ON "` + this.itemsTableName + `" (
@@ -389,11 +384,6 @@ CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {

View File

@@ -0,0 +1,83 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"sync/atomic"
"time"
)
const (
minOpenFilesValue int32 = 2
maxOpenFilesValue int32 = 65535
modSlow int32 = 1
modFast int32 = 2
)
type MaxOpenFiles struct {
step int32
maxOpenFiles int32
ptr *int32
ticker *time.Ticker
mod int32
}
func NewMaxOpenFiles(step int32) *MaxOpenFiles {
if step <= 0 {
step = 2
}
var f = &MaxOpenFiles{
step: step,
maxOpenFiles: 2,
}
if teaconst.DiskIsFast {
f.maxOpenFiles = 32
}
f.ptr = &f.maxOpenFiles
f.ticker = time.NewTicker(1 * time.Second)
goman.New(func() {
for range f.ticker.C {
var mod = atomic.LoadInt32(&f.mod)
switch mod {
case modSlow:
// we decrease more quickly, with more steps
if atomic.AddInt32(f.ptr, -step*2) <= 0 {
atomic.StoreInt32(f.ptr, minOpenFilesValue)
}
case modFast:
if atomic.AddInt32(f.ptr, step) >= maxOpenFilesValue {
atomic.StoreInt32(f.ptr, maxOpenFilesValue)
}
}
// reset mod
atomic.StoreInt32(&f.mod, 0)
}
})
return f
}
func (this *MaxOpenFiles) Fast() {
if atomic.LoadInt32(&this.mod) == 0 {
this.mod = modFast
}
}
func (this *MaxOpenFiles) Slow() {
atomic.StoreInt32(&this.mod, modSlow)
}
func (this *MaxOpenFiles) Max() int32 {
if atomic.LoadInt32(&this.mod) == modSlow {
return 0
}
var v = atomic.LoadInt32(this.ptr)
if v <= minOpenFilesValue {
return minOpenFilesValue
}
return v
}

View File

@@ -0,0 +1,28 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"testing"
"time"
)
func TestNewMaxOpenFiles(t *testing.T) {
var maxOpenFiles = caches.NewMaxOpenFiles(2)
maxOpenFiles.Fast()
t.Log(maxOpenFiles.Max())
maxOpenFiles.Fast()
time.Sleep(1 * time.Second)
t.Log(maxOpenFiles.Max())
maxOpenFiles.Slow()
t.Log(maxOpenFiles.Max())
maxOpenFiles.Slow()
t.Log(maxOpenFiles.Max())
maxOpenFiles.Slow()
t.Log(maxOpenFiles.Max())
}

View File

@@ -27,7 +27,6 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
@@ -58,21 +57,15 @@ const (
HotItemSize = 1024 // 热点数据数量
HotItemLifeSeconds int64 = 3600 // 热点数据生命周期
FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸
FileTmpSuffix = ".tmp"
)
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
var sharedWritingFileKeyLocker = sync.Mutex{}
var maxOpenFiles = 3
var maxOpenFiles = NewMaxOpenFiles(2)
func init() {
if teaconst.DiskIsFast {
maxOpenFiles = runtime.NumCPU()
}
if maxOpenFiles < 3 {
maxOpenFiles = 3
}
}
const maxOpenFilesSlowCost = 1 * time.Microsecond
// FileStorage 文件缓存
// 文件结构:
@@ -263,7 +256,7 @@ func (this *FileStorage) Init() error {
var count = stat.Count
var size = stat.Size
cost := time.Since(before).Seconds() * 1000
var cost = time.Since(before).Seconds() * 1000
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
if size > 1*sizes.G {
sizeMB = fmt.Sprintf("%.3f G", float64(size)/float64(sizes.G))
@@ -435,7 +428,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
return nil, ErrFileIsWriting
}
if !isFlushing && len(sharedWritingFileKeyMap) >= maxOpenFiles {
if len(sharedWritingFileKeyMap) >= int(maxOpenFiles.Max()) {
sharedWritingFileKeyLocker.Unlock()
return nil, ErrTooManyOpenFiles
}
@@ -464,6 +457,8 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
}
var hash = stringutil.Md5(key)
// TODO 可以只stat一次
var dir = this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
_, err = os.Stat(dir)
if err != nil {
@@ -491,7 +486,15 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
// 防止并发连续写入
return nil, ErrFileIsWriting
}
var tmpPath = cachePath + ".tmp"
var tmpPath = cachePath
var existsFile = false
if stat != nil {
existsFile = true
// 如果已经存在,则增加一个.tmp后缀防止读写冲突
tmpPath += FileTmpSuffix
}
if isPartial {
tmpPath = cachePathName + ".cache"
}
@@ -523,13 +526,19 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
}
var flags = os.O_CREATE | os.O_WRONLY
if isNewCreated {
if isNewCreated && existsFile {
flags |= os.O_TRUNC
}
var before = time.Now()
writer, err := os.OpenFile(tmpPath, flags, 0666)
if err != nil {
return nil, err
}
if time.Since(before) >= maxOpenFilesSlowCost {
maxOpenFiles.Slow()
} else {
maxOpenFiles.Fast()
}
var removeOnFailure = true
defer func() {

View File

@@ -127,8 +127,8 @@ func (this *FileWriter) Close() error {
err = this.rawWriter.Close()
if err != nil {
_ = os.Remove(path)
} else {
err = os.Rename(path, strings.Replace(path, ".tmp", "", 1))
} else if strings.HasSuffix(path, FileTmpSuffix) {
err = os.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1))
if err != nil {
_ = os.Remove(path)
}

View File

@@ -314,7 +314,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
}
this.Header().Set("X-Cache", "BYPASS, open failed")
this.Header().Set("X-Cache", "BYPASS, too many requests")
return
}
this.cacheWriter = cacheWriter

View File

@@ -31,6 +31,10 @@ func (this *DB) EnableStat(b bool) {
this.enableStat = b
}
func (this *DB) Begin() (*sql.Tx, error) {
return this.rawDB.Begin()
}
func (this *DB) Prepare(query string) (*Stmt, error) {
stmt, err := this.rawDB.Prepare(query)
if err != nil {