写入和删除缓存文件时增加线程数限制

This commit is contained in:
GoEdgeLab
2024-04-29 22:36:26 +08:00
parent 43a594198e
commit f5136e94d8
15 changed files with 117 additions and 136 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
@@ -593,9 +594,9 @@ func (this *SQLiteFileListDB) shouldRecover() bool {
// 删除数据库文件
func (this *SQLiteFileListDB) deleteDB() {
_ = os.Remove(this.dbPath)
_ = os.Remove(this.dbPath + "-shm")
_ = os.Remove(this.dbPath + "-wal")
_ = fsutils.Remove(this.dbPath)
_ = fsutils.Remove(this.dbPath + "-shm")
_ = fsutils.Remove(this.dbPath + "-wal")
}
// 加载Hash列表

View File

@@ -11,6 +11,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/types"
"os"
@@ -486,7 +487,7 @@ func (this *SQLiteFileList) UpgradeV3(oldDir string, brokenOnError bool) error {
remotelogs.Println("CACHE", "upgrading local database from '"+oldDir+"' ...")
defer func() {
_ = os.Remove(indexDBPath)
_ = fsutils.Remove(indexDBPath)
remotelogs.Println("CACHE", "upgrading local database finished")
}()

View File

@@ -7,8 +7,8 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"os"
"sync"
)
@@ -91,7 +91,7 @@ func (this *PartialRangesQueue) Get(filename string) ([]byte, error) {
return data, nil
}
return os.ReadFile(filename)
return fsutils.ReadFile(filename)
}
// Delete ranges filename
@@ -119,7 +119,7 @@ func (this *PartialRangesQueue) Dump() {
continue
}
err := os.WriteFile(filename, data, 0666)
err := fsutils.WriteFile(filename, data, 0666)
if err != nil {
remotelogs.Println("PARTIAL_RANGES_QUEUE", "write file '"+filename+"' failed: "+err.Error())
}

View File

@@ -406,5 +406,5 @@ func (this *FileReader) discard() error {
}
// remove file
return os.Remove(this.fp.Name())
return fsutils.Remove(this.fp.Name())
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"github.com/iwind/TeaGo/types"
"io"
@@ -146,7 +147,7 @@ func (this *PartialFileReader) IsCompleted() bool {
func (this *PartialFileReader) discard() error {
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
_ = fsutils.Remove(this.rangePath)
return this.FileReader.discard()
}

View File

@@ -525,11 +525,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
return nil, fmt.Errorf("%w(001)", ErrFileIsWriting)
}
if !isFlushing && !fsutils.WriteReady() {
sharedWritingFileKeyLocker.Unlock()
return nil, ErrServerIsBusy
}
sharedWritingFileKeyMap[key] = zero.New()
sharedWritingFileKeyLocker.Unlock()
defer func() {
@@ -596,7 +591,11 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
// 数据库中是否存在
existsCacheItem, _, _ := this.list.Exist(hash)
if existsCacheItem {
if !fsutils.ReaderLimiter.TryAck() {
return nil, ErrServerIsBusy
}
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
fsutils.ReaderLimiter.Release()
if err == nil {
var partialReader = NewPartialFileReader(readerFp)
err = partialReader.Init()
@@ -629,15 +628,19 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if isNewCreated && existsFile {
flags |= os.O_TRUNC
}
fsutils.WriteBegin()
if !fsutils.WriterLimiter.TryAck() {
return nil, ErrServerIsBusy
}
writer, err := os.OpenFile(tmpPath, flags, 0666)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
if os.IsNotExist(err) {
_ = os.MkdirAll(dir, 0777)
// open file again
fsutils.WriterLimiter.Ack()
writer, err = os.OpenFile(tmpPath, flags, 0666)
fsutils.WriterLimiter.Release()
}
if err != nil {
return nil, err
@@ -654,7 +657,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if !isOk {
_ = writer.Close()
if removeOnFailure {
_ = os.Remove(tmpPath)
_ = fsutils.Remove(tmpPath)
}
}
}()
@@ -697,9 +700,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
metaBodySize = bodySize
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_, err = writer.Write(metaBytes)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
return nil, err
}
@@ -1152,9 +1155,7 @@ func (this *FileStorage) purgeLoop() {
for i := 0; i < times; i++ {
countFound, err := this.list.Purge(purgeCount, func(hash string) error {
path, _ := this.hashPath(hash)
fsutils.WriteBegin()
err := this.removeCacheFile(path)
fsutils.WriteEnd()
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
@@ -1211,9 +1212,7 @@ func (this *FileStorage) purgeLoop() {
var before = time.Now()
err := this.list.PurgeLFU(count, func(hash string) error {
path, _ := this.hashPath(hash)
fsutils.WriteBegin()
err := this.removeCacheFile(path)
fsutils.WriteEnd()
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
@@ -1481,7 +1480,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
openFileCache.Close(path)
}
var err = os.Remove(path)
var err = fsutils.Remove(path)
if err == nil || os.IsNotExist(err) {
err = nil
@@ -1493,7 +1492,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
_, statErr := os.Stat(partialPath)
if statErr == nil {
_ = os.Remove(partialPath)
_ = fsutils.Remove(partialPath)
SharedPartialRangesQueue.Delete(partialPath)
}
}

View File

@@ -189,7 +189,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
if isDirty &&
this.parentStorage != nil &&
this.dirtyQueueSize > 0 &&
len(this.dirtyChan) >= this.dirtyQueueSize-int(fsutils.DiskMaxWrites) /** delta **/ { // 缓存时间过长
len(this.dirtyChan) >= this.dirtyQueueSize-64 /** delta **/ { // 缓存时间过长
return nil, ErrWritingQueueFull
}

View File

@@ -43,9 +43,9 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp
// WriteHeader 写入数据
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
this.headerSize += int64(n)
if err != nil {
_ = this.Discard()
@@ -139,36 +139,36 @@ func (this *FileWriter) Close() error {
// check content length
if this.metaBodySize > 0 && this.bodySize != this.metaBodySize {
_ = this.rawWriter.Close()
_ = os.Remove(path)
_ = fsutils.Remove(path)
return ErrUnexpectedContentLength
}
err := this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
_ = os.Remove(path)
fsutils.WriterLimiter.Release()
_ = fsutils.Remove(path)
return err
}
err = this.WriteBodyLength(this.bodySize)
if err != nil {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
_ = os.Remove(path)
fsutils.WriterLimiter.Release()
_ = fsutils.Remove(path)
return err
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
err = this.rawWriter.Close()
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
_ = os.Remove(path)
_ = fsutils.Remove(path)
} else if strings.HasSuffix(path, FileTmpSuffix) {
err = os.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1))
err = fsutils.Rename(path, strings.Replace(path, FileTmpSuffix, "", 1))
if err != nil {
_ = os.Remove(path)
_ = fsutils.Remove(path)
}
}
@@ -181,11 +181,11 @@ func (this *FileWriter) Discard() error {
this.endFunc()
})
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
err := os.Remove(this.rawWriter.Name())
err := fsutils.Remove(this.rawWriter.Name())
return err
}
@@ -211,9 +211,9 @@ func (this *FileWriter) ItemType() ItemType {
}
func (this *FileWriter) write(data []byte) (n int, err error) {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
this.bodySize += int64(n)
if this.maxSize > 0 && this.bodySize > this.maxSize {

View File

@@ -54,9 +54,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
if !this.isNew {
return
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
this.headerSize += int64(n)
if err != nil {
_ = this.Discard()
@@ -65,9 +65,9 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
}
func (this *PartialFileWriter) AppendHeader(data []byte) error {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_, err := this.rawWriter.Write(data)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
_ = this.Discard()
} else {
@@ -94,7 +94,9 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
_ = this.Discard()
return err
}
fsutils.WriterLimiter.Ack()
_, err = this.rawWriter.Write(bytes4)
fsutils.WriterLimiter.Release()
if err != nil {
_ = this.Discard()
return err
@@ -104,9 +106,9 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
// Write 写入数据
func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
n, err = this.rawWriter.Write(data)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
this.bodySize += int64(n)
if err != nil {
_ = this.Discard()
@@ -145,9 +147,9 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
// extend min size to prepare for file tail
const extendSizePerStep = 8 << 20
if stat.Size()+extendSizePerStep <= this.bodyOffset+offset+int64(len(data)) {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Truncate(stat.Size() + extendSizePerStep)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
return nil
}
}
@@ -161,9 +163,9 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
return err
}
@@ -190,7 +192,9 @@ func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error {
_ = this.Discard()
return err
}
fsutils.WriterLimiter.Ack()
_, err = this.rawWriter.Write(bytes8)
fsutils.WriterLimiter.Release()
if err != nil {
_ = this.Discard()
return err
@@ -207,9 +211,9 @@ func (this *PartialFileWriter) Close() error {
this.ranges.BodySize = this.bodySize
err := this.ranges.WriteToFile(this.rangePath)
if err != nil {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
this.remove()
return err
}
@@ -218,25 +222,25 @@ func (this *PartialFileWriter) Close() error {
if this.isNew {
err = this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
this.remove()
return err
}
err = this.WriteBodyLength(this.bodySize)
if err != nil {
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
this.remove()
return err
}
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
err = this.rawWriter.Close()
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
if err != nil {
this.remove()
}
@@ -250,14 +254,16 @@ func (this *PartialFileWriter) Discard() error {
this.endFunc()
})
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
_ = this.rawWriter.Close()
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
err := os.Remove(this.rawWriter.Name())
_ = fsutils.Remove(this.rangePath)
err := fsutils.Remove(this.rawWriter.Name())
return err
}
@@ -287,8 +293,9 @@ func (this *PartialFileWriter) IsNew() bool {
}
func (this *PartialFileWriter) remove() {
_ = os.Remove(this.rawWriter.Name())
_ = fsutils.Remove(this.rawWriter.Name())
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
_ = fsutils.Remove(this.rangePath)
}

View File

@@ -4,6 +4,7 @@ package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/types"
"os"
"testing"
@@ -15,7 +16,7 @@ func TestPartialFileWriter_Write(t *testing.T) {
_ = os.Remove(path)
var reader = func() {
data, err := os.ReadFile(path)
data, err := fsutils.ReadFile(path)
if err != nil {
t.Fatal(err)
}

View File

@@ -39,9 +39,9 @@ func (this *Stmt) ExecContext(ctx context.Context, args ...any) (result sql.Resu
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
result, err = this.rawStmt.ExecContext(ctx, args...)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
return
}
@@ -57,9 +57,9 @@ func (this *Stmt) Exec(args ...any) (result sql.Result, err error) {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
fsutils.WriteBegin()
fsutils.WriterLimiter.Ack()
result, err = this.rawStmt.Exec(args...)
fsutils.WriteEnd()
fsutils.WriterLimiter.Release()
return
}

View File

@@ -91,7 +91,6 @@ func CheckDiskIsFast() (speedMB float64, isFast bool, err error) {
} else {
DiskSpeed = SpeedExtremelySlow
}
calculateDiskMaxWrites()
DiskSpeedMB = speedMB

31
internal/utils/fs/os.go Normal file
View File

@@ -0,0 +1,31 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils
import "os"
func Remove(filename string) (err error) {
WriterLimiter.Ack()
err = os.Remove(filename)
WriterLimiter.Release()
return
}
func Rename(oldPath string, newPath string) (err error) {
WriterLimiter.Ack()
err = os.Rename(oldPath, newPath)
WriterLimiter.Release()
return
}
func ReadFile(filename string) (data []byte, err error) {
ReaderLimiter.Ack()
data, err = os.ReadFile(filename)
ReaderLimiter.Release()
return
}
func WriteFile(filename string, data []byte, perm os.FileMode) (err error) {
err = os.WriteFile(filename, data, perm)
return
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/iwind/TeaGo/Tea"
"github.com/shirou/gopsutil/v3/load"
"os"
"sync/atomic"
"time"
)
@@ -38,7 +37,6 @@ const (
var (
DiskSpeed = SpeedLow
DiskMaxWrites int32 = 32
DiskSpeedMB float64
)
@@ -65,7 +63,6 @@ func init() {
if err == nil && cache.SpeedMB > 0 {
DiskSpeedMB = cache.SpeedMB
DiskSpeed = cache.Speed
calculateDiskMaxWrites()
}
}
@@ -109,39 +106,6 @@ func DiskIsExtremelyFast() bool {
return DiskSpeed == SpeedExtremelyFast
}
var countWrites int32 = 0
func WriteReady() bool {
if IsInExtremelyHighLoad {
return false
}
return atomic.LoadInt32(&countWrites) < DiskMaxWrites
}
func WriteBegin() {
atomic.AddInt32(&countWrites, 1)
}
func WriteEnd() {
atomic.AddInt32(&countWrites, -1)
}
func calculateDiskMaxWrites() {
switch DiskSpeed {
case SpeedExtremelyFast:
DiskMaxWrites = 32
case SpeedFast:
DiskMaxWrites = 16
case SpeedLow:
DiskMaxWrites = 8
case SpeedExtremelySlow:
DiskMaxWrites = 4
default:
DiskMaxWrites = 4
}
}
// WaitLoad wait system load to downgrade
func WaitLoad(maxLoad float64, maxLoops int, delay time.Duration) {
for i := 0; i < maxLoops; i++ {

View File

@@ -4,33 +4,10 @@ package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/iwind/TeaGo/assert"
"testing"
"time"
)
func TestWrites(t *testing.T) {
var a = assert.NewAssertion(t)
for i := 0; i < int(fsutils.DiskMaxWrites); i++ {
fsutils.WriteBegin()
}
a.IsFalse(fsutils.WriteReady())
fsutils.WriteEnd()
a.IsTrue(fsutils.WriteReady())
}
func TestWaitLoad(t *testing.T) {
fsutils.WaitLoad(100, 5, 1*time.Minute)
}
func BenchmarkWrites(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
fsutils.WriteReady()
fsutils.WriteBegin()
fsutils.WriteEnd()
}
})
}