diff --git a/internal/caches/consts.go b/internal/caches/consts.go new file mode 100644 index 0000000..37086b6 --- /dev/null +++ b/internal/caches/consts.go @@ -0,0 +1,10 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +const ( + SuffixWebP = "@GOEDGE_WEBP" // WebP后缀 + SuffixCompression = "@GOEDGE_" // 压缩后缀 SuffixCompression + Encoding + SuffixMethod = "@GOEDGE_" // 请求方法后缀 SuffixMethod + RequestMethod + SuffixPartial = "@GOEDGE_partial" // 分区缓存后缀 +) diff --git a/internal/caches/reader_partial_file.go b/internal/caches/reader_partial_file.go index 70be30f..541b71d 100644 --- a/internal/caches/reader_partial_file.go +++ b/internal/caches/reader_partial_file.go @@ -7,7 +7,6 @@ import ( "github.com/iwind/TeaGo/types" "io" "os" - "strings" ) type PartialFileReader struct { @@ -18,19 +17,9 @@ type PartialFileReader struct { } func NewPartialFileReader(fp *os.File) *PartialFileReader { - // range path - var path = fp.Name() - var dotIndex = strings.LastIndex(path, ".") - var rangePath = "" - if dotIndex < 0 { - rangePath = path + "@ranges.cache" - } else { - rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:] - } - return &PartialFileReader{ FileReader: NewFileReader(fp), - rangePath: rangePath, + rangePath: partialRangesFilePath(fp.Name()), } } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 01f988f..d3d793d 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -297,7 +297,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, // 增加点击量 // 1/1000采样 - if allowMemory { + if !isPartial && allowMemory { this.increaseHit(key, hash, reader) } @@ -537,8 +537,14 @@ func (this *FileStorage) Delete(key string) error { } err = os.Remove(path) if err == nil || os.IsNotExist(err) { + // 删除Partial相关 + if strings.HasSuffix(key, SuffixPartial) { + _ = os.Remove(partialRangesFilePath(path)) + } + return nil } + return err } @@ -650,6 +656,12 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { if err != nil && !os.IsNotExist(err) { return err } + + // 删除Partial相关 + if strings.HasSuffix(key, SuffixPartial) { + _ = os.Remove(partialRangesFilePath(path)) + } + err = this.list.Remove(hash) if err != nil { return err diff --git a/internal/caches/utils_partial.go b/internal/caches/utils_partial.go new file mode 100644 index 0000000..7ae8f99 --- /dev/null +++ b/internal/caches/utils_partial.go @@ -0,0 +1,18 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import "strings" + +// 获取 ranges 文件路径 +func partialRangesFilePath(path string) string { + // ranges路径 + var dotIndex = strings.LastIndex(path, ".") + var rangePath = "" + if dotIndex < 0 { + rangePath = path + "@ranges.cache" + } else { + rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:] + } + return rangePath +} diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index 2262151..0429212 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -7,7 +7,6 @@ import ( "github.com/iwind/TeaGo/types" "io" "os" - "strings" "sync" ) @@ -29,16 +28,6 @@ type PartialFileWriter struct { } func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter { - var path = rawWriter.Name() - // ranges路径 - var dotIndex = strings.LastIndex(path, ".") - var rangePath = "" - if dotIndex < 0 { - rangePath = path + "@ranges.cache" - } else { - rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:] - } - return &PartialFileWriter{ key: key, rawWriter: rawWriter, @@ -48,7 +37,7 @@ func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew isPartial: isPartial, bodyOffset: bodyOffset, ranges: ranges, - rangePath: rangePath, + rangePath: partialRangesFilePath(rawWriter.Name()), } } diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 9c51db3..069da6a 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -352,14 +352,14 @@ func (this *APIStream) handlePurgeCache(message *pb.NodeStreamMessage) error { var keys = msg.Keys for _, key := range keys { keys = append(keys, - key+cacheMethodSuffix+"HEAD", - key+webpCacheSuffix, - key+cachePartialSuffix, + key+caches.SuffixMethod+"HEAD", + key+caches.SuffixWebP, + key+caches.SuffixPartial, ) // TODO 根据实际缓存的内容进行组合 for _, encoding := range compressions.AllEncodings() { - keys = append(keys, key+compressionCacheSuffix+encoding) - keys = append(keys, key+webpCacheSuffix+compressionCacheSuffix+encoding) + keys = append(keys, key+caches.SuffixCompression+encoding) + keys = append(keys, key+caches.SuffixWebP+caches.SuffixCompression+encoding) } } msg.Keys = keys diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 2da9018..c070ed9 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -121,7 +121,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } var method = this.Method() if method != http.MethodGet && method != http.MethodPost { - key += cacheMethodSuffix + method + key += caches.SuffixMethod + method tags = append(tags, strings.ToLower(method)) } @@ -142,14 +142,14 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { var subKeys = []string{ key, - key + cacheMethodSuffix + "HEAD", - key + webpCacheSuffix, - key + cachePartialSuffix, + key + caches.SuffixMethod + "HEAD", + key + caches.SuffixWebP, + key + caches.SuffixPartial, } // TODO 根据实际缓存的内容进行组合 for _, encoding := range compressions.AllEncodings() { - subKeys = append(subKeys, key+compressionCacheSuffix+encoding) - subKeys = append(subKeys, key+webpCacheSuffix+compressionCacheSuffix+encoding) + subKeys = append(subKeys, key+caches.SuffixCompression+encoding) + subKeys = append(subKeys, key+caches.SuffixWebP+caches.SuffixCompression+encoding) } for _, subKey := range subKeys { err := storage.Delete(subKey) @@ -210,15 +210,15 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { if ok { // 检查支持WebP的压缩缓存 if webPIsEnabled { - reader, _ = storage.OpenReader(key+webpCacheSuffix+compressionCacheSuffix+encoding, useStale, false) + reader, _ = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false) if reader != nil { tags = append(tags, "webp", encoding) } } - // 检查普通缓存 + // 检查普通压缩缓存 if reader == nil { - reader, _ = storage.OpenReader(key+compressionCacheSuffix+encoding, useStale, false) + reader, _ = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false) if reader != nil { tags = append(tags, encoding) } @@ -232,9 +232,9 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { !isHeadMethod && reader == nil && webPIsEnabled { - reader, _ = storage.OpenReader(key+webpCacheSuffix, useStale, false) + reader, _ = storage.OpenReader(key+caches.SuffixWebP, useStale, false) if reader != nil { - this.writer.cacheReaderSuffix = webpCacheSuffix + this.writer.cacheReaderSuffix = caches.SuffixWebP tags = append(tags, "webp") } } @@ -581,7 +581,7 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s return nil } - pReader, pErr := storage.OpenReader(key+cachePartialSuffix, useStale, true) + pReader, pErr := storage.OpenReader(key+caches.SuffixPartial, useStale, true) if pErr != nil { return nil } diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index ccf0d56..c4d7f38 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -284,8 +284,8 @@ func (this *HTTPRequest) doReverseProxy() { } // 输出到客户端 - pool := this.bytePool(resp.ContentLength) - buf := pool.Get() + var pool = this.bytePool(resp.ContentLength) + var buf = pool.Get() if shouldAutoFlush { for { n, readErr := resp.Body.Read(buf) diff --git a/internal/nodes/http_request_utils.go b/internal/nodes/http_request_utils.go index 8b03ae3..7ab7baf 100644 --- a/internal/nodes/http_request_utils.go +++ b/internal/nodes/http_request_utils.go @@ -15,7 +15,7 @@ import ( "sync/atomic" ) -var contentRangeRegexp = regexp.MustCompile(`^bytes (\d+)-(\d+)/`) +var contentRangeRegexp = regexp.MustCompile(`^bytes (\d+)-(\d+)/(\d+|\*)`) // 分解Range func httpRequestParseRangeHeader(rangeValue string) (result []rangeutils.Range, ok bool) { @@ -125,12 +125,18 @@ func httpRequestReadRange(reader io.Reader, buf []byte, start int64, end int64, } // 分解Content-Range -func httpRequestParseContentRangeHeader(contentRange string) (start int64) { +func httpRequestParseContentRangeHeader(contentRange string) (start int64, total int64) { var matches = contentRangeRegexp.FindStringSubmatch(contentRange) - if len(matches) < 3 { - return -1 + if len(matches) < 4 { + return -1, -1 } - return types.Int64(matches[1]) + + start = types.Int64(matches[1]) + var sizeString = matches[3] + if sizeString != "*" { + total = types.Int64(sizeString) + } + return } // 生成boundary diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 2757cb4..19bdc82 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -35,19 +35,9 @@ import ( "sync/atomic" ) -// webp相关配置 -const webpCacheSuffix = "@GOEDGE_WEBP" - var webpMaxBufferSize int64 = 1_000_000_000 var webpTotalBufferSize int64 = 0 -// 压缩相关配置 -const compressionCacheSuffix = "@GOEDGE_" - -// 缓存相关配置 -const cacheMethodSuffix = "@GOEDGE_" -const cachePartialSuffix = "@GOEDGE_partial" - func init() { var systemMemory = utils.SystemMemoryGB() / 8 if systemMemory > 0 { @@ -277,7 +267,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { var expiredAt = utils.UnixTime() + life var cacheKey = this.req.cacheKey if this.isPartial { - cacheKey += cachePartialSuffix + cacheKey += caches.SuffixPartial } cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, this.isPartial) if err != nil { @@ -308,10 +298,17 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { // content-range var contentRange = this.GetHeader("Content-Range") if len(contentRange) > 0 { - var start = httpRequestParseContentRangeHeader(contentRange) + start, total := httpRequestParseContentRangeHeader(contentRange) if start < 0 { return } + if total > 0 { + partialWriter, ok := cacheWriter.(*caches.PartialFileWriter) + if !ok { + return + } + partialWriter.SetBodyLength(total) + } var filterReader = readers.NewFilterReaderCloser(resp.Body) this.cacheIsFinished = true var hasError = false @@ -555,7 +552,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { cacheKey += this.cacheReaderSuffix } - compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+compressionCacheSuffix+compressionEncoding, expiredAt, this.StatusCode(), -1, false) + compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), -1, false) if err != nil { return } @@ -761,10 +758,10 @@ func (this *HTTPWriter) Close() { var expiredAt int64 = 0 if this.cacheReader != nil { - cacheKey = this.req.cacheKey + webpCacheSuffix + cacheKey = this.req.cacheKey + caches.SuffixWebP expiredAt = this.cacheReader.ExpiresAt() } else if this.cacheWriter != nil { - cacheKey = this.cacheWriter.Key() + webpCacheSuffix + cacheKey = this.cacheWriter.Key() + caches.SuffixWebP expiredAt = this.cacheWriter.ExpiredAt() }