diff --git a/internal/utils/counters/counter.go b/internal/utils/counters/counter.go index 995bb11..fc98c93 100644 --- a/internal/utils/counters/counter.go +++ b/internal/utils/counters/counter.go @@ -3,10 +3,11 @@ package counters import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" syncutils "github.com/TeaOSLab/EdgeNode/internal/utils/sync" "github.com/cespare/xxhash" - "github.com/iwind/TeaGo/Tea" "runtime" + "sync" "time" ) @@ -17,6 +18,7 @@ type Counter struct { gcTicker *time.Ticker gcIndex int + gcLocker sync.Mutex } // NewCounter create new counter @@ -24,6 +26,8 @@ func NewCounter() *Counter { var count = runtime.NumCPU() * 4 if count < 8 { count = 8 + } else if count > 128 { + count = 128 } var itemMaps = []map[uint64]*Item{} @@ -45,10 +49,7 @@ func (this *Counter) WithGC() *Counter { if this.gcTicker != nil { return this } - this.gcTicker = time.NewTicker(10 * time.Second) - if Tea.IsTesting() { - this.gcTicker = time.NewTicker(1 * time.Second) - } + this.gcTicker = time.NewTicker(1 * time.Second) go func() { for range this.gcTicker.C { this.GC() @@ -141,6 +142,7 @@ func (this *Counter) TotalItems() int { // GC garbage expired items func (this *Counter) GC() { + this.gcLocker.Lock() var gcIndex = this.gcIndex this.gcIndex++ @@ -148,11 +150,15 @@ func (this *Counter) GC() { this.gcIndex = 0 } + this.gcLocker.Unlock() + + var currentTime = fasttime.Now().Unix() + this.locker.RLock(gcIndex) var itemMap = this.itemMaps[gcIndex] var expiredKeys = []uint64{} for key, item := range itemMap { - if item.IsExpired() { + if item.IsExpired(currentTime) { expiredKeys = append(expiredKeys, key) } } diff --git a/internal/utils/counters/counter_test.go b/internal/utils/counters/counter_test.go index ed5d991..b742809 100644 --- a/internal/utils/counters/counter_test.go +++ b/internal/utils/counters/counter_test.go @@ -6,7 +6,9 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils/counters" "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" "github.com/iwind/TeaGo/assert" + "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" "runtime" "sync/atomic" "testing" @@ -54,9 +56,21 @@ func TestCounter_GC(t *testing.T) { } func TestCounter_GC2(t *testing.T) { - var counter = counters.NewCounter() - for i := 0; i < runtime.NumCPU()*32; i++ { - counter.GC() + if !testutils.IsSingleTesting() { + return + } + + var counter = counters.NewCounter().WithGC() + for i := 0; i < 1e5; i++ { + counter.Increase(uint64(i), rands.Int(10, 300)) + } + + var ticker = time.NewTicker(1 * time.Second) + for range ticker.C { + t.Log(timeutil.Format("H:i:s"), counter.TotalItems()) + if counter.TotalItems() == 0 { + break + } } } @@ -140,4 +154,6 @@ func BenchmarkCounter_GC(b *testing.B) { counter.GC() } }) + + //b.Log(counter.TotalItems()) } diff --git a/internal/utils/counters/item.go b/internal/utils/counters/item.go index ffd21a2..41c67c6 100644 --- a/internal/utils/counters/item.go +++ b/internal/utils/counters/item.go @@ -72,6 +72,6 @@ func (this *Item) Reset() { } } -func (this *Item) IsExpired() bool { - return this.lastUpdateTime < fasttime.Now().Unix()-this.lifeSeconds-this.spanSeconds +func (this *Item) IsExpired(currentTime int64) bool { + return this.lastUpdateTime < currentTime-this.lifeSeconds-this.spanSeconds } diff --git a/internal/utils/counters/item_test.go b/internal/utils/counters/item_test.go index 3da656f..391b817 100644 --- a/internal/utils/counters/item_test.go +++ b/internal/utils/counters/item_test.go @@ -35,12 +35,14 @@ func TestItem_IsExpired(t *testing.T) { return } + var currentTime = time.Now().Unix() + var item = counters.NewItem(10) - t.Log(item.IsExpired()) + t.Log(item.IsExpired(currentTime)) time.Sleep(10 * time.Second) - t.Log(item.IsExpired()) + t.Log(item.IsExpired(currentTime)) time.Sleep(2 * time.Second) - t.Log(item.IsExpired()) + t.Log(item.IsExpired(currentTime)) } func BenchmarkItem_Increase(b *testing.B) {