mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 23:20:25 +08:00
实现缓存策略最大容量
This commit is contained in:
@@ -4,8 +4,10 @@ import "sync"
|
||||
|
||||
// 缓存列表管理
|
||||
type List struct {
|
||||
m map[string]*Item // hash => item
|
||||
locker sync.RWMutex
|
||||
m map[string]*Item // hash => item
|
||||
locker sync.RWMutex
|
||||
onAdd func(item *Item)
|
||||
onRemove func(item *Item)
|
||||
}
|
||||
|
||||
func NewList() *List {
|
||||
@@ -22,6 +24,9 @@ func (this *List) Reset() {
|
||||
|
||||
func (this *List) Add(hash string, item *Item) {
|
||||
this.locker.Lock()
|
||||
if this.onAdd != nil {
|
||||
this.onAdd(item)
|
||||
}
|
||||
this.m[hash] = item
|
||||
this.locker.Unlock()
|
||||
}
|
||||
@@ -40,7 +45,15 @@ func (this *List) Exist(hash string) bool {
|
||||
|
||||
func (this *List) Remove(hash string) {
|
||||
this.locker.Lock()
|
||||
delete(this.m, hash)
|
||||
|
||||
item, ok := this.m[hash]
|
||||
if ok {
|
||||
if this.onRemove != nil {
|
||||
this.onRemove(item)
|
||||
}
|
||||
delete(this.m, hash)
|
||||
}
|
||||
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
@@ -56,6 +69,9 @@ func (this *List) Purge(count int, callback func(hash string)) {
|
||||
}
|
||||
|
||||
if item.IsExpired() {
|
||||
if this.onRemove != nil {
|
||||
this.onRemove(item)
|
||||
}
|
||||
delete(this.m, hash)
|
||||
deletedHashList = append(deletedHashList, hash)
|
||||
}
|
||||
@@ -100,3 +116,13 @@ func (this *List) Count() int64 {
|
||||
this.locker.RUnlock()
|
||||
return count
|
||||
}
|
||||
|
||||
// 添加事件
|
||||
func (this *List) OnAdd(f func(item *Item)) {
|
||||
this.onAdd = f
|
||||
}
|
||||
|
||||
// 删除事件
|
||||
func (this *List) OnRemove(f func(item *Item)) {
|
||||
this.onRemove = f
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -35,6 +36,7 @@ var (
|
||||
type FileStorage struct {
|
||||
policy *serverconfigs.HTTPCachePolicy
|
||||
cacheConfig *serverconfigs.HTTPFileCacheStorage
|
||||
totalSize int64
|
||||
|
||||
list *List
|
||||
locker sync.RWMutex
|
||||
@@ -55,6 +57,13 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
|
||||
|
||||
// 初始化
|
||||
func (this *FileStorage) Init() error {
|
||||
this.list.OnAdd(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, item.Size)
|
||||
})
|
||||
this.list.OnRemove(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, -item.Size)
|
||||
})
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
@@ -222,7 +231,10 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [
|
||||
func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) {
|
||||
// 检查是否超出最大值
|
||||
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
|
||||
return nil, errors.New("too many keys in cache storage")
|
||||
return nil, errors.New("write file cache failed: too many keys in cache storage")
|
||||
}
|
||||
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
|
||||
return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
|
||||
}
|
||||
|
||||
hash := stringutil.Md5(key)
|
||||
@@ -240,6 +252,9 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) {
|
||||
|
||||
this.locker.Lock()
|
||||
|
||||
// 先删除
|
||||
this.list.Remove(hash)
|
||||
|
||||
path := dir + "/" + hash + ".cache"
|
||||
writer, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_SYNC|os.O_WRONLY, 0777)
|
||||
if err != nil {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/dchest/siphash"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -19,11 +20,11 @@ type MemoryItem struct {
|
||||
type MemoryStorage struct {
|
||||
policy *serverconfigs.HTTPCachePolicy
|
||||
list *List
|
||||
totalSize int64 // 需要实现
|
||||
locker *sync.RWMutex
|
||||
valuesMap map[uint64]*MemoryItem
|
||||
ticker *utils.Ticker
|
||||
purgeDuration time.Duration
|
||||
totalSize int64
|
||||
}
|
||||
|
||||
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
|
||||
@@ -37,6 +38,13 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
|
||||
|
||||
// 初始化
|
||||
func (this *MemoryStorage) Init() error {
|
||||
this.list.OnAdd(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, item.Size)
|
||||
})
|
||||
this.list.OnRemove(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, -item.Size)
|
||||
})
|
||||
|
||||
if this.purgeDuration <= 0 {
|
||||
this.purgeDuration = 30 * time.Second
|
||||
}
|
||||
@@ -64,6 +72,7 @@ func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data
|
||||
}
|
||||
|
||||
if item.ExpiredAt > utils.UnixTime() {
|
||||
// 这时如果callback处理比较慢的话,可能会影响性能,但目前没有更好的解决方案
|
||||
callback(item.Value, int64(len(item.Value)), item.ExpiredAt, true)
|
||||
this.locker.RUnlock()
|
||||
return nil
|
||||
@@ -79,7 +88,16 @@ func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data
|
||||
func (this *MemoryStorage) Open(key string, expiredAt int64) (Writer, error) {
|
||||
// 检查是否超出最大值
|
||||
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
|
||||
return nil, errors.New("too many keys in cache storage")
|
||||
return nil, errors.New("write memory cache failed: too many keys in cache storage")
|
||||
}
|
||||
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
|
||||
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
|
||||
}
|
||||
|
||||
// 先删除
|
||||
err := this.Delete(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewMemoryWriter(this.valuesMap, key, expiredAt, this.locker), nil
|
||||
@@ -110,6 +128,7 @@ func (this *MemoryStorage) CleanAll() error {
|
||||
this.locker.Lock()
|
||||
this.valuesMap = map[uint64]*MemoryItem{}
|
||||
this.list.Reset()
|
||||
atomic.StoreInt64(&this.totalSize, 0)
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
@@ -144,7 +163,7 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
|
||||
|
||||
// 将缓存添加到列表
|
||||
func (this *MemoryStorage) AddToList(item *Item) {
|
||||
item.Size = item.ValueSize + int64(len(item.Key))
|
||||
item.Size = item.ValueSize + int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
|
||||
hash := fmt.Sprintf("%d", this.hash(item.Key))
|
||||
this.list.Add(hash, item)
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) {
|
||||
item.Value = append([]byte{}, data...)
|
||||
item.ExpiredAt = this.expiredAt
|
||||
this.m[hash] = item
|
||||
this.isFirstWriting = false
|
||||
}
|
||||
this.locker.Unlock()
|
||||
return len(data), nil
|
||||
|
||||
@@ -147,5 +147,6 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
return
|
||||
}
|
||||
|
||||
this.cacheRef = nil // 终止读取不再往下传递
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -114,6 +114,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
|
||||
_, err = this.cacheWriter.Write(data)
|
||||
if err != nil {
|
||||
_ = this.cacheWriter.Discard()
|
||||
this.cacheWriter = nil
|
||||
logs.Println("write cache failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user