实现内存缓存

This commit is contained in:
GoEdgeLab
2020-10-05 19:15:35 +08:00
parent 870876bfff
commit 9c0c772226
14 changed files with 504 additions and 17 deletions

View File

@@ -92,3 +92,11 @@ func (this *List) Stat(check func(hash string) bool) *Stat {
}
return result
}
// 总数量
func (this *List) Count() int64 {
this.locker.RLock()
count := int64(len(this.m))
this.locker.RUnlock()
return count
}

View File

@@ -126,7 +126,7 @@ func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy)
case serverconfigs.CachePolicyStorageFile:
return NewFileStorage(policy)
case serverconfigs.CachePolicyStorageMemory:
return nil // TODO 暂时返回nil
return NewMemoryStorage(policy)
}
return nil
}

View File

@@ -17,21 +17,21 @@ func TestManager_UpdatePolicies(t *testing.T) {
policies := []*serverconfigs.HTTPCachePolicy{
{
Id: 1,
Type: serverconfigs.CachePolicyTypeFile,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
},
},
{
Id: 2,
Type: serverconfigs.CachePolicyTypeFile,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
},
},
{
Id: 3,
Type: serverconfigs.CachePolicyTypeFile,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
},
@@ -45,14 +45,14 @@ func TestManager_UpdatePolicies(t *testing.T) {
policies := []*serverconfigs.HTTPCachePolicy{
{
Id: 1,
Type: serverconfigs.CachePolicyTypeFile,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
},
},
{
Id: 2,
Type: serverconfigs.CachePolicyTypeFile,
Type: serverconfigs.CachePolicyStorageFile,
MaxKeys: 1,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
@@ -60,7 +60,7 @@ func TestManager_UpdatePolicies(t *testing.T) {
},
{
Id: 4,
Type: serverconfigs.CachePolicyTypeFile,
Type: serverconfigs.CachePolicyStorageFile,
Options: map[string]interface{}{
"dir": Tea.Root + "/caches",
},

View File

@@ -220,6 +220,11 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [
// 打开缓存文件等待写入
func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) {
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("too many keys in cache storage")
}
hash := stringutil.Md5(key)
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
_, err := os.Stat(dir)

View File

@@ -61,7 +61,7 @@ func TestFileStorage_Open(t *testing.T) {
}
t.Log(writer)
err = writer.Write([]byte("Hello,World"))
_, err = writer.Write([]byte("Hello,World"))
if err != nil {
t.Fatal(err)
}
@@ -114,7 +114,7 @@ func TestFileStorage_Read(t *testing.T) {
t.Fatal(err)
}
now := time.Now()
t.Log(storage.Read("my-key", make([]byte, 64), func(data []byte, expiredAt int64) {
t.Log(storage.Read("my-key", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) {
t.Log("[expiredAt]", "["+string(data)+"]")
}))
t.Log(time.Since(now).Seconds()*1000, "ms")
@@ -133,7 +133,7 @@ func TestFileStorage_Read_NotFound(t *testing.T) {
t.Fatal(err)
}
now := time.Now()
t.Log(storage.Read("my-key-10000", make([]byte, 64), func(data []byte, expiredAt int64) {
t.Log(storage.Read("my-key-10000", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) {
t.Log("[" + string(data) + "]")
}))
t.Log(time.Since(now).Seconds()*1000, "ms")
@@ -277,7 +277,7 @@ func BenchmarkFileStorage_Read(b *testing.B) {
}
buf := make([]byte, 1024)
for i := 0; i < b.N; i++ {
_ = storage.Read("my-key", buf, func(data []byte, expiredAt int64) {
_ = storage.Read("my-key", buf, func(data []byte, size int64, expiredAt int64, isEOF bool) {
})
}
}

View File

@@ -0,0 +1,167 @@
package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/dchest/siphash"
"strconv"
"sync"
"time"
)
type MemoryItem struct {
ExpiredAt int64
Value []byte
}
type MemoryStorage struct {
policy *serverconfigs.HTTPCachePolicy
list *List
totalSize int64 // 需要实现
locker *sync.RWMutex
valuesMap map[uint64]*MemoryItem
ticker *utils.Ticker
purgeDuration time.Duration
}
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
return &MemoryStorage{
policy: policy,
list: NewList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
}
}
// 初始化
func (this *MemoryStorage) Init() error {
if this.purgeDuration <= 0 {
this.purgeDuration = 30 * time.Second
}
// 启动定时清理任务
this.ticker = utils.NewTicker(this.purgeDuration)
go func() {
for this.ticker.Next() {
this.purgeLoop()
}
}()
return nil
}
// 读取缓存
func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error {
hash := this.hash(key)
this.locker.RLock()
item := this.valuesMap[hash]
if item == nil {
this.locker.RUnlock()
return ErrNotFound
}
if item.ExpiredAt > utils.UnixTime() {
callback(item.Value, int64(len(item.Value)), item.ExpiredAt, true)
this.locker.RUnlock()
return nil
}
this.locker.RUnlock()
_ = this.Delete(key)
return ErrNotFound
}
// 打开缓存写入器等待写入
func (this *MemoryStorage) Open(key string, expiredAt int64) (Writer, error) {
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("too many keys in cache storage")
}
return NewMemoryWriter(this.valuesMap, key, expiredAt, this.locker), nil
}
// 删除某个键值对应的缓存
func (this *MemoryStorage) Delete(key string) error {
hash := this.hash(key)
this.locker.Lock()
delete(this.valuesMap, hash)
this.list.Remove(fmt.Sprintf("%d", hash))
this.locker.Unlock()
return nil
}
// 统计缓存
func (this *MemoryStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
return this.list.Stat(func(hash string) bool {
return true
}), nil
}
// 清除所有缓存
func (this *MemoryStorage) CleanAll() error {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
this.locker.Unlock()
return nil
}
// 批量删除缓存
func (this *MemoryStorage) Purge(keys []string) error {
for _, key := range keys {
err := this.Delete(key)
if err != nil {
return err
}
}
return nil
}
// 停止缓存策略
func (this *MemoryStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
}
// 获取当前存储的Policy
func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// 将缓存添加到列表
func (this *MemoryStorage) AddToList(item *Item) {
item.Size = item.ValueSize + int64(len(item.Key))
hash := fmt.Sprintf("%d", this.hash(item.Key))
this.list.Add(hash, item)
}
// 计算Key Hash
func (this *MemoryStorage) hash(key string) uint64 {
return siphash.Hash(0, 0, []byte(key))
}
// 清理任务
func (this *MemoryStorage) purgeLoop() {
this.list.Purge(1000, func(hash string) {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
delete(this.valuesMap, uintHash)
this.locker.Unlock()
}
})
}

View File

@@ -0,0 +1,218 @@
package caches
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"strconv"
"testing"
"time"
)
func TestMemoryStorage_Open(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
writer, err := storage.Open("abc", time.Now().Unix()+60)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
_, _ = writer.Write([]byte(", World"))
t.Log(storage.valuesMap)
{
err = storage.Read("abc", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) {
t.Log("read:", string(data))
})
if err != nil {
if err == ErrNotFound {
t.Log("not found: abc")
} else {
t.Fatal(err)
}
}
}
{
err = storage.Read("abc 2", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) {
t.Log("read:", string(data))
})
if err != nil {
if err == ErrNotFound {
t.Log("not found: abc2")
} else {
t.Fatal(err)
}
}
}
writer, err = storage.Open("abc", time.Now().Unix()+60)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello123"))
{
err = storage.Read("abc", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) {
t.Log("read:", string(data))
})
if err != nil {
if err == ErrNotFound {
t.Log("not found: abc")
} else {
t.Fatal(err)
}
}
}
}
func TestMemoryStorage_Delete(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
{
writer, err := storage.Open("abc", time.Now().Unix()+60)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
t.Log(len(storage.valuesMap))
}
{
writer, err := storage.Open("abc1", time.Now().Unix()+60)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
t.Log(len(storage.valuesMap))
}
_ = storage.Delete("abc1")
t.Log(len(storage.valuesMap))
}
func TestMemoryStorage_Stat(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.Open("abc", expiredAt)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
t.Log(len(storage.valuesMap))
storage.AddToList(&Item{
Key: "abc",
Size: 5,
ExpiredAt: expiredAt,
})
}
{
writer, err := storage.Open("abc1", expiredAt)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
t.Log(len(storage.valuesMap))
storage.AddToList(&Item{
Key: "abc1",
Size: 5,
ExpiredAt: expiredAt,
})
}
stat, err := storage.Stat()
if err != nil {
t.Fatal(err)
}
t.Log("===stat===")
logs.PrintAsJSON(stat, t)
}
func TestMemoryStorage_CleanAll(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.Open("abc", expiredAt)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
storage.AddToList(&Item{
Key: "abc",
Size: 5,
ExpiredAt: expiredAt,
})
}
{
writer, err := storage.Open("abc1", expiredAt)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
storage.AddToList(&Item{
Key: "abc1",
Size: 5,
ExpiredAt: expiredAt,
})
}
err := storage.CleanAll()
if err != nil {
t.Fatal(err)
}
t.Log(storage.list.Count(), len(storage.valuesMap))
}
func TestMemoryStorage_Purge(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.Open("abc", expiredAt)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
storage.AddToList(&Item{
Key: "abc",
Size: 5,
ExpiredAt: expiredAt,
})
}
{
writer, err := storage.Open("abc1", expiredAt)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
storage.AddToList(&Item{
Key: "abc1",
Size: 5,
ExpiredAt: expiredAt,
})
}
err := storage.Purge([]string{"abc", "abc1"})
if err != nil {
t.Fatal(err)
}
t.Log(storage.list.Count(), len(storage.valuesMap))
}
func TestMemoryStorage_Expire(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage.purgeDuration = 5 * time.Second
err := storage.Init()
if err != nil {
t.Fatal(err)
}
for i := 0; i < 1000; i++ {
expiredAt := time.Now().Unix() + int64(rands.Int(0, 60))
key := "abc" + strconv.Itoa(i)
writer, err := storage.Open(key, expiredAt)
if err != nil {
t.Fatal(err)
}
_, _ = writer.Write([]byte("Hello"))
storage.AddToList(&Item{
Key: key,
Size: 5,
ExpiredAt: expiredAt,
})
}
time.Sleep(70 * time.Second)
}

View File

@@ -5,6 +5,9 @@ type Writer interface {
// 写入数据
Write(data []byte) (n int, err error)
// 写入的总数据大小
Size() int64
// 关闭
Close() error

View File

@@ -45,3 +45,7 @@ func (this *gzipWriter) Key() string {
func (this *gzipWriter) ExpiredAt() int64 {
return this.expiredAt
}
func (this *gzipWriter) Size() int64 {
return this.rawWriter.Size()
}

View File

@@ -0,0 +1,83 @@
package caches
import (
"github.com/dchest/siphash"
"sync"
)
type MemoryWriter struct {
key string
expiredAt int64
m map[uint64]*MemoryItem
locker *sync.RWMutex
isFirstWriting bool
size int64
}
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, locker *sync.RWMutex) *MemoryWriter {
return &MemoryWriter{
m: m,
key: key,
expiredAt: expiredAt,
locker: locker,
isFirstWriting: true,
}
}
// 写入数据
func (this *MemoryWriter) Write(data []byte) (n int, err error) {
this.size += int64(len(data))
hash := this.hash(this.key)
this.locker.Lock()
item, ok := this.m[hash]
if ok {
// 第一次写先清空
if this.isFirstWriting {
item.Value = nil
this.isFirstWriting = false
}
item.Value = append(item.Value, data...)
} else {
item := &MemoryItem{}
item.Value = append([]byte{}, data...)
item.ExpiredAt = this.expiredAt
this.m[hash] = item
}
this.locker.Unlock()
return len(data), nil
}
// 数据尺寸
func (this *MemoryWriter) Size() int64 {
return this.size
}
// 关闭
func (this *MemoryWriter) Close() error {
return nil
}
// 丢弃
func (this *MemoryWriter) Discard() error {
hash := this.hash(this.key)
this.locker.Lock()
delete(this.m, hash)
this.locker.Unlock()
return nil
}
// Key
func (this *MemoryWriter) Key() string {
return this.key
}
// 过期时间
func (this *MemoryWriter) ExpiredAt() int64 {
return this.expiredAt
}
// 计算Key Hash
func (this *MemoryWriter) hash(key string) uint64 {
return siphash.Hash(0, 0, []byte(key))
}

View File

@@ -430,7 +430,7 @@ func (this *APIStream) cacheStorage(message *pb.NodeStreamMessage, cachePolicyJS
storage = caches.SharedManager.NewStorageWithPolicy(cachePolicy)
if storage == nil {
this.replyFail(message.RequestId, "invalid storage type '"+cachePolicy.Type+"'")
return nil, false, err
return nil, false, errors.New("invalid storage type '" + cachePolicy.Type + "'")
}
err = storage.Init()
if err != nil {

View File

@@ -64,17 +64,13 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return
}
buf := bytePool32k.Get()
defer func() {
bytePool32k.Put(buf)
}()
isBroken := false
headerBuf := []byte{}
statusCode := http.StatusOK
statusFound := false
headerFound := false
buf := bytePool32k.Get()
err := storage.Read(key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) {
if isBroken {
return
@@ -134,6 +130,8 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}
})
bytePool32k.Put(buf)
if err != nil {
if err == caches.ErrNotFound {
// cache相关变量

View File

@@ -217,6 +217,7 @@ func (this *HTTPWriter) Close() {
this.cacheStorage.AddToList(&caches.Item{
Key: this.cacheWriter.Key(),
ExpiredAt: this.cacheWriter.ExpiredAt(),
ValueSize: this.cacheWriter.Size(),
})
}
}