diff --git a/internal/caches/partial_ranges.go b/internal/caches/partial_ranges.go index ab359e6..6de41ca 100644 --- a/internal/caches/partial_ranges.go +++ b/internal/caches/partial_ranges.go @@ -5,6 +5,7 @@ package caches import ( "bytes" "encoding/json" + rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges" "github.com/iwind/TeaGo/types" "strconv" ) @@ -101,7 +102,6 @@ func (this *PartialRanges) Add(begin int64, end int64) { } // insert - // TODO 将来使用二分法改进 var index = -1 for i, r := range this.Ranges { if r[0] > begin || (r[0] == begin && r[1] >= end) { @@ -127,7 +127,6 @@ func (this *PartialRanges) Contains(begin int64, end int64) bool { return false } - // TODO 使用二分法查找改进性能 for _, r2 := range this.Ranges { if r2[0] <= begin && r2[1] >= end { return true @@ -143,7 +142,6 @@ func (this *PartialRanges) Nearest(begin int64, end int64) (r [2]int64, ok bool) return } - // TODO 使用二分法查找改进性能 for _, r2 := range this.Ranges { if r2[0] <= begin && r2[1] > begin { r = [2]int64{begin, this.min(end, r2[1])} @@ -154,6 +152,21 @@ func (this *PartialRanges) Nearest(begin int64, end int64) (r [2]int64, ok bool) return } +// FindRangeAtPosition 查找在某个位置上的范围 +func (this *PartialRanges) FindRangeAtPosition(position int64) (r rangeutils.Range, ok bool) { + if len(this.Ranges) == 0 || position < 0 { + return + } + + for _, r2 := range this.Ranges { + if r2[0] <= position && r2[1] > position { + return [2]int64{position, r2[1]}, true + } + } + + return +} + // 转换为字符串 func (this *PartialRanges) String() string { var s = "v:" + strconv.Itoa(this.Version) + "\n" + // version diff --git a/internal/caches/reader.go b/internal/caches/reader.go index e3ca22d..2c7b77b 100644 --- a/internal/caches/reader.go +++ b/internal/caches/reader.go @@ -1,6 +1,9 @@ package caches -import "github.com/TeaOSLab/EdgeNode/internal/utils/ranges" +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/ranges" + "io" +) type ReaderFunc func(n int) (goNext bool, err error) @@ -41,6 +44,9 @@ type Reader interface { // ContainsRange 是否包含某个区间内容 ContainsRange(r rangeutils.Range) (r2 rangeutils.Range, ok bool) + // SetNextReader 设置下一个内容Reader + SetNextReader(nextReader io.ReadCloser) + // Close 关闭 Close() error } diff --git a/internal/caches/reader_base.go b/internal/caches/reader_base.go new file mode 100644 index 0000000..603e5af --- /dev/null +++ b/internal/caches/reader_base.go @@ -0,0 +1,14 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import "io" + +type BaseReader struct { + nextReader io.ReadCloser +} + +// SetNextReader 设置下一个内容Reader +func (this *BaseReader) SetNextReader(nextReader io.ReadCloser) { + this.nextReader = nextReader +} diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index e55b6a8..c1c9c1a 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -11,6 +11,8 @@ import ( ) type FileReader struct { + BaseReader + fp *fsutils.File openFile *OpenFile @@ -343,6 +345,33 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba isOk = true + // 读取下一个Reader + if this.nextReader != nil { + defer func() { + _ = this.nextReader.Close() + }() + + for { + var n int + n, err = this.nextReader.Read(buf) + if n > 0 { + goNext, writeErr := callback(n) + if writeErr != nil { + return writeErr + } + if !goNext { + break + } + } + if err != nil { + if err != io.EOF { + return err + } + break + } + } + } + return nil } diff --git a/internal/caches/reader_memory.go b/internal/caches/reader_memory.go index f5fea39..19e509c 100644 --- a/internal/caches/reader_memory.go +++ b/internal/caches/reader_memory.go @@ -7,6 +7,8 @@ import ( ) type MemoryReader struct { + BaseReader + item *MemoryItem offset int diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index 2b46c5a..65c1385 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -30,6 +30,8 @@ type PartialFileWriter struct { ranges *PartialRanges rangePath string + + writtenBytes int64 } func NewPartialFileWriter(rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter { @@ -154,13 +156,25 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize } - _, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) + n, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) if err != nil { return err } this.ranges.Add(offset, end) + // 保存ranges内容到文件,当新增数据达到一定量时就更新,是为了及时更新ranges文件,以便于其他请求能够及时读取到已经缓存的部分内容 + this.writtenBytes += int64(n) + if this.writtenBytes > (1 << 20) { + this.writtenBytes = 0 + if len(this.rangePath) > 0 { + if this.bodySize > 0 { + this.ranges.BodySize = this.bodySize + } + _ = this.ranges.WriteToFile(this.rangePath) + } + } + return nil } @@ -195,7 +209,9 @@ func (this *PartialFileWriter) Close() error { this.endFunc() }) - this.ranges.BodySize = this.bodySize + if this.bodySize > 0 { + this.ranges.BodySize = this.bodySize + } err := this.ranges.WriteToFile(this.rangePath) if err != nil { _ = this.rawWriter.Close() diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index f472f17..f8ad15d 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -401,7 +401,7 @@ func (this *HTTPRequest) doBegin() { // Reverse Proxy if this.reverseProxyRef != nil && this.reverseProxyRef.IsOn && this.reverseProxy != nil && this.reverseProxy.IsOn { - this.doReverseProxy() + _ = this.doReverseProxy(true) return } diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index b1e3d3e..4ab030d 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -282,6 +282,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { // 检查正常的文件 var isPartialCache = false var partialRanges []rangeutils.Range + var firstRangeEnd int64 if reader == nil { reader, err = storage.OpenReader(key, useStale, false) if err != nil && caches.IsBusyError(err) { @@ -297,7 +298,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } if len(rangeHeader) > 0 { - pReader, ranges, goNext := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent) + pReader, ranges, rangeEnd, goNext := this.tryPartialReader(storage, key, useStale, rangeHeader, this.cacheRef.ForcePartialContent) if !goNext { this.cacheRef = nil return @@ -306,6 +307,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { isPartialCache = true reader = pReader partialRanges = ranges + firstRangeEnd = rangeEnd err = nil } } @@ -523,7 +525,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { var pool = this.bytePool(fileSize) var bodyBuf = pool.Get() - err = reader.ReadBodyRange(bodyBuf.Bytes, ranges[0].Start(), ranges[0].End(), func(n int) (goNext bool, readErr error) { + + var rangeEnd = ranges[0].End() + if firstRangeEnd > 0 { + rangeEnd = firstRangeEnd + } + err = reader.ReadBodyRange(bodyBuf.Bytes, ranges[0].Start(), rangeEnd, func(n int) (goNext bool, readErr error) { _, readErr = this.writer.Write(bodyBuf.Bytes[:n]) if readErr != nil { return false, errWritingToClient @@ -542,7 +549,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { if !this.canIgnore(err) { remotelogs.WarnServer("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error()) } - return + + return true } } else if len(ranges) > 1 { var boundary = httpRequestGenBoundary() @@ -672,7 +680,7 @@ func (this *HTTPRequest) addExpiresHeader(expiresAt int64) { } // 尝试读取区间缓存 -func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (resultReader caches.Reader, ranges []rangeutils.Range, goNext bool) { +func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key string, useStale bool, rangeHeader string, forcePartialContent bool) (resultReader caches.Reader, ranges []rangeutils.Range, firstRangeEnd int64, goNext bool) { goNext = true // 尝试读取Partial cache @@ -712,6 +720,17 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s len(ranges) > 0 && ranges[0][1] < 0 && !partialReader.IsCompleted() { + if partialReader.BodySize() > 0 { + var r = ranges[0] + r2, findOk := partialReader.Ranges().FindRangeAtPosition(r.Start()) + if findOk && r2.Length() >= (256<<10) /* worth reading */ { + isOk = true + ranges[0] = [2]int64{r.Start(), partialReader.BodySize()} // Content-Range: bytes 0-[BODY_LENGTH] + + pReader.SetNextReader(NewHTTPRequestPartialReader(this, r2.End(), partialReader)) + return pReader, ranges, r2.End() - 1 /* not include last byte */, true + } + } return } @@ -730,5 +749,5 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s } isOk = true - return pReader, ranges, true + return pReader, ranges, -1, true } diff --git a/internal/nodes/http_request_cache_partial.go b/internal/nodes/http_request_cache_partial.go new file mode 100644 index 0000000..6b6f7cf --- /dev/null +++ b/internal/nodes/http_request_cache_partial.go @@ -0,0 +1,86 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes + +import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/iwind/TeaGo/types" + "io" + "net/http" +) + +// HTTPRequestPartialReader 分区文件读取器 +type HTTPRequestPartialReader struct { + req *HTTPRequest + offset int64 + resp *http.Response + + cacheReader caches.Reader + cacheWriter caches.Writer +} + +// NewHTTPRequestPartialReader 构建新的分区文件读取器 +// req 当前请求 +// offset 读取位置 +// reader 当前缓存读取器 +func NewHTTPRequestPartialReader(req *HTTPRequest, offset int64, reader caches.Reader) *HTTPRequestPartialReader { + return &HTTPRequestPartialReader{ + req: req, + offset: offset, + cacheReader: reader, + } +} + +// 读取内容 +func (this *HTTPRequestPartialReader) Read(p []byte) (n int, err error) { + if this.resp == nil { + _ = this.cacheReader.Close() + + this.req.RawReq.Header.Set("Range", "bytes="+types.String(this.offset)+"-") + var resp = this.req.doReverseProxy(false) + if resp == nil { + err = io.ErrUnexpectedEOF + return + } + this.resp = resp + + this.prepareCacheWriter() + } + + n, err = this.resp.Body.Read(p) + + // 写入到缓存 + if n > 0 && this.cacheWriter != nil { + _ = this.cacheWriter.WriteAt(this.offset, p[:n]) + this.offset += int64(n) + } + + return +} + +// Close 关闭读取器 +func (this *HTTPRequestPartialReader) Close() error { + if this.cacheWriter != nil { + _ = this.cacheWriter.Close() + } + + if this.resp != nil && this.resp.Body != nil { + return this.resp.Body.Close() + } + + return nil +} + +// 准备缓存写入器 +func (this *HTTPRequestPartialReader) prepareCacheWriter() { + var storage = this.req.writer.cacheStorage + if storage == nil { + return + } + + var cacheKey = this.req.cacheKey + caches.SuffixPartial + writer, err := storage.OpenWriter(cacheKey, this.cacheReader.ExpiresAt(), this.cacheReader.Status(), int(this.cacheReader.HeaderSize()), this.cacheReader.BodySize(), -1, true) + if err == nil { + this.cacheWriter = writer + } +} diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 7e5650d..47f0321 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -21,7 +21,8 @@ import ( ) // 处理反向代理 -func (this *HTTPRequest) doReverseProxy() { +// writeToClient 读取响应并发送到客户端 +func (this *HTTPRequest) doReverseProxy(writeToClient bool) (resultResp *http.Response) { if this.reverseProxy == nil { return } @@ -33,8 +34,9 @@ func (this *HTTPRequest) doReverseProxy() { var failStatusCode int for i := 0; i < retries; i++ { - originId, lnNodeId, shouldRetry := this.doOriginRequest(failedOriginIds, failedLnNodeIds, i == 0, i == retries-1, &failStatusCode) + originId, lnNodeId, shouldRetry, resp := this.doOriginRequest(failedOriginIds, failedLnNodeIds, i == 0, i == retries-1, &failStatusCode, writeToClient) if !shouldRetry { + resultResp = resp break } if originId > 0 { @@ -44,10 +46,12 @@ func (this *HTTPRequest) doReverseProxy() { failedLnNodeIds = append(failedLnNodeIds, lnNodeId) } } + + return } // 请求源站 -func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeIds []int64, isFirstTry bool, isLastRetry bool, failStatusCode *int) (originId int64, lnNodeId int64, shouldRetry bool) { +func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeIds []int64, isFirstTry bool, isLastRetry bool, failStatusCode *int, writeToClient bool) (originId int64, lnNodeId int64, shouldRetry bool, resultResp *http.Response) { // 对URL的处理 var stripPrefix = this.reverseProxy.StripPrefix var requestURI = this.reverseProxy.RequestURI @@ -345,7 +349,9 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId if resp != nil && resp.Body != nil { defer func() { if !respBodyIsClosed { - _ = resp.Body.Close() + if writeToClient { + _ = resp.Body.Close() + } } }() } @@ -423,6 +429,11 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId return } + if !writeToClient { + resultResp = resp + return + } + // fix Content-Type if resp.Header["Content-Type"] == nil { resp.Header["Content-Type"] = []string{} diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 405fa4a..1306d5e 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -88,6 +88,8 @@ type HTTPWriter struct { cacheReader caches.Reader cacheReaderSuffix string + + statusSent bool } // NewHTTPWriter 包装对象 @@ -844,6 +846,11 @@ func (this *HTTPWriter) SetSentHeaderBytes(sentHeaderBytes int64) { // WriteHeader 写入状态码 func (this *HTTPWriter) WriteHeader(statusCode int) { + if this.statusSent { + return + } + this.statusSent = true + if this.rawWriter != nil { this.rawWriter.WriteHeader(statusCode) }