diff --git a/internal/caches/list_file_db_sqlite.go b/internal/caches/list_file_db_sqlite.go index 039f078..a489973 100644 --- a/internal/caches/list_file_db_sqlite.go +++ b/internal/caches/list_file_db_sqlite.go @@ -9,6 +9,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/types" @@ -593,9 +594,9 @@ func (this *SQLiteFileListDB) shouldRecover() bool { // 删除数据库文件 func (this *SQLiteFileListDB) deleteDB() { - _ = os.Remove(this.dbPath) - _ = os.Remove(this.dbPath + "-shm") - _ = os.Remove(this.dbPath + "-wal") + _ = fsutils.Remove(this.dbPath) + _ = fsutils.Remove(this.dbPath + "-shm") + _ = fsutils.Remove(this.dbPath + "-wal") } // 加载Hash列表 diff --git a/internal/caches/list_file_sqlite.go b/internal/caches/list_file_sqlite.go index 043d513..d10f0c3 100644 --- a/internal/caches/list_file_sqlite.go +++ b/internal/caches/list_file_sqlite.go @@ -11,6 +11,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/types" "os" @@ -486,7 +487,7 @@ func (this *SQLiteFileList) UpgradeV3(oldDir string, brokenOnError bool) error { remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...") defer func() { - _ = os.Remove(indexDBPath) + _ = fsutils.Remove(indexDBPath) remotelogs.Println("CACHE", "upgrading local database finished") }() diff --git a/internal/caches/partial_ranges_queue.go b/internal/caches/partial_ranges_queue.go index b0e2c51..3e12a2a 100644 --- a/internal/caches/partial_ranges_queue.go +++ b/internal/caches/partial_ranges_queue.go @@ -7,8 +7,8 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" - "os" "sync" ) @@ -91,7 +91,7 @@ func (this *PartialRangesQueue) Get(filename string) ([]byte, error) { return data, nil } - return os.ReadFile(filename) + return fsutils.ReadFile(filename) } // Delete ranges filename @@ -119,7 +119,7 @@ func (this *PartialRangesQueue) Dump() { continue } - err := os.WriteFile(filename, data, 0666) + err := fsutils.WriteFile(filename, data, 0666) if err != nil { remotelogs.Println("PARTIAL_RANGES_QUEUE", "write file '"+filename+"' failed: "+err.Error()) } diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index 184224a..78756c4 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -406,5 +406,5 @@ func (this *FileReader) discard() error { } // remove file - return os.Remove(this.fp.Name()) + return fsutils.Remove(this.fp.Name()) } diff --git a/internal/caches/reader_partial_file.go b/internal/caches/reader_partial_file.go index ea69dbf..9ce627b 100644 --- a/internal/caches/reader_partial_file.go +++ b/internal/caches/reader_partial_file.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges" "github.com/iwind/TeaGo/types" "io" @@ -146,7 +147,7 @@ func (this *PartialFileReader) IsCompleted() bool { func (this *PartialFileReader) discard() error { SharedPartialRangesQueue.Delete(this.rangePath) - _ = os.Remove(this.rangePath) + _ = fsutils.Remove(this.rangePath) return this.FileReader.discard() } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 15029f3..2cafca1 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -525,11 +525,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea return nil, fmt.Errorf("%w(001)", ErrFileIsWriting) } - if !isFlushing && !fsutils.WriteReady() { - sharedWritingFileKeyLocker.Unlock() - return nil, ErrServerIsBusy - } - sharedWritingFileKeyMap[key] = zero.New() sharedWritingFileKeyLocker.Unlock() defer func() { @@ -596,7 +591,11 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea // 数据库中是否存在 existsCacheItem, _, _ := this.list.Exist(hash) if existsCacheItem { + if !fsutils.ReaderLimiter.TryAck() { + return nil, ErrServerIsBusy + } readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444) + fsutils.ReaderLimiter.Release() if err == nil { var partialReader = NewPartialFileReader(readerFp) err = partialReader.Init() @@ -629,15 +628,19 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea if isNewCreated && existsFile { flags |= os.O_TRUNC } - fsutils.WriteBegin() + if !fsutils.WriterLimiter.TryAck() { + return nil, ErrServerIsBusy + } writer, err := os.OpenFile(tmpPath, flags, 0666) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() if err != nil { if os.IsNotExist(err) { _ = os.MkdirAll(dir, 0777) // open file again + fsutils.WriterLimiter.Ack() writer, err = os.OpenFile(tmpPath, flags, 0666) + fsutils.WriterLimiter.Release() } if err != nil { return nil, err @@ -654,7 +657,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea if !isOk { _ = writer.Close() if removeOnFailure { - _ = os.Remove(tmpPath) + _ = fsutils.Remove(tmpPath) } } }() @@ -697,9 +700,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea metaBodySize = bodySize } - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _, err = writer.Write(metaBytes) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() if err != nil { return nil, err } @@ -1152,9 +1155,7 @@ func (this *FileStorage) purgeLoop() { for i := 0; i < times; i++ { countFound, err := this.list.Purge(purgeCount, func(hash string) error { path, _ := this.hashPath(hash) - fsutils.WriteBegin() err := this.removeCacheFile(path) - fsutils.WriteEnd() if err != nil && !os.IsNotExist(err) { remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) } @@ -1211,9 +1212,7 @@ func (this *FileStorage) purgeLoop() { var before = time.Now() err := this.list.PurgeLFU(count, func(hash string) error { path, _ := this.hashPath(hash) - fsutils.WriteBegin() err := this.removeCacheFile(path) - fsutils.WriteEnd() if err != nil && !os.IsNotExist(err) { remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) } @@ -1481,7 +1480,7 @@ func (this *FileStorage) removeCacheFile(path string) error { openFileCache.Close(path) } - var err = os.Remove(path) + var err = fsutils.Remove(path) if err == nil || os.IsNotExist(err) { err = nil @@ -1493,7 +1492,7 @@ func (this *FileStorage) removeCacheFile(path string) error { _, statErr := os.Stat(partialPath) if statErr == nil { - _ = os.Remove(partialPath) + _ = fsutils.Remove(partialPath) SharedPartialRangesQueue.Delete(partialPath) } } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index a5c4667..5d14150 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -189,7 +189,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h if isDirty && this.parentStorage != nil && this.dirtyQueueSize > 0 && - len(this.dirtyChan) >= this.dirtyQueueSize-int(fsutils.DiskMaxWrites) /** delta **/ { // 缓存时间过长 + len(this.dirtyChan) >= this.dirtyQueueSize-64 /** delta **/ { // 缓存时间过长 return nil, ErrWritingQueueFull } diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 37745fa..ca21fe9 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -43,9 +43,9 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp // WriteHeader 写入数据 func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() this.headerSize += int64(n) if err != nil { _ = this.Discard() @@ -139,36 +139,36 @@ func (this *FileWriter) Close() error { // check content length if this.metaBodySize > 0 && this.bodySize != this.metaBodySize { _ = this.rawWriter.Close() - _ = os.Remove(path) + _ = fsutils.Remove(path) return ErrUnexpectedContentLength } err := this.WriteHeaderLength(types.Int(this.headerSize)) if err != nil { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriteEnd() - _ = os.Remove(path) + fsutils.WriterLimiter.Release() + _ = fsutils.Remove(path) return err } err = this.WriteBodyLength(this.bodySize) if err != nil { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriteEnd() - _ = os.Remove(path) + fsutils.WriterLimiter.Release() + _ = fsutils.Remove(path) return err } - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() err = this.rawWriter.Close() - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() if err != nil { - _ = os.Remove(path) + _ = fsutils.Remove(path) } else if strings.HasSuffix(path, FileTmpSuffix) { - err = os.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1)) + err = fsutils.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1)) if err != nil { - _ = os.Remove(path) + _ = fsutils.Remove(path) } } @@ -181,11 +181,11 @@ func (this *FileWriter) Discard() error { this.endFunc() }) - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() - err := os.Remove(this.rawWriter.Name()) + err := fsutils.Remove(this.rawWriter.Name()) return err } @@ -211,9 +211,9 @@ func (this *FileWriter) ItemType() ItemType { } func (this *FileWriter) write(data []byte) (n int, err error) { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() this.bodySize += int64(n) if this.maxSize > 0 && this.bodySize > this.maxSize { diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index e9617d2..4afe677 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -54,9 +54,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) { if !this.isNew { return } - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() this.headerSize += int64(n) if err != nil { _ = this.Discard() @@ -65,9 +65,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) { } func (this *PartialFileWriter) AppendHeader(data []byte) error { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _, err := this.rawWriter.Write(data) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() if err != nil { _ = this.Discard() } else { @@ -94,7 +94,9 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error { _ = this.Discard() return err } + fsutils.WriterLimiter.Ack() _, err = this.rawWriter.Write(bytes4) + fsutils.WriterLimiter.Release() if err != nil { _ = this.Discard() return err @@ -104,9 +106,9 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error { // Write 写入数据 func (this *PartialFileWriter) Write(data []byte) (n int, err error) { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() this.bodySize += int64(n) if err != nil { _ = this.Discard() @@ -145,9 +147,9 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { // extend min size to prepare for file tail const extendSizePerStep = 8 << 20 if stat.Size()+extendSizePerStep <= this.bodyOffset+offset+int64(len(data)) { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Truncate(stat.Size() + extendSizePerStep) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() return nil } } @@ -161,9 +163,9 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize } - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() if err != nil { return err } @@ -190,7 +192,9 @@ func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error { _ = this.Discard() return err } + fsutils.WriterLimiter.Ack() _, err = this.rawWriter.Write(bytes8) + fsutils.WriterLimiter.Release() if err != nil { _ = this.Discard() return err @@ -207,9 +211,9 @@ func (this *PartialFileWriter) Close() error { this.ranges.BodySize = this.bodySize err := this.ranges.WriteToFile(this.rangePath) if err != nil { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() this.remove() return err } @@ -218,25 +222,25 @@ func (this *PartialFileWriter) Close() error { if this.isNew { err = this.WriteHeaderLength(types.Int(this.headerSize)) if err != nil { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() this.remove() return err } err = this.WriteBodyLength(this.bodySize) if err != nil { - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() this.remove() return err } } - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() err = this.rawWriter.Close() - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() if err != nil { this.remove() } @@ -250,14 +254,16 @@ func (this *PartialFileWriter) Discard() error { this.endFunc() }) - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() SharedPartialRangesQueue.Delete(this.rangePath) - _ = os.Remove(this.rangePath) - err := os.Remove(this.rawWriter.Name()) + _ = fsutils.Remove(this.rangePath) + + err := fsutils.Remove(this.rawWriter.Name()) + return err } @@ -287,8 +293,9 @@ func (this *PartialFileWriter) IsNew() bool { } func (this *PartialFileWriter) remove() { - _ = os.Remove(this.rawWriter.Name()) + _ = fsutils.Remove(this.rawWriter.Name()) SharedPartialRangesQueue.Delete(this.rangePath) - _ = os.Remove(this.rangePath) + + _ = fsutils.Remove(this.rangePath) } diff --git a/internal/caches/writer_partial_file_test.go b/internal/caches/writer_partial_file_test.go index 8352678..97ee4a5 100644 --- a/internal/caches/writer_partial_file_test.go +++ b/internal/caches/writer_partial_file_test.go @@ -4,6 +4,7 @@ package caches_test import ( "github.com/TeaOSLab/EdgeNode/internal/caches" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/iwind/TeaGo/types" "os" "testing" @@ -15,7 +16,7 @@ func TestPartialFileWriter_Write(t *testing.T) { _ = os.Remove(path) var reader = func() { - data, err := os.ReadFile(path) + data, err := fsutils.ReadFile(path) if err != nil { t.Fatal(err) } diff --git a/internal/utils/dbs/stmt.go b/internal/utils/dbs/stmt.go index fa1ab7a..8358f5c 100644 --- a/internal/utils/dbs/stmt.go +++ b/internal/utils/dbs/stmt.go @@ -39,9 +39,9 @@ func (this *Stmt) ExecContext(ctx context.Context, args ...any) (result sql.Resu if this.enableStat { defer SharedQueryStatManager.AddQuery(this.query).End() } - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() result, err = this.rawStmt.ExecContext(ctx, args...) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() return } @@ -57,9 +57,9 @@ func (this *Stmt) Exec(args ...any) (result sql.Result, err error) { defer SharedQueryStatManager.AddQuery(this.query).End() } - fsutils.WriteBegin() + fsutils.WriterLimiter.Ack() result, err = this.rawStmt.Exec(args...) - fsutils.WriteEnd() + fsutils.WriterLimiter.Release() return } diff --git a/internal/utils/fs/disk.go b/internal/utils/fs/disk.go index c6eef8a..13ab3eb 100644 --- a/internal/utils/fs/disk.go +++ b/internal/utils/fs/disk.go @@ -91,7 +91,6 @@ func CheckDiskIsFast() (speedMB float64, isFast bool, err error) { } else { DiskSpeed = SpeedExtremelySlow } - calculateDiskMaxWrites() DiskSpeedMB = speedMB diff --git a/internal/utils/fs/os.go b/internal/utils/fs/os.go new file mode 100644 index 0000000..bc4e0f6 --- /dev/null +++ b/internal/utils/fs/os.go @@ -0,0 +1,31 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils + +import "os" + +func Remove(filename string) (err error) { + WriterLimiter.Ack() + err = os.Remove(filename) + WriterLimiter.Release() + return +} + +func Rename(oldPath string, newPath string) (err error) { + WriterLimiter.Ack() + err = os.Rename(oldPath, newPath) + WriterLimiter.Release() + return +} + +func ReadFile(filename string) (data []byte, err error) { + ReaderLimiter.Ack() + data, err = os.ReadFile(filename) + ReaderLimiter.Release() + return +} + +func WriteFile(filename string, data []byte, perm os.FileMode) (err error) { + err = os.WriteFile(filename, data, perm) + return +} diff --git a/internal/utils/fs/status.go b/internal/utils/fs/status.go index 3bdf426..4c50e69 100644 --- a/internal/utils/fs/status.go +++ b/internal/utils/fs/status.go @@ -9,7 +9,6 @@ import ( "github.com/iwind/TeaGo/Tea" "github.com/shirou/gopsutil/v3/load" "os" - "sync/atomic" "time" ) @@ -37,9 +36,8 @@ const ( ) var ( - DiskSpeed = SpeedLow - DiskMaxWrites int32 = 32 - DiskSpeedMB float64 + DiskSpeed = SpeedLow + DiskSpeedMB float64 ) var IsInHighLoad = false @@ -65,7 +63,6 @@ func init() { if err == nil && cache.SpeedMB > 0 { DiskSpeedMB = cache.SpeedMB DiskSpeed = cache.Speed - calculateDiskMaxWrites() } } @@ -109,39 +106,6 @@ func DiskIsExtremelyFast() bool { return DiskSpeed == SpeedExtremelyFast } -var countWrites int32 = 0 - -func WriteReady() bool { - if IsInExtremelyHighLoad { - return false - } - - return atomic.LoadInt32(&countWrites) < DiskMaxWrites -} - -func WriteBegin() { - atomic.AddInt32(&countWrites, 1) -} - -func WriteEnd() { - atomic.AddInt32(&countWrites, -1) -} - -func calculateDiskMaxWrites() { - switch DiskSpeed { - case SpeedExtremelyFast: - DiskMaxWrites = 32 - case SpeedFast: - DiskMaxWrites = 16 - case SpeedLow: - DiskMaxWrites = 8 - case SpeedExtremelySlow: - DiskMaxWrites = 4 - default: - DiskMaxWrites = 4 - } -} - // WaitLoad wait system load to downgrade func WaitLoad(maxLoad float64, maxLoops int, delay time.Duration) { for i := 0; i < maxLoops; i++ { diff --git a/internal/utils/fs/status_test.go b/internal/utils/fs/status_test.go index 88db891..54f3ad2 100644 --- a/internal/utils/fs/status_test.go +++ b/internal/utils/fs/status_test.go @@ -4,33 +4,10 @@ package fsutils_test import ( fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" - "github.com/iwind/TeaGo/assert" "testing" "time" ) -func TestWrites(t *testing.T) { - var a = assert.NewAssertion(t) - - for i := 0; i < int(fsutils.DiskMaxWrites); i++ { - fsutils.WriteBegin() - } - a.IsFalse(fsutils.WriteReady()) - - fsutils.WriteEnd() - a.IsTrue(fsutils.WriteReady()) -} - func TestWaitLoad(t *testing.T) { fsutils.WaitLoad(100, 5, 1*time.Minute) } - -func BenchmarkWrites(b *testing.B) { - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - fsutils.WriteReady() - fsutils.WriteBegin() - fsutils.WriteEnd() - } - }) -}