优化缓存从内存刷新到硬盘程序

This commit is contained in:
GoEdgeLab
2024-04-05 10:59:14 +08:00
parent 57d8f5135a
commit 1377f25fa4
6 changed files with 83 additions and 18 deletions

View File

@@ -45,3 +45,12 @@ func CanIgnoreErr(err error) bool {
var capacityErr *CapacityError
return errors.As(err, &capacityErr)
}
func IsCapacityError(err error) bool {
if err == nil {
return false
}
var capacityErr *CapacityError
return errors.As(err, &capacityErr)
}

View File

@@ -54,10 +54,10 @@ const (
)
const (
FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值)
HotItemSize = 1024 // 热点数据数量
HotItemLifeSeconds int64 = 3600 // 热点数据生命周期
FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸
FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值)
HotItemSize = 1024 // 热点数据数量
HotItemLifeSeconds int64 = 3600 // 热点数据生命周期
FileToMemoryMaxSize int64 = 32 << 20 // 可以从文件写入到内存的最大文件尺寸
FileTmpSuffix = ".tmp"
DefaultMinDiskFreeSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间
DefaultStaleCacheSeconds = 1200 // 过时缓存留存时间
@@ -478,7 +478,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
maxMemorySize = maxSize
}
var memoryStorage = this.memoryStorage
if !fsutils.DiskIsExtremelyFast() && !isFlushing && !isPartial && memoryStorage != nil && ((bodySize > 0 && bodySize < maxMemorySize) || bodySize < 0) {
if !isFlushing && !isPartial && memoryStorage != nil && ((bodySize > 0 && bodySize < maxMemorySize) || bodySize < 0) {
writer, err := memoryStorage.OpenWriter(key, expiredAt, status, headerSize, bodySize, maxMemorySize, false)
if err == nil {
return writer, nil
@@ -488,6 +488,10 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if errors.Is(err, ErrWritingQueueFull) {
return nil, err
}
if IsCapacityError(err) && bodySize > 0 && memoryStorage.totalDirtySize > (128<<20) {
return nil, err
}
}
// 是否正在写入
@@ -607,7 +611,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
writer, err := os.OpenFile(tmpPath, flags, 0666)
fsutils.WriteEnd()
if err != nil {
// TODO 检查在各个系统中的稳定性
if os.IsNotExist(err) {
_ = os.MkdirAll(dir, 0777)

View File

@@ -55,7 +55,9 @@ type MemoryStorage struct {
purgeTicker *utils.Ticker
usedSize int64
usedSize int64
totalDirtySize int64
writingKeyMap map[string]zero.Zero // key => bool
ignoreKeys *setutils.FixedSet
@@ -340,6 +342,9 @@ func (this *MemoryStorage) Stop() {
close(this.dirtyChan)
}
this.usedSize = 0
this.totalDirtySize = 0
_ = this.list.Close()
this.locker.Unlock()
@@ -506,14 +511,20 @@ func (this *MemoryStorage) startFlush() {
if fsutils.IsInExtremelyHighLoad {
time.Sleep(1 * time.Second)
} else if fsutils.IsInHighLoad {
time.Sleep(100 * time.Millisecond)
}
}
}
// 单次Flush任务
func (this *MemoryStorage) flushItem(key string) {
func (this *MemoryStorage) flushItem(fullKey string) {
sizeString, key, found := strings.Cut(fullKey, "@")
if !found {
return
}
defer func() {
atomic.AddInt64(&this.totalDirtySize, -types.Int64(sizeString))
}()
if this.parentStorage == nil {
return
}
@@ -547,7 +558,17 @@ func (this *MemoryStorage) flushItem(key string) {
return
}
if !isInList {
time.Sleep(1 * time.Second)
for i := 0; i < 1000; i++ {
isInList, _, err = this.list.Exist(types.String(hash))
if isInList {
break
}
time.Sleep(1 * time.Millisecond)
}
if !isInList {
// discard
return
}
}
writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status, len(item.HeaderValue), int64(len(item.BodyValue)))

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"math/rand"
"runtime"
"runtime/debug"
"strconv"
@@ -381,3 +382,31 @@ func BenchmarkValuesMap(b *testing.B) {
}
})
}
func BenchmarkNewMemoryStorage(b *testing.B) {
var storage = NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
var data = bytes.Repeat([]byte{'A'}, 1024)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
func() {
writer, err := storage.OpenWriter("abc"+strconv.Itoa(rand.Int()), time.Now().Unix()+60, 200, -1, -1, -1, false)
if err != nil {
b.Fatal(err)
}
if err != nil {
b.Fatal(err)
}
_, _ = writer.WriteHeader([]byte("Header"))
_, _ = writer.Write(data)
err = writer.Close()
if err != nil {
b.Fatal(err)
}
}()
}
})
}

View File

@@ -4,7 +4,9 @@ import (
"errors"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/types"
"sync"
"sync/atomic"
)
type MemoryWriter struct {
@@ -127,7 +129,8 @@ func (this *MemoryWriter) Close() error {
this.storage.valuesMap[this.hash] = this.item
select {
case this.storage.dirtyChan <- this.key:
case this.storage.dirtyChan <- types.String(this.bodySize) + "@" +this.key :
atomic.AddInt64(&this.storage.totalDirtySize, this.bodySize)
default:
// remove from values map
delete(this.storage.valuesMap, this.hash)

View File

@@ -129,14 +129,14 @@ func WriteEnd() {
func calculateDiskMaxWrites() {
switch DiskSpeed {
case SpeedExtremelyFast:
DiskMaxWrites = 128
case SpeedFast:
DiskMaxWrites = 64
case SpeedLow:
DiskMaxWrites = 32
case SpeedFast:
DiskMaxWrites = 16
case SpeedLow:
DiskMaxWrites = 8
case SpeedExtremelySlow:
DiskMaxWrites = 16
DiskMaxWrites = 4
default:
DiskMaxWrites = 16
DiskMaxWrites = 4
}
}