优化文件缓存

This commit is contained in:
刘祥超
2021-06-13 17:37:57 +08:00
parent 993cda7766
commit 13194366a5
11 changed files with 309 additions and 101 deletions

View File

@@ -18,6 +18,8 @@ type Item struct {
HeaderSize int64 `json:"headerSize"`
BodySize int64 `json:"bodySize"`
MetaSize int64 `json:"metaSize"`
Host string `json:"host"` // 主机名
ServerId int64 `json:"serverId"` // 服务ID
}
func (this *Item) IsExpired() bool {

View File

@@ -5,8 +5,10 @@ package caches
import (
"database/sql"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/lists"
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
"sync/atomic"
"time"
)
@@ -19,6 +21,17 @@ type FileList struct {
onAdd func(item *Item)
onRemove func(item *Item)
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
deleteByHashStmt *sql.Stmt // 根据hash删除数据
statStmt *sql.Stmt // 统计
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
oldTables []string
itemsTableName string
}
func NewFileList(dir string) ListInterface {
@@ -36,51 +49,72 @@ func (this *FileList) Init() error {
remotelogs.Println("CACHE", "create cache dir '"+this.dir+"'")
}
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc")
this.itemsTableName = "cacheItems_v2"
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc&_journal_mode=WAL")
if err != nil {
return err
}
db.SetMaxOpenConns(1)
this.db = db
_, err = db.Exec("VACUUM")
// 清除旧表
this.oldTables = []string{
"cacheItems",
}
err = this.removeOldTables()
if err != nil {
remotelogs.Warn("CACHE", "clean old tables failed: "+err.Error())
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
if err != nil {
return err
}
}**/
// 创建
// TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容
// 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "cacheItems" (
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"key" varchar(1024),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"accessedAt" integer DEFAULT 0
"accessedAt" integer DEFAULT 0,
"host" varchar(128),
"serverId" integer
);
CREATE INDEX IF NOT EXISTS "accessedAt"
ON "` + this.itemsTableName + `" (
"accessedAt" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "` + this.itemsTableName + `" (
"expiredAt" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "cacheItems" (
"hash"
ON "` + this.itemsTableName + `" (
"hash" ASC
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "cacheItems" (
"expiredAt"
);
CREATE INDEX IF NOT EXISTS "accessedAt"
ON "cacheItems" (
"accessedAt"
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.itemsTableName + `" (
"serverId" ASC
);
`)
if err != nil {
return err
}
this.db = db
// 读取总数量
row := this.db.QueryRow("SELECT COUNT(*) FROM cacheItems")
row := this.db.QueryRow(`SELECT COUNT(*) FROM "` + this.itemsTableName + `"`)
if row.Err() != nil {
return row.Err()
}
@@ -91,6 +125,42 @@ ON "cacheItems" (
}
this.total = total
// 常用语句
this.existsByHashStmt, err = this.db.Prepare(`SELECT "bodySize" FROM "` + this.itemsTableName + `" WHERE "hash"=? AND expiredAt>? LIMIT 1`)
if err != nil {
return err
}
this.insertStmt, err = this.db.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "host", "serverId") VALUES (?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
this.selectByHashStmt, err = this.db.Prepare(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM "` + this.itemsTableName + `" WHERE "hash"=? LIMIT 1`)
if err != nil {
return err
}
this.deleteByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.statStmt, err = this.db.Prepare(`SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM "` + this.itemsTableName + `" WHERE expiredAt>?`)
if err != nil {
return err
}
this.purgeStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.itemsTableName + `" WHERE expiredAt<=? LIMIT ?`)
if err != nil {
return err
}
this.deleteAllStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemsTableName + `"`)
if err != nil {
return err
}
return nil
}
@@ -100,7 +170,7 @@ func (this *FileList) Reset() error {
}
func (this *FileList) Add(hash string, item *Item) error {
_, err := this.db.Exec(`INSERT INTO cacheItems ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt") VALUES (?, ?, ?, ?, ?, ?)`, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt)
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.Host, item.ServerId)
if err != nil {
return err
}
@@ -114,54 +184,43 @@ func (this *FileList) Add(hash string, item *Item) error {
}
func (this *FileList) Exist(hash string) (bool, error) {
row := this.db.QueryRow(`SELECT "bodySize" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
if row == nil {
return false, nil
}
if row.Err() != nil {
return false, row.Err()
}
var bodySize int
err := row.Scan(&bodySize)
rows, err := this.existsByHashStmt.Query(hash, time.Now().Unix())
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
// FindKeysWithPrefix 根据前缀进行查找
func (this *FileList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
if len(prefix) == 0 {
return
}
// TODO 需要优化上千万结果的情况
rows, err := this.db.Query(`SELECT "key" FROM cacheItems WHERE INSTR("key", ?)==1 LIMIT 100000`, prefix)
if err != nil {
return nil, err
}
defer func() {
_ = rows.Close()
}()
if rows.Next() {
return true, nil
}
return false, nil
}
for rows.Next() {
var key string
err = rows.Scan(&key)
if err != nil {
return nil, err
}
keys = append(keys, key)
// CleanPrefix 清理某个前缀的缓存数据
func (this *FileList) CleanPrefix(prefix string) error {
if len(prefix) == 0 {
return nil
}
return
var count = int64(10000)
for {
result, err := this.db.Exec(`UPDATE "`+this.itemsTableName+`" SET expiredAt=0 WHERE id IN (SELECT id FROM "`+this.itemsTableName+`" WHERE expiredAt>0 AND INSTR("key", ?)==1 LIMIT `+strconv.FormatInt(count, 10)+`)`, prefix)
if err != nil {
return err
}
affectedRows, err := result.RowsAffected()
if err != nil {
return err
}
if affectedRows < count {
return nil
}
}
}
func (this *FileList) Remove(hash string) error {
row := this.db.QueryRow(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
row := this.selectByHashStmt.QueryRow(hash)
if row.Err() != nil {
return row.Err()
}
@@ -175,7 +234,7 @@ func (this *FileList) Remove(hash string) error {
return err
}
_, err = this.db.Exec(`DELETE FROM cacheItems WHERE "hash"=?`, hash)
_, err = this.deleteByHashStmt.Exec(hash)
if err != nil {
return err
}
@@ -197,7 +256,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
count = 1000
}
rows, err := this.db.Query(`SELECT "hash" FROM cacheItems WHERE expiredAt<=? LIMIT ?`, time.Now().Unix(), count)
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil {
return err
}
@@ -232,7 +291,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
}
func (this *FileList) CleanAll() error {
_, err := this.db.Exec("DELETE FROM cacheItems")
_, err := this.deleteAllStmt.Exec()
if err != nil {
return err
}
@@ -242,7 +301,7 @@ func (this *FileList) CleanAll() error {
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.db.QueryRow("SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM cacheItems")
row := this.statStmt.QueryRow(time.Now().Unix())
if row.Err() != nil {
return nil, row.Err()
}
@@ -270,3 +329,37 @@ func (this *FileList) OnAdd(f func(item *Item)) {
func (this *FileList) OnRemove(f func(item *Item)) {
this.onRemove = f
}
func (this *FileList) Close() error {
if this.db != nil {
return this.db.Close()
}
return nil
}
func (this *FileList) removeOldTables() error {
rows, err := this.db.Query(`SELECT "name" FROM sqlite_master WHERE "type"='table'`)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return err
}
if lists.ContainsString(this.oldTables, name) {
// 异步执行
go func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done")
}()
}
}
return nil
}

View File

@@ -4,8 +4,10 @@ package caches
import (
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
stringutil "github.com/iwind/TeaGo/utils/string"
"strconv"
"sync"
"testing"
"time"
)
@@ -31,6 +33,8 @@ func TestFileList_Add(t *testing.T) {
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
Host: "teaos.cn",
ServerId: 1,
})
if err != nil {
t.Fatal(err)
@@ -44,11 +48,12 @@ func TestFileList_Add_Many(t *testing.T) {
if err != nil {
t.Fatal(err)
}
for i := 0; i < 100_0000; i++ {
before := time.Now()
for i := 0; i < 2000_0000; i++ {
u := "http://edge.teaos.cn/123456" + strconv.Itoa(i)
err = list.Add(stringutil.Md5(u), &Item{
_ = list.Add(stringutil.Md5(u), &Item{
Key: u,
ExpiredAt: time.Now().Unix(),
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
@@ -56,6 +61,10 @@ func TestFileList_Add_Many(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if i > 0 && i%10_000 == 0 {
t.Log(i, int(10000/time.Since(before).Seconds()), "qps")
before = time.Now()
}
}
t.Log("ok")
}
@@ -66,6 +75,10 @@ func TestFileList_Exist(t *testing.T) {
if err != nil {
t.Fatal(err)
}
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
{
exists, err := list.Exist(stringutil.Md5("123456"))
if err != nil {
@@ -74,7 +87,7 @@ func TestFileList_Exist(t *testing.T) {
t.Log("exists:", exists)
}
{
exists, err := list.Exist(stringutil.Md5("654321"))
exists, err := list.Exist(stringutil.Md5("http://edge.teaos.cn/1234561"))
if err != nil {
t.Fatal(err)
}
@@ -82,18 +95,70 @@ func TestFileList_Exist(t *testing.T) {
}
}
func TestFileList_FindKeysWithPrefix(t *testing.T) {
func TestFileList_Exist_Many_DB(t *testing.T) {
// 测试在多个数据库下的性能
var listSlice = []ListInterface{}
for i := 1; i <= 10; i++ {
list := NewFileList(Tea.Root + "/data/data" + strconv.Itoa(i))
err := list.Init()
if err != nil {
t.Fatal(err)
}
listSlice = append(listSlice, list)
}
var wg = sync.WaitGroup{}
var threads = 8
wg.Add(threads)
var count = 200_000
var countLocker sync.Mutex
var tasks = make(chan int, count)
for i := 0; i < count; i++ {
tasks <- i
}
var hash = stringutil.Md5("http://edge.teaos.cn/1234561")
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
for i := 0; i < threads; i++ {
go func() {
defer wg.Done()
for {
select {
case <-tasks:
countLocker.Lock()
count--
countLocker.Unlock()
var list = listSlice[rands.Int(0, len(listSlice)-1)]
_, _ = list.Exist(hash)
default:
return
}
}
}()
}
wg.Wait()
t.Log("left:", count)
}
func TestFileList_CleanPrefix(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
keys, err := list.FindKeysWithPrefix("1234")
err = list.CleanPrefix("1234")
if err != nil {
t.Fatal(err)
}
t.Log("keys:", keys)
t.Log(time.Since(before).Seconds()*1000, "ms")
}
@@ -170,3 +235,14 @@ func TestFileList_CleanAll(t *testing.T) {
t.Log("ok")
t.Log(list.Count())
}
func BenchmarkFileList_Exist(b *testing.B) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
b.Fatal(err)
}
for i := 0; i < b.N; i++ {
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f")
}
}

View File

@@ -3,23 +3,31 @@
package caches
type ListInterface interface {
// Init 初始化
Init() error
// Reset 重置数据
Reset() error
// Add 添加内容
Add(hash string, item *Item) error
// Exist 检查内容是否存在
Exist(hash string) (bool, error)
// FindKeysWithPrefix 根据前缀进行查找
FindKeysWithPrefix(prefix string) (keys []string, err error)
// CleanPrefix 清除某个前缀的缓存
CleanPrefix(prefix string) error
// Remove 删除内容
Remove(hash string) error
// Purge 清理过期数据
Purge(count int, callback func(hash string) error) error
// CleanAll 清除所有缓存
CleanAll() error
// Stat 统计
Stat(check func(hash string) bool) (*Stat, error)
// Count 总数量
@@ -30,4 +38,7 @@ type ListInterface interface {
// OnRemove 删除事件
OnRemove(f func(item *Item))
// Close 关闭
Close() error
}

View File

@@ -92,8 +92,8 @@ func (this *MemoryList) Exist(hash string) (bool, error) {
return !item.IsExpired(), nil
}
// FindKeysWithPrefix 根据前缀进行查找
func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
// CleanPrefix 根据前缀进行清除
func (this *MemoryList) CleanPrefix(prefix string) error {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -101,11 +101,11 @@ func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err er
for _, itemMap := range this.itemMaps {
for _, item := range itemMap {
if strings.HasPrefix(item.Key, prefix) {
keys = append(keys, item.Key)
item.ExpiredAt = 0
}
}
}
return
return nil
}
func (this *MemoryList) Remove(hash string) error {
@@ -225,6 +225,10 @@ func (this *MemoryList) OnRemove(f func(item *Item)) {
this.onRemove = f
}
func (this *MemoryList) Close() error {
return nil
}
func (this *MemoryList) print(t *testing.T) {
this.locker.Lock()
for _, itemMap := range this.itemMaps {

View File

@@ -134,7 +134,7 @@ func TestMemoryList_Stat(t *testing.T) {
t.Log(result)
}
func TestMemoryList_FindKeysWithPrefix(t *testing.T) {
func TestMemoryList_CleanPrefix(t *testing.T) {
list := NewMemoryList()
_ = list.Init()
before := time.Now()
@@ -142,7 +142,7 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) {
key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: 0,
ExpiredAt: time.Now().Unix() + 3600,
BodySize: 0,
HeaderSize: 0,
})
@@ -150,11 +150,15 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now()
keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/50")
err := list.CleanPrefix("http://www.teaos.cn/hello/10")
if err != nil {
t.Fatal(err)
}
t.Log(len(keys))
logs.Println(list.Stat(func(hash string) bool {
return true
}))
t.Log(time.Since(before).Seconds()*1000, "ms")
}

View File

@@ -191,9 +191,10 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
}
}
_, path := this.keyPath(key)
hash, path := this.keyPath(key)
// TODO 尝试使用mmap加快读取速度
var isOk = false
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
if err != nil {
if !os.IsNotExist(err) {
@@ -201,6 +202,21 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
}
return nil, ErrNotFound
}
defer func() {
if !isOk {
_ = fp.Close()
_ = os.Remove(path)
}
}()
// 检查文件记录是否已过期
exists, err := this.list.Exist(hash)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound
}
reader := NewFileReader(fp)
if err != nil {
@@ -210,6 +226,8 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
if err != nil {
return nil, err
}
isOk = true
return reader, nil
}
@@ -520,33 +538,18 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
subKeys, err := this.list.FindKeysWithPrefix(key)
err := this.list.CleanPrefix(key)
if err != nil {
return err
}
resultKeys = append(resultKeys, subKeys...)
}
keys = resultKeys
}
// 文件
for _, key := range keys {
hash, path := this.keyPath(key)
exists, err := this.list.Exist(hash)
if err != nil {
return err
}
if !exists {
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
continue
}
err = os.Remove(path)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
@@ -572,6 +575,8 @@ func (this *FileStorage) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
_ = this.list.Close()
}
// TotalDiskSize 消耗的磁盘尺寸
@@ -742,7 +747,7 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
// 清理任务
func (this *FileStorage) purgeLoop() {
_ = this.list.Purge(1000, func(hash string) error {
err := this.list.Purge(1000, func(hash string) error {
path := this.hashPath(hash)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
@@ -750,6 +755,9 @@ func (this *FileStorage) purgeLoop() {
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage failed: " + err.Error())
}
}
func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {

View File

@@ -3,8 +3,10 @@ package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/cespare/xxhash"
"runtime"
"strconv"
"sync"
"sync/atomic"
@@ -182,15 +184,12 @@ func (this *MemoryStorage) CleanAll() error {
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
subKeys, err := this.list.FindKeysWithPrefix(key)
err := this.list.CleanPrefix(key)
if err != nil {
return err
}
resultKeys = append(resultKeys, subKeys...)
}
keys = resultKeys
}
for _, key := range keys {
@@ -205,13 +204,21 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// Stop 停止缓存策略
func (this *MemoryStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
this.valuesMap = map[uint64]*MemoryItem{}
this.writingKeyMap = map[string]bool{}
_ = this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
_ = this.list.Close()
this.locker.Unlock()
runtime.GC()
remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'")
}
// Policy 获取当前存储的Policy

View File

@@ -11,6 +11,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/types"
"golang.org/x/net/http2"
"io"
"net"
"net/http"
"net/url"
@@ -1153,7 +1154,7 @@ func (this *HTTPRequest) canIgnore(err error) bool {
}
// 客户端主动取消
if err == context.Canceled {
if err == context.Canceled || err == io.ErrShortWrite {
return true
}

View File

@@ -231,6 +231,8 @@ func (this *HTTPWriter) Close() {
ExpiredAt: this.cacheWriter.ExpiredAt(),
HeaderSize: this.cacheWriter.HeaderSize(),
BodySize: this.cacheWriter.BodySize(),
Host: this.req.Host,
ServerId: this.req.Server.Id,
})
}
} else {