实现自动将热点数据加载到内存中

This commit is contained in:
GoEdgeLab
2021-11-14 16:15:07 +08:00
parent 7f80e32448
commit fff7e7a95d
9 changed files with 230 additions and 22 deletions

View File

@@ -0,0 +1,10 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
type HotItem struct {
Key string
ExpiresAt int64
Hits uint32
Status int
}

View File

@@ -9,6 +9,9 @@ type Reader interface {
// TypeName 类型名称 // TypeName 类型名称
TypeName() string TypeName() string
// ExpiresAt 过期时间
ExpiresAt() int64
// Status 状态码 // Status 状态码
Status() int Status() int

View File

@@ -11,6 +11,7 @@ import (
type FileReader struct { type FileReader struct {
fp *os.File fp *os.File
expiresAt int64
status int status int
headerOffset int64 headerOffset int64
headerSize int headerSize int
@@ -43,6 +44,8 @@ func (this *FileReader) Init() error {
return ErrNotFound return ErrNotFound
} }
this.expiresAt = int64(binary.BigEndian.Uint32(buf[:SizeExpiresAt]))
status := types.Int(string(buf[SizeExpiresAt : SizeExpiresAt+SizeStatus])) status := types.Int(string(buf[SizeExpiresAt : SizeExpiresAt+SizeStatus]))
if status < 100 || status > 999 { if status < 100 || status > 999 {
return errors.New("invalid status") return errors.New("invalid status")
@@ -78,6 +81,10 @@ func (this *FileReader) TypeName() string {
return "disk" return "disk"
} }
func (this *FileReader) ExpiresAt() int64 {
return this.expiresAt
}
func (this *FileReader) Status() int { func (this *FileReader) Status() int {
return this.status return this.status
} }

View File

@@ -20,6 +20,10 @@ func (this *MemoryReader) TypeName() string {
return "memory" return "memory"
} }
func (this *MemoryReader) ExpiresAt() int64 {
return this.item.ExpiredAt
}
func (this *MemoryReader) Status() int { func (this *MemoryReader) Status() int {
return this.item.Status return this.item.Status
} }

View File

@@ -22,6 +22,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -40,6 +41,10 @@ const (
SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
) )
const (
HotItemSize = 1024
)
// FileStorage 文件缓存 // FileStorage 文件缓存
// 文件结构: // 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
@@ -52,13 +57,20 @@ type FileStorage struct {
list ListInterface list ListInterface
writingKeyMap map[string]bool // key => bool writingKeyMap map[string]bool // key => bool
locker sync.RWMutex locker sync.RWMutex
ticker *utils.Ticker purgeTicker *utils.Ticker
hotMap map[string]*HotItem // key => count
hotMapLocker sync.Mutex
lastHotSize int
hotTicker *utils.Ticker
} }
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
return &FileStorage{ return &FileStorage{
policy: policy, policy: policy,
writingKeyMap: map[string]bool{}, writingKeyMap: map[string]bool{},
hotMap: map[string]*HotItem{},
lastHotSize: -1,
} }
} }
@@ -191,8 +203,12 @@ func (this *FileStorage) Init() error {
} }
func (this *FileStorage) OpenReader(key string) (Reader, error) { func (this *FileStorage) OpenReader(key string) (Reader, error) {
return this.openReader(key, true)
}
func (this *FileStorage) openReader(key string, allowMemory bool) (Reader, error) {
// 先尝试内存缓存 // 先尝试内存缓存
if this.memoryStorage != nil { if allowMemory && this.memoryStorage != nil {
reader, err := this.memoryStorage.OpenReader(key) reader, err := this.memoryStorage.OpenReader(key)
if err == nil { if err == nil {
return reader, err return reader, err
@@ -237,12 +253,40 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
// 增加点击量 // 增加点击量
// 1/1000采样 // 1/1000采样
// TODO 考虑是否在缓存策略里设置 if allowMemory {
if rands.Int(0, 1000) == 0 { var rate = this.policy.PersistenceHitSampleRate
var hitErr = this.list.IncreaseHit(hash) if rate <= 0 {
if hitErr != nil { rate = 1000
// 此错误可以忽略 }
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error()) if this.lastHotSize == 0 {
// 自动降低采样率来增加热点数据的缓存几率
rate = rate / 10
}
if rands.Int(0, rate) == 0 {
var hitErr = this.list.IncreaseHit(hash)
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
// 增加到热点
// 这里不收录缓存尺寸过大的文件
if this.memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 {
this.hotMapLocker.Lock()
hotItem, ok := this.hotMap[key]
if ok {
hotItem.Hits++
hotItem.ExpiresAt = reader.expiresAt
} else if len(this.hotMap) < HotItemSize { // 控制数量
this.hotMap[key] = &HotItem{
Key: key,
ExpiresAt: reader.ExpiresAt(),
Status: reader.Status(),
Hits: 1,
}
}
this.hotMapLocker.Unlock()
}
} }
} }
@@ -574,8 +618,11 @@ func (this *FileStorage) Stop() {
} }
_ = this.list.Reset() _ = this.list.Reset()
if this.ticker != nil { if this.purgeTicker != nil {
this.ticker.Stop() this.purgeTicker.Stop()
}
if this.hotTicker != nil {
this.hotTicker.Stop()
} }
_ = this.list.Close() _ = this.list.Close()
@@ -645,19 +692,32 @@ func (this *FileStorage) initList() error {
autoPurgeInterval = 10 autoPurgeInterval = 10
} }
} }
this.ticker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
events.On(events.EventQuit, func() { events.On(events.EventQuit, func() {
remotelogs.Println("CACHE", "quit clean timer") remotelogs.Println("CACHE", "quit clean timer")
var ticker = this.ticker var ticker = this.purgeTicker
if ticker != nil { if ticker != nil {
ticker.Stop() ticker.Stop()
} }
}) })
go func() { go func() {
for this.ticker.Next() { for this.purgeTicker.Next() {
var tr = trackers.Begin("FILE_CACHE_STORAGE_PURGE_LOOP") trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() {
this.purgeLoop() this.purgeLoop()
tr.End() })
}
}()
// 热点处理任务
this.hotTicker = utils.NewTicker(1 * time.Minute)
if Tea.IsTesting() {
this.hotTicker = utils.NewTicker(10 * time.Second)
}
go func() {
for this.hotTicker.Next() {
trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() {
this.hotLoop()
})
} }
}() }()
@@ -842,6 +902,102 @@ func (this *FileStorage) purgeLoop() {
} }
} }
// 热点数据任务
func (this *FileStorage) hotLoop() {
var memoryStorage = this.memoryStorage
if memoryStorage == nil {
return
}
this.hotMapLocker.Lock()
if len(this.hotMap) == 0 {
this.hotMapLocker.Unlock()
this.lastHotSize = 0
return
}
this.lastHotSize = len(this.hotMap)
var result = []*HotItem{} // [ {key: ..., hits: ...}, ... ]
for _, v := range this.hotMap {
result = append(result, v)
}
this.hotMap = map[string]*HotItem{}
this.hotMapLocker.Unlock()
// 取Top10
if len(result) > 0 {
sort.Slice(result, func(i, j int) bool {
return result[i].Hits > result[j].Hits
})
var size = 1
if len(result) < 10 {
size = 1
} else {
size = len(result) / 10
}
var buf = make([]byte, 32*1024)
for _, item := range result[:size] {
reader, err := this.openReader(item.Key, false)
if err != nil {
continue
}
if reader == nil {
continue
}
if reader.ExpiresAt() <= time.Now().Unix() {
continue
}
writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, false)
if err != nil {
if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())
}
_ = reader.Close()
continue
}
if writer == nil {
_ = reader.Close()
continue
}
err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) {
_, err = writer.WriteHeader(buf[:n])
return
})
if err != nil {
_ = reader.Close()
_ = writer.Discard()
continue
}
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
_, err = writer.Write(buf[:n])
return
})
if err != nil {
_ = reader.Close()
_ = writer.Discard()
continue
}
this.memoryStorage.AddToList(&Item{
Type: writer.ItemType(),
Key: item.Key,
ExpiredAt: item.ExpiresAt,
HeaderSize: writer.HeaderSize(),
BodySize: writer.BodySize(),
})
_ = reader.Close()
_ = writer.Close()
}
}
}
func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
n, err := fp.Read(buf) n, err := fp.Read(buf)
if err != nil { if err != nil {

View File

@@ -144,6 +144,10 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
// OpenWriter 打开缓存写入器等待写入 // OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
return this.openWriter(key, expiredAt, status, true)
}
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, isDirty bool) (Writer, error) {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
@@ -187,7 +191,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
} }
isWriting = true isWriting = true
return NewMemoryWriter(this, key, expiredAt, status, func() { return NewMemoryWriter(this, key, expiredAt, status, isDirty, func() {
this.locker.Lock() this.locker.Lock()
delete(this.writingKeyMap, key) delete(this.writingKeyMap, key)
this.locker.Unlock() this.locker.Unlock()

View File

@@ -13,13 +13,14 @@ type MemoryWriter struct {
headerSize int64 headerSize int64
bodySize int64 bodySize int64
status int status int
isDirty bool
hash uint64 hash uint64
item *MemoryItem item *MemoryItem
endFunc func() endFunc func()
} }
func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, endFunc func()) *MemoryWriter { func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, endFunc func()) *MemoryWriter {
w := &MemoryWriter{ w := &MemoryWriter{
storage: memoryStorage, storage: memoryStorage,
key: key, key: key,
@@ -30,6 +31,7 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64,
Status: status, Status: status,
}, },
status: status, status: status,
isDirty: isDirty,
endFunc: endFunc, endFunc: endFunc,
} }
w.hash = w.calculateHash(key) w.hash = w.calculateHash(key)
@@ -73,11 +75,13 @@ func (this *MemoryWriter) Close() error {
this.storage.locker.Lock() this.storage.locker.Lock()
this.item.IsDone = true this.item.IsDone = true
this.storage.valuesMap[this.hash] = this.item this.storage.valuesMap[this.hash] = this.item
if this.storage.parentStorage != nil { if this.isDirty {
select { if this.storage.parentStorage != nil {
case this.storage.dirtyChan <- this.key: select {
default: case this.storage.dirtyChan <- this.key:
default:
}
} }
} }
this.storage.locker.Unlock() this.storage.locker.Unlock()

View File

@@ -13,6 +13,12 @@ func Begin(label string) *tracker {
return &tracker{label: label, startTime: time.Now()} return &tracker{label: label, startTime: time.Now()}
} }
func Run(label string, f func()) {
var tr = Begin(label)
f()
tr.End()
}
func (this *tracker) End() { func (this *tracker) End() {
SharedManager.Add(this.label, time.Since(this.startTime).Seconds()*1000) SharedManager.Add(this.label, time.Since(this.startTime).Seconds()*1000)
} }

View File

@@ -6,6 +6,20 @@ import (
"time" "time"
) )
func TestRawTicker(t *testing.T) {
var ticker = time.NewTicker(2 * time.Second)
go func() {
for range ticker.C {
t.Log("tick")
}
t.Log("stop")
}()
time.Sleep(6 * time.Second)
ticker.Stop()
time.Sleep(1 * time.Second)
}
func TestTicker(t *testing.T) { func TestTicker(t *testing.T) {
ticker := NewTicker(3 * time.Second) ticker := NewTicker(3 * time.Second)
go func() { go func() {