文件缓存策略支持二级缓存(内存 | 文件)

This commit is contained in:
GoEdgeLab
2021-03-02 19:43:05 +08:00
parent 5f5399cc61
commit 5f6b270af3
9 changed files with 145 additions and 22 deletions

View File

@@ -2,7 +2,15 @@ package caches
import "time" import "time"
type ItemType = int
const (
ItemTypeFile ItemType = 1
ItemTypeMemory ItemType = 2
)
type Item struct { type Item struct {
Type ItemType
Key string Key string
ExpiredAt int64 ExpiredAt int64
HeaderSize int64 HeaderSize int64

View File

@@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -44,7 +45,8 @@ var (
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
type FileStorage struct { type FileStorage struct {
policy *serverconfigs.HTTPCachePolicy policy *serverconfigs.HTTPCachePolicy
cacheConfig *serverconfigs.HTTPFileCacheStorage cacheConfig *serverconfigs.HTTPFileCacheStorage // 二级缓存
memoryStorage *MemoryStorage // 一级缓存
totalSize int64 totalSize int64
list *List list *List
@@ -77,6 +79,7 @@ func (this *FileStorage) Init() error {
defer this.locker.Unlock() defer this.locker.Unlock()
before := time.Now() before := time.Now()
cacheDir := ""
defer func() { defer func() {
// 统计 // 统计
count := 0 count := 0
@@ -98,7 +101,7 @@ func (this *FileStorage) Init() error {
} else if size > 1024 { } else if size > 1024 {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024) sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
} }
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB) remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB)
}() }()
// 配置 // 配置
@@ -112,6 +115,7 @@ func (this *FileStorage) Init() error {
return err return err
} }
this.cacheConfig = cacheConfig this.cacheConfig = cacheConfig
cacheDir = cacheConfig.Dir
if !filepath.IsAbs(this.cacheConfig.Dir) { if !filepath.IsAbs(this.cacheConfig.Dir) {
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
@@ -142,10 +146,49 @@ func (this *FileStorage) Init() error {
return err return err
} }
// 加载内存缓存
if this.cacheConfig.MemoryPolicy != nil {
memoryCapacity := this.cacheConfig.MemoryPolicy.Capacity
if memoryCapacity != nil && memoryCapacity.Count > 0 {
memoryPolicy := &serverconfigs.HTTPCachePolicy{
Id: this.policy.Id,
IsOn: this.policy.IsOn,
Name: this.policy.Name,
Description: this.policy.Description,
Capacity: memoryCapacity,
MaxKeys: this.policy.MaxKeys,
MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改
Type: serverconfigs.CachePolicyStorageMemory,
Options: this.policy.Options,
Life: this.policy.Life,
MinLife: this.policy.MinLife,
MaxLife: this.policy.MaxLife,
}
err = memoryPolicy.Init()
if err != nil {
return err
}
memoryStorage := NewMemoryStorage(memoryPolicy)
err = memoryStorage.Init()
if err != nil {
return err
}
this.memoryStorage = memoryStorage
}
}
return nil return nil
} }
func (this *FileStorage) OpenReader(key string) (Reader, error) { func (this *FileStorage) OpenReader(key string) (Reader, error) {
// 先尝试内存缓存
if this.memoryStorage != nil {
reader, err := this.memoryStorage.OpenReader(key)
if err == nil {
return reader, err
}
}
hash, path := this.keyPath(key) hash, path := this.keyPath(key)
if !this.list.Exist(hash) { if !this.list.Exist(hash) {
return nil, ErrNotFound return nil, ErrNotFound
@@ -173,6 +216,14 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
// 打开缓存文件等待写入 // 打开缓存文件等待写入
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
// 先尝试内存缓存
if this.memoryStorage != nil {
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status)
if err == nil {
return writer, nil
}
}
// 检查是否超出最大值 // 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write file cache failed: too many keys in cache storage") return nil, errors.New("write file cache failed: too many keys in cache storage")
@@ -291,6 +342,13 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
// 添加到List // 添加到List
func (this *FileStorage) AddToList(item *Item) { func (this *FileStorage) AddToList(item *Item) {
if this.memoryStorage != nil {
if item.Type == ItemTypeMemory {
this.memoryStorage.AddToList(item)
return
}
}
item.MetaSize = SizeMeta item.MetaSize = SizeMeta
hash := stringutil.Md5(item.Key) hash := stringutil.Md5(item.Key)
this.list.Add(hash, item) this.list.Add(hash, item)
@@ -301,6 +359,11 @@ func (this *FileStorage) Delete(key string) error {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
// 先尝试内存缓存
if this.memoryStorage != nil {
_ = this.memoryStorage.Delete(key)
}
hash, path := this.keyPath(key) hash, path := this.keyPath(key)
this.list.Remove(hash) this.list.Remove(hash)
err := os.Remove(path) err := os.Remove(path)
@@ -325,6 +388,11 @@ func (this *FileStorage) CleanAll() error {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
// 先尝试内存缓存
if this.memoryStorage != nil {
_ = this.memoryStorage.CleanAll()
}
this.list.Reset() this.list.Reset()
// 删除缓存和目录 // 删除缓存和目录
@@ -378,6 +446,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
// 先尝试内存缓存
if this.memoryStorage != nil {
_ = this.memoryStorage.Purge(keys, urlType)
}
// 目录 // 目录
if urlType == "dir" { if urlType == "dir" {
resultKeys := []string{} resultKeys := []string{}
@@ -412,6 +485,11 @@ func (this *FileStorage) Stop() {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
// 先尝试内存缓存
if this.memoryStorage != nil {
this.memoryStorage.Stop()
}
this.list.Reset() this.list.Reset()
if this.ticker != nil { if this.ticker != nil {
this.ticker.Stop() this.ticker.Stop()
@@ -518,6 +596,7 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
}() }()
item := &Item{ item := &Item{
Type: ItemTypeFile,
MetaSize: SizeMeta, MetaSize: SizeMeta,
} }

View File

@@ -17,6 +17,7 @@ type MemoryItem struct {
HeaderValue []byte HeaderValue []byte
BodyValue []byte BodyValue []byte
Status int Status int
IsDone bool
} }
type MemoryStorage struct { type MemoryStorage struct {
@@ -67,15 +68,14 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
hash := this.hash(key) hash := this.hash(key)
this.locker.RLock() this.locker.RLock()
defer this.locker.RUnlock()
item := this.valuesMap[hash] item := this.valuesMap[hash]
if item == nil { if item == nil || !item.IsDone {
this.locker.RUnlock()
return nil, ErrNotFound return nil, ErrNotFound
} }
if item.ExpiredAt > utils.UnixTime() { if item.ExpiredAt > utils.UnixTime() {
this.locker.RUnlock()
reader := NewMemoryReader(item) reader := NewMemoryReader(item)
err := reader.Init() err := reader.Init()
if err != nil { if err != nil {
@@ -83,7 +83,6 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
} }
return reader, nil return reader, nil
} }
this.locker.RUnlock()
_ = this.Delete(key) _ = this.Delete(key)
@@ -190,7 +189,7 @@ func (this *MemoryStorage) hash(key string) uint64 {
// 清理任务 // 清理任务
func (this *MemoryStorage) purgeLoop() { func (this *MemoryStorage) purgeLoop() {
this.list.Purge(1000, func(hash string) { this.list.Purge(2048, func(hash string) {
uintHash, err := strconv.ParseUint(hash, 10, 64) uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil { if err == nil {
this.locker.Lock() this.locker.Lock()

View File

@@ -25,4 +25,7 @@ type Writer interface {
// 过期时间 // 过期时间
ExpiredAt() int64 ExpiredAt() int64
// 内容类型
ItemType() ItemType
} }

View File

@@ -126,3 +126,8 @@ func (this *FileWriter) ExpiredAt() int64 {
func (this *FileWriter) Key() string { func (this *FileWriter) Key() string {
return this.key return this.key
} }
// 内容类型
func (this *FileWriter) ItemType() ItemType {
return ItemTypeFile
}

View File

@@ -67,3 +67,8 @@ func (this *gzipWriter) HeaderSize() int64 {
func (this *gzipWriter) BodySize() int64 { func (this *gzipWriter) BodySize() int64 {
return this.rawWriter.BodySize() return this.rawWriter.BodySize()
} }
// 内容类型
func (this *gzipWriter) ItemType() ItemType {
return this.rawWriter.ItemType()
}

View File

@@ -14,10 +14,13 @@ type MemoryWriter struct {
headerSize int64 headerSize int64
bodySize int64 bodySize int64
status int status int
hash uint64
item *MemoryItem
} }
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter { func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter {
return &MemoryWriter{ w := &MemoryWriter{
m: m, m: m,
key: key, key: key,
expiredAt: expiredAt, expiredAt: expiredAt,
@@ -25,16 +28,20 @@ func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, stat
isFirstWriting: true, isFirstWriting: true,
status: status, status: status,
} }
w.hash = w.calculateHash(key)
return w
} }
// 写入数据 // 写入数据
func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) {
this.headerSize += int64(len(data)) this.headerSize += int64(len(data))
hash := this.hash(this.key)
this.locker.Lock() this.locker.Lock()
item, ok := this.m[hash] item, ok := this.m[this.hash]
if ok { if ok {
item.IsDone = false
// 第一次写先清空 // 第一次写先清空
if this.isFirstWriting { if this.isFirstWriting {
item.HeaderValue = nil item.HeaderValue = nil
@@ -43,13 +50,13 @@ func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) {
} }
item.HeaderValue = append(item.HeaderValue, data...) item.HeaderValue = append(item.HeaderValue, data...)
} else { } else {
item := &MemoryItem{} item = &MemoryItem{}
item.HeaderValue = append([]byte{}, data...) item.HeaderValue = append([]byte{}, data...)
item.ExpiredAt = this.expiredAt item.ExpiredAt = this.expiredAt
item.Status = this.status item.Status = this.status
this.m[hash] = item
this.isFirstWriting = false this.isFirstWriting = false
} }
this.item = item
this.locker.Unlock() this.locker.Unlock()
return len(data), nil return len(data), nil
} }
@@ -58,10 +65,11 @@ func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) {
func (this *MemoryWriter) Write(data []byte) (n int, err error) { func (this *MemoryWriter) Write(data []byte) (n int, err error) {
this.bodySize += int64(len(data)) this.bodySize += int64(len(data))
hash := this.hash(this.key)
this.locker.Lock() this.locker.Lock()
item, ok := this.m[hash] item, ok := this.m[this.hash]
if ok { if ok {
item.IsDone = false
// 第一次写先清空 // 第一次写先清空
if this.isFirstWriting { if this.isFirstWriting {
item.HeaderValue = nil item.HeaderValue = nil
@@ -70,13 +78,13 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) {
} }
item.BodyValue = append(item.BodyValue, data...) item.BodyValue = append(item.BodyValue, data...)
} else { } else {
item := &MemoryItem{} item = &MemoryItem{}
item.BodyValue = append([]byte{}, data...) item.BodyValue = append([]byte{}, data...)
item.ExpiredAt = this.expiredAt item.ExpiredAt = this.expiredAt
item.Status = this.status item.Status = this.status
this.m[hash] = item
this.isFirstWriting = false this.isFirstWriting = false
} }
this.item = item
this.locker.Unlock() this.locker.Unlock()
return len(data), nil return len(data), nil
} }
@@ -92,14 +100,22 @@ func (this *MemoryWriter) BodySize() int64 {
// 关闭 // 关闭
func (this *MemoryWriter) Close() error { func (this *MemoryWriter) Close() error {
if this.item == nil {
return nil
}
this.locker.Lock()
this.item.IsDone = true
this.m[this.hash] = this.item
this.locker.Unlock()
return nil return nil
} }
// 丢弃 // 丢弃
func (this *MemoryWriter) Discard() error { func (this *MemoryWriter) Discard() error {
hash := this.hash(this.key)
this.locker.Lock() this.locker.Lock()
delete(this.m, hash) delete(this.m, this.hash)
this.locker.Unlock() this.locker.Unlock()
return nil return nil
} }
@@ -114,7 +130,12 @@ func (this *MemoryWriter) ExpiredAt() int64 {
return this.expiredAt return this.expiredAt
} }
// 内容类型
func (this *MemoryWriter) ItemType() ItemType {
return ItemTypeMemory
}
// 计算Key Hash // 计算Key Hash
func (this *MemoryWriter) hash(key string) uint64 { func (this *MemoryWriter) calculateHash(key string) uint64 {
return xxhash.Sum64String(key) return xxhash.Sum64String(key)
} }

View File

@@ -182,6 +182,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error {
return err return err
} }
storage.AddToList(&caches.Item{ storage.AddToList(&caches.Item{
Type: writer.ItemType(),
Key: msg.Key, Key: msg.Key,
ExpiredAt: expiredAt, ExpiredAt: expiredAt,
HeaderSize: writer.HeaderSize(), HeaderSize: writer.HeaderSize(),
@@ -445,6 +446,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
err = writer.Close() err = writer.Close()
if err == nil { if err == nil {
storage.AddToList(&caches.Item{ storage.AddToList(&caches.Item{
Type: writer.ItemType(),
Key: key, Key: key,
ExpiredAt: expiredAt, ExpiredAt: expiredAt,
}) })

View File

@@ -215,6 +215,7 @@ func (this *HTTPWriter) Close() {
err := this.cacheWriter.Close() err := this.cacheWriter.Close()
if err == nil { if err == nil {
this.cacheStorage.AddToList(&caches.Item{ this.cacheStorage.AddToList(&caches.Item{
Type: this.cacheWriter.ItemType(),
Key: this.cacheWriter.Key(), Key: this.cacheWriter.Key(),
ExpiredAt: this.cacheWriter.ExpiredAt(), ExpiredAt: this.cacheWriter.ExpiredAt(),
HeaderSize: this.cacheWriter.HeaderSize(), HeaderSize: this.cacheWriter.HeaderSize(),