2022-06-05 17:15:02 +08:00
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package nodes
import (
"context"
"crypto/tls"
"errors"
2023-08-11 14:38:00 +08:00
"fmt"
2022-06-05 17:15:02 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2023-08-06 17:08:29 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2022-06-05 17:15:02 +08:00
"github.com/TeaOSLab/EdgeNode/internal/caches"
2022-06-15 12:54:56 +08:00
"github.com/TeaOSLab/EdgeNode/internal/compressions"
2023-03-10 15:14:14 +08:00
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2022-06-05 17:15:02 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2024-05-03 15:31:40 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils/bytepool"
2023-04-21 15:08:44 +08:00
connutils "github.com/TeaOSLab/EdgeNode/internal/utils/conns"
2024-05-11 09:23:54 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils/goman"
2022-06-05 17:15:02 +08:00
"github.com/iwind/TeaGo/Tea"
"io"
"net"
"net/http"
2023-07-26 18:45:17 +08:00
"os"
2022-06-05 17:15:02 +08:00
"regexp"
2022-06-15 12:54:56 +08:00
"strings"
2023-08-06 17:08:29 +08:00
"sync"
2022-06-05 17:15:02 +08:00
"time"
)
func init ( ) {
2023-03-10 15:14:14 +08:00
if ! teaconst . IsMain {
return
}
2022-06-05 17:15:02 +08:00
events . On ( events . EventStart , func ( ) {
goman . New ( func ( ) {
SharedHTTPCacheTaskManager . Start ( )
} )
} )
}
// SharedHTTPCacheTaskManager is the process-wide cache task manager. init
// starts it (via Start) when events.EventStart fires in the main process.
var SharedHTTPCacheTaskManager = NewHTTPCacheTaskManager()
// HTTPCacheTaskManager 缓存任务管理
type HTTPCacheTaskManager struct {
ticker * time . Ticker
protocolReg * regexp . Regexp
2023-08-06 17:08:29 +08:00
timeoutClientMap map [ time . Duration ] * http . Client // timeout seconds=> *http.Client
locker sync . Mutex
2022-06-05 17:15:02 +08:00
taskQueue chan * pb . PurgeServerCacheRequest
}
func NewHTTPCacheTaskManager ( ) * HTTPCacheTaskManager {
2022-06-15 12:54:56 +08:00
var duration = 30 * time . Second
2022-06-05 17:15:02 +08:00
if Tea . IsTesting ( ) {
duration = 10 * time . Second
}
2023-04-21 15:08:44 +08:00
2023-08-06 17:08:29 +08:00
return & HTTPCacheTaskManager {
ticker : time . NewTicker ( duration ) ,
protocolReg : regexp . MustCompile ( ` ^(?i)(http|https):// ` ) ,
taskQueue : make ( chan * pb . PurgeServerCacheRequest , 1024 ) ,
timeoutClientMap : make ( map [ time . Duration ] * http . Client ) ,
2022-06-05 17:15:02 +08:00
}
}
func ( this * HTTPCacheTaskManager ) Start ( ) {
// task queue
goman . New ( func ( ) {
rpcClient , _ := rpc . SharedRPC ( )
if rpcClient != nil {
for taskReq := range this . taskQueue {
2022-08-24 20:04:46 +08:00
_ , err := rpcClient . ServerRPC . PurgeServerCache ( rpcClient . Context ( ) , taskReq )
2022-06-05 17:15:02 +08:00
if err != nil {
remotelogs . Error ( "HTTP_CACHE_TASK_MANAGER" , "create purge task failed: " + err . Error ( ) )
}
}
}
} )
// Loop
for range this . ticker . C {
err := this . Loop ( )
if err != nil {
remotelogs . Error ( "HTTP_CACHE_TASK_MANAGER" , "execute task failed: " + err . Error ( ) )
}
}
}
func ( this * HTTPCacheTaskManager ) Loop ( ) error {
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return err
}
2022-08-24 20:04:46 +08:00
resp , err := rpcClient . HTTPCacheTaskKeyRPC . FindDoingHTTPCacheTaskKeys ( rpcClient . Context ( ) , & pb . FindDoingHTTPCacheTaskKeysRequest { } )
2022-06-05 17:15:02 +08:00
if err != nil {
2022-08-20 08:15:16 +08:00
// 忽略连接错误
if rpc . IsConnError ( err ) {
return nil
}
2022-06-05 17:15:02 +08:00
return err
}
var keys = resp . HttpCacheTaskKeys
if len ( keys ) == 0 {
return nil
}
var pbResults = [ ] * pb . UpdateHTTPCacheTaskKeysStatusRequest_KeyResult { }
2023-07-26 18:45:17 +08:00
var taskGroup = goman . NewTaskGroup ( )
2022-06-05 17:15:02 +08:00
for _ , key := range keys {
2023-07-26 18:45:17 +08:00
var taskKey = key
taskGroup . Run ( func ( ) {
processErr := this . processKey ( taskKey )
var pbResult = & pb . UpdateHTTPCacheTaskKeysStatusRequest_KeyResult {
Id : taskKey . Id ,
NodeClusterId : taskKey . NodeClusterId ,
Error : "" ,
}
2022-06-05 17:15:02 +08:00
2023-07-26 18:45:17 +08:00
if processErr != nil {
pbResult . Error = processErr . Error ( )
}
2022-06-05 17:15:02 +08:00
2023-07-26 18:45:17 +08:00
taskGroup . Lock ( )
pbResults = append ( pbResults , pbResult )
taskGroup . Unlock ( )
} )
2022-06-05 17:15:02 +08:00
}
2023-07-26 18:45:17 +08:00
taskGroup . Wait ( )
2022-08-24 20:04:46 +08:00
_ , err = rpcClient . HTTPCacheTaskKeyRPC . UpdateHTTPCacheTaskKeysStatus ( rpcClient . Context ( ) , & pb . UpdateHTTPCacheTaskKeysStatusRequest { KeyResults : pbResults } )
2022-06-05 17:15:02 +08:00
if err != nil {
return err
}
return nil
}
// PushTaskKeys queues a purge request for keys without blocking. If the queue
// is full, the request is dropped.
func (this *HTTPCacheTaskManager) PushTaskKeys(keys []string) {
	request := &pb.PurgeServerCacheRequest{Keys: keys}

	select {
	case this.taskQueue <- request:
	default:
		// queue is full: drop the request rather than block the caller
	}
}
func ( this * HTTPCacheTaskManager ) processKey ( key * pb . HTTPCacheTaskKey ) error {
switch key . Type {
case "purge" :
var storages = caches . SharedManager . FindAllStorages ( )
for _ , storage := range storages {
switch key . KeyType {
case "key" :
2022-06-15 12:54:56 +08:00
var cacheKeys = [ ] string { key . Key }
if strings . HasPrefix ( key . Key , "http://" ) {
cacheKeys = append ( cacheKeys , strings . Replace ( key . Key , "http://" , "https://" , 1 ) )
} else if strings . HasPrefix ( key . Key , "https://" ) {
cacheKeys = append ( cacheKeys , strings . Replace ( key . Key , "https://" , "http://" , 1 ) )
}
// TODO 提升效率
for _ , cacheKey := range cacheKeys {
var subKeys = [ ] string {
cacheKey ,
cacheKey + caches . SuffixMethod + "HEAD" ,
cacheKey + caches . SuffixWebP ,
cacheKey + caches . SuffixPartial ,
}
// TODO 根据实际缓存的内容进行组合
for _ , encoding := range compressions . AllEncodings ( ) {
subKeys = append ( subKeys , cacheKey + caches . SuffixCompression + encoding )
subKeys = append ( subKeys , cacheKey + caches . SuffixWebP + caches . SuffixCompression + encoding )
}
err := storage . Purge ( subKeys , "file" )
if err != nil {
return err
}
2022-06-05 17:15:02 +08:00
}
case "prefix" :
2022-06-15 12:54:56 +08:00
var prefixes = [ ] string { key . Key }
if strings . HasPrefix ( key . Key , "http://" ) {
prefixes = append ( prefixes , strings . Replace ( key . Key , "http://" , "https://" , 1 ) )
} else if strings . HasPrefix ( key . Key , "https://" ) {
prefixes = append ( prefixes , strings . Replace ( key . Key , "https://" , "http://" , 1 ) )
}
err := storage . Purge ( prefixes , "dir" )
2022-06-05 17:15:02 +08:00
if err != nil {
return err
}
}
}
case "fetch" :
err := this . fetchKey ( key )
if err != nil {
return err
}
default :
return errors . New ( "invalid operation type '" + key . Type + "'" )
}
return nil
}
// TODO 增加失败重试
func ( this * HTTPCacheTaskManager ) fetchKey ( key * pb . HTTPCacheTaskKey ) error {
var fullKey = key . Key
if ! this . protocolReg . MatchString ( fullKey ) {
fullKey = "https://" + fullKey
}
req , err := http . NewRequest ( http . MethodGet , fullKey , nil )
if err != nil {
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "invalid url: '%s': %w" , fullKey , err )
2022-06-05 17:15:02 +08:00
}
// TODO 可以在管理界面自定义Header
req . Header . Set ( "X-Edge-Cache-Action" , "fetch" )
req . Header . Set ( "User-Agent" , "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36" ) // TODO 可以定义
req . Header . Set ( "Accept-Encoding" , "gzip, deflate, br" )
2023-08-06 17:08:29 +08:00
resp , err := this . httpClient ( ) . Do ( req )
2022-06-05 17:15:02 +08:00
if err != nil {
2023-07-26 18:45:17 +08:00
err = this . simplifyErr ( err )
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "request failed: '%s': %w" , fullKey , err )
2022-06-05 17:15:02 +08:00
}
defer func ( ) {
_ = resp . Body . Close ( )
} ( )
// 处理502
if resp . StatusCode == http . StatusBadGateway {
return errors . New ( "read origin site timeout" )
}
2023-07-26 18:45:17 +08:00
// 读取内容,以便于生成缓存
2024-05-03 15:31:40 +08:00
var buf = bytepool . Pool16k . Get ( )
2024-04-15 12:20:31 +08:00
_ , err = io . CopyBuffer ( io . Discard , resp . Body , buf . Bytes )
2024-05-03 15:31:40 +08:00
bytepool . Pool16k . Put ( buf )
2023-07-26 18:45:17 +08:00
if err != nil {
if err != io . EOF {
err = this . simplifyErr ( err )
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "request failed: '%s': %w" , fullKey , err )
2023-07-26 18:45:17 +08:00
} else {
err = nil
}
}
2022-06-05 17:15:02 +08:00
return nil
}
2023-07-26 18:45:17 +08:00
// simplifyErr replaces any error that os.IsTimeout recognizes with a short
// "timeout to read origin site" error. Other errors, and nil, are returned
// unchanged.
func (this *HTTPCacheTaskManager) simplifyErr(err error) error {
	switch {
	case err == nil:
		return nil
	case os.IsTimeout(err):
		return errors.New("timeout to read origin site")
	default:
		return err
	}
}
2023-08-06 17:08:29 +08:00
// httpClient returns the *http.Client used for cache fetch tasks.
//
// The client timeout is the first cache policy's FetchTimeout when it is
// set and positive; otherwise serverconfigs.DefaultHTTPCachePolicyFetchTimeout
// is used. One client is built and cached per distinct timeout value.
// Every client dials 127.0.0.1 on the port of the requested URL, so fetches
// go through the local node. TLS certificates are not verified.
func (this *HTTPCacheTaskManager) httpClient() *http.Client {
	var timeout = serverconfigs.DefaultHTTPCachePolicyFetchTimeout

	var nodeConfig = sharedNodeConfig // copy
	if nodeConfig != nil {
		var cachePolicies = nodeConfig.HTTPCachePolicies // copy
		if len(cachePolicies) > 0 && cachePolicies[0].FetchTimeout != nil && cachePolicies[0].FetchTimeout.Count > 0 {
			var fetchTimeout = cachePolicies[0].FetchTimeout.Duration()
			if fetchTimeout > 0 {
				timeout = fetchTimeout
			}
		}
	}

	this.locker.Lock()
	defer this.locker.Unlock()

	client, ok := this.timeoutClientMap[timeout]
	if ok {
		return client
	}

	// A net.Dialer is safe for concurrent use, so one per client is enough.
	var dialer = &net.Dialer{}

	client = &http.Client{
		Timeout: timeout,
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
				_, port, err := net.SplitHostPort(addr)
				if err != nil {
					return nil, err
				}

				// Dial with ctx (net.Dial ignored it) so request cancellation and
				// the client timeout can also abort a connection attempt that hangs.
				conn, err := dialer.DialContext(ctx, network, net.JoinHostPort("127.0.0.1", port))
				if err != nil {
					return nil, err
				}
				return connutils.NewNoStat(conn), nil
			},
			MaxIdleConns:          128,
			MaxIdleConnsPerHost:   32,
			MaxConnsPerHost:       32,
			IdleConnTimeout:       2 * time.Minute,
			ExpectContinueTimeout: 1 * time.Second,
			TLSHandshakeTimeout:   0,
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
		},
	}

	this.timeoutClientMap[timeout] = client
	return client
}