优化并发读写限制

This commit is contained in:
GoEdgeLab
2024-05-01 12:42:35 +08:00
parent 2b35971a66
commit c50a0de9eb
5 changed files with 59 additions and 8 deletions

View File

@@ -314,8 +314,9 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
}
for {
var n int
fsutils.ReaderLimiter.Ack()
n, err := this.fp.Read(buf)
n, err = this.fp.Read(buf)
fsutils.ReaderLimiter.Release()
if n > 0 {
var n2 = int(end-offset) + 1

View File

@@ -380,6 +380,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
// 检查文件记录是否已过期
var estimatedSize int64
var existInList bool
if !useStale {
exists, filesize, err := this.list.Exist(hash)
if err != nil {
@@ -389,6 +390,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
return nil, ErrNotFound
}
estimatedSize = filesize
existInList = true
}
// 尝试通过MMAP读取
@@ -412,7 +414,13 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
var err error
if openFile == nil {
if existInList {
fsutils.ReaderLimiter.Ack()
}
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
if existInList {
fsutils.ReaderLimiter.Release()
}
if err != nil {
if !os.IsNotExist(err) {
return nil, err
@@ -583,7 +591,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
// 数据库中是否存在
existsCacheItem, _, _ := this.list.Exist(hash)
if existsCacheItem {
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
readerFp, err := fsutils.OpenFile(tmpPath, os.O_RDONLY, 0444)
if err == nil {
var partialReader = NewPartialFileReader(readerFp)
err = partialReader.Init()
@@ -616,8 +624,10 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
if isNewCreated && existsFile {
flags |= os.O_TRUNC
}
if !isFlushing && !fsutils.WriterLimiter.TryAck() {
return nil, ErrServerIsBusy
if !isFlushing {
if !fsutils.WriterLimiter.TryAck() {
return nil, ErrServerIsBusy
}
}
writer, err := os.OpenFile(tmpPath, flags, 0666)
if !isFlushing {

View File

@@ -8,8 +8,8 @@ import (
)
var maxThreads = runtime.NumCPU()
var WriterLimiter = NewLimiter(max(maxThreads, 4))
var ReaderLimiter = NewLimiter(max(maxThreads*2, 8))
var WriterLimiter = NewLimiter(max(maxThreads, 8))
var ReaderLimiter = NewLimiter(max(maxThreads, 8))
type Limiter struct {
threads chan struct{}
@@ -60,7 +60,7 @@ func (this *Limiter) Ack() {
}
func (this *Limiter) TryAck() bool {
const timeoutDuration = 1 * time.Second
const timeoutDuration = 500 * time.Millisecond
var timeout *time.Timer
select {

View File

@@ -2,7 +2,9 @@
package fsutils
import "os"
import (
"os"
)
func Remove(filename string) (err error) {
WriterLimiter.Ack()
@@ -31,3 +33,24 @@ func WriteFile(filename string, data []byte, perm os.FileMode) (err error) {
WriterLimiter.Release()
return
}
func OpenFile(name string, flag int, perm os.FileMode) (f *os.File, err error) {
if flag&os.O_RDONLY == os.O_RDONLY {
ReaderLimiter.Ack()
}
f, err = os.OpenFile(name, flag, perm)
if flag&os.O_RDONLY == os.O_RDONLY {
ReaderLimiter.Release()
}
return
}
func Open(name string) (f *os.File, err error) {
ReaderLimiter.Ack()
f, err = os.Open(name)
ReaderLimiter.Release()
return
}

View File

@@ -0,0 +1,17 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fsutils_test
import (
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"os"
"testing"
)
func TestOpenFile(t *testing.T) {
f, err := fsutils.OpenFile("./os_test.go", os.O_RDONLY, 0444)
if err != nil {
t.Fatal(err)
}
_ = f.Close()
}