From 40e137e69ebb4eb56a7ffd39f9ecc37e2044bdfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Tue, 15 Aug 2023 20:12:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8C=87=E6=A0=87=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/metrics/manager.go | 36 +++++++++-------- internal/metrics/manager_test.go | 8 ++-- internal/metrics/stat.go | 2 +- internal/metrics/sum_test.go | 20 ++++++++++ internal/metrics/task.go | 20 ++++++---- internal/metrics/task_test.go | 66 ++++++++++++++++++++++++++++++++ internal/utils/fnv/hash_test.go | 8 ++++ 7 files changed, 131 insertions(+), 29 deletions(-) create mode 100644 internal/metrics/sum_test.go diff --git a/internal/metrics/manager.go b/internal/metrics/manager.go index 0202634..ad08f16 100644 --- a/internal/metrics/manager.go +++ b/internal/metrics/manager.go @@ -26,9 +26,9 @@ func init() { type Manager struct { isQuiting bool - tasks map[int64]*Task // itemId => *Task - categoryTasks map[string][]*Task // category => []*Task - locker sync.RWMutex + taskMap map[int64]*Task // itemId => *Task + categoryTaskMap map[string][]*Task // category => []*Task + locker sync.RWMutex hasHTTPMetrics bool hasTCPMetrics bool @@ -37,8 +37,8 @@ type Manager struct { func NewManager() *Manager { return &Manager{ - tasks: map[int64]*Task{}, - categoryTasks: map[string][]*Task{}, + taskMap: map[int64]*Task{}, + categoryTaskMap: map[string][]*Task{}, } } @@ -56,7 +56,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { } // 停用以前的 或 修改现在的 - for itemId, task := range this.tasks { + for itemId, task := range this.taskMap { newItem, ok := newMap[itemId] if !ok || !newItem.IsOn { // 停用以前的 remotelogs.Println("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"'") @@ -64,7 +64,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { if err != nil { remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error()) } - delete(this.tasks, itemId) + delete(this.taskMap, itemId) } else { // 更新已存在的 if newItem.Version != task.item.Version { remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'") @@ -78,7 +78,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { if !newItem.IsOn { continue } - _, ok := this.tasks[newItem.Id] + _, ok := this.taskMap[newItem.Id] if !ok { remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'") task := NewTask(newItem) @@ -92,7 +92,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { remotelogs.Error("METRIC_MANAGER", "start task failed: "+err.Error()) continue } - this.tasks[newItem.Id] = task + this.taskMap[newItem.Id] = task } } @@ -100,11 +100,11 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { this.hasHTTPMetrics = false this.hasTCPMetrics = false this.hasUDPMetrics = false - this.categoryTasks = map[string][]*Task{} - for _, task := range this.tasks { - tasks := this.categoryTasks[task.item.Category] + this.categoryTaskMap = map[string][]*Task{} + for _, task := range this.taskMap { + var tasks = this.categoryTaskMap[task.item.Category] tasks = append(tasks, task) - this.categoryTasks[task.item.Category] = tasks + this.categoryTaskMap[task.item.Category] = tasks switch task.item.Category { case serverconfigs.MetricItemCategoryHTTP: @@ -124,10 +124,12 @@ func (this *Manager) Add(obj MetricInterface) { } this.locker.RLock() - for _, task := range this.categoryTasks[obj.MetricCategory()] { + var tasks = this.categoryTaskMap[obj.MetricCategory()] + this.locker.RUnlock() + + for _, task := range tasks { task.Add(obj) } - this.locker.RUnlock() } func (this *Manager) HasHTTPMetrics() bool { @@ -149,9 +151,9 @@ func (this *Manager) Quit() { remotelogs.Println("METRIC_MANAGER", "quit") this.locker.Lock() - for _, task := range this.tasks { + for _, task := range this.taskMap { _ = task.Stop() } - this.tasks = map[int64]*Task{} + this.taskMap = map[int64]*Task{} this.locker.Unlock() } diff --git a/internal/metrics/manager_test.go b/internal/metrics/manager_test.go index 8dc242f..600a4e9 100644 --- a/internal/metrics/manager_test.go +++ b/internal/metrics/manager_test.go @@ -11,7 +11,7 @@ func TestNewManager(t *testing.T) { var manager = NewManager() { manager.Update([]*serverconfigs.MetricItemConfig{}) - for _, task := range manager.tasks { + for _, task := range manager.taskMap { t.Log(task.item.Id) } } @@ -28,7 +28,7 @@ func TestNewManager(t *testing.T) { Id: 3, }, }) - for _, task := range manager.tasks { + for _, task := range manager.taskMap { t.Log("task:", task.item.Id) } } @@ -43,7 +43,7 @@ func TestNewManager(t *testing.T) { Id: 2, }, }) - for _, task := range manager.tasks { + for _, task := range manager.taskMap { t.Log("task:", task.item.Id) } } @@ -56,7 +56,7 @@ func TestNewManager(t *testing.T) { Version: 1, }, }) - for _, task := range manager.tasks { + for _, task := range manager.taskMap { t.Log("task:", task.item.Id) } } diff --git a/internal/metrics/stat.go b/internal/metrics/stat.go index 750359e..b48964a 100644 --- a/internal/metrics/stat.go +++ b/internal/metrics/stat.go @@ -17,6 +17,6 @@ type Stat struct { } func SumStat(serverId int64, keys []string, time string, version int32, itemId int64) string { - keysData := strings.Join(keys, "$EDGE$") + var keysData = strings.Join(keys, "$EDGE$") return strconv.FormatUint(fnv.HashString(strconv.FormatInt(serverId, 10)+"@"+keysData+"@"+time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10) } diff --git a/internal/metrics/sum_test.go b/internal/metrics/sum_test.go new file mode 100644 index 0000000..65b3c56 --- /dev/null +++ b/internal/metrics/sum_test.go @@ -0,0 +1,20 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package metrics_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/metrics" + timeutil "github.com/iwind/TeaGo/utils/time" + "runtime" + "testing" +) + +func BenchmarkSumStat(b *testing.B) { + runtime.GOMAXPROCS(2) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + metrics.SumStat(1, []string{"1.2.3.4"}, timeutil.Format("Ymd"), 1, 1) + } + }) +} diff --git a/internal/metrics/task.go b/internal/metrics/task.go index ec5b551..11b1bde 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -19,6 +19,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" ) @@ -59,7 +60,7 @@ type Task struct { serverIdMapLocker sync.Mutex statsMap map[string]*Stat // 待写入队列,hash => *Stat - statsLocker sync.Mutex + statsLocker sync.RWMutex statsTicker *utils.Ticker } @@ -237,7 +238,7 @@ func (this *Task) Add(obj MetricInterface) { var keys = []string{} for _, key := range this.item.Keys { - k := obj.MetricKey(key) + var k = obj.MetricKey(key) // 忽略499状态 if key == "${status}" && k == "499" { @@ -253,14 +254,19 @@ func (this *Task) Add(obj MetricInterface) { } var hash = SumStat(obj.MetricServerId(), keys, this.item.CurrentTime(), this.item.Version, this.item.Id) - this.statsLocker.Lock() + var countItems int + this.statsLocker.RLock() oldStat, ok := this.statsMap[hash] + if !ok { + countItems = len(this.statsMap) + } + this.statsLocker.RUnlock() if ok { - oldStat.Value += v - oldStat.Hash = hash + atomic.AddInt64(&oldStat.Value, 1) } else { // 防止过载 - if len(this.statsMap) < MaxQueueSize { + if countItems < MaxQueueSize { + this.statsLocker.Lock() this.statsMap[hash] = &Stat{ ServerId: obj.MetricServerId(), Keys: keys, @@ -268,9 +274,9 @@ func (this *Task) Add(obj MetricInterface) { Time: this.item.CurrentTime(), Hash: hash, } + this.statsLocker.Unlock() } } - this.statsLocker.Unlock() } // Stop 停止任务 diff --git a/internal/metrics/task_test.go b/internal/metrics/task_test.go index be2d4a4..e97de0f 100644 --- a/internal/metrics/task_test.go +++ b/internal/metrics/task_test.go @@ -4,11 +4,15 @@ package metrics_test import ( "fmt" + "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/metrics" "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/rands" + "log" + "runtime" + "sync" "testing" "time" ) @@ -213,3 +217,65 @@ func TestTask_Upload(t *testing.T) { } t.Log("ok") } + +var testingTask *metrics.Task +var testingTaskInitOnce = &sync.Once{} + +func initTestingTask() { + testingTask = metrics.NewTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "tcp", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + }) + + err := testingTask.Init() + if err != nil { + log.Fatal(err) + } + + err = testingTask.Start() + if err != nil { + log.Fatal(err) + } +} + +func BenchmarkTask_Add(b *testing.B) { + runtime.GOMAXPROCS(1) + + testingTaskInitOnce.Do(func() { + initTestingTask() + }) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + testingTask.Add(&taskRequest{}) + } + }) +} + +type taskRequest struct { +} + +func (this *taskRequest) MetricKey(key string) string { + return configutils.ParseVariables(key, func(varName string) (value string) { + return "1.2.3.4" + }) +} + +func (this *taskRequest) MetricValue(value string) (result int64, ok bool) { + return 1, true +} + +func (this *taskRequest) MetricServerId() int64 { + return 1 +} + +func (this *taskRequest) MetricCategory() string { + return "http" +} diff --git a/internal/utils/fnv/hash_test.go b/internal/utils/fnv/hash_test.go index 3b43293..a311c09 100644 --- a/internal/utils/fnv/hash_test.go +++ b/internal/utils/fnv/hash_test.go @@ -14,3 +14,11 @@ func TestHash(t *testing.T) { t.Log(key + " => " + types.String(h)) } } + +func BenchmarkHashString(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + fnv.HashString("abcdefh") + } + }) +} \ No newline at end of file