mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-28 02:50:25 +08:00
增加刷新、预热缓存任务管理
This commit is contained in:
215
internal/nodes/http_cache_task_manager.go
Normal file
215
internal/nodes/http_cache_task_manager.go
Normal file
@@ -0,0 +1,215 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"time"
|
||||
)
|
||||
|
||||
func init() {
|
||||
events.On(events.EventStart, func() {
|
||||
goman.New(func() {
|
||||
SharedHTTPCacheTaskManager.Start()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
var SharedHTTPCacheTaskManager = NewHTTPCacheTaskManager()
|
||||
|
||||
// HTTPCacheTaskManager 缓存任务管理
|
||||
type HTTPCacheTaskManager struct {
|
||||
ticker *time.Ticker
|
||||
httpClient *http.Client
|
||||
protocolReg *regexp.Regexp
|
||||
|
||||
taskQueue chan *pb.PurgeServerCacheRequest
|
||||
}
|
||||
|
||||
func NewHTTPCacheTaskManager() *HTTPCacheTaskManager {
|
||||
var duration = 1 * time.Minute
|
||||
if Tea.IsTesting() {
|
||||
duration = 10 * time.Second
|
||||
}
|
||||
return &HTTPCacheTaskManager{
|
||||
ticker: time.NewTicker(duration),
|
||||
httpClient: &http.Client{
|
||||
Timeout: 30 * time.Second, // TODO 可以设置请求超时时间
|
||||
Transport: &http.Transport{
|
||||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
_, port, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return net.Dial(network, "127.0.0.1:"+port)
|
||||
},
|
||||
MaxIdleConns: 128,
|
||||
MaxIdleConnsPerHost: 32,
|
||||
MaxConnsPerHost: 32,
|
||||
IdleConnTimeout: 2 * time.Minute,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
TLSHandshakeTimeout: 0,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
protocolReg: regexp.MustCompile(`^(?i)(http|https)://`),
|
||||
taskQueue: make(chan *pb.PurgeServerCacheRequest, 1024),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *HTTPCacheTaskManager) Start() {
|
||||
// task queue
|
||||
goman.New(func() {
|
||||
rpcClient, _ := rpc.SharedRPC()
|
||||
|
||||
if rpcClient != nil {
|
||||
for taskReq := range this.taskQueue {
|
||||
_, err := rpcClient.ServerRPC().PurgeServerCache(rpcClient.Context(), taskReq)
|
||||
if err != nil {
|
||||
remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "create purge task failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Loop
|
||||
for range this.ticker.C {
|
||||
err := this.Loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "execute task failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *HTTPCacheTaskManager) Loop() error {
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := rpcClient.HTTPCacheTaskKeyRPC().FindDoingHTTPCacheTaskKeys(rpcClient.Context(), &pb.FindDoingHTTPCacheTaskKeysRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var keys = resp.HttpCacheTaskKeys
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var pbResults = []*pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{}
|
||||
|
||||
for _, key := range keys {
|
||||
err = this.processKey(key)
|
||||
|
||||
var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{
|
||||
Id: key.Id,
|
||||
NodeClusterId: key.NodeClusterId,
|
||||
Error: "",
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
pbResult.Error = err.Error()
|
||||
}
|
||||
pbResults = append(pbResults, pbResult)
|
||||
}
|
||||
|
||||
_, err = rpcClient.HTTPCacheTaskKeyRPC().UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *HTTPCacheTaskManager) PushTaskKeys(keys []string) {
|
||||
select {
|
||||
case this.taskQueue <- &pb.PurgeServerCacheRequest{
|
||||
Keys: keys,
|
||||
Prefixes: nil,
|
||||
}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (this *HTTPCacheTaskManager) processKey(key *pb.HTTPCacheTaskKey) error {
|
||||
switch key.Type {
|
||||
case "purge":
|
||||
var storages = caches.SharedManager.FindAllStorages()
|
||||
for _, storage := range storages {
|
||||
switch key.KeyType {
|
||||
case "key":
|
||||
err := storage.Purge([]string{key.Key}, "file")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "prefix":
|
||||
err := storage.Purge([]string{key.Key}, "dir")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
case "fetch":
|
||||
err := this.fetchKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return errors.New("invalid operation type '" + key.Type + "'")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO 增加失败重试
|
||||
func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
|
||||
var fullKey = key.Key
|
||||
if !this.protocolReg.MatchString(fullKey) {
|
||||
fullKey = "https://" + fullKey
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, fullKey, nil)
|
||||
if err != nil {
|
||||
return errors.New("invalid url: " + fullKey + ": " + err.Error())
|
||||
}
|
||||
|
||||
// TODO 可以在管理界面自定义Header
|
||||
req.Header.Set("X-Edge-Cache-Action", "fetch")
|
||||
req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") // TODO 可以定义
|
||||
req.Header.Set("Accept-Encoding", "gzip, deflate, br")
|
||||
resp, err := this.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return errors.New("request failed: " + fullKey + ": " + err.Error())
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
// 读取内容,以便于生成缓存
|
||||
_, _ = io.Copy(ioutil.Discard, resp.Body)
|
||||
|
||||
// 处理502
|
||||
if resp.StatusCode == http.StatusBadGateway {
|
||||
return errors.New("read origin site timeout")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user