diff --git a/internal/db/models/sys_locker_dao.go b/internal/db/models/sys_locker_dao.go index 9bfbf1f4..3169bcfd 100644 --- a/internal/db/models/sys_locker_dao.go +++ b/internal/db/models/sys_locker_dao.go @@ -2,7 +2,6 @@ package models import ( "errors" - "github.com/TeaOSLab/EdgeAPI/internal/zero" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -14,10 +13,6 @@ import ( type SysLockerDAO dbs.DAO -// concurrent transactions control -// 考虑到存在多个API节点的可能性,容量不能太大,也不能使用mutex -var sysLockerConcurrentLimiter = make(chan zero.Zero, 8) - func NewSysLockerDAO() *SysLockerDAO { return dbs.NewDAO(&SysLockerDAO{ DAOObject: dbs.DAOObject{ @@ -119,6 +114,10 @@ func (this *SysLockerDAO) Unlock(tx *dbs.Tx, key string) error { return err } +const sysLockerStep = 8 + +var increment = NewSysLockerIncrement(sysLockerStep) + // Increase 增加版本号 func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (int64, error) { // validate key @@ -130,10 +129,22 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) ( var result int64 var err error - sysLockerConcurrentLimiter <- zero.Zero{} // push - defer func() { - <-sysLockerConcurrentLimiter // pop - }() + { + colValue, err := this.Query(tx). + Result("version"). + Attr("key", key). + FindInt64Col(0) + if err != nil { + return 0, err + } + var lastVersion = types.Int64(colValue) + if lastVersion <= increment.MaxValue(key) { + value, ok := increment.Pop(key) + if ok { + return value, nil + } + } + } err = this.Instance.RunTx(func(tx *dbs.Tx) error { result, err = this.Increase(tx, key, defaultValue) @@ -146,7 +157,7 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) ( } // combine statements to make increasing faster - colValue, err := tx.FindCol(0, "INSERT INTO `"+this.Table+"` (`key`, `version`) VALUES ('"+key+"', "+types.String(defaultValue)+") ON DUPLICATE KEY UPDATE `version`=`version`+1; SELECT `version` FROM `"+this.Table+"` WHERE `key`='"+key+"'") + colValue, err := tx.FindCol(0, "INSERT INTO `"+this.Table+"` (`key`, `version`) VALUES ('"+key+"', "+types.String(defaultValue)+") ON DUPLICATE KEY UPDATE `version`=`version`+"+types.String(sysLockerStep)+"; SELECT `version` FROM `"+this.Table+"` WHERE `key`='"+key+"'") if err != nil { if CheckSQLErrCode(err, 1064 /** syntax error **/) { // continue to use seperated query @@ -155,7 +166,11 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) ( return 0, err } } else { - return types.Int64(colValue), nil + var maxVersion = types.Int64(colValue) + var minVersion = maxVersion - sysLockerStep + 1 + increment.Push(key, minVersion+1, maxVersion) + + return minVersion, nil } err = this.Query(tx). diff --git a/internal/db/models/sys_locker_dao_test.go b/internal/db/models/sys_locker_dao_test.go index 07120f97..40b774f6 100644 --- a/internal/db/models/sys_locker_dao_test.go +++ b/internal/db/models/sys_locker_dao_test.go @@ -43,12 +43,35 @@ func TestSysLocker_Increase_SQL(t *testing.T) { t.Log("after:", v) } +func TestSysLocker_Increase_Cache(t *testing.T) { + var dao = NewSysLockerDAO() + for i := 0; i < 11; i++ { + v, err := dao.Increase(nil, "hello", 0) + if err != nil { + t.Log("err:", err) + return + } + t.Log("hello", i, "after:", v) + } + + for i := 0; i < 11; i++ { + v, err := dao.Increase(nil, "hello2", 0) + if err != nil { + t.Log("err:", err) + return + } + t.Log("hello2", i, "after:", v) + } +} + func TestSysLocker_Increase(t *testing.T) { dbs.NotifyReady() + var dao = NewSysLockerDAO() + dao.Instance.Raw().SetMaxOpenConns(64) + var count = 1000 - var dao = NewSysLockerDAO() value, err := dao.Read(nil, "hello") if err != nil { t.Fatal(err) @@ -133,6 +156,8 @@ func TestSysLocker_Increase_Performance(t *testing.T) { func BenchmarkSysLockerDAO_Increase(b *testing.B) { var dao = NewSysLockerDAO() + dao.Instance.Raw().SetMaxOpenConns(64) + _, _ = dao.Increase(nil, "hello", 0) b.ResetTimer() diff --git a/internal/db/models/sys_locker_increment.go b/internal/db/models/sys_locker_increment.go new file mode 100644 index 00000000..b475bcee --- /dev/null +++ b/internal/db/models/sys_locker_increment.go @@ -0,0 +1,110 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package models + +import ( + "sync" +) + +type SysLockerIncrementItem struct { + size int + c chan int64 + maxValue int64 +} + +func NewSysLockerIncrementItem(size int) *SysLockerIncrementItem { + if size <= 0 { + size = 10 + } + + return &SysLockerIncrementItem{ + size: size, + c: make(chan int64, size), + } +} + +func (this *SysLockerIncrementItem) Pop() (result int64, ok bool) { + select { + case v := <-this.c: + result = v + ok = true + return + default: + return + } +} + +func (this *SysLockerIncrementItem) Push(value int64) { + if this.maxValue < value { + this.maxValue = value + } + + select { + case this.c <- value: + default: + } +} + +func (this *SysLockerIncrementItem) Reset() { + close(this.c) + this.c = make(chan int64, this.size) +} + +func (this *SysLockerIncrementItem) MaxValue() int64 { + return this.maxValue +} + +type SysLockerIncrement struct { + itemMap map[string]*SysLockerIncrementItem // key => item + size int + locker sync.RWMutex +} + +func NewSysLockerIncrement(size int) *SysLockerIncrement { + if size <= 0 { + size = 10 + } + + return &SysLockerIncrement{ + itemMap: map[string]*SysLockerIncrementItem{}, + size: size, + } +} + +func (this *SysLockerIncrement) Pop(key string) (result int64, ok bool) { + this.locker.Lock() + defer this.locker.Unlock() + + item, itemOk := this.itemMap[key] + if itemOk { + result, ok = item.Pop() + } + return +} + +func (this *SysLockerIncrement) Push(key string, minValue int64, maxValue int64) { + this.locker.Lock() + defer this.locker.Unlock() + + item, itemOk := this.itemMap[key] + if itemOk { + item.Reset() + } else { + item = NewSysLockerIncrementItem(this.size) + this.itemMap[key] = item + } + for i := minValue; i <= maxValue; i++ { + item.Push(i) + } +} + +func (this *SysLockerIncrement) MaxValue(key string) int64 { + this.locker.RLock() + defer this.locker.RUnlock() + + item, itemOk := this.itemMap[key] + if itemOk { + return item.MaxValue() + } + return 0 +} diff --git a/internal/db/models/sys_locker_increment_test.go b/internal/db/models/sys_locker_increment_test.go new file mode 100644 index 00000000..e9d3fa8f --- /dev/null +++ b/internal/db/models/sys_locker_increment_test.go @@ -0,0 +1,23 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package models_test + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "testing" +) + +func TestNewSysLockerIncrement(t *testing.T) { + var increment = models.NewSysLockerIncrement(10) + increment.Push("key", 1, 10) + t.Log(increment.MaxValue("key")) + for i := 0; i < 11; i++ { + result, value := increment.Pop("key") + t.Log(i, "=>", result, value) + } + + for i := 0; i < 11; i++ { + result, value := increment.Pop("key1") + t.Log(i, "=>", result, value) + } +}