diff --git a/internal/nodes/http_cache_task_manager.go b/internal/nodes/http_cache_task_manager.go index 62e0d13..675eaf2 100644 --- a/internal/nodes/http_cache_task_manager.go +++ b/internal/nodes/http_cache_task_manager.go @@ -7,6 +7,7 @@ import ( "crypto/tls" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/compressions" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" @@ -22,6 +23,7 @@ import ( "os" "regexp" "strings" + "sync" "time" ) @@ -42,9 +44,11 @@ var SharedHTTPCacheTaskManager = NewHTTPCacheTaskManager() // HTTPCacheTaskManager 缓存任务管理 type HTTPCacheTaskManager struct { ticker *time.Ticker - httpClient *http.Client protocolReg *regexp.Regexp + timeoutClientMap map[time.Duration]*http.Client // timeout seconds=> *http.Client + locker sync.Mutex + taskQueue chan *pb.PurgeServerCacheRequest } @@ -53,36 +57,12 @@ func NewHTTPCacheTaskManager() *HTTPCacheTaskManager { if Tea.IsTesting() { duration = 10 * time.Second } - return &HTTPCacheTaskManager{ - ticker: time.NewTicker(duration), - httpClient: &http.Client{ - Timeout: 10 * time.Minute, // TODO 可以设置请求超时时间 - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - _, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, err - } - conn, err := net.Dial(network, "127.0.0.1:"+port) - if err != nil { - return nil, err - } - return connutils.NewNoStat(conn), nil - }, - MaxIdleConns: 128, - MaxIdleConnsPerHost: 32, - MaxConnsPerHost: 32, - IdleConnTimeout: 2 * time.Minute, - ExpectContinueTimeout: 1 * time.Second, - TLSHandshakeTimeout: 0, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, - }, - protocolReg: regexp.MustCompile(`^(?i)(http|https)://`), - taskQueue: make(chan *pb.PurgeServerCacheRequest, 1024), + return &HTTPCacheTaskManager{ + ticker: time.NewTicker(duration), + protocolReg: regexp.MustCompile(`^(?i)(http|https)://`), + taskQueue: make(chan *pb.PurgeServerCacheRequest, 1024), + timeoutClientMap: make(map[time.Duration]*http.Client), } } @@ -233,7 +213,6 @@ func (this *HTTPCacheTaskManager) processKey(key *pb.HTTPCacheTaskKey) error { } // TODO 增加失败重试 -// TODO 使用并发操作 func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error { var fullKey = key.Key if !this.protocolReg.MatchString(fullKey) { @@ -249,7 +228,7 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error { req.Header.Set("X-Edge-Cache-Action", "fetch") req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") // TODO 可以定义 req.Header.Set("Accept-Encoding", "gzip, deflate, br") - resp, err := this.httpClient.Do(req) + resp, err := this.httpClient().Do(req) if err != nil { err = this.simplifyErr(err) return errors.New("request failed: " + fullKey + ": " + err.Error()) @@ -288,3 +267,57 @@ func (this *HTTPCacheTaskManager) simplifyErr(err error) error { return err } + +func (this *HTTPCacheTaskManager) httpClient() *http.Client { + var timeout = serverconfigs.DefaultHTTPCachePolicyFetchTimeout + + var nodeConfig = sharedNodeConfig // copy + if nodeConfig != nil { + var cachePolicies = nodeConfig.HTTPCachePolicies // copy + if len(cachePolicies) > 0 && cachePolicies[0].FetchTimeout != nil && cachePolicies[0].FetchTimeout.Count > 0 { + var fetchTimeout = cachePolicies[0].FetchTimeout.Duration() + if fetchTimeout > 0 { + timeout = fetchTimeout + } + } + } + + this.locker.Lock() + defer this.locker.Unlock() + + client, ok := this.timeoutClientMap[timeout] + if ok { + return client + } + + client = &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + _, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + conn, err := net.Dial(network, "127.0.0.1:"+port) + if err != nil { + return nil, err + } + + return connutils.NewNoStat(conn), nil + }, + MaxIdleConns: 128, + MaxIdleConnsPerHost: 32, + MaxConnsPerHost: 32, + IdleConnTimeout: 2 * time.Minute, + ExpectContinueTimeout: 1 * time.Second, + TLSHandshakeTimeout: 0, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } + + this.timeoutClientMap[timeout] = client + + return client +}