mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	支持缓存压缩后的内容
This commit is contained in:
		@@ -18,6 +18,11 @@ const (
 | 
			
		||||
 | 
			
		||||
var ErrNotSupportedContentEncoding = errors.New("not supported content encoding")
 | 
			
		||||
 | 
			
		||||
// AllEncodings 当前支持的所有编码
 | 
			
		||||
func AllEncodings() []ContentEncoding {
 | 
			
		||||
	return []ContentEncoding{ContentEncodingBr, ContentEncodingGzip, ContentEncodingDeflate}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewReader 获取Reader
 | 
			
		||||
func NewReader(reader io.Reader, contentEncoding ContentEncoding) (Reader, error) {
 | 
			
		||||
	switch contentEncoding {
 | 
			
		||||
 
 | 
			
		||||
@@ -9,6 +9,7 @@ import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/configs"
 | 
			
		||||
	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/errors"
 | 
			
		||||
@@ -350,7 +351,12 @@ func (this *APIStream) handlePurgeCache(message *pb.NodeStreamMessage) error {
 | 
			
		||||
	if msg.Type == "file" {
 | 
			
		||||
		var keys = msg.Keys
 | 
			
		||||
		for _, key := range keys {
 | 
			
		||||
			keys = append(keys, key+webpSuffix)
 | 
			
		||||
			keys = append(keys, key+webpCacheSuffix)
 | 
			
		||||
			// TODO 根据实际缓存的内容进行组合
 | 
			
		||||
			for _, encoding := range compressions.AllEncodings() {
 | 
			
		||||
				keys = append(keys, key+compressionCacheSuffix+encoding)
 | 
			
		||||
				keys = append(keys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		msg.Keys = keys
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -5,6 +5,7 @@ import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
			
		||||
@@ -122,16 +123,27 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
		this.cacheRef = nil
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	this.writer.cacheStorage = storage
 | 
			
		||||
 | 
			
		||||
	// 判断是否在Purge
 | 
			
		||||
	if this.web.Cache.PurgeIsOn && strings.ToUpper(this.RawReq.Method) == "PURGE" && this.RawReq.Header.Get("X-Edge-Purge-Key") == this.web.Cache.PurgeKey {
 | 
			
		||||
		this.varMapping["cache.status"] = "PURGE"
 | 
			
		||||
 | 
			
		||||
		err := storage.Delete(key)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error())
 | 
			
		||||
		var subKeys = []string{key}
 | 
			
		||||
		// TODO 根据实际缓存的内容进行组合
 | 
			
		||||
		for _, encoding := range compressions.AllEncodings() {
 | 
			
		||||
			subKeys = append(subKeys, key+compressionCacheSuffix+encoding)
 | 
			
		||||
			subKeys = append(subKeys, key+webpCacheSuffix+compressionCacheSuffix+encoding)
 | 
			
		||||
		}
 | 
			
		||||
		for _, subKey := range subKeys {
 | 
			
		||||
			err := storage.Delete(subKey)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error())
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 通过API节点清除别节点上的的Key
 | 
			
		||||
		// TODO 改为队列,不需要每个请求都使用goroutine
 | 
			
		||||
		goman.New(func() {
 | 
			
		||||
			rpcClient, err := rpc.SharedRPC()
 | 
			
		||||
			if err == nil {
 | 
			
		||||
@@ -160,15 +172,46 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
	var reader caches.Reader
 | 
			
		||||
	var err error
 | 
			
		||||
 | 
			
		||||
	// 是否优先检查WebP
 | 
			
		||||
	var isWebP = false
 | 
			
		||||
	// 检查是否支持WebP
 | 
			
		||||
	var tags = []string{}
 | 
			
		||||
	var webPIsEnabled = false
 | 
			
		||||
	if this.web.WebP != nil &&
 | 
			
		||||
		this.web.WebP.IsOn &&
 | 
			
		||||
		this.web.WebP.MatchRequest(filepath.Ext(this.Path()), this.Format) &&
 | 
			
		||||
		this.web.WebP.MatchAccept(this.requestHeader("Accept")) {
 | 
			
		||||
		reader, _ = storage.OpenReader(key+webpSuffix, useStale)
 | 
			
		||||
		this.web.WebP.MatchAccept(this.RawReq.Header.Get("Accept")) {
 | 
			
		||||
		webPIsEnabled = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 检查压缩缓存
 | 
			
		||||
	if reader == nil {
 | 
			
		||||
		if this.web.Compression != nil && this.web.Compression.IsOn {
 | 
			
		||||
			_, encoding, ok := this.web.Compression.MatchAcceptEncoding(this.RawReq.Header.Get("Accept-Encoding"))
 | 
			
		||||
			if ok {
 | 
			
		||||
				// 检查支持WebP的压缩缓存
 | 
			
		||||
				if webPIsEnabled {
 | 
			
		||||
					reader, _ = storage.OpenReader(key+webpCacheSuffix+compressionCacheSuffix+encoding, useStale)
 | 
			
		||||
					if reader != nil {
 | 
			
		||||
						tags = append(tags, "webp", encoding)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				// 检查普通缓存
 | 
			
		||||
				if reader == nil {
 | 
			
		||||
					reader, _ = storage.OpenReader(key+compressionCacheSuffix+encoding, useStale)
 | 
			
		||||
					if reader != nil {
 | 
			
		||||
						tags = append(tags, encoding)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 检查WebP
 | 
			
		||||
	if reader == nil && webPIsEnabled {
 | 
			
		||||
		reader, _ = storage.OpenReader(key+webpCacheSuffix, useStale)
 | 
			
		||||
		if reader != nil {
 | 
			
		||||
			isWebP = true
 | 
			
		||||
			this.writer.cacheReaderSuffix = webpCacheSuffix
 | 
			
		||||
			tags = append(tags, "webp")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -265,8 +308,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
	var eTag = ""
 | 
			
		||||
	var lastModifiedAt = reader.LastModified()
 | 
			
		||||
	if lastModifiedAt > 0 {
 | 
			
		||||
		if isWebP {
 | 
			
		||||
			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_webp" + "\""
 | 
			
		||||
		if len(tags) > 0 {
 | 
			
		||||
			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "_" + strings.Join(tags, "_") + "\""
 | 
			
		||||
		} else {
 | 
			
		||||
			eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
@@ -156,6 +156,9 @@ func httpRequestNextId() string {
 | 
			
		||||
 | 
			
		||||
// 检查是否可以接受某个编码
 | 
			
		||||
func httpAcceptEncoding(acceptEncodings string, encoding string) bool {
 | 
			
		||||
	if len(acceptEncodings) == 0 {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	var pieces = strings.Split(acceptEncodings, ",")
 | 
			
		||||
	for _, piece := range pieces {
 | 
			
		||||
		var qualityIndex = strings.Index(piece, ";")
 | 
			
		||||
@@ -169,3 +172,20 @@ func httpAcceptEncoding(acceptEncodings string, encoding string) bool {
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 分隔编码
 | 
			
		||||
func httpAcceptEncodings(acceptEncodings string) (encodings []string) {
 | 
			
		||||
	if len(acceptEncodings) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	var pieces = strings.Split(acceptEncodings, ",")
 | 
			
		||||
	for _, piece := range pieces {
 | 
			
		||||
		var qualityIndex = strings.Index(piece, ";")
 | 
			
		||||
		if qualityIndex >= 0 {
 | 
			
		||||
			piece = piece[:qualityIndex]
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		encodings = append(encodings, strings.TrimSpace(piece))
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -35,11 +35,14 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// webp相关配置
 | 
			
		||||
const webpSuffix = "@GOEDGE_WEBP"
 | 
			
		||||
const webpCacheSuffix = "@GOEDGE_WEBP"
 | 
			
		||||
 | 
			
		||||
var webpMaxBufferSize int64 = 1_000_000_000
 | 
			
		||||
var webpTotalBufferSize int64 = 0
 | 
			
		||||
 | 
			
		||||
// 压缩相关配置
 | 
			
		||||
const compressionCacheSuffix = "@GOEDGE_"
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	var systemMemory = utils.SystemMemoryGB() / 8
 | 
			
		||||
	if systemMemory > 0 {
 | 
			
		||||
@@ -66,17 +69,24 @@ type HTTPWriter struct {
 | 
			
		||||
	isOk       bool // 是否完全成功
 | 
			
		||||
	isFinished bool // 是否已完成
 | 
			
		||||
 | 
			
		||||
	// Partial
 | 
			
		||||
	isPartial bool
 | 
			
		||||
 | 
			
		||||
	// WebP
 | 
			
		||||
	webpIsEncoding        bool
 | 
			
		||||
	webpOriginContentType string
 | 
			
		||||
 | 
			
		||||
	// Compression
 | 
			
		||||
	compressionConfig *serverconfigs.HTTPCompressionConfig
 | 
			
		||||
	compressionConfig      *serverconfigs.HTTPCompressionConfig
 | 
			
		||||
	compressionCacheWriter caches.Writer
 | 
			
		||||
 | 
			
		||||
	// Cache
 | 
			
		||||
	cacheStorage    caches.StorageInterface
 | 
			
		||||
	cacheWriter     caches.Writer
 | 
			
		||||
	cacheIsFinished bool
 | 
			
		||||
 | 
			
		||||
	cacheReader       caches.Reader
 | 
			
		||||
	cacheReaderSuffix string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewHTTPWriter 包装对象
 | 
			
		||||
@@ -95,13 +105,22 @@ func (this *HTTPWriter) Prepare(resp *http.Response, size int64, status int, ena
 | 
			
		||||
	this.size = size
 | 
			
		||||
	this.statusCode = status
 | 
			
		||||
 | 
			
		||||
	if resp != nil {
 | 
			
		||||
	this.isPartial = status == http.StatusPartialContent
 | 
			
		||||
 | 
			
		||||
	if resp != nil && resp.Body != nil {
 | 
			
		||||
		cacheReader, ok := resp.Body.(caches.Reader)
 | 
			
		||||
		if ok {
 | 
			
		||||
			this.cacheReader = cacheReader
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		this.rawReader = resp.Body
 | 
			
		||||
 | 
			
		||||
		if enableCache {
 | 
			
		||||
			this.PrepareCache(resp, size)
 | 
			
		||||
		}
 | 
			
		||||
		this.PrepareWebP(resp, size)
 | 
			
		||||
		if !this.isPartial {
 | 
			
		||||
			this.PrepareWebP(resp, size)
 | 
			
		||||
		}
 | 
			
		||||
		this.PrepareCompression(resp, size)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -302,7 +321,7 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var contentEncoding = resp.Header.Get("Content-Encoding")
 | 
			
		||||
		var contentEncoding = this.Header().Get("Content-Encoding")
 | 
			
		||||
		switch contentEncoding {
 | 
			
		||||
		case "gzip", "deflate", "br":
 | 
			
		||||
			reader, err := compressions.NewReader(resp.Body, contentEncoding)
 | 
			
		||||
@@ -389,19 +408,70 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
 | 
			
		||||
		resp.Body = reader
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 需要放在compression cache writer之前
 | 
			
		||||
	var header = this.rawWriter.Header()
 | 
			
		||||
	header.Set("Content-Encoding", compressionEncoding)
 | 
			
		||||
	header.Set("Vary", "Accept-Encoding")
 | 
			
		||||
	header.Del("Content-Length")
 | 
			
		||||
 | 
			
		||||
	// compression cache writer
 | 
			
		||||
	if !this.isPartial && this.cacheStorage != nil && (this.cacheReader != nil || this.cacheWriter != nil) && !this.webpIsEncoding {
 | 
			
		||||
		var cacheKey = ""
 | 
			
		||||
		var expiredAt int64 = 0
 | 
			
		||||
 | 
			
		||||
		if this.cacheReader != nil {
 | 
			
		||||
			cacheKey = this.req.cacheKey
 | 
			
		||||
			expiredAt = this.cacheReader.ExpiresAt()
 | 
			
		||||
		} else if this.cacheWriter != nil {
 | 
			
		||||
			cacheKey = this.cacheWriter.Key()
 | 
			
		||||
			expiredAt = this.cacheWriter.ExpiredAt()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if len(this.cacheReaderSuffix) > 0 {
 | 
			
		||||
			cacheKey += this.cacheReaderSuffix
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		compressionCacheWriter, err := this.cacheStorage.OpenWriter(cacheKey+compressionCacheSuffix+compressionEncoding, expiredAt, this.StatusCode(), -1, false)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 写入Header
 | 
			
		||||
		for k, v := range this.Header() {
 | 
			
		||||
			for _, v1 := range v {
 | 
			
		||||
				_, err = compressionCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error())
 | 
			
		||||
					_ = compressionCacheWriter.Discard()
 | 
			
		||||
					compressionCacheWriter = nil
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if compressionCacheWriter != nil {
 | 
			
		||||
			this.compressionCacheWriter = compressionCacheWriter
 | 
			
		||||
			var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter)
 | 
			
		||||
			teeWriter.OnFail(func(err error) {
 | 
			
		||||
				_ = compressionCacheWriter.Discard()
 | 
			
		||||
				this.compressionCacheWriter = nil
 | 
			
		||||
			})
 | 
			
		||||
			this.writer = teeWriter
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// compression writer
 | 
			
		||||
	var err error = nil
 | 
			
		||||
	compressionWriter, err := compressions.NewWriter(this.writer, compressionType, int(this.compressionConfig.Level))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		remotelogs.Error("HTTP_WRITER", err.Error())
 | 
			
		||||
		header.Del("Content-Encoding")
 | 
			
		||||
		if this.compressionCacheWriter != nil {
 | 
			
		||||
			_ = this.compressionCacheWriter.Discard()
 | 
			
		||||
		}
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	this.writer = compressionWriter
 | 
			
		||||
 | 
			
		||||
	header := this.rawWriter.Header()
 | 
			
		||||
	header.Set("Content-Encoding", compressionEncoding)
 | 
			
		||||
	header.Set("Vary", "Accept-Encoding")
 | 
			
		||||
	header.Del("Content-Length")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetCompression 设置内容压缩配置
 | 
			
		||||
@@ -557,13 +627,26 @@ func (this *HTTPWriter) Close() {
 | 
			
		||||
		var webpCacheWriter caches.Writer
 | 
			
		||||
 | 
			
		||||
		// 准备WebP Cache
 | 
			
		||||
		if this.cacheWriter != nil {
 | 
			
		||||
			var cacheKey = this.cacheWriter.Key() + webpSuffix
 | 
			
		||||
		if this.cacheReader != nil || this.cacheWriter != nil {
 | 
			
		||||
			var cacheKey = ""
 | 
			
		||||
			var expiredAt int64 = 0
 | 
			
		||||
 | 
			
		||||
			webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode(), -1, false)
 | 
			
		||||
			if this.cacheReader != nil {
 | 
			
		||||
				cacheKey = this.req.cacheKey + webpCacheSuffix
 | 
			
		||||
				expiredAt = this.cacheReader.ExpiresAt()
 | 
			
		||||
			} else if this.cacheWriter != nil {
 | 
			
		||||
				cacheKey = this.cacheWriter.Key() + webpCacheSuffix
 | 
			
		||||
				expiredAt = this.cacheWriter.ExpiredAt()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), -1, false)
 | 
			
		||||
			if webpCacheWriter != nil {
 | 
			
		||||
				// 写入Header
 | 
			
		||||
				for k, v := range this.Header() {
 | 
			
		||||
					// 这里是原始的数据,不需要内容编码
 | 
			
		||||
					if k == "Content-Encoding" || k == "Transfer-Encoding" {
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
					for _, v1 := range v {
 | 
			
		||||
						_, err := webpCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
 | 
			
		||||
						if err != nil {
 | 
			
		||||
@@ -711,6 +794,27 @@ func (this *HTTPWriter) Close() {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if this.compressionCacheWriter != nil {
 | 
			
		||||
		if this.isOk {
 | 
			
		||||
			err := this.compressionCacheWriter.Close()
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				var expiredAt = this.compressionCacheWriter.ExpiredAt()
 | 
			
		||||
				this.cacheStorage.AddToList(&caches.Item{
 | 
			
		||||
					Type:       this.compressionCacheWriter.ItemType(),
 | 
			
		||||
					Key:        this.compressionCacheWriter.Key(),
 | 
			
		||||
					ExpiredAt:  expiredAt,
 | 
			
		||||
					StaleAt:    expiredAt + int64(this.calculateStaleLife()),
 | 
			
		||||
					HeaderSize: this.compressionCacheWriter.HeaderSize(),
 | 
			
		||||
					BodySize:   this.compressionCacheWriter.BodySize(),
 | 
			
		||||
					Host:       this.req.ReqHost,
 | 
			
		||||
					ServerId:   this.req.ReqServer.Id,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			_ = this.compressionCacheWriter.Discard()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	this.sentBodyBytes = this.counterWriter.TotalBytes()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user