Files
EdgeNode/internal/caches/storage_memory.go
2020-10-05 20:23:18 +08:00

187 lines
4.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/dchest/siphash"
"strconv"
"sync"
"sync/atomic"
"time"
)
type MemoryItem struct {
ExpiredAt int64
Value []byte
}
type MemoryStorage struct {
policy *serverconfigs.HTTPCachePolicy
list *List
locker *sync.RWMutex
valuesMap map[uint64]*MemoryItem
ticker *utils.Ticker
purgeDuration time.Duration
totalSize int64
}
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
return &MemoryStorage{
policy: policy,
list: NewList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
}
}
// 初始化
func (this *MemoryStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.Size)
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.Size)
})
if this.purgeDuration <= 0 {
this.purgeDuration = 30 * time.Second
}
// 启动定时清理任务
this.ticker = utils.NewTicker(this.purgeDuration)
go func() {
for this.ticker.Next() {
this.purgeLoop()
}
}()
return nil
}
// 读取缓存
func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error {
hash := this.hash(key)
this.locker.RLock()
item := this.valuesMap[hash]
if item == nil {
this.locker.RUnlock()
return ErrNotFound
}
if item.ExpiredAt > utils.UnixTime() {
// 这时如果callback处理比较慢的话可能会影响性能但目前没有更好的解决方案
callback(item.Value, int64(len(item.Value)), item.ExpiredAt, true)
this.locker.RUnlock()
return nil
}
this.locker.RUnlock()
_ = this.Delete(key)
return ErrNotFound
}
// 打开缓存写入器等待写入
func (this *MemoryStorage) Open(key string, expiredAt int64) (Writer, error) {
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write memory cache failed: too many keys in cache storage")
}
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
// 先删除
err := this.Delete(key)
if err != nil {
return nil, err
}
return NewMemoryWriter(this.valuesMap, key, expiredAt, this.locker), nil
}
// 删除某个键值对应的缓存
func (this *MemoryStorage) Delete(key string) error {
hash := this.hash(key)
this.locker.Lock()
delete(this.valuesMap, hash)
this.list.Remove(fmt.Sprintf("%d", hash))
this.locker.Unlock()
return nil
}
// 统计缓存
func (this *MemoryStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
return this.list.Stat(func(hash string) bool {
return true
}), nil
}
// 清除所有缓存
func (this *MemoryStorage) CleanAll() error {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
atomic.StoreInt64(&this.totalSize, 0)
this.locker.Unlock()
return nil
}
// 批量删除缓存
func (this *MemoryStorage) Purge(keys []string) error {
for _, key := range keys {
err := this.Delete(key)
if err != nil {
return err
}
}
return nil
}
// 停止缓存策略
func (this *MemoryStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
}
// 获取当前存储的Policy
func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// 将缓存添加到列表
func (this *MemoryStorage) AddToList(item *Item) {
item.Size = item.ValueSize + int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
hash := fmt.Sprintf("%d", this.hash(item.Key))
this.list.Add(hash, item)
}
// 计算Key Hash
func (this *MemoryStorage) hash(key string) uint64 {
return siphash.Hash(0, 0, []byte(key))
}
// 清理任务
func (this *MemoryStorage) purgeLoop() {
this.list.Purge(1000, func(hash string) {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
delete(this.valuesMap, uintHash)
this.locker.Unlock()
}
})
}