From 91e82dde8ec3747b2a62a99275800e00b535cf6e Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 17 Oct 2021 20:23:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0PURGE=E6=9F=90=E4=B8=AAURL?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/item.go | 2 +- internal/caches/storage_memory_test.go | 38 ++++++++++++++++++++++++++ internal/nodes/http_request_cache.go | 28 +++++++++++++++++++ internal/rpc/rpc_client.go | 12 ++++++++ 4 files changed, 79 insertions(+), 1 deletion(-) diff --git a/internal/caches/item.go b/internal/caches/item.go index e86af0c..6589be4 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -27,7 +27,7 @@ func (this *Item) IsExpired() bool { } func (this *Item) TotalSize() int64 { - return this.Size() + this.MetaSize + int64(len(this.Key)) + 64 + return this.Size() + this.MetaSize + int64(len(this.Key)) + int64(len(this.Host)) + 64 } func (this *Item) Size() int64 { diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index dbd7efa..1128fb8 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -1,9 +1,12 @@ package caches import ( + "bytes" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/rands" + "runtime" + "runtime/debug" "strconv" "testing" "time" @@ -265,3 +268,38 @@ func TestMemoryStorage_Locker(t *testing.T) { } t.Log("ok") } + +func TestMemoryStorage_Stop(t *testing.T) { + var stat1 = &runtime.MemStats{} + runtime.ReadMemStats(stat1) + + var m = map[uint64]*MemoryItem{} + for i := 0; i < 1_000_000; i++ { + m[uint64(i)] = &MemoryItem{ + HeaderValue: []byte("Hello, World"), + BodyValue: bytes.Repeat([]byte("Hello"), 1024), + } + } + + m = map[uint64]*MemoryItem{} + + var before = time.Now() + //runtime.GC() + debug.FreeOSMemory() + /**go func() { + time.Sleep(10 * time.Second) + runtime.GC() + }()**/ + t.Log(time.Since(before).Seconds()*1000, "ms") + + var stat2 = &runtime.MemStats{} + runtime.ReadMemStats(stat2) + + if stat2.HeapInuse > stat1.HeapInuse { + t.Log(stat2.HeapInuse, stat1.HeapInuse, (stat2.HeapInuse-stat1.HeapInuse)/1024/1024, "MB") + } else { + t.Log("0 MB") + } + + t.Log(len(m)) +} diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 1bff327..54525a6 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -3,8 +3,10 @@ package nodes import ( "bytes" "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" "net/http" "path/filepath" "strconv" @@ -109,6 +111,32 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { return } + // 判断是否在Purge + if this.web.Cache.PurgeIsOn && strings.ToUpper(this.RawReq.Method) == "PURGE" && this.RawReq.Header.Get("Edge-Purge-Key") == this.web.Cache.PurgeKey { + err := storage.Delete(key) + if err != nil { + remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error()) + } + + go func() { + rpcClient, err := rpc.SharedRPC() + if err == nil { + for _, rpcServerService := range rpcClient.ServerRPCList() { + _, err = rpcServerService.PurgeServerCache(rpcClient.Context(), &pb.PurgeServerCacheRequest{ + Domains: []string{this.Host}, + Keys: []string{key}, + Prefixes: nil, + }) + if err != nil { + remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error()) + } + } + } + }() + + return true + } + buf := bytePool32k.Get() defer func() { bytePool32k.Put(buf) diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 072262f..1deb114 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -105,6 +105,18 @@ func (this *RPCClient) ServerRPC() pb.ServerServiceClient { return pb.NewServerServiceClient(this.pickConn()) } +func (this *RPCClient) ServerRPCList() []pb.ServerServiceClient { + this.locker.Lock() + defer this.locker.Unlock() + + var clients = []pb.ServerServiceClient{} + for _, conn := range this.conns { + clients = append(clients, pb.NewServerServiceClient(conn)) + } + + return clients +} + func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient { return pb.NewServerDailyStatServiceClient(this.pickConn()) }