mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 06:40:25 +08:00
实现缓存策略的部分功能
This commit is contained in:
1
build/www/.gitignore
vendored
Normal file
1
build/www/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
cache
|
||||
77
internal/caches/file_writer.go
Normal file
77
internal/caches/file_writer.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package caches
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type FileWriter struct {
|
||||
rawWriter *os.File
|
||||
key string
|
||||
size int64
|
||||
expiredAt int64
|
||||
locker *sync.RWMutex
|
||||
isReleased bool
|
||||
}
|
||||
|
||||
func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, locker *sync.RWMutex) *FileWriter {
|
||||
return &FileWriter{
|
||||
key: key,
|
||||
rawWriter: rawWriter,
|
||||
expiredAt: expiredAt,
|
||||
locker: locker,
|
||||
}
|
||||
}
|
||||
|
||||
// 写入数据
|
||||
func (this *FileWriter) Write(data []byte) (n int, err error) {
|
||||
n, err = this.rawWriter.Write(data)
|
||||
this.size += int64(n)
|
||||
if err != nil {
|
||||
_ = this.rawWriter.Close()
|
||||
_ = os.Remove(this.rawWriter.Name())
|
||||
this.Release()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// 关闭
|
||||
func (this *FileWriter) Close() error {
|
||||
// 写入结束符
|
||||
_, err := this.rawWriter.WriteString("\n$$$")
|
||||
if err != nil {
|
||||
_ = os.Remove(this.rawWriter.Name())
|
||||
}
|
||||
|
||||
this.Release()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// 丢弃
|
||||
func (this *FileWriter) Discard() error {
|
||||
err := os.Remove(this.rawWriter.Name())
|
||||
this.Release()
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *FileWriter) Size() int64 {
|
||||
return this.size
|
||||
}
|
||||
|
||||
func (this *FileWriter) ExpiredAt() int64 {
|
||||
return this.expiredAt
|
||||
}
|
||||
|
||||
func (this *FileWriter) Key() string {
|
||||
return this.key
|
||||
}
|
||||
|
||||
// 释放锁,一定要调用
|
||||
func (this *FileWriter) Release() {
|
||||
if this.isReleased {
|
||||
return
|
||||
}
|
||||
this.isReleased = true
|
||||
this.locker.Unlock()
|
||||
}
|
||||
@@ -123,9 +123,9 @@ func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
|
||||
// 根据策略获取存储对象
|
||||
func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface {
|
||||
switch policy.Type {
|
||||
case serverconfigs.CachePolicyTypeFile:
|
||||
case serverconfigs.CachePolicyStorageFile:
|
||||
return NewFileStorage(policy)
|
||||
case serverconfigs.CachePolicyTypeMemory:
|
||||
case serverconfigs.CachePolicyStorageMemory:
|
||||
return nil // TODO 暂时返回nil
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -34,7 +34,7 @@ var (
|
||||
|
||||
type FileStorage struct {
|
||||
policy *serverconfigs.HTTPCachePolicy
|
||||
cacheConfig *serverconfigs.HTTPFileCacheConfig
|
||||
cacheConfig *serverconfigs.HTTPFileCacheStorage
|
||||
|
||||
list *List
|
||||
locker sync.RWMutex
|
||||
@@ -76,7 +76,7 @@ func (this *FileStorage) Init() error {
|
||||
}()
|
||||
|
||||
// 配置
|
||||
cacheConfig := &serverconfigs.HTTPFileCacheConfig{}
|
||||
cacheConfig := &serverconfigs.HTTPFileCacheStorage{}
|
||||
optionsJSON, err := json.Marshal(this.policy.Options)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -119,7 +119,7 @@ func (this *FileStorage) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data []byte, expiredAt int64)) error {
|
||||
func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error {
|
||||
hash, path := this.keyPath(key)
|
||||
if !this.list.Exist(hash) {
|
||||
return ErrNotFound
|
||||
@@ -185,6 +185,7 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [
|
||||
}
|
||||
startOffset := SizeExpiredAt + SizeKeyLength + keyLength + SizeNL
|
||||
size := int(offset) + SizeEnd - startOffset
|
||||
valueSize := offset - int64(startOffset)
|
||||
|
||||
_, err = fp.Seek(int64(startOffset), io.SeekStart)
|
||||
if err != nil {
|
||||
@@ -199,10 +200,10 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [
|
||||
if n <= SizeEnd-size { // 已经到了末尾
|
||||
break
|
||||
} else {
|
||||
callback(readerBuf[:n-(SizeEnd-size)], expiredAt)
|
||||
callback(readerBuf[:n-(SizeEnd-size)], valueSize, expiredAt, true)
|
||||
}
|
||||
} else {
|
||||
callback(readerBuf[:n], expiredAt)
|
||||
callback(readerBuf[:n], valueSize, expiredAt, false)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@@ -218,7 +219,7 @@ func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data [
|
||||
}
|
||||
|
||||
// 打开缓存文件等待写入
|
||||
func (this *FileStorage) Open(key string, expiredAt int64) (*Writer, error) {
|
||||
func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) {
|
||||
hash := stringutil.Md5(key)
|
||||
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
||||
_, err := os.Stat(dir)
|
||||
@@ -277,7 +278,7 @@ func (this *FileStorage) Open(key string, expiredAt int64) (*Writer, error) {
|
||||
|
||||
isOk = true
|
||||
|
||||
return NewWriter(writer, key, expiredAt, &this.locker), nil
|
||||
return NewFileWriter(writer, key, expiredAt, &this.locker), nil
|
||||
}
|
||||
|
||||
// 写入缓存数据
|
||||
@@ -289,6 +290,7 @@ func (this *FileStorage) Write(key string, expiredAt int64, valueReader io.Reade
|
||||
|
||||
hash := stringutil.Md5(key)
|
||||
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
||||
|
||||
_, err := os.Stat(dir)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package caches
|
||||
|
||||
import "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
)
|
||||
|
||||
// 缓存存储接口
|
||||
type StorageInterface interface {
|
||||
@@ -8,10 +10,10 @@ type StorageInterface interface {
|
||||
Init() error
|
||||
|
||||
// 读取缓存
|
||||
Read(key string, readerBuf []byte, callback func(data []byte, expiredAt int64)) error
|
||||
Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error
|
||||
|
||||
// 打开缓存写入器等待写入
|
||||
Open(key string, expiredAt int64) (*Writer, error)
|
||||
Open(key string, expiredAt int64) (Writer, error)
|
||||
|
||||
// 删除某个键值对应的缓存
|
||||
Delete(key string) error
|
||||
@@ -30,4 +32,7 @@ type StorageInterface interface {
|
||||
|
||||
// 获取当前存储的Policy
|
||||
Policy() *serverconfigs.HTTPCachePolicy
|
||||
|
||||
// 将缓存添加到列表
|
||||
AddToList(item *Item)
|
||||
}
|
||||
|
||||
@@ -1,71 +1,19 @@
|
||||
package caches
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
// 缓存内容写入接口
|
||||
type Writer interface {
|
||||
// 写入数据
|
||||
Write(data []byte) (n int, err error)
|
||||
|
||||
type Writer struct {
|
||||
rawWriter *os.File
|
||||
key string
|
||||
size int64
|
||||
expiredAt int64
|
||||
locker *sync.RWMutex
|
||||
isReleased bool
|
||||
}
|
||||
|
||||
func NewWriter(rawWriter *os.File, key string, expiredAt int64, locker *sync.RWMutex) *Writer {
|
||||
return &Writer{
|
||||
key: key,
|
||||
rawWriter: rawWriter,
|
||||
expiredAt: expiredAt,
|
||||
locker: locker,
|
||||
}
|
||||
}
|
||||
|
||||
// 写入数据
|
||||
func (this *Writer) Write(data []byte) error {
|
||||
n, err := this.rawWriter.Write(data)
|
||||
this.size += int64(n)
|
||||
if err != nil {
|
||||
_ = this.rawWriter.Close()
|
||||
_ = os.Remove(this.rawWriter.Name())
|
||||
this.Release()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// 关闭
|
||||
func (this *Writer) Close() error {
|
||||
// 写入结束符
|
||||
_, err := this.rawWriter.WriteString("\n$$$")
|
||||
if err != nil {
|
||||
_ = os.Remove(this.rawWriter.Name())
|
||||
}
|
||||
|
||||
this.Release()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *Writer) Size() int64 {
|
||||
return this.size
|
||||
}
|
||||
|
||||
func (this *Writer) ExpiredAt() int64 {
|
||||
return this.expiredAt
|
||||
}
|
||||
|
||||
func (this *Writer) Key() string {
|
||||
return this.key
|
||||
}
|
||||
|
||||
// 释放锁,一定要调用
|
||||
func (this *Writer) Release() {
|
||||
if this.isReleased {
|
||||
return
|
||||
}
|
||||
this.isReleased = true
|
||||
this.locker.Unlock()
|
||||
// 关闭
|
||||
Close() error
|
||||
|
||||
// 丢弃
|
||||
Discard() error
|
||||
|
||||
// Key
|
||||
Key() string
|
||||
|
||||
// 过期时间
|
||||
ExpiredAt() int64
|
||||
}
|
||||
|
||||
47
internal/caches/writer_gzip.go
Normal file
47
internal/caches/writer_gzip.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package caches
|
||||
|
||||
import "compress/gzip"
|
||||
|
||||
type gzipWriter struct {
|
||||
rawWriter Writer
|
||||
writer *gzip.Writer
|
||||
key string
|
||||
expiredAt int64
|
||||
}
|
||||
|
||||
func NewGzipWriter(gw Writer, key string, expiredAt int64) Writer {
|
||||
return &gzipWriter{
|
||||
rawWriter: gw,
|
||||
writer: gzip.NewWriter(gw),
|
||||
key: key,
|
||||
expiredAt: expiredAt,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *gzipWriter) Write(data []byte) (n int, err error) {
|
||||
return this.writer.Write(data)
|
||||
}
|
||||
|
||||
func (this *gzipWriter) Close() error {
|
||||
err := this.writer.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return this.rawWriter.Close()
|
||||
}
|
||||
|
||||
func (this *gzipWriter) Discard() error {
|
||||
err := this.writer.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return this.rawWriter.Discard()
|
||||
}
|
||||
|
||||
func (this *gzipWriter) Key() string {
|
||||
return this.key
|
||||
}
|
||||
|
||||
func (this *gzipWriter) ExpiredAt() int64 {
|
||||
return this.expiredAt
|
||||
}
|
||||
@@ -123,23 +123,26 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error {
|
||||
}()
|
||||
}
|
||||
|
||||
writer, err := storage.Open(msg.Key, time.Now().Unix()+msg.LifeSeconds)
|
||||
expiredAt := time.Now().Unix() + msg.LifeSeconds
|
||||
writer, err := storage.Open(msg.Key, expiredAt)
|
||||
if err != nil {
|
||||
this.replyFail(message.RequestId, "prepare writing failed: "+err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// 不用担心重复
|
||||
_ = writer.Close()
|
||||
}()
|
||||
|
||||
err = writer.Write(msg.Value)
|
||||
_, err = writer.Write(msg.Value)
|
||||
if err != nil {
|
||||
_ = writer.Discard()
|
||||
this.replyFail(message.RequestId, "write failed: "+err.Error())
|
||||
return err
|
||||
}
|
||||
_ = writer.Close()
|
||||
err = writer.Close()
|
||||
if err == nil {
|
||||
storage.AddToList(&caches.Item{
|
||||
Key: msg.Key,
|
||||
ExpiredAt: expiredAt,
|
||||
})
|
||||
}
|
||||
|
||||
this.replyOk(message.RequestId, "write ok")
|
||||
|
||||
@@ -167,7 +170,7 @@ func (this *APIStream) handleReadCache(message *pb.NodeStreamMessage) error {
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
size := 0
|
||||
err = storage.Read(msg.Key, buf, func(data []byte, expiredAt int64) {
|
||||
err = storage.Read(msg.Key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) {
|
||||
size += len(data)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -324,7 +327,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
|
||||
locker.Unlock()
|
||||
return
|
||||
}
|
||||
// TODO 可以自定义Header
|
||||
// TODO 可以在管理界面自定义Header
|
||||
req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36")
|
||||
req.Header.Set("Accept-Encoding", "gzip, deflate, br") // TODO 这里需要记录下缓存是否为gzip的
|
||||
resp, err := client.Do(req)
|
||||
@@ -339,7 +342,8 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
writer, err := storage.Open(key, time.Now().Unix()+8600) // TODO 可以设置缓存过期事件
|
||||
expiredAt := time.Now().Unix() + 8600
|
||||
writer, err := storage.Open(key, expiredAt) // TODO 可以设置缓存过期事件
|
||||
if err != nil {
|
||||
locker.Lock()
|
||||
errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error())
|
||||
@@ -352,7 +356,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
|
||||
for {
|
||||
n, err := resp.Body.Read(buf)
|
||||
if n > 0 {
|
||||
writerErr := writer.Write(buf[:n])
|
||||
_, writerErr := writer.Write(buf[:n])
|
||||
if writerErr != nil {
|
||||
locker.Lock()
|
||||
errorMessages = append(errorMessages, "write failed: "+key+": "+writerErr.Error())
|
||||
@@ -362,7 +366,13 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
_ = writer.Close()
|
||||
err = writer.Close()
|
||||
if err == nil {
|
||||
storage.AddToList(&caches.Item{
|
||||
Key: key,
|
||||
ExpiredAt: expiredAt,
|
||||
})
|
||||
}
|
||||
isClosed = true
|
||||
} else {
|
||||
locker.Lock()
|
||||
|
||||
@@ -56,17 +56,23 @@ type HTTPRequest struct {
|
||||
rewriteRule *serverconfigs.HTTPRewriteRule // 匹配到的重写规则
|
||||
rewriteReplace string // 重写规则的目标
|
||||
rewriteIsExternalURL bool // 重写目标是否为外部URL
|
||||
cachePolicy *serverconfigs.HTTPCachePolicy // 缓存策略
|
||||
cacheCond *serverconfigs.HTTPCacheCond // 缓存条件
|
||||
cacheRef *serverconfigs.HTTPCacheRef // 缓存设置
|
||||
cacheKey string // 缓存使用的Key
|
||||
}
|
||||
|
||||
// 初始化
|
||||
func (this *HTTPRequest) init() {
|
||||
this.writer = NewHTTPWriter(this, this.RawWriter)
|
||||
this.web = &serverconfigs.HTTPWebConfig{}
|
||||
this.web = &serverconfigs.HTTPWebConfig{IsOn: true}
|
||||
this.uri = this.RawReq.URL.RequestURI()
|
||||
this.rawURI = this.uri
|
||||
this.varMapping = map[string]string{}
|
||||
this.varMapping = map[string]string{
|
||||
// 缓存相关初始化
|
||||
"cache.status": "BYPASS",
|
||||
"cache.policy.name": "",
|
||||
"cache.policy.id": "0",
|
||||
"cache.policy.type": "",
|
||||
}
|
||||
this.requestFromTime = time.Now()
|
||||
}
|
||||
|
||||
@@ -103,18 +109,15 @@ func (this *HTTPRequest) Do() {
|
||||
}
|
||||
|
||||
// Gzip
|
||||
shouldCloseWriter := false
|
||||
if this.web.Gzip != nil && this.web.Gzip.IsOn && this.web.Gzip.Level > 0 {
|
||||
shouldCloseWriter = true
|
||||
if this.web.GzipRef != nil && this.web.GzipRef.IsOn && this.web.Gzip != nil && this.web.Gzip.IsOn && this.web.Gzip.Level > 0 {
|
||||
this.writer.Gzip(this.web.Gzip)
|
||||
}
|
||||
|
||||
// 开始调用
|
||||
this.doBegin()
|
||||
|
||||
if shouldCloseWriter {
|
||||
this.writer.Close()
|
||||
}
|
||||
// 关闭写入
|
||||
this.writer.Close()
|
||||
}
|
||||
|
||||
// 开始调用
|
||||
@@ -125,6 +128,13 @@ func (this *HTTPRequest) doBegin() {
|
||||
return
|
||||
}
|
||||
|
||||
// 缓存
|
||||
if this.web.Cache != nil && this.web.Cache.IsOn {
|
||||
if this.doCacheRead() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 重写规则
|
||||
if this.rewriteRule != nil {
|
||||
if this.doRewrite() {
|
||||
@@ -132,9 +142,6 @@ func (this *HTTPRequest) doBegin() {
|
||||
}
|
||||
}
|
||||
|
||||
// 缓存
|
||||
// TODO
|
||||
|
||||
// root
|
||||
if this.web.Root != nil && this.web.Root.IsOn {
|
||||
// 如果处理成功,则终止请求的处理
|
||||
@@ -237,9 +244,15 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
|
||||
|
||||
// gzip
|
||||
if web.GzipRef != nil && (web.GzipRef.IsPrior || isTop) {
|
||||
this.web.GzipRef = web.GzipRef
|
||||
this.web.Gzip = web.Gzip
|
||||
}
|
||||
|
||||
// cache
|
||||
if web.Cache != nil && (web.Cache.IsPrior || isTop) {
|
||||
this.web.Cache = web.Cache
|
||||
}
|
||||
|
||||
// 重写规则
|
||||
if len(web.RewriteRefs) > 0 {
|
||||
for index, ref := range web.RewriteRefs {
|
||||
|
||||
153
internal/nodes/http_request_cache.go
Normal file
153
internal/nodes/http_request_cache.go
Normal file
@@ -0,0 +1,153 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// 读取缓存
|
||||
func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
if this.web.Cache == nil || !this.web.Cache.IsOn || len(this.web.Cache.CacheRefs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 检查条件
|
||||
for _, cacheRef := range this.web.Cache.CacheRefs {
|
||||
if !cacheRef.IsOn ||
|
||||
cacheRef.CachePolicyId == 0 ||
|
||||
cacheRef.CachePolicy == nil ||
|
||||
!cacheRef.CachePolicy.IsOn ||
|
||||
cacheRef.Conds == nil ||
|
||||
!cacheRef.Conds.HasRequestConds() {
|
||||
continue
|
||||
}
|
||||
if cacheRef.Conds.MatchRequest(this.Format) {
|
||||
this.cacheRef = cacheRef
|
||||
break
|
||||
}
|
||||
}
|
||||
if this.cacheRef == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 相关变量
|
||||
this.varMapping["cache.policy.name"] = this.cacheRef.CachePolicy.Name
|
||||
this.varMapping["cache.policy.id"] = strconv.FormatInt(this.cacheRef.CachePolicy.Id, 10)
|
||||
this.varMapping["cache.policy.type"] = this.cacheRef.CachePolicy.Type
|
||||
|
||||
// Cache-Pragma
|
||||
if this.cacheRef.EnableRequestCachePragma {
|
||||
if this.RawReq.Header.Get("Cache-Control") == "no-cache" || this.RawReq.Header.Get("Pragma") == "no-cache" {
|
||||
this.cacheRef = nil
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// TODO 支持Vary Header
|
||||
|
||||
// 检查是否有缓存
|
||||
key := this.Format(this.cacheRef.Key)
|
||||
if len(key) == 0 {
|
||||
this.cacheRef = nil
|
||||
return
|
||||
}
|
||||
this.cacheKey = key
|
||||
|
||||
// 读取缓存
|
||||
storage := caches.SharedManager.FindStorageWithPolicy(this.cacheRef.CachePolicyId)
|
||||
if storage == nil {
|
||||
this.cacheRef = nil
|
||||
return
|
||||
}
|
||||
|
||||
buf := bytePool32k.Get()
|
||||
defer func() {
|
||||
bytePool32k.Put(buf)
|
||||
}()
|
||||
|
||||
isBroken := false
|
||||
headerBuf := []byte{}
|
||||
statusCode := http.StatusOK
|
||||
statusFound := false
|
||||
headerFound := false
|
||||
|
||||
err := storage.Read(key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) {
|
||||
if isBroken {
|
||||
return
|
||||
}
|
||||
|
||||
// 如果Header已发送完毕
|
||||
if headerFound {
|
||||
_, _ = this.writer.Write(data)
|
||||
return
|
||||
}
|
||||
|
||||
headerBuf = append(headerBuf, data...)
|
||||
|
||||
if !statusFound {
|
||||
lineIndex := bytes.IndexByte(headerBuf, '\n')
|
||||
if lineIndex < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
pieces := bytes.Split(headerBuf[:lineIndex], []byte{' '})
|
||||
if len(pieces) < 2 {
|
||||
isBroken = true
|
||||
return
|
||||
}
|
||||
statusCode = types.Int(string(pieces[1]))
|
||||
statusFound = true
|
||||
headerBuf = headerBuf[lineIndex+1:]
|
||||
|
||||
// cache相关变量
|
||||
this.varMapping["cache.status"] = "HIT"
|
||||
}
|
||||
|
||||
for {
|
||||
lineIndex := bytes.IndexByte(headerBuf, '\n')
|
||||
if lineIndex < 0 {
|
||||
break
|
||||
}
|
||||
if lineIndex == 0 || lineIndex == 1 {
|
||||
headerFound = true
|
||||
|
||||
this.processResponseHeaders(statusCode)
|
||||
this.writer.WriteHeader(statusCode)
|
||||
|
||||
_, _ = this.writer.Write(headerBuf[lineIndex+1:])
|
||||
headerBuf = nil
|
||||
break
|
||||
}
|
||||
|
||||
// 分解Header
|
||||
line := headerBuf[:lineIndex]
|
||||
colonIndex := bytes.IndexByte(line, ':')
|
||||
if colonIndex <= 0 {
|
||||
continue
|
||||
}
|
||||
this.writer.Header().Set(string(line[:colonIndex]), string(bytes.TrimSpace(line[colonIndex+1:])))
|
||||
headerBuf = headerBuf[lineIndex+1:]
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if err == caches.ErrNotFound {
|
||||
// cache相关变量
|
||||
this.varMapping["cache.status"] = "MISS"
|
||||
return
|
||||
}
|
||||
|
||||
logs.Println("read from cache failed: " + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if isBroken {
|
||||
return
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
@@ -5,6 +5,9 @@ import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -26,6 +29,9 @@ type HTTPWriter struct {
|
||||
body []byte
|
||||
gzipBodyBuffer *bytes.Buffer // 当使用gzip压缩时使用
|
||||
gzipBodyWriter *gzip.Writer // 当使用gzip压缩时使用
|
||||
|
||||
cacheWriter caches.Writer // 缓存写入
|
||||
cacheStorage caches.StorageInterface
|
||||
}
|
||||
|
||||
// 包装对象
|
||||
@@ -59,62 +65,8 @@ func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) {
|
||||
|
||||
// 准备输出
|
||||
func (this *HTTPWriter) Prepare(size int64) {
|
||||
if this.gzipConfig == nil || this.gzipConfig.Level <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 判断Accept是否支持gzip
|
||||
if !strings.Contains(this.req.requestHeader("Accept-Encoding"), "gzip") {
|
||||
return
|
||||
}
|
||||
|
||||
// 尺寸和类型
|
||||
if size < this.gzipConfig.MinBytes() || (this.gzipConfig.MaxBytes() > 0 && size > this.gzipConfig.MaxBytes()) {
|
||||
return
|
||||
}
|
||||
|
||||
// 校验其他条件
|
||||
if this.gzipConfig.Conds != nil {
|
||||
if len(this.gzipConfig.Conds.Groups) > 0 {
|
||||
if !this.gzipConfig.Conds.MatchRequest(this.req.Format) || !this.gzipConfig.Conds.MatchResponse(this.req.Format) {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// 默认校验文档类型
|
||||
contentType := this.writer.Header().Get("Content-Type")
|
||||
if len(contentType) > 0 && (!strings.HasPrefix(contentType, "text/") && !strings.HasPrefix(contentType, "application/")) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果已经有编码则不处理
|
||||
if len(this.writer.Header().Get("Content-Encoding")) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// gzip writer
|
||||
var err error = nil
|
||||
this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// body copy
|
||||
if this.bodyCopying {
|
||||
this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
|
||||
this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
header := this.writer.Header()
|
||||
header.Set("Content-Encoding", "gzip")
|
||||
header.Set("Transfer-Encoding", "chunked")
|
||||
header.Set("Vary", "Accept-Encoding")
|
||||
header.Del("Content-Length")
|
||||
this.prepareGzip(size)
|
||||
this.prepareCache(size)
|
||||
}
|
||||
|
||||
// 包装前的原始的Writer
|
||||
@@ -156,6 +108,15 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
|
||||
if n > 0 {
|
||||
this.sentBodyBytes += int64(n)
|
||||
}
|
||||
|
||||
// 写入缓存
|
||||
if this.cacheWriter != nil {
|
||||
_, err = this.cacheWriter.Write(data)
|
||||
if err != nil {
|
||||
_ = this.cacheWriter.Discard()
|
||||
logs.Println("write cache failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if n == 0 {
|
||||
n = len(data) // 防止出现short write错误
|
||||
@@ -239,6 +200,7 @@ func (this *HTTPWriter) HeaderData() []byte {
|
||||
|
||||
// 关闭
|
||||
func (this *HTTPWriter) Close() {
|
||||
// gzip writer
|
||||
if this.gzipWriter != nil {
|
||||
if this.bodyCopying && this.gzipBodyWriter != nil {
|
||||
_ = this.gzipBodyWriter.Close()
|
||||
@@ -247,6 +209,17 @@ func (this *HTTPWriter) Close() {
|
||||
_ = this.gzipWriter.Close()
|
||||
this.gzipWriter = nil
|
||||
}
|
||||
|
||||
// cache writer
|
||||
if this.cacheWriter != nil {
|
||||
err := this.cacheWriter.Close()
|
||||
if err == nil {
|
||||
this.cacheStorage.AddToList(&caches.Item{
|
||||
Key: this.cacheWriter.Key(),
|
||||
ExpiredAt: this.cacheWriter.ExpiredAt(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Hijack
|
||||
@@ -265,3 +238,138 @@ func (this *HTTPWriter) Flush() {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// 准备Gzip
|
||||
func (this *HTTPWriter) prepareGzip(size int64) {
|
||||
if this.gzipConfig == nil || this.gzipConfig.Level <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 判断Accept是否支持gzip
|
||||
if !strings.Contains(this.req.requestHeader("Accept-Encoding"), "gzip") {
|
||||
return
|
||||
}
|
||||
|
||||
// 尺寸和类型
|
||||
if size < this.gzipConfig.MinBytes() || (this.gzipConfig.MaxBytes() > 0 && size > this.gzipConfig.MaxBytes()) {
|
||||
return
|
||||
}
|
||||
|
||||
// 校验其他条件
|
||||
if this.gzipConfig.Conds != nil {
|
||||
if len(this.gzipConfig.Conds.Groups) > 0 {
|
||||
if !this.gzipConfig.Conds.MatchRequest(this.req.Format) || !this.gzipConfig.Conds.MatchResponse(this.req.Format) {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// 默认校验文档类型
|
||||
contentType := this.writer.Header().Get("Content-Type")
|
||||
if len(contentType) > 0 && (!strings.HasPrefix(contentType, "text/") && !strings.HasPrefix(contentType, "application/")) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果已经有编码则不处理
|
||||
if len(this.writer.Header().Get("Content-Encoding")) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// gzip writer
|
||||
var err error = nil
|
||||
this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// body copy
|
||||
if this.bodyCopying {
|
||||
this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
|
||||
this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
header := this.writer.Header()
|
||||
header.Set("Content-Encoding", "gzip")
|
||||
header.Set("Transfer-Encoding", "chunked")
|
||||
header.Set("Vary", "Accept-Encoding")
|
||||
header.Del("Content-Length")
|
||||
}
|
||||
|
||||
// 准备缓存
|
||||
func (this *HTTPWriter) prepareCache(size int64) {
|
||||
if this.writer == nil || size <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
cacheRef := this.req.cacheRef
|
||||
if cacheRef == nil ||
|
||||
cacheRef.CachePolicy == nil ||
|
||||
!cacheRef.IsOn ||
|
||||
(cacheRef.MaxSizeBytes() > 0 && size > cacheRef.MaxSizeBytes()) ||
|
||||
(cacheRef.CachePolicy.MaxSizeBytes() > 0 && size > cacheRef.CachePolicy.MaxSizeBytes()) {
|
||||
return
|
||||
}
|
||||
|
||||
// 检查状态
|
||||
if len(cacheRef.Status) > 0 && !lists.ContainsInt(cacheRef.Status, this.StatusCode()) {
|
||||
return
|
||||
}
|
||||
|
||||
// Cache-Control
|
||||
if len(cacheRef.SkipResponseCacheControlValues) > 0 {
|
||||
cacheControl := this.writer.Header().Get("Cache-Control")
|
||||
if len(cacheControl) > 0 {
|
||||
values := strings.Split(cacheControl, ",")
|
||||
for _, value := range values {
|
||||
if cacheRef.ContainsCacheControl(strings.TrimSpace(value)) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set-Cookie
|
||||
if cacheRef.SkipResponseSetCookie && len(this.writer.Header().Get("Set-Cookie")) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 校验其他条件
|
||||
if cacheRef.Conds != nil && cacheRef.Conds.HasResponseConds() && !cacheRef.Conds.MatchResponse(this.req.Format) {
|
||||
return
|
||||
}
|
||||
|
||||
// 打开缓存写入
|
||||
storage := caches.SharedManager.FindStorageWithPolicy(this.req.cacheRef.CachePolicyId)
|
||||
if storage == nil {
|
||||
return
|
||||
}
|
||||
this.cacheStorage = storage
|
||||
life := cacheRef.LifeSeconds()
|
||||
if life <= 60 { // 最小不能少于1分钟
|
||||
life = 60
|
||||
}
|
||||
expiredAt := utils.UnixTime() + life
|
||||
cacheWriter, err := storage.Open(this.req.cacheKey, expiredAt)
|
||||
if err != nil {
|
||||
logs.Println("write cache failed: " + err.Error())
|
||||
return
|
||||
}
|
||||
this.cacheWriter = cacheWriter
|
||||
if this.gzipWriter != nil {
|
||||
this.cacheWriter = caches.NewGzipWriter(this.cacheWriter, this.req.cacheKey, expiredAt)
|
||||
}
|
||||
|
||||
// 写入Header
|
||||
headerData := this.HeaderData()
|
||||
_, err = cacheWriter.Write(headerData)
|
||||
if err != nil {
|
||||
logs.Println("write cache failed: " + err.Error())
|
||||
_ = this.cacheWriter.Discard()
|
||||
this.cacheWriter = nil
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
@@ -102,6 +103,7 @@ func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
// 刷新配置
|
||||
logs.Println("[NODE]reload config ...")
|
||||
nodeconfigs.ResetNodeConfig(nodeConfig)
|
||||
caches.SharedManager.UpdatePolicies(nodeConfig.AllCachePolicies())
|
||||
sharedNodeConfig = nodeConfig
|
||||
|
||||
if !isFirstTime {
|
||||
|
||||
Reference in New Issue
Block a user