diff --git a/internal/caches/errors.go b/internal/caches/errors.go index 2fc49ba..626f958 100644 --- a/internal/caches/errors.go +++ b/internal/caches/errors.go @@ -55,3 +55,7 @@ func IsCapacityError(err error) bool { var capacityErr *CapacityError return errors.As(err, &capacityErr) } + +func IsBusyError(err error) bool { + return err != nil && errors.Is(err, ErrServerIsBusy) +} diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index 9c2146f..184224a 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -3,6 +3,7 @@ package caches import ( "encoding/binary" "errors" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges" "github.com/iwind/TeaGo/types" "io" @@ -174,7 +175,9 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error { var headerSize = this.headerSize for { + fsutils.ReaderLimiter.Ack() n, err := this.fp.Read(buf) + fsutils.ReaderLimiter.Release() if n > 0 { if n < headerSize { goNext, e := callback(n) @@ -236,7 +239,9 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error { } for { + fsutils.ReaderLimiter.Ack() n, err := this.fp.Read(buf) + fsutils.ReaderLimiter.Release() if n > 0 { goNext, e := callback(n) if e != nil { @@ -267,7 +272,9 @@ func (this *FileReader) Read(buf []byte) (n int, err error) { return } + fsutils.ReaderLimiter.Ack() n, err = this.fp.Read(buf) + fsutils.ReaderLimiter.Release() if err != nil && err != io.EOF { _ = this.discard() } @@ -299,13 +306,17 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba isOk = true return ErrInvalidRange } + fsutils.ReaderLimiter.Ack() _, err := this.fp.Seek(offset, io.SeekStart) + fsutils.ReaderLimiter.Release() if err != nil { return err } for { + fsutils.ReaderLimiter.Ack() n, err := this.fp.Read(buf) + fsutils.ReaderLimiter.Release() if n > 0 { var n2 = int(end-offset) + 1 if n2 <= n { @@ -375,7 +386,9 @@ func (this *FileReader) Close() error { } func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { + fsutils.ReaderLimiter.Ack() n, err := fp.Read(buf) + fsutils.ReaderLimiter.Release() if err != nil { return false, err } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 2d56eb8..15029f3 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -393,7 +393,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, // 尝试通过MMAP读取 if estimatedSize > 0 { + if !fsutils.ReaderLimiter.TryAck() { + return nil, ErrServerIsBusy + } reader, err := this.tryMMAPReader(isPartial, estimatedSize, path) + fsutils.ReaderLimiter.Release() if err != nil { return nil, err } @@ -412,7 +416,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, var err error if openFile == nil { + if !fsutils.ReaderLimiter.TryAck() { + return nil, ErrServerIsBusy + } fp, err = os.OpenFile(path, os.O_RDONLY, 0444) + fsutils.ReaderLimiter.Release() if err != nil { if !os.IsNotExist(err) { return nil, err diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index ff03cc3..b1e3d3e 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -232,7 +232,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { if this.web.Compression != nil && this.web.Compression.IsOn { _, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding")) if ok { - reader, _ = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false) + reader, err = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false) + if err != nil && caches.IsBusyError(err) { + this.varMapping["cache.status"] = "BUSY" + this.cacheRef = nil + return + } if reader != nil { tags = append(tags, "webp", encoding) } @@ -244,7 +249,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { if webPIsEnabled && !isPartialRequest && !isHeadMethod && reader == nil { - reader, _ = storage.OpenReader(key+caches.SuffixWebP, useStale, false) + reader, err = storage.OpenReader(key+caches.SuffixWebP, useStale, false) + if err != nil && caches.IsBusyError(err) { + this.varMapping["cache.status"] = "BUSY" + this.cacheRef = nil + return + } if reader != nil { this.writer.cacheReaderSuffix = caches.SuffixWebP tags = append(tags, "webp") @@ -256,7 +266,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { if this.web.Compression != nil && this.web.Compression.IsOn { _, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding")) if ok { - reader, _ = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false) + reader, err = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false) + if err != nil && caches.IsBusyError(err) { + this.varMapping["cache.status"] = "BUSY" + this.cacheRef = nil + return + } if reader != nil { tags = append(tags, encoding) } @@ -269,6 +284,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { var partialRanges []rangeutils.Range if reader == nil { reader, err = storage.OpenReader(key, useStale, false) + if err != nil && caches.IsBusyError(err) { + this.varMapping["cache.status"] = "BUSY" + this.cacheRef = nil + return + } if err != nil && this.cacheRef.AllowPartialContent { // 尝试读取分片的缓存内容 if len(rangeHeader) == 0 && this.cacheRef.ForcePartialContent { @@ -277,7 +297,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } if len(rangeHeader) > 0 { - pReader, ranges := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent) + pReader, ranges, goNext := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent) + if !goNext { + this.cacheRef = nil + return + } if pReader != nil { isPartialCache = true reader = pReader @@ -648,26 +672,33 @@ func (this *HTTPRequest) addExpiresHeader(expiresAt int64) { } // 尝试读取区间缓存 -func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (caches.Reader, []rangeutils.Range) { +func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (resultReader caches.Reader, ranges []rangeutils.Range, goNext bool) { + goNext = true + // 尝试读取Partial cache if len(rangeHeader) == 0 { - return nil, nil + return } ranges, ok := httpRequestParseRangeHeader(rangeHeader) if !ok { - return nil, nil + return } pReader, pErr := storage.OpenReader(key+caches.SuffixPartial, useStale, true) if pErr != nil { - return nil, nil + if caches.IsBusyError(pErr) { + this.varMapping["cache.status"] = "BUSY" + goNext = false + return + } + return } partialReader, ok := pReader.(*caches.PartialFileReader) if !ok { _ = pReader.Close() - return nil, nil + return } var isOk = false defer func() { @@ -681,7 +712,7 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s len(ranges) > 0 && ranges[0][1] < 0 && !partialReader.IsCompleted() { - return nil, nil + return } // 检查范围 @@ -689,15 +720,15 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s for index, r := range ranges { r1, ok := r.Convert(partialReader.MaxLength()) if !ok { - return nil, nil + return } r2, ok := partialReader.ContainsRange(r1) if !ok { - return nil, nil + return } ranges[index] = r2 } isOk = true - return pReader, ranges + return pReader, ranges, true } diff --git a/internal/utils/fs/limiter.go b/internal/utils/fs/limiter.go new file mode 100644 index 0000000..e809b93 --- /dev/null +++ b/internal/utils/fs/limiter.go @@ -0,0 +1,64 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils + +import ( + "runtime" + "time" +) + +var WriterLimiter = NewLimiter(runtime.NumCPU()) +var ReaderLimiter = NewLimiter(runtime.NumCPU()) + +type Limiter struct { + threads chan struct{} + timers chan *time.Timer +} + +func NewLimiter(threads int) *Limiter { + var threadsChan = make(chan struct{}, threads) + for i := 0; i < threads; i++ { + threadsChan <- struct{}{} + } + + return &Limiter{ + threads: threadsChan, + timers: make(chan *time.Timer, 2048), + } +} + +func (this *Limiter) Ack() { + <-this.threads +} + +func (this *Limiter) TryAck() bool { + const timeoutDuration = 500 * time.Millisecond + + var timeout *time.Timer + select { + case timeout = <-this.timers: + timeout.Reset(timeoutDuration) + default: + timeout = time.NewTimer(timeoutDuration) + } + + defer func() { + timeout.Stop() + + select { + case this.timers <- timeout: + default: + } + }() + + select { + case <-this.threads: + return true + case <-timeout.C: + return false + } +} + +func (this *Limiter) Release() { + this.threads <- struct{}{} +}