mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-18 03:35:10 +08:00
读取文件时增加线程数限制
This commit is contained in:
@@ -55,3 +55,7 @@ func IsCapacityError(err error) bool {
|
|||||||
var capacityErr *CapacityError
|
var capacityErr *CapacityError
|
||||||
return errors.As(err, &capacityErr)
|
return errors.As(err, &capacityErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IsBusyError(err error) bool {
|
||||||
|
return err != nil && errors.Is(err, ErrServerIsBusy)
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package caches
|
|||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||||
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
|
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"io"
|
"io"
|
||||||
@@ -174,7 +175,9 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
|
|||||||
var headerSize = this.headerSize
|
var headerSize = this.headerSize
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
fsutils.ReaderLimiter.Ack()
|
||||||
n, err := this.fp.Read(buf)
|
n, err := this.fp.Read(buf)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
if n < headerSize {
|
if n < headerSize {
|
||||||
goNext, e := callback(n)
|
goNext, e := callback(n)
|
||||||
@@ -236,7 +239,9 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
fsutils.ReaderLimiter.Ack()
|
||||||
n, err := this.fp.Read(buf)
|
n, err := this.fp.Read(buf)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
goNext, e := callback(n)
|
goNext, e := callback(n)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
@@ -267,7 +272,9 @@ func (this *FileReader) Read(buf []byte) (n int, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fsutils.ReaderLimiter.Ack()
|
||||||
n, err = this.fp.Read(buf)
|
n, err = this.fp.Read(buf)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
_ = this.discard()
|
_ = this.discard()
|
||||||
}
|
}
|
||||||
@@ -299,13 +306,17 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
|
|||||||
isOk = true
|
isOk = true
|
||||||
return ErrInvalidRange
|
return ErrInvalidRange
|
||||||
}
|
}
|
||||||
|
fsutils.ReaderLimiter.Ack()
|
||||||
_, err := this.fp.Seek(offset, io.SeekStart)
|
_, err := this.fp.Seek(offset, io.SeekStart)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
fsutils.ReaderLimiter.Ack()
|
||||||
n, err := this.fp.Read(buf)
|
n, err := this.fp.Read(buf)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
var n2 = int(end-offset) + 1
|
var n2 = int(end-offset) + 1
|
||||||
if n2 <= n {
|
if n2 <= n {
|
||||||
@@ -375,7 +386,9 @@ func (this *FileReader) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
|
func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
|
||||||
|
fsutils.ReaderLimiter.Ack()
|
||||||
n, err := fp.Read(buf)
|
n, err := fp.Read(buf)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -393,7 +393,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
|
|
||||||
// 尝试通过MMAP读取
|
// 尝试通过MMAP读取
|
||||||
if estimatedSize > 0 {
|
if estimatedSize > 0 {
|
||||||
|
if !fsutils.ReaderLimiter.TryAck() {
|
||||||
|
return nil, ErrServerIsBusy
|
||||||
|
}
|
||||||
reader, err := this.tryMMAPReader(isPartial, estimatedSize, path)
|
reader, err := this.tryMMAPReader(isPartial, estimatedSize, path)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -412,7 +416,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
if openFile == nil {
|
if openFile == nil {
|
||||||
|
if !fsutils.ReaderLimiter.TryAck() {
|
||||||
|
return nil, ErrServerIsBusy
|
||||||
|
}
|
||||||
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
|
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
|
||||||
|
fsutils.ReaderLimiter.Release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -232,7 +232,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
if this.web.Compression != nil && this.web.Compression.IsOn {
|
if this.web.Compression != nil && this.web.Compression.IsOn {
|
||||||
_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
|
_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
|
||||||
if ok {
|
if ok {
|
||||||
reader, _ = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false)
|
reader, err = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false)
|
||||||
|
if err != nil && caches.IsBusyError(err) {
|
||||||
|
this.varMapping["cache.status"] = "BUSY"
|
||||||
|
this.cacheRef = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
if reader != nil {
|
if reader != nil {
|
||||||
tags = append(tags, "webp", encoding)
|
tags = append(tags, "webp", encoding)
|
||||||
}
|
}
|
||||||
@@ -244,7 +249,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
if webPIsEnabled && !isPartialRequest &&
|
if webPIsEnabled && !isPartialRequest &&
|
||||||
!isHeadMethod &&
|
!isHeadMethod &&
|
||||||
reader == nil {
|
reader == nil {
|
||||||
reader, _ = storage.OpenReader(key+caches.SuffixWebP, useStale, false)
|
reader, err = storage.OpenReader(key+caches.SuffixWebP, useStale, false)
|
||||||
|
if err != nil && caches.IsBusyError(err) {
|
||||||
|
this.varMapping["cache.status"] = "BUSY"
|
||||||
|
this.cacheRef = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
if reader != nil {
|
if reader != nil {
|
||||||
this.writer.cacheReaderSuffix = caches.SuffixWebP
|
this.writer.cacheReaderSuffix = caches.SuffixWebP
|
||||||
tags = append(tags, "webp")
|
tags = append(tags, "webp")
|
||||||
@@ -256,7 +266,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
if this.web.Compression != nil && this.web.Compression.IsOn {
|
if this.web.Compression != nil && this.web.Compression.IsOn {
|
||||||
_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
|
_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
|
||||||
if ok {
|
if ok {
|
||||||
reader, _ = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false)
|
reader, err = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false)
|
||||||
|
if err != nil && caches.IsBusyError(err) {
|
||||||
|
this.varMapping["cache.status"] = "BUSY"
|
||||||
|
this.cacheRef = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
if reader != nil {
|
if reader != nil {
|
||||||
tags = append(tags, encoding)
|
tags = append(tags, encoding)
|
||||||
}
|
}
|
||||||
@@ -269,6 +284,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
var partialRanges []rangeutils.Range
|
var partialRanges []rangeutils.Range
|
||||||
if reader == nil {
|
if reader == nil {
|
||||||
reader, err = storage.OpenReader(key, useStale, false)
|
reader, err = storage.OpenReader(key, useStale, false)
|
||||||
|
if err != nil && caches.IsBusyError(err) {
|
||||||
|
this.varMapping["cache.status"] = "BUSY"
|
||||||
|
this.cacheRef = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
if err != nil && this.cacheRef.AllowPartialContent {
|
if err != nil && this.cacheRef.AllowPartialContent {
|
||||||
// 尝试读取分片的缓存内容
|
// 尝试读取分片的缓存内容
|
||||||
if len(rangeHeader) == 0 && this.cacheRef.ForcePartialContent {
|
if len(rangeHeader) == 0 && this.cacheRef.ForcePartialContent {
|
||||||
@@ -277,7 +297,11 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(rangeHeader) > 0 {
|
if len(rangeHeader) > 0 {
|
||||||
pReader, ranges := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent)
|
pReader, ranges, goNext := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent)
|
||||||
|
if !goNext {
|
||||||
|
this.cacheRef = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
if pReader != nil {
|
if pReader != nil {
|
||||||
isPartialCache = true
|
isPartialCache = true
|
||||||
reader = pReader
|
reader = pReader
|
||||||
@@ -648,26 +672,33 @@ func (this *HTTPRequest) addExpiresHeader(expiresAt int64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 尝试读取区间缓存
|
// 尝试读取区间缓存
|
||||||
func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (caches.Reader, []rangeutils.Range) {
|
func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (resultReader caches.Reader, ranges []rangeutils.Range, goNext bool) {
|
||||||
|
goNext = true
|
||||||
|
|
||||||
// 尝试读取Partial cache
|
// 尝试读取Partial cache
|
||||||
if len(rangeHeader) == 0 {
|
if len(rangeHeader) == 0 {
|
||||||
return nil, nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ranges, ok := httpRequestParseRangeHeader(rangeHeader)
|
ranges, ok := httpRequestParseRangeHeader(rangeHeader)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader, pErr := storage.OpenReader(key+caches.SuffixPartial, useStale, true)
|
pReader, pErr := storage.OpenReader(key+caches.SuffixPartial, useStale, true)
|
||||||
if pErr != nil {
|
if pErr != nil {
|
||||||
return nil, nil
|
if caches.IsBusyError(pErr) {
|
||||||
|
this.varMapping["cache.status"] = "BUSY"
|
||||||
|
goNext = false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
partialReader, ok := pReader.(*caches.PartialFileReader)
|
partialReader, ok := pReader.(*caches.PartialFileReader)
|
||||||
if !ok {
|
if !ok {
|
||||||
_ = pReader.Close()
|
_ = pReader.Close()
|
||||||
return nil, nil
|
return
|
||||||
}
|
}
|
||||||
var isOk = false
|
var isOk = false
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -681,7 +712,7 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s
|
|||||||
len(ranges) > 0 &&
|
len(ranges) > 0 &&
|
||||||
ranges[0][1] < 0 &&
|
ranges[0][1] < 0 &&
|
||||||
!partialReader.IsCompleted() {
|
!partialReader.IsCompleted() {
|
||||||
return nil, nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查范围
|
// 检查范围
|
||||||
@@ -689,15 +720,15 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s
|
|||||||
for index, r := range ranges {
|
for index, r := range ranges {
|
||||||
r1, ok := r.Convert(partialReader.MaxLength())
|
r1, ok := r.Convert(partialReader.MaxLength())
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil
|
return
|
||||||
}
|
}
|
||||||
r2, ok := partialReader.ContainsRange(r1)
|
r2, ok := partialReader.ContainsRange(r1)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil
|
return
|
||||||
}
|
}
|
||||||
ranges[index] = r2
|
ranges[index] = r2
|
||||||
}
|
}
|
||||||
|
|
||||||
isOk = true
|
isOk = true
|
||||||
return pReader, ranges
|
return pReader, ranges, true
|
||||||
}
|
}
|
||||||
|
|||||||
64
internal/utils/fs/limiter.go
Normal file
64
internal/utils/fs/limiter.go
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package fsutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var WriterLimiter = NewLimiter(runtime.NumCPU())
|
||||||
|
var ReaderLimiter = NewLimiter(runtime.NumCPU())
|
||||||
|
|
||||||
|
type Limiter struct {
|
||||||
|
threads chan struct{}
|
||||||
|
timers chan *time.Timer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLimiter(threads int) *Limiter {
|
||||||
|
var threadsChan = make(chan struct{}, threads)
|
||||||
|
for i := 0; i < threads; i++ {
|
||||||
|
threadsChan <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Limiter{
|
||||||
|
threads: threadsChan,
|
||||||
|
timers: make(chan *time.Timer, 2048),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *Limiter) Ack() {
|
||||||
|
<-this.threads
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *Limiter) TryAck() bool {
|
||||||
|
const timeoutDuration = 500 * time.Millisecond
|
||||||
|
|
||||||
|
var timeout *time.Timer
|
||||||
|
select {
|
||||||
|
case timeout = <-this.timers:
|
||||||
|
timeout.Reset(timeoutDuration)
|
||||||
|
default:
|
||||||
|
timeout = time.NewTimer(timeoutDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
timeout.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case this.timers <- timeout:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-this.threads:
|
||||||
|
return true
|
||||||
|
case <-timeout.C:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *Limiter) Release() {
|
||||||
|
this.threads <- struct{}{}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user