From 771eff8fb1100e14e0c5a0a8e9fc0fafcab6c48b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Sun, 5 Jun 2022 17:15:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=B7=E6=96=B0=E3=80=81?= =?UTF-8?q?=E9=A2=84=E7=83=AD=E7=BC=93=E5=AD=98=E4=BB=BB=E5=8A=A1=E7=AE=A1?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/manager.go | 12 + internal/caches/storage_file.go | 3 +- internal/caches/storage_interface.go | 1 + internal/caches/storage_memory.go | 2 + internal/iplibrary/list_utils.go | 5 + internal/nodes/api_stream.go | 229 ------------------ internal/nodes/http_cache_task_manager.go | 215 ++++++++++++++++ .../nodes/http_cache_task_manager_test.go | 25 ++ internal/nodes/http_request_cache.go | 38 +-- internal/rpc/rpc_client.go | 16 +- 10 files changed, 279 insertions(+), 267 deletions(-) create mode 100644 internal/nodes/http_cache_task_manager.go create mode 100644 internal/nodes/http_cache_task_manager_test.go diff --git a/internal/caches/manager.go b/internal/caches/manager.go index 536eadb..6b7206f 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -214,3 +214,15 @@ func (this *Manager) FindAllCachePaths() []string { } return result } + +// FindAllStorages 读取所有缓存存储 +func (this *Manager) FindAllStorages() []StorageInterface { + this.locker.Lock() + defer this.locker.Unlock() + + var storages = []StorageInterface{} + for _, storage := range this.storageMap { + storages = append(storages, storage) + } + return storages +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index acdf4de..a518e5f 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -769,9 +769,10 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { return err } } + return nil } - // 文件 + // URL for _, key := range keys { hash, path := this.keyPath(key) err := this.removeCacheFile(path) diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index 2a6b14d..ef37092 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -35,6 +35,7 @@ type StorageInterface interface { CleanAll() error // Purge 批量删除缓存 + // urlType 值为file|dir Purge(keys []string, urlType string) error // Stop 停止缓存策略 diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index bc7e04a..3bb0e6d 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -267,8 +267,10 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error { return err } } + return nil } + // URL for _, key := range keys { err := this.Delete(key) if err != nil { diff --git a/internal/iplibrary/list_utils.go b/internal/iplibrary/list_utils.go index 0b92e1e..bd2d604 100644 --- a/internal/iplibrary/list_utils.go +++ b/internal/iplibrary/list_utils.go @@ -10,6 +10,11 @@ import ( // AllowIP 检查IP是否被允许访问 // 如果一个IP不在任何名单中,则允许访问 func AllowIP(ip string, serverId int64) (canGoNext bool, inAllowList bool) { + // 放行lo + if ip == "127.0.0.1" { + return true, true + } + var ipLong = utils.IP2Long(ip) if ipLong == 0 { return false, false diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 5a9746b..bee2b67 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -3,14 +3,12 @@ package nodes import ( "bytes" "context" - "crypto/tls" "encoding/json" "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" - "github.com/TeaOSLab/EdgeNode/internal/compressions" "github.com/TeaOSLab/EdgeNode/internal/configs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/errors" @@ -22,16 +20,11 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/maps" - "io" - "net" - "net/http" "net/url" "os/exec" "regexp" "runtime" "strconv" - "strings" - "sync" "time" ) @@ -116,10 +109,6 @@ func (this *APIStream) loop() error { err = this.handleStatCache(message) case messageconfigs.MessageCodeCleanCache: // 清理缓存 err = this.handleCleanCache(message) - case messageconfigs.MessageCodePurgeCache: // 删除缓存 - err = this.handlePurgeCache(message) - case messageconfigs.MessageCodePreheatCache: // 预热缓存 - err = this.handlePreheatCache(message) case messageconfigs.MessageCodeNewNodeTask: // 有新的任务 err = this.handleNewNodeTask(message) case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务 @@ -335,224 +324,6 @@ func (this *APIStream) handleCleanCache(message *pb.NodeStreamMessage) error { return nil } -// 删除缓存 -func (this *APIStream) handlePurgeCache(message *pb.NodeStreamMessage) error { - msg := &messageconfigs.PurgeCacheMessage{} - err := json.Unmarshal(message.DataJSON, msg) - if err != nil { - this.replyFail(message.RequestId, "decode message data failed: "+err.Error()) - return err - } - - storage, shouldStop, err := this.cacheStorage(message, msg.CachePolicyJSON) - if err != nil { - return err - } - if shouldStop { - defer func() { - storage.Stop() - }() - } - - // WEBP缓存 - if msg.Type == "file" { - var keys = msg.Keys - for _, key := range keys { - keys = append(keys, - key+caches.SuffixMethod+"HEAD", - key+caches.SuffixWebP, - key+caches.SuffixPartial, - ) - // TODO 根据实际缓存的内容进行组合 - for _, encoding := range compressions.AllEncodings() { - keys = append(keys, key+caches.SuffixCompression+encoding) - keys = append(keys, key+caches.SuffixWebP+caches.SuffixCompression+encoding) - } - } - msg.Keys = keys - } - - err = storage.Purge(msg.Keys, msg.Type) - if err != nil { - this.replyFail(message.RequestId, "purge keys failed: "+err.Error()) - return err - } - - this.replyOk(message.RequestId, "ok") - - return nil -} - -// 预热缓存 -func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { - msg := &messageconfigs.PreheatCacheMessage{} - err := json.Unmarshal(message.DataJSON, msg) - if err != nil { - this.replyFail(message.RequestId, "decode message data failed: "+err.Error()) - return err - } - - storage, shouldStop, err := this.cacheStorage(message, msg.CachePolicyJSON) - if err != nil { - return err - } - if shouldStop { - defer func() { - storage.Stop() - }() - } - - if len(msg.Keys) == 0 { - this.replyOk(message.RequestId, "ok") - return nil - } - - wg := sync.WaitGroup{} - wg.Add(len(msg.Keys)) - client := &http.Client{ - Timeout: 30 * time.Second, // TODO 可以设置请求超时时间 - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - _, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, err - } - return net.Dial(network, "127.0.0.1:"+port) - }, - MaxIdleConns: 4096, - MaxIdleConnsPerHost: 32, - MaxConnsPerHost: 32, - IdleConnTimeout: 2 * time.Minute, - ExpectContinueTimeout: 1 * time.Second, - TLSHandshakeTimeout: 0, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, - } - defer client.CloseIdleConnections() - errorMessages := []string{} - locker := sync.Mutex{} - for _, key := range msg.Keys { - go func(key string) { - defer wg.Done() - - req, err := http.NewRequest(http.MethodGet, key, nil) - if err != nil { - locker.Lock() - errorMessages = append(errorMessages, "invalid url: "+key+": "+err.Error()) - locker.Unlock() - return - } - - // TODO 可以在管理界面自定义Header - req.Header.Set("X-Cache-Action", "preheat") - req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") - req.Header.Set("Accept-Encoding", "gzip, deflate, br") // TODO 这里需要记录下缓存是否为gzip的 - resp, err := client.Do(req) - if err != nil { - locker.Lock() - errorMessages = append(errorMessages, "request failed: "+key+": "+err.Error()) - locker.Unlock() - return - } - - if resp.StatusCode != 200 { - locker.Lock() - errorMessages = append(errorMessages, "request failed: "+key+": status code '"+strconv.Itoa(resp.StatusCode)+"'") - locker.Unlock() - return - } - - defer func() { - _ = resp.Body.Close() - }() - - // 检查最大内容长度 - // TODO 需要解决Chunked Transfer Encoding的长度判断问题 - maxSize := storage.Policy().MaxSizeBytes() - if maxSize > 0 && resp.ContentLength > maxSize { - locker.Lock() - errorMessages = append(errorMessages, "request failed: the content is too larger than policy setting") - locker.Unlock() - return - } - - expiredAt := time.Now().Unix() + 8600 - writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength, -1, false) // TODO 可以设置缓存过期时间 - if err != nil { - locker.Lock() - errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error()) - locker.Unlock() - return - } - - buf := make([]byte, 16*1024) - isClosed := false - - // 写入Header - for k, v := range resp.Header { - for _, v1 := range v { - _, err = writer.WriteHeader([]byte(k + ":" + v1 + "\n")) - if err != nil { - locker.Lock() - errorMessages = append(errorMessages, "write failed: "+key+": "+err.Error()) - locker.Unlock() - return - } - } - } - - // 写入Body - for { - n, err := resp.Body.Read(buf) - if n > 0 { - _, writerErr := writer.Write(buf[:n]) - if writerErr != nil { - locker.Lock() - errorMessages = append(errorMessages, "write failed: "+key+": "+writerErr.Error()) - locker.Unlock() - break - } - } - if err != nil { - if err == io.EOF { - - err = writer.Close() - if err == nil { - storage.AddToList(&caches.Item{ - Type: writer.ItemType(), - Key: key, - ExpiredAt: expiredAt, - }) - } - isClosed = true - } else { - locker.Lock() - errorMessages = append(errorMessages, "read url failed: "+key+": "+err.Error()) - locker.Unlock() - } - break - } - } - - if !isClosed { - _ = writer.Close() - } - }(key) - } - wg.Wait() - - if len(errorMessages) > 0 { - this.replyFail(message.RequestId, strings.Join(errorMessages, ", ")) - return nil - } - - this.replyOk(message.RequestId, "ok") - - return nil -} - // 处理配置变化 func (this *APIStream) handleNewNodeTask(message *pb.NodeStreamMessage) error { select { diff --git a/internal/nodes/http_cache_task_manager.go b/internal/nodes/http_cache_task_manager.go new file mode 100644 index 0000000..74c300e --- /dev/null +++ b/internal/nodes/http_cache_task_manager.go @@ -0,0 +1,215 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes + +import ( + "context" + "crypto/tls" + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/iwind/TeaGo/Tea" + "io" + "io/ioutil" + "net" + "net/http" + "regexp" + "time" +) + +func init() { + events.On(events.EventStart, func() { + goman.New(func() { + SharedHTTPCacheTaskManager.Start() + }) + }) +} + +var SharedHTTPCacheTaskManager = NewHTTPCacheTaskManager() + +// HTTPCacheTaskManager 缓存任务管理 +type HTTPCacheTaskManager struct { + ticker *time.Ticker + httpClient *http.Client + protocolReg *regexp.Regexp + + taskQueue chan *pb.PurgeServerCacheRequest +} + +func NewHTTPCacheTaskManager() *HTTPCacheTaskManager { + var duration = 1 * time.Minute + if Tea.IsTesting() { + duration = 10 * time.Second + } + return &HTTPCacheTaskManager{ + ticker: time.NewTicker(duration), + httpClient: &http.Client{ + Timeout: 30 * time.Second, // TODO 可以设置请求超时时间 + Transport: &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + _, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + return net.Dial(network, "127.0.0.1:"+port) + }, + MaxIdleConns: 128, + MaxIdleConnsPerHost: 32, + MaxConnsPerHost: 32, + IdleConnTimeout: 2 * time.Minute, + ExpectContinueTimeout: 1 * time.Second, + TLSHandshakeTimeout: 0, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + }, + protocolReg: regexp.MustCompile(`^(?i)(http|https)://`), + taskQueue: make(chan *pb.PurgeServerCacheRequest, 1024), + } +} + +func (this *HTTPCacheTaskManager) Start() { + // task queue + goman.New(func() { + rpcClient, _ := rpc.SharedRPC() + + if rpcClient != nil { + for taskReq := range this.taskQueue { + _, err := rpcClient.ServerRPC().PurgeServerCache(rpcClient.Context(), taskReq) + if err != nil { + remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "create purge task failed: "+err.Error()) + } + } + } + }) + + // Loop + for range this.ticker.C { + err := this.Loop() + if err != nil { + remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "execute task failed: "+err.Error()) + } + } +} + +func (this *HTTPCacheTaskManager) Loop() error { + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + + resp, err := rpcClient.HTTPCacheTaskKeyRPC().FindDoingHTTPCacheTaskKeys(rpcClient.Context(), &pb.FindDoingHTTPCacheTaskKeysRequest{}) + if err != nil { + return err + } + + var keys = resp.HttpCacheTaskKeys + if len(keys) == 0 { + return nil + } + + var pbResults = []*pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{} + + for _, key := range keys { + err = this.processKey(key) + + var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{ + Id: key.Id, + NodeClusterId: key.NodeClusterId, + Error: "", + } + + if err != nil { + pbResult.Error = err.Error() + } + pbResults = append(pbResults, pbResult) + } + + _, err = rpcClient.HTTPCacheTaskKeyRPC().UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults}) + if err != nil { + return err + } + + return nil +} + +func (this *HTTPCacheTaskManager) PushTaskKeys(keys []string) { + select { + case this.taskQueue <- &pb.PurgeServerCacheRequest{ + Keys: keys, + Prefixes: nil, + }: + default: + } +} + +func (this *HTTPCacheTaskManager) processKey(key *pb.HTTPCacheTaskKey) error { + switch key.Type { + case "purge": + var storages = caches.SharedManager.FindAllStorages() + for _, storage := range storages { + switch key.KeyType { + case "key": + err := storage.Purge([]string{key.Key}, "file") + if err != nil { + return err + } + case "prefix": + err := storage.Purge([]string{key.Key}, "dir") + if err != nil { + return err + } + } + } + case "fetch": + err := this.fetchKey(key) + if err != nil { + return err + } + default: + return errors.New("invalid operation type '" + key.Type + "'") + } + + return nil +} + +// TODO 增加失败重试 +func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error { + var fullKey = key.Key + if !this.protocolReg.MatchString(fullKey) { + fullKey = "https://" + fullKey + } + + req, err := http.NewRequest(http.MethodGet, fullKey, nil) + if err != nil { + return errors.New("invalid url: " + fullKey + ": " + err.Error()) + } + + // TODO 可以在管理界面自定义Header + req.Header.Set("X-Edge-Cache-Action", "fetch") + req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36") // TODO 可以定义 + req.Header.Set("Accept-Encoding", "gzip, deflate, br") + resp, err := this.httpClient.Do(req) + if err != nil { + return errors.New("request failed: " + fullKey + ": " + err.Error()) + } + + defer func() { + _ = resp.Body.Close() + }() + + // 读取内容,以便于生成缓存 + _, _ = io.Copy(ioutil.Discard, resp.Body) + + // 处理502 + if resp.StatusCode == http.StatusBadGateway { + return errors.New("read origin site timeout") + } + + return nil +} diff --git a/internal/nodes/http_cache_task_manager_test.go b/internal/nodes/http_cache_task_manager_test.go new file mode 100644 index 0000000..29e0bd6 --- /dev/null +++ b/internal/nodes/http_cache_task_manager_test.go @@ -0,0 +1,25 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes_test + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/nodes" + "testing" +) + +func TestHTTPCacheTaskManager_Loop(t *testing.T) { + // initialize cache policies + config, err := nodeconfigs.SharedNodeConfig() + if err != nil { + t.Fatal(err) + } + caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies) + + var manager = nodes.NewHTTPCacheTaskManager() + err = manager.Loop() + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index a5ecd89..bc07ad2 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -3,12 +3,9 @@ package nodes import ( "bytes" "errors" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/compressions" - "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" - "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges" "github.com/iwind/TeaGo/types" @@ -33,11 +30,6 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { return } - // 判断是否在预热 - if (strings.HasPrefix(this.RawReq.RemoteAddr, "127.") || strings.HasPrefix(this.RawReq.RemoteAddr, "[::1]")) && this.RawReq.Header.Get("X-Cache-Action") == "preheat" { - return - } - // 添加 X-Cache Header var addStatusHeader = this.web.Cache.AddStatusHeader if addStatusHeader { @@ -89,6 +81,12 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { return } + // 是否正在Purge + var isPurging = this.web.Cache.PurgeIsOn && strings.ToUpper(this.RawReq.Method) == "PURGE" && this.RawReq.Header.Get("X-Edge-Purge-Key") == this.web.Cache.PurgeKey + if isPurging { + this.RawReq.Method = http.MethodGet + } + // 校验请求 if !this.cacheRef.MatchRequest(this.RawReq) { this.cacheRef = nil @@ -136,8 +134,13 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } this.writer.cacheStorage = storage + // 如果正在预热,则不读取缓存,等待下一个步骤重新生成 + if (strings.HasPrefix(this.RawReq.RemoteAddr, "127.") || strings.HasPrefix(this.RawReq.RemoteAddr, "[::1]")) && this.RawReq.Header.Get("X-Edge-Cache-Action") == "fetch" { + return + } + // 判断是否在Purge - if this.web.Cache.PurgeIsOn && strings.ToUpper(this.RawReq.Method) == "PURGE" && this.RawReq.Header.Get("X-Edge-Purge-Key") == this.web.Cache.PurgeKey { + if isPurging { this.varMapping["cache.status"] = "PURGE" var subKeys = []string{ @@ -159,22 +162,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } // 通过API节点清除别节点上的的Key - // TODO 改为队列,不需要每个请求都使用goroutine - goman.New(func() { - rpcClient, err := rpc.SharedRPC() - if err == nil { - for _, rpcServerService := range rpcClient.ServerRPCList() { - _, err = rpcServerService.PurgeServerCache(rpcClient.Context(), &pb.PurgeServerCacheRequest{ - Domains: []string{this.ReqHost}, - Keys: []string{key}, - Prefixes: nil, - }) - if err != nil { - remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error()) - } - } - } - }) + SharedHTTPCacheTaskManager.PushTaskKeys([]string{key}) return true } diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index e23fd3e..32222d2 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -67,6 +67,10 @@ func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient { return pb.NewHTTPAccessLogServiceClient(this.pickConn()) } +func (this *RPCClient) HTTPCacheTaskKeyRPC() pb.HTTPCacheTaskKeyServiceClient { + return pb.NewHTTPCacheTaskKeyServiceClient(this.pickConn()) +} + func (this *RPCClient) APINodeRPC() pb.APINodeServiceClient { return pb.NewAPINodeServiceClient(this.pickConn()) } @@ -115,18 +119,6 @@ func (this *RPCClient) ServerRPC() pb.ServerServiceClient { return pb.NewServerServiceClient(this.pickConn()) } -func (this *RPCClient) ServerRPCList() []pb.ServerServiceClient { - this.locker.Lock() - defer this.locker.Unlock() - - var clients = []pb.ServerServiceClient{} - for _, conn := range this.conns { - clients = append(clients, pb.NewServerServiceClient(conn)) - } - - return clients -} - func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient { return pb.NewServerDailyStatServiceClient(this.pickConn()) }