Mirror of https://github.com/TeaOSLab/EdgeNode.git (synced 2025-11-06 18:10:26 +08:00)
Commit: Support caching compressed content (支持缓存压缩后的内容)
@@ -18,6 +18,11 @@ const (
 var ErrNotSupportedContentEncoding = errors.New("not supported content encoding")
 
+// AllEncodings returns all currently supported encodings
+func AllEncodings() []ContentEncoding {
+	return []ContentEncoding{ContentEncodingBr, ContentEncodingGzip, ContentEncodingDeflate}
+}
+
 // NewReader returns a Reader
 func NewReader(reader io.Reader, contentEncoding ContentEncoding) (Reader, error) {
 	switch contentEncoding {
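The new AllEncodings helper lists the encodings whose compressed bodies can now be cached (brotli, gzip, deflate), and NewReader turns such a body back into a plain stream. As a point of reference, here is a standalone sketch of the kind of factory NewReader presumably is, built only on the Go standard library; newDecodingReader is a hypothetical name, and brotli is omitted because it needs a third-party package:

// Sketch only: a content-encoding reader factory in the spirit of
// compressions.NewReader, limited to the standard library.
package main

import (
	"bytes"
	"compress/flate"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

var ErrNotSupportedContentEncoding = errors.New("not supported content encoding")

func newDecodingReader(r io.Reader, contentEncoding string) (io.ReadCloser, error) {
	switch contentEncoding {
	case "gzip":
		return gzip.NewReader(r)
	case "deflate":
		return flate.NewReader(r), nil
	default:
		return nil, ErrNotSupportedContentEncoding
	}
}

func main() {
	// Compress a small payload with gzip, then decode it through the factory.
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	_, _ = w.Write([]byte("hello"))
	_ = w.Close()

	reader, err := newDecodingReader(&buf, "gzip")
	if err != nil {
		panic(err)
	}
	data, _ := io.ReadAll(reader)
	fmt.Println(string(data)) // hello
}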
@@ -9,6 +9,7 @@ import (
 	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 	"github.com/TeaOSLab/EdgeNode/internal/caches"
+	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 	"github.com/TeaOSLab/EdgeNode/internal/configs"
 	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 	"github.com/TeaOSLab/EdgeNode/internal/errors"
@@ -350,7 +351,12 @@ func (this *APIStream) handlePurgeCache(message *pb.NodeStreamMessage) error {
 	if msg.Type == "file" {
 		var keys = msg.Keys
 		for _, key := range keys {
-			keys = append(keys, key+webpSuffix)
+			keys = append(keys, key+webpCacheSuffix)
+			// TODO compose according to what is actually cached
+			for _, encoding := range compressions.AllEncodings() {
+				keys = append(keys, key+compressionCacheSuffix+encoding)
+				keys = append(keys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
+			}
 		}
 		msg.Keys = keys
 	}
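Purging a file key now also has to remove every variant that may have been stored for it: the WebP copy, each compressed copy, and each WebP+compressed copy. A minimal illustration of that fan-out (the suffix constants are the ones defined later in this diff; the literal encoding names and the helper names are assumptions made for the example):

// Illustration only: how one purge key fans out into its cached variants.
package main

import "fmt"

const (
	webpCacheSuffix        = "@GOEDGE_WEBP"
	compressionCacheSuffix = "@GOEDGE_"
)

// allEncodings assumes the ContentEncoding values are "br", "gzip", "deflate".
func allEncodings() []string {
	return []string{"br", "gzip", "deflate"}
}

func expandPurgeKeys(keys []string) []string {
	// range captures the original length, so the appended variants are not re-expanded
	for _, key := range keys {
		keys = append(keys, key+webpCacheSuffix)
		for _, encoding := range allEncodings() {
			keys = append(keys, key+compressionCacheSuffix+encoding)
			keys = append(keys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
		}
	}
	return keys
}

func main() {
	for _, k := range expandPurgeKeys([]string{"https://example.com/logo.png"}) {
		fmt.Println(k)
	}
}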
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 	"github.com/TeaOSLab/EdgeNode/internal/caches"
+	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 	"github.com/TeaOSLab/EdgeNode/internal/goman"
 	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 	"github.com/TeaOSLab/EdgeNode/internal/rpc"
@@ -122,16 +123,27 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 		this.cacheRef = nil
 		return
 	}
+	this.writer.cacheStorage = storage
 
 	// Check whether this is a purge request
 	if this.web.Cache.PurgeIsOn && strings.ToUpper(this.RawReq.Method) == "PURGE" && this.RawReq.Header.Get("X-Edge-Purge-Key") == this.web.Cache.PurgeKey {
 		this.varMapping["cache.status"] = "PURGE"
 
-		err := storage.Delete(key)
-		if err != nil {
-			remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error())
+		var subKeys = []string{key}
+		// TODO compose according to what is actually cached
+		for _, encoding := range compressions.AllEncodings() {
+			subKeys = append(subKeys, key+compressionCacheSuffix+encoding)
+			subKeys = append(subKeys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
+		}
+		for _, subKey := range subKeys {
+			err := storage.Delete(subKey)
+			if err != nil {
+				remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error())
+			}
 		}
 
+		// Purge the keys on other nodes through the API node
+		// TODO switch to a queue so that not every request needs its own goroutine
 		goman.New(func() {
 			rpcClient, err := rpc.SharedRPC()
 			if err == nil {
@@ -160,15 +172,46 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 	var reader caches.Reader
 	var err error
 
-	// Whether to check WebP first
-	var isWebP = false
+	// Check whether WebP is supported
+	var tags = []string{}
+	var webPIsEnabled = false
 	if this.web.WebP != nil &&
 		this.web.WebP.IsOn &&
 		this.web.WebP.MatchRequest(filepath.Ext(this.Path()), this.Format) &&
-		this.web.WebP.MatchAccept(this.requestHeader("Accept")) {
-		reader, _ = storage.OpenReader(key+webpSuffix, useStale)
+		this.web.WebP.MatchAccept(this.RawReq.Header.Get("Accept")) {
+		webPIsEnabled = true
+	}
+
+	// Check the compressed cache
+	if reader == nil {
+		if this.web.Compression != nil && this.web.Compression.IsOn {
+			_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
+			if ok {
+				// Check the compressed WebP cache
+				if webPIsEnabled {
+					reader, _ = storage.OpenReader(key+webpCacheSuffix+compressionCacheSuffix+encoding, useStale)
+					if reader != nil {
+						tags = append(tags, "webp", encoding)
+					}
+				}
+
+				// Check the normal compressed cache
+				if reader == nil {
+					reader, _ = storage.OpenReader(key+compressionCacheSuffix+encoding, useStale)
+					if reader != nil {
+						tags = append(tags, encoding)
+					}
+				}
+			}
+		}
+	}
+
+	// Check the WebP cache
+	if reader == nil && webPIsEnabled {
+		reader, _ = storage.OpenReader(key+webpCacheSuffix, useStale)
 		if reader != nil {
-			isWebP = true
+			this.writer.cacheReaderSuffix = webpCacheSuffix
+			tags = append(tags, "webp")
 		}
 	}
 
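The read path now probes the cache variants in a fixed order: the WebP+compressed entry first, then the compressed entry, then the plain WebP entry, and otherwise the unmodified key. A compact sketch of that priority, where openReader and pickVariant are hypothetical stand-ins for the storage lookup done above:

// Sketch only: the variant lookup order introduced by this hunk.
package main

import "fmt"

const (
	webpCacheSuffix        = "@GOEDGE_WEBP"
	compressionCacheSuffix = "@GOEDGE_"
)

func pickVariant(key string, webPIsEnabled bool, encoding string, openReader func(string) bool) (variantKey string, tags []string) {
	if encoding != "" {
		if webPIsEnabled && openReader(key+webpCacheSuffix+compressionCacheSuffix+encoding) {
			return key + webpCacheSuffix + compressionCacheSuffix + encoding, []string{"webp", encoding}
		}
		if openReader(key + compressionCacheSuffix + encoding) {
			return key + compressionCacheSuffix + encoding, []string{encoding}
		}
	}
	if webPIsEnabled && openReader(key+webpCacheSuffix) {
		return key + webpCacheSuffix, []string{"webp"}
	}
	// fall back to the plain key, handled by the existing code path
	return key, nil
}

func main() {
	// Pretend only the gzip variant exists in the cache.
	var exists = func(k string) bool { return k == "/index.html"+compressionCacheSuffix+"gzip" }
	key, tags := pickVariant("/index.html", true, "gzip", exists)
	fmt.Println(key, tags) // /index.html@GOEDGE_gzip [gzip]
}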
@@ -265,8 +308,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 	var eTag = ""
 	var lastModifiedAt = reader.LastModified()
 	if lastModifiedAt > 0 {
-		if isWebP {
-			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_webp" + "\""
+		if len(tags) > 0 {
+			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_" + strings.Join(tags, "_") + "\""
 		} else {
 			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
 		}
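Because several variants can now be served for one key, the ETag encodes which representation was sent, e.g. a WebP+brotli hit yields something like "1700000000_webp_br". A minimal illustration using the same construction as the hunk above (the timestamp is made up):

// Illustration only: how the variant tags feed into the ETag.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func makeETag(lastModifiedAt int64, tags []string) string {
	if len(tags) > 0 {
		return "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_" + strings.Join(tags, "_") + "\""
	}
	return "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
}

func main() {
	fmt.Println(makeETag(1700000000, []string{"webp", "br"})) // "1700000000_webp_br"
	fmt.Println(makeETag(1700000000, nil))                    // "1700000000"
}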
@@ -156,6 +156,9 @@ func httpRequestNextId() string {
 
 // Check whether a given encoding can be accepted
 func httpAcceptEncoding(acceptEncodings string, encoding string) bool {
+	if len(acceptEncodings) == 0 {
+		return false
+	}
 	var pieces = strings.Split(acceptEncodings, ",")
 	for _, piece := range pieces {
 		var qualityIndex = strings.Index(piece, ";")
@@ -169,3 +172,20 @@ func httpAcceptEncoding(acceptEncodings string, encoding string) bool {
 	}
 	return false
 }
+
+// Split the accepted encodings
+func httpAcceptEncodings(acceptEncodings string) (encodings []string) {
+	if len(acceptEncodings) == 0 {
+		return
+	}
+	var pieces = strings.Split(acceptEncodings, ",")
+	for _, piece := range pieces {
+		var qualityIndex = strings.Index(piece, ";")
+		if qualityIndex >= 0 {
+			piece = piece[:qualityIndex]
+		}
+
+		encodings = append(encodings, strings.TrimSpace(piece))
+	}
+	return
+}
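The new httpAcceptEncodings helper splits an Accept-Encoding header into encoding names and strips quality values. Reproduced standalone below so it can be run directly, with an example input:

// Usage sketch for httpAcceptEncodings (same body as in the hunk above).
package main

import (
	"fmt"
	"strings"
)

func httpAcceptEncodings(acceptEncodings string) (encodings []string) {
	if len(acceptEncodings) == 0 {
		return
	}
	var pieces = strings.Split(acceptEncodings, ",")
	for _, piece := range pieces {
		// drop the quality value, e.g. "br;q=0.9" -> "br"
		var qualityIndex = strings.Index(piece, ";")
		if qualityIndex >= 0 {
			piece = piece[:qualityIndex]
		}
		encodings = append(encodings, strings.TrimSpace(piece))
	}
	return
}

func main() {
	fmt.Println(httpAcceptEncodings("gzip, br;q=0.9, deflate;q=0.5")) // [gzip br deflate]
}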
@@ -35,11 +35,14 @@ import (
 )
 
 // WebP-related configuration
-const webpSuffix = "@GOEDGE_WEBP"
+const webpCacheSuffix = "@GOEDGE_WEBP"
 
 var webpMaxBufferSize int64 = 1_000_000_000
 var webpTotalBufferSize int64 = 0
 
+// Compression-related configuration
+const compressionCacheSuffix = "@GOEDGE_"
+
 func init() {
 	var systemMemory = utils.SystemMemoryGB() / 8
 	if systemMemory > 0 {
@@ -66,17 +69,24 @@ type HTTPWriter struct {
 	isOk       bool // whether fully successful
 	isFinished bool // whether finished
 
+	// Partial
+	isPartial bool
+
 	// WebP
 	webpIsEncoding        bool
 	webpOriginContentType string
 
 	// Compression
 	compressionConfig *serverconfigs.HTTPCompressionConfig
+	compressionCacheWriter caches.Writer
 
 	// Cache
 	cacheStorage    caches.StorageInterface
 	cacheWriter     caches.Writer
 	cacheIsFinished bool
+
+	cacheReader       caches.Reader
+	cacheReaderSuffix string
 }
 
 // NewHTTPWriter wraps a writer object
@@ -95,13 +105,22 @@ func (this *HTTPWriter) Prepare(resp *http.Response, size int64, status int, ena
 	this.size = size
 	this.statusCode = status
 
-	if resp != nil {
+	this.isPartial = status == http.StatusPartialContent
+
+	if resp != nil && resp.Body != nil {
+		cacheReader, ok := resp.Body.(caches.Reader)
+		if ok {
+			this.cacheReader = cacheReader
+		}
+
 		this.rawReader = resp.Body
 
 		if enableCache {
 			this.PrepareCache(resp, size)
 		}
-		this.PrepareWebP(resp, size)
+		if !this.isPartial {
+			this.PrepareWebP(resp, size)
+		}
 		this.PrepareCompression(resp, size)
 	}
 
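Prepare now records whether the upstream body is itself a cache reader by type assertion, so later stages can reuse its key and expiry. A generic sketch of that pattern; cacheReader below is a hypothetical stand-in for caches.Reader, which per this diff at least exposes ExpiresAt():

// Sketch only: detecting a cache-backed body via an interface type assertion.
package main

import (
	"fmt"
	"io"
	"strings"
)

// cacheReader is a hypothetical stand-in for caches.Reader.
type cacheReader interface {
	io.Reader
	ExpiresAt() int64
}

func inspectBody(body io.Reader) {
	if cr, ok := body.(cacheReader); ok {
		fmt.Println("body is a cache reader, expires at", cr.ExpiresAt())
		return
	}
	fmt.Println("body is a plain origin response")
}

func main() {
	inspectBody(strings.NewReader("origin data"))
}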
@@ -302,7 +321,7 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) {
 		return
 	}
 
-	var contentEncoding = resp.Header.Get("Content-Encoding")
+	var contentEncoding = this.Header().Get("Content-Encoding")
 	switch contentEncoding {
 	case "gzip", "deflate", "br":
 		reader, err := compressions.NewReader(resp.Body, contentEncoding)
@@ -389,19 +408,70 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
 		resp.Body = reader
 	}
 
+	// This must be set before the compression cache writer
+	var header = this.rawWriter.Header()
+	header.Set("Content-Encoding", compressionEncoding)
+	header.Set("Vary", "Accept-Encoding")
+	header.Del("Content-Length")
+
+	// compression cache writer
+	if !this.isPartial && this.cacheStorage != nil && (this.cacheReader != nil || this.cacheWriter != nil) && !this.webpIsEncoding {
+		var cacheKey = ""
+		var expiredAt int64 = 0
+
+		if this.cacheReader != nil {
+			cacheKey = this.req.cacheKey
+			expiredAt = this.cacheReader.ExpiresAt()
+		} else if this.cacheWriter != nil {
+			cacheKey = this.cacheWriter.Key()
+			expiredAt = this.cacheWriter.ExpiredAt()
+		}
+
+		if len(this.cacheReaderSuffix) > 0 {
+			cacheKey += this.cacheReaderSuffix
+		}
+
+		compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+compressionCacheSuffix+compressionEncoding, expiredAt, this.StatusCode(), -1, false)
+		if err != nil {
+			return
+		}
+
+		// Write the headers
+		for k, v := range this.Header() {
+			for _, v1 := range v {
+				_, err = compressionCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
+				if err != nil {
+					remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error())
+					_ = compressionCacheWriter.Discard()
+					compressionCacheWriter = nil
+					return
+				}
+			}
+		}
+
+		if compressionCacheWriter != nil {
+			this.compressionCacheWriter = compressionCacheWriter
+			var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter)
+			teeWriter.OnFail(func(err error) {
+				_ = compressionCacheWriter.Discard()
+				this.compressionCacheWriter = nil
+			})
+			this.writer = teeWriter
+		}
+	}
+
 	// compression writer
 	var err error = nil
 	compressionWriter, err := compressions.NewWriter(this.writer, compressionType, int(this.compressionConfig.Level))
 	if err != nil {
 		remotelogs.Error("HTTP_WRITER", err.Error())
+		header.Del("Content-Encoding")
+		if this.compressionCacheWriter != nil {
+			_ = this.compressionCacheWriter.Discard()
+		}
 		return
 	}
 	this.writer = compressionWriter
-
-	header := this.rawWriter.Header()
-	header.Set("Content-Encoding", compressionEncoding)
-	header.Set("Vary", "Accept-Encoding")
-	header.Del("Content-Length")
 }
 
 // SetCompression sets the content compression configuration
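When a compressed response is being cached, the writer chain is teed: every compressed byte written to the client is also written into the new cache entry, and the compression writer wraps that tee. A standard-library analog of the idea, using io.MultiWriter in place of writers.NewTeeWriterCloser (which additionally supports Close and an OnFail hook):

// Sketch only: compress once, deliver the same bytes to client and cache.
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

func main() {
	var cacheEntry bytes.Buffer       // stands in for the compression cache writer
	var client io.Writer = io.Discard // stands in for the client connection
	var tee = io.MultiWriter(client, &cacheEntry)

	// The compression writer wraps the tee, so the compressed output reaches
	// both destinations in one pass.
	gz := gzip.NewWriter(tee)
	_, _ = gz.Write([]byte("hello compressed cache\n"))
	_ = gz.Close()

	fmt.Println("cached", cacheEntry.Len(), "compressed bytes")
}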
@@ -557,13 +627,26 @@ func (this *HTTPWriter) Close() {
 	var webpCacheWriter caches.Writer
 
 	// Prepare the WebP cache
-	if this.cacheWriter != nil {
-		var cacheKey = this.cacheWriter.Key() + webpSuffix
+	if this.cacheReader != nil || this.cacheWriter != nil {
+		var cacheKey = ""
+		var expiredAt int64 = 0
 
-		webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode(), -1, false)
+		if this.cacheReader != nil {
+			cacheKey = this.req.cacheKey + webpCacheSuffix
+			expiredAt = this.cacheReader.ExpiresAt()
+		} else if this.cacheWriter != nil {
+			cacheKey = this.cacheWriter.Key() + webpCacheSuffix
+			expiredAt = this.cacheWriter.ExpiredAt()
+		}
+
+		webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, false)
 		if webpCacheWriter != nil {
 			// Write the headers
 			for k, v := range this.Header() {
+				// This is the original data, so no content encoding is needed
+				if k == "Content-Encoding" || k == "Transfer-Encoding" {
+					continue
+				}
 				for _, v1 := range v {
 					_, err := webpCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
 					if err != nil {
@@ -711,6 +794,27 @@ func (this *HTTPWriter) Close() {
 		}
 	}
 
+	if this.compressionCacheWriter != nil {
+		if this.isOk {
+			err := this.compressionCacheWriter.Close()
+			if err == nil {
+				var expiredAt = this.compressionCacheWriter.ExpiredAt()
+				this.cacheStorage.AddToList(&caches.Item{
+					Type:       this.compressionCacheWriter.ItemType(),
+					Key:        this.compressionCacheWriter.Key(),
+					ExpiredAt:  expiredAt,
+					StaleAt:    expiredAt + int64(this.calculateStaleLife()),
+					HeaderSize: this.compressionCacheWriter.HeaderSize(),
+					BodySize:   this.compressionCacheWriter.BodySize(),
+					Host:       this.req.ReqHost,
+					ServerId:   this.req.ReqServer.Id,
+				})
+			}
+		} else {
+			_ = this.compressionCacheWriter.Discard()
+		}
+	}
+
 	this.sentBodyBytes = this.counterWriter.TotalBytes()
 }