分块传输内容可以写入到内存中/分块传输内容可以判断最大尺寸

This commit is contained in:
GoEdgeLab
2022-03-06 17:18:06 +08:00
parent f788f3894e
commit 96db004fb2
14 changed files with 259 additions and 40 deletions

View File

@@ -6,9 +6,10 @@ import "errors"
// 常用的几个错误 // 常用的几个错误
var ( var (
ErrNotFound = errors.New("cache not found") ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing") ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range") ErrInvalidRange = errors.New("invalid range")
ErrEntityTooLarge = errors.New("entity too large")
) )
// CapacityError 容量错误 // CapacityError 容量错误
@@ -30,7 +31,7 @@ func CanIgnoreErr(err error) bool {
if err == nil { if err == nil {
return true return true
} }
if err == ErrFileIsWriting { if err == ErrFileIsWriting || err == ErrEntityTooLarge {
return true return true
} }
_, ok := err.(*CapacityError) _, ok := err.(*CapacityError)

View File

@@ -12,6 +12,8 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
@@ -44,7 +46,8 @@ const (
) )
const ( const (
HotItemSize = 1024 HotItemSize = 1024 // 热点数据数量
FileToMemoryMaxSize int64 = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸
) )
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
@@ -68,6 +71,8 @@ type FileStorage struct {
lastHotSize int lastHotSize int
hotTicker *utils.Ticker hotTicker *utils.Ticker
ignoreKeys *setutils.FixedSet
openFileCache *OpenFileCache openFileCache *OpenFileCache
} }
@@ -76,6 +81,7 @@ func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
policy: policy, policy: policy,
hotMap: map[string]*HotItem{}, hotMap: map[string]*HotItem{},
lastHotSize: -1, lastHotSize: -1,
ignoreKeys: setutils.NewFixedSet(32768),
} }
} }
@@ -297,7 +303,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
// 增加点击量 // 增加点击量
// 1/1000采样 // 1/1000采样
if !isPartial && allowMemory { if !isPartial && allowMemory && reader.BodySize() < FileToMemoryMaxSize {
this.increaseHit(key, hash, reader) this.increaseHit(key, hash, reader)
} }
@@ -306,11 +312,20 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
} }
// OpenWriter 打开缓存文件等待写入 // OpenWriter 打开缓存文件等待写入
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) { func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) {
// 是否已忽略
if this.ignoreKeys.Has(key) {
return nil, ErrEntityTooLarge
}
// 先尝试内存缓存 // 先尝试内存缓存
// 我们限定仅小文件优先存在内存中 // 我们限定仅小文件优先存在内存中
if !isPartial && this.memoryStorage != nil && size > 0 && size < 32*1024*1024 { var maxMemorySize = FileToMemoryMaxSize
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, false) if maxSize > maxMemorySize {
maxMemorySize = maxSize
}
if !isPartial && this.memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) {
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false)
if err == nil { if err == nil {
return writer, nil return writer, nil
} }
@@ -499,7 +514,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
sharedWritingFileKeyLocker.Unlock() sharedWritingFileKeyLocker.Unlock()
}), nil }), nil
} else { } else {
return NewFileWriter(writer, key, expiredAt, func() { return NewFileWriter(this, writer, key, expiredAt, -1, func() {
sharedWritingFileKeyLocker.Lock() sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key) delete(sharedWritingFileKeyMap, key)
sharedWritingFileKeyLocker.Unlock() sharedWritingFileKeyLocker.Unlock()
@@ -689,6 +704,8 @@ func (this *FileStorage) Stop() {
if this.openFileCache != nil { if this.openFileCache != nil {
this.openFileCache.CloseAll() this.openFileCache.CloseAll()
} }
this.ignoreKeys.Reset()
} }
// TotalDiskSize 消耗的磁盘尺寸 // TotalDiskSize 消耗的磁盘尺寸
@@ -704,6 +721,11 @@ func (this *FileStorage) TotalMemorySize() int64 {
return this.memoryStorage.TotalMemorySize() return this.memoryStorage.TotalMemorySize()
} }
// IgnoreKey 忽略某个Key即不缓存某个Key
func (this *FileStorage) IgnoreKey(key string) {
this.ignoreKeys.Push(key)
}
// 绝对路径 // 绝对路径
func (this *FileStorage) dir() string { func (this *FileStorage) dir() string {
return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/"
@@ -929,7 +951,7 @@ func (this *FileStorage) hotLoop() {
continue continue
} }
writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), false) writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), -1, false)
if err != nil { if err != nil {
if !CanIgnoreErr(err) { if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error()) remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())

View File

@@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) {
header := []byte("Header") header := []byte("Header")
body := []byte("This is Body") body := []byte("This is Body")
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, false) writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -100,7 +100,7 @@ func TestFileStorage_OpenWriter_Partial(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, true) writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, -1, true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -139,7 +139,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) {
t.Log(time.Since(now).Seconds()*1000, "ms") t.Log(time.Since(now).Seconds()*1000, "ms")
}() }()
writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, false) writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -212,7 +212,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, false) writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, -1, false)
if err != nil { if err != nil {
if err != ErrFileIsWriting { if err != ErrFileIsWriting {
t.Error(err) t.Error(err)
@@ -267,7 +267,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, false) writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, -1, false)
if err != nil { if err != nil {
if err != ErrFileIsWriting { if err != ErrFileIsWriting {
t.Error(err) t.Error(err)

View File

@@ -13,7 +13,8 @@ type StorageInterface interface {
OpenReader(key string, useStale bool, isPartial bool) (reader Reader, err error) OpenReader(key string, useStale bool, isPartial bool) (reader Reader, err error)
// OpenWriter 打开缓存写入器等待写入 // OpenWriter 打开缓存写入器等待写入
OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) // size 和 maxSize 可能为-1
OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error)
// Delete 删除某个键值对应的缓存 // Delete 删除某个键值对应的缓存
Delete(key string) error Delete(key string) error
@@ -41,4 +42,7 @@ type StorageInterface interface {
// AddToList 将缓存添加到列表 // AddToList 将缓存添加到列表
AddToList(item *Item) AddToList(item *Item)
// IgnoreKey 忽略某个Key即不缓存某个Key
IgnoreKey(key string)
} }

View File

@@ -7,6 +7,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
"github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/rands"
@@ -46,6 +47,8 @@ type MemoryStorage struct {
totalSize int64 totalSize int64
writingKeyMap map[string]zero.Zero // key => bool writingKeyMap map[string]zero.Zero // key => bool
ignoreKeys *setutils.FixedSet
} }
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage { func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage {
@@ -65,6 +68,7 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage Stora
valuesMap: map[uint64]*MemoryItem{}, valuesMap: map[uint64]*MemoryItem{},
dirtyChan: dirtyChan, dirtyChan: dirtyChan,
writingKeyMap: map[string]zero.Zero{}, writingKeyMap: map[string]zero.Zero{},
ignoreKeys: setutils.NewFixedSet(32768),
} }
} }
@@ -145,15 +149,19 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool)
} }
// OpenWriter 打开缓存写入器等待写入 // OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) { func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) {
if this.ignoreKeys.Has(key) {
return nil, ErrEntityTooLarge
}
// TODO 内存缓存暂时不支持分块内容存储 // TODO 内存缓存暂时不支持分块内容存储
if isPartial { if isPartial {
return nil, ErrFileIsWriting return nil, ErrFileIsWriting
} }
return this.openWriter(key, expiredAt, status, size, true) return this.openWriter(key, expiredAt, status, size, maxSize, true)
} }
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, isDirty bool) (Writer, error) { func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) {
this.locker.Lock() this.locker.Lock()
defer this.locker.Unlock() defer this.locker.Unlock()
@@ -200,7 +208,7 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, s
} }
isWriting = true isWriting = true
return NewMemoryWriter(this, key, expiredAt, status, isDirty, func() { return NewMemoryWriter(this, key, expiredAt, status, isDirty, maxSize, func() {
this.locker.Lock() this.locker.Lock()
delete(this.writingKeyMap, key) delete(this.writingKeyMap, key)
this.locker.Unlock() this.locker.Unlock()
@@ -277,6 +285,8 @@ func (this *MemoryStorage) Stop() {
this.locker.Unlock() this.locker.Unlock()
this.ignoreKeys.Reset()
// 回收内存 // 回收内存
runtime.GC() runtime.GC()
@@ -305,6 +315,11 @@ func (this *MemoryStorage) TotalMemorySize() int64 {
return atomic.LoadInt64(&this.totalSize) return atomic.LoadInt64(&this.totalSize)
} }
// IgnoreKey 忽略某个Key即不缓存某个Key
func (this *MemoryStorage) IgnoreKey(key string) {
this.ignoreKeys.Push(key)
}
// 计算Key Hash // 计算Key Hash
func (this *MemoryStorage) hash(key string) uint64 { func (this *MemoryStorage) hash(key string) uint64 {
return xxhash.Sum64String(key) return xxhash.Sum64String(key)
@@ -391,7 +406,7 @@ func (this *MemoryStorage) flushItem(key string) {
return return
} }
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, false) writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, -1, false)
if err != nil { if err != nil {
if !CanIgnoreErr(err) { if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error()) remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())

View File

@@ -15,7 +15,7 @@ import (
func TestMemoryStorage_OpenWriter(t *testing.T) { func TestMemoryStorage_OpenWriter(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -62,7 +62,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
} }
} }
writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -103,7 +103,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) {
func TestMemoryStorage_Delete(t *testing.T) { func TestMemoryStorage_Delete(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
{ {
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -111,7 +111,7 @@ func TestMemoryStorage_Delete(t *testing.T) {
t.Log(len(storage.valuesMap)) t.Log(len(storage.valuesMap))
} }
{ {
writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, false) writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -126,7 +126,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60 expiredAt := time.Now().Unix() + 60
{ {
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -139,7 +139,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
}) })
} }
{ {
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -163,7 +163,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60 expiredAt := time.Now().Unix() + 60
{ {
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -175,7 +175,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
}) })
} }
{ {
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -198,7 +198,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60 expiredAt := time.Now().Unix() + 60
{ {
writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -210,7 +210,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
}) })
} }
{ {
writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -241,7 +241,7 @@ func TestMemoryStorage_Expire(t *testing.T) {
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
expiredAt := time.Now().Unix() + int64(rands.Int(0, 60)) expiredAt := time.Now().Unix() + int64(rands.Int(0, 60))
key := "abc" + strconv.Itoa(i) key := "abc" + strconv.Itoa(i)
writer, err := storage.OpenWriter(key, expiredAt, 200, -1, false) writer, err := storage.OpenWriter(key, expiredAt, 200, -1, -1, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -11,20 +11,24 @@ import (
) )
type FileWriter struct { type FileWriter struct {
storage StorageInterface
rawWriter *os.File rawWriter *os.File
key string key string
headerSize int64 headerSize int64
bodySize int64 bodySize int64
expiredAt int64 expiredAt int64
maxSize int64
endFunc func() endFunc func()
once sync.Once once sync.Once
} }
func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, endFunc func()) *FileWriter { func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, maxSize int64, endFunc func()) *FileWriter {
return &FileWriter{ return &FileWriter{
storage: storage,
key: key, key: key,
rawWriter: rawWriter, rawWriter: rawWriter,
expiredAt: expiredAt, expiredAt: expiredAt,
maxSize: maxSize,
endFunc: endFunc, endFunc: endFunc,
} }
} }
@@ -60,6 +64,15 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error {
func (this *FileWriter) Write(data []byte) (n int, err error) { func (this *FileWriter) Write(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data) n, err = this.rawWriter.Write(data)
this.bodySize += int64(n) this.bodySize += int64(n)
if this.maxSize > 0 && this.bodySize > this.maxSize {
err = ErrEntityTooLarge
if this.storage != nil {
this.storage.IgnoreKey(this.key)
}
}
if err != nil { if err != nil {
_ = this.Discard() _ = this.Discard()
} }

View File

@@ -16,6 +16,7 @@ type MemoryWriter struct {
bodySize int64 bodySize int64
status int status int
isDirty bool isDirty bool
maxSize int64
hash uint64 hash uint64
item *MemoryItem item *MemoryItem
@@ -23,7 +24,7 @@ type MemoryWriter struct {
once sync.Once once sync.Once
} }
func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, endFunc func()) *MemoryWriter { func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, maxSize int64, endFunc func()) *MemoryWriter {
w := &MemoryWriter{ w := &MemoryWriter{
storage: memoryStorage, storage: memoryStorage,
key: key, key: key,
@@ -35,6 +36,7 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64,
}, },
status: status, status: status,
isDirty: isDirty, isDirty: isDirty,
maxSize: maxSize,
endFunc: endFunc, endFunc: endFunc,
} }
w.hash = w.calculateHash(key) w.hash = w.calculateHash(key)
@@ -53,6 +55,14 @@ func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) {
func (this *MemoryWriter) Write(data []byte) (n int, err error) { func (this *MemoryWriter) Write(data []byte) (n int, err error) {
this.bodySize += int64(len(data)) this.bodySize += int64(len(data))
this.item.BodyValue = append(this.item.BodyValue, data...) this.item.BodyValue = append(this.item.BodyValue, data...)
// 检查尺寸
if this.maxSize > 0 && this.bodySize > this.maxSize {
err = ErrEntityTooLarge
this.storage.IgnoreKey(this.key)
return len(data), err
}
return len(data), nil return len(data), nil
} }

View File

@@ -183,7 +183,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error {
} }
expiredAt := time.Now().Unix() + msg.LifeSeconds expiredAt := time.Now().Unix() + msg.LifeSeconds
writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)), false) writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)), -1, false)
if err != nil { if err != nil {
this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) this.replyFail(message.RequestId, "prepare writing failed: "+err.Error())
return err return err
@@ -472,7 +472,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
} }
expiredAt := time.Now().Unix() + 8600 expiredAt := time.Now().Unix() + 8600
writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength, false) // TODO 可以设置缓存过期时间 writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength, -1, false) // TODO 可以设置缓存过期时间
if err != nil { if err != nil {
locker.Lock() locker.Lock()
errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error()) errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error())

View File

@@ -286,8 +286,12 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
if this.isPartial { if this.isPartial {
cacheKey += caches.SuffixPartial cacheKey += caches.SuffixPartial
} }
cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, this.isPartial) cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, cacheRef.MaxSizeBytes(), this.isPartial)
if err != nil { if err != nil {
if err == caches.ErrEntityTooLarge && addStatusHeader {
this.Header().Set("X-Cache", "BYPASS, entity too large")
}
if !caches.CanIgnoreErr(err) { if !caches.CanIgnoreErr(err) {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
} }
@@ -556,8 +560,10 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
// compression cache writer // compression cache writer
// 只有在本身内容已经缓存的情况下才会写入缓存防止同时写入缓存导致IO负载升高 // 只有在本身内容已经缓存的情况下才会写入缓存防止同时写入缓存导致IO负载升高
var cacheRef = this.req.cacheRef
if !this.isPartial && if !this.isPartial &&
this.cacheStorage != nil && this.cacheStorage != nil &&
cacheRef != nil &&
(this.cacheReader != nil || (this.cacheStorage.Policy().SyncCompressionCache && this.cacheWriter != nil)) && (this.cacheReader != nil || (this.cacheStorage.Policy().SyncCompressionCache && this.cacheWriter != nil)) &&
!this.webpIsEncoding { !this.webpIsEncoding {
var cacheKey = "" var cacheKey = ""
@@ -575,7 +581,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
cacheKey += this.cacheReaderSuffix cacheKey += this.cacheReaderSuffix
} }
compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), -1, false) compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), -1, cacheRef.MaxSizeBytes(), false)
if err != nil { if err != nil {
return return
} }
@@ -788,7 +794,7 @@ func (this *HTTPWriter) Close() {
expiredAt = this.cacheWriter.ExpiredAt() expiredAt = this.cacheWriter.ExpiredAt()
} }
webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, false) webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, -1, false)
if webpCacheWriter != nil { if webpCacheWriter != nil {
// 写入Header // 写入Header
for k, v := range this.Header() { for k, v := range this.Header() {

View File

@@ -0,0 +1,64 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package setutils
import (
"github.com/TeaOSLab/EdgeNode/internal/zero"
"sync"
)
type FixedSet struct {
maxSize int
locker sync.RWMutex
m map[interface{}]zero.Zero
keys []interface{}
}
func NewFixedSet(maxSize int) *FixedSet {
if maxSize <= 0 {
maxSize = 1024
}
return &FixedSet{
maxSize: maxSize,
m: map[interface{}]zero.Zero{},
}
}
func (this *FixedSet) Push(item interface{}) {
this.locker.Lock()
_, ok := this.m[item]
if !ok {
// 是否已满
if len(this.keys) == this.maxSize {
var firstKey = this.keys[0]
this.keys = this.keys[1:]
delete(this.m, firstKey)
}
this.m[item] = zero.New()
this.keys = append(this.keys, item)
}
this.locker.Unlock()
}
func (this *FixedSet) Has(item interface{}) bool {
this.locker.RLock()
defer this.locker.RUnlock()
_, ok := this.m[item]
return ok
}
func (this *FixedSet) Size() int {
this.locker.RLock()
defer this.locker.RUnlock()
return len(this.keys)
}
func (this *FixedSet) Reset() {
this.locker.Lock()
this.m = map[interface{}]zero.Zero{}
this.keys = nil
this.locker.Unlock()
}

View File

@@ -0,0 +1,57 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package setutils_test
import (
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestNewFixedSet(t *testing.T) {
var a = assert.NewAssertion(t)
{
var set = setutils.NewFixedSet(0)
set.Push(1)
set.Push(2)
set.Push(2)
a.IsTrue(set.Size() == 2)
a.IsTrue(set.Has(1))
a.IsTrue(set.Has(2))
}
{
var set = setutils.NewFixedSet(1)
set.Push(1)
set.Push(2)
set.Push(3)
a.IsTrue(set.Size() == 1)
a.IsFalse(set.Has(1))
a.IsTrue(set.Has(3))
a.IsFalse(set.Has(4))
}
}
func TestFixedSet_Reset(t *testing.T) {
var a = assert.NewAssertion(t)
var set = setutils.NewFixedSet(3)
set.Push(1)
set.Push(2)
set.Push(3)
set.Reset()
a.IsTrue(set.Size() == 0)
}
func BenchmarkFixedSet_Has(b *testing.B) {
var count = 100_000
var set = setutils.NewFixedSet(count)
for i := 0; i < count; i++ {
set.Push(i)
}
for i := 0; i < b.N; i++ {
set.Has(i)
}
}

View File

@@ -0,0 +1,10 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package sizes
const (
K = 1024
M = 1024 * K
G = 1024 * M
T = 1024 * G
)

View File

@@ -0,0 +1,17 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package sizes_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestSizes(t *testing.T) {
var a = assert.NewAssertion(t)
a.IsTrue(sizes.K == 1024)
a.IsTrue(sizes.M == 1024*1024)
a.IsTrue(sizes.G == 1024*1024*1024)
a.IsTrue(sizes.T == 1024*1024*1024*1024)
}