diff --git a/internal/caches/item.go b/internal/caches/item.go index 432fc80..439c93b 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -3,7 +3,6 @@ package caches import ( "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "strings" - "time" ) type ItemType = int @@ -16,7 +15,7 @@ const ( // 计算当前周 // 不要用YW,因为需要计算两周是否临近 func currentWeek() int32 { - return int32(time.Now().Unix() / 86400) + return int32(fasttime.Now().Unix() / 86400) } type Item struct { @@ -29,10 +28,7 @@ type Item struct { MetaSize int64 `json:"metaSize"` Host string `json:"host"` // 主机名 ServerId int64 `json:"serverId"` // 服务ID - - Week1Hits int64 `json:"week1Hits"` - Week2Hits int64 `json:"week2Hits"` - Week int32 `json:"week"` + Week int32 `json:"week"` } func (this *Item) IsExpired() bool { @@ -47,20 +43,6 @@ func (this *Item) Size() int64 { return this.HeaderSize + this.BodySize } -func (this *Item) IncreaseHit(week int32) { - if this.Week == week { - this.Week2Hits++ - } else { - if week-this.Week == 1 { - this.Week1Hits = this.Week2Hits - } else { - this.Week1Hits = 0 - } - this.Week2Hits = 1 - this.Week = week - } -} - func (this *Item) RequestURI() string { var schemeIndex = strings.Index(this.Key, "://") if schemeIndex <= 0 { diff --git a/internal/caches/item_test.go b/internal/caches/item_test.go index 5c32d71..e7dd4b6 100644 --- a/internal/caches/item_test.go +++ b/internal/caches/item_test.go @@ -1,8 +1,9 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -package caches +package caches_test import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" @@ -11,27 +12,14 @@ import ( "time" ) -func TestItem_IncreaseHit(t *testing.T) { - var week = currentWeek() - - var item = &Item{} - //item.Week = 2704 - item.Week2Hits = 100 - item.IncreaseHit(week) - t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits) - - item.IncreaseHit(week) - t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits) -} - func TestItems_Memory(t *testing.T) { var stat = &runtime.MemStats{} runtime.ReadMemStats(stat) var memory1 = stat.HeapInuse - var items = []*Item{} + var items = []*caches.Item{} for i := 0; i < 10_000_000; i++ { - items = append(items, &Item{ + items = append(items, &caches.Item{ Key: types.String(i), }) } @@ -41,18 +29,11 @@ func TestItems_Memory(t *testing.T) { t.Log(memory1, memory2, (memory2-memory1)/1024/1024, "M") - var weekItems = make(map[string]*Item, 10_000_000) - - for _, item := range items { - weekItems[item.Key] = item - } - runtime.ReadMemStats(stat) var memory3 = stat.HeapInuse t.Log(memory2, memory3, (memory3-memory2)/1024/1024, "M") time.Sleep(1 * time.Second) - t.Log(len(items), len(weekItems)) } func TestItems_Memory2(t *testing.T) { @@ -88,7 +69,7 @@ func TestItem_RequestURI(t *testing.T) { "https://goedge.cn:8080/hello/world", "https://goedge.cn/hello/world?v=1&t=123", } { - var item = &Item{Key: u} + var item = &caches.Item{Key: u} t.Log(u, "=>", item.RequestURI()) } } diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index 721f18c..7f5e6ea 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -256,16 +256,10 @@ func (this *FileList) PurgeLFU(count int, callback func(hash string) error) erro // 不在 rows.Next() 循环中操作是为了避免死锁 for _, hash := range hashStrings { - notFound, err := this.remove(hash) + _, err = this.remove(hash) if err != nil { return err } - if notFound { - err = db.DeleteHitAsync(hash) - if err != nil { - return db.WrapError(err) - } - } err = callback(hash) if err != nil { @@ -393,11 +387,6 @@ func (this *FileList) remove(hash string) (notFound bool, err error) { return false, db.WrapError(err) } - err = db.DeleteHitAsync(hash) - if err != nil { - return false, db.WrapError(err) - } - if this.onRemove != nil { // when remove file item, no any extra information needed this.onRemove(nil) @@ -493,9 +482,6 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error { MetaSize: metaSize, Host: host, ServerId: serverId, - Week1Hits: 0, - Week2Hits: 0, - Week: 0, }) if err != nil { if brokenOnError { diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index dc0cde7..19de8b9 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -34,7 +34,6 @@ type FileListDB struct { hashMap *FileListHashMap itemsTableName string - hitsTableName string isClosed bool isReady bool @@ -57,11 +56,6 @@ type FileListDB struct { deleteAllStmt *dbs.Stmt // 删除所有数据 listOlderItemsStmt *dbs.Stmt // 读取较早存储的缓存 updateAccessWeekSQL string // 修改访问日期 - - // hits - insertHitSQL string // 写入数据 - increaseHitSQL string // 增加点击量 - deleteHitByHashSQL string // 根据hash删除数据 } func NewFileListDB() *FileListDB { @@ -142,7 +136,6 @@ func (this *FileListDB) Open(dbPath string) error { func (this *FileListDB) Init() error { this.itemsTableName = "cacheItems" - this.hitsTableName = "hits" // 创建 var err = this.initTables(1) @@ -200,12 +193,6 @@ func (this *FileListDB) Init() error { this.updateAccessWeekSQL = `UPDATE "` + this.itemsTableName + `" SET "accessWeek"=? WHERE "hash"=?` - this.insertHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)` - - this.increaseHitSQL = `INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?` - - this.deleteHitByHashSQL = `DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?` - this.isReady = true // 加载HashMap @@ -352,16 +339,10 @@ func (this *FileListDB) ListHashes(lastId int64) (hashList []string, maxId int64 func (this *FileListDB) IncreaseHitAsync(hash string) error { var week = timeutil.Format("YW") - this.writeBatch.Add(this.increaseHitSQL, hash, week, week, week, week) this.writeBatch.Add(this.updateAccessWeekSQL, week, hash) return nil } -func (this *FileListDB) DeleteHitAsync(hash string) error { - this.writeBatch.Add(this.deleteHitByHashSQL, hash) - return nil -} - func (this *FileListDB) CleanPrefix(prefix string) error { if !this.isReady { return nil @@ -600,32 +581,9 @@ ALTER TABLE "cacheItems" ADD "accessWeek" varchar(6); } } + // 删除hits表 { - _, err := this.writeDB.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" ( - "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, - "hash" varchar(32), - "week1Hits" integer DEFAULT 0, - "week2Hits" integer DEFAULT 0, - "week" varchar(6) -); - -CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash" -ON "` + this.hitsTableName + `" ( - "hash" ASC -); -`) - if err != nil { - // 尝试删除重建 - if times < 3 { - _, dropErr := this.writeDB.Exec(`DROP TABLE "` + this.hitsTableName + `"`) - if dropErr == nil { - return this.initTables(times + 1) - } - return this.WrapError(err) - } - - return this.WrapError(err) - } + _, _ = this.writeDB.Exec(`DROP TABLE "hits"`) } return nil diff --git a/internal/caches/list_file_db_test.go b/internal/caches/list_file_db_test.go index bd7f424..868579e 100644 --- a/internal/caches/list_file_db_test.go +++ b/internal/caches/list_file_db_test.go @@ -7,7 +7,6 @@ import ( "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "testing" - "time" ) func TestFileListDB_ListLFUItems(t *testing.T) { @@ -34,32 +33,6 @@ func TestFileListDB_ListLFUItems(t *testing.T) { t.Log("[", len(hashList), "]", hashList) } -func TestFileListDB_IncreaseHitAsync(t *testing.T) { - var db = caches.NewFileListDB() - - defer func() { - _ = db.Close() - }() - - err := db.Open(Tea.Root + "/data/cache-db-large.db") - if err != nil { - t.Fatal(err) - } - - err = db.Init() - if err != nil { - t.Fatal(err) - } - - err = db.IncreaseHitAsync("4598e5231ba47d6ec7aa9ea640ff2eaf") - if err != nil { - t.Fatal(err) - } - - // wait transaction - time.Sleep(1 * time.Second) -} - func TestFileListDB_CleanMatchKey(t *testing.T) { var db = caches.NewFileListDB() diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go index e933010..d2238ab 100644 --- a/internal/caches/list_file_test.go +++ b/internal/caches/list_file_test.go @@ -281,11 +281,6 @@ func TestFileList_PurgeLFU(t *testing.T) { t.Fatal(err) } - err = list.IncreaseHit(stringutil.Md5("123456")) - if err != nil { - t.Fatal(err) - } - var count = 0 err = list.PurgeLFU(caches.CountFileDB*2, func(hash string) error { t.Log(hash) @@ -356,41 +351,6 @@ func TestFileList_CleanAll(t *testing.T) { t.Log(list.Count()) } -func TestFileList_IncreaseHit(t *testing.T) { - var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1") - - defer func() { - _ = list.Close() - }() - - err := list.Init() - if err != nil { - t.Fatal(err) - } - - defer func() { - _ = list.Close() - }() - - var before = time.Now() - defer func() { - t.Log(time.Since(before).Seconds()*1000, "ms") - }() - - var count = 1_000_000 - - if !testutils.IsSingleTesting() { - count = 10 - } - for i := 0; i < count; i++ { - err = list.IncreaseHit(stringutil.Md5("abc" + types.String(i))) - } - if err != nil { - t.Fatal(err) - } - t.Log("ok") -} - func TestFileList_UpgradeV3(t *testing.T) { var list = caches.NewFileList(Tea.Root + "/data/cache-index/p43").(*caches.FileList) diff --git a/internal/caches/list_memory.go b/internal/caches/list_memory.go index 29d8b2d..0caa2e5 100644 --- a/internal/caches/list_memory.go +++ b/internal/caches/list_memory.go @@ -2,7 +2,6 @@ package caches import ( "github.com/TeaOSLab/EdgeCommon/pkg/configutils" - "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/logs" "net" "net/url" @@ -19,9 +18,6 @@ type MemoryList struct { itemMaps map[string]map[string]*Item // prefix => { hash => item } - weekItemMaps map[int32]map[string]zero.Zero // week => { hash => Zero } - minWeek int32 - prefixes []string locker sync.RWMutex onAdd func(item *Item) @@ -32,9 +28,7 @@ type MemoryList struct { func NewMemoryList() ListInterface { return &MemoryList{ - itemMaps: map[string]map[string]*Item{}, - weekItemMaps: map[int32]map[string]zero.Zero{}, - minWeek: currentWeek(), + itemMaps: map[string]map[string]*Item{}, } } @@ -56,7 +50,6 @@ func (this *MemoryList) Reset() error { for key := range this.itemMaps { this.itemMaps[key] = map[string]*Item{} } - this.weekItemMaps = map[int32]map[string]zero.Zero{} this.locker.Unlock() atomic.StoreInt64(&this.count, 0) @@ -65,10 +58,6 @@ func (this *MemoryList) Reset() error { } func (this *MemoryList) Add(hash string, item *Item) error { - if item.Week == 0 { - item.Week = currentWeek() - } - this.locker.Lock() prefix := this.prefix(hash) @@ -81,14 +70,6 @@ func (this *MemoryList) Add(hash string, item *Item) error { // 先删除,为了可以正确触发统计 oldItem, ok := itemMap[hash] if ok { - // 从week map中删除 - if oldItem.Week > 0 { - wm, ok := this.weekItemMaps[oldItem.Week] - if ok { - delete(wm, hash) - } - } - // 回调 if this.onRemove != nil { this.onRemove(oldItem) @@ -104,14 +85,6 @@ func (this *MemoryList) Add(hash string, item *Item) error { itemMap[hash] = item - // week map - wm, ok := this.weekItemMaps[item.Week] - if ok { - wm[hash] = zero.New() - } else { - this.weekItemMaps[item.Week] = map[string]zero.Zero{hash: zero.New()} - } - this.locker.Unlock() return nil } @@ -242,14 +215,6 @@ func (this *MemoryList) Remove(hash string) error { atomic.AddInt64(&this.count, -1) delete(itemMap, hash) - - // week map - if item.Week > 0 { - wm, ok := this.weekItemMaps[item.Week] - if ok { - delete(wm, hash) - } - } } this.locker.Unlock() @@ -261,12 +226,12 @@ func (this *MemoryList) Remove(hash string) error { // callback 每次发现过期key的调用 func (this *MemoryList) Purge(count int, callback func(hash string) error) (int, error) { this.locker.Lock() - deletedHashList := []string{} + var deletedHashList = []string{} if this.purgeIndex >= len(this.prefixes) { this.purgeIndex = 0 } - prefix := this.prefixes[this.purgeIndex] + var prefix = this.prefixes[this.purgeIndex] this.purgeIndex++ @@ -290,14 +255,6 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) (int, delete(itemMap, hash) deletedHashList = append(deletedHashList, hash) - // week map - if item.Week > 0 { - wm, ok := this.weekItemMaps[item.Week] - if ok { - delete(wm, hash) - } - } - countFound++ } @@ -322,63 +279,48 @@ func (this *MemoryList) PurgeLFU(count int, callback func(hash string) error) er return nil } - var week = currentWeek() - if this.minWeek > week { - this.minWeek = week - } - var deletedHashList = []string{} + var week = currentWeek() + var round = 0 + + this.locker.Lock() + Loop: - for w := this.minWeek; w <= week; w++ { - this.minWeek = w + for { + var found = false + round++ + for _, itemMap := range this.itemMaps { + for hash, item := range itemMap { + found = true - this.locker.Lock() - wm, ok := this.weekItemMaps[w] - if ok { - var wc = len(wm) - if wc == 0 { - delete(this.weekItemMaps, w) - } else { - if wc <= count { - delete(this.weekItemMaps, w) + if week-item.Week <= 1 /** 最近有在使用 **/ && round <= 3 /** 查找轮数过多还不满足数量要求的就不再限制 **/ { + continue } - // TODO 未来支持按照点击量排序 - for hash := range wm { - count-- - - if count < 0 { - this.locker.Unlock() - break Loop - } - - delete(wm, hash) - - itemMap, ok := this.itemMaps[this.prefix(hash)] - if !ok { - continue - } - item, ok := itemMap[hash] - if !ok { - continue - } - - if this.onRemove != nil { - this.onRemove(item) - } - - atomic.AddInt64(&this.count, -1) - delete(itemMap, hash) - deletedHashList = append(deletedHashList, hash) + if this.onRemove != nil { + this.onRemove(item) } + + atomic.AddInt64(&this.count, -1) + delete(itemMap, hash) + deletedHashList = append(deletedHashList, hash) + + count-- + if count <= 0 { + break Loop + } + + break } - } else { - delete(this.weekItemMaps, w) } - this.locker.Unlock() + if !found { + break + } } + this.locker.Unlock() + // 执行外部操作 for _, hash := range deletedHashList { if callback != nil { @@ -451,23 +393,7 @@ func (this *MemoryList) IncreaseHit(hash string) error { item, ok := itemMap[hash] if ok { - var week = currentWeek() - - // 交换位置 - if item.Week > 0 && item.Week != week { - wm, ok := this.weekItemMaps[item.Week] - if ok { - delete(wm, hash) - } - wm, ok = this.weekItemMaps[week] - if ok { - wm[hash] = zero.New() - } else { - this.weekItemMaps[week] = map[string]zero.Zero{hash: zero.New()} - } - } - - item.IncreaseHit(week) + item.Week = currentWeek() } this.locker.Unlock() diff --git a/internal/caches/list_memory_test.go b/internal/caches/list_memory_test.go index b2125e7..158ad08 100644 --- a/internal/caches/list_memory_test.go +++ b/internal/caches/list_memory_test.go @@ -6,7 +6,9 @@ import ( "github.com/cespare/xxhash" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" "math/rand" + "sort" "strconv" "testing" "time" @@ -32,7 +34,6 @@ func TestMemoryList_Add(t *testing.T) { }) t.Log(list.prefixes) logs.PrintAsJSON(list.itemMaps, t) - logs.PrintAsJSON(list.weekItemMaps, t) t.Log(list.Count()) } @@ -51,7 +52,6 @@ func TestMemoryList_Remove(t *testing.T) { }) _ = list.Remove("b") list.print(t) - logs.PrintAsJSON(list.weekItemMaps, t) t.Log(list.Count()) } @@ -83,7 +83,6 @@ func TestMemoryList_Purge(t *testing.T) { return nil }) list.print(t) - logs.PrintAsJSON(list.weekItemMaps, t) for i := 0; i < 1000; i++ { _, _ = list.Purge(100, func(hash string) error { @@ -172,27 +171,64 @@ func TestMemoryList_CleanPrefix(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") } +func TestMapRandomDelete(t *testing.T) { + var countMap = map[int]int{} // k => count + + for j := 0; j < 1_000_000; j++ { + var m = map[int]bool{} + for i := 0; i < 100; i++ { + m[i] = true + } + + var count = 0 + for k := range m { + delete(m, k) + count++ + if count >= 10 { + break + } + } + + for k := range m { + countMap[k]++ + } + } + + var counts = []int{} + for _, count := range countMap { + counts = append(counts, count) + } + sort.Ints(counts) + t.Log("["+types.String(len(counts))+"]", counts) +} + func TestMemoryList_PurgeLFU(t *testing.T) { var list = NewMemoryList().(*MemoryList) - list.minWeek = 2704 var before = time.Now() defer func() { t.Log(time.Since(before).Seconds()*1000, "ms") }() - t.Log("current week:", currentWeek()) - _ = list.Add("1", &Item{}) _ = list.Add("2", &Item{}) _ = list.Add("3", &Item{}) _ = list.Add("4", &Item{}) _ = list.Add("5", &Item{}) - _ = list.Add("6", &Item{Week: 2704}) - _ = list.Add("7", &Item{Week: 2704}) - _ = list.Add("8", &Item{Week: 2705}) - err := list.PurgeLFU(2, func(hash string) error { + //_ = list.IncreaseHit("1") + //_ = list.IncreaseHit("2") + //_ = list.IncreaseHit("3") + //_ = list.IncreaseHit("4") + //_ = list.IncreaseHit("5") + + count, err := list.Count() + if err != nil { + t.Fatal(err) + } + t.Log("count items before purge:", count) + + err = list.PurgeLFU(5, func(hash string) error { t.Log("purge lfu:", hash) return nil }) @@ -201,40 +237,18 @@ func TestMemoryList_PurgeLFU(t *testing.T) { } t.Log("ok") - logs.PrintAsJSON(list.weekItemMaps, t) - t.Log(list.Count()) -} - -func TestMemoryList_IncreaseHit(t *testing.T) { - var list = NewMemoryList().(*MemoryList) - var item = &Item{} - item.Week = 2705 - item.Week2Hits = 100 - - _ = list.Add("a", &Item{}) - _ = list.Add("a", item) - t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week) - logs.PrintAsJSON(list.weekItemMaps, t) - - _ = list.IncreaseHit("a") - t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week) - logs.PrintAsJSON(list.weekItemMaps, t) - - _ = list.IncreaseHit("a") - t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week) - logs.PrintAsJSON(list.weekItemMaps, t) + count, err = list.Count() + if err != nil { + t.Fatal(err) + } + t.Log("count items left:", count) } func TestMemoryList_CleanAll(t *testing.T) { var list = NewMemoryList().(*MemoryList) - var item = &Item{} - item.Week = 2705 - item.Week2Hits = 100 - _ = list.Add("a", &Item{}) _ = list.CleanAll() logs.PrintAsJSON(list.itemMaps, t) - logs.PrintAsJSON(list.weekItemMaps, t) t.Log(list.Count()) } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 88cba21..0e53f12 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -1004,6 +1004,11 @@ func (this *FileStorage) purgeLoop() { var lfuFreePercent = this.policy.PersistenceLFUFreePercent if lfuFreePercent <= 0 { lfuFreePercent = 5 + + // TB级别 + if capacityBytes>>30 > 1000 { + lfuFreePercent = 3 + } } var hasFullDisk = this.mainDiskIsFull @@ -1299,12 +1304,6 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) { if rands.Int(0, rate) == 0 { var memoryStorage = this.memoryStorage - var hitErr = this.list.IncreaseHit(hash) - if hitErr != nil { - // 此错误可以忽略 - remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error()) - } - // 增加到热点 // 这里不收录缓存尺寸过大的文件 if memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*sizes.M { diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 13e7dc5..a91053c 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -12,7 +12,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/cespare/xxhash" - "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" "github.com/shirou/gopsutil/v3/load" "math" @@ -137,17 +136,6 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool) } this.locker.RUnlock() - // 增加点击量 - // 1/1000采样 - // TODO 考虑是否在缓存策略里设置 - if rands.Int(0, 1000) == 0 { - var hitErr = this.list.IncreaseHit(types.String(hash)) - if hitErr != nil { - // 此错误可以忽略 - remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error()) - } - } - return reader, nil } this.locker.RUnlock()