diff --git a/internal/caches/list.go b/internal/caches/list.go index a16d0eb..57b1877 100644 --- a/internal/caches/list.go +++ b/internal/caches/list.go @@ -92,3 +92,11 @@ func (this *List) Stat(check func(hash string) bool) *Stat { } return result } + +// 总数量 +func (this *List) Count() int64 { + this.locker.RLock() + count := int64(len(this.m)) + this.locker.RUnlock() + return count +} diff --git a/internal/caches/manager.go b/internal/caches/manager.go index 4185564..acafbcf 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -126,7 +126,7 @@ func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) case serverconfigs.CachePolicyStorageFile: return NewFileStorage(policy) case serverconfigs.CachePolicyStorageMemory: - return nil // TODO 暂时返回nil + return NewMemoryStorage(policy) } return nil } diff --git a/internal/caches/manager_test.go b/internal/caches/manager_test.go index b7c74cc..c64ca02 100644 --- a/internal/caches/manager_test.go +++ b/internal/caches/manager_test.go @@ -17,21 +17,21 @@ func TestManager_UpdatePolicies(t *testing.T) { policies := []*serverconfigs.HTTPCachePolicy{ { Id: 1, - Type: serverconfigs.CachePolicyTypeFile, + Type: serverconfigs.CachePolicyStorageFile, Options: map[string]interface{}{ "dir": Tea.Root + "/caches", }, }, { Id: 2, - Type: serverconfigs.CachePolicyTypeFile, + Type: serverconfigs.CachePolicyStorageFile, Options: map[string]interface{}{ "dir": Tea.Root + "/caches", }, }, { Id: 3, - Type: serverconfigs.CachePolicyTypeFile, + Type: serverconfigs.CachePolicyStorageFile, Options: map[string]interface{}{ "dir": Tea.Root + "/caches", }, @@ -45,14 +45,14 @@ func TestManager_UpdatePolicies(t *testing.T) { policies := []*serverconfigs.HTTPCachePolicy{ { Id: 1, - Type: serverconfigs.CachePolicyTypeFile, + Type: serverconfigs.CachePolicyStorageFile, Options: map[string]interface{}{ "dir": Tea.Root + "/caches", }, }, { Id: 2, - Type: serverconfigs.CachePolicyTypeFile, + Type: serverconfigs.CachePolicyStorageFile, MaxKeys: 1, Options: map[string]interface{}{ "dir": Tea.Root + "/caches", @@ -60,7 +60,7 @@ func TestManager_UpdatePolicies(t *testing.T) { }, { Id: 4, - Type: serverconfigs.CachePolicyTypeFile, + Type: serverconfigs.CachePolicyStorageFile, Options: map[string]interface{}{ "dir": Tea.Root + "/caches", }, diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index b08533b..a5750b6 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -220,6 +220,11 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [ // 打开缓存文件等待写入 func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { + // 检查是否超出最大值 + if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { + return nil, errors.New("too many keys in cache storage") + } + hash := stringutil.Md5(key) dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] _, err := os.Stat(dir) diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index 47de105..b7437f9 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -61,7 +61,7 @@ func TestFileStorage_Open(t *testing.T) { } t.Log(writer) - err = writer.Write([]byte("Hello,World")) + _, err = writer.Write([]byte("Hello,World")) if err != nil { t.Fatal(err) } @@ -114,7 +114,7 @@ func TestFileStorage_Read(t *testing.T) { t.Fatal(err) } now := time.Now() - t.Log(storage.Read("my-key", make([]byte, 64), func(data []byte, expiredAt int64) { + t.Log(storage.Read("my-key", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) { t.Log("[expiredAt]", "["+string(data)+"]") })) t.Log(time.Since(now).Seconds()*1000, "ms") @@ -133,7 +133,7 @@ func TestFileStorage_Read_NotFound(t *testing.T) { t.Fatal(err) } now := time.Now() - t.Log(storage.Read("my-key-10000", make([]byte, 64), func(data []byte, expiredAt int64) { + t.Log(storage.Read("my-key-10000", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) { t.Log("[" + string(data) + "]") })) t.Log(time.Since(now).Seconds()*1000, "ms") @@ -277,7 +277,7 @@ func BenchmarkFileStorage_Read(b *testing.B) { } buf := make([]byte, 1024) for i := 0; i < b.N; i++ { - _ = storage.Read("my-key", buf, func(data []byte, expiredAt int64) { + _ = storage.Read("my-key", buf, func(data []byte, size int64, expiredAt int64, isEOF bool) { }) } } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go new file mode 100644 index 0000000..99bc5b1 --- /dev/null +++ b/internal/caches/storage_memory.go @@ -0,0 +1,167 @@ +package caches + +import ( + "fmt" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/errors" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/dchest/siphash" + "strconv" + "sync" + "time" +) + +type MemoryItem struct { + ExpiredAt int64 + Value []byte +} + +type MemoryStorage struct { + policy *serverconfigs.HTTPCachePolicy + list *List + totalSize int64 // 需要实现 + locker *sync.RWMutex + valuesMap map[uint64]*MemoryItem + ticker *utils.Ticker + purgeDuration time.Duration +} + +func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { + return &MemoryStorage{ + policy: policy, + list: NewList(), + locker: &sync.RWMutex{}, + valuesMap: map[uint64]*MemoryItem{}, + } +} + +// 初始化 +func (this *MemoryStorage) Init() error { + if this.purgeDuration <= 0 { + this.purgeDuration = 30 * time.Second + } + + // 启动定时清理任务 + this.ticker = utils.NewTicker(this.purgeDuration) + go func() { + for this.ticker.Next() { + this.purgeLoop() + } + }() + + return nil +} + +// 读取缓存 +func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error { + hash := this.hash(key) + + this.locker.RLock() + item := this.valuesMap[hash] + if item == nil { + this.locker.RUnlock() + return ErrNotFound + } + + if item.ExpiredAt > utils.UnixTime() { + callback(item.Value, int64(len(item.Value)), item.ExpiredAt, true) + this.locker.RUnlock() + return nil + } + this.locker.RUnlock() + + _ = this.Delete(key) + + return ErrNotFound +} + +// 打开缓存写入器等待写入 +func (this *MemoryStorage) Open(key string, expiredAt int64) (Writer, error) { + // 检查是否超出最大值 + if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { + return nil, errors.New("too many keys in cache storage") + } + + return NewMemoryWriter(this.valuesMap, key, expiredAt, this.locker), nil +} + +// 删除某个键值对应的缓存 +func (this *MemoryStorage) Delete(key string) error { + hash := this.hash(key) + this.locker.Lock() + delete(this.valuesMap, hash) + this.list.Remove(fmt.Sprintf("%d", hash)) + this.locker.Unlock() + return nil +} + +// 统计缓存 +func (this *MemoryStorage) Stat() (*Stat, error) { + this.locker.RLock() + defer this.locker.RUnlock() + + return this.list.Stat(func(hash string) bool { + return true + }), nil +} + +// 清除所有缓存 +func (this *MemoryStorage) CleanAll() error { + this.locker.Lock() + this.valuesMap = map[uint64]*MemoryItem{} + this.list.Reset() + this.locker.Unlock() + return nil +} + +// 批量删除缓存 +func (this *MemoryStorage) Purge(keys []string) error { + for _, key := range keys { + err := this.Delete(key) + if err != nil { + return err + } + } + return nil +} + +// 停止缓存策略 +func (this *MemoryStorage) Stop() { + this.locker.Lock() + defer this.locker.Unlock() + + this.valuesMap = map[uint64]*MemoryItem{} + this.list.Reset() + if this.ticker != nil { + this.ticker.Stop() + } +} + +// 获取当前存储的Policy +func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy { + return this.policy +} + +// 将缓存添加到列表 +func (this *MemoryStorage) AddToList(item *Item) { + item.Size = item.ValueSize + int64(len(item.Key)) + hash := fmt.Sprintf("%d", this.hash(item.Key)) + this.list.Add(hash, item) +} + +// 计算Key Hash +func (this *MemoryStorage) hash(key string) uint64 { + return siphash.Hash(0, 0, []byte(key)) +} + +// 清理任务 +func (this *MemoryStorage) purgeLoop() { + this.list.Purge(1000, func(hash string) { + uintHash, err := strconv.ParseUint(hash, 10, 64) + if err == nil { + this.locker.Lock() + delete(this.valuesMap, uintHash) + this.locker.Unlock() + } + }) +} diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go new file mode 100644 index 0000000..8b7c057 --- /dev/null +++ b/internal/caches/storage_memory_test.go @@ -0,0 +1,218 @@ +package caches + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/rands" + "strconv" + "testing" + "time" +) + +func TestMemoryStorage_Open(t *testing.T) { + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + + writer, err := storage.Open("abc", time.Now().Unix()+60) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + _, _ = writer.Write([]byte(", World")) + t.Log(storage.valuesMap) + + { + err = storage.Read("abc", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) { + t.Log("read:", string(data)) + }) + if err != nil { + if err == ErrNotFound { + t.Log("not found: abc") + } else { + t.Fatal(err) + } + } + } + + { + err = storage.Read("abc 2", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) { + t.Log("read:", string(data)) + }) + if err != nil { + if err == ErrNotFound { + t.Log("not found: abc2") + } else { + t.Fatal(err) + } + } + } + + writer, err = storage.Open("abc", time.Now().Unix()+60) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello123")) + { + err = storage.Read("abc", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) { + t.Log("read:", string(data)) + }) + if err != nil { + if err == ErrNotFound { + t.Log("not found: abc") + } else { + t.Fatal(err) + } + } + } +} + +func TestMemoryStorage_Delete(t *testing.T) { + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + { + writer, err := storage.Open("abc", time.Now().Unix()+60) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + t.Log(len(storage.valuesMap)) + } + { + writer, err := storage.Open("abc1", time.Now().Unix()+60) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + t.Log(len(storage.valuesMap)) + } + _ = storage.Delete("abc1") + t.Log(len(storage.valuesMap)) +} + +func TestMemoryStorage_Stat(t *testing.T) { + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + expiredAt := time.Now().Unix() + 60 + { + writer, err := storage.Open("abc", expiredAt) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + t.Log(len(storage.valuesMap)) + storage.AddToList(&Item{ + Key: "abc", + Size: 5, + ExpiredAt: expiredAt, + }) + } + { + writer, err := storage.Open("abc1", expiredAt) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + t.Log(len(storage.valuesMap)) + storage.AddToList(&Item{ + Key: "abc1", + Size: 5, + ExpiredAt: expiredAt, + }) + } + stat, err := storage.Stat() + if err != nil { + t.Fatal(err) + } + t.Log("===stat===") + logs.PrintAsJSON(stat, t) +} + +func TestMemoryStorage_CleanAll(t *testing.T) { + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + expiredAt := time.Now().Unix() + 60 + { + writer, err := storage.Open("abc", expiredAt) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + storage.AddToList(&Item{ + Key: "abc", + Size: 5, + ExpiredAt: expiredAt, + }) + } + { + writer, err := storage.Open("abc1", expiredAt) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + storage.AddToList(&Item{ + Key: "abc1", + Size: 5, + ExpiredAt: expiredAt, + }) + } + err := storage.CleanAll() + if err != nil { + t.Fatal(err) + } + t.Log(storage.list.Count(), len(storage.valuesMap)) +} + +func TestMemoryStorage_Purge(t *testing.T) { + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + expiredAt := time.Now().Unix() + 60 + { + writer, err := storage.Open("abc", expiredAt) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + storage.AddToList(&Item{ + Key: "abc", + Size: 5, + ExpiredAt: expiredAt, + }) + } + { + writer, err := storage.Open("abc1", expiredAt) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + storage.AddToList(&Item{ + Key: "abc1", + Size: 5, + ExpiredAt: expiredAt, + }) + } + err := storage.Purge([]string{"abc", "abc1"}) + if err != nil { + t.Fatal(err) + } + t.Log(storage.list.Count(), len(storage.valuesMap)) +} + +func TestMemoryStorage_Expire(t *testing.T) { + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage.purgeDuration = 5 * time.Second + err := storage.Init() + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 1000; i++ { + expiredAt := time.Now().Unix() + int64(rands.Int(0, 60)) + key := "abc" + strconv.Itoa(i) + writer, err := storage.Open(key, expiredAt) + if err != nil { + t.Fatal(err) + } + _, _ = writer.Write([]byte("Hello")) + storage.AddToList(&Item{ + Key: key, + Size: 5, + ExpiredAt: expiredAt, + }) + } + time.Sleep(70 * time.Second) +} diff --git a/internal/caches/writer.go b/internal/caches/writer.go index e402d5c..8e60702 100644 --- a/internal/caches/writer.go +++ b/internal/caches/writer.go @@ -5,6 +5,9 @@ type Writer interface { // 写入数据 Write(data []byte) (n int, err error) + // 写入的总数据大小 + Size() int64 + // 关闭 Close() error diff --git a/internal/caches/file_writer.go b/internal/caches/writer_file.go similarity index 100% rename from internal/caches/file_writer.go rename to internal/caches/writer_file.go diff --git a/internal/caches/writer_gzip.go b/internal/caches/writer_gzip.go index 11754e1..b416bc9 100644 --- a/internal/caches/writer_gzip.go +++ b/internal/caches/writer_gzip.go @@ -45,3 +45,7 @@ func (this *gzipWriter) Key() string { func (this *gzipWriter) ExpiredAt() int64 { return this.expiredAt } + +func (this *gzipWriter) Size() int64 { + return this.rawWriter.Size() +} diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go new file mode 100644 index 0000000..8781d33 --- /dev/null +++ b/internal/caches/writer_memory.go @@ -0,0 +1,83 @@ +package caches + +import ( + "github.com/dchest/siphash" + "sync" +) + +type MemoryWriter struct { + key string + expiredAt int64 + m map[uint64]*MemoryItem + locker *sync.RWMutex + isFirstWriting bool + size int64 +} + +func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, locker *sync.RWMutex) *MemoryWriter { + return &MemoryWriter{ + m: m, + key: key, + expiredAt: expiredAt, + locker: locker, + isFirstWriting: true, + } +} + +// 写入数据 +func (this *MemoryWriter) Write(data []byte) (n int, err error) { + this.size += int64(len(data)) + + hash := this.hash(this.key) + this.locker.Lock() + item, ok := this.m[hash] + if ok { + // 第一次写先清空 + if this.isFirstWriting { + item.Value = nil + this.isFirstWriting = false + } + item.Value = append(item.Value, data...) + } else { + item := &MemoryItem{} + item.Value = append([]byte{}, data...) + item.ExpiredAt = this.expiredAt + this.m[hash] = item + } + this.locker.Unlock() + return len(data), nil +} + +// 数据尺寸 +func (this *MemoryWriter) Size() int64 { + return this.size +} + +// 关闭 +func (this *MemoryWriter) Close() error { + return nil +} + +// 丢弃 +func (this *MemoryWriter) Discard() error { + hash := this.hash(this.key) + this.locker.Lock() + delete(this.m, hash) + this.locker.Unlock() + return nil +} + +// Key +func (this *MemoryWriter) Key() string { + return this.key +} + +// 过期时间 +func (this *MemoryWriter) ExpiredAt() int64 { + return this.expiredAt +} + +// 计算Key Hash +func (this *MemoryWriter) hash(key string) uint64 { + return siphash.Hash(0, 0, []byte(key)) +} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 4c9abbe..6b9806d 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -430,7 +430,7 @@ func (this *APIStream) cacheStorage(message *pb.NodeStreamMessage, cachePolicyJS storage = caches.SharedManager.NewStorageWithPolicy(cachePolicy) if storage == nil { this.replyFail(message.RequestId, "invalid storage type '"+cachePolicy.Type+"'") - return nil, false, err + return nil, false, errors.New("invalid storage type '" + cachePolicy.Type + "'") } err = storage.Init() if err != nil { diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 9b14f58..fb90dcf 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -64,17 +64,13 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { return } - buf := bytePool32k.Get() - defer func() { - bytePool32k.Put(buf) - }() - isBroken := false headerBuf := []byte{} statusCode := http.StatusOK statusFound := false headerFound := false + buf := bytePool32k.Get() err := storage.Read(key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) { if isBroken { return @@ -134,6 +130,8 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { } }) + bytePool32k.Put(buf) + if err != nil { if err == caches.ErrNotFound { // cache相关变量 diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index c4434e5..37aa2c0 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -217,6 +217,7 @@ func (this *HTTPWriter) Close() { this.cacheStorage.AddToList(&caches.Item{ Key: this.cacheWriter.Key(), ExpiredAt: this.cacheWriter.ExpiredAt(), + ValueSize: this.cacheWriter.Size(), }) } }