From d1c84fb0023ba2d078f0076d75a7e002f3ac2568 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 14 Aug 2022 16:28:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/storage_file.go | 7 ++-- internal/caches/storage_memory.go | 3 +- internal/caches/storage_memory_test.go | 28 ++++++++++++++++ internal/compressions/writer_brotli_test.go | 36 +++++++++++++++++++++ internal/iplibrary/manager_province.go | 2 +- internal/metrics/task.go | 21 ++++++------ internal/nodes/http_client_pool.go | 1 + 7 files changed, 84 insertions(+), 14 deletions(-) diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index a8303ca..e66e6e9 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -65,12 +65,13 @@ var sharedWritingFileKeyLocker = sync.Mutex{} var maxOpenFiles = NewMaxOpenFiles() -const maxOpenFilesSlowCost = 5000 * time.Microsecond // 0.5ms +const maxOpenFilesSlowCost = 1000 * time.Microsecond // us const protectingLoadWhenDump = false // FileStorage 文件缓存 -// 文件结构: -// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] +// +// 文件结构: +// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] type FileStorage struct { policy *serverconfigs.HTTPCachePolicy options *serverconfigs.HTTPFileCacheStorage // 二级缓存 diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 90e4df6..226867d 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -95,7 +95,8 @@ func (this *MemoryStorage) Init() error { // 启动定时Flush memory to disk任务 if this.parentStorage != nil { // TODO 应该根据磁盘性能决定线程数 - var threads = 1 + // TODO 线程数应该可以在缓存策略和节点中设定 + var threads = runtime.NumCPU() for i := 0; i < threads; i++ { goman.New(func() { diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 27110fe..21eee0d 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -8,6 +8,7 @@ import ( "runtime" "runtime/debug" "strconv" + "sync" "testing" "time" ) @@ -304,3 +305,30 @@ func TestMemoryStorage_Stop(t *testing.T) { t.Log(len(m)) } + +func BenchmarkValuesMap(b *testing.B) { + var m = map[uint64]*MemoryItem{} + var count = 1_000_000 + for i := 0; i < count; i++ { + m[uint64(i)] = &MemoryItem{ + ExpiresAt: time.Now().Unix(), + } + } + b.Log(len(m)) + + var locker = sync.Mutex{} + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + locker.Lock() + _, ok := m[uint64(rands.Int(0, 1_000_000))] + _ = ok + locker.Unlock() + + locker.Lock() + delete(m, uint64(rands.Int(2, 1000000))) + locker.Unlock() + } + }) +} diff --git a/internal/compressions/writer_brotli_test.go b/internal/compressions/writer_brotli_test.go index c047805..befa227 100644 --- a/internal/compressions/writer_brotli_test.go +++ b/internal/compressions/writer_brotli_test.go @@ -5,10 +5,46 @@ package compressions_test import ( "bytes" "github.com/TeaOSLab/EdgeNode/internal/compressions" + stringutil "github.com/iwind/TeaGo/utils/string" "strings" "testing" + "time" ) +func TestBrotliWriter_LargeFile(t *testing.T) { + var data = []byte{} + for i := 0; i < 1024*1024; i++ { + data = append(data, stringutil.Rand(32)...) + } + t.Log(len(data)/1024/1024, "M") + + var before = time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + + var buf = &bytes.Buffer{} + writer, err := compressions.NewBrotliWriter(buf, 5) + if err != nil { + t.Fatal(err) + } + + var offset = 0 + var size = 4096 + for offset < len(data) { + _, err = writer.Write(data[offset : offset+size]) + if err != nil { + t.Fatal(err) + } + offset += size + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } +} + func BenchmarkBrotliWriter_Write(b *testing.B) { var data = []byte(strings.Repeat("A", 1024)) diff --git a/internal/iplibrary/manager_province.go b/internal/iplibrary/manager_province.go index 068e044..5ce1239 100644 --- a/internal/iplibrary/manager_province.go +++ b/internal/iplibrary/manager_province.go @@ -121,7 +121,7 @@ func (this *ProvinceManager) loop() error { if err != nil { return err } - resp, err := rpcClient.RegionProvinceRPC().FindAllRegionProvincesWithCountryId(rpcClient.Context(), &pb.FindAllRegionProvincesWithCountryIdRequest{ + resp, err := rpcClient.RegionProvinceRPC().FindAllRegionProvincesWithRegionCountryId(rpcClient.Context(), &pb.FindAllRegionProvincesWithRegionCountryIdRequest{ RegionCountryId: ChinaCountryId, }) if err != nil { diff --git a/internal/metrics/task.go b/internal/metrics/task.go index 207c0e7..aa4a45b 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -23,18 +23,19 @@ import ( "time" ) -const MaxQueueSize = 2048 +const MaxQueueSize = 256 // TODO 可以配置,可以在单个任务里配置 // Task 单个指标任务 // 数据库存储: -// data/ -// metric.$ID.db -// stats -// id, keys, value, time, serverId, hash -// 原理: -// 添加或者有变更时 isUploaded = false -// 上传时检查 isUploaded 状态 -// 只上传每个服务中排序最前面的 N 个数据 +// +// data/ +// metric.$ID.db +// stats +// id, keys, value, time, serverId, hash +// 原理: +// 添加或者有变更时 isUploaded = false +// 上传时检查 isUploaded 状态 +// 只上传每个服务中排序最前面的 N 个数据 type Task struct { item *serverconfigs.MetricItemConfig isLoaded bool @@ -372,7 +373,9 @@ func (this *Task) Upload(pauseDuration time.Duration) error { for _, serverId := range serverIds { for _, currentTime := range times { idStrings, err := func(serverId int64, currentTime string) (ids []string, err error) { + var t = trackers.Begin("[METRIC]SELECT_TOP_STMT") rows, err := this.selectTopStmt.Query(serverId, this.item.Version, currentTime) + t.End() if err != nil { return nil, err } diff --git a/internal/nodes/http_client_pool.go b/internal/nodes/http_client_pool.go index 3e08fab..16ef378 100644 --- a/internal/nodes/http_client_pool.go +++ b/internal/nodes/http_client_pool.go @@ -161,6 +161,7 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, ExpectContinueTimeout: 1 * time.Second, TLSHandshakeTimeout: 5 * time.Second, TLSClientConfig: tlsConfig, + ReadBufferSize: 8 * 1024, Proxy: nil, }, }