优化缓存LFU逻辑

This commit is contained in:
GoEdgeLab
2023-09-14 18:30:11 +08:00
parent 8133ba24f1
commit e0063aaf32
10 changed files with 101 additions and 334 deletions

View File

@@ -3,7 +3,6 @@ package caches
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"strings"
"time"
)
type ItemType = int
@@ -16,7 +15,7 @@ const (
// currentWeek computes the "current week" value used to compare item activity.
// Do not use the "YW" date format here, because callers need to compute whether
// two weeks are adjacent (by subtracting the returned integers).
// NOTE(review): the divisor 86400 is the number of seconds in one day, so the
// returned value advances once per day rather than once per week — confirm
// whether 86400*7 was intended.
// NOTE(review): the two return statements below appear to be the removed and
// added sides of the same change (time.Now vs fasttime.Now); only the first
// one is reachable as written.
func currentWeek() int32 {
return int32(time.Now().Unix() / 86400)
return int32(fasttime.Now().Unix() / 86400)
}
type Item struct {
@@ -29,9 +28,6 @@ type Item struct {
MetaSize int64 `json:"metaSize"`
Host string `json:"host"` // 主机名
ServerId int64 `json:"serverId"` // 服务ID
Week1Hits int64 `json:"week1Hits"`
Week2Hits int64 `json:"week2Hits"`
Week int32 `json:"week"`
}
@@ -47,20 +43,6 @@ func (this *Item) Size() int64 {
return this.HeaderSize + this.BodySize
}
// IncreaseHit records one hit for the given week.
//
// Week2Hits counts hits in the most recently seen week (this.Week) and
// Week1Hits holds the count carried over from the week before it.
//   - If week equals this.Week, Week2Hits is incremented.
//   - Otherwise the counters roll over: Week1Hits receives the old Week2Hits
//     only when week is exactly this.Week+1 (it is reset to 0 for any other
//     gap), Week2Hits restarts at 1 and this.Week becomes week.
func (this *Item) IncreaseHit(week int32) {
	// Same week as the last recorded hit: just bump the current counter.
	if week == this.Week {
		this.Week2Hits++
		return
	}

	// A different week: the previous counter is only meaningful as
	// "last week" when the new week immediately follows the stored one.
	var carried int64
	if week-this.Week == 1 {
		carried = this.Week2Hits
	}

	this.Week1Hits = carried
	this.Week2Hits = 1
	this.Week = week
}
func (this *Item) RequestURI() string {
var schemeIndex = strings.Index(this.Key, "://")
if schemeIndex <= 0 {

View File

@@ -1,8 +1,9 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
@@ -11,27 +12,14 @@ import (
"time"
)
func TestItem_IncreaseHit(t *testing.T) {
var week = currentWeek()
var item = &Item{}
//item.Week = 2704
item.Week2Hits = 100
item.IncreaseHit(week)
t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits)
item.IncreaseHit(week)
t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits)
}
func TestItems_Memory(t *testing.T) {
var stat = &runtime.MemStats{}
runtime.ReadMemStats(stat)
var memory1 = stat.HeapInuse
var items = []*Item{}
var items = []*caches.Item{}
for i := 0; i < 10_000_000; i++ {
items = append(items, &Item{
items = append(items, &caches.Item{
Key: types.String(i),
})
}
@@ -41,18 +29,11 @@ func TestItems_Memory(t *testing.T) {
t.Log(memory1, memory2, (memory2-memory1)/1024/1024, "M")
var weekItems = make(map[string]*Item, 10_000_000)
for _, item := range items {
weekItems[item.Key] = item
}
runtime.ReadMemStats(stat)
var memory3 = stat.HeapInuse
t.Log(memory2, memory3, (memory3-memory2)/1024/1024, "M")
time.Sleep(1 * time.Second)
t.Log(len(items), len(weekItems))
}
func TestItems_Memory2(t *testing.T) {
@@ -88,7 +69,7 @@ func TestItem_RequestURI(t *testing.T) {
"https://goedge.cn:8080/hello/world",
"https://goedge.cn/hello/world?v=1&t=123",
} {
var item = &Item{Key: u}
var item = &caches.Item{Key: u}
t.Log(u, "=>", item.RequestURI())
}
}

View File

@@ -256,16 +256,10 @@ func (this *FileList) PurgeLFU(count int, callback func(hash string) error) erro
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
notFound, err := this.remove(hash)
_, err = this.remove(hash)
if err != nil {
return err
}
if notFound {
err = db.DeleteHitAsync(hash)
if err != nil {
return db.WrapError(err)
}
}
err = callback(hash)
if err != nil {
@@ -393,11 +387,6 @@ func (this *FileList) remove(hash string) (notFound bool, err error) {
return false, db.WrapError(err)
}
err = db.DeleteHitAsync(hash)
if err != nil {
return false, db.WrapError(err)
}
if this.onRemove != nil {
// when remove file item, no any extra information needed
this.onRemove(nil)
@@ -493,9 +482,6 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
MetaSize: metaSize,
Host: host,
ServerId: serverId,
Week1Hits: 0,
Week2Hits: 0,
Week: 0,
})
if err != nil {
if brokenOnError {

View File

@@ -34,7 +34,6 @@ type FileListDB struct {
hashMap *FileListHashMap
itemsTableName string
hitsTableName string
isClosed bool
isReady bool
@@ -57,11 +56,6 @@ type FileListDB struct {
deleteAllStmt *dbs.Stmt // 删除所有数据
listOlderItemsStmt *dbs.Stmt // 读取较早存储的缓存
updateAccessWeekSQL string // 修改访问日期
// hits
insertHitSQL string // 写入数据
increaseHitSQL string // 增加点击量
deleteHitByHashSQL string // 根据hash删除数据
}
func NewFileListDB() *FileListDB {
@@ -142,7 +136,6 @@ func (this *FileListDB) Open(dbPath string) error {
func (this *FileListDB) Init() error {
this.itemsTableName = "cacheItems"
this.hitsTableName = "hits"
// 创建
var err = this.initTables(1)
@@ -200,12 +193,6 @@ func (this *FileListDB) Init() error {
this.updateAccessWeekSQL = `UPDATE "` + this.itemsTableName + `" SET "accessWeek"=? WHERE "hash"=?`
this.insertHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`
this.increaseHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`
this.deleteHitByHashSQL = `DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`
this.isReady = true
// 加载HashMap
@@ -352,16 +339,10 @@ func (this *FileListDB) ListHashes(lastId int64) (hashList []string, maxId int64
// IncreaseHitAsync queues hit-related write statements for the given cache
// item hash on this.writeBatch and always returns nil.
// The week value is the current date formatted with the "YW" layout.
// NOTE(review): the "Async" name suggests writeBatch executes the statements
// later — confirm against the writeBatch implementation.
// NOTE(review): the two Add calls below look like the removed and added sides
// of the same change; confirm which one applies to the version being edited.
func (this *FileListDB) IncreaseHitAsync(hash string) error {
var week = timeutil.Format("YW")
// Upsert into the hits table: the arguments fill the hash, the inserted
// week, and the three week placeholders in the ON CONFLICT clause.
this.writeBatch.Add(this.increaseHitSQL, hash, week, week, week, week)
// Set "accessWeek" on the cache item row identified by hash.
this.writeBatch.Add(this.updateAccessWeekSQL, week, hash)
return nil
}
// DeleteHitAsync queues the delete-hit-by-hash statement for the given hash
// on this.writeBatch and always returns nil.
// NOTE(review): presumably the statement is executed later by writeBatch —
// confirm against its implementation.
func (this *FileListDB) DeleteHitAsync(hash string) error {
this.writeBatch.Add(this.deleteHitByHashSQL, hash)
return nil
}
func (this *FileListDB) CleanPrefix(prefix string) error {
if !this.isReady {
return nil
@@ -600,32 +581,9 @@ ALTER TABLE "cacheItems" ADD "accessWeek" varchar(6);
}
}
// 删除hits表
{
_, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"week1Hits" integer DEFAULT 0,
"week2Hits" integer DEFAULT 0,
"week" varchar(6)
);
CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash"
ON "` + this.hitsTableName + `" (
"hash" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := this.writeDB.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
if dropErr == nil {
return this.initTables(times + 1)
}
return this.WrapError(err)
}
return this.WrapError(err)
}
_, _ = this.writeDB.Exec(`DROP TABLE "hits"`)
}
return nil

View File

@@ -7,7 +7,6 @@ import (
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"testing"
"time"
)
func TestFileListDB_ListLFUItems(t *testing.T) {
@@ -34,32 +33,6 @@ func TestFileListDB_ListLFUItems(t *testing.T) {
t.Log("[", len(hashList), "]", hashList)
}
// TestFileListDB_IncreaseHitAsync opens and initializes the large cache
// database under Tea.Root, queues one hit increase for a fixed hash and then
// sleeps for a second before the deferred Close runs.
func TestFileListDB_IncreaseHitAsync(t *testing.T) {
	var db = caches.NewFileListDB()
	defer func() {
		_ = db.Close()
	}()

	if err := db.Open(Tea.Root + "/data/cache-db-large.db"); err != nil {
		t.Fatal(err)
	}

	if err := db.Init(); err != nil {
		t.Fatal(err)
	}

	if err := db.IncreaseHitAsync("4598e5231ba47d6ec7aa9ea640ff2eaf"); err != nil {
		t.Fatal(err)
	}

	// wait transaction
	time.Sleep(time.Second)
}
func TestFileListDB_CleanMatchKey(t *testing.T) {
var db = caches.NewFileListDB()

View File

@@ -281,11 +281,6 @@ func TestFileList_PurgeLFU(t *testing.T) {
t.Fatal(err)
}
err = list.IncreaseHit(stringutil.Md5("123456"))
if err != nil {
t.Fatal(err)
}
var count = 0
err = list.PurgeLFU(caches.CountFileDB*2, func(hash string) error {
t.Log(hash)
@@ -356,41 +351,6 @@ func TestFileList_CleanAll(t *testing.T) {
t.Log(list.Count())
}
// TestFileList_IncreaseHit initializes the file list at cache-index/p1 and
// calls IncreaseHit for a series of generated hashes, logging the elapsed
// time in milliseconds when the test returns.
//
// The number of calls is 1,000,000 when testutils.IsSingleTesting() reports
// true and 10 otherwise.
func TestFileList_IncreaseHit(t *testing.T) {
	var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")

	// Close exactly once; the list was previously closed by two identical defers.
	defer func() {
		_ = list.Close()
	}()

	if err := list.Init(); err != nil {
		t.Fatal(err)
	}

	var before = time.Now()
	defer func() {
		t.Log(time.Since(before).Seconds()*1000, "ms")
	}()

	var count = 1_000_000
	if !testutils.IsSingleTesting() {
		count = 10
	}

	for i := 0; i < count; i++ {
		// Check every call: previously err was overwritten on each iteration,
		// so only a failure of the very last call could fail the test.
		if err := list.IncreaseHit(stringutil.Md5("abc" + types.String(i))); err != nil {
			t.Fatal(err)
		}
	}

	t.Log("ok")
}
func TestFileList_UpgradeV3(t *testing.T) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p43").(*caches.FileList)

View File

@@ -2,7 +2,6 @@ package caches
import (
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/logs"
"net"
"net/url"
@@ -19,9 +18,6 @@ type MemoryList struct {
itemMaps map[string]map[string]*Item // prefix => { hash => item }
weekItemMaps map[int32]map[string]zero.Zero // week => { hash => Zero }
minWeek int32
prefixes []string
locker sync.RWMutex
onAdd func(item *Item)
@@ -33,8 +29,6 @@ type MemoryList struct {
// NewMemoryList creates a MemoryList with empty lookup maps and returns it as
// a ListInterface.
// NOTE(review): the weekItemMaps and minWeek initializers appear to belong to
// the removed side of this change — confirm against the MemoryList struct.
func NewMemoryList() ListInterface {
return &MemoryList{
// prefix => { hash => item }
itemMaps: map[string]map[string]*Item{},
// week => { hash => Zero }
weekItemMaps: map[int32]map[string]zero.Zero{},
// start scanning from the current week value
minWeek: currentWeek(),
}
}
@@ -56,7 +50,6 @@ func (this *MemoryList) Reset() error {
for key := range this.itemMaps {
this.itemMaps[key] = map[string]*Item{}
}
this.weekItemMaps = map[int32]map[string]zero.Zero{}
this.locker.Unlock()
atomic.StoreInt64(&this.count, 0)
@@ -65,10 +58,6 @@ func (this *MemoryList) Reset() error {
}
func (this *MemoryList) Add(hash string, item *Item) error {
if item.Week == 0 {
item.Week = currentWeek()
}
this.locker.Lock()
prefix := this.prefix(hash)
@@ -81,14 +70,6 @@ func (this *MemoryList) Add(hash string, item *Item) error {
// 先删除,为了可以正确触发统计
oldItem, ok := itemMap[hash]
if ok {
// 从week map中删除
if oldItem.Week > 0 {
wm, ok := this.weekItemMaps[oldItem.Week]
if ok {
delete(wm, hash)
}
}
// 回调
if this.onRemove != nil {
this.onRemove(oldItem)
@@ -104,14 +85,6 @@ func (this *MemoryList) Add(hash string, item *Item) error {
itemMap[hash] = item
// week map
wm, ok := this.weekItemMaps[item.Week]
if ok {
wm[hash] = zero.New()
} else {
this.weekItemMaps[item.Week] = map[string]zero.Zero{hash: zero.New()}
}
this.locker.Unlock()
return nil
}
@@ -242,14 +215,6 @@ func (this *MemoryList) Remove(hash string) error {
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
// week map
if item.Week > 0 {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
}
}
this.locker.Unlock()
@@ -261,12 +226,12 @@ func (this *MemoryList) Remove(hash string) error {
// callback 每次发现过期key的调用
func (this *MemoryList) Purge(count int, callback func(hash string) error) (int, error) {
this.locker.Lock()
deletedHashList := []string{}
var deletedHashList = []string{}
if this.purgeIndex >= len(this.prefixes) {
this.purgeIndex = 0
}
prefix := this.prefixes[this.purgeIndex]
var prefix = this.prefixes[this.purgeIndex]
this.purgeIndex++
@@ -290,14 +255,6 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) (int,
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
// week map
if item.Week > 0 {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
}
countFound++
}
@@ -322,45 +279,22 @@ func (this *MemoryList) PurgeLFU(count int, callback func(hash string) error) er
return nil
}
var week = currentWeek()
if this.minWeek > week {
this.minWeek = week
}
var deletedHashList = []string{}
Loop:
for w := this.minWeek; w <= week; w++ {
this.minWeek = w
var week = currentWeek()
var round = 0
this.locker.Lock()
wm, ok := this.weekItemMaps[w]
if ok {
var wc = len(wm)
if wc == 0 {
delete(this.weekItemMaps, w)
} else {
if wc <= count {
delete(this.weekItemMaps, w)
}
// TODO 未来支持按照点击量排序
for hash := range wm {
count--
Loop:
for {
var found = false
round++
for _, itemMap := range this.itemMaps {
for hash, item := range itemMap {
found = true
if count < 0 {
this.locker.Unlock()
break Loop
}
delete(wm, hash)
itemMap, ok := this.itemMaps[this.prefix(hash)]
if !ok {
continue
}
item, ok := itemMap[hash]
if !ok {
if week-item.Week <= 1 /** 最近有在使用 **/ && round <= 3 /** 查找轮数过多还不满足数量要求的就不再限制 **/ {
continue
}
@@ -371,13 +305,21 @@ Loop:
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
count--
if count <= 0 {
break Loop
}
break
}
}
} else {
delete(this.weekItemMaps, w)
if !found {
break
}
}
this.locker.Unlock()
}
// 执行外部操作
for _, hash := range deletedHashList {
@@ -451,23 +393,7 @@ func (this *MemoryList) IncreaseHit(hash string) error {
item, ok := itemMap[hash]
if ok {
var week = currentWeek()
// 交换位置
if item.Week > 0 && item.Week != week {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
wm, ok = this.weekItemMaps[week]
if ok {
wm[hash] = zero.New()
} else {
this.weekItemMaps[week] = map[string]zero.Zero{hash: zero.New()}
}
}
item.IncreaseHit(week)
item.Week = currentWeek()
}
this.locker.Unlock()

View File

@@ -6,7 +6,9 @@ import (
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"math/rand"
"sort"
"strconv"
"testing"
"time"
@@ -32,7 +34,6 @@ func TestMemoryList_Add(t *testing.T) {
})
t.Log(list.prefixes)
logs.PrintAsJSON(list.itemMaps, t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
@@ -51,7 +52,6 @@ func TestMemoryList_Remove(t *testing.T) {
})
_ = list.Remove("b")
list.print(t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
@@ -83,7 +83,6 @@ func TestMemoryList_Purge(t *testing.T) {
return nil
})
list.print(t)
logs.PrintAsJSON(list.weekItemMaps, t)
for i := 0; i < 1000; i++ {
_, _ = list.Purge(100, func(hash string) error {
@@ -172,27 +171,64 @@ func TestMemoryList_CleanPrefix(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
}
// TestMapRandomDelete observes which keys survive when the first few keys
// yielded by ranging over a map are deleted.
//
// Each round builds a map with the keys 0..99, deletes the first 10 keys the
// range loop yields, and then increments a per-key survival counter for every
// key still present. After all rounds the survival counts are sorted and
// logged together with the number of distinct keys that survived at least once.
func TestMapRandomDelete(t *testing.T) {
	const (
		rounds          = 1_000_000
		keysPerMap      = 100
		deletesPerRound = 10
	)

	var survivals = map[int]int{} // key => number of rounds it survived

	for round := 0; round < rounds; round++ {
		var m = map[int]bool{}
		for key := 0; key < keysPerMap; key++ {
			m[key] = true
		}

		// Remove whichever keys the range loop happens to visit first.
		var remaining = deletesPerRound
		for key := range m {
			delete(m, key)
			remaining--
			if remaining <= 0 {
				break
			}
		}

		for key := range m {
			survivals[key]++
		}
	}

	var counts = make([]int, 0, len(survivals))
	for _, survived := range survivals {
		counts = append(counts, survived)
	}
	sort.Ints(counts)

	t.Log("["+types.String(len(counts))+"]", counts)
}
func TestMemoryList_PurgeLFU(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
list.minWeek = 2704
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
t.Log("current week:", currentWeek())
_ = list.Add("1", &Item{})
_ = list.Add("2", &Item{})
_ = list.Add("3", &Item{})
_ = list.Add("4", &Item{})
_ = list.Add("5", &Item{})
_ = list.Add("6", &Item{Week: 2704})
_ = list.Add("7", &Item{Week: 2704})
_ = list.Add("8", &Item{Week: 2705})
err := list.PurgeLFU(2, func(hash string) error {
//_ = list.IncreaseHit("1")
//_ = list.IncreaseHit("2")
//_ = list.IncreaseHit("3")
//_ = list.IncreaseHit("4")
//_ = list.IncreaseHit("5")
count, err := list.Count()
if err != nil {
t.Fatal(err)
}
t.Log("count items before purge:", count)
err = list.PurgeLFU(5, func(hash string) error {
t.Log("purge lfu:", hash)
return nil
})
@@ -201,40 +237,18 @@ func TestMemoryList_PurgeLFU(t *testing.T) {
}
t.Log("ok")
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
func TestMemoryList_IncreaseHit(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
var item = &Item{}
item.Week = 2705
item.Week2Hits = 100
_ = list.Add("a", &Item{})
_ = list.Add("a", item)
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
_ = list.IncreaseHit("a")
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
_ = list.IncreaseHit("a")
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
count, err = list.Count()
if err != nil {
t.Fatal(err)
}
t.Log("count items left:", count)
}
// TestMemoryList_CleanAll adds one item to a fresh MemoryList, calls CleanAll
// and prints the list's internal maps and count for manual inspection.
// NOTE(review): the item variable and the weekItemMaps dump appear to belong
// to the removed side of this change — confirm before relying on them.
func TestMemoryList_CleanAll(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
// item is configured here but is not the value passed to Add below.
var item = &Item{}
item.Week = 2705
item.Week2Hits = 100
// A new empty Item is stored under hash "a"; the returned error is ignored.
_ = list.Add("a", &Item{})
_ = list.CleanAll()
// Dump the internal state after cleaning.
logs.PrintAsJSON(list.itemMaps, t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}

View File

@@ -1004,6 +1004,11 @@ func (this *FileStorage) purgeLoop() {
var lfuFreePercent = this.policy.PersistenceLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
// TB级别
if capacityBytes>>30 > 1000 {
lfuFreePercent = 3
}
}
var hasFullDisk = this.mainDiskIsFull
@@ -1299,12 +1304,6 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
if rands.Int(0, rate) == 0 {
var memoryStorage = this.memoryStorage
var hitErr = this.list.IncreaseHit(hash)
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
// 增加到热点
// 这里不收录缓存尺寸过大的文件
if memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*sizes.M {

View File

@@ -12,7 +12,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"github.com/shirou/gopsutil/v3/load"
"math"
@@ -137,17 +136,6 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool)
}
this.locker.RUnlock()
// 增加点击量
// 1/1000采样
// TODO 考虑是否在缓存策略里设置
if rands.Int(0, 1000) == 0 {
var hitErr = this.list.IncreaseHit(types.String(hash))
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
}
return reader, nil
}
this.locker.RUnlock()