mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 23:20:25 +08:00
328 lines
8.2 KiB
Go
328 lines
8.2 KiB
Go
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
|
|
|
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
|
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
|
"github.com/TeaOSLab/EdgeNode/internal/compressions"
|
|
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
|
"github.com/TeaOSLab/EdgeNode/internal/events"
|
|
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
|
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
|
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
|
"github.com/TeaOSLab/EdgeNode/internal/utils/bytepool"
|
|
connutils "github.com/TeaOSLab/EdgeNode/internal/utils/conns"
|
|
"github.com/iwind/TeaGo/Tea"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
func init() {
|
|
if !teaconst.IsMain {
|
|
return
|
|
}
|
|
|
|
events.On(events.EventStart, func() {
|
|
goman.New(func() {
|
|
SharedHTTPCacheTaskManager.Start()
|
|
})
|
|
})
|
|
}
|
|
|
|
// SharedHTTPCacheTaskManager is the package-level HTTPCacheTaskManager instance;
// init registers its Start method to run on events.EventStart.
var SharedHTTPCacheTaskManager = NewHTTPCacheTaskManager()
|
|
|
|
// HTTPCacheTaskManager 缓存任务管理
|
|
type HTTPCacheTaskManager struct {
|
|
ticker *time.Ticker
|
|
protocolReg *regexp.Regexp
|
|
|
|
timeoutClientMap map[time.Duration]*http.Client // timeout seconds=> *http.Client
|
|
locker sync.Mutex
|
|
|
|
taskQueue chan *pb.PurgeServerCacheRequest
|
|
}
|
|
|
|
func NewHTTPCacheTaskManager() *HTTPCacheTaskManager {
|
|
var duration = 30 * time.Second
|
|
if Tea.IsTesting() {
|
|
duration = 10 * time.Second
|
|
}
|
|
|
|
return &HTTPCacheTaskManager{
|
|
ticker: time.NewTicker(duration),
|
|
protocolReg: regexp.MustCompile(`^(?i)(http|https)://`),
|
|
taskQueue: make(chan *pb.PurgeServerCacheRequest, 1024),
|
|
timeoutClientMap: make(map[time.Duration]*http.Client),
|
|
}
|
|
}
|
|
|
|
func (this *HTTPCacheTaskManager) Start() {
|
|
// task queue
|
|
goman.New(func() {
|
|
rpcClient, _ := rpc.SharedRPC()
|
|
|
|
if rpcClient != nil {
|
|
for taskReq := range this.taskQueue {
|
|
_, err := rpcClient.ServerRPC.PurgeServerCache(rpcClient.Context(), taskReq)
|
|
if err != nil {
|
|
remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "create purge task failed: "+err.Error())
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
// Loop
|
|
for range this.ticker.C {
|
|
err := this.Loop()
|
|
if err != nil {
|
|
remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "execute task failed: "+err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (this *HTTPCacheTaskManager) Loop() error {
|
|
rpcClient, err := rpc.SharedRPC()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp, err := rpcClient.HTTPCacheTaskKeyRPC.FindDoingHTTPCacheTaskKeys(rpcClient.Context(), &pb.FindDoingHTTPCacheTaskKeysRequest{})
|
|
if err != nil {
|
|
// 忽略连接错误
|
|
if rpc.IsConnError(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
var keys = resp.HttpCacheTaskKeys
|
|
if len(keys) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var pbResults = []*pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{}
|
|
|
|
var taskGroup = goman.NewTaskGroup()
|
|
for _, key := range keys {
|
|
var taskKey = key
|
|
taskGroup.Run(func() {
|
|
processErr := this.processKey(taskKey)
|
|
var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{
|
|
Id: taskKey.Id,
|
|
NodeClusterId: taskKey.NodeClusterId,
|
|
Error: "",
|
|
}
|
|
|
|
if processErr != nil {
|
|
pbResult.Error = processErr.Error()
|
|
}
|
|
|
|
taskGroup.Lock()
|
|
pbResults = append(pbResults, pbResult)
|
|
taskGroup.Unlock()
|
|
})
|
|
}
|
|
|
|
taskGroup.Wait()
|
|
|
|
_, err = rpcClient.HTTPCacheTaskKeyRPC.UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *HTTPCacheTaskManager) PushTaskKeys(keys []string) {
|
|
select {
|
|
case this.taskQueue <- &pb.PurgeServerCacheRequest{
|
|
Keys: keys,
|
|
Prefixes: nil,
|
|
}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (this *HTTPCacheTaskManager) processKey(key *pb.HTTPCacheTaskKey) error {
|
|
switch key.Type {
|
|
case "purge":
|
|
var storages = caches.SharedManager.FindAllStorages()
|
|
for _, storage := range storages {
|
|
switch key.KeyType {
|
|
case "key":
|
|
var cacheKeys = []string{key.Key}
|
|
if strings.HasPrefix(key.Key, "http://") {
|
|
cacheKeys = append(cacheKeys, strings.Replace(key.Key, "http://", "https://", 1))
|
|
} else if strings.HasPrefix(key.Key, "https://") {
|
|
cacheKeys = append(cacheKeys, strings.Replace(key.Key, "https://", "http://", 1))
|
|
}
|
|
|
|
// TODO 提升效率
|
|
for _, cacheKey := range cacheKeys {
|
|
var subKeys = []string{
|
|
cacheKey,
|
|
cacheKey + caches.SuffixMethod + "HEAD",
|
|
cacheKey + caches.SuffixWebP,
|
|
cacheKey + caches.SuffixPartial,
|
|
}
|
|
// TODO 根据实际缓存的内容进行组合
|
|
for _, encoding := range compressions.AllEncodings() {
|
|
subKeys = append(subKeys, cacheKey+caches.SuffixCompression+encoding)
|
|
subKeys = append(subKeys, cacheKey+caches.SuffixWebP+caches.SuffixCompression+encoding)
|
|
}
|
|
|
|
err := storage.Purge(subKeys, "file")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
case "prefix":
|
|
var prefixes = []string{key.Key}
|
|
if strings.HasPrefix(key.Key, "http://") {
|
|
prefixes = append(prefixes, strings.Replace(key.Key, "http://", "https://", 1))
|
|
} else if strings.HasPrefix(key.Key, "https://") {
|
|
prefixes = append(prefixes, strings.Replace(key.Key, "https://", "http://", 1))
|
|
}
|
|
|
|
err := storage.Purge(prefixes, "dir")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
case "fetch":
|
|
err := this.fetchKey(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
return errors.New("invalid operation type '" + key.Type + "'")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TODO 增加失败重试
|
|
func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
|
|
var fullKey = key.Key
|
|
if !this.protocolReg.MatchString(fullKey) {
|
|
fullKey = "https://" + fullKey
|
|
}
|
|
|
|
req, err := http.NewRequest(http.MethodGet, fullKey, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid url: '%s': %w", fullKey, err)
|
|
}
|
|
|
|
// TODO 可以在管理界面自定义Header
|
|
req.Header.Set("X-Edge-Cache-Action", "fetch")
|
|
req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") // TODO 可以定义
|
|
req.Header.Set("Accept-Encoding", "gzip, deflate, br")
|
|
resp, err := this.httpClient().Do(req)
|
|
if err != nil {
|
|
err = this.simplifyErr(err)
|
|
return fmt.Errorf("request failed: '%s': %w", fullKey, err)
|
|
}
|
|
|
|
defer func() {
|
|
_ = resp.Body.Close()
|
|
}()
|
|
|
|
// 处理502
|
|
if resp.StatusCode == http.StatusBadGateway {
|
|
return errors.New("read origin site timeout")
|
|
}
|
|
|
|
// 读取内容,以便于生成缓存
|
|
var buf = bytepool.Pool16k.Get()
|
|
_, err = io.CopyBuffer(io.Discard, resp.Body, buf.Bytes)
|
|
bytepool.Pool16k.Put(buf)
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
err = this.simplifyErr(err)
|
|
return fmt.Errorf("request failed: '%s': %w", fullKey, err)
|
|
} else {
|
|
err = nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *HTTPCacheTaskManager) simplifyErr(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if os.IsTimeout(err) {
|
|
return errors.New("timeout to read origin site")
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (this *HTTPCacheTaskManager) httpClient() *http.Client {
|
|
var timeout = serverconfigs.DefaultHTTPCachePolicyFetchTimeout
|
|
|
|
var nodeConfig = sharedNodeConfig // copy
|
|
if nodeConfig != nil {
|
|
var cachePolicies = nodeConfig.HTTPCachePolicies // copy
|
|
if len(cachePolicies) > 0 && cachePolicies[0].FetchTimeout != nil && cachePolicies[0].FetchTimeout.Count > 0 {
|
|
var fetchTimeout = cachePolicies[0].FetchTimeout.Duration()
|
|
if fetchTimeout > 0 {
|
|
timeout = fetchTimeout
|
|
}
|
|
}
|
|
}
|
|
|
|
this.locker.Lock()
|
|
defer this.locker.Unlock()
|
|
|
|
client, ok := this.timeoutClientMap[timeout]
|
|
if ok {
|
|
return client
|
|
}
|
|
|
|
client = &http.Client{
|
|
Timeout: timeout,
|
|
Transport: &http.Transport{
|
|
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
_, port, err := net.SplitHostPort(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn, err := net.Dial(network, "127.0.0.1:"+port)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return connutils.NewNoStat(conn), nil
|
|
},
|
|
MaxIdleConns: 128,
|
|
MaxIdleConnsPerHost: 32,
|
|
MaxConnsPerHost: 32,
|
|
IdleConnTimeout: 2 * time.Minute,
|
|
ExpectContinueTimeout: 1 * time.Second,
|
|
TLSHandshakeTimeout: 0,
|
|
TLSClientConfig: &tls.Config{
|
|
InsecureSkipVerify: true,
|
|
},
|
|
},
|
|
}
|
|
|
|
this.timeoutClientMap[timeout] = client
|
|
|
|
return client
|
|
}
|