diff --git a/internal/compressions/utils.go b/internal/compressions/utils.go
index 3e577f3..5a0b6bd 100644
--- a/internal/compressions/utils.go
+++ b/internal/compressions/utils.go
@@ -18,6 +18,11 @@ const (
 
 var ErrNotSupportedContentEncoding = errors.New("not supported content encoding")
 
+// AllEncodings returns all currently supported encodings
+func AllEncodings() []ContentEncoding {
+	return []ContentEncoding{ContentEncodingBr, ContentEncodingGzip, ContentEncodingDeflate}
+}
+
 // NewReader returns a Reader
 func NewReader(reader io.Reader, contentEncoding ContentEncoding) (Reader, error) {
 	switch contentEncoding {
diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go
index 1586796..60719b8 100644
--- a/internal/nodes/api_stream.go
+++ b/internal/nodes/api_stream.go
@@ -9,6 +9,7 @@ import (
 	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 	"github.com/TeaOSLab/EdgeNode/internal/caches"
+	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 	"github.com/TeaOSLab/EdgeNode/internal/configs"
 	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 	"github.com/TeaOSLab/EdgeNode/internal/errors"
@@ -350,7 +351,12 @@ func (this *APIStream) handlePurgeCache(message *pb.NodeStreamMessage) error {
 	if msg.Type == "file" {
 		var keys = msg.Keys
 		for _, key := range keys {
-			keys = append(keys, key+webpSuffix)
+			keys = append(keys, key+webpCacheSuffix)
+			// TODO compose the sub-keys based on what is actually cached
+			for _, encoding := range compressions.AllEncodings() {
+				keys = append(keys, key+compressionCacheSuffix+encoding)
+				keys = append(keys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
+			}
 		}
 		msg.Keys = keys
 	}
diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go
index cf8f0d2..a9f2359 100644
--- a/internal/nodes/http_request_cache.go
+++ b/internal/nodes/http_request_cache.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 	"github.com/TeaOSLab/EdgeNode/internal/caches"
+	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 	"github.com/TeaOSLab/EdgeNode/internal/goman"
 	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 	"github.com/TeaOSLab/EdgeNode/internal/rpc"
@@ -122,16 +123,27 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 		this.cacheRef = nil
 		return
 	}
+	this.writer.cacheStorage = storage
 
 	// check whether this is a PURGE request
 	if this.web.Cache.PurgeIsOn && strings.ToUpper(this.RawReq.Method) == "PURGE" && this.RawReq.Header.Get("X-Edge-Purge-Key") == this.web.Cache.PurgeKey {
 		this.varMapping["cache.status"] = "PURGE"
-		err := storage.Delete(key)
-		if err != nil {
-			remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error())
+		var subKeys = []string{key}
+		// TODO compose the sub-keys based on what is actually cached
+		for _, encoding := range compressions.AllEncodings() {
+			subKeys = append(subKeys, key+compressionCacheSuffix+encoding)
+			subKeys = append(subKeys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
+		}
+		for _, subKey := range subKeys {
+			err := storage.Delete(subKey)
+			if err != nil {
+				remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error())
+			}
 		}
 
+		// purge the keys on other nodes through the API node
+		// TODO switch to a queue instead of starting a goroutine for every request
 		goman.New(func() {
 			rpcClient, err := rpc.SharedRPC()
 			if err == nil {
@@ -160,15 +172,46 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 	var reader caches.Reader
 	var err error
 
-	// whether to check WebP first
-	var isWebP = false
+	// check whether WebP is supported
+	var tags = []string{}
+	var webPIsEnabled = false
 	if this.web.WebP != nil &&
 		this.web.WebP.IsOn &&
 		this.web.WebP.MatchRequest(filepath.Ext(this.Path()), this.Format) &&
-		this.web.WebP.MatchAccept(this.requestHeader("Accept")) {
-		reader, _ = storage.OpenReader(key+webpSuffix, useStale)
+		this.web.WebP.MatchAccept(this.RawReq.Header.Get("Accept")) {
+		webPIsEnabled = true
+	}
+
+	// check the compressed cache
+	if reader == nil {
+		if this.web.Compression != nil && this.web.Compression.IsOn {
+			_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
+			if ok {
+				// check the compressed cache of the WebP variant
+				if webPIsEnabled {
+					reader, _ = storage.OpenReader(key+webpCacheSuffix+compressionCacheSuffix+encoding, useStale)
+					if reader != nil {
+						tags = append(tags, "webp", encoding)
+					}
+				}
+
+				// check the plain compressed cache
+				if reader == nil {
+					reader, _ = storage.OpenReader(key+compressionCacheSuffix+encoding, useStale)
+					if reader != nil {
+						tags = append(tags, encoding)
+					}
+				}
+			}
+		}
+	}
+
+	// check the WebP cache
+	if reader == nil && webPIsEnabled {
+		reader, _ = storage.OpenReader(key+webpCacheSuffix, useStale)
 		if reader != nil {
-			isWebP = true
+			this.writer.cacheReaderSuffix = webpCacheSuffix
+			tags = append(tags, "webp")
 		}
 	}
@@ -265,8 +308,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 	var eTag = ""
 	var lastModifiedAt = reader.LastModified()
 	if lastModifiedAt > 0 {
-		if isWebP {
-			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_webp" + "\""
+		if len(tags) > 0 {
+			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_" + strings.Join(tags, "_") + "\""
 		} else {
 			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
 		}
diff --git a/internal/nodes/http_request_utils.go b/internal/nodes/http_request_utils.go
index ccd30b3..7218e4f 100644
--- a/internal/nodes/http_request_utils.go
+++ b/internal/nodes/http_request_utils.go
@@ -156,6 +156,9 @@ func httpRequestNextId() string {
 
 // check whether a certain encoding is acceptable
 func httpAcceptEncoding(acceptEncodings string, encoding string) bool {
+	if len(acceptEncodings) == 0 {
+		return false
+	}
 	var pieces = strings.Split(acceptEncodings, ",")
 	for _, piece := range pieces {
 		var qualityIndex = strings.Index(piece, ";")
@@ -169,3 +172,20 @@ func httpAcceptEncoding(acceptEncodings string, encoding string) bool {
 	}
 	return false
 }
+
+// split an Accept-Encoding value into individual encodings
+func httpAcceptEncodings(acceptEncodings string) (encodings []string) {
+	if len(acceptEncodings) == 0 {
+		return
+	}
+	var pieces = strings.Split(acceptEncodings, ",")
+	for _, piece := range pieces {
+		var qualityIndex = strings.Index(piece, ";")
+		if qualityIndex >= 0 {
+			piece = piece[:qualityIndex]
+		}
+
+		encodings = append(encodings, strings.TrimSpace(piece))
+	}
+	return
+}
diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go
index 9e3e987..ba3a126 100644
--- a/internal/nodes/http_writer.go
+++ b/internal/nodes/http_writer.go
@@ -35,11 +35,14 @@ import (
 )
 
 // WebP related configuration
-const webpSuffix = "@GOEDGE_WEBP"
+const webpCacheSuffix = "@GOEDGE_WEBP"
 
 var webpMaxBufferSize int64 = 1_000_000_000
 var webpTotalBufferSize int64 = 0
 
+// compression related configuration
+const compressionCacheSuffix = "@GOEDGE_"
+
 func init() {
 	var systemMemory = utils.SystemMemoryGB() / 8
 	if systemMemory > 0 {
@@ -66,17 +69,24 @@ type HTTPWriter struct {
 	isOk       bool // whether fully succeeded
 	isFinished bool // whether already finished
 
+	// Partial
+	isPartial bool
+
 	// WebP
 	webpIsEncoding        bool
 	webpOriginContentType string
 
 	// Compression
-	compressionConfig *serverconfigs.HTTPCompressionConfig
+	compressionConfig      *serverconfigs.HTTPCompressionConfig
+	compressionCacheWriter caches.Writer
 
 	// Cache
 	cacheStorage    caches.StorageInterface
 	cacheWriter     caches.Writer
 	cacheIsFinished bool
+
+	cacheReader       caches.Reader
+	cacheReaderSuffix string
 }
 
 // NewHTTPWriter wraps the object
@@ -95,13 +105,22 @@ func (this *HTTPWriter) Prepare(resp *http.Response, size int64, status int, ena
 	this.size = size
 	this.statusCode = status
 
-	if resp != nil {
+	this.isPartial = status == http.StatusPartialContent
+
+	if resp != nil && resp.Body != nil {
+		cacheReader, ok := resp.Body.(caches.Reader)
+		if ok {
+			this.cacheReader = cacheReader
+		}
+
 		this.rawReader = resp.Body
 
 		if enableCache {
 			this.PrepareCache(resp, size)
 		}
-		this.PrepareWebP(resp, size)
+		if !this.isPartial {
+			this.PrepareWebP(resp, size)
+		}
 		this.PrepareCompression(resp, size)
 	}
@@ -302,7 +321,7 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) {
 		return
 	}
 
-	var contentEncoding = resp.Header.Get("Content-Encoding")
+	var contentEncoding = this.Header().Get("Content-Encoding")
 	switch contentEncoding {
 	case "gzip", "deflate", "br":
 		reader, err := compressions.NewReader(resp.Body, contentEncoding)
@@ -389,19 +408,70 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
 		resp.Body = reader
 	}
 
+	// this needs to be set before the compression cache writer
+	var header = this.rawWriter.Header()
+	header.Set("Content-Encoding", compressionEncoding)
+	header.Set("Vary", "Accept-Encoding")
+	header.Del("Content-Length")
+
+	// compression cache writer
+	if !this.isPartial && this.cacheStorage != nil && (this.cacheReader != nil || this.cacheWriter != nil) && !this.webpIsEncoding {
+		var cacheKey = ""
+		var expiredAt int64 = 0
+
+		if this.cacheReader != nil {
+			cacheKey = this.req.cacheKey
+			expiredAt = this.cacheReader.ExpiresAt()
+		} else if this.cacheWriter != nil {
+			cacheKey = this.cacheWriter.Key()
+			expiredAt = this.cacheWriter.ExpiredAt()
+		}
+
+		if len(this.cacheReaderSuffix) > 0 {
+			cacheKey += this.cacheReaderSuffix
+		}
+
+		compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+compressionCacheSuffix+compressionEncoding, expiredAt, this.StatusCode(), -1, false)
+		if err != nil {
+			return
+		}
+
+		// write the header
+		for k, v := range this.Header() {
+			for _, v1 := range v {
+				_, err = compressionCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
+				if err != nil {
+					remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error())
+					_ = compressionCacheWriter.Discard()
+					compressionCacheWriter = nil
+					return
+				}
+			}
+		}
+
+		if compressionCacheWriter != nil {
+			this.compressionCacheWriter = compressionCacheWriter
+			var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter)
+			teeWriter.OnFail(func(err error) {
+				_ = compressionCacheWriter.Discard()
+				this.compressionCacheWriter = nil
+			})
+			this.writer = teeWriter
+		}
+	}
+
 	// compression writer
 	var err error = nil
 	compressionWriter, err := compressions.NewWriter(this.writer, compressionType, int(this.compressionConfig.Level))
 	if err != nil {
 		remotelogs.Error("HTTP_WRITER", err.Error())
+		header.Del("Content-Encoding")
+		if this.compressionCacheWriter != nil {
+			_ = this.compressionCacheWriter.Discard()
+		}
 		return
 	}
 	this.writer = compressionWriter
-
-	header := this.rawWriter.Header()
-	header.Set("Content-Encoding", compressionEncoding)
-	header.Set("Vary", "Accept-Encoding")
-	header.Del("Content-Length")
 }
 
 // SetCompression sets the content compression configuration
@@ -557,13 +627,26 @@ func (this *HTTPWriter) Close() {
 	var webpCacheWriter caches.Writer
 
 	// prepare the WebP cache
-	if this.cacheWriter != nil {
-		var cacheKey = this.cacheWriter.Key() + webpSuffix
+	if this.cacheReader != nil || this.cacheWriter != nil {
+		var cacheKey = ""
+		var expiredAt int64 = 0
 
-		webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode(), -1, false)
+		if this.cacheReader != nil {
+			cacheKey = this.req.cacheKey + webpCacheSuffix
+			expiredAt = this.cacheReader.ExpiresAt()
+		} else if this.cacheWriter != nil {
+			cacheKey = this.cacheWriter.Key() + webpCacheSuffix
+			expiredAt = this.cacheWriter.ExpiredAt()
+		}
+
+		webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, false)
 		if webpCacheWriter != nil {
 			// write the header
 			for k, v := range this.Header() {
+				// this is the original data, so no content encoding headers are needed
+				if k == "Content-Encoding" || k == "Transfer-Encoding" {
+					continue
+				}
 				for _, v1 := range v {
 					_, err := webpCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
 					if err != nil {
@@ -711,6 +794,27 @@ func (this *HTTPWriter) Close() {
 		}
 	}
 
+	if this.compressionCacheWriter != nil {
+		if this.isOk {
+			err := this.compressionCacheWriter.Close()
+			if err == nil {
+				var expiredAt = this.compressionCacheWriter.ExpiredAt()
+				this.cacheStorage.AddToList(&caches.Item{
+					Type:       this.compressionCacheWriter.ItemType(),
+					Key:        this.compressionCacheWriter.Key(),
+					ExpiredAt:  expiredAt,
+					StaleAt:    expiredAt + int64(this.calculateStaleLife()),
+					HeaderSize: this.compressionCacheWriter.HeaderSize(),
+					BodySize:   this.compressionCacheWriter.BodySize(),
+					Host:       this.req.ReqHost,
+					ServerId:   this.req.ReqServer.Id,
+				})
+			}
+		} else {
+			_ = this.compressionCacheWriter.Discard()
+		}
+	}
+
 	this.sentBodyBytes = this.counterWriter.TotalBytes()
 }
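
Note on the cache key layout introduced above: each cached URL now fans out into a plain entry, a WebP entry (webpCacheSuffix = "@GOEDGE_WEBP") and one compressed entry per supported encoding (compressionCacheSuffix = "@GOEDGE_" followed by the encoding name), for both the plain and the WebP variant, so a purge has to delete the whole family, as handlePurgeCache and doCacheRead do. The standalone sketch below only illustrates that fan-out; allCacheKeys and main are hypothetical names, and the literal encodings "br", "gzip" and "deflate" are an assumption about the string values behind compressions.AllEncodings().

package main

import "fmt"

// Suffix values copied from http_writer.go in this patch.
const (
	webpCacheSuffix        = "@GOEDGE_WEBP"
	compressionCacheSuffix = "@GOEDGE_"
)

// allCacheKeys is a hypothetical helper listing every sub-key stored for one
// cache key: the plain entry, the WebP entry, and the compressed variants of both.
func allCacheKeys(key string) []string {
	var keys = []string{key, key + webpCacheSuffix}
	// assumed string values of compressions.AllEncodings()
	for _, encoding := range []string{"br", "gzip", "deflate"} {
		keys = append(keys, key+compressionCacheSuffix+encoding)
		keys = append(keys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
	}
	return keys
}

func main() {
	// prints the 8 sub-keys a purge of this URL has to delete
	for _, k := range allCacheKeys("https://example.com/image.png") {
		fmt.Println(k)
	}
}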
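
On the read side, doCacheRead checks the most specific variant first. A minimal sketch of that precedence, reusing the suffix constants from the sketch above; lookupOrder and exists are hypothetical stand-ins (exists plays the role of storage.OpenReader succeeding), and the plain-key fallback at the end corresponds to the pre-existing path below the hunk:

// lookupOrder is a hypothetical distillation of the precedence in doCacheRead:
// compressed WebP variant, then compressed variant, then WebP variant, then the
// plain key.
func lookupOrder(key, encoding string, webPIsEnabled bool, exists func(string) bool) (chosen string, tags []string) {
	if encoding != "" {
		if webPIsEnabled && exists(key+webpCacheSuffix+compressionCacheSuffix+encoding) {
			return key + webpCacheSuffix + compressionCacheSuffix + encoding, []string{"webp", encoding}
		}
		if exists(key + compressionCacheSuffix + encoding) {
			return key + compressionCacheSuffix + encoding, []string{encoding}
		}
	}
	if webPIsEnabled && exists(key+webpCacheSuffix) {
		return key + webpCacheSuffix, []string{"webp"}
	}
	return key, nil
}

The returned tags mirror what the patch joins into the ETag (a value of the form "<lastModified>_webp_br"), so each stored variant validates independently.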