mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-22 06:20:27 +08:00
删除Partial缓存时,同时删除区间范围相关文件
This commit is contained in:
10
internal/caches/consts.go
Normal file
10
internal/caches/consts.go
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||||
|
|
||||||
|
package caches
|
||||||
|
|
||||||
|
const (
|
||||||
|
SuffixWebP = "@GOEDGE_WEBP" // WebP后缀
|
||||||
|
SuffixCompression = "@GOEDGE_" // 压缩后缀 SuffixCompression + Encoding
|
||||||
|
SuffixMethod = "@GOEDGE_" // 请求方法后缀 SuffixMethod + RequestMethod
|
||||||
|
SuffixPartial = "@GOEDGE_partial" // 分区缓存后缀
|
||||||
|
)
|
||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PartialFileReader struct {
|
type PartialFileReader struct {
|
||||||
@@ -18,19 +17,9 @@ type PartialFileReader struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewPartialFileReader(fp *os.File) *PartialFileReader {
|
func NewPartialFileReader(fp *os.File) *PartialFileReader {
|
||||||
// range path
|
|
||||||
var path = fp.Name()
|
|
||||||
var dotIndex = strings.LastIndex(path, ".")
|
|
||||||
var rangePath = ""
|
|
||||||
if dotIndex < 0 {
|
|
||||||
rangePath = path + "@ranges.cache"
|
|
||||||
} else {
|
|
||||||
rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:]
|
|
||||||
}
|
|
||||||
|
|
||||||
return &PartialFileReader{
|
return &PartialFileReader{
|
||||||
FileReader: NewFileReader(fp),
|
FileReader: NewFileReader(fp),
|
||||||
rangePath: rangePath,
|
rangePath: partialRangesFilePath(fp.Name()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -297,7 +297,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
|
|
||||||
// 增加点击量
|
// 增加点击量
|
||||||
// 1/1000采样
|
// 1/1000采样
|
||||||
if allowMemory {
|
if !isPartial && allowMemory {
|
||||||
this.increaseHit(key, hash, reader)
|
this.increaseHit(key, hash, reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -537,8 +537,14 @@ func (this *FileStorage) Delete(key string) error {
|
|||||||
}
|
}
|
||||||
err = os.Remove(path)
|
err = os.Remove(path)
|
||||||
if err == nil || os.IsNotExist(err) {
|
if err == nil || os.IsNotExist(err) {
|
||||||
|
// 删除Partial相关
|
||||||
|
if strings.HasSuffix(key, SuffixPartial) {
|
||||||
|
_ = os.Remove(partialRangesFilePath(path))
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -650,6 +656,12 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
|||||||
if err != nil && !os.IsNotExist(err) {
|
if err != nil && !os.IsNotExist(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 删除Partial相关
|
||||||
|
if strings.HasSuffix(key, SuffixPartial) {
|
||||||
|
_ = os.Remove(partialRangesFilePath(path))
|
||||||
|
}
|
||||||
|
|
||||||
err = this.list.Remove(hash)
|
err = this.list.Remove(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
18
internal/caches/utils_partial.go
Normal file
18
internal/caches/utils_partial.go
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||||
|
|
||||||
|
package caches
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
// 获取 ranges 文件路径
|
||||||
|
func partialRangesFilePath(path string) string {
|
||||||
|
// ranges路径
|
||||||
|
var dotIndex = strings.LastIndex(path, ".")
|
||||||
|
var rangePath = ""
|
||||||
|
if dotIndex < 0 {
|
||||||
|
rangePath = path + "@ranges.cache"
|
||||||
|
} else {
|
||||||
|
rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:]
|
||||||
|
}
|
||||||
|
return rangePath
|
||||||
|
}
|
||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -29,16 +28,6 @@ type PartialFileWriter struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
|
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
|
||||||
var path = rawWriter.Name()
|
|
||||||
// ranges路径
|
|
||||||
var dotIndex = strings.LastIndex(path, ".")
|
|
||||||
var rangePath = ""
|
|
||||||
if dotIndex < 0 {
|
|
||||||
rangePath = path + "@ranges.cache"
|
|
||||||
} else {
|
|
||||||
rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:]
|
|
||||||
}
|
|
||||||
|
|
||||||
return &PartialFileWriter{
|
return &PartialFileWriter{
|
||||||
key: key,
|
key: key,
|
||||||
rawWriter: rawWriter,
|
rawWriter: rawWriter,
|
||||||
@@ -48,7 +37,7 @@ func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew
|
|||||||
isPartial: isPartial,
|
isPartial: isPartial,
|
||||||
bodyOffset: bodyOffset,
|
bodyOffset: bodyOffset,
|
||||||
ranges: ranges,
|
ranges: ranges,
|
||||||
rangePath: rangePath,
|
rangePath: partialRangesFilePath(rawWriter.Name()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -352,14 +352,14 @@ func (this *APIStream) handlePurgeCache(message *pb.NodeStreamMessage) error {
|
|||||||
var keys = msg.Keys
|
var keys = msg.Keys
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
keys = append(keys,
|
keys = append(keys,
|
||||||
key+cacheMethodSuffix+"HEAD",
|
key+caches.SuffixMethod+"HEAD",
|
||||||
key+webpCacheSuffix,
|
key+caches.SuffixWebP,
|
||||||
key+cachePartialSuffix,
|
key+caches.SuffixPartial,
|
||||||
)
|
)
|
||||||
// TODO 根据实际缓存的内容进行组合
|
// TODO 根据实际缓存的内容进行组合
|
||||||
for _, encoding := range compressions.AllEncodings() {
|
for _, encoding := range compressions.AllEncodings() {
|
||||||
keys = append(keys, key+compressionCacheSuffix+encoding)
|
keys = append(keys, key+caches.SuffixCompression+encoding)
|
||||||
keys = append(keys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
|
keys = append(keys, key+caches.SuffixWebP+caches.SuffixCompression+encoding)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
msg.Keys = keys
|
msg.Keys = keys
|
||||||
|
|||||||
@@ -121,7 +121,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
}
|
}
|
||||||
var method = this.Method()
|
var method = this.Method()
|
||||||
if method != http.MethodGet && method != http.MethodPost {
|
if method != http.MethodGet && method != http.MethodPost {
|
||||||
key += cacheMethodSuffix + method
|
key += caches.SuffixMethod + method
|
||||||
tags = append(tags, strings.ToLower(method))
|
tags = append(tags, strings.ToLower(method))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,14 +142,14 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
|
|
||||||
var subKeys = []string{
|
var subKeys = []string{
|
||||||
key,
|
key,
|
||||||
key + cacheMethodSuffix + "HEAD",
|
key + caches.SuffixMethod + "HEAD",
|
||||||
key + webpCacheSuffix,
|
key + caches.SuffixWebP,
|
||||||
key + cachePartialSuffix,
|
key + caches.SuffixPartial,
|
||||||
}
|
}
|
||||||
// TODO 根据实际缓存的内容进行组合
|
// TODO 根据实际缓存的内容进行组合
|
||||||
for _, encoding := range compressions.AllEncodings() {
|
for _, encoding := range compressions.AllEncodings() {
|
||||||
subKeys = append(subKeys, key+compressionCacheSuffix+encoding)
|
subKeys = append(subKeys, key+caches.SuffixCompression+encoding)
|
||||||
subKeys = append(subKeys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
|
subKeys = append(subKeys, key+caches.SuffixWebP+caches.SuffixCompression+encoding)
|
||||||
}
|
}
|
||||||
for _, subKey := range subKeys {
|
for _, subKey := range subKeys {
|
||||||
err := storage.Delete(subKey)
|
err := storage.Delete(subKey)
|
||||||
@@ -210,15 +210,15 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
if ok {
|
if ok {
|
||||||
// 检查支持WebP的压缩缓存
|
// 检查支持WebP的压缩缓存
|
||||||
if webPIsEnabled {
|
if webPIsEnabled {
|
||||||
reader, _ = storage.OpenReader(key+webpCacheSuffix+compressionCacheSuffix+encoding, useStale, false)
|
reader, _ = storage.OpenReader(key+caches.SuffixWebP+caches.SuffixCompression+encoding, useStale, false)
|
||||||
if reader != nil {
|
if reader != nil {
|
||||||
tags = append(tags, "webp", encoding)
|
tags = append(tags, "webp", encoding)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查普通缓存
|
// 检查普通压缩缓存
|
||||||
if reader == nil {
|
if reader == nil {
|
||||||
reader, _ = storage.OpenReader(key+compressionCacheSuffix+encoding, useStale, false)
|
reader, _ = storage.OpenReader(key+caches.SuffixCompression+encoding, useStale, false)
|
||||||
if reader != nil {
|
if reader != nil {
|
||||||
tags = append(tags, encoding)
|
tags = append(tags, encoding)
|
||||||
}
|
}
|
||||||
@@ -232,9 +232,9 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
!isHeadMethod &&
|
!isHeadMethod &&
|
||||||
reader == nil &&
|
reader == nil &&
|
||||||
webPIsEnabled {
|
webPIsEnabled {
|
||||||
reader, _ = storage.OpenReader(key+webpCacheSuffix, useStale, false)
|
reader, _ = storage.OpenReader(key+caches.SuffixWebP, useStale, false)
|
||||||
if reader != nil {
|
if reader != nil {
|
||||||
this.writer.cacheReaderSuffix = webpCacheSuffix
|
this.writer.cacheReaderSuffix = caches.SuffixWebP
|
||||||
tags = append(tags, "webp")
|
tags = append(tags, "webp")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -581,7 +581,7 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader, pErr := storage.OpenReader(key+cachePartialSuffix, useStale, true)
|
pReader, pErr := storage.OpenReader(key+caches.SuffixPartial, useStale, true)
|
||||||
if pErr != nil {
|
if pErr != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -284,8 +284,8 @@ func (this *HTTPRequest) doReverseProxy() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 输出到客户端
|
// 输出到客户端
|
||||||
pool := this.bytePool(resp.ContentLength)
|
var pool = this.bytePool(resp.ContentLength)
|
||||||
buf := pool.Get()
|
var buf = pool.Get()
|
||||||
if shouldAutoFlush {
|
if shouldAutoFlush {
|
||||||
for {
|
for {
|
||||||
n, readErr := resp.Body.Read(buf)
|
n, readErr := resp.Body.Read(buf)
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
var contentRangeRegexp = regexp.MustCompile(`^bytes (\d+)-(\d+)/`)
|
var contentRangeRegexp = regexp.MustCompile(`^bytes (\d+)-(\d+)/(\d+|\*)`)
|
||||||
|
|
||||||
// 分解Range
|
// 分解Range
|
||||||
func httpRequestParseRangeHeader(rangeValue string) (result []rangeutils.Range, ok bool) {
|
func httpRequestParseRangeHeader(rangeValue string) (result []rangeutils.Range, ok bool) {
|
||||||
@@ -125,12 +125,18 @@ func httpRequestReadRange(reader io.Reader, buf []byte, start int64, end int64,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 分解Content-Range
|
// 分解Content-Range
|
||||||
func httpRequestParseContentRangeHeader(contentRange string) (start int64) {
|
func httpRequestParseContentRangeHeader(contentRange string) (start int64, total int64) {
|
||||||
var matches = contentRangeRegexp.FindStringSubmatch(contentRange)
|
var matches = contentRangeRegexp.FindStringSubmatch(contentRange)
|
||||||
if len(matches) < 3 {
|
if len(matches) < 4 {
|
||||||
return -1
|
return -1, -1
|
||||||
}
|
}
|
||||||
return types.Int64(matches[1])
|
|
||||||
|
start = types.Int64(matches[1])
|
||||||
|
var sizeString = matches[3]
|
||||||
|
if sizeString != "*" {
|
||||||
|
total = types.Int64(sizeString)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 生成boundary
|
// 生成boundary
|
||||||
|
|||||||
@@ -35,19 +35,9 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// webp相关配置
|
|
||||||
const webpCacheSuffix = "@GOEDGE_WEBP"
|
|
||||||
|
|
||||||
var webpMaxBufferSize int64 = 1_000_000_000
|
var webpMaxBufferSize int64 = 1_000_000_000
|
||||||
var webpTotalBufferSize int64 = 0
|
var webpTotalBufferSize int64 = 0
|
||||||
|
|
||||||
// 压缩相关配置
|
|
||||||
const compressionCacheSuffix = "@GOEDGE_"
|
|
||||||
|
|
||||||
// 缓存相关配置
|
|
||||||
const cacheMethodSuffix = "@GOEDGE_"
|
|
||||||
const cachePartialSuffix = "@GOEDGE_partial"
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
var systemMemory = utils.SystemMemoryGB() / 8
|
var systemMemory = utils.SystemMemoryGB() / 8
|
||||||
if systemMemory > 0 {
|
if systemMemory > 0 {
|
||||||
@@ -277,7 +267,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
|
|||||||
var expiredAt = utils.UnixTime() + life
|
var expiredAt = utils.UnixTime() + life
|
||||||
var cacheKey = this.req.cacheKey
|
var cacheKey = this.req.cacheKey
|
||||||
if this.isPartial {
|
if this.isPartial {
|
||||||
cacheKey += cachePartialSuffix
|
cacheKey += caches.SuffixPartial
|
||||||
}
|
}
|
||||||
cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, this.isPartial)
|
cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, this.isPartial)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -308,10 +298,17 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
|
|||||||
// content-range
|
// content-range
|
||||||
var contentRange = this.GetHeader("Content-Range")
|
var contentRange = this.GetHeader("Content-Range")
|
||||||
if len(contentRange) > 0 {
|
if len(contentRange) > 0 {
|
||||||
var start = httpRequestParseContentRangeHeader(contentRange)
|
start, total := httpRequestParseContentRangeHeader(contentRange)
|
||||||
if start < 0 {
|
if start < 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if total > 0 {
|
||||||
|
partialWriter, ok := cacheWriter.(*caches.PartialFileWriter)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
partialWriter.SetBodyLength(total)
|
||||||
|
}
|
||||||
var filterReader = readers.NewFilterReaderCloser(resp.Body)
|
var filterReader = readers.NewFilterReaderCloser(resp.Body)
|
||||||
this.cacheIsFinished = true
|
this.cacheIsFinished = true
|
||||||
var hasError = false
|
var hasError = false
|
||||||
@@ -555,7 +552,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
|
|||||||
cacheKey += this.cacheReaderSuffix
|
cacheKey += this.cacheReaderSuffix
|
||||||
}
|
}
|
||||||
|
|
||||||
compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+compressionCacheSuffix+compressionEncoding, expiredAt, this.StatusCode(), -1, false)
|
compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+caches.SuffixCompression+compressionEncoding, expiredAt, this.StatusCode(), -1, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -761,10 +758,10 @@ func (this *HTTPWriter) Close() {
|
|||||||
var expiredAt int64 = 0
|
var expiredAt int64 = 0
|
||||||
|
|
||||||
if this.cacheReader != nil {
|
if this.cacheReader != nil {
|
||||||
cacheKey = this.req.cacheKey + webpCacheSuffix
|
cacheKey = this.req.cacheKey + caches.SuffixWebP
|
||||||
expiredAt = this.cacheReader.ExpiresAt()
|
expiredAt = this.cacheReader.ExpiresAt()
|
||||||
} else if this.cacheWriter != nil {
|
} else if this.cacheWriter != nil {
|
||||||
cacheKey = this.cacheWriter.Key() + webpCacheSuffix
|
cacheKey = this.cacheWriter.Key() + caches.SuffixWebP
|
||||||
expiredAt = this.cacheWriter.ExpiredAt()
|
expiredAt = this.cacheWriter.ExpiredAt()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user