缓存策略实现LFU算法/实现内存缓存自动Flush数据到磁盘

This commit is contained in:
GoEdgeLab
2021-11-13 21:30:24 +08:00
parent 188fe12a0b
commit 07cb2bb303
20 changed files with 1100 additions and 89 deletions

6
go.mod
View File

@@ -16,14 +16,16 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-yaml/yaml v2.1.0+incompatible
github.com/golang/protobuf v1.5.2
github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060
github.com/iwind/TeaGo v0.0.0-20211026123858-7de7a21cad24
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11
github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3
github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8
github.com/json-iterator/go v1.1.12 // indirect
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirect
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/mattn/go-sqlite3 v1.14.9
github.com/miekg/dns v1.1.43
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/mssola/user_agent v0.5.2
github.com/pires/go-proxyproto v0.6.1
github.com/shirou/gopsutil v3.21.5+incompatible

10
go.sum
View File

@@ -78,6 +78,8 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060 h1:qdLtK4PDXxk2vMKkTWl5Fl9xqYuRCukzWAgJbLHdfOo=
github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20211026123858-7de7a21cad24 h1:1cGulkD2SNJJRok5OKwyhP/Ddm+PgSWKOupn0cR36/A=
github.com/iwind/TeaGo v0.0.0-20211026123858-7de7a21cad24/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 h1:DaQjoWZhLNxjhIXedVg4/vFEtHkZhK4IjIwsWdyzBLg=
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3 h1:aBSonas7vFcgTj9u96/bWGILGv1ZbUSTLiOzcI1ZT6c=
@@ -85,6 +87,8 @@ github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3/go.mod h1:H5Q7SXwbx3a
github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8 h1:AojsHz9Es9B3He2MQQxeRq3TyD//o9huxUo7r1wh44g=
github.com/iwind/gowebp v0.0.0-20211029040624-7331ecc78ed8/go.mod h1:QJBY2txYhLMzwLO29iB5ujDJ3s3V7DsZ582nw4Ss+tM=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e h1:LvL4XsI70QxOGHed6yhQtAU34Kx3Qq2wwBzGFKY8zKk=
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e/go.mod h1:kLgvv7o6UM+0QSf0QjAse3wReFDsb9qbZJdfexWlrQw=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -96,12 +100,18 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible h1:1qp9iks+69h7IGLazAplzS9Ca14HAxuD5c0rbFdPGy4=
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBoh5gG6/y0ZQ85vJDBe21WnfbRrQQwTfliJJI=
github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA=
github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg=
github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mssola/user_agent v0.5.2 h1:CZkTUahjL1+OcZ5zv3kZr8QiJ8jy2H08vZIEkBeRbxo=
github.com/mssola/user_agent v0.5.2/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=

View File

@@ -2,6 +2,7 @@ package caches
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"time"
)
type ItemType = int
@@ -11,6 +12,12 @@ const (
ItemTypeMemory ItemType = 2
)
// 计算当前周
// 不要用YW因为需要计算两周是否临近
func currentWeek() int32 {
return int32(time.Now().Unix() / 604800)
}
type Item struct {
Type ItemType `json:"type"`
Key string `json:"key"`
@@ -20,6 +27,10 @@ type Item struct {
MetaSize int64 `json:"metaSize"`
Host string `json:"host"` // 主机名
ServerId int64 `json:"serverId"` // 服务ID
Week1Hits int64 `json:"week1Hits"`
Week2Hits int64 `json:"week2Hits"`
Week int32 `json:"week"`
}
func (this *Item) IsExpired() bool {
@@ -27,9 +38,23 @@ func (this *Item) IsExpired() bool {
}
func (this *Item) TotalSize() int64 {
return this.Size() + this.MetaSize + int64(len(this.Key)) + int64(len(this.Host)) + 64
return this.Size() + this.MetaSize + int64(len(this.Key)) + int64(len(this.Host))
}
func (this *Item) Size() int64 {
return this.HeaderSize + this.BodySize
}
func (this *Item) IncreaseHit(week int32) {
if this.Week == week {
this.Week2Hits++
} else {
if week-this.Week == 1 {
this.Week1Hits = this.Week2Hits
} else {
this.Week1Hits = 0
}
this.Week2Hits = 1
this.Week = week
}
}

View File

@@ -0,0 +1,82 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"runtime"
"testing"
"time"
)
func TestItem_IncreaseHit(t *testing.T) {
var week = currentWeek()
var item = &Item{}
//item.Week = 2704
item.Week2Hits = 100
item.IncreaseHit(week)
t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits)
item.IncreaseHit(week)
t.Log("week:", item.Week, "week1:", item.Week1Hits, "week2:", item.Week2Hits)
}
func TestItems_Memory(t *testing.T) {
var stat = &runtime.MemStats{}
runtime.ReadMemStats(stat)
var memory1 = stat.HeapInuse
var items = []*Item{}
for i := 0; i < 10_000_000; i++ {
items = append(items, &Item{
Key: types.String(i),
})
}
runtime.ReadMemStats(stat)
var memory2 = stat.HeapInuse
t.Log(memory1, memory2, (memory2-memory1)/1024/1024, "M")
var weekItems = make(map[string]*Item, 10_000_000)
for _, item := range items {
weekItems[item.Key] = item
}
runtime.ReadMemStats(stat)
var memory3 = stat.HeapInuse
t.Log(memory2, memory3, (memory3-memory2)/1024/1024, "M")
time.Sleep(1 * time.Second)
t.Log(len(items), len(weekItems))
}
func TestItems_Memory2(t *testing.T) {
var stat = &runtime.MemStats{}
runtime.ReadMemStats(stat)
var memory1 = stat.HeapInuse
var items = map[int32]map[string]bool{}
for i := 0; i < 10_000_000; i++ {
var week = int32((time.Now().Unix() - int64(86400*rands.Int(0, 300))) / (86400 * 7))
m, ok := items[week]
if !ok {
m = map[string]bool{}
items[week] = m
}
m[types.String(int64(i)*1_000_000)] = true
}
runtime.ReadMemStats(stat)
var memory2 = stat.HeapInuse
t.Log(memory1, memory2, (memory2-memory1)/1024/1024, "M")
time.Sleep(1 * time.Second)
for w, i := range items {
t.Log(w, len(i))
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/lists"
timeutil "github.com/iwind/TeaGo/utils/time"
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
@@ -24,6 +25,7 @@ type FileList struct {
onAdd func(item *Item)
onRemove func(item *Item)
// cacheItems
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
@@ -32,8 +34,15 @@ type FileList struct {
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
// hits
insertHitStmt *sql.Stmt // 写入数据
increaseHitStmt *sql.Stmt // 增加点击量
deleteHitByHashStmt *sql.Stmt // 根据hash删除数据
lfuHitsStmt *sql.Stmt // 读取老的数据
oldTables []string
itemsTableName string
hitsTableName string
isClosed bool
@@ -59,6 +68,7 @@ func (this *FileList) Init() error {
}
this.itemsTableName = "cacheItems_v2"
this.hitsTableName = "hits"
var dir = this.dir
if dir == "/" {
@@ -141,6 +151,23 @@ func (this *FileList) Init() error {
return err
}
this.insertHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?)`)
this.increaseHitStmt, err = this.db.Prepare(`INSERT INTO "` + this.hitsTableName + `" ("hash", "week2Hits", "week") VALUES (?, 1, ?) ON CONFLICT("hash") DO UPDATE SET "week1Hits"=IIF("week"=?, "week1Hits", "week2Hits"), "week2Hits"=IIF("week"=?, "week2Hits"+1, 1), "week"=?`)
if err != nil {
return err
}
this.deleteHitByHashStmt, err = this.db.Prepare(`DELETE FROM "` + this.hitsTableName + `" WHERE "hash"=?`)
if err != nil {
return err
}
this.lfuHitsStmt, err = this.db.Prepare(`SELECT "hash" FROM "` + this.hitsTableName + `" ORDER BY "week" ASC, "week1Hits"+"week2Hits" ASC LIMIT ?`)
if err != nil {
return err
}
return nil
}
@@ -159,6 +186,11 @@ func (this *FileList) Add(hash string, item *Item) error {
return err
}
_, err = this.insertHitStmt.Exec(hash, timeutil.Format("YW"))
if err != nil {
return err
}
atomic.AddInt64(&this.total, 1)
if this.onAdd != nil {
@@ -253,6 +285,11 @@ func (this *FileList) Remove(hash string) error {
return err
}
_, err = this.deleteHitByHashStmt.Exec(hash)
if err != nil {
return err
}
atomic.AddInt64(&this.total, -1)
if this.onRemove != nil {
@@ -265,9 +302,9 @@ func (this *FileList) Remove(hash string) error {
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) error {
func (this *FileList) Purge(count int, callback func(hash string) error) (int, error) {
if this.isClosed {
return nil
return 0, nil
}
if count <= 0 {
@@ -275,11 +312,56 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
}
rows, err := this.purgeStmt.Query(time.Now().Unix(), count)
if err != nil {
return 0, err
}
hashStrings := []string{}
var countFound = 0
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
_ = rows.Close()
return 0, err
}
hashStrings = append(hashStrings, hash)
countFound++
}
_ = rows.Close() // 不能使用defer防止读写冲突
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return 0, err
}
err = callback(hash)
if err != nil {
return 0, err
}
}
return countFound, nil
}
func (this *FileList) PurgeLFU(count int, callback func(hash string) error) error {
if this.isClosed {
return nil
}
if count <= 0 {
return nil
}
rows, err := this.lfuHitsStmt.Query(count)
if err != nil {
return err
}
hashStrings := []string{}
var countFound = 0
for rows.Next() {
var hash string
err = rows.Scan(&hash)
@@ -288,6 +370,7 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
return err
}
hashStrings = append(hashStrings, hash)
countFound++
}
_ = rows.Close() // 不能使用defer防止读写冲突
@@ -303,7 +386,6 @@ func (this *FileList) Purge(count int, callback func(hash string) error) error {
return err
}
}
return nil
}
@@ -347,6 +429,13 @@ func (this *FileList) Count() (int64, error) {
return atomic.LoadInt64(&this.total), nil
}
// IncreaseHit 增加点击量
func (this *FileList) IncreaseHit(hash string) error {
var week = timeutil.Format("YW")
_, err := this.increaseHitStmt.Exec(hash, week, week, week, week)
return err
}
// OnAdd 添加事件
func (this *FileList) OnAdd(f func(item *Item)) {
this.onAdd = f
@@ -371,6 +460,11 @@ func (this *FileList) Close() error {
_ = this.purgeStmt.Close()
_ = this.deleteAllStmt.Close()
_ = this.insertHitStmt.Close()
_ = this.increaseHitStmt.Close()
_ = this.deleteHitByHashStmt.Close()
_ = this.lfuHitsStmt.Close()
return this.db.Close()
}
return nil
@@ -378,6 +472,7 @@ func (this *FileList) Close() error {
// 初始化
func (this *FileList) initTables(db *sql.DB, times int) error {
{
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.itemsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
@@ -423,6 +518,35 @@ ON "` + this.itemsTableName + `" (
return err
}
}
{
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.hitsTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"week1Hits" integer DEFAULT 0,
"week2Hits" integer DEFAULT 0,
"week" varchar(6)
);
CREATE UNIQUE INDEX IF NOT EXISTS "hits_hash"
ON "` + this.hitsTableName + `" (
"hash" ASC
);
`)
if err != nil {
// 尝试删除重建
if times < 3 {
_, dropErr := db.Exec(`DROP TABLE "` + this.hitsTableName + `"`)
if dropErr == nil {
return this.initTables(db, times+1)
}
return err
}
return err
}
}
return nil
}

View File

@@ -5,6 +5,7 @@ package caches
import (
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
stringutil "github.com/iwind/TeaGo/utils/string"
"strconv"
"sync"
@@ -184,7 +185,7 @@ func TestFileList_Purge(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = list.Purge(2, func(hash string) error {
_, err = list.Purge(2, func(hash string) error {
t.Log(hash)
return nil
})
@@ -257,6 +258,50 @@ func TestFileList_Conflict(t *testing.T) {
t.Log("after exists")
}
func TestFileList_IIF(t *testing.T) {
list := NewFileList(Tea.Root + "/data").(*FileList)
err := list.Init()
if err != nil {
t.Fatal(err)
}
rows, err := list.db.Query("SELECT IIF(0, 2, 3)")
if err != nil {
t.Fatal(err)
}
defer func() {
_ = rows.Close()
}()
if rows.Next() {
var result int
err = rows.Scan(&result)
if err != nil {
t.Fatal(err)
}
t.Log("result:", result)
}
}
func TestFileList_IncreaseHit(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
for i := 0; i < 1000_000; i++ {
err = list.IncreaseHit(stringutil.Md5("abc" + types.String(i)))
}
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func BenchmarkFileList_Exist(b *testing.B) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()

View File

@@ -22,7 +22,10 @@ type ListInterface interface {
Remove(hash string) error
// Purge 清理过期数据
Purge(count int, callback func(hash string) error) error
Purge(count int, callback func(hash string) error) (int, error)
// PurgeLFU 清理LFU数据
PurgeLFU(count int, callback func(hash string) error) error
// CleanAll 清除所有缓存
CleanAll() error
@@ -41,4 +44,7 @@ type ListInterface interface {
// Close 关闭
Close() error
// IncreaseHit 增加点击量
IncreaseHit(hash string) error
}

View File

@@ -5,12 +5,19 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
)
// MemoryList 内存缓存列表管理
type MemoryList struct {
count int64
itemMaps map[string]map[string]*Item // prefix => { hash => item }
weekItemMaps map[int32]map[string]bool // week => { hash => true }
minWeek int32
prefixes []string
locker sync.RWMutex
onAdd func(item *Item)
@@ -22,6 +29,8 @@ type MemoryList struct {
func NewMemoryList() ListInterface {
return &MemoryList{
itemMaps: map[string]map[string]*Item{},
weekItemMaps: map[int32]map[string]bool{},
minWeek: currentWeek(),
}
}
@@ -43,11 +52,19 @@ func (this *MemoryList) Reset() error {
for key := range this.itemMaps {
this.itemMaps[key] = map[string]*Item{}
}
this.weekItemMaps = map[int32]map[string]bool{}
this.locker.Unlock()
atomic.StoreInt64(&this.count, 0)
return nil
}
func (this *MemoryList) Add(hash string, item *Item) error {
if item.Week == 0 {
item.Week = currentWeek()
}
this.locker.Lock()
prefix := this.prefix(hash)
@@ -60,9 +77,20 @@ func (this *MemoryList) Add(hash string, item *Item) error {
// 先删除,为了可以正确触发统计
oldItem, ok := itemMap[hash]
if ok {
// 从week map中删除
if oldItem.Week > 0 {
wm, ok := this.weekItemMaps[oldItem.Week]
if ok {
delete(wm, hash)
}
}
// 回调
if this.onRemove != nil {
this.onRemove(oldItem)
}
} else {
atomic.AddInt64(&this.count, 1)
}
// 添加
@@ -71,6 +99,15 @@ func (this *MemoryList) Add(hash string, item *Item) error {
}
itemMap[hash] = item
// week map
wm, ok := this.weekItemMaps[item.Week]
if ok {
wm[hash] = true
} else {
this.weekItemMaps[item.Week] = map[string]bool{hash: true}
}
this.locker.Unlock()
return nil
}
@@ -122,7 +159,17 @@ func (this *MemoryList) Remove(hash string) error {
if this.onRemove != nil {
this.onRemove(item)
}
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
// week map
if item.Week > 0 {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
}
}
this.locker.Unlock()
@@ -132,7 +179,7 @@ func (this *MemoryList) Remove(hash string) error {
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *MemoryList) Purge(count int, callback func(hash string) error) error {
func (this *MemoryList) Purge(count int, callback func(hash string) error) (int, error) {
this.locker.Lock()
deletedHashList := []string{}
@@ -146,8 +193,9 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error
itemMap, ok := this.itemMaps[prefix]
if !ok {
this.locker.Unlock()
return nil
return 0, nil
}
var countFound = 0
for hash, item := range itemMap {
if count <= 0 {
break
@@ -157,14 +205,100 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error
if this.onRemove != nil {
this.onRemove(item)
}
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
// week map
if item.Week > 0 {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
}
countFound++
}
count--
}
this.locker.Unlock()
// 执行外部操作
for _, hash := range deletedHashList {
if callback != nil {
err := callback(hash)
if err != nil {
return 0, err
}
}
}
return countFound, nil
}
func (this *MemoryList) PurgeLFU(count int, callback func(hash string) error) error {
if count <= 0 {
return nil
}
var week = currentWeek()
if this.minWeek > week {
this.minWeek = week
}
var deletedHashList = []string{}
Loop:
for w := this.minWeek; w <= week; w++ {
this.minWeek = w
this.locker.Lock()
wm, ok := this.weekItemMaps[w]
if ok {
var wc = len(wm)
if wc == 0 {
delete(this.weekItemMaps, w)
} else {
if wc <= count {
delete(this.weekItemMaps, w)
}
// TODO 未来支持按照点击量排序
for hash := range wm {
count--
if count < 0 {
this.locker.Unlock()
break Loop
}
delete(wm, hash)
itemMap, ok := this.itemMaps[this.prefix(hash)]
if !ok {
continue
}
item, ok := itemMap[hash]
if !ok {
continue
}
if this.onRemove != nil {
this.onRemove(item)
}
atomic.AddInt64(&this.count, -1)
delete(itemMap, hash)
deletedHashList = append(deletedHashList, hash)
}
}
} else {
delete(this.weekItemMaps, w)
}
this.locker.Unlock()
}
// 执行外部操作
for _, hash := range deletedHashList {
if callback != nil {
@@ -174,6 +308,7 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error
}
}
}
return nil
}
@@ -206,13 +341,8 @@ func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) {
// Count 总数量
func (this *MemoryList) Count() (int64, error) {
this.locker.RLock()
var count = 0
for _, itemMap := range this.itemMaps {
count += len(itemMap)
}
this.locker.RUnlock()
return int64(count), nil
var count = atomic.LoadInt64(&this.count)
return count, nil
}
// OnAdd 添加事件
@@ -229,6 +359,41 @@ func (this *MemoryList) Close() error {
return nil
}
// IncreaseHit 增加点击量
func (this *MemoryList) IncreaseHit(hash string) error {
this.locker.Lock()
itemMap, ok := this.itemMaps[this.prefix(hash)]
if !ok {
this.locker.Unlock()
return nil
}
item, ok := itemMap[hash]
if ok {
var week = currentWeek()
// 交换位置
if item.Week > 0 && item.Week != week {
wm, ok := this.weekItemMaps[item.Week]
if ok {
delete(wm, hash)
}
wm, ok = this.weekItemMaps[week]
if ok {
wm[hash] = true
} else {
this.weekItemMaps[week] = map[string]bool{hash: true}
}
}
item.IncreaseHit(week)
}
this.locker.Unlock()
return nil
}
func (this *MemoryList) print(t *testing.T) {
this.locker.Lock()
for _, itemMap := range this.itemMaps {

View File

@@ -31,6 +31,8 @@ func TestMemoryList_Add(t *testing.T) {
})
t.Log(list.prefixes)
logs.PrintAsJSON(list.itemMaps, t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
func TestMemoryList_Remove(t *testing.T) {
@@ -48,6 +50,8 @@ func TestMemoryList_Remove(t *testing.T) {
})
_ = list.Remove("b")
list.print(t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
func TestMemoryList_Purge(t *testing.T) {
@@ -73,19 +77,22 @@ func TestMemoryList_Purge(t *testing.T) {
ExpiredAt: time.Now().Unix() - 2,
HeaderSize: 1024,
})
_ = list.Purge(100, func(hash string) error {
_, _ = list.Purge(100, func(hash string) error {
t.Log("delete:", hash)
return nil
})
list.print(t)
logs.PrintAsJSON(list.weekItemMaps, t)
for i := 0; i < 1000; i++ {
_ = list.Purge(100, func(hash string) error {
_, _ = list.Purge(100, func(hash string) error {
t.Log("delete:", hash)
return nil
})
t.Log(list.purgeIndex)
}
t.Log(list.Count())
}
func TestMemoryList_Purge_Large_List(t *testing.T) {
@@ -139,7 +146,7 @@ func TestMemoryList_CleanPrefix(t *testing.T) {
_ = list.Init()
before := time.Now()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html"
key := "https://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: time.Now().Unix() + 3600,
@@ -150,7 +157,7 @@ func TestMemoryList_CleanPrefix(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now()
err := list.CleanPrefix("http://www.teaos.cn/hello/10")
err := list.CleanPrefix("https://www.teaos.cn/hello/10")
if err != nil {
t.Fatal(err)
}
@@ -162,11 +169,77 @@ func TestMemoryList_CleanPrefix(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestMemoryList_PurgeLFU(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
list.minWeek = 2704
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
t.Log("current week:", currentWeek())
_ = list.Add("1", &Item{})
_ = list.Add("2", &Item{})
_ = list.Add("3", &Item{})
_ = list.Add("4", &Item{})
_ = list.Add("5", &Item{})
_ = list.Add("6", &Item{Week: 2704})
_ = list.Add("7", &Item{Week: 2704})
_ = list.Add("8", &Item{Week: 2705})
err := list.PurgeLFU(2, func(hash string) error {
t.Log("purge lfu:", hash)
return nil
})
if err != nil {
t.Fatal(err)
}
t.Log("ok")
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
func TestMemoryList_IncreaseHit(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
var item = &Item{}
item.Week = 2705
item.Week2Hits = 100
_ = list.Add("a", &Item{})
_ = list.Add("a", item)
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
_ = list.IncreaseHit("a")
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
_ = list.IncreaseHit("a")
t.Log("hits1:", item.Week1Hits, "hits2:", item.Week2Hits, "week:", item.Week)
logs.PrintAsJSON(list.weekItemMaps, t)
}
func TestMemoryList_CleanAll(t *testing.T) {
var list = NewMemoryList().(*MemoryList)
var item = &Item{}
item.Week = 2705
item.Week2Hits = 100
_ = list.Add("a", &Item{})
_ = list.CleanAll()
logs.PrintAsJSON(list.itemMaps, t)
logs.PrintAsJSON(list.weekItemMaps, t)
t.Log(list.Count())
}
func TestMemoryList_GC(t *testing.T) {
list := NewMemoryList().(*MemoryList)
_ = list.Init()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
key := "https://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: 0,

View File

@@ -135,7 +135,7 @@ func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy)
case serverconfigs.CachePolicyStorageFile:
return NewFileStorage(policy)
case serverconfigs.CachePolicyStorageMemory:
return NewMemoryStorage(policy)
return NewMemoryStorage(policy, nil)
}
return nil
}

View File

@@ -11,10 +11,13 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
stringutil "github.com/iwind/TeaGo/utils/string"
"golang.org/x/text/language"
"golang.org/x/text/message"
"io"
"math"
"os"
"path/filepath"
"regexp"
@@ -165,12 +168,16 @@ func (this *FileStorage) Init() error {
Life: this.policy.Life,
MinLife: this.policy.MinLife,
MaxLife: this.policy.MaxLife,
MemoryAutoPurgeCount: this.policy.MemoryAutoPurgeCount,
MemoryAutoPurgeInterval: this.policy.MemoryAutoPurgeInterval,
MemoryLFUFreePercent: this.policy.MemoryLFUFreePercent,
}
err = memoryPolicy.Init()
if err != nil {
return err
}
memoryStorage := NewMemoryStorage(memoryPolicy)
memoryStorage := NewMemoryStorage(memoryPolicy, this)
err = memoryStorage.Init()
if err != nil {
return err
@@ -227,6 +234,17 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
return nil, err
}
// 增加点击量
// 1/1000采样
// TODO 考虑是否在缓存策略里设置
if rands.Int(0, 1000) == 0 {
var hitErr = this.list.IncreaseHit(hash)
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
}
isOk = true
return reader, nil
}
@@ -398,7 +416,7 @@ func (this *FileStorage) AddToList(item *Item) {
}
}
item.MetaSize = SizeMeta
item.MetaSize = SizeMeta + 128
hash := stringutil.Md5(item.Key)
err := this.list.Add(hash, item)
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
@@ -619,7 +637,14 @@ func (this *FileStorage) initList() error {
}()
// 启动定时清理任务
this.ticker = utils.NewTicker(30 * time.Second)
var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval
if autoPurgeInterval <= 0 {
autoPurgeInterval = 30
if Tea.IsTesting() {
autoPurgeInterval = 10
}
}
this.ticker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
events.On(events.EventQuit, func() {
remotelogs.Println("CACHE", "quit clean timer")
var ticker = this.ticker
@@ -730,7 +755,42 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
// 清理任务
func (this *FileStorage) purgeLoop() {
err := this.list.Purge(1000, func(hash string) error {
// 计算是否应该开启LFU清理
var capacityBytes = this.policy.CapacityBytes()
var startLFU = false
var usedPercent = float32(this.TotalDiskSize()*100) / float32(capacityBytes)
var lfuFreePercent = this.policy.PersistenceLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
}
if capacityBytes > 0 {
if lfuFreePercent < 100 {
if usedPercent >= 100-lfuFreePercent {
startLFU = true
}
}
}
// 清理过期
{
var times = 1
// 空闲时间多清理
if utils.SharedFreeHoursManager.IsFreeHour() {
times = 5
}
// 处于LFU阈值时多清理
if startLFU {
times = 5
}
var purgeCount = this.policy.PersistenceAutoPurgeCount
if purgeCount <= 0 {
purgeCount = 1000
}
for i := 0; i < times; i++ {
countFound, err := this.list.Purge(purgeCount, func(hash string) error {
path := this.hashPath(hash)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
@@ -740,6 +800,42 @@ func (this *FileStorage) purgeLoop() {
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage failed: "+err.Error())
continue
}
if countFound < purgeCount {
break
}
time.Sleep(1 * time.Second)
}
}
// 磁盘空间不足时,清除老旧的缓存
if startLFU {
var total, _ = this.list.Count()
if total > 0 {
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count > 0 {
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
err := this.list.PurgeLFU(count, func(hash string) error {
path := this.hashPath(hash)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error())
}
}
}
}
}

View File

@@ -6,6 +6,9 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"math"
"strconv"
"sync"
"sync/atomic"
@@ -26,22 +29,37 @@ func (this *MemoryItem) IsExpired() bool {
}
type MemoryStorage struct {
parentStorage StorageInterface
policy *serverconfigs.HTTPCachePolicy
list ListInterface
locker *sync.RWMutex
valuesMap map[uint64]*MemoryItem
ticker *utils.Ticker
purgeDuration time.Duration
valuesMap map[uint64]*MemoryItem // hash => item
dirtyChan chan string // hash chan
purgeTicker *utils.Ticker
totalSize int64
writingKeyMap map[string]bool // key => bool
}
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage {
var dirtyChan chan string
if parentStorage != nil {
var queueSize = policy.MemoryAutoFlushQueueSize
if queueSize <= 0 {
queueSize = 2048
}
dirtyChan = make(chan string, queueSize)
}
return &MemoryStorage{
parentStorage: parentStorage,
policy: policy,
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
dirtyChan: dirtyChan,
writingKeyMap: map[string]bool{},
}
}
@@ -57,18 +75,26 @@ func (this *MemoryStorage) Init() error {
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
if this.purgeDuration <= 0 {
this.purgeDuration = 10 * time.Second
var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval
if autoPurgeInterval <= 0 {
autoPurgeInterval = 5
}
// 启动定时清理任务
this.ticker = utils.NewTicker(this.purgeDuration)
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
go func() {
for this.ticker.Next() {
for this.purgeTicker.Next() {
this.purgeLoop()
}
}()
// 启动定时Flush memory to disk任务
go func() {
for hash := range this.dirtyChan {
this.flushItem(hash)
}
}()
return nil
}
@@ -91,6 +117,18 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
return nil, err
}
this.locker.RUnlock()
// 增加点击量
// 1/1000采样
// TODO 考虑是否在缓存策略里设置
if rands.Int(0, 1000) == 0 {
var hitErr = this.list.IncreaseHit(types.String(hash))
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
}
return reader, nil
}
this.locker.RUnlock()
@@ -145,7 +183,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
}
isWriting = true
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker, func() {
return NewMemoryWriter(this, key, expiredAt, status, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
@@ -210,8 +248,12 @@ func (this *MemoryStorage) Stop() {
this.valuesMap = map[uint64]*MemoryItem{}
this.writingKeyMap = map[string]bool{}
_ = this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
if this.purgeTicker != nil {
this.purgeTicker.Stop()
}
if this.parentStorage != nil && this.dirtyChan != nil {
close(this.dirtyChan)
}
_ = this.list.Close()
@@ -228,7 +270,7 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
// AddToList 将缓存添加到列表
func (this *MemoryStorage) AddToList(item *Item) {
item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/
hash := fmt.Sprintf("%d", this.hash(item.Key))
_ = this.list.Add(hash, item)
}
@@ -250,7 +292,28 @@ func (this *MemoryStorage) hash(key string) uint64 {
// 清理任务
func (this *MemoryStorage) purgeLoop() {
_ = this.list.Purge(2048, func(hash string) error {
// 计算是否应该开启LFU清理
var capacityBytes = this.policy.CapacityBytes()
var startLFU = false
var usedPercent = float32(this.TotalMemorySize()*100) / float32(capacityBytes)
var lfuFreePercent = this.policy.MemoryLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
}
if capacityBytes > 0 {
if lfuFreePercent < 100 {
if usedPercent >= 100-lfuFreePercent {
startLFU = true
}
}
}
// 清理过期
var purgeCount = this.policy.MemoryAutoPurgeCount
if purgeCount <= 0 {
purgeCount = 2000
}
_, _ = this.list.Purge(purgeCount, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
@@ -259,6 +322,92 @@ func (this *MemoryStorage) purgeLoop() {
}
return nil
})
// LFU
if startLFU {
var total, _ = this.list.Count()
if total > 0 {
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent * 2) / 100))
if count > 0 {
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
err := this.list.PurgeLFU(count, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
delete(this.valuesMap, uintHash)
this.locker.Unlock()
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge memory storage in LFU failed: "+err.Error())
}
}
}
}
}
// Flush任务
func (this *MemoryStorage) flushItem(key string) {
if this.parentStorage == nil {
return
}
var hash = this.hash(key)
this.locker.RLock()
item, ok := this.valuesMap[hash]
this.locker.RUnlock()
if !ok {
return
}
if !item.IsDone || item.IsExpired() {
return
}
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status)
if err != nil {
if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())
}
return
}
_, err = writer.WriteHeader(item.HeaderValue)
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: write header failed: "+err.Error())
return
}
_, err = writer.Write(item.BodyValue)
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: writer body failed: "+err.Error())
return
}
err = writer.Close()
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: close writer failed: "+err.Error())
}
this.parentStorage.AddToList(&Item{
Type: writer.ItemType(),
Key: key,
ExpiredAt: item.ExpiredAt,
HeaderSize: writer.HeaderSize(),
BodySize: writer.BodySize(),
})
return
}
func (this *MemoryStorage) memoryCapacityBytes() int64 {

View File

@@ -230,8 +230,9 @@ func TestMemoryStorage_Purge(t *testing.T) {
}
func TestMemoryStorage_Expire(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage.purgeDuration = 5 * time.Second
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{
MemoryAutoPurgeInterval: 5,
})
err := storage.Init()
if err != nil {
t.Fatal(err)

View File

@@ -2,15 +2,14 @@ package caches
import (
"github.com/cespare/xxhash"
"sync"
"time"
)
type MemoryWriter struct {
storage *MemoryStorage
key string
expiredAt int64
m map[uint64]*MemoryItem
locker *sync.RWMutex
headerSize int64
bodySize int64
status int
@@ -20,12 +19,11 @@ type MemoryWriter struct {
endFunc func()
}
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex, endFunc func()) *MemoryWriter {
func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, endFunc func()) *MemoryWriter {
w := &MemoryWriter{
m: m,
storage: memoryStorage,
key: key,
expiredAt: expiredAt,
locker: locker,
item: &MemoryItem{
ExpiredAt: expiredAt,
ModifiedAt: time.Now().Unix(),
@@ -72,10 +70,17 @@ func (this *MemoryWriter) Close() error {
return nil
}
this.locker.Lock()
this.storage.locker.Lock()
this.item.IsDone = true
this.m[this.hash] = this.item
this.locker.Unlock()
this.storage.valuesMap[this.hash] = this.item
if this.storage.parentStorage != nil {
select {
case this.storage.dirtyChan <- this.key:
default:
}
}
this.storage.locker.Unlock()
return nil
}
@@ -85,9 +90,9 @@ func (this *MemoryWriter) Discard() error {
// 需要在Locker之外
defer this.endFunc()
this.locker.Lock()
delete(this.m, this.hash)
this.locker.Unlock()
this.storage.locker.Lock()
delete(this.storage.valuesMap, this.hash)
this.storage.locker.Unlock()
return nil
}

10
internal/const/vars.go Normal file
View File

@@ -0,0 +1,10 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package teaconst
var (
// 流量统计
InTrafficBytes = uint64(0)
OutTrafficBytes = uint64(0)
)

View File

@@ -4,6 +4,7 @@ package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/iwind/TeaGo/maps"
@@ -12,10 +13,6 @@ import (
"time"
)
// 流量统计
var inTrafficBytes = uint64(0)
var outTrafficBytes = uint64(0)
// 发送监控流量
func init() {
events.On(events.EventStart, func() {
@@ -23,20 +20,20 @@ func init() {
go func() {
for range ticker.C {
// 加入到数据队列中
if inTrafficBytes > 0 {
if teaconst.InTrafficBytes > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{
"total": inTrafficBytes,
"total": teaconst.InTrafficBytes,
})
}
if outTrafficBytes > 0 {
if teaconst.OutTrafficBytes > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{
"total": outTrafficBytes,
"total": teaconst.OutTrafficBytes,
})
}
// 重置数据
atomic.StoreUint64(&inTrafficBytes, 0)
atomic.StoreUint64(&outTrafficBytes, 0)
atomic.StoreUint64(&teaconst.InTrafficBytes, 0)
atomic.StoreUint64(&teaconst.OutTrafficBytes, 0)
}
}()
})
@@ -63,7 +60,7 @@ func NewClientConn(conn net.Conn, quickClose bool) net.Conn {
func (this *ClientConn) Read(b []byte) (n int, err error) {
n, err = this.rawConn.Read(b)
if n > 0 {
atomic.AddUint64(&inTrafficBytes, uint64(n))
atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n))
}
return
}
@@ -71,7 +68,7 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
func (this *ClientConn) Write(b []byte) (n int, err error) {
n, err = this.rawConn.Write(b)
if n > 0 {
atomic.AddUint64(&outTrafficBytes, uint64(n))
atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
}
return
}

View File

@@ -281,7 +281,7 @@ func (this *HTTPRequest) doReverseProxy() {
closeErr := resp.Body.Close()
if closeErr != nil {
if !this.canIgnore(err) {
if !this.canIgnore(closeErr) {
remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", closeErr.Error())
}
}

View File

@@ -68,8 +68,8 @@ func (this *NodeStatusExecutor) update() {
status.ConnectionCount = sharedListenerManager.TotalActiveConnections()
status.CacheTotalDiskSize = caches.SharedManager.TotalDiskSize()
status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize()
status.TrafficInBytes = inTrafficBytes
status.TrafficOutBytes = outTrafficBytes
status.TrafficInBytes = teaconst.InTrafficBytes
status.TrafficOutBytes = teaconst.OutTrafficBytes
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemConnections, maps.Map{

View File

@@ -0,0 +1,172 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package utils
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"sort"
"sync"
"sync/atomic"
"time"
)
var SharedFreeHoursManager = NewFreeHoursManager()
func init() {
events.On(events.EventLoaded, func() {
go SharedFreeHoursManager.Start()
})
}
// FreeHoursManager 计算节点空闲时间
// 以便于我们在空闲时间执行高强度的任务,如果清理缓存等
type FreeHoursManager struct {
dayTrafficMap map[int][24]uint64 // day => [ traffic bytes ]
lastBytes uint64
freeHours []int
count int
locker sync.Mutex
}
func NewFreeHoursManager() *FreeHoursManager {
return &FreeHoursManager{dayTrafficMap: map[int][24]uint64{}, count: 3}
}
func (this *FreeHoursManager) Start() {
var ticker = time.NewTicker(30 * time.Minute)
for range ticker.C {
this.Update(atomic.LoadUint64(&teaconst.InTrafficBytes))
}
}
func (this *FreeHoursManager) Update(bytes uint64) {
if this.count <= 0 {
this.count = 3
}
if this.lastBytes == 0 {
this.lastBytes = bytes
} else {
// 记录流量
var deltaBytes = bytes - this.lastBytes
var now = time.Now()
var day = now.Day()
var hour = now.Hour()
traffic, ok := this.dayTrafficMap[day]
if ok {
traffic[hour] += deltaBytes
} else {
var traffic = [24]uint64{}
traffic[hour] += deltaBytes
this.dayTrafficMap[day] = traffic
}
this.lastBytes = bytes
// 计算空闲时间
var result = [24]uint64{}
var hasData = false
for trafficDay, trafficArray := range this.dayTrafficMap {
// 当天的不算
if trafficDay == day {
continue
}
// 查看最近5天的
if (day > trafficDay && day-trafficDay <= 5) || (day < trafficDay && trafficDay-day >= 26) {
var weights = this.sortUintArrayWeights(trafficArray)
for k, v := range weights {
result[k] += v
}
hasData = true
}
}
if hasData {
var freeHours = this.sortUintArrayIndexes(result)
this.locker.Lock()
this.freeHours = freeHours[:this.count] // 取前N个小时作为空闲时间
this.locker.Unlock()
}
}
}
func (this *FreeHoursManager) IsFreeHour() bool {
this.locker.Lock()
defer this.locker.Unlock()
if len(this.freeHours) == 0 {
return false
}
var hour = time.Now().Hour()
for _, h := range this.freeHours {
if h == hour {
return true
}
}
return false
}
// 对数组进行排序,并返回权重
func (this *FreeHoursManager) sortUintArrayWeights(arr [24]uint64) [24]uint64 {
var l = []map[string]interface{}{}
for k, v := range arr {
l = append(l, map[string]interface{}{
"k": k,
"v": v,
})
}
sort.Slice(l, func(i, j int) bool {
var m1 = l[i]
var v1 = m1["v"].(uint64)
var m2 = l[j]
var v2 = m2["v"].(uint64)
return v1 < v2
})
var result = [24]uint64{}
for k, v := range l {
if k < this.count {
k = 0
} else {
k = 1
}
result[v["k"].(int)] = v["v"].(uint64)
}
return result
}
// 对数组进行排序,并返回索引
func (this *FreeHoursManager) sortUintArrayIndexes(arr [24]uint64) [24]int {
var l = []map[string]interface{}{}
for k, v := range arr {
l = append(l, map[string]interface{}{
"k": k,
"v": v,
})
}
sort.Slice(l, func(i, j int) bool {
var m1 = l[i]
var v1 = m1["v"].(uint64)
var m2 = l[j]
var v2 = m2["v"].(uint64)
return v1 < v2
})
var result = [24]int{}
var i = 0
for _, v := range l {
result[i] = v["k"].(int)
i++
}
return result
}

View File

@@ -0,0 +1,49 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package utils
import (
"testing"
"time"
)
func TestFreeHoursManager_Update(t *testing.T) {
var manager = NewFreeHoursManager()
manager.Update(111)
manager.dayTrafficMap[1] = [24]uint64{1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1}
manager.dayTrafficMap[2] = [24]uint64{0, 0, 1, 0, 1, 1, 1, 0, 0}
manager.dayTrafficMap[3] = [24]uint64{0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1}
manager.dayTrafficMap[4] = [24]uint64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1}
manager.dayTrafficMap[5] = [24]uint64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1}
manager.dayTrafficMap[6] = [24]uint64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1}
manager.dayTrafficMap[7] = [24]uint64{}
manager.dayTrafficMap[8] = [24]uint64{}
manager.dayTrafficMap[9] = [24]uint64{}
manager.dayTrafficMap[10] = [24]uint64{1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
manager.dayTrafficMap[11] = [24]uint64{1}
manager.dayTrafficMap[12] = [24]uint64{1}
manager.dayTrafficMap[13] = [24]uint64{1}
manager.dayTrafficMap[14] = [24]uint64{}
manager.dayTrafficMap[15] = [24]uint64{}
manager.dayTrafficMap[16] = [24]uint64{}
manager.dayTrafficMap[25] = [24]uint64{}
manager.dayTrafficMap[26] = [24]uint64{}
manager.dayTrafficMap[27] = [24]uint64{}
manager.dayTrafficMap[28] = [24]uint64{}
manager.dayTrafficMap[29] = [24]uint64{}
manager.dayTrafficMap[30] = [24]uint64{}
manager.dayTrafficMap[31] = [24]uint64{}
var before = time.Now()
manager.Update(222)
t.Log(manager.freeHours)
t.Log(manager.IsFreeHour())
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestFreeHoursManager_SortArray(t *testing.T) {
var manager = NewFreeHoursManager()
t.Log(manager.sortUintArrayWeights([24]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 109, 10, 11, 12, 130, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}))
t.Log(manager.sortUintArrayIndexes([24]uint64{1, 2, 3, 5, 4, 0, 100}))
}