缓存策略增加预热超时时间设置(默认从10分钟调整为20分钟)

This commit is contained in:
GoEdgeLab
2023-08-06 17:08:29 +08:00
parent a43743ff8a
commit 5a39b635f0

View File

@@ -7,6 +7,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/compressions" "github.com/TeaOSLab/EdgeNode/internal/compressions"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
@@ -22,6 +23,7 @@ import (
"os" "os"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
) )
@@ -42,9 +44,11 @@ var SharedHTTPCacheTaskManager = NewHTTPCacheTaskManager()
// HTTPCacheTaskManager 缓存任务管理 // HTTPCacheTaskManager 缓存任务管理
type HTTPCacheTaskManager struct { type HTTPCacheTaskManager struct {
ticker *time.Ticker ticker *time.Ticker
httpClient *http.Client
protocolReg *regexp.Regexp protocolReg *regexp.Regexp
timeoutClientMap map[time.Duration]*http.Client // timeout seconds=> *http.Client
locker sync.Mutex
taskQueue chan *pb.PurgeServerCacheRequest taskQueue chan *pb.PurgeServerCacheRequest
} }
@@ -53,36 +57,12 @@ func NewHTTPCacheTaskManager() *HTTPCacheTaskManager {
if Tea.IsTesting() { if Tea.IsTesting() {
duration = 10 * time.Second duration = 10 * time.Second
} }
return &HTTPCacheTaskManager{ return &HTTPCacheTaskManager{
ticker: time.NewTicker(duration), ticker: time.NewTicker(duration),
httpClient: &http.Client{
Timeout: 10 * time.Minute, // TODO 可以设置请求超时时间
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
_, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
conn, err := net.Dial(network, "127.0.0.1:"+port)
if err != nil {
return nil, err
}
return connutils.NewNoStat(conn), nil
},
MaxIdleConns: 128,
MaxIdleConnsPerHost: 32,
MaxConnsPerHost: 32,
IdleConnTimeout: 2 * time.Minute,
ExpectContinueTimeout: 1 * time.Second,
TLSHandshakeTimeout: 0,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
},
protocolReg: regexp.MustCompile(`^(?i)(http|https)://`), protocolReg: regexp.MustCompile(`^(?i)(http|https)://`),
taskQueue: make(chan *pb.PurgeServerCacheRequest, 1024), taskQueue: make(chan *pb.PurgeServerCacheRequest, 1024),
timeoutClientMap: make(map[time.Duration]*http.Client),
} }
} }
@@ -233,7 +213,6 @@ func (this *HTTPCacheTaskManager) processKey(key *pb.HTTPCacheTaskKey) error {
} }
// TODO 增加失败重试 // TODO 增加失败重试
// TODO 使用并发操作
func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error { func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
var fullKey = key.Key var fullKey = key.Key
if !this.protocolReg.MatchString(fullKey) { if !this.protocolReg.MatchString(fullKey) {
@@ -249,7 +228,7 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
req.Header.Set("X-Edge-Cache-Action", "fetch") req.Header.Set("X-Edge-Cache-Action", "fetch")
req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") // TODO 可以定义 req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") // TODO 可以定义
req.Header.Set("Accept-Encoding", "gzip, deflate, br") req.Header.Set("Accept-Encoding", "gzip, deflate, br")
resp, err := this.httpClient.Do(req) resp, err := this.httpClient().Do(req)
if err != nil { if err != nil {
err = this.simplifyErr(err) err = this.simplifyErr(err)
return errors.New("request failed: " + fullKey + ": " + err.Error()) return errors.New("request failed: " + fullKey + ": " + err.Error())
@@ -288,3 +267,57 @@ func (this *HTTPCacheTaskManager) simplifyErr(err error) error {
return err return err
} }
func (this *HTTPCacheTaskManager) httpClient() *http.Client {
var timeout = serverconfigs.DefaultHTTPCachePolicyFetchTimeout
var nodeConfig = sharedNodeConfig // copy
if nodeConfig != nil {
var cachePolicies = nodeConfig.HTTPCachePolicies // copy
if len(cachePolicies) > 0 && cachePolicies[0].FetchTimeout != nil && cachePolicies[0].FetchTimeout.Count > 0 {
var fetchTimeout = cachePolicies[0].FetchTimeout.Duration()
if fetchTimeout > 0 {
timeout = fetchTimeout
}
}
}
this.locker.Lock()
defer this.locker.Unlock()
client, ok := this.timeoutClientMap[timeout]
if ok {
return client
}
client = &http.Client{
Timeout: timeout,
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
_, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
conn, err := net.Dial(network, "127.0.0.1:"+port)
if err != nil {
return nil, err
}
return connutils.NewNoStat(conn), nil
},
MaxIdleConns: 128,
MaxIdleConnsPerHost: 32,
MaxConnsPerHost: 32,
IdleConnTimeout: 2 * time.Minute,
ExpectContinueTimeout: 1 * time.Second,
TLSHandshakeTimeout: 0,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
this.timeoutClientMap[timeout] = client
return client
}