diff --git a/internal/caches/list.go b/internal/caches/list.go index 57b1877..4b625e1 100644 --- a/internal/caches/list.go +++ b/internal/caches/list.go @@ -4,8 +4,10 @@ import "sync" // 缓存列表管理 type List struct { - m map[string]*Item // hash => item - locker sync.RWMutex + m map[string]*Item // hash => item + locker sync.RWMutex + onAdd func(item *Item) + onRemove func(item *Item) } func NewList() *List { @@ -22,6 +24,9 @@ func (this *List) Reset() { func (this *List) Add(hash string, item *Item) { this.locker.Lock() + if this.onAdd != nil { + this.onAdd(item) + } this.m[hash] = item this.locker.Unlock() } @@ -40,7 +45,15 @@ func (this *List) Exist(hash string) bool { func (this *List) Remove(hash string) { this.locker.Lock() - delete(this.m, hash) + + item, ok := this.m[hash] + if ok { + if this.onRemove != nil { + this.onRemove(item) + } + delete(this.m, hash) + } + this.locker.Unlock() } @@ -56,6 +69,9 @@ func (this *List) Purge(count int, callback func(hash string)) { } if item.IsExpired() { + if this.onRemove != nil { + this.onRemove(item) + } delete(this.m, hash) deletedHashList = append(deletedHashList, hash) } @@ -100,3 +116,13 @@ func (this *List) Count() int64 { this.locker.RUnlock() return count } + +// 添加事件 +func (this *List) OnAdd(f func(item *Item)) { + this.onAdd = f +} + +// 删除事件 +func (this *List) OnRemove(f func(item *Item)) { + this.onRemove = f +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index a5750b6..0947769 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -18,6 +18,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -35,6 +36,7 @@ var ( type FileStorage struct { policy *serverconfigs.HTTPCachePolicy cacheConfig *serverconfigs.HTTPFileCacheStorage + totalSize int64 list *List locker sync.RWMutex @@ -55,6 +57,13 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy { // 初始化 func (this *FileStorage) Init() error { + this.list.OnAdd(func(item *Item) { + atomic.AddInt64(&this.totalSize, item.Size) + }) + this.list.OnRemove(func(item *Item) { + atomic.AddInt64(&this.totalSize, -item.Size) + }) + this.locker.Lock() defer this.locker.Unlock() @@ -222,7 +231,10 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { // 检查是否超出最大值 if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { - return nil, errors.New("too many keys in cache storage") + return nil, errors.New("write file cache failed: too many keys in cache storage") + } + if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize { + return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") } hash := stringutil.Md5(key) @@ -240,6 +252,9 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { this.locker.Lock() + // 先删除 + this.list.Remove(hash) + path := dir + "/" + hash + ".cache" writer, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_SYNC|os.O_WRONLY, 0777) if err != nil { diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 99bc5b1..5db81b6 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -8,6 +8,7 @@ import ( "github.com/dchest/siphash" "strconv" "sync" + "sync/atomic" "time" ) @@ -19,11 +20,11 @@ type MemoryItem struct { type MemoryStorage struct { policy *serverconfigs.HTTPCachePolicy list *List - totalSize int64 // 需要实现 locker *sync.RWMutex valuesMap map[uint64]*MemoryItem ticker *utils.Ticker purgeDuration time.Duration + totalSize int64 } func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { @@ -37,6 +38,13 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { // 初始化 func (this *MemoryStorage) Init() error { + this.list.OnAdd(func(item *Item) { + atomic.AddInt64(&this.totalSize, item.Size) + }) + this.list.OnRemove(func(item *Item) { + atomic.AddInt64(&this.totalSize, -item.Size) + }) + if this.purgeDuration <= 0 { this.purgeDuration = 30 * time.Second } @@ -64,6 +72,7 @@ func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data } if item.ExpiredAt > utils.UnixTime() { + // 这时如果callback处理比较慢的话,可能会影响性能,但目前没有更好的解决方案 callback(item.Value, int64(len(item.Value)), item.ExpiredAt, true) this.locker.RUnlock() return nil @@ -79,7 +88,16 @@ func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data func (this *MemoryStorage) Open(key string, expiredAt int64) (Writer, error) { // 检查是否超出最大值 if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { - return nil, errors.New("too many keys in cache storage") + return nil, errors.New("write memory cache failed: too many keys in cache storage") + } + if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize { + return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") + } + + // 先删除 + err := this.Delete(key) + if err != nil { + return nil, err } return NewMemoryWriter(this.valuesMap, key, expiredAt, this.locker), nil @@ -110,6 +128,7 @@ func (this *MemoryStorage) CleanAll() error { this.locker.Lock() this.valuesMap = map[uint64]*MemoryItem{} this.list.Reset() + atomic.StoreInt64(&this.totalSize, 0) this.locker.Unlock() return nil } @@ -144,7 +163,7 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy { // 将缓存添加到列表 func (this *MemoryStorage) AddToList(item *Item) { - item.Size = item.ValueSize + int64(len(item.Key)) + item.Size = item.ValueSize + int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/ hash := fmt.Sprintf("%d", this.hash(item.Key)) this.list.Add(hash, item) } diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 8781d33..e15bb6c 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -43,6 +43,7 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) { item.Value = append([]byte{}, data...) item.ExpiredAt = this.expiredAt this.m[hash] = item + this.isFirstWriting = false } this.locker.Unlock() return len(data), nil diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index fb90dcf..a507ed9 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -147,5 +147,6 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { return } + this.cacheRef = nil // 终止读取不再往下传递 return true } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 37aa2c0..e558f3b 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -114,6 +114,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { _, err = this.cacheWriter.Write(data) if err != nil { _ = this.cacheWriter.Discard() + this.cacheWriter = nil logs.Println("write cache failed: " + err.Error()) } }