mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2026-03-01 12:15:47 +08:00
实现新的计数器算法(将时间分片, 统计更加精准)
This commit is contained in:
173
internal/utils/counters/counter.go
Normal file
173
internal/utils/counters/counter.go
Normal file
@@ -0,0 +1,173 @@
|
||||
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package counters
|
||||
|
||||
import (
|
||||
syncutils "github.com/TeaOSLab/EdgeNode/internal/utils/sync"
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Counter struct {
|
||||
countMaps uint64
|
||||
locker *syncutils.RWMutex
|
||||
itemMaps []map[uint64]*Item
|
||||
|
||||
gcTicker *time.Ticker
|
||||
gcIndex int
|
||||
}
|
||||
|
||||
// NewCounter create new counter
|
||||
func NewCounter() *Counter {
|
||||
var count = runtime.NumCPU() * 4
|
||||
if count < 8 {
|
||||
count = 8
|
||||
}
|
||||
|
||||
var itemMaps = []map[uint64]*Item{}
|
||||
for i := 0; i < count; i++ {
|
||||
itemMaps = append(itemMaps, map[uint64]*Item{})
|
||||
}
|
||||
|
||||
var counter = &Counter{
|
||||
countMaps: uint64(count),
|
||||
locker: syncutils.NewRWMutex(count),
|
||||
itemMaps: itemMaps,
|
||||
}
|
||||
|
||||
return counter
|
||||
}
|
||||
|
||||
// WithGC start the counter with gc automatically
|
||||
func (this *Counter) WithGC() *Counter {
|
||||
if this.gcTicker != nil {
|
||||
return this
|
||||
}
|
||||
this.gcTicker = time.NewTicker(10 * time.Second)
|
||||
if Tea.IsTesting() {
|
||||
this.gcTicker = time.NewTicker(1 * time.Second)
|
||||
}
|
||||
go func() {
|
||||
for range this.gcTicker.C {
|
||||
this.GC()
|
||||
}
|
||||
}()
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
// Increase key
|
||||
func (this *Counter) Increase(key uint64, lifeSeconds int) uint64 {
|
||||
var index = int(key % this.countMaps)
|
||||
this.locker.RLock(index)
|
||||
var item = this.itemMaps[index][key]
|
||||
this.locker.RUnlock(index)
|
||||
if item == nil { // no need to care about duplication
|
||||
item = NewItem(lifeSeconds)
|
||||
this.locker.Lock(index)
|
||||
|
||||
// check again
|
||||
oldItem, ok := this.itemMaps[index][key]
|
||||
if !ok {
|
||||
this.itemMaps[index][key] = item
|
||||
} else {
|
||||
item = oldItem
|
||||
}
|
||||
|
||||
this.locker.Unlock(index)
|
||||
}
|
||||
|
||||
this.locker.Lock(index)
|
||||
var result = item.Increase()
|
||||
this.locker.Unlock(index)
|
||||
return result
|
||||
}
|
||||
|
||||
// IncreaseKey increase string key
|
||||
func (this *Counter) IncreaseKey(key string, lifeSeconds int) uint64 {
|
||||
return this.Increase(this.hash(key), lifeSeconds)
|
||||
}
|
||||
|
||||
// Get value of key
|
||||
func (this *Counter) Get(key uint64) uint64 {
|
||||
var index = int(key % this.countMaps)
|
||||
this.locker.RLock(index)
|
||||
defer this.locker.RUnlock(index)
|
||||
var item = this.itemMaps[index][key]
|
||||
if item != nil {
|
||||
return item.Sum()
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// GetKey get value of string key
|
||||
func (this *Counter) GetKey(key string) uint64 {
|
||||
return this.Get(this.hash(key))
|
||||
}
|
||||
|
||||
// Reset key
|
||||
func (this *Counter) Reset(key uint64) {
|
||||
var index = int(key % this.countMaps)
|
||||
this.locker.RLock(index)
|
||||
var item = this.itemMaps[index][key]
|
||||
this.locker.RUnlock(index)
|
||||
|
||||
if item != nil {
|
||||
this.locker.Lock(index)
|
||||
delete(this.itemMaps[index], key)
|
||||
this.locker.Unlock(index)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetKey string key
|
||||
func (this *Counter) ResetKey(key string) {
|
||||
this.Reset(this.hash(key))
|
||||
}
|
||||
|
||||
// TotalItems get items count
|
||||
func (this *Counter) TotalItems() int {
|
||||
var total = 0
|
||||
|
||||
for i := 0; i < int(this.countMaps); i++ {
|
||||
this.locker.RLock(i)
|
||||
total += len(this.itemMaps[i])
|
||||
this.locker.RUnlock(i)
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
// GC garbage expired items
|
||||
func (this *Counter) GC() {
|
||||
var gcIndex = this.gcIndex
|
||||
|
||||
this.gcIndex++
|
||||
if this.gcIndex >= int(this.countMaps) {
|
||||
this.gcIndex = 0
|
||||
}
|
||||
|
||||
this.locker.RLock(gcIndex)
|
||||
var itemMap = this.itemMaps[gcIndex]
|
||||
var expiredKeys = []uint64{}
|
||||
for key, item := range itemMap {
|
||||
if item.IsExpired() {
|
||||
expiredKeys = append(expiredKeys, key)
|
||||
}
|
||||
}
|
||||
this.locker.RUnlock(gcIndex)
|
||||
|
||||
if len(expiredKeys) > 0 {
|
||||
for _, key := range expiredKeys {
|
||||
this.locker.Lock(gcIndex)
|
||||
delete(itemMap, key)
|
||||
this.locker.Unlock(gcIndex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calculate hash of the key
|
||||
func (this *Counter) hash(key string) uint64 {
|
||||
return xxhash.Sum64String(key)
|
||||
}
|
||||
143
internal/utils/counters/counter_test.go
Normal file
143
internal/utils/counters/counter_test.go
Normal file
@@ -0,0 +1,143 @@
|
||||
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package counters_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/counters"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
|
||||
"github.com/iwind/TeaGo/assert"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCounter_Increase(t *testing.T) {
|
||||
var a = assert.NewAssertion(t)
|
||||
|
||||
var counter = counters.NewCounter()
|
||||
a.IsTrue(counter.Increase(1, 10) == 1)
|
||||
a.IsTrue(counter.Increase(1, 10) == 2)
|
||||
a.IsTrue(counter.Increase(2, 10) == 1)
|
||||
|
||||
counter.Reset(1)
|
||||
a.IsTrue(counter.Get(1) == 0) // changed
|
||||
a.IsTrue(counter.Get(2) == 1) // not changed
|
||||
}
|
||||
|
||||
func TestCounter_IncreaseKey(t *testing.T) {
|
||||
var a = assert.NewAssertion(t)
|
||||
|
||||
var counter = counters.NewCounter()
|
||||
a.IsTrue(counter.IncreaseKey("1", 10) == 1)
|
||||
a.IsTrue(counter.IncreaseKey("1", 10) == 2)
|
||||
a.IsTrue(counter.IncreaseKey("2", 10) == 1)
|
||||
|
||||
counter.ResetKey("1")
|
||||
a.IsTrue(counter.GetKey("1") == 0) // changed
|
||||
a.IsTrue(counter.GetKey("2") == 1) // not changed
|
||||
}
|
||||
|
||||
func TestCounter_GC(t *testing.T) {
|
||||
if !testutils.IsSingleTesting() {
|
||||
return
|
||||
}
|
||||
|
||||
var counter = counters.NewCounter()
|
||||
counter.Increase(1, 20)
|
||||
time.Sleep(1 * time.Second)
|
||||
counter.Increase(1, 20)
|
||||
time.Sleep(1 * time.Second)
|
||||
counter.Increase(1, 20)
|
||||
counter.GC()
|
||||
}
|
||||
|
||||
func TestCounter_GC2(t *testing.T) {
|
||||
var counter = counters.NewCounter()
|
||||
for i := 0; i < runtime.NumCPU()*32; i++ {
|
||||
counter.GC()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCounter_Increase(b *testing.B) {
|
||||
runtime.GOMAXPROCS(4)
|
||||
|
||||
var counter = counters.NewCounter()
|
||||
b.ResetTimer()
|
||||
|
||||
var i uint64
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
counter.Increase(atomic.AddUint64(&i, 1)%1e6, 20)
|
||||
}
|
||||
})
|
||||
|
||||
//b.Log(counter.TotalItems())
|
||||
}
|
||||
|
||||
func BenchmarkCounter_IncreaseKey(b *testing.B) {
|
||||
runtime.GOMAXPROCS(4)
|
||||
|
||||
var counter = counters.NewCounter()
|
||||
|
||||
go func() {
|
||||
var ticker = time.NewTicker(100 * time.Millisecond)
|
||||
for range ticker.C {
|
||||
counter.GC()
|
||||
}
|
||||
}()
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
var i uint64
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
counter.IncreaseKey(types.String(atomic.AddUint64(&i, 1)%1e6), 20)
|
||||
}
|
||||
})
|
||||
|
||||
//b.Log(counter.TotalItems())
|
||||
}
|
||||
|
||||
func BenchmarkCounter_IncreaseKey2(b *testing.B) {
|
||||
runtime.GOMAXPROCS(4)
|
||||
|
||||
var counter = counters.NewCounter()
|
||||
|
||||
go func() {
|
||||
var ticker = time.NewTicker(1 * time.Millisecond)
|
||||
for range ticker.C {
|
||||
counter.GC()
|
||||
}
|
||||
}()
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
var i uint64
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
counter.IncreaseKey(types.String(atomic.AddUint64(&i, 1)%1e5), 20)
|
||||
}
|
||||
})
|
||||
|
||||
//b.Log(counter.TotalItems())
|
||||
}
|
||||
|
||||
func BenchmarkCounter_GC(b *testing.B) {
|
||||
runtime.GOMAXPROCS(4)
|
||||
|
||||
var counter = counters.NewCounter()
|
||||
|
||||
for i := uint64(0); i < 1e5; i++ {
|
||||
counter.IncreaseKey(types.String(i), 20)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
counter.GC()
|
||||
}
|
||||
})
|
||||
}
|
||||
77
internal/utils/counters/item.go
Normal file
77
internal/utils/counters/item.go
Normal file
@@ -0,0 +1,77 @@
|
||||
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package counters
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
|
||||
)
|
||||
|
||||
type Item struct {
|
||||
lifeSeconds int64
|
||||
|
||||
spanSeconds int64
|
||||
spans []*Span
|
||||
|
||||
lastUpdateTime int64
|
||||
}
|
||||
|
||||
func NewItem(lifeSeconds int) *Item {
|
||||
if lifeSeconds <= 0 {
|
||||
lifeSeconds = 60
|
||||
}
|
||||
var spanSeconds = lifeSeconds / 10
|
||||
if spanSeconds < 1 {
|
||||
spanSeconds = 1
|
||||
}
|
||||
var countSpans = lifeSeconds/spanSeconds + 1 /** prevent index out of bounds **/
|
||||
var spans = []*Span{}
|
||||
for i := 0; i < countSpans; i++ {
|
||||
spans = append(spans, NewSpan())
|
||||
}
|
||||
|
||||
return &Item{
|
||||
lifeSeconds: int64(lifeSeconds),
|
||||
spanSeconds: int64(spanSeconds),
|
||||
spans: spans,
|
||||
lastUpdateTime: fasttime.Now().Unix(),
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Item) Increase() uint64 {
|
||||
var currentTime = fasttime.Now().Unix()
|
||||
var spanIndex = int(currentTime % this.lifeSeconds / this.spanSeconds)
|
||||
var span = this.spans[spanIndex]
|
||||
var roundTime = currentTime / this.spanSeconds * this.spanSeconds
|
||||
|
||||
this.lastUpdateTime = currentTime
|
||||
|
||||
if span.Timestamp < roundTime {
|
||||
span.Timestamp = roundTime // update time
|
||||
span.Count = 0 // reset count
|
||||
}
|
||||
span.Count++
|
||||
|
||||
return this.Sum()
|
||||
}
|
||||
|
||||
func (this *Item) Sum() uint64 {
|
||||
var result uint64 = 0
|
||||
var currentTimestamp = fasttime.Now().Unix()
|
||||
for _, span := range this.spans {
|
||||
if span.Timestamp >= currentTimestamp-this.lifeSeconds {
|
||||
result += span.Count
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (this *Item) Reset() {
|
||||
for _, span := range this.spans {
|
||||
span.Count = 0
|
||||
span.Timestamp = 0
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Item) IsExpired() bool {
|
||||
return this.lastUpdateTime < fasttime.Now().Unix()-this.lifeSeconds-this.spanSeconds
|
||||
}
|
||||
54
internal/utils/counters/item_test.go
Normal file
54
internal/utils/counters/item_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package counters_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/counters"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
|
||||
"github.com/iwind/TeaGo/assert"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestItem_Increase(t *testing.T) {
|
||||
// run only under single testing
|
||||
if !testutils.IsSingleTesting() {
|
||||
return
|
||||
}
|
||||
|
||||
var a = assert.NewAssertion(t)
|
||||
|
||||
var item = counters.NewItem(20)
|
||||
for i := 0; i < 100; i++ {
|
||||
t.Log(item.Increase(), timeutil.Format("i:s"))
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
|
||||
item.Reset()
|
||||
a.IsTrue(item.Sum() == 0)
|
||||
}
|
||||
|
||||
func TestItem_IsExpired(t *testing.T) {
|
||||
if !testutils.IsSingleTesting() {
|
||||
return
|
||||
}
|
||||
|
||||
var item = counters.NewItem(10)
|
||||
t.Log(item.IsExpired())
|
||||
time.Sleep(10 * time.Second)
|
||||
t.Log(item.IsExpired())
|
||||
time.Sleep(2 * time.Second)
|
||||
t.Log(item.IsExpired())
|
||||
}
|
||||
|
||||
func BenchmarkItem_Increase(b *testing.B) {
|
||||
runtime.GOMAXPROCS(1)
|
||||
|
||||
var item = counters.NewItem(60)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
item.Increase()
|
||||
}
|
||||
}
|
||||
12
internal/utils/counters/span.go
Normal file
12
internal/utils/counters/span.go
Normal file
@@ -0,0 +1,12 @@
|
||||
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package counters
|
||||
|
||||
type Span struct {
|
||||
Timestamp int64
|
||||
Count uint64
|
||||
}
|
||||
|
||||
func NewSpan() *Span {
|
||||
return &Span{}
|
||||
}
|
||||
Reference in New Issue
Block a user