diff --git a/internal/caches/errors.go b/internal/caches/errors.go index a0d986c..0fb8e75 100644 --- a/internal/caches/errors.go +++ b/internal/caches/errors.go @@ -12,7 +12,7 @@ var ( ErrEntityTooLarge = errors.New("entity too large") ErrWritingUnavailable = errors.New("writing unavailable") ErrWritingQueueFull = errors.New("writing queue full") - ErrTooManyOpenFiles = errors.New("too many open files") + ErrServerIsBusy = errors.New("server is busy") ) // CapacityError 容量错误 @@ -38,7 +38,7 @@ func CanIgnoreErr(err error) bool { err == ErrEntityTooLarge || err == ErrWritingUnavailable || err == ErrWritingQueueFull || - err == ErrTooManyOpenFiles { + err == ErrServerIsBusy { return true } _, ok := err.(*CapacityError) diff --git a/internal/caches/max_open_files.go b/internal/caches/max_open_files.go deleted file mode 100644 index c911230..0000000 --- a/internal/caches/max_open_files.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . - -package caches - -import ( - "github.com/TeaOSLab/EdgeNode/internal/goman" - "sync/atomic" - "time" -) - -const ( - modeSlow int32 = 1 - modeFast int32 = 2 -) - -// MaxOpenFiles max open files manager -type MaxOpenFiles struct { - ticker *time.Ticker - mode int32 -} - -func NewMaxOpenFiles() *MaxOpenFiles { - var f = &MaxOpenFiles{} - f.ticker = time.NewTicker(1 * time.Second) - f.init() - return f -} - -func (this *MaxOpenFiles) init() { - goman.New(func() { - for range this.ticker.C { - // reset mode - atomic.StoreInt32(&this.mode, modeFast) - } - }) -} - -func (this *MaxOpenFiles) Fast() { - atomic.AddInt32(&this.mode, modeFast) -} - -func (this *MaxOpenFiles) FinishAll() { - this.Fast() -} - -func (this *MaxOpenFiles) Slow() { - atomic.StoreInt32(&this.mode, modeSlow) -} - -func (this *MaxOpenFiles) Next() bool { - return atomic.LoadInt32(&this.mode) != modeSlow -} diff --git a/internal/caches/max_open_files_test.go b/internal/caches/max_open_files_test.go deleted file mode 100644 index 2b37ab4..0000000 --- a/internal/caches/max_open_files_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . - -package caches_test - -import ( - "github.com/TeaOSLab/EdgeNode/internal/caches" - "testing" - "time" -) - -func TestNewMaxOpenFiles(t *testing.T) { - var maxOpenFiles = caches.NewMaxOpenFiles() - maxOpenFiles.Fast() - t.Log("fast:", maxOpenFiles.Next()) - - maxOpenFiles.Slow() - t.Log("slow:", maxOpenFiles.Next()) - time.Sleep(1*time.Second + 1*time.Millisecond) - t.Log("slow 1 second:", maxOpenFiles.Next()) - - maxOpenFiles.Slow() - t.Log("slow:", maxOpenFiles.Next()) - - maxOpenFiles.Slow() - t.Log("slow:", maxOpenFiles.Next()) - - time.Sleep(1 * time.Second) - t.Log("slow 1 second:", maxOpenFiles.Next()) - - maxOpenFiles.Slow() - t.Log("slow:", maxOpenFiles.Next()) - - maxOpenFiles.Fast() - t.Log("fast:", maxOpenFiles.Next()) -} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 36d9b99..99624b1 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -61,9 +61,6 @@ const ( var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool var sharedWritingFileKeyLocker = sync.Mutex{} -var maxOpenFiles = NewMaxOpenFiles() - -const maxOpenFilesSlowCost = 1000 * time.Microsecond // us const protectingLoadWhenDump = false // FileStorage 文件缓存 @@ -445,9 +442,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea return nil, ErrFileIsWriting } - if !isFlushing && !maxOpenFiles.Next() { + if !isFlushing && !fsutils.WriteReady() { sharedWritingFileKeyLocker.Unlock() - return nil, ErrTooManyOpenFiles + return nil, ErrServerIsBusy } sharedWritingFileKeyMap[key] = zero.New() @@ -456,9 +453,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea if !isOk { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) - if len(sharedWritingFileKeyMap) == 0 { - maxOpenFiles.FinishAll() - } sharedWritingFileKeyLocker.Unlock() } }() @@ -558,7 +552,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea if isNewCreated && existsFile { flags |= os.O_TRUNC } - var before = time.Now() writer, err := os.OpenFile(tmpPath, flags, 0666) if err != nil { // TODO 检查在各个系统中的稳定性 @@ -572,13 +565,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea return nil, err } } - if !isFlushing { - if time.Since(before) >= maxOpenFilesSlowCost { - maxOpenFiles.Slow() - } else { - maxOpenFiles.Fast() - } - } var removeOnFailure = true defer func() { @@ -639,18 +625,12 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea return NewPartialFileWriter(writer, key, expiredAt, metaHeaderSize, metaBodySize, isNewCreated, isPartial, partialBodyOffset, partialRanges, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) - if len(sharedWritingFileKeyMap) == 0 { - maxOpenFiles.FinishAll() - } sharedWritingFileKeyLocker.Unlock() }), nil } else { return NewFileWriter(this, writer, key, expiredAt, metaHeaderSize, metaBodySize, maxSize, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) - if len(sharedWritingFileKeyMap) == 0 { - maxOpenFiles.FinishAll() - } sharedWritingFileKeyLocker.Unlock() }), nil } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 7fd381f..f432a9d 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -488,10 +488,8 @@ func (this *MemoryStorage) startFlush() { if err == nil && loadStat != nil { if loadStat.Load1 > 10 { writeDelayMS = 100 - } else if loadStat.Load1 > 3 { + } else if loadStat.Load1 > 5 { writeDelayMS = 50 - } else if loadStat.Load1 > 2 { - writeDelayMS = 10 } else { writeDelayMS = 0 } diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 2f915f5..514deee 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -3,6 +3,7 @@ package caches import ( "encoding/binary" "errors" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/iwind/TeaGo/types" "io" "os" @@ -42,7 +43,9 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp // WriteHeader 写入数据 func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { + fsutils.WriteBegin() n, err = this.rawWriter.Write(data) + fsutils.WriteEnd() this.headerSize += int64(n) if err != nil { _ = this.Discard() @@ -72,7 +75,9 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error { // Write 写入数据 func (this *FileWriter) Write(data []byte) (n int, err error) { + fsutils.WriteBegin() n, err = this.rawWriter.Write(data) + fsutils.WriteEnd() this.bodySize += int64(n) if this.maxSize > 0 && this.bodySize > this.maxSize { diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index c718a0f..cf4b3fd 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -4,6 +4,7 @@ package caches import ( "encoding/binary" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/iwind/TeaGo/types" "io" "os" @@ -53,7 +54,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) { if !this.isNew { return } + fsutils.WriteBegin() n, err = this.rawWriter.Write(data) + fsutils.WriteEnd() this.headerSize += int64(n) if err != nil { _ = this.Discard() @@ -62,7 +65,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) { } func (this *PartialFileWriter) AppendHeader(data []byte) error { + fsutils.WriteBegin() _, err := this.rawWriter.Write(data) + fsutils.WriteEnd() if err != nil { _ = this.Discard() } else { @@ -99,7 +104,9 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error { // Write 写入数据 func (this *PartialFileWriter) Write(data []byte) (n int, err error) { + fsutils.WriteBegin() n, err = this.rawWriter.Write(data) + fsutils.WriteEnd() this.bodySize += int64(n) if err != nil { _ = this.Discard() @@ -128,7 +135,9 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize } + fsutils.WriteBegin() _, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) + fsutils.WriteEnd() if err != nil { return err } diff --git a/internal/const/vars.go b/internal/const/vars.go index 32b4fe3..d8f5023 100644 --- a/internal/const/vars.go +++ b/internal/const/vars.go @@ -22,8 +22,6 @@ var ( IsQuiting = false // 是否正在退出 EnableDBStat = false // 是否开启本地数据库统计 - - DiskIsFast = false // 是否为高速硬盘 ) // 检查是否为主程序 diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 5808725..c23087a 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "errors" - "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" @@ -27,7 +26,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils" _ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器 _ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新 - fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils" "github.com/TeaOSLab/EdgeNode/internal/waf" "github.com/andybalholm/brotli" @@ -143,9 +141,6 @@ func (this *Node) Start() { // 调整系统参数 this.checkSystem() - // 检查硬盘 - this.checkDisk() - // 启动事件 events.Notify(events.EventStart) @@ -1112,21 +1107,6 @@ func (this *Node) checkSystem() { } } -// 检查硬盘 -func (this *Node) checkDisk() { - speedMB, isFast, err := fsutils.CheckDiskIsFast() - if err != nil { - remotelogs.Error("NODE", "check disk speed failed: "+err.Error()) - return - } - teaconst.DiskIsFast = isFast - if isFast { - remotelogs.Println("NODE", "disk is fast, writing test speed: "+fmt.Sprintf("%.2fMB/s", speedMB)) - } else { - remotelogs.Println("NODE", "disk is slow, writing test speed: "+fmt.Sprintf("%.2fMB/s", speedMB)) - } -} - // 检查API节点地址 func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) { var addrs = []string{} diff --git a/internal/utils/fs/disk.go b/internal/utils/fs/disk.go index 2ac98b6..c063cef 100644 --- a/internal/utils/fs/disk.go +++ b/internal/utils/fs/disk.go @@ -65,5 +65,11 @@ func CheckDiskIsFast() (speedMB float64, isFast bool, err error) { return } isFast = speedMB > 120 + + if speedMB > DiskSpeedMB { + DiskSpeedMB = speedMB + DiskIsFast = isFast + } + return } diff --git a/internal/utils/fs/status.go b/internal/utils/fs/status.go new file mode 100644 index 0000000..9d3302d --- /dev/null +++ b/internal/utils/fs/status.go @@ -0,0 +1,60 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils + +import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "sync/atomic" + "time" +) + +var ( + DiskIsFast bool + DiskSpeedMB float64 +) + +func init() { + if !teaconst.IsMain { + return + } + + // test disk + go func() { + // initial check + _, _, _ = CheckDiskIsFast() + + // check every one hour + var ticker = time.NewTicker(1 * time.Hour) + var count = 0 + for range ticker.C { + _, _, err := CheckDiskIsFast() + if err == nil { + count++ + if count > 24 { + return + } + } + } + }() +} + +var countWrites int32 = 0 + +const MaxWrites = 32 +const MaxFastWrites = 128 + +func WriteReady() bool { + var count = atomic.LoadInt32(&countWrites) + if DiskIsFast { + return count < MaxFastWrites + } + return count <= MaxWrites +} + +func WriteBegin() { + atomic.AddInt32(&countWrites, 1) +} + +func WriteEnd() { + atomic.AddInt32(&countWrites, -1) +} diff --git a/internal/utils/fs/status_test.go b/internal/utils/fs/status_test.go new file mode 100644 index 0000000..c322984 --- /dev/null +++ b/internal/utils/fs/status_test.go @@ -0,0 +1,31 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils_test + +import ( + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestWrites(t *testing.T) { + var a = assert.NewAssertion(t) + + for i := 0; i < fsutils.MaxWrites+1; i++ { + fsutils.WriteBegin() + } + a.IsFalse(fsutils.WriteReady()) + + fsutils.WriteEnd() + a.IsTrue(fsutils.WriteReady()) +} + +func BenchmarkWrites(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + fsutils.WriteReady() + fsutils.WriteBegin() + fsutils.WriteEnd() + } + }) +}