diff --git a/go.mod b/go.mod index 61aaceb..b6e2981 100644 --- a/go.mod +++ b/go.mod @@ -16,14 +16,16 @@ require ( github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-yaml/yaml v2.1.0+incompatible github.com/golang/protobuf v1.5.2 - github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060 + github.com/iwind/TeaGo v0.0.0-20211026123858-7de7a21cad24 github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3 github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8 + github.com/json-iterator/go v1.1.12 // indirect github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirect github.com/lionsoul2014/ip2region v2.2.0-release+incompatible - github.com/mattn/go-sqlite3 v2.0.3+incompatible + github.com/mattn/go-sqlite3 v1.14.9 github.com/miekg/dns v1.1.43 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/mssola/user_agent v0.5.2 github.com/pires/go-proxyproto v0.6.1 github.com/shirou/gopsutil v3.21.5+incompatible diff --git a/go.sum b/go.sum index 8a806d8..f9e95ed 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060 h1:qdLtK4PDXxk2vMKkTWl5Fl9xqYuRCukzWAgJbLHdfOo= github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20211026123858-7de7a21cad24 h1:1cGulkD2SNJJRok5OKwyhP/Ddm+PgSWKOupn0cR36/A= +github.com/iwind/TeaGo v0.0.0-20211026123858-7de7a21cad24/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 h1:DaQjoWZhLNxjhIXedVg4/vFEtHkZhK4IjIwsWdyzBLg= github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY= github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3 h1:aBSonas7vFcgTj9u96/bWGILGv1ZbUSTLiOzcI1ZT6c= @@ -85,6 +87,8 @@ github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3/go.mod h1:H5Q7SXwbx3a github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8 h1:AojsHz9Es9B3He2MQQxeRq3TyD//o9huxUo7r1wh44g= github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8/go.mod h1:QJBY2txYhLMzwLO29iB5ujDJ3s3V7DsZ582nw4Ss+tM= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e h1:LvL4XsI70QxOGHed6yhQtAU34Kx3Qq2wwBzGFKY8zKk= github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e/go.mod h1:kLgvv7o6UM+0QSf0QjAse3wReFDsb9qbZJdfexWlrQw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -96,12 +100,18 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lionsoul2014/ip2region v2.2.0-release+incompatible h1:1qp9iks+69h7IGLazAplzS9Ca14HAxuD5c0rbFdPGy4= github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBoh5gG6/y0ZQ85vJDBe21WnfbRrQQwTfliJJI= +github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA= +github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U= github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg= github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mssola/user_agent v0.5.2 h1:CZkTUahjL1+OcZ5zv3kZr8QiJ8jy2H08vZIEkBeRbxo= github.com/mssola/user_agent v0.5.2/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= diff --git a/internal/caches/item.go b/internal/caches/item.go index 6589be4..3ddb39c 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -2,6 +2,7 @@ package caches import ( "github.com/TeaOSLab/EdgeNode/internal/utils" + "time" ) type ItemType = int @@ -11,6 +12,12 @@ const ( ItemTypeMemory ItemType = 2 ) +// 计算当前周 +// 不要用YW,因为需要计算两周是否临近 +func currentWeek() int32 { + return int32(time.Now().Unix() / 604800) +} + type Item struct { Type ItemType `json:"type"` Key string `json:"key"` @@ -20,6 +27,10 @@ type Item struct { MetaSize int64 `json:"metaSize"` Host string `json:"host"` // 主机名 ServerId int64 `json:"serverId"` // 服务ID + + Week1Hits int64 `json:"week1Hits"` + Week2Hits int64 `json:"week2Hits"` + Week int32 `json:"week"` } func (this *Item) IsExpired() bool { @@ -27,9 +38,23 @@ func (this *Item) IsExpired() bool { } func (this *Item) TotalSize() int64 { - return this.Size() + this.MetaSize + int64(len(this.Key)) + int64(len(this.Host)) + 64 + return this.Size() + this.MetaSize + int64(len(this.Key)) + int64(len(this.Host)) } func (this *Item) Size() int64 { return this.HeaderSize + this.BodySize } + +func (this *Item) IncreaseHit(week int32) { + if this.Week == week { + this.Week2Hits++ + } else { + if week-this.Week == 1 { + this.Week1Hits = this.Week2Hits + } else { + this.Week1Hits = 0 + } + this.Week2Hits = 1 + this.Week = week + } +} diff --git a/internal/caches/item_test.go b/internal/caches/item_test.go new file mode 100644 index 0000000..9aea702 --- /dev/null +++ b/internal/caches/item_test.go @@ -0,0 +1,82 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import ( + "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" + "runtime" + "testing" + "time" +) + +func TestItem_IncreaseHit(t *testing.T) { + var week = currentWeek() + + var item = &Item{} + //item.Week = 2704 + item.Week2Hits = 100 + item.IncreaseHit(week) + t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits) + + item.IncreaseHit(week) + t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits) +} + +func TestItems_Memory(t *testing.T) { + var stat = &runtime.MemStats{} + runtime.ReadMemStats(stat) + var memory1 = stat.HeapInuse + + var items = []*Item{} + for i := 0; i < 10_000_000; i++ { + items = append(items, &Item{ + Key: types.String(i), + }) + } + + runtime.ReadMemStats(stat) + var memory2 = stat.HeapInuse + + t.Log(memory1, memory2, (memory2-memory1)/1024/1024, "M") + + var weekItems = make(map[string]*Item, 10_000_000) + + for _, item := range items { + weekItems[item.Key] = item + } + + runtime.ReadMemStats(stat) + var memory3 = stat.HeapInuse + t.Log(memory2, memory3, (memory3-memory2)/1024/1024, "M") + + time.Sleep(1 * time.Second) + t.Log(len(items), len(weekItems)) +} + +func TestItems_Memory2(t *testing.T) { + var stat = &runtime.MemStats{} + runtime.ReadMemStats(stat) + var memory1 = stat.HeapInuse + + var items = map[int32]map[string]bool{} + for i := 0; i < 10_000_000; i++ { + var week = int32((time.Now().Unix() - int64(86400*rands.Int(0, 300))) / (86400 * 7)) + m, ok := items[week] + if !ok { + m = map[string]bool{} + items[week] = m + } + m[types.String(int64(i)*1_000_000)] = true + } + + runtime.ReadMemStats(stat) + var memory2 = stat.HeapInuse + + t.Log(memory1, memory2, (memory2-memory1)/1024/1024, "M") + + time.Sleep(1 * time.Second) + for w, i := range items { + t.Log(w, len(i)) + } +} diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index ebb30f7..7896e88 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/lists" + timeutil "github.com/iwind/TeaGo/utils/time" _ "github.com/mattn/go-sqlite3" "os" "strconv" @@ -24,6 +25,7 @@ type FileList struct { onAdd func(item *Item) onRemove func(item *Item) + // cacheItems existsByHashStmt *sql.Stmt // 根据hash检查是否存在 insertStmt *sql.Stmt // 写入数据 selectByHashStmt *sql.Stmt // 使用hash查询数据 @@ -32,8 +34,15 @@ type FileList struct { purgeStmt *sql.Stmt // 清理 deleteAllStmt *sql.Stmt // 删除所有数据 + // hits + insertHitStmt *sql.Stmt // 写入数据 + increaseHitStmt *sql.Stmt // 增加点击量 + deleteHitByHashStmt *sql.Stmt // 根据hash删除数据 + lfuHitsStmt *sql.Stmt // 读取老的数据 + oldTables []string itemsTableName string + hitsTableName string isClosed bool @@ -59,6 +68,7 @@ func (this *FileList) Init() error { } this.itemsTableName = "cacheItems_v2" + this.hitsTableName = "hits" var dir = this.dir if dir == "/" { @@ -141,6 +151,23 @@ func (this *FileList) Init() error { return err } + this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`) + + this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`) + if err != nil { + return err + } + + this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`) + if err != nil { + return err + } + + this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`) + if err != nil { + return err + } + return nil } @@ -159,6 +186,11 @@ func (this *FileList) Add(hash string, item *Item) error { return err } + _, err = this.insertHitStmt.Exec(hash, timeutil.Format("YW")) + if err != nil { + return err + } + atomic.AddInt64(&this.total, 1) if this.onAdd != nil { @@ -253,6 +285,11 @@ func (this *FileList) Remove(hash string) error { return err } + _, err = this.deleteHitByHashStmt.Exec(hash) + if err != nil { + return err + } + atomic.AddInt64(&this.total, -1) if this.onRemove != nil { @@ -265,9 +302,9 @@ func (this *FileList) Remove(hash string) error { // Purge 清理过期的缓存 // count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间 // callback 每次发现过期key的调用 -func (this *FileList) Purge(count int, callback func(hash string) error) error { +func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) { if this.isClosed { - return nil + return 0, nil } if count <= 0 { @@ -275,11 +312,56 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error { } rows, err := this.purgeStmt.Query(time.Now().Unix(), count) + if err != nil { + return 0, err + } + + hashStrings := []string{} + var countFound = 0 + for rows.Next() { + var hash string + err = rows.Scan(&hash) + if err != nil { + _ = rows.Close() + return 0, err + } + hashStrings = append(hashStrings, hash) + countFound++ + } + _ = rows.Close() // 不能使用defer,防止读写冲突 + + // 不在 rows.Next() 循环中操作是为了避免死锁 + for _, hash := range hashStrings { + err = this.Remove(hash) + if err != nil { + return 0, err + } + + err = callback(hash) + if err != nil { + return 0, err + } + } + + return countFound, nil +} + +func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error { + if this.isClosed { + return nil + } + + if count <= 0 { + return nil + } + + rows, err := this.lfuHitsStmt.Query(count) if err != nil { return err } hashStrings := []string{} + var countFound = 0 for rows.Next() { var hash string err = rows.Scan(&hash) @@ -288,6 +370,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error { return err } hashStrings = append(hashStrings, hash) + countFound++ } _ = rows.Close() // 不能使用defer,防止读写冲突 @@ -303,7 +386,6 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error { return err } } - return nil } @@ -347,6 +429,13 @@ func (this *FileList) Count() (int64, error) { return atomic.LoadInt64(&this.total), nil } +// IncreaseHit 增加点击量 +func (this *FileList) IncreaseHit(hash string) error { + var week = timeutil.Format("YW") + _, err := this.increaseHitStmt.Exec(hash, week, week, week, week) + return err +} + // OnAdd 添加事件 func (this *FileList) OnAdd(f func(item *Item)) { this.onAdd = f @@ -371,6 +460,11 @@ func (this *FileList) Close() error { _ = this.purgeStmt.Close() _ = this.deleteAllStmt.Close() + _ = this.insertHitStmt.Close() + _ = this.increaseHitStmt.Close() + _ = this.deleteHitByHashStmt.Close() + _ = this.lfuHitsStmt.Close() + return this.db.Close() } return nil @@ -378,7 +472,8 @@ func (this *FileList) Close() error { // 初始化 func (this *FileList) initTables(db *sql.DB, times int) error { - _, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( + { + _, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "hash" varchar(32), "key" varchar(1024), @@ -411,17 +506,46 @@ ON "` + this.itemsTableName + `" ( "serverId" ASC ); `) - if err != nil { - // 尝试删除重建 - if times < 3 { - _, dropErr := db.Exec(`DROP TABLE "` + this.itemsTableName + `"`) - if dropErr == nil { - return this.initTables(db, times+1) + if err != nil { + // 尝试删除重建 + if times < 3 { + _, dropErr := db.Exec(`DROP TABLE "` + this.itemsTableName + `"`) + if dropErr == nil { + return this.initTables(db, times+1) + } + return err } + return err } + } - return err + { + _, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "hash" varchar(32), + "week1Hits" integer DEFAULT 0, + "week2Hits" integer DEFAULT 0, + "week" varchar(6) +); + +CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash" +ON "` + this.hitsTableName + `" ( + "hash" ASC +); +`) + if err != nil { + // 尝试删除重建 + if times < 3 { + _, dropErr := db.Exec(`DROP TABLE "` + this.hitsTableName + `"`) + if dropErr == nil { + return this.initTables(db, times+1) + } + return err + } + + return err + } } return nil diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go index c0ec857..1d25e86 100644 --- a/internal/caches/list_file_test.go +++ b/internal/caches/list_file_test.go @@ -5,6 +5,7 @@ package caches import ( "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" "strconv" "sync" @@ -184,7 +185,7 @@ func TestFileList_Purge(t *testing.T) { if err != nil { t.Fatal(err) } - err = list.Purge(2, func(hash string) error { + _, err = list.Purge(2, func(hash string) error { t.Log(hash) return nil }) @@ -257,6 +258,50 @@ func TestFileList_Conflict(t *testing.T) { t.Log("after exists") } +func TestFileList_IIF(t *testing.T) { + list := NewFileList(Tea.Root + "/data").(*FileList) + err := list.Init() + if err != nil { + t.Fatal(err) + } + + rows, err := list.db.Query("SELECT IIF(0, 2, 3)") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = rows.Close() + }() + + if rows.Next() { + var result int + err = rows.Scan(&result) + if err != nil { + t.Fatal(err) + } + t.Log("result:", result) + } +} + +func TestFileList_IncreaseHit(t *testing.T) { + list := NewFileList(Tea.Root + "/data") + err := list.Init() + if err != nil { + t.Fatal(err) + } + var before = time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + for i := 0; i < 1000_000; i++ { + err = list.IncreaseHit(stringutil.Md5("abc" + types.String(i))) + } + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + func BenchmarkFileList_Exist(b *testing.B) { list := NewFileList(Tea.Root + "/data") err := list.Init() diff --git a/internal/caches/list_interface.go b/internal/caches/list_interface.go index 1ee91bf..119577f 100644 --- a/internal/caches/list_interface.go +++ b/internal/caches/list_interface.go @@ -22,7 +22,10 @@ type ListInterface interface { Remove(hash string) error // Purge 清理过期数据 - Purge(count int, callback func(hash string) error) error + Purge(count int, callback func(hash string) error) (int, error) + + // PurgeLFU 清理LFU数据 + PurgeLFU(count int, callback func(hash string) error) error // CleanAll 清除所有缓存 CleanAll() error @@ -41,4 +44,7 @@ type ListInterface interface { // Close 关闭 Close() error + + // IncreaseHit 增加点击量 + IncreaseHit(hash string) error } diff --git a/internal/caches/list_memory.go b/internal/caches/list_memory.go index ffbf74f..db9dec1 100644 --- a/internal/caches/list_memory.go +++ b/internal/caches/list_memory.go @@ -5,12 +5,19 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" ) // MemoryList 内存缓存列表管理 type MemoryList struct { + count int64 + itemMaps map[string]map[string]*Item // prefix => { hash => item } + + weekItemMaps map[int32]map[string]bool // week => { hash => true } + minWeek int32 + prefixes []string locker sync.RWMutex onAdd func(item *Item) @@ -21,7 +28,9 @@ type MemoryList struct { func NewMemoryList() ListInterface { return &MemoryList{ - itemMaps: map[string]map[string]*Item{}, + itemMaps: map[string]map[string]*Item{}, + weekItemMaps: map[int32]map[string]bool{}, + minWeek: currentWeek(), } } @@ -43,11 +52,19 @@ func (this *MemoryList) Reset() error { for key := range this.itemMaps { this.itemMaps[key] = map[string]*Item{} } + this.weekItemMaps = map[int32]map[string]bool{} this.locker.Unlock() + + atomic.StoreInt64(&this.count, 0) + return nil } func (this *MemoryList) Add(hash string, item *Item) error { + if item.Week == 0 { + item.Week = currentWeek() + } + this.locker.Lock() prefix := this.prefix(hash) @@ -60,9 +77,20 @@ func (this *MemoryList) Add(hash string, item *Item) error { // 先删除,为了可以正确触发统计 oldItem, ok := itemMap[hash] if ok { + // 从week map中删除 + if oldItem.Week > 0 { + wm, ok := this.weekItemMaps[oldItem.Week] + if ok { + delete(wm, hash) + } + } + + // 回调 if this.onRemove != nil { this.onRemove(oldItem) } + } else { + atomic.AddInt64(&this.count, 1) } // 添加 @@ -71,6 +99,15 @@ func (this *MemoryList) Add(hash string, item *Item) error { } itemMap[hash] = item + + // week map + wm, ok := this.weekItemMaps[item.Week] + if ok { + wm[hash] = true + } else { + this.weekItemMaps[item.Week] = map[string]bool{hash: true} + } + this.locker.Unlock() return nil } @@ -122,7 +159,17 @@ func (this *MemoryList) Remove(hash string) error { if this.onRemove != nil { this.onRemove(item) } + + atomic.AddInt64(&this.count, -1) delete(itemMap, hash) + + // week map + if item.Week > 0 { + wm, ok := this.weekItemMaps[item.Week] + if ok { + delete(wm, hash) + } + } } this.locker.Unlock() @@ -132,7 +179,7 @@ func (this *MemoryList) Remove(hash string) error { // Purge 清理过期的缓存 // count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间 // callback 每次发现过期key的调用 -func (this *MemoryList) Purge(count int, callback func(hash string) error) error { +func (this *MemoryList) Purge(count int, callback func(hash string) error) (int, error) { this.locker.Lock() deletedHashList := []string{} @@ -146,8 +193,9 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error itemMap, ok := this.itemMaps[prefix] if !ok { this.locker.Unlock() - return nil + return 0, nil } + var countFound = 0 for hash, item := range itemMap { if count <= 0 { break @@ -157,14 +205,100 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error if this.onRemove != nil { this.onRemove(item) } + + atomic.AddInt64(&this.count, -1) delete(itemMap, hash) deletedHashList = append(deletedHashList, hash) + + // week map + if item.Week > 0 { + wm, ok := this.weekItemMaps[item.Week] + if ok { + delete(wm, hash) + } + } + + countFound++ } count-- } this.locker.Unlock() + // 执行外部操作 + for _, hash := range deletedHashList { + if callback != nil { + err := callback(hash) + if err != nil { + return 0, err + } + } + } + return countFound, nil +} + +func (this *MemoryList) PurgeLFU(count int, callback func(hash string) error) error { + if count <= 0 { + return nil + } + + var week = currentWeek() + if this.minWeek > week { + this.minWeek = week + } + + var deletedHashList = []string{} + +Loop: + for w := this.minWeek; w <= week; w++ { + this.minWeek = w + + this.locker.Lock() + wm, ok := this.weekItemMaps[w] + if ok { + var wc = len(wm) + if wc == 0 { + delete(this.weekItemMaps, w) + } else { + if wc <= count { + delete(this.weekItemMaps, w) + } + + // TODO 未来支持按照点击量排序 + for hash := range wm { + count-- + + if count < 0 { + this.locker.Unlock() + break Loop + } + + delete(wm, hash) + + itemMap, ok := this.itemMaps[this.prefix(hash)] + if !ok { + continue + } + item, ok := itemMap[hash] + if !ok { + continue + } + + if this.onRemove != nil { + this.onRemove(item) + } + + atomic.AddInt64(&this.count, -1) + delete(itemMap, hash) + deletedHashList = append(deletedHashList, hash) + } + } + } else { + delete(this.weekItemMaps, w) + } + this.locker.Unlock() + } + // 执行外部操作 for _, hash := range deletedHashList { if callback != nil { @@ -174,6 +308,7 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error } } } + return nil } @@ -206,13 +341,8 @@ func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) { // Count 总数量 func (this *MemoryList) Count() (int64, error) { - this.locker.RLock() - var count = 0 - for _, itemMap := range this.itemMaps { - count += len(itemMap) - } - this.locker.RUnlock() - return int64(count), nil + var count = atomic.LoadInt64(&this.count) + return count, nil } // OnAdd 添加事件 @@ -229,6 +359,41 @@ func (this *MemoryList) Close() error { return nil } +// IncreaseHit 增加点击量 +func (this *MemoryList) IncreaseHit(hash string) error { + this.locker.Lock() + + itemMap, ok := this.itemMaps[this.prefix(hash)] + if !ok { + this.locker.Unlock() + return nil + } + + item, ok := itemMap[hash] + if ok { + var week = currentWeek() + + // 交换位置 + if item.Week > 0 && item.Week != week { + wm, ok := this.weekItemMaps[item.Week] + if ok { + delete(wm, hash) + } + wm, ok = this.weekItemMaps[week] + if ok { + wm[hash] = true + } else { + this.weekItemMaps[week] = map[string]bool{hash: true} + } + } + + item.IncreaseHit(week) + } + + this.locker.Unlock() + return nil +} + func (this *MemoryList) print(t *testing.T) { this.locker.Lock() for _, itemMap := range this.itemMaps { diff --git a/internal/caches/list_memory_test.go b/internal/caches/list_memory_test.go index 91bffb0..5b08df8 100644 --- a/internal/caches/list_memory_test.go +++ b/internal/caches/list_memory_test.go @@ -31,6 +31,8 @@ func TestMemoryList_Add(t *testing.T) { }) t.Log(list.prefixes) logs.PrintAsJSON(list.itemMaps, t) + logs.PrintAsJSON(list.weekItemMaps, t) + t.Log(list.Count()) } func TestMemoryList_Remove(t *testing.T) { @@ -48,6 +50,8 @@ func TestMemoryList_Remove(t *testing.T) { }) _ = list.Remove("b") list.print(t) + logs.PrintAsJSON(list.weekItemMaps, t) + t.Log(list.Count()) } func TestMemoryList_Purge(t *testing.T) { @@ -73,19 +77,22 @@ func TestMemoryList_Purge(t *testing.T) { ExpiredAt: time.Now().Unix() - 2, HeaderSize: 1024, }) - _ = list.Purge(100, func(hash string) error { + _, _ = list.Purge(100, func(hash string) error { t.Log("delete:", hash) return nil }) list.print(t) + logs.PrintAsJSON(list.weekItemMaps, t) for i := 0; i < 1000; i++ { - _ = list.Purge(100, func(hash string) error { + _, _ = list.Purge(100, func(hash string) error { t.Log("delete:", hash) return nil }) t.Log(list.purgeIndex) } + + t.Log(list.Count()) } func TestMemoryList_Purge_Large_List(t *testing.T) { @@ -139,7 +146,7 @@ func TestMemoryList_CleanPrefix(t *testing.T) { _ = list.Init() before := time.Now() for i := 0; i < 1_000_000; i++ { - key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html" + key := "https://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html" _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ Key: key, ExpiredAt: time.Now().Unix() + 3600, @@ -150,7 +157,7 @@ func TestMemoryList_CleanPrefix(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") before = time.Now() - err := list.CleanPrefix("http://www.teaos.cn/hello/10") + err := list.CleanPrefix("https://www.teaos.cn/hello/10") if err != nil { t.Fatal(err) } @@ -162,11 +169,77 @@ func TestMemoryList_CleanPrefix(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") } +func TestMemoryList_PurgeLFU(t *testing.T) { + var list = NewMemoryList().(*MemoryList) + list.minWeek = 2704 + + var before = time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + t.Log("current week:", currentWeek()) + + _ = list.Add("1", &Item{}) + _ = list.Add("2", &Item{}) + _ = list.Add("3", &Item{}) + _ = list.Add("4", &Item{}) + _ = list.Add("5", &Item{}) + _ = list.Add("6", &Item{Week: 2704}) + _ = list.Add("7", &Item{Week: 2704}) + _ = list.Add("8", &Item{Week: 2705}) + + err := list.PurgeLFU(2, func(hash string) error { + t.Log("purge lfu:", hash) + return nil + }) + if err != nil { + t.Fatal(err) + } + t.Log("ok") + + logs.PrintAsJSON(list.weekItemMaps, t) + t.Log(list.Count()) +} + +func TestMemoryList_IncreaseHit(t *testing.T) { + var list = NewMemoryList().(*MemoryList) + var item = &Item{} + item.Week = 2705 + item.Week2Hits = 100 + + _ = list.Add("a", &Item{}) + _ = list.Add("a", item) + t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week) + logs.PrintAsJSON(list.weekItemMaps, t) + + _ = list.IncreaseHit("a") + t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week) + logs.PrintAsJSON(list.weekItemMaps, t) + + _ = list.IncreaseHit("a") + t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week) + logs.PrintAsJSON(list.weekItemMaps, t) +} + +func TestMemoryList_CleanAll(t *testing.T) { + var list = NewMemoryList().(*MemoryList) + var item = &Item{} + item.Week = 2705 + item.Week2Hits = 100 + + _ = list.Add("a", &Item{}) + _ = list.CleanAll() + logs.PrintAsJSON(list.itemMaps, t) + logs.PrintAsJSON(list.weekItemMaps, t) + t.Log(list.Count()) +} + func TestMemoryList_GC(t *testing.T) { list := NewMemoryList().(*MemoryList) _ = list.Init() for i := 0; i < 1_000_000; i++ { - key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" + key := "https://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ Key: key, ExpiredAt: 0, diff --git a/internal/caches/manager.go b/internal/caches/manager.go index 28133d6..66be659 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -135,7 +135,7 @@ func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) case serverconfigs.CachePolicyStorageFile: return NewFileStorage(policy) case serverconfigs.CachePolicyStorageMemory: - return NewMemoryStorage(policy) + return NewMemoryStorage(policy, nil) } return nil } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index f758469..7ffa42f 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -11,10 +11,13 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" "golang.org/x/text/language" "golang.org/x/text/message" "io" + "math" "os" "path/filepath" "regexp" @@ -165,12 +168,16 @@ func (this *FileStorage) Init() error { Life: this.policy.Life, MinLife: this.policy.MinLife, MaxLife: this.policy.MaxLife, + + MemoryAutoPurgeCount: this.policy.MemoryAutoPurgeCount, + MemoryAutoPurgeInterval: this.policy.MemoryAutoPurgeInterval, + MemoryLFUFreePercent: this.policy.MemoryLFUFreePercent, } err = memoryPolicy.Init() if err != nil { return err } - memoryStorage := NewMemoryStorage(memoryPolicy) + memoryStorage := NewMemoryStorage(memoryPolicy, this) err = memoryStorage.Init() if err != nil { return err @@ -227,6 +234,17 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { return nil, err } + // 增加点击量 + // 1/1000采样 + // TODO 考虑是否在缓存策略里设置 + if rands.Int(0, 1000) == 0 { + var hitErr = this.list.IncreaseHit(hash) + if hitErr != nil { + // 此错误可以忽略 + remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error()) + } + } + isOk = true return reader, nil } @@ -398,7 +416,7 @@ func (this *FileStorage) AddToList(item *Item) { } } - item.MetaSize = SizeMeta + item.MetaSize = SizeMeta + 128 hash := stringutil.Md5(item.Key) err := this.list.Add(hash, item) if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") { @@ -619,7 +637,14 @@ func (this *FileStorage) initList() error { }() // 启动定时清理任务 - this.ticker = utils.NewTicker(30 * time.Second) + var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval + if autoPurgeInterval <= 0 { + autoPurgeInterval = 30 + if Tea.IsTesting() { + autoPurgeInterval = 10 + } + } + this.ticker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) events.On(events.EventQuit, func() { remotelogs.Println("CACHE", "quit clean timer") var ticker = this.ticker @@ -730,16 +755,87 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) { // 清理任务 func (this *FileStorage) purgeLoop() { - err := this.list.Purge(1000, func(hash string) error { - path := this.hashPath(hash) - err := os.Remove(path) - if err != nil && !os.IsNotExist(err) { - remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) + // 计算是否应该开启LFU清理 + var capacityBytes = this.policy.CapacityBytes() + var startLFU = false + var usedPercent = float32(this.TotalDiskSize()*100) / float32(capacityBytes) + var lfuFreePercent = this.policy.PersistenceLFUFreePercent + if lfuFreePercent <= 0 { + lfuFreePercent = 5 + } + if capacityBytes > 0 { + if lfuFreePercent < 100 { + if usedPercent >= 100-lfuFreePercent { + startLFU = true + } + } + } + + // 清理过期 + { + var times = 1 + + // 空闲时间多清理 + if utils.SharedFreeHoursManager.IsFreeHour() { + times = 5 + } + + // 处于LFU阈值时,多清理 + if startLFU { + times = 5 + } + + var purgeCount = this.policy.PersistenceAutoPurgeCount + if purgeCount <= 0 { + purgeCount = 1000 + } + for i := 0; i < times; i++ { + countFound, err := this.list.Purge(purgeCount, func(hash string) error { + path := this.hashPath(hash) + err := os.Remove(path) + if err != nil && !os.IsNotExist(err) { + remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) + } + return nil + }) + if err != nil { + remotelogs.Warn("CACHE", "purge file storage failed: "+err.Error()) + continue + } + + if countFound < purgeCount { + break + } + + time.Sleep(1 * time.Second) + } + } + + // 磁盘空间不足时,清除老旧的缓存 + if startLFU { + var total, _ = this.list.Count() + if total > 0 { + var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100)) + if count > 0 { + // 限制单次清理的条数,防止占用太多系统资源 + if count > 2000 { + count = 2000 + } + + remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count)) + err := this.list.PurgeLFU(count, func(hash string) error { + path := this.hashPath(hash) + err := os.Remove(path) + if err != nil && !os.IsNotExist(err) { + remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) + } + return nil + }) + if err != nil { + remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error()) + } + } } - return nil - }) - if err != nil { - remotelogs.Warn("CACHE", "purge file storage failed: "+err.Error()) } } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 0bcadf9..ac5eebb 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -6,6 +6,9 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/cespare/xxhash" + "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" + "math" "strconv" "sync" "sync/atomic" @@ -26,22 +29,37 @@ func (this *MemoryItem) IsExpired() bool { } type MemoryStorage struct { - policy *serverconfigs.HTTPCachePolicy - list ListInterface - locker *sync.RWMutex - valuesMap map[uint64]*MemoryItem - ticker *utils.Ticker - purgeDuration time.Duration + parentStorage StorageInterface + + policy *serverconfigs.HTTPCachePolicy + list ListInterface + locker *sync.RWMutex + + valuesMap map[uint64]*MemoryItem // hash => item + dirtyChan chan string // hash chan + + purgeTicker *utils.Ticker + totalSize int64 writingKeyMap map[string]bool // key => bool } -func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { +func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage { + var dirtyChan chan string + if parentStorage != nil { + var queueSize = policy.MemoryAutoFlushQueueSize + if queueSize <= 0 { + queueSize = 2048 + } + dirtyChan = make(chan string, queueSize) + } return &MemoryStorage{ + parentStorage: parentStorage, policy: policy, list: NewMemoryList(), locker: &sync.RWMutex{}, valuesMap: map[uint64]*MemoryItem{}, + dirtyChan: dirtyChan, writingKeyMap: map[string]bool{}, } } @@ -57,18 +75,26 @@ func (this *MemoryStorage) Init() error { atomic.AddInt64(&this.totalSize, -item.TotalSize()) }) - if this.purgeDuration <= 0 { - this.purgeDuration = 10 * time.Second + var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval + if autoPurgeInterval <= 0 { + autoPurgeInterval = 5 } // 启动定时清理任务 - this.ticker = utils.NewTicker(this.purgeDuration) + this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) go func() { - for this.ticker.Next() { + for this.purgeTicker.Next() { this.purgeLoop() } }() + // 启动定时Flush memory to disk任务 + go func() { + for hash := range this.dirtyChan { + this.flushItem(hash) + } + }() + return nil } @@ -91,6 +117,18 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) { return nil, err } this.locker.RUnlock() + + // 增加点击量 + // 1/1000采样 + // TODO 考虑是否在缓存策略里设置 + if rands.Int(0, 1000) == 0 { + var hitErr = this.list.IncreaseHit(types.String(hash)) + if hitErr != nil { + // 此错误可以忽略 + remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error()) + } + } + return reader, nil } this.locker.RUnlock() @@ -145,7 +183,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) ( } isWriting = true - return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker, func() { + return NewMemoryWriter(this, key, expiredAt, status, func() { this.locker.Lock() delete(this.writingKeyMap, key) this.locker.Unlock() @@ -210,8 +248,12 @@ func (this *MemoryStorage) Stop() { this.valuesMap = map[uint64]*MemoryItem{} this.writingKeyMap = map[string]bool{} _ = this.list.Reset() - if this.ticker != nil { - this.ticker.Stop() + if this.purgeTicker != nil { + this.purgeTicker.Stop() + } + + if this.parentStorage != nil && this.dirtyChan != nil { + close(this.dirtyChan) } _ = this.list.Close() @@ -228,7 +270,7 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy { // AddToList 将缓存添加到列表 func (this *MemoryStorage) AddToList(item *Item) { - item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/ + item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/ hash := fmt.Sprintf("%d", this.hash(item.Key)) _ = this.list.Add(hash, item) } @@ -250,7 +292,28 @@ func (this *MemoryStorage) hash(key string) uint64 { // 清理任务 func (this *MemoryStorage) purgeLoop() { - _ = this.list.Purge(2048, func(hash string) error { + // 计算是否应该开启LFU清理 + var capacityBytes = this.policy.CapacityBytes() + var startLFU = false + var usedPercent = float32(this.TotalMemorySize()*100) / float32(capacityBytes) + var lfuFreePercent = this.policy.MemoryLFUFreePercent + if lfuFreePercent <= 0 { + lfuFreePercent = 5 + } + if capacityBytes > 0 { + if lfuFreePercent < 100 { + if usedPercent >= 100-lfuFreePercent { + startLFU = true + } + } + } + + // 清理过期 + var purgeCount = this.policy.MemoryAutoPurgeCount + if purgeCount <= 0 { + purgeCount = 2000 + } + _, _ = this.list.Purge(purgeCount, func(hash string) error { uintHash, err := strconv.ParseUint(hash, 10, 64) if err == nil { this.locker.Lock() @@ -259,6 +322,92 @@ func (this *MemoryStorage) purgeLoop() { } return nil }) + + // LFU + if startLFU { + var total, _ = this.list.Count() + if total > 0 { + var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent * 2) / 100)) + if count > 0 { + // 限制单次清理的条数,防止占用太多系统资源 + if count > 2000 { + count = 2000 + } + + remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count)) + + err := this.list.PurgeLFU(count, func(hash string) error { + uintHash, err := strconv.ParseUint(hash, 10, 64) + if err == nil { + this.locker.Lock() + delete(this.valuesMap, uintHash) + this.locker.Unlock() + } + return nil + }) + if err != nil { + remotelogs.Warn("CACHE", "purge memory storage in LFU failed: "+err.Error()) + } + } + } + } +} + +// Flush任务 +func (this *MemoryStorage) flushItem(key string) { + if this.parentStorage == nil { + return + } + var hash = this.hash(key) + + this.locker.RLock() + item, ok := this.valuesMap[hash] + this.locker.RUnlock() + + if !ok { + return + } + if !item.IsDone || item.IsExpired() { + return + } + + writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status) + if err != nil { + if !CanIgnoreErr(err) { + remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error()) + } + return + } + + _, err = writer.WriteHeader(item.HeaderValue) + if err != nil { + _ = writer.Discard() + remotelogs.Error("CACHE", "flush items failed: write header failed: "+err.Error()) + return + } + + _, err = writer.Write(item.BodyValue) + if err != nil { + _ = writer.Discard() + remotelogs.Error("CACHE", "flush items failed: writer body failed: "+err.Error()) + return + } + + err = writer.Close() + if err != nil { + _ = writer.Discard() + remotelogs.Error("CACHE", "flush items failed: close writer failed: "+err.Error()) + } + + this.parentStorage.AddToList(&Item{ + Type: writer.ItemType(), + Key: key, + ExpiredAt: item.ExpiredAt, + HeaderSize: writer.HeaderSize(), + BodySize: writer.BodySize(), + }) + + return } func (this *MemoryStorage) memoryCapacityBytes() int64 { diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 1128fb8..071c54b 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -230,8 +230,9 @@ func TestMemoryStorage_Purge(t *testing.T) { } func TestMemoryStorage_Expire(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) - storage.purgeDuration = 5 * time.Second + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{ + MemoryAutoPurgeInterval: 5, + }) err := storage.Init() if err != nil { t.Fatal(err) diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 1b7b51e..e0920bd 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -2,15 +2,14 @@ package caches import ( "github.com/cespare/xxhash" - "sync" "time" ) type MemoryWriter struct { + storage *MemoryStorage + key string expiredAt int64 - m map[uint64]*MemoryItem - locker *sync.RWMutex headerSize int64 bodySize int64 status int @@ -20,12 +19,11 @@ type MemoryWriter struct { endFunc func() } -func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex, endFunc func()) *MemoryWriter { +func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, endFunc func()) *MemoryWriter { w := &MemoryWriter{ - m: m, + storage: memoryStorage, key: key, expiredAt: expiredAt, - locker: locker, item: &MemoryItem{ ExpiredAt: expiredAt, ModifiedAt: time.Now().Unix(), @@ -72,10 +70,17 @@ func (this *MemoryWriter) Close() error { return nil } - this.locker.Lock() + this.storage.locker.Lock() this.item.IsDone = true - this.m[this.hash] = this.item - this.locker.Unlock() + this.storage.valuesMap[this.hash] = this.item + if this.storage.parentStorage != nil { + select { + case this.storage.dirtyChan <- this.key: + default: + + } + } + this.storage.locker.Unlock() return nil } @@ -85,9 +90,9 @@ func (this *MemoryWriter) Discard() error { // 需要在Locker之外 defer this.endFunc() - this.locker.Lock() - delete(this.m, this.hash) - this.locker.Unlock() + this.storage.locker.Lock() + delete(this.storage.valuesMap, this.hash) + this.storage.locker.Unlock() return nil } diff --git a/internal/const/vars.go b/internal/const/vars.go new file mode 100644 index 0000000..be7446f --- /dev/null +++ b/internal/const/vars.go @@ -0,0 +1,10 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package teaconst + +var ( + // 流量统计 + + InTrafficBytes = uint64(0) + OutTrafficBytes = uint64(0) +) diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index f52122a..e474dcb 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -4,6 +4,7 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/iwind/TeaGo/maps" @@ -12,10 +13,6 @@ import ( "time" ) -// 流量统计 -var inTrafficBytes = uint64(0) -var outTrafficBytes = uint64(0) - // 发送监控流量 func init() { events.On(events.EventStart, func() { @@ -23,20 +20,20 @@ func init() { go func() { for range ticker.C { // 加入到数据队列中 - if inTrafficBytes > 0 { + if teaconst.InTrafficBytes > 0 { monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{ - "total": inTrafficBytes, + "total": teaconst.InTrafficBytes, }) } - if outTrafficBytes > 0 { + if teaconst.OutTrafficBytes > 0 { monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{ - "total": outTrafficBytes, + "total": teaconst.OutTrafficBytes, }) } // 重置数据 - atomic.StoreUint64(&inTrafficBytes, 0) - atomic.StoreUint64(&outTrafficBytes, 0) + atomic.StoreUint64(&teaconst.InTrafficBytes, 0) + atomic.StoreUint64(&teaconst.OutTrafficBytes, 0) } }() }) @@ -63,7 +60,7 @@ func NewClientConn(conn net.Conn, quickClose bool) net.Conn { func (this *ClientConn) Read(b []byte) (n int, err error) { n, err = this.rawConn.Read(b) if n > 0 { - atomic.AddUint64(&inTrafficBytes, uint64(n)) + atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n)) } return } @@ -71,7 +68,7 @@ func (this *ClientConn) Read(b []byte) (n int, err error) { func (this *ClientConn) Write(b []byte) (n int, err error) { n, err = this.rawConn.Write(b) if n > 0 { - atomic.AddUint64(&outTrafficBytes, uint64(n)) + atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n)) } return } diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index db5dfc5..61fbdba 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -281,7 +281,7 @@ func (this *HTTPRequest) doReverseProxy() { closeErr := resp.Body.Close() if closeErr != nil { - if !this.canIgnore(err) { + if !this.canIgnore(closeErr) { remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", closeErr.Error()) } } diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index 7d0deff..f7f6ef2 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -68,8 +68,8 @@ func (this *NodeStatusExecutor) update() { status.ConnectionCount = sharedListenerManager.TotalActiveConnections() status.CacheTotalDiskSize = caches.SharedManager.TotalDiskSize() status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize() - status.TrafficInBytes = inTrafficBytes - status.TrafficOutBytes = outTrafficBytes + status.TrafficInBytes = teaconst.InTrafficBytes + status.TrafficOutBytes = teaconst.OutTrafficBytes // 记录监控数据 monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemConnections, maps.Map{ diff --git a/internal/utils/free_hours_manager.go b/internal/utils/free_hours_manager.go new file mode 100644 index 0000000..fb22523 --- /dev/null +++ b/internal/utils/free_hours_manager.go @@ -0,0 +1,172 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package utils + +import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/events" + "sort" + "sync" + "sync/atomic" + "time" +) + +var SharedFreeHoursManager = NewFreeHoursManager() + +func init() { + events.On(events.EventLoaded, func() { + go SharedFreeHoursManager.Start() + }) +} + +// FreeHoursManager 计算节点空闲时间 +// 以便于我们在空闲时间执行高强度的任务,如果清理缓存等 +type FreeHoursManager struct { + dayTrafficMap map[int][24]uint64 // day => [ traffic bytes ] + lastBytes uint64 + + freeHours []int + count int + + locker sync.Mutex +} + +func NewFreeHoursManager() *FreeHoursManager { + return &FreeHoursManager{dayTrafficMap: map[int][24]uint64{}, count: 3} +} + +func (this *FreeHoursManager) Start() { + var ticker = time.NewTicker(30 * time.Minute) + for range ticker.C { + this.Update(atomic.LoadUint64(&teaconst.InTrafficBytes)) + } +} + +func (this *FreeHoursManager) Update(bytes uint64) { + if this.count <= 0 { + this.count = 3 + } + + if this.lastBytes == 0 { + this.lastBytes = bytes + } else { + // 记录流量 + var deltaBytes = bytes - this.lastBytes + var now = time.Now() + var day = now.Day() + var hour = now.Hour() + traffic, ok := this.dayTrafficMap[day] + if ok { + traffic[hour] += deltaBytes + } else { + var traffic = [24]uint64{} + traffic[hour] += deltaBytes + this.dayTrafficMap[day] = traffic + } + + this.lastBytes = bytes + + // 计算空闲时间 + var result = [24]uint64{} + var hasData = false + for trafficDay, trafficArray := range this.dayTrafficMap { + // 当天的不算 + if trafficDay == day { + continue + } + + // 查看最近5天的 + if (day > trafficDay && day-trafficDay <= 5) || (day < trafficDay && trafficDay-day >= 26) { + var weights = this.sortUintArrayWeights(trafficArray) + for k, v := range weights { + result[k] += v + } + hasData = true + } + } + if hasData { + var freeHours = this.sortUintArrayIndexes(result) + this.locker.Lock() + this.freeHours = freeHours[:this.count] // 取前N个小时作为空闲时间 + this.locker.Unlock() + } + } +} + +func (this *FreeHoursManager) IsFreeHour() bool { + this.locker.Lock() + defer this.locker.Unlock() + + if len(this.freeHours) == 0 { + return false + } + + var hour = time.Now().Hour() + for _, h := range this.freeHours { + if h == hour { + return true + } + } + return false +} + +// 对数组进行排序,并返回权重 +func (this *FreeHoursManager) sortUintArrayWeights(arr [24]uint64) [24]uint64 { + var l = []map[string]interface{}{} + for k, v := range arr { + l = append(l, map[string]interface{}{ + "k": k, + "v": v, + }) + } + sort.Slice(l, func(i, j int) bool { + var m1 = l[i] + var v1 = m1["v"].(uint64) + + var m2 = l[j] + var v2 = m2["v"].(uint64) + + return v1 < v2 + }) + + var result = [24]uint64{} + for k, v := range l { + if k < this.count { + k = 0 + } else { + k = 1 + } + result[v["k"].(int)] = v["v"].(uint64) + } + + return result +} + +// 对数组进行排序,并返回索引 +func (this *FreeHoursManager) sortUintArrayIndexes(arr [24]uint64) [24]int { + var l = []map[string]interface{}{} + for k, v := range arr { + l = append(l, map[string]interface{}{ + "k": k, + "v": v, + }) + } + sort.Slice(l, func(i, j int) bool { + var m1 = l[i] + var v1 = m1["v"].(uint64) + + var m2 = l[j] + var v2 = m2["v"].(uint64) + + return v1 < v2 + }) + + var result = [24]int{} + var i = 0 + for _, v := range l { + result[i] = v["k"].(int) + i++ + } + + return result +} diff --git a/internal/utils/free_hours_manager_test.go b/internal/utils/free_hours_manager_test.go new file mode 100644 index 0000000..777f0e1 --- /dev/null +++ b/internal/utils/free_hours_manager_test.go @@ -0,0 +1,49 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package utils + +import ( + "testing" + "time" +) + +func TestFreeHoursManager_Update(t *testing.T) { + var manager = NewFreeHoursManager() + manager.Update(111) + + manager.dayTrafficMap[1] = [24]uint64{1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1} + manager.dayTrafficMap[2] = [24]uint64{0, 0, 1, 0, 1, 1, 1, 0, 0} + manager.dayTrafficMap[3] = [24]uint64{0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1} + manager.dayTrafficMap[4] = [24]uint64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1} + manager.dayTrafficMap[5] = [24]uint64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1} + manager.dayTrafficMap[6] = [24]uint64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1} + manager.dayTrafficMap[7] = [24]uint64{} + manager.dayTrafficMap[8] = [24]uint64{} + manager.dayTrafficMap[9] = [24]uint64{} + manager.dayTrafficMap[10] = [24]uint64{1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} + manager.dayTrafficMap[11] = [24]uint64{1} + manager.dayTrafficMap[12] = [24]uint64{1} + manager.dayTrafficMap[13] = [24]uint64{1} + manager.dayTrafficMap[14] = [24]uint64{} + manager.dayTrafficMap[15] = [24]uint64{} + manager.dayTrafficMap[16] = [24]uint64{} + manager.dayTrafficMap[25] = [24]uint64{} + manager.dayTrafficMap[26] = [24]uint64{} + manager.dayTrafficMap[27] = [24]uint64{} + manager.dayTrafficMap[28] = [24]uint64{} + manager.dayTrafficMap[29] = [24]uint64{} + manager.dayTrafficMap[30] = [24]uint64{} + manager.dayTrafficMap[31] = [24]uint64{} + + var before = time.Now() + manager.Update(222) + t.Log(manager.freeHours) + t.Log(manager.IsFreeHour()) + t.Log(time.Since(before).Seconds()*1000, "ms") +} + +func TestFreeHoursManager_SortArray(t *testing.T) { + var manager = NewFreeHoursManager() + t.Log(manager.sortUintArrayWeights([24]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 109, 10, 11, 12, 130, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23})) + t.Log(manager.sortUintArrayIndexes([24]uint64{1, 2, 3, 5, 4, 0, 100})) +}