From cf5dd5ffead7730415659d44123701d6204cacd4 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 2 Jul 2023 15:27:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=87=AA=E5=A2=9E=E9=94=81?= =?UTF-8?q?=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/sys_locker_dao.go | 14 +++- internal/db/models/sys_locker_dao_test.go | 83 +++++++++++++++++++++-- 2 files changed, 88 insertions(+), 9 deletions(-) diff --git a/internal/db/models/sys_locker_dao.go b/internal/db/models/sys_locker_dao.go index 49a1912d..232fe10c 100644 --- a/internal/db/models/sys_locker_dao.go +++ b/internal/db/models/sys_locker_dao.go @@ -1,6 +1,7 @@ package models import ( + "github.com/TeaOSLab/EdgeAPI/internal/zero" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -11,6 +12,10 @@ import ( type SysLockerDAO dbs.DAO +// concurrent transactions control +// 考虑到存在多个API节点的可能性,容量不能太大,也不能使用mutex +var sysLockerConcurrentLimiter = make(chan zero.Zero, 8) + func NewSysLockerDAO() *SysLockerDAO { return dbs.NewDAO(&SysLockerDAO{ DAOObject: dbs.DAOObject{ @@ -117,6 +122,12 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) ( if tx == nil { var result int64 var err error + + sysLockerConcurrentLimiter <- zero.Zero{} // push + defer func() { + <-sysLockerConcurrentLimiter // pop + }() + err = this.Instance.RunTx(func(tx *dbs.Tx) error { result, err = this.Increase(tx, key, defaultValue) if err != nil { @@ -142,11 +153,10 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) ( FindInt64Col(0) } - // 读取当前版本号 func (this *SysLockerDAO) Read(tx *dbs.Tx, key string) (int64, error) { return this.Query(tx). Attr("key", key). Result("version"). FindInt64Col(0) -} \ No newline at end of file +} diff --git a/internal/db/models/sys_locker_dao_test.go b/internal/db/models/sys_locker_dao_test.go index 13b73bad..70af37bc 100644 --- a/internal/db/models/sys_locker_dao_test.go +++ b/internal/db/models/sys_locker_dao_test.go @@ -3,8 +3,10 @@ package models import ( _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/types" "sync" "testing" + "time" ) func TestSysLockerDAO_Lock(t *testing.T) { @@ -25,22 +27,89 @@ func TestSysLockerDAO_Lock(t *testing.T) { } func TestSysLocker_Increase(t *testing.T) { - count := 100 - wg := sync.WaitGroup{} + dbs.NotifyReady() + + var count = 1000 + + var dao = NewSysLockerDAO() + value, err := dao.Read(nil, "hello") + if err != nil { + t.Fatal(err) + } + t.Log("before", value) + + var locker = sync.Mutex{} + var allValueMap = map[int64]bool{} + + var before = time.Now() + + var wg = sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { - go func() { + go func(i int) { defer wg.Done() - v, err := NewSysLockerDAO().Increase(nil, "hello", 0) + + var key = "hello" + v, err := dao.Increase(nil, key, 0) if err != nil { t.Log("err:", err) return } - t.Log("v:", v) - }() + + locker.Lock() + if allValueMap[v] { + t.Log("duplicated:", v) + } else { + allValueMap[v] = true + } + locker.Unlock() + + //t.Log("v:", v) + _ = v + }(i) } wg.Wait() - t.Log("ok") + + t.Log("cost:", time.Since(before).Seconds()*1000, "ms") + + value, err = dao.Read(nil, "hello") + if err != nil { + t.Fatal(err) + } + + t.Log("after", value, "values:", len(allValueMap)) +} + +func TestSysLocker_Increase_Performance(t *testing.T) { + dbs.NotifyReady() + + var count = 1000 + + var dao = NewSysLockerDAO() + + var before = time.Now() + + var wg = sync.WaitGroup{} + wg.Add(count) + + for i := 0; i < count; i++ { + go func(i int) { + defer wg.Done() + + var key = "hello" + types.String(i%10) + v, err := dao.Increase(nil, key, 0) + if err != nil { + t.Log("err:", err) + return + } + //t.Log("v:", v) + _ = v + }(i) + } + + wg.Wait() + + t.Log("cost:", time.Since(before).Seconds()*1000, "ms") }