优化Partial Content内容缓存,现在可以使用“部分文件缓存+部分回源”的方式提供内容

This commit is contained in:
GoEdgeLab
2024-05-07 16:20:22 +08:00
parent c1307e42f3
commit 16370307f0
11 changed files with 219 additions and 16 deletions

View File

@@ -5,6 +5,7 @@ package caches
import (
"bytes"
"encoding/json"
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"github.com/iwind/TeaGo/types"
"strconv"
)
@@ -101,7 +102,6 @@ func (this *PartialRanges) Add(begin int64, end int64) {
}
// insert
// TODO 将来使用二分法改进
var index = -1
for i, r := range this.Ranges {
if r[0] > begin || (r[0] == begin && r[1] >= end) {
@@ -127,7 +127,6 @@ func (this *PartialRanges) Contains(begin int64, end int64) bool {
return false
}
// TODO 使用二分法查找改进性能
for _, r2 := range this.Ranges {
if r2[0] <= begin && r2[1] >= end {
return true
@@ -143,7 +142,6 @@ func (this *PartialRanges) Nearest(begin int64, end int64) (r [2]int64, ok bool)
return
}
// TODO 使用二分法查找改进性能
for _, r2 := range this.Ranges {
if r2[0] <= begin && r2[1] > begin {
r = [2]int64{begin, this.min(end, r2[1])}
@@ -154,6 +152,21 @@ func (this *PartialRanges) Nearest(begin int64, end int64) (r [2]int64, ok bool)
return
}
// FindRangeAtPosition 查找在某个位置上的范围
func (this *PartialRanges) FindRangeAtPosition(position int64) (r rangeutils.Range, ok bool) {
if len(this.Ranges) == 0 || position < 0 {
return
}
for _, r2 := range this.Ranges {
if r2[0] <= position && r2[1] > position {
return [2]int64{position, r2[1]}, true
}
}
return
}
// 转换为字符串
func (this *PartialRanges) String() string {
var s = "v:" + strconv.Itoa(this.Version) + "\n" + // version

View File

@@ -1,6 +1,9 @@
package caches
import "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"io"
)
type ReaderFunc func(n int) (goNext bool, err error)
@@ -41,6 +44,9 @@ type Reader interface {
// ContainsRange 是否包含某个区间内容
ContainsRange(r rangeutils.Range) (r2 rangeutils.Range, ok bool)
// SetNextReader 设置下一个内容Reader
SetNextReader(nextReader io.ReadCloser)
// Close 关闭
Close() error
}

View File

@@ -0,0 +1,14 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches
import "io"
type BaseReader struct {
nextReader io.ReadCloser
}
// SetNextReader 设置下一个内容Reader
func (this *BaseReader) SetNextReader(nextReader io.ReadCloser) {
this.nextReader = nextReader
}

View File

@@ -11,6 +11,8 @@ import (
)
type FileReader struct {
BaseReader
fp *fsutils.File
openFile *OpenFile
@@ -343,6 +345,33 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
isOk = true
// 读取下一个Reader
if this.nextReader != nil {
defer func() {
_ = this.nextReader.Close()
}()
for {
var n int
n, err = this.nextReader.Read(buf)
if n > 0 {
goNext, writeErr := callback(n)
if writeErr != nil {
return writeErr
}
if !goNext {
break
}
}
if err != nil {
if err != io.EOF {
return err
}
break
}
}
}
return nil
}

View File

@@ -7,6 +7,8 @@ import (
)
type MemoryReader struct {
BaseReader
item *MemoryItem
offset int

View File

@@ -30,6 +30,8 @@ type PartialFileWriter struct {
ranges *PartialRanges
rangePath string
writtenBytes int64
}
func NewPartialFileWriter(rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
@@ -154,13 +156,25 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize
}
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
n, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
if err != nil {
return err
}
this.ranges.Add(offset, end)
// 保存ranges内容到文件当新增数据达到一定量时就更新是为了及时更新ranges文件以便于其他请求能够及时读取到已经缓存的部分内容
this.writtenBytes += int64(n)
if this.writtenBytes > (1 << 20) {
this.writtenBytes = 0
if len(this.rangePath) > 0 {
if this.bodySize > 0 {
this.ranges.BodySize = this.bodySize
}
_ = this.ranges.WriteToFile(this.rangePath)
}
}
return nil
}
@@ -195,7 +209,9 @@ func (this *PartialFileWriter) Close() error {
this.endFunc()
})
if this.bodySize > 0 {
this.ranges.BodySize = this.bodySize
}
err := this.ranges.WriteToFile(this.rangePath)
if err != nil {
_ = this.rawWriter.Close()

View File

@@ -401,7 +401,7 @@ func (this *HTTPRequest) doBegin() {
// Reverse Proxy
if this.reverseProxyRef != nil && this.reverseProxyRef.IsOn && this.reverseProxy != nil && this.reverseProxy.IsOn {
this.doReverseProxy()
_ = this.doReverseProxy(true)
return
}

View File

@@ -282,6 +282,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
// 检查正常的文件
var isPartialCache = false
var partialRanges []rangeutils.Range
var firstRangeEnd int64
if reader == nil {
reader, err = storage.OpenReader(key, useStale, false)
if err != nil && caches.IsBusyError(err) {
@@ -297,7 +298,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
}
if len(rangeHeader) > 0 {
pReader, ranges, goNext := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent)
pReader, ranges, rangeEnd, goNext := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent)
if !goNext {
this.cacheRef = nil
return
@@ -306,6 +307,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
isPartialCache = true
reader = pReader
partialRanges = ranges
firstRangeEnd = rangeEnd
err = nil
}
}
@@ -523,7 +525,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
var pool = this.bytePool(fileSize)
var bodyBuf = pool.Get()
err = reader.ReadBodyRange(bodyBuf.Bytes, ranges[0].Start(), ranges[0].End(), func(n int) (goNext bool, readErr error) {
var rangeEnd = ranges[0].End()
if firstRangeEnd > 0 {
rangeEnd = firstRangeEnd
}
err = reader.ReadBodyRange(bodyBuf.Bytes, ranges[0].Start(), rangeEnd, func(n int) (goNext bool, readErr error) {
_, readErr = this.writer.Write(bodyBuf.Bytes[:n])
if readErr != nil {
return false, errWritingToClient
@@ -542,7 +549,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
if !this.canIgnore(err) {
remotelogs.WarnServer("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
}
return
return true
}
} else if len(ranges) > 1 {
var boundary = httpRequestGenBoundary()
@@ -672,7 +680,7 @@ func (this *HTTPRequest) addExpiresHeader(expiresAt int64) {
}
// 尝试读取区间缓存
func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (resultReader caches.Reader, ranges []rangeutils.Range, goNext bool) {
func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (resultReader caches.Reader, ranges []rangeutils.Range, firstRangeEnd int64, goNext bool) {
goNext = true
// 尝试读取Partial cache
@@ -712,6 +720,17 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s
len(ranges) > 0 &&
ranges[0][1] < 0 &&
!partialReader.IsCompleted() {
if partialReader.BodySize() > 0 {
var r = ranges[0]
r2, findOk := partialReader.Ranges().FindRangeAtPosition(r.Start())
if findOk && r2.Length() >= (256<<10) /* worth reading */ {
isOk = true
ranges[0] = [2]int64{r.Start(), partialReader.BodySize()} // Content-Range: bytes 0-[BODY_LENGTH]
pReader.SetNextReader(NewHTTPRequestPartialReader(this, r2.End(), partialReader))
return pReader, ranges, r2.End() - 1 /* not include last byte */, true
}
}
return
}
@@ -730,5 +749,5 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s
}
isOk = true
return pReader, ranges, true
return pReader, ranges, -1, true
}

View File

@@ -0,0 +1,86 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package nodes
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/iwind/TeaGo/types"
"io"
"net/http"
)
// HTTPRequestPartialReader 分区文件读取器
type HTTPRequestPartialReader struct {
req *HTTPRequest
offset int64
resp *http.Response
cacheReader caches.Reader
cacheWriter caches.Writer
}
// NewHTTPRequestPartialReader 构建新的分区文件读取器
// req 当前请求
// offset 读取位置
// reader 当前缓存读取器
func NewHTTPRequestPartialReader(req *HTTPRequest, offset int64, reader caches.Reader) *HTTPRequestPartialReader {
return &HTTPRequestPartialReader{
req: req,
offset: offset,
cacheReader: reader,
}
}
// 读取内容
func (this *HTTPRequestPartialReader) Read(p []byte) (n int, err error) {
if this.resp == nil {
_ = this.cacheReader.Close()
this.req.RawReq.Header.Set("Range", "bytes="+types.String(this.offset)+"-")
var resp = this.req.doReverseProxy(false)
if resp == nil {
err = io.ErrUnexpectedEOF
return
}
this.resp = resp
this.prepareCacheWriter()
}
n, err = this.resp.Body.Read(p)
// 写入到缓存
if n > 0 && this.cacheWriter != nil {
_ = this.cacheWriter.WriteAt(this.offset, p[:n])
this.offset += int64(n)
}
return
}
// Close 关闭读取器
func (this *HTTPRequestPartialReader) Close() error {
if this.cacheWriter != nil {
_ = this.cacheWriter.Close()
}
if this.resp != nil && this.resp.Body != nil {
return this.resp.Body.Close()
}
return nil
}
// 准备缓存写入器
func (this *HTTPRequestPartialReader) prepareCacheWriter() {
var storage = this.req.writer.cacheStorage
if storage == nil {
return
}
var cacheKey = this.req.cacheKey + caches.SuffixPartial
writer, err := storage.OpenWriter(cacheKey, this.cacheReader.ExpiresAt(), this.cacheReader.Status(), int(this.cacheReader.HeaderSize()), this.cacheReader.BodySize(), -1, true)
if err == nil {
this.cacheWriter = writer
}
}

View File

@@ -21,7 +21,8 @@ import (
)
// 处理反向代理
func (this *HTTPRequest) doReverseProxy() {
// writeToClient 读取响应并发送到客户端
func (this *HTTPRequest) doReverseProxy(writeToClient bool) (resultResp *http.Response) {
if this.reverseProxy == nil {
return
}
@@ -33,8 +34,9 @@ func (this *HTTPRequest) doReverseProxy() {
var failStatusCode int
for i := 0; i < retries; i++ {
originId, lnNodeId, shouldRetry := this.doOriginRequest(failedOriginIds, failedLnNodeIds, i == 0, i == retries-1, &failStatusCode)
originId, lnNodeId, shouldRetry, resp := this.doOriginRequest(failedOriginIds, failedLnNodeIds, i == 0, i == retries-1, &failStatusCode, writeToClient)
if !shouldRetry {
resultResp = resp
break
}
if originId > 0 {
@@ -44,10 +46,12 @@ func (this *HTTPRequest) doReverseProxy() {
failedLnNodeIds = append(failedLnNodeIds, lnNodeId)
}
}
return
}
// 请求源站
func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeIds []int64, isFirstTry bool, isLastRetry bool, failStatusCode *int) (originId int64, lnNodeId int64, shouldRetry bool) {
func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeIds []int64, isFirstTry bool, isLastRetry bool, failStatusCode *int, writeToClient bool) (originId int64, lnNodeId int64, shouldRetry bool, resultResp *http.Response) {
// 对URL的处理
var stripPrefix = this.reverseProxy.StripPrefix
var requestURI = this.reverseProxy.RequestURI
@@ -345,8 +349,10 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
if resp != nil && resp.Body != nil {
defer func() {
if !respBodyIsClosed {
if writeToClient {
_ = resp.Body.Close()
}
}
}()
}
@@ -423,6 +429,11 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
return
}
if !writeToClient {
resultResp = resp
return
}
// fix Content-Type
if resp.Header["Content-Type"] == nil {
resp.Header["Content-Type"] = []string{}

View File

@@ -88,6 +88,8 @@ type HTTPWriter struct {
cacheReader caches.Reader
cacheReaderSuffix string
statusSent bool
}
// NewHTTPWriter 包装对象
@@ -844,6 +846,11 @@ func (this *HTTPWriter) SetSentHeaderBytes(sentHeaderBytes int64) {
// WriteHeader 写入状态码
func (this *HTTPWriter) WriteHeader(statusCode int) {
if this.statusSent {
return
}
this.statusSent = true
if this.rawWriter != nil {
this.rawWriter.WriteHeader(statusCode)
}