优化指标统计性能

This commit is contained in:
GoEdgeLab
2023-08-15 20:12:09 +08:00
parent 42ebb4514d
commit f7851563a9
7 changed files with 131 additions and 29 deletions

View File

@@ -26,9 +26,9 @@ func init() {
type Manager struct {
isQuiting bool
tasks map[int64]*Task // itemId => *Task
categoryTasks map[string][]*Task // category => []*Task
locker sync.RWMutex
taskMap map[int64]*Task // itemId => *Task
categoryTaskMap map[string][]*Task // category => []*Task
locker sync.RWMutex
hasHTTPMetrics bool
hasTCPMetrics bool
@@ -37,8 +37,8 @@ type Manager struct {
func NewManager() *Manager {
return &Manager{
tasks: map[int64]*Task{},
categoryTasks: map[string][]*Task{},
taskMap: map[int64]*Task{},
categoryTaskMap: map[string][]*Task{},
}
}
@@ -56,7 +56,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
}
// 停用以前的 或 修改现在的
for itemId, task := range this.tasks {
for itemId, task := range this.taskMap {
newItem, ok := newMap[itemId]
if !ok || !newItem.IsOn { // 停用以前的
remotelogs.Println("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"'")
@@ -64,7 +64,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
if err != nil {
remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error())
}
delete(this.tasks, itemId)
delete(this.taskMap, itemId)
} else { // 更新已存在的
if newItem.Version != task.item.Version {
remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'")
@@ -78,7 +78,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
if !newItem.IsOn {
continue
}
_, ok := this.tasks[newItem.Id]
_, ok := this.taskMap[newItem.Id]
if !ok {
remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'")
task := NewTask(newItem)
@@ -92,7 +92,7 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
remotelogs.Error("METRIC_MANAGER", "start task failed: "+err.Error())
continue
}
this.tasks[newItem.Id] = task
this.taskMap[newItem.Id] = task
}
}
@@ -100,11 +100,11 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
this.hasHTTPMetrics = false
this.hasTCPMetrics = false
this.hasUDPMetrics = false
this.categoryTasks = map[string][]*Task{}
for _, task := range this.tasks {
tasks := this.categoryTasks[task.item.Category]
this.categoryTaskMap = map[string][]*Task{}
for _, task := range this.taskMap {
var tasks = this.categoryTaskMap[task.item.Category]
tasks = append(tasks, task)
this.categoryTasks[task.item.Category] = tasks
this.categoryTaskMap[task.item.Category] = tasks
switch task.item.Category {
case serverconfigs.MetricItemCategoryHTTP:
@@ -124,10 +124,12 @@ func (this *Manager) Add(obj MetricInterface) {
}
this.locker.RLock()
for _, task := range this.categoryTasks[obj.MetricCategory()] {
var tasks = this.categoryTaskMap[obj.MetricCategory()]
this.locker.RUnlock()
for _, task := range tasks {
task.Add(obj)
}
this.locker.RUnlock()
}
func (this *Manager) HasHTTPMetrics() bool {
@@ -149,9 +151,9 @@ func (this *Manager) Quit() {
remotelogs.Println("METRIC_MANAGER", "quit")
this.locker.Lock()
for _, task := range this.tasks {
for _, task := range this.taskMap {
_ = task.Stop()
}
this.tasks = map[int64]*Task{}
this.taskMap = map[int64]*Task{}
this.locker.Unlock()
}

View File

@@ -11,7 +11,7 @@ func TestNewManager(t *testing.T) {
var manager = NewManager()
{
manager.Update([]*serverconfigs.MetricItemConfig{})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log(task.item.Id)
}
}
@@ -28,7 +28,7 @@ func TestNewManager(t *testing.T) {
Id: 3,
},
})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
}
}
@@ -43,7 +43,7 @@ func TestNewManager(t *testing.T) {
Id: 2,
},
})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
}
}
@@ -56,7 +56,7 @@ func TestNewManager(t *testing.T) {
Version: 1,
},
})
for _, task := range manager.tasks {
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
}
}

View File

@@ -17,6 +17,6 @@ type Stat struct {
}
func SumStat(serverId int64, keys []string, time string, version int32, itemId int64) string {
keysData := strings.Join(keys, "$EDGE$")
var keysData = strings.Join(keys, "$EDGE$")
return strconv.FormatUint(fnv.HashString(strconv.FormatInt(serverId, 10)+"@"+keysData+"@"+time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10)
}

View File

@@ -0,0 +1,20 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package metrics_test
import (
"github.com/TeaOSLab/EdgeNode/internal/metrics"
timeutil "github.com/iwind/TeaGo/utils/time"
"runtime"
"testing"
)
func BenchmarkSumStat(b *testing.B) {
runtime.GOMAXPROCS(2)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
metrics.SumStat(1, []string{"1.2.3.4"}, timeutil.Format("Ymd"), 1, 1)
}
})
}

View File

@@ -19,6 +19,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"
)
@@ -59,7 +60,7 @@ type Task struct {
serverIdMapLocker sync.Mutex
statsMap map[string]*Stat // 待写入队列hash => *Stat
statsLocker sync.Mutex
statsLocker sync.RWMutex
statsTicker *utils.Ticker
}
@@ -237,7 +238,7 @@ func (this *Task) Add(obj MetricInterface) {
var keys = []string{}
for _, key := range this.item.Keys {
k := obj.MetricKey(key)
var k = obj.MetricKey(key)
// 忽略499状态
if key == "${status}" && k == "499" {
@@ -253,14 +254,19 @@ func (this *Task) Add(obj MetricInterface) {
}
var hash = SumStat(obj.MetricServerId(), keys, this.item.CurrentTime(), this.item.Version, this.item.Id)
this.statsLocker.Lock()
var countItems int
this.statsLocker.RLock()
oldStat, ok := this.statsMap[hash]
if !ok {
countItems = len(this.statsMap)
}
this.statsLocker.RUnlock()
if ok {
oldStat.Value += v
oldStat.Hash = hash
atomic.AddInt64(&oldStat.Value, 1)
} else {
// 防止过载
if len(this.statsMap) < MaxQueueSize {
if countItems < MaxQueueSize {
this.statsLocker.Lock()
this.statsMap[hash] = &Stat{
ServerId: obj.MetricServerId(),
Keys: keys,
@@ -268,9 +274,9 @@ func (this *Task) Add(obj MetricInterface) {
Time: this.item.CurrentTime(),
Hash: hash,
}
this.statsLocker.Unlock()
}
}
this.statsLocker.Unlock()
}
// Stop 停止任务

View File

@@ -4,11 +4,15 @@ package metrics_test
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/metrics"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/rands"
"log"
"runtime"
"sync"
"testing"
"time"
)
@@ -213,3 +217,65 @@ func TestTask_Upload(t *testing.T) {
}
t.Log("ok")
}
var testingTask *metrics.Task
var testingTaskInitOnce = &sync.Once{}
func initTestingTask() {
testingTask = metrics.NewTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "tcp",
Period: 1,
PeriodUnit: serverconfigs.MetricItemPeriodUnitDay,
Keys: []string{"${remoteAddr}"},
Value: "${countRequest}",
})
err := testingTask.Init()
if err != nil {
log.Fatal(err)
}
err = testingTask.Start()
if err != nil {
log.Fatal(err)
}
}
func BenchmarkTask_Add(b *testing.B) {
runtime.GOMAXPROCS(1)
testingTaskInitOnce.Do(func() {
initTestingTask()
})
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
testingTask.Add(&taskRequest{})
}
})
}
type taskRequest struct {
}
func (this *taskRequest) MetricKey(key string) string {
return configutils.ParseVariables(key, func(varName string) (value string) {
return "1.2.3.4"
})
}
func (this *taskRequest) MetricValue(value string) (result int64, ok bool) {
return 1, true
}
func (this *taskRequest) MetricServerId() int64 {
return 1
}
func (this *taskRequest) MetricCategory() string {
return "http"
}

View File

@@ -14,3 +14,11 @@ func TestHash(t *testing.T) {
t.Log(key + " => " + types.String(h))
}
}
func BenchmarkHashString(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
fnv.HashString("abcdefh")
}
})
}