mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-10 04:20:27 +08:00
优化计数器
This commit is contained in:
@@ -3,10 +3,11 @@
|
|||||||
package counters
|
package counters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
|
||||||
syncutils "github.com/TeaOSLab/EdgeNode/internal/utils/sync"
|
syncutils "github.com/TeaOSLab/EdgeNode/internal/utils/sync"
|
||||||
"github.com/cespare/xxhash"
|
"github.com/cespare/xxhash"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -17,6 +18,7 @@ type Counter struct {
|
|||||||
|
|
||||||
gcTicker *time.Ticker
|
gcTicker *time.Ticker
|
||||||
gcIndex int
|
gcIndex int
|
||||||
|
gcLocker sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCounter create new counter
|
// NewCounter create new counter
|
||||||
@@ -24,6 +26,8 @@ func NewCounter() *Counter {
|
|||||||
var count = runtime.NumCPU() * 4
|
var count = runtime.NumCPU() * 4
|
||||||
if count < 8 {
|
if count < 8 {
|
||||||
count = 8
|
count = 8
|
||||||
|
} else if count > 128 {
|
||||||
|
count = 128
|
||||||
}
|
}
|
||||||
|
|
||||||
var itemMaps = []map[uint64]*Item{}
|
var itemMaps = []map[uint64]*Item{}
|
||||||
@@ -45,10 +49,7 @@ func (this *Counter) WithGC() *Counter {
|
|||||||
if this.gcTicker != nil {
|
if this.gcTicker != nil {
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
this.gcTicker = time.NewTicker(10 * time.Second)
|
this.gcTicker = time.NewTicker(1 * time.Second)
|
||||||
if Tea.IsTesting() {
|
|
||||||
this.gcTicker = time.NewTicker(1 * time.Second)
|
|
||||||
}
|
|
||||||
go func() {
|
go func() {
|
||||||
for range this.gcTicker.C {
|
for range this.gcTicker.C {
|
||||||
this.GC()
|
this.GC()
|
||||||
@@ -141,6 +142,7 @@ func (this *Counter) TotalItems() int {
|
|||||||
|
|
||||||
// GC garbage expired items
|
// GC garbage expired items
|
||||||
func (this *Counter) GC() {
|
func (this *Counter) GC() {
|
||||||
|
this.gcLocker.Lock()
|
||||||
var gcIndex = this.gcIndex
|
var gcIndex = this.gcIndex
|
||||||
|
|
||||||
this.gcIndex++
|
this.gcIndex++
|
||||||
@@ -148,11 +150,15 @@ func (this *Counter) GC() {
|
|||||||
this.gcIndex = 0
|
this.gcIndex = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.gcLocker.Unlock()
|
||||||
|
|
||||||
|
var currentTime = fasttime.Now().Unix()
|
||||||
|
|
||||||
this.locker.RLock(gcIndex)
|
this.locker.RLock(gcIndex)
|
||||||
var itemMap = this.itemMaps[gcIndex]
|
var itemMap = this.itemMaps[gcIndex]
|
||||||
var expiredKeys = []uint64{}
|
var expiredKeys = []uint64{}
|
||||||
for key, item := range itemMap {
|
for key, item := range itemMap {
|
||||||
if item.IsExpired() {
|
if item.IsExpired(currentTime) {
|
||||||
expiredKeys = append(expiredKeys, key)
|
expiredKeys = append(expiredKeys, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,9 @@ import (
|
|||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/counters"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/counters"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
|
||||||
"github.com/iwind/TeaGo/assert"
|
"github.com/iwind/TeaGo/assert"
|
||||||
|
"github.com/iwind/TeaGo/rands"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -54,9 +56,21 @@ func TestCounter_GC(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCounter_GC2(t *testing.T) {
|
func TestCounter_GC2(t *testing.T) {
|
||||||
var counter = counters.NewCounter()
|
if !testutils.IsSingleTesting() {
|
||||||
for i := 0; i < runtime.NumCPU()*32; i++ {
|
return
|
||||||
counter.GC()
|
}
|
||||||
|
|
||||||
|
var counter = counters.NewCounter().WithGC()
|
||||||
|
for i := 0; i < 1e5; i++ {
|
||||||
|
counter.Increase(uint64(i), rands.Int(10, 300))
|
||||||
|
}
|
||||||
|
|
||||||
|
var ticker = time.NewTicker(1 * time.Second)
|
||||||
|
for range ticker.C {
|
||||||
|
t.Log(timeutil.Format("H:i:s"), counter.TotalItems())
|
||||||
|
if counter.TotalItems() == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -140,4 +154,6 @@ func BenchmarkCounter_GC(b *testing.B) {
|
|||||||
counter.GC()
|
counter.GC()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
//b.Log(counter.TotalItems())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -72,6 +72,6 @@ func (this *Item) Reset() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Item) IsExpired() bool {
|
func (this *Item) IsExpired(currentTime int64) bool {
|
||||||
return this.lastUpdateTime < fasttime.Now().Unix()-this.lifeSeconds-this.spanSeconds
|
return this.lastUpdateTime < currentTime-this.lifeSeconds-this.spanSeconds
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,12 +35,14 @@ func TestItem_IsExpired(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var currentTime = time.Now().Unix()
|
||||||
|
|
||||||
var item = counters.NewItem(10)
|
var item = counters.NewItem(10)
|
||||||
t.Log(item.IsExpired())
|
t.Log(item.IsExpired(currentTime))
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
t.Log(item.IsExpired())
|
t.Log(item.IsExpired(currentTime))
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
t.Log(item.IsExpired())
|
t.Log(item.IsExpired(currentTime))
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkItem_Increase(b *testing.B) {
|
func BenchmarkItem_Increase(b *testing.B) {
|
||||||
|
|||||||
Reference in New Issue
Block a user