diff --git a/build/www/.gitignore b/build/www/.gitignore new file mode 100644 index 0000000..5e46596 --- /dev/null +++ b/build/www/.gitignore @@ -0,0 +1 @@ +cache \ No newline at end of file diff --git a/internal/caches/file_writer.go b/internal/caches/file_writer.go new file mode 100644 index 0000000..29e7076 --- /dev/null +++ b/internal/caches/file_writer.go @@ -0,0 +1,77 @@ +package caches + +import ( + "os" + "sync" +) + +type FileWriter struct { + rawWriter *os.File + key string + size int64 + expiredAt int64 + locker *sync.RWMutex + isReleased bool +} + +func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, locker *sync.RWMutex) *FileWriter { + return &FileWriter{ + key: key, + rawWriter: rawWriter, + expiredAt: expiredAt, + locker: locker, + } +} + +// 写入数据 +func (this *FileWriter) Write(data []byte) (n int, err error) { + n, err = this.rawWriter.Write(data) + this.size += int64(n) + if err != nil { + _ = this.rawWriter.Close() + _ = os.Remove(this.rawWriter.Name()) + this.Release() + } + return +} + +// 关闭 +func (this *FileWriter) Close() error { + // 写入结束符 + _, err := this.rawWriter.WriteString("\n$$$") + if err != nil { + _ = os.Remove(this.rawWriter.Name()) + } + + this.Release() + + return err +} + +// 丢弃 +func (this *FileWriter) Discard() error { + err := os.Remove(this.rawWriter.Name()) + this.Release() + return err +} + +func (this *FileWriter) Size() int64 { + return this.size +} + +func (this *FileWriter) ExpiredAt() int64 { + return this.expiredAt +} + +func (this *FileWriter) Key() string { + return this.key +} + +// 释放锁,一定要调用 +func (this *FileWriter) Release() { + if this.isReleased { + return + } + this.isReleased = true + this.locker.Unlock() +} diff --git a/internal/caches/manager.go b/internal/caches/manager.go index 20baa48..4185564 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -123,9 +123,9 @@ func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface { // 根据策略获取存储对象 func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface { switch policy.Type { - case serverconfigs.CachePolicyTypeFile: + case serverconfigs.CachePolicyStorageFile: return NewFileStorage(policy) - case serverconfigs.CachePolicyTypeMemory: + case serverconfigs.CachePolicyStorageMemory: return nil // TODO 暂时返回nil } return nil diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index f407153..b08533b 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -34,7 +34,7 @@ var ( type FileStorage struct { policy *serverconfigs.HTTPCachePolicy - cacheConfig *serverconfigs.HTTPFileCacheConfig + cacheConfig *serverconfigs.HTTPFileCacheStorage list *List locker sync.RWMutex @@ -76,7 +76,7 @@ func (this *FileStorage) Init() error { }() // 配置 - cacheConfig := &serverconfigs.HTTPFileCacheConfig{} + cacheConfig := &serverconfigs.HTTPFileCacheStorage{} optionsJSON, err := json.Marshal(this.policy.Options) if err != nil { return err @@ -119,7 +119,7 @@ func (this *FileStorage) Init() error { return nil } -func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data []byte, expiredAt int64)) error { +func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error { hash, path := this.keyPath(key) if !this.list.Exist(hash) { return ErrNotFound @@ -185,6 +185,7 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [ } startOffset := SizeExpiredAt + SizeKeyLength + keyLength + SizeNL size := int(offset) + SizeEnd - startOffset + valueSize := offset - int64(startOffset) _, err = fp.Seek(int64(startOffset), io.SeekStart) if err != nil { @@ -199,10 +200,10 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [ if n <= SizeEnd-size { // 已经到了末尾 break } else { - callback(readerBuf[:n-(SizeEnd-size)], expiredAt) + callback(readerBuf[:n-(SizeEnd-size)], valueSize, expiredAt, true) } } else { - callback(readerBuf[:n], expiredAt) + callback(readerBuf[:n], valueSize, expiredAt, false) } } if err != nil { @@ -218,7 +219,7 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [ } // 打开缓存文件等待写入 -func (this *FileStorage) Open(key string, expiredAt int64) (*Writer, error) { +func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { hash := stringutil.Md5(key) dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] _, err := os.Stat(dir) @@ -277,7 +278,7 @@ func (this *FileStorage) Open(key string, expiredAt int64) (*Writer, error) { isOk = true - return NewWriter(writer, key, expiredAt, &this.locker), nil + return NewFileWriter(writer, key, expiredAt, &this.locker), nil } // 写入缓存数据 @@ -289,6 +290,7 @@ func (this *FileStorage) Write(key string, expiredAt int64, valueReader io.Reade hash := stringutil.Md5(key) dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + _, err := os.Stat(dir) if err != nil { if !os.IsNotExist(err) { diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index 02b48b7..a7d5ed9 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -1,6 +1,8 @@ package caches -import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" +) // 缓存存储接口 type StorageInterface interface { @@ -8,10 +10,10 @@ type StorageInterface interface { Init() error // 读取缓存 - Read(key string, readerBuf []byte, callback func(data []byte, expiredAt int64)) error + Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error // 打开缓存写入器等待写入 - Open(key string, expiredAt int64) (*Writer, error) + Open(key string, expiredAt int64) (Writer, error) // 删除某个键值对应的缓存 Delete(key string) error @@ -30,4 +32,7 @@ type StorageInterface interface { // 获取当前存储的Policy Policy() *serverconfigs.HTTPCachePolicy + + // 将缓存添加到列表 + AddToList(item *Item) } diff --git a/internal/caches/writer.go b/internal/caches/writer.go index bbc7ae5..e402d5c 100644 --- a/internal/caches/writer.go +++ b/internal/caches/writer.go @@ -1,71 +1,19 @@ package caches -import ( - "os" - "sync" -) +// 缓存内容写入接口 +type Writer interface { + // 写入数据 + Write(data []byte) (n int, err error) -type Writer struct { - rawWriter *os.File - key string - size int64 - expiredAt int64 - locker *sync.RWMutex - isReleased bool -} - -func NewWriter(rawWriter *os.File, key string, expiredAt int64, locker *sync.RWMutex) *Writer { - return &Writer{ - key: key, - rawWriter: rawWriter, - expiredAt: expiredAt, - locker: locker, - } -} - -// 写入数据 -func (this *Writer) Write(data []byte) error { - n, err := this.rawWriter.Write(data) - this.size += int64(n) - if err != nil { - _ = this.rawWriter.Close() - _ = os.Remove(this.rawWriter.Name()) - this.Release() - } - - return err -} - -// 关闭 -func (this *Writer) Close() error { - // 写入结束符 - _, err := this.rawWriter.WriteString("\n$$$") - if err != nil { - _ = os.Remove(this.rawWriter.Name()) - } - - this.Release() - - return err -} - -func (this *Writer) Size() int64 { - return this.size -} - -func (this *Writer) ExpiredAt() int64 { - return this.expiredAt -} - -func (this *Writer) Key() string { - return this.key -} - -// 释放锁,一定要调用 -func (this *Writer) Release() { - if this.isReleased { - return - } - this.isReleased = true - this.locker.Unlock() + // 关闭 + Close() error + + // 丢弃 + Discard() error + + // Key + Key() string + + // 过期时间 + ExpiredAt() int64 } diff --git a/internal/caches/writer_gzip.go b/internal/caches/writer_gzip.go new file mode 100644 index 0000000..11754e1 --- /dev/null +++ b/internal/caches/writer_gzip.go @@ -0,0 +1,47 @@ +package caches + +import "compress/gzip" + +type gzipWriter struct { + rawWriter Writer + writer *gzip.Writer + key string + expiredAt int64 +} + +func NewGzipWriter(gw Writer, key string, expiredAt int64) Writer { + return &gzipWriter{ + rawWriter: gw, + writer: gzip.NewWriter(gw), + key: key, + expiredAt: expiredAt, + } +} + +func (this *gzipWriter) Write(data []byte) (n int, err error) { + return this.writer.Write(data) +} + +func (this *gzipWriter) Close() error { + err := this.writer.Close() + if err != nil { + return err + } + return this.rawWriter.Close() +} + +func (this *gzipWriter) Discard() error { + err := this.writer.Close() + if err != nil { + return err + } + return this.rawWriter.Discard() +} + +func (this *gzipWriter) Key() string { + return this.key +} + +func (this *gzipWriter) ExpiredAt() int64 { + return this.expiredAt +} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index fb5667c..4c9abbe 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -123,23 +123,26 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { }() } - writer, err := storage.Open(msg.Key, time.Now().Unix()+msg.LifeSeconds) + expiredAt := time.Now().Unix() + msg.LifeSeconds + writer, err := storage.Open(msg.Key, expiredAt) if err != nil { this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) return err } - defer func() { - // 不用担心重复 - _ = writer.Close() - }() - - err = writer.Write(msg.Value) + _, err = writer.Write(msg.Value) if err != nil { + _ = writer.Discard() this.replyFail(message.RequestId, "write failed: "+err.Error()) return err } - _ = writer.Close() + err = writer.Close() + if err == nil { + storage.AddToList(&caches.Item{ + Key: msg.Key, + ExpiredAt: expiredAt, + }) + } this.replyOk(message.RequestId, "write ok") @@ -167,7 +170,7 @@ func (this *APIStream) handleReadCache(message *pb.NodeStreamMessage) error { buf := make([]byte, 1024) size := 0 - err = storage.Read(msg.Key, buf, func(data []byte, expiredAt int64) { + err = storage.Read(msg.Key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) { size += len(data) }) if err != nil { @@ -324,7 +327,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { locker.Unlock() return } - // TODO 可以自定义Header + // TODO 可以在管理界面自定义Header req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") req.Header.Set("Accept-Encoding", "gzip, deflate, br") // TODO 这里需要记录下缓存是否为gzip的 resp, err := client.Do(req) @@ -339,7 +342,8 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { _ = resp.Body.Close() }() - writer, err := storage.Open(key, time.Now().Unix()+8600) // TODO 可以设置缓存过期事件 + expiredAt := time.Now().Unix() + 8600 + writer, err := storage.Open(key, expiredAt) // TODO 可以设置缓存过期事件 if err != nil { locker.Lock() errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error()) @@ -352,7 +356,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { for { n, err := resp.Body.Read(buf) if n > 0 { - writerErr := writer.Write(buf[:n]) + _, writerErr := writer.Write(buf[:n]) if writerErr != nil { locker.Lock() errorMessages = append(errorMessages, "write failed: "+key+": "+writerErr.Error()) @@ -362,7 +366,13 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { } if err != nil { if err == io.EOF { - _ = writer.Close() + err = writer.Close() + if err == nil { + storage.AddToList(&caches.Item{ + Key: key, + ExpiredAt: expiredAt, + }) + } isClosed = true } else { locker.Lock() diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 5f12bdf..5dc4c3c 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -56,17 +56,23 @@ type HTTPRequest struct { rewriteRule *serverconfigs.HTTPRewriteRule // 匹配到的重写规则 rewriteReplace string // 重写规则的目标 rewriteIsExternalURL bool // 重写目标是否为外部URL - cachePolicy *serverconfigs.HTTPCachePolicy // 缓存策略 - cacheCond *serverconfigs.HTTPCacheCond // 缓存条件 + cacheRef *serverconfigs.HTTPCacheRef // 缓存设置 + cacheKey string // 缓存使用的Key } // 初始化 func (this *HTTPRequest) init() { this.writer = NewHTTPWriter(this, this.RawWriter) - this.web = &serverconfigs.HTTPWebConfig{} + this.web = &serverconfigs.HTTPWebConfig{IsOn: true} this.uri = this.RawReq.URL.RequestURI() this.rawURI = this.uri - this.varMapping = map[string]string{} + this.varMapping = map[string]string{ + // 缓存相关初始化 + "cache.status": "BYPASS", + "cache.policy.name": "", + "cache.policy.id": "0", + "cache.policy.type": "", + } this.requestFromTime = time.Now() } @@ -103,18 +109,15 @@ func (this *HTTPRequest) Do() { } // Gzip - shouldCloseWriter := false - if this.web.Gzip != nil && this.web.Gzip.IsOn && this.web.Gzip.Level > 0 { - shouldCloseWriter = true + if this.web.GzipRef != nil && this.web.GzipRef.IsOn && this.web.Gzip != nil && this.web.Gzip.IsOn && this.web.Gzip.Level > 0 { this.writer.Gzip(this.web.Gzip) } // 开始调用 this.doBegin() - if shouldCloseWriter { - this.writer.Close() - } + // 关闭写入 + this.writer.Close() } // 开始调用 @@ -125,6 +128,13 @@ func (this *HTTPRequest) doBegin() { return } + // 缓存 + if this.web.Cache != nil && this.web.Cache.IsOn { + if this.doCacheRead() { + return + } + } + // 重写规则 if this.rewriteRule != nil { if this.doRewrite() { @@ -132,9 +142,6 @@ func (this *HTTPRequest) doBegin() { } } - // 缓存 - // TODO - // root if this.web.Root != nil && this.web.Root.IsOn { // 如果处理成功,则终止请求的处理 @@ -237,9 +244,15 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo // gzip if web.GzipRef != nil && (web.GzipRef.IsPrior || isTop) { + this.web.GzipRef = web.GzipRef this.web.Gzip = web.Gzip } + // cache + if web.Cache != nil && (web.Cache.IsPrior || isTop) { + this.web.Cache = web.Cache + } + // 重写规则 if len(web.RewriteRefs) > 0 { for index, ref := range web.RewriteRefs { diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go new file mode 100644 index 0000000..9b14f58 --- /dev/null +++ b/internal/nodes/http_request_cache.go @@ -0,0 +1,153 @@ +package nodes + +import ( + "bytes" + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/types" + "net/http" + "strconv" +) + +// 读取缓存 +func (this *HTTPRequest) doCacheRead() (shouldStop bool) { + if this.web.Cache == nil || !this.web.Cache.IsOn || len(this.web.Cache.CacheRefs) == 0 { + return + } + + // 检查条件 + for _, cacheRef := range this.web.Cache.CacheRefs { + if !cacheRef.IsOn || + cacheRef.CachePolicyId == 0 || + cacheRef.CachePolicy == nil || + !cacheRef.CachePolicy.IsOn || + cacheRef.Conds == nil || + !cacheRef.Conds.HasRequestConds() { + continue + } + if cacheRef.Conds.MatchRequest(this.Format) { + this.cacheRef = cacheRef + break + } + } + if this.cacheRef == nil { + return + } + + // 相关变量 + this.varMapping["cache.policy.name"] = this.cacheRef.CachePolicy.Name + this.varMapping["cache.policy.id"] = strconv.FormatInt(this.cacheRef.CachePolicy.Id, 10) + this.varMapping["cache.policy.type"] = this.cacheRef.CachePolicy.Type + + // Cache-Pragma + if this.cacheRef.EnableRequestCachePragma { + if this.RawReq.Header.Get("Cache-Control") == "no-cache" || this.RawReq.Header.Get("Pragma") == "no-cache" { + this.cacheRef = nil + return + } + } + + // TODO 支持Vary Header + + // 检查是否有缓存 + key := this.Format(this.cacheRef.Key) + if len(key) == 0 { + this.cacheRef = nil + return + } + this.cacheKey = key + + // 读取缓存 + storage := caches.SharedManager.FindStorageWithPolicy(this.cacheRef.CachePolicyId) + if storage == nil { + this.cacheRef = nil + return + } + + buf := bytePool32k.Get() + defer func() { + bytePool32k.Put(buf) + }() + + isBroken := false + headerBuf := []byte{} + statusCode := http.StatusOK + statusFound := false + headerFound := false + + err := storage.Read(key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) { + if isBroken { + return + } + + // 如果Header已发送完毕 + if headerFound { + _, _ = this.writer.Write(data) + return + } + + headerBuf = append(headerBuf, data...) + + if !statusFound { + lineIndex := bytes.IndexByte(headerBuf, '\n') + if lineIndex < 0 { + return + } + + pieces := bytes.Split(headerBuf[:lineIndex], []byte{' '}) + if len(pieces) < 2 { + isBroken = true + return + } + statusCode = types.Int(string(pieces[1])) + statusFound = true + headerBuf = headerBuf[lineIndex+1:] + + // cache相关变量 + this.varMapping["cache.status"] = "HIT" + } + + for { + lineIndex := bytes.IndexByte(headerBuf, '\n') + if lineIndex < 0 { + break + } + if lineIndex == 0 || lineIndex == 1 { + headerFound = true + + this.processResponseHeaders(statusCode) + this.writer.WriteHeader(statusCode) + + _, _ = this.writer.Write(headerBuf[lineIndex+1:]) + headerBuf = nil + break + } + + // 分解Header + line := headerBuf[:lineIndex] + colonIndex := bytes.IndexByte(line, ':') + if colonIndex <= 0 { + continue + } + this.writer.Header().Set(string(line[:colonIndex]), string(bytes.TrimSpace(line[colonIndex+1:]))) + headerBuf = headerBuf[lineIndex+1:] + } + }) + + if err != nil { + if err == caches.ErrNotFound { + // cache相关变量 + this.varMapping["cache.status"] = "MISS" + return + } + + logs.Println("read from cache failed: " + err.Error()) + return + } + + if isBroken { + return + } + + return true +} diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 4d1b6ed..c4434e5 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -5,6 +5,9 @@ import ( "bytes" "compress/gzip" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/logs" "net" "net/http" @@ -26,6 +29,9 @@ type HTTPWriter struct { body []byte gzipBodyBuffer *bytes.Buffer // 当使用gzip压缩时使用 gzipBodyWriter *gzip.Writer // 当使用gzip压缩时使用 + + cacheWriter caches.Writer // 缓存写入 + cacheStorage caches.StorageInterface } // 包装对象 @@ -59,62 +65,8 @@ func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) { // 准备输出 func (this *HTTPWriter) Prepare(size int64) { - if this.gzipConfig == nil || this.gzipConfig.Level <= 0 { - return - } - - // 判断Accept是否支持gzip - if !strings.Contains(this.req.requestHeader("Accept-Encoding"), "gzip") { - return - } - - // 尺寸和类型 - if size < this.gzipConfig.MinBytes() || (this.gzipConfig.MaxBytes() > 0 && size > this.gzipConfig.MaxBytes()) { - return - } - - // 校验其他条件 - if this.gzipConfig.Conds != nil { - if len(this.gzipConfig.Conds.Groups) > 0 { - if !this.gzipConfig.Conds.MatchRequest(this.req.Format) || !this.gzipConfig.Conds.MatchResponse(this.req.Format) { - return - } - } else { - // 默认校验文档类型 - contentType := this.writer.Header().Get("Content-Type") - if len(contentType) > 0 && (!strings.HasPrefix(contentType, "text/") && !strings.HasPrefix(contentType, "application/")) { - return - } - } - } - - // 如果已经有编码则不处理 - if len(this.writer.Header().Get("Content-Encoding")) > 0 { - return - } - - // gzip writer - var err error = nil - this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level)) - if err != nil { - logs.Error(err) - return - } - - // body copy - if this.bodyCopying { - this.gzipBodyBuffer = bytes.NewBuffer([]byte{}) - this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level)) - if err != nil { - logs.Error(err) - } - } - - header := this.writer.Header() - header.Set("Content-Encoding", "gzip") - header.Set("Transfer-Encoding", "chunked") - header.Set("Vary", "Accept-Encoding") - header.Del("Content-Length") + this.prepareGzip(size) + this.prepareCache(size) } // 包装前的原始的Writer @@ -156,6 +108,15 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { if n > 0 { this.sentBodyBytes += int64(n) } + + // 写入缓存 + if this.cacheWriter != nil { + _, err = this.cacheWriter.Write(data) + if err != nil { + _ = this.cacheWriter.Discard() + logs.Println("write cache failed: " + err.Error()) + } + } } else { if n == 0 { n = len(data) // 防止出现short write错误 @@ -239,6 +200,7 @@ func (this *HTTPWriter) HeaderData() []byte { // 关闭 func (this *HTTPWriter) Close() { + // gzip writer if this.gzipWriter != nil { if this.bodyCopying && this.gzipBodyWriter != nil { _ = this.gzipBodyWriter.Close() @@ -247,6 +209,17 @@ func (this *HTTPWriter) Close() { _ = this.gzipWriter.Close() this.gzipWriter = nil } + + // cache writer + if this.cacheWriter != nil { + err := this.cacheWriter.Close() + if err == nil { + this.cacheStorage.AddToList(&caches.Item{ + Key: this.cacheWriter.Key(), + ExpiredAt: this.cacheWriter.ExpiredAt(), + }) + } + } } // Hijack @@ -265,3 +238,138 @@ func (this *HTTPWriter) Flush() { flusher.Flush() } } + +// 准备Gzip +func (this *HTTPWriter) prepareGzip(size int64) { + if this.gzipConfig == nil || this.gzipConfig.Level <= 0 { + return + } + + // 判断Accept是否支持gzip + if !strings.Contains(this.req.requestHeader("Accept-Encoding"), "gzip") { + return + } + + // 尺寸和类型 + if size < this.gzipConfig.MinBytes() || (this.gzipConfig.MaxBytes() > 0 && size > this.gzipConfig.MaxBytes()) { + return + } + + // 校验其他条件 + if this.gzipConfig.Conds != nil { + if len(this.gzipConfig.Conds.Groups) > 0 { + if !this.gzipConfig.Conds.MatchRequest(this.req.Format) || !this.gzipConfig.Conds.MatchResponse(this.req.Format) { + return + } + } else { + // 默认校验文档类型 + contentType := this.writer.Header().Get("Content-Type") + if len(contentType) > 0 && (!strings.HasPrefix(contentType, "text/") && !strings.HasPrefix(contentType, "application/")) { + return + } + } + } + + // 如果已经有编码则不处理 + if len(this.writer.Header().Get("Content-Encoding")) > 0 { + return + } + + // gzip writer + var err error = nil + this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level)) + if err != nil { + logs.Error(err) + return + } + + // body copy + if this.bodyCopying { + this.gzipBodyBuffer = bytes.NewBuffer([]byte{}) + this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level)) + if err != nil { + logs.Error(err) + } + } + + header := this.writer.Header() + header.Set("Content-Encoding", "gzip") + header.Set("Transfer-Encoding", "chunked") + header.Set("Vary", "Accept-Encoding") + header.Del("Content-Length") +} + +// 准备缓存 +func (this *HTTPWriter) prepareCache(size int64) { + if this.writer == nil || size <= 0 { + return + } + + cacheRef := this.req.cacheRef + if cacheRef == nil || + cacheRef.CachePolicy == nil || + !cacheRef.IsOn || + (cacheRef.MaxSizeBytes() > 0 && size > cacheRef.MaxSizeBytes()) || + (cacheRef.CachePolicy.MaxSizeBytes() > 0 && size > cacheRef.CachePolicy.MaxSizeBytes()) { + return + } + + // 检查状态 + if len(cacheRef.Status) > 0 && !lists.ContainsInt(cacheRef.Status, this.StatusCode()) { + return + } + + // Cache-Control + if len(cacheRef.SkipResponseCacheControlValues) > 0 { + cacheControl := this.writer.Header().Get("Cache-Control") + if len(cacheControl) > 0 { + values := strings.Split(cacheControl, ",") + for _, value := range values { + if cacheRef.ContainsCacheControl(strings.TrimSpace(value)) { + return + } + } + } + } + + // Set-Cookie + if cacheRef.SkipResponseSetCookie && len(this.writer.Header().Get("Set-Cookie")) > 0 { + return + } + + // 校验其他条件 + if cacheRef.Conds != nil && cacheRef.Conds.HasResponseConds() && !cacheRef.Conds.MatchResponse(this.req.Format) { + return + } + + // 打开缓存写入 + storage := caches.SharedManager.FindStorageWithPolicy(this.req.cacheRef.CachePolicyId) + if storage == nil { + return + } + this.cacheStorage = storage + life := cacheRef.LifeSeconds() + if life <= 60 { // 最小不能少于1分钟 + life = 60 + } + expiredAt := utils.UnixTime() + life + cacheWriter, err := storage.Open(this.req.cacheKey, expiredAt) + if err != nil { + logs.Println("write cache failed: " + err.Error()) + return + } + this.cacheWriter = cacheWriter + if this.gzipWriter != nil { + this.cacheWriter = caches.NewGzipWriter(this.cacheWriter, this.req.cacheKey, expiredAt) + } + + // 写入Header + headerData := this.HeaderData() + _, err = cacheWriter.Write(headerData) + if err != nil { + logs.Println("write cache failed: " + err.Error()) + _ = this.cacheWriter.Discard() + this.cacheWriter = nil + return + } +} diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 1fdb678..990f761 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/logs" @@ -102,6 +103,7 @@ func (this *Node) syncConfig(isFirstTime bool) error { // 刷新配置 logs.Println("[NODE]reload config ...") nodeconfigs.ResetNodeConfig(nodeConfig) + caches.SharedManager.UpdatePolicies(nodeConfig.AllCachePolicies()) sharedNodeConfig = nodeConfig if !isFirstTime {