Files
EdgeNode/internal/caches/storage_memory.go

436 lines
9.8 KiB
Go
Raw Normal View History

2020-10-05 19:15:35 +08:00
package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2021-06-13 17:37:57 +08:00
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2020-10-05 19:15:35 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
2020-11-21 22:29:57 +08:00
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"math"
2020-10-05 19:15:35 +08:00
"strconv"
"sync"
2020-10-05 20:23:18 +08:00
"sync/atomic"
2020-10-05 19:15:35 +08:00
"time"
)
type MemoryItem struct {
2021-01-13 12:02:50 +08:00
ExpiredAt int64
HeaderValue []byte
BodyValue []byte
Status int
IsDone bool
2021-06-14 11:46:39 +08:00
ModifiedAt int64
2020-10-05 19:15:35 +08:00
}
func (this *MemoryItem) IsExpired() bool {
return this.ExpiredAt < utils.UnixTime()
}
2020-10-05 19:15:35 +08:00
type MemoryStorage struct {
parentStorage StorageInterface
policy *serverconfigs.HTTPCachePolicy
list ListInterface
locker *sync.RWMutex
valuesMap map[uint64]*MemoryItem // hash => item
dirtyChan chan string // hash chan
purgeTicker *utils.Ticker
2020-10-05 20:23:18 +08:00
totalSize int64
writingKeyMap map[string]bool // key => bool
2020-10-05 19:15:35 +08:00
}
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage {
var dirtyChan chan string
if parentStorage != nil {
var queueSize = policy.MemoryAutoFlushQueueSize
if queueSize <= 0 {
queueSize = 2048
}
dirtyChan = make(chan string, queueSize)
}
2020-10-05 19:15:35 +08:00
return &MemoryStorage{
parentStorage: parentStorage,
policy: policy,
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
dirtyChan: dirtyChan,
writingKeyMap: map[string]bool{},
2020-10-05 19:15:35 +08:00
}
}
// Init 初始化
2020-10-05 19:15:35 +08:00
func (this *MemoryStorage) Init() error {
2021-06-12 10:03:33 +08:00
_ = this.list.Init()
2020-10-05 20:23:18 +08:00
this.list.OnAdd(func(item *Item) {
2021-05-24 09:37:37 +08:00
atomic.AddInt64(&this.totalSize, item.TotalSize())
2020-10-05 20:23:18 +08:00
})
this.list.OnRemove(func(item *Item) {
2021-05-24 09:37:37 +08:00
atomic.AddInt64(&this.totalSize, -item.TotalSize())
2020-10-05 20:23:18 +08:00
})
var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval
if autoPurgeInterval <= 0 {
autoPurgeInterval = 5
2020-10-05 19:15:35 +08:00
}
// 启动定时清理任务
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
2020-10-05 19:15:35 +08:00
go func() {
for this.purgeTicker.Next() {
2020-10-05 19:15:35 +08:00
this.purgeLoop()
}
}()
// 启动定时Flush memory to disk任务
go func() {
for hash := range this.dirtyChan {
this.flushItem(hash)
}
}()
2020-10-05 19:15:35 +08:00
return nil
}
// OpenReader 读取缓存
2021-01-13 12:02:50 +08:00
func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
2020-10-05 19:15:35 +08:00
hash := this.hash(key)
this.locker.RLock()
item := this.valuesMap[hash]
if item == nil || !item.IsDone {
2021-06-17 18:04:56 +08:00
this.locker.RUnlock()
2021-01-13 12:02:50 +08:00
return nil, ErrNotFound
2020-10-05 19:15:35 +08:00
}
if item.ExpiredAt > utils.UnixTime() {
2021-01-13 12:02:50 +08:00
reader := NewMemoryReader(item)
err := reader.Init()
if err != nil {
2021-06-17 18:04:56 +08:00
this.locker.RUnlock()
2021-01-13 12:02:50 +08:00
return nil, err
}
2021-06-17 18:04:56 +08:00
this.locker.RUnlock()
// 增加点击量
// 1/1000采样
// TODO 考虑是否在缓存策略里设置
if rands.Int(0, 1000) == 0 {
var hitErr = this.list.IncreaseHit(types.String(hash))
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
}
2021-01-13 12:02:50 +08:00
return reader, nil
2020-10-05 19:15:35 +08:00
}
2021-06-17 18:04:56 +08:00
this.locker.RUnlock()
2020-10-05 19:15:35 +08:00
_ = this.Delete(key)
2021-01-13 12:02:50 +08:00
return nil, ErrNotFound
2020-10-05 19:15:35 +08:00
}
// OpenWriter 打开缓存写入器等待写入
2021-01-13 12:02:50 +08:00
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
this.locker.Lock()
defer this.locker.Unlock()
// 是否正在写入
var isWriting = false
_, ok := this.writingKeyMap[key]
if ok {
return nil, ErrFileIsWriting
}
this.writingKeyMap[key] = true
defer func() {
if !isWriting {
delete(this.writingKeyMap, key)
}
}()
// 检查是否过期
hash := this.hash(key)
item, ok := this.valuesMap[hash]
if ok && !item.IsExpired() {
return nil, ErrFileIsWriting
}
2020-10-05 19:15:35 +08:00
// 检查是否超出最大值
2021-05-19 12:07:35 +08:00
totalKeys, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys {
2021-06-08 11:24:41 +08:00
return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
2020-10-05 20:23:18 +08:00
}
capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
2021-06-08 11:24:41 +08:00
return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
2020-10-05 20:23:18 +08:00
}
// 先删除
2021-06-17 18:04:56 +08:00
err = this.deleteWithoutLocker(key)
2020-10-05 20:23:18 +08:00
if err != nil {
return nil, err
2020-10-05 19:15:35 +08:00
}
isWriting = true
return NewMemoryWriter(this, key, expiredAt, status, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}), nil
2020-10-05 19:15:35 +08:00
}
// Delete 删除某个键值对应的缓存
2020-10-05 19:15:35 +08:00
func (this *MemoryStorage) Delete(key string) error {
hash := this.hash(key)
this.locker.Lock()
delete(this.valuesMap, hash)
2021-05-24 09:37:37 +08:00
_ = this.list.Remove(fmt.Sprintf("%d", hash))
2020-10-05 19:15:35 +08:00
this.locker.Unlock()
return nil
}
// Stat 统计缓存
2020-10-05 19:15:35 +08:00
func (this *MemoryStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
return this.list.Stat(func(hash string) bool {
return true
2021-05-19 12:07:35 +08:00
})
2020-10-05 19:15:35 +08:00
}
// CleanAll 清除所有缓存
2020-10-05 19:15:35 +08:00
func (this *MemoryStorage) CleanAll() error {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
2021-05-24 09:37:37 +08:00
_ = this.list.Reset()
2020-10-05 20:23:18 +08:00
atomic.StoreInt64(&this.totalSize, 0)
2020-10-05 19:15:35 +08:00
this.locker.Unlock()
return nil
}
// Purge 批量删除缓存
2020-12-23 21:28:50 +08:00
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
for _, key := range keys {
2021-06-13 17:37:57 +08:00
err := this.list.CleanPrefix(key)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
2020-12-23 21:28:50 +08:00
}
}
2020-10-05 19:15:35 +08:00
for _, key := range keys {
err := this.Delete(key)
if err != nil {
return err
}
}
return nil
}
// Stop 停止缓存策略
2020-10-05 19:15:35 +08:00
func (this *MemoryStorage) Stop() {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
2021-06-13 17:37:57 +08:00
this.writingKeyMap = map[string]bool{}
2021-05-24 09:37:37 +08:00
_ = this.list.Reset()
if this.purgeTicker != nil {
this.purgeTicker.Stop()
}
if this.parentStorage != nil && this.dirtyChan != nil {
close(this.dirtyChan)
2020-10-05 19:15:35 +08:00
}
2021-06-13 17:37:57 +08:00
_ = this.list.Close()
this.locker.Unlock()
remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'")
2020-10-05 19:15:35 +08:00
}
// Policy 获取当前存储的Policy
2020-10-05 19:15:35 +08:00
func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// AddToList 将缓存添加到列表
2020-10-05 19:15:35 +08:00
func (this *MemoryStorage) AddToList(item *Item) {
item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/
2020-10-05 19:15:35 +08:00
hash := fmt.Sprintf("%d", this.hash(item.Key))
2021-05-24 09:37:37 +08:00
_ = this.list.Add(hash, item)
2020-10-05 19:15:35 +08:00
}
// TotalDiskSize 消耗的磁盘尺寸
func (this *MemoryStorage) TotalDiskSize() int64 {
return 0
}
// TotalMemorySize 内存尺寸
func (this *MemoryStorage) TotalMemorySize() int64 {
return atomic.LoadInt64(&this.totalSize)
}
2020-10-05 19:15:35 +08:00
// 计算Key Hash
func (this *MemoryStorage) hash(key string) uint64 {
2020-11-21 22:29:57 +08:00
return xxhash.Sum64String(key)
2020-10-05 19:15:35 +08:00
}
// 清理任务
func (this *MemoryStorage) purgeLoop() {
// 计算是否应该开启LFU清理
var capacityBytes = this.policy.CapacityBytes()
var startLFU = false
var usedPercent = float32(this.TotalMemorySize()*100) / float32(capacityBytes)
var lfuFreePercent = this.policy.MemoryLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
}
if capacityBytes > 0 {
if lfuFreePercent < 100 {
if usedPercent >= 100-lfuFreePercent {
startLFU = true
}
}
}
// 清理过期
var purgeCount = this.policy.MemoryAutoPurgeCount
if purgeCount <= 0 {
purgeCount = 2000
}
_, _ = this.list.Purge(purgeCount, func(hash string) error {
2020-10-05 19:15:35 +08:00
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
delete(this.valuesMap, uintHash)
this.locker.Unlock()
}
2021-05-19 12:07:35 +08:00
return nil
2020-10-05 19:15:35 +08:00
})
// LFU
if startLFU {
var total, _ = this.list.Count()
if total > 0 {
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent * 2) / 100))
if count > 0 {
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
err := this.list.PurgeLFU(count, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
delete(this.valuesMap, uintHash)
this.locker.Unlock()
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge memory storage in LFU failed: "+err.Error())
}
}
}
}
}
// Flush任务
func (this *MemoryStorage) flushItem(key string) {
if this.parentStorage == nil {
return
}
var hash = this.hash(key)
this.locker.RLock()
item, ok := this.valuesMap[hash]
this.locker.RUnlock()
if !ok {
return
}
if !item.IsDone || item.IsExpired() {
return
}
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status)
if err != nil {
if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())
}
return
}
_, err = writer.WriteHeader(item.HeaderValue)
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: write header failed: "+err.Error())
return
}
_, err = writer.Write(item.BodyValue)
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: writer body failed: "+err.Error())
return
}
err = writer.Close()
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: close writer failed: "+err.Error())
}
this.parentStorage.AddToList(&Item{
Type: writer.ItemType(),
Key: key,
ExpiredAt: item.ExpiredAt,
HeaderSize: writer.HeaderSize(),
BodySize: writer.BodySize(),
})
return
2020-10-05 19:15:35 +08:00
}
func (this *MemoryStorage) memoryCapacityBytes() int64 {
if this.policy == nil {
return 0
}
c1 := int64(0)
if this.policy.Capacity != nil {
c1 = this.policy.Capacity.Bytes()
}
if SharedManager.MaxMemoryCapacity != nil {
c2 := SharedManager.MaxMemoryCapacity.Bytes()
if c2 > 0 {
return c2
}
}
return c1
}
2021-06-17 18:04:56 +08:00
func (this *MemoryStorage) deleteWithoutLocker(key string) error {
hash := this.hash(key)
delete(this.valuesMap, hash)
_ = this.list.Remove(fmt.Sprintf("%d", hash))
return nil
}