使用新的方法控制缓存并发写入速度

This commit is contained in:
GoEdgeLab
2023-07-29 09:29:36 +08:00
parent 079955b93e
commit 986ac733fd
12 changed files with 116 additions and 136 deletions

View File

@@ -12,7 +12,7 @@ var (
ErrEntityTooLarge = errors.New("entity too large") ErrEntityTooLarge = errors.New("entity too large")
ErrWritingUnavailable = errors.New("writing unavailable") ErrWritingUnavailable = errors.New("writing unavailable")
ErrWritingQueueFull = errors.New("writing queue full") ErrWritingQueueFull = errors.New("writing queue full")
ErrTooManyOpenFiles = errors.New("too many open files") ErrServerIsBusy = errors.New("server is busy")
) )
// CapacityError 容量错误 // CapacityError 容量错误
@@ -38,7 +38,7 @@ func CanIgnoreErr(err error) bool {
err == ErrEntityTooLarge || err == ErrEntityTooLarge ||
err == ErrWritingUnavailable || err == ErrWritingUnavailable ||
err == ErrWritingQueueFull || err == ErrWritingQueueFull ||
err == ErrTooManyOpenFiles { err == ErrServerIsBusy {
return true return true
} }
_, ok := err.(*CapacityError) _, ok := err.(*CapacityError)

View File

@@ -1,52 +0,0 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches
import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"sync/atomic"
"time"
)
const (
modeSlow int32 = 1
modeFast int32 = 2
)
// MaxOpenFiles max open files manager
type MaxOpenFiles struct {
ticker *time.Ticker
mode int32
}
func NewMaxOpenFiles() *MaxOpenFiles {
var f = &MaxOpenFiles{}
f.ticker = time.NewTicker(1 * time.Second)
f.init()
return f
}
func (this *MaxOpenFiles) init() {
goman.New(func() {
for range this.ticker.C {
// reset mode
atomic.StoreInt32(&this.mode, modeFast)
}
})
}
func (this *MaxOpenFiles) Fast() {
atomic.AddInt32(&this.mode, modeFast)
}
func (this *MaxOpenFiles) FinishAll() {
this.Fast()
}
func (this *MaxOpenFiles) Slow() {
atomic.StoreInt32(&this.mode, modeSlow)
}
func (this *MaxOpenFiles) Next() bool {
return atomic.LoadInt32(&this.mode) != modeSlow
}

View File

@@ -1,35 +0,0 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"testing"
"time"
)
func TestNewMaxOpenFiles(t *testing.T) {
var maxOpenFiles = caches.NewMaxOpenFiles()
maxOpenFiles.Fast()
t.Log("fast:", maxOpenFiles.Next())
maxOpenFiles.Slow()
t.Log("slow:", maxOpenFiles.Next())
time.Sleep(1*time.Second + 1*time.Millisecond)
t.Log("slow 1 second:", maxOpenFiles.Next())
maxOpenFiles.Slow()
t.Log("slow:", maxOpenFiles.Next())
maxOpenFiles.Slow()
t.Log("slow:", maxOpenFiles.Next())
time.Sleep(1 * time.Second)
t.Log("slow 1 second:", maxOpenFiles.Next())
maxOpenFiles.Slow()
t.Log("slow:", maxOpenFiles.Next())
maxOpenFiles.Fast()
t.Log("fast:", maxOpenFiles.Next())
}

View File

@@ -61,9 +61,6 @@ const (
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
var sharedWritingFileKeyLocker = sync.Mutex{} var sharedWritingFileKeyLocker = sync.Mutex{}
var maxOpenFiles = NewMaxOpenFiles()
const maxOpenFilesSlowCost = 1000 * time.Microsecond // us
const protectingLoadWhenDump = false const protectingLoadWhenDump = false
// FileStorage 文件缓存 // FileStorage 文件缓存
@@ -445,9 +442,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
return nil, ErrFileIsWriting return nil, ErrFileIsWriting
} }
if !isFlushing && !maxOpenFiles.Next() { if !isFlushing && !fsutils.WriteReady() {
sharedWritingFileKeyLocker.Unlock() sharedWritingFileKeyLocker.Unlock()
return nil, ErrTooManyOpenFiles return nil, ErrServerIsBusy
} }
sharedWritingFileKeyMap[key] = zero.New() sharedWritingFileKeyMap[key] = zero.New()
@@ -456,9 +453,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if !isOk { if !isOk {
sharedWritingFileKeyLocker.Lock() sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key) delete(sharedWritingFileKeyMap, key)
if len(sharedWritingFileKeyMap) == 0 {
maxOpenFiles.FinishAll()
}
sharedWritingFileKeyLocker.Unlock() sharedWritingFileKeyLocker.Unlock()
} }
}() }()
@@ -558,7 +552,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if isNewCreated && existsFile { if isNewCreated && existsFile {
flags |= os.O_TRUNC flags |= os.O_TRUNC
} }
var before = time.Now()
writer, err := os.OpenFile(tmpPath, flags, 0666) writer, err := os.OpenFile(tmpPath, flags, 0666)
if err != nil { if err != nil {
// TODO 检查在各个系统中的稳定性 // TODO 检查在各个系统中的稳定性
@@ -572,13 +565,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
return nil, err return nil, err
} }
} }
if !isFlushing {
if time.Since(before) >= maxOpenFilesSlowCost {
maxOpenFiles.Slow()
} else {
maxOpenFiles.Fast()
}
}
var removeOnFailure = true var removeOnFailure = true
defer func() { defer func() {
@@ -639,18 +625,12 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
return NewPartialFileWriter(writer, key, expiredAt, metaHeaderSize, metaBodySize, isNewCreated, isPartial, partialBodyOffset, partialRanges, func() { return NewPartialFileWriter(writer, key, expiredAt, metaHeaderSize, metaBodySize, isNewCreated, isPartial, partialBodyOffset, partialRanges, func() {
sharedWritingFileKeyLocker.Lock() sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key) delete(sharedWritingFileKeyMap, key)
if len(sharedWritingFileKeyMap) == 0 {
maxOpenFiles.FinishAll()
}
sharedWritingFileKeyLocker.Unlock() sharedWritingFileKeyLocker.Unlock()
}), nil }), nil
} else { } else {
return NewFileWriter(this, writer, key, expiredAt, metaHeaderSize, metaBodySize, maxSize, func() { return NewFileWriter(this, writer, key, expiredAt, metaHeaderSize, metaBodySize, maxSize, func() {
sharedWritingFileKeyLocker.Lock() sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key) delete(sharedWritingFileKeyMap, key)
if len(sharedWritingFileKeyMap) == 0 {
maxOpenFiles.FinishAll()
}
sharedWritingFileKeyLocker.Unlock() sharedWritingFileKeyLocker.Unlock()
}), nil }), nil
} }

View File

@@ -488,10 +488,8 @@ func (this *MemoryStorage) startFlush() {
if err == nil && loadStat != nil { if err == nil && loadStat != nil {
if loadStat.Load1 > 10 { if loadStat.Load1 > 10 {
writeDelayMS = 100 writeDelayMS = 100
} else if loadStat.Load1 > 3 { } else if loadStat.Load1 > 5 {
writeDelayMS = 50 writeDelayMS = 50
} else if loadStat.Load1 > 2 {
writeDelayMS = 10
} else { } else {
writeDelayMS = 0 writeDelayMS = 0
} }

View File

@@ -3,6 +3,7 @@ package caches
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"io" "io"
"os" "os"
@@ -42,7 +43,9 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp
// WriteHeader 写入数据 // WriteHeader 写入数据
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data) n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.headerSize += int64(n) this.headerSize += int64(n)
if err != nil { if err != nil {
_ = this.Discard() _ = this.Discard()
@@ -72,7 +75,9 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error {
// Write 写入数据 // Write 写入数据
func (this *FileWriter) Write(data []byte) (n int, err error) { func (this *FileWriter) Write(data []byte) (n int, err error) {
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data) n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.bodySize += int64(n) this.bodySize += int64(n)
if this.maxSize > 0 && this.bodySize > this.maxSize { if this.maxSize > 0 && this.bodySize > this.maxSize {

View File

@@ -4,6 +4,7 @@ package caches
import ( import (
"encoding/binary" "encoding/binary"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"io" "io"
"os" "os"
@@ -53,7 +54,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
if !this.isNew { if !this.isNew {
return return
} }
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data) n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.headerSize += int64(n) this.headerSize += int64(n)
if err != nil { if err != nil {
_ = this.Discard() _ = this.Discard()
@@ -62,7 +65,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
} }
func (this *PartialFileWriter) AppendHeader(data []byte) error { func (this *PartialFileWriter) AppendHeader(data []byte) error {
fsutils.WriteBegin()
_, err := this.rawWriter.Write(data) _, err := this.rawWriter.Write(data)
fsutils.WriteEnd()
if err != nil { if err != nil {
_ = this.Discard() _ = this.Discard()
} else { } else {
@@ -99,7 +104,9 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
// Write 写入数据 // Write 写入数据
func (this *PartialFileWriter) Write(data []byte) (n int, err error) { func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
fsutils.WriteBegin()
n, err = this.rawWriter.Write(data) n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
this.bodySize += int64(n) this.bodySize += int64(n)
if err != nil { if err != nil {
_ = this.Discard() _ = this.Discard()
@@ -128,7 +135,9 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize
} }
fsutils.WriteBegin()
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) _, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
fsutils.WriteEnd()
if err != nil { if err != nil {
return err return err
} }

View File

@@ -22,8 +22,6 @@ var (
IsQuiting = false // 是否正在退出 IsQuiting = false // 是否正在退出
EnableDBStat = false // 是否开启本地数据库统计 EnableDBStat = false // 是否开启本地数据库统计
DiskIsFast = false // 是否为高速硬盘
) )
// 检查是否为主程序 // 检查是否为主程序

View File

@@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary" iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
@@ -27,7 +26,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
_ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器 _ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新 _ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils" "github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
"github.com/TeaOSLab/EdgeNode/internal/waf" "github.com/TeaOSLab/EdgeNode/internal/waf"
"github.com/andybalholm/brotli" "github.com/andybalholm/brotli"
@@ -143,9 +141,6 @@ func (this *Node) Start() {
// 调整系统参数 // 调整系统参数
this.checkSystem() this.checkSystem()
// 检查硬盘
this.checkDisk()
// 启动事件 // 启动事件
events.Notify(events.EventStart) events.Notify(events.EventStart)
@@ -1112,21 +1107,6 @@ func (this *Node) checkSystem() {
} }
} }
// 检查硬盘
func (this *Node) checkDisk() {
speedMB, isFast, err := fsutils.CheckDiskIsFast()
if err != nil {
remotelogs.Error("NODE", "check disk speed failed: "+err.Error())
return
}
teaconst.DiskIsFast = isFast
if isFast {
remotelogs.Println("NODE", "disk is fast, writing test speed: "+fmt.Sprintf("%.2fMB/s", speedMB))
} else {
remotelogs.Println("NODE", "disk is slow, writing test speed: "+fmt.Sprintf("%.2fMB/s", speedMB))
}
}
// 检查API节点地址 // 检查API节点地址
func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) { func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) {
var addrs = []string{} var addrs = []string{}

View File

@@ -65,5 +65,11 @@ func CheckDiskIsFast() (speedMB float64, isFast bool, err error) {
return return
} }
isFast = speedMB > 120 isFast = speedMB > 120
if speedMB > DiskSpeedMB {
DiskSpeedMB = speedMB
DiskIsFast = isFast
}
return return
} }

View File

@@ -0,0 +1,60 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"sync/atomic"
"time"
)
var (
DiskIsFast bool
DiskSpeedMB float64
)
func init() {
if !teaconst.IsMain {
return
}
// test disk
go func() {
// initial check
_, _, _ = CheckDiskIsFast()
// check every one hour
var ticker = time.NewTicker(1 * time.Hour)
var count = 0
for range ticker.C {
_, _, err := CheckDiskIsFast()
if err == nil {
count++
if count > 24 {
return
}
}
}
}()
}
var countWrites int32 = 0
const MaxWrites = 32
const MaxFastWrites = 128
func WriteReady() bool {
var count = atomic.LoadInt32(&countWrites)
if DiskIsFast {
return count < MaxFastWrites
}
return count <= MaxWrites
}
func WriteBegin() {
atomic.AddInt32(&countWrites, 1)
}
func WriteEnd() {
atomic.AddInt32(&countWrites, -1)
}

View File

@@ -0,0 +1,31 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestWrites(t *testing.T) {
var a = assert.NewAssertion(t)
for i := 0; i < fsutils.MaxWrites+1; i++ {
fsutils.WriteBegin()
}
a.IsFalse(fsutils.WriteReady())
fsutils.WriteEnd()
a.IsTrue(fsutils.WriteReady())
}
func BenchmarkWrites(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
fsutils.WriteReady()
fsutils.WriteBegin()
fsutils.WriteEnd()
}
})
}