diff --git a/internal/utils/sync/map_int.go b/internal/utils/sync/map_int.go new file mode 100644 index 0000000..f89ae8d --- /dev/null +++ b/internal/utils/sync/map_int.go @@ -0,0 +1,136 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package syncutils + +import ( + "runtime" + "sync" +) + +type KType interface { + int | int16 | int32 | int64 | uint | uint16 | uint32 | uint64 | uintptr +} + +type VType interface { + any +} + +type IntMap[K KType, V VType] struct { + count int + m []map[K]V + lockers []*sync.RWMutex +} + +func NewIntMap[K KType, V VType]() *IntMap[K, V] { + var count = runtime.NumCPU() * 8 + if count <= 0 { + count = 32 + } + + var m = []map[K]V{} + var lockers = []*sync.RWMutex{} + for i := 0; i < count; i++ { + m = append(m, map[K]V{}) + lockers = append(lockers, &sync.RWMutex{}) + } + + return &IntMap[K, V]{ + count: count, + m: m, + lockers: lockers, + } +} + +func (this *IntMap[K, V]) Put(k K, v V) { + var index = this.index(k) + this.lockers[index].Lock() + this.m[index][k] = v + this.lockers[index].Unlock() +} + +func (this *IntMap[K, V]) PutCompact(k K, v V, compactFunc func(oldV V, newV V) V) { + var index = this.index(k) + this.lockers[index].Lock() + // 再次检查是否已经存在,如果已经存在则合并 + oldV, ok := this.m[index][k] + if ok { + this.m[index][k] = compactFunc(oldV, v) + } else { + this.m[index][k] = v + } + this.lockers[index].Unlock() +} + +func (this *IntMap[K, V]) Has(k K) bool { + var index = this.index(k) + this.lockers[index].RLock() + _, ok := this.m[index][k] + this.lockers[index].RUnlock() + return ok +} + +func (this *IntMap[K, V]) Get(k K) (value V) { + var index = this.index(k) + this.lockers[index].RLock() + value, _ = this.m[index][k] + this.lockers[index].RUnlock() + return +} + +func (this *IntMap[K, V]) GetOk(k K) (value V, ok bool) { + var index = this.index(k) + this.lockers[index].RLock() + value, ok = this.m[index][k] + this.lockers[index].RUnlock() + return +} + +func (this *IntMap[K, V]) Delete(k K) { + var index = this.index(k) + this.lockers[index].Lock() + delete(this.m[index], k) + this.lockers[index].Unlock() +} + +func (this *IntMap[K, V]) DeleteUnsafe(k K) { + var index = this.index(k) + delete(this.m[index], k) +} + +func (this *IntMap[K, V]) Len() int { + var l int + for i := 0; i < this.count; i++ { + this.lockers[i].RLock() + l += len(this.m[i]) + this.lockers[i].RUnlock() + } + return l +} + +func (this *IntMap[K, V]) ForEachRead(iterator func(k K, v V)) { + for i := 0; i < this.count; i++ { + this.lockers[i].RLock() + for k, v := range this.m[i] { + iterator(k, v) + } + this.lockers[i].RUnlock() + } +} + +func (this *IntMap[K, V]) ForEachWrite(iterator func(k K, v V)) { + for i := 0; i < this.count; i++ { + this.lockers[i].Lock() + for k, v := range this.m[i] { + iterator(k, v) + } + this.lockers[i].Unlock() + } +} + +func (this *IntMap[K, V]) index(k K) int { + var index = int(k % K(this.count)) + if index < 0 { + index = -index + } + return index +} diff --git a/internal/utils/sync/map_int_test.go b/internal/utils/sync/map_int_test.go new file mode 100644 index 0000000..a0b97f6 --- /dev/null +++ b/internal/utils/sync/map_int_test.go @@ -0,0 +1,125 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package syncutils_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/stats" + syncutils "github.com/TeaOSLab/EdgeNode/internal/utils/sync" + "github.com/iwind/TeaGo/assert" + "github.com/iwind/TeaGo/types" + "sync" + "testing" +) + +func TestIntMap(t *testing.T) { + var a = assert.NewAssertion(t) + + var m = syncutils.NewIntMap[int, string]() + m.Put(1, "1") + a.IsTrue(m.Has(1)) + a.IsFalse(m.Has(2)) + m.Put(-1, "-1") + t.Log(m.Get(-1)) + t.Log(m.Len(), "values") + { + a.IsTrue(m.Has(-1)) + m.Delete(-1) + a.IsFalse(m.Has(-1)) + } + t.Log(m.Len(), "values") +} + +func TestInt64Map(t *testing.T) { + var a = assert.NewAssertion(t) + + var m = syncutils.NewIntMap[int64, string]() + m.Put(1, "1") + a.IsTrue(m.Has(1)) + a.IsFalse(m.Has(2)) + m.Put(-1, "-1") + t.Log(m.Get(-1)) + t.Log(m.Len(), "values") + { + a.IsTrue(m.Has(-1)) + m.Delete(-1) + a.IsFalse(m.Has(-1)) + } + m.Put(1024000000, "large int") + t.Log(m.Get(1024000000)) + t.Log(m.Len(), "values") +} + +func TestIntMap_PutCompact(t *testing.T) { + var a = assert.NewAssertion(t) + + var m = syncutils.NewIntMap[int, string]() + m.Put(1, "a") + m.Put(1, "b") + a.IsTrue(m.Get(1) == "b") + + m.PutCompact(1, "c", func(oldV string, newV string) string { + return oldV + newV + }) + + a.IsTrue(m.Get(1) == "bc") +} + +func TestIntMap_ForEachRead(t *testing.T) { + var m = syncutils.NewIntMap[int, string]() + for i := 0; i < 100; i++ { + m.Put(i, "v"+types.String(i)) + } + + t.Log(m.Len()) + + m.ForEachRead(func(k int, v string) { + t.Log(k, v) + }) +} + +func TestIntMap_ForEachWrite(t *testing.T) { + var m = syncutils.NewIntMap[int, string]() + for i := 0; i < 100; i++ { + m.Put(i, "v"+types.String(i)) + } + + t.Log(m.Len()) + + m.ForEachRead(func(k int, v string) { + t.Log(k, v) + m.DeleteUnsafe(k) + }) + t.Log(m.Len(), "elements left") +} + +func BenchmarkNewIntMap(b *testing.B) { + var m = syncutils.NewIntMap[int, *stats.BandwidthStat]() + + b.RunParallel(func(pb *testing.PB) { + var i int + for pb.Next() { + i++ + m.Put(i, &stats.BandwidthStat{ServerId: 100}) + _ = m.Get(i + 100) + } + }) +} + +func BenchmarkNewIntMap2(b *testing.B) { + var m = map[int]*stats.BandwidthStat{} + var locker = sync.RWMutex{} + + b.RunParallel(func(pb *testing.PB) { + var i int + for pb.Next() { + i++ + locker.Lock() + m[i] = &stats.BandwidthStat{ServerId: 100} + locker.Unlock() + + locker.RLock() + _ = m[i+100] + locker.RUnlock() + } + }) +} diff --git a/internal/utils/sync/rw_mutex.go b/internal/utils/sync/rw_mutex.go new file mode 100644 index 0000000..b4bee82 --- /dev/null +++ b/internal/utils/sync/rw_mutex.go @@ -0,0 +1,52 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package syncutils + +import ( + "sync" +) + +type RWMutex struct { + lockers []*sync.RWMutex + countLockers int +} + +func NewRWMutex(count int) *RWMutex { + if count <= 0 { + count = 1 + } + + var lockers = []*sync.RWMutex{} + for i := 0; i < count; i++ { + lockers = append(lockers, &sync.RWMutex{}) + } + + return &RWMutex{ + lockers: lockers, + countLockers: len(lockers), + } +} + +func (this *RWMutex) Lock(index int) { + this.lockers[index%this.countLockers].Lock() +} + +func (this *RWMutex) Unlock(index int) { + this.lockers[index%this.countLockers].Unlock() +} + +func (this *RWMutex) RLock(index int) { + this.lockers[index%this.countLockers].RLock() +} + +func (this *RWMutex) RUnlock(index int) { + this.lockers[index%this.countLockers].RUnlock() +} + +func (this *RWMutex) TryLock(index int) bool { + return this.lockers[index%this.countLockers].TryLock() +} + +func (this *RWMutex) TryRLock(index int) bool { + return this.lockers[index%this.countLockers].TryRLock() +} diff --git a/internal/utils/sync/rw_mutex_test.go b/internal/utils/sync/rw_mutex_test.go new file mode 100644 index 0000000..092d2b6 --- /dev/null +++ b/internal/utils/sync/rw_mutex_test.go @@ -0,0 +1,47 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package syncutils_test + +import ( + syncutils "github.com/TeaOSLab/EdgeNode/internal/utils/sync" + "runtime" + "sync" + "testing" + "time" +) + +func TestNewRWMutex(t *testing.T) { + var locker = syncutils.NewRWMutex(runtime.NumCPU()) + locker.Lock(1) + t.Log(locker.TryLock(1)) + locker.Unlock(1) + t.Log(locker.TryLock(1)) +} + +func BenchmarkRWMutex_Lock(b *testing.B) { + var locker = syncutils.NewRWMutex(runtime.NumCPU()) + + b.RunParallel(func(pb *testing.PB) { + var i = 0 + for pb.Next() { + i++ + locker.Lock(i) + time.Sleep(1 * time.Millisecond) + locker.Unlock(i) + } + }) +} + +func BenchmarkRWMutex_Lock2(b *testing.B) { + var locker = &sync.Mutex{} + + b.RunParallel(func(pb *testing.PB) { + var i = 0 + for pb.Next() { + i++ + locker.Lock() + time.Sleep(1 * time.Millisecond) + locker.Unlock() + } + }) +}