优化缓存从内存刷新到硬盘程序

This commit is contained in:
刘祥超
2024-04-05 10:59:14 +08:00
parent f6e5201cc3
commit c4bb92433d
6 changed files with 83 additions and 18 deletions

View File

@@ -45,3 +45,12 @@ func CanIgnoreErr(err error) bool {
var capacityErr *CapacityError var capacityErr *CapacityError
return errors.As(err, &capacityErr) return errors.As(err, &capacityErr)
} }
func IsCapacityError(err error) bool {
if err == nil {
return false
}
var capacityErr *CapacityError
return errors.As(err, &capacityErr)
}

View File

@@ -57,7 +57,7 @@ const (
FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值) FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值)
HotItemSize = 1024 // 热点数据数量 HotItemSize = 1024 // 热点数据数量
HotItemLifeSeconds int64 = 3600 // 热点数据生命周期 HotItemLifeSeconds int64 = 3600 // 热点数据生命周期
FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸 FileToMemoryMaxSize int64 = 32 << 20 // 可以从文件写入到内存的最大文件尺寸
FileTmpSuffix = ".tmp" FileTmpSuffix = ".tmp"
DefaultMinDiskFreeSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间 DefaultMinDiskFreeSpace uint64 = 5 << 30 // 当前磁盘最小剩余空间
DefaultStaleCacheSeconds = 1200 // 过时缓存留存时间 DefaultStaleCacheSeconds = 1200 // 过时缓存留存时间
@@ -478,7 +478,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
maxMemorySize = maxSize maxMemorySize = maxSize
} }
var memoryStorage = this.memoryStorage var memoryStorage = this.memoryStorage
if !fsutils.DiskIsExtremelyFast() && !isFlushing && !isPartial && memoryStorage != nil && ((bodySize > 0 && bodySize < maxMemorySize) || bodySize < 0) { if !isFlushing && !isPartial && memoryStorage != nil && ((bodySize > 0 && bodySize < maxMemorySize) || bodySize < 0) {
writer, err := memoryStorage.OpenWriter(key, expiredAt, status, headerSize, bodySize, maxMemorySize, false) writer, err := memoryStorage.OpenWriter(key, expiredAt, status, headerSize, bodySize, maxMemorySize, false)
if err == nil { if err == nil {
return writer, nil return writer, nil
@@ -488,6 +488,10 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if errors.Is(err, ErrWritingQueueFull) { if errors.Is(err, ErrWritingQueueFull) {
return nil, err return nil, err
} }
if IsCapacityError(err) && bodySize > 0 && memoryStorage.totalDirtySize > (128<<20) {
return nil, err
}
} }
// 是否正在写入 // 是否正在写入
@@ -607,7 +611,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
writer, err := os.OpenFile(tmpPath, flags, 0666) writer, err := os.OpenFile(tmpPath, flags, 0666)
fsutils.WriteEnd() fsutils.WriteEnd()
if err != nil { if err != nil {
// TODO 检查在各个系统中的稳定性
if os.IsNotExist(err) { if os.IsNotExist(err) {
_ = os.MkdirAll(dir, 0777) _ = os.MkdirAll(dir, 0777)

View File

@@ -56,6 +56,8 @@ type MemoryStorage struct {
purgeTicker *utils.Ticker purgeTicker *utils.Ticker
usedSize int64 usedSize int64
totalDirtySize int64
writingKeyMap map[string]zero.Zero // key => bool writingKeyMap map[string]zero.Zero // key => bool
ignoreKeys *setutils.FixedSet ignoreKeys *setutils.FixedSet
@@ -340,6 +342,9 @@ func (this *MemoryStorage) Stop() {
close(this.dirtyChan) close(this.dirtyChan)
} }
this.usedSize = 0
this.totalDirtySize = 0
_ = this.list.Close() _ = this.list.Close()
this.locker.Unlock() this.locker.Unlock()
@@ -506,14 +511,20 @@ func (this *MemoryStorage) startFlush() {
if fsutils.IsInExtremelyHighLoad { if fsutils.IsInExtremelyHighLoad {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} else if fsutils.IsInHighLoad {
time.Sleep(100 * time.Millisecond)
} }
} }
} }
// 单次Flush任务 // 单次Flush任务
func (this *MemoryStorage) flushItem(key string) { func (this *MemoryStorage) flushItem(fullKey string) {
sizeString, key, found := strings.Cut(fullKey, "@")
if !found {
return
}
defer func() {
atomic.AddInt64(&this.totalDirtySize, -types.Int64(sizeString))
}()
if this.parentStorage == nil { if this.parentStorage == nil {
return return
} }
@@ -547,7 +558,17 @@ func (this *MemoryStorage) flushItem(key string) {
return return
} }
if !isInList { if !isInList {
time.Sleep(1 * time.Second) for i := 0; i < 1000; i++ {
isInList, _, err = this.list.Exist(types.String(hash))
if isInList {
break
}
time.Sleep(1 * time.Millisecond)
}
if !isInList {
// discard
return
}
} }
writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status, len(item.HeaderValue), int64(len(item.BodyValue))) writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status, len(item.HeaderValue), int64(len(item.BodyValue)))

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/rands"
"math/rand"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strconv" "strconv"
@@ -381,3 +382,31 @@ func BenchmarkValuesMap(b *testing.B) {
} }
}) })
} }
func BenchmarkNewMemoryStorage(b *testing.B) {
var storage = NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
var data = bytes.Repeat([]byte{'A'}, 1024)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
func() {
writer, err := storage.OpenWriter("abc"+strconv.Itoa(rand.Int()), time.Now().Unix()+60, 200, -1, -1, -1, false)
if err != nil {
b.Fatal(err)
}
if err != nil {
b.Fatal(err)
}
_, _ = writer.WriteHeader([]byte("Header"))
_, _ = writer.Write(data)
err = writer.Close()
if err != nil {
b.Fatal(err)
}
}()
}
})
}

View File

@@ -4,7 +4,9 @@ import (
"errors" "errors"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
"github.com/iwind/TeaGo/types"
"sync" "sync"
"sync/atomic"
) )
type MemoryWriter struct { type MemoryWriter struct {
@@ -127,7 +129,8 @@ func (this *MemoryWriter) Close() error {
this.storage.valuesMap[this.hash] = this.item this.storage.valuesMap[this.hash] = this.item
select { select {
case this.storage.dirtyChan <- this.key: case this.storage.dirtyChan <- types.String(this.bodySize) + "@" +this.key :
atomic.AddInt64(&this.storage.totalDirtySize, this.bodySize)
default: default:
// remove from values map // remove from values map
delete(this.storage.valuesMap, this.hash) delete(this.storage.valuesMap, this.hash)

View File

@@ -129,14 +129,14 @@ func WriteEnd() {
func calculateDiskMaxWrites() { func calculateDiskMaxWrites() {
switch DiskSpeed { switch DiskSpeed {
case SpeedExtremelyFast: case SpeedExtremelyFast:
DiskMaxWrites = 128
case SpeedFast:
DiskMaxWrites = 64
case SpeedLow:
DiskMaxWrites = 32 DiskMaxWrites = 32
case SpeedFast:
DiskMaxWrites = 16
case SpeedLow:
DiskMaxWrites = 8
case SpeedExtremelySlow: case SpeedExtremelySlow:
DiskMaxWrites = 16 DiskMaxWrites = 4
default: default:
DiskMaxWrites = 16 DiskMaxWrites = 4
} }
} }