mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-06 18:10:25 +08:00
大幅提升SysLocker自增性能
This commit is contained in:
@@ -2,7 +2,6 @@ package models
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/TeaOSLab/EdgeAPI/internal/zero"
|
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
"github.com/iwind/TeaGo/Tea"
|
||||||
"github.com/iwind/TeaGo/dbs"
|
"github.com/iwind/TeaGo/dbs"
|
||||||
@@ -14,10 +13,6 @@ import (
|
|||||||
|
|
||||||
type SysLockerDAO dbs.DAO
|
type SysLockerDAO dbs.DAO
|
||||||
|
|
||||||
// concurrent transactions control
|
|
||||||
// 考虑到存在多个API节点的可能性,容量不能太大,也不能使用mutex
|
|
||||||
var sysLockerConcurrentLimiter = make(chan zero.Zero, 8)
|
|
||||||
|
|
||||||
func NewSysLockerDAO() *SysLockerDAO {
|
func NewSysLockerDAO() *SysLockerDAO {
|
||||||
return dbs.NewDAO(&SysLockerDAO{
|
return dbs.NewDAO(&SysLockerDAO{
|
||||||
DAOObject: dbs.DAOObject{
|
DAOObject: dbs.DAOObject{
|
||||||
@@ -119,6 +114,10 @@ func (this *SysLockerDAO) Unlock(tx *dbs.Tx, key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const sysLockerStep = 8
|
||||||
|
|
||||||
|
var increment = NewSysLockerIncrement(sysLockerStep)
|
||||||
|
|
||||||
// Increase 增加版本号
|
// Increase 增加版本号
|
||||||
func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (int64, error) {
|
func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (int64, error) {
|
||||||
// validate key
|
// validate key
|
||||||
@@ -130,10 +129,22 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (
|
|||||||
var result int64
|
var result int64
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
sysLockerConcurrentLimiter <- zero.Zero{} // push
|
{
|
||||||
defer func() {
|
colValue, err := this.Query(tx).
|
||||||
<-sysLockerConcurrentLimiter // pop
|
Result("version").
|
||||||
}()
|
Attr("key", key).
|
||||||
|
FindInt64Col(0)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
var lastVersion = types.Int64(colValue)
|
||||||
|
if lastVersion <= increment.MaxValue(key) {
|
||||||
|
value, ok := increment.Pop(key)
|
||||||
|
if ok {
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = this.Instance.RunTx(func(tx *dbs.Tx) error {
|
err = this.Instance.RunTx(func(tx *dbs.Tx) error {
|
||||||
result, err = this.Increase(tx, key, defaultValue)
|
result, err = this.Increase(tx, key, defaultValue)
|
||||||
@@ -146,7 +157,7 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
// combine statements to make increasing faster
|
// combine statements to make increasing faster
|
||||||
colValue, err := tx.FindCol(0, "INSERT INTO `"+this.Table+"` (`key`, `version`) VALUES ('"+key+"', "+types.String(defaultValue)+") ON DUPLICATE KEY UPDATE `version`=`version`+1; SELECT `version` FROM `"+this.Table+"` WHERE `key`='"+key+"'")
|
colValue, err := tx.FindCol(0, "INSERT INTO `"+this.Table+"` (`key`, `version`) VALUES ('"+key+"', "+types.String(defaultValue)+") ON DUPLICATE KEY UPDATE `version`=`version`+"+types.String(sysLockerStep)+"; SELECT `version` FROM `"+this.Table+"` WHERE `key`='"+key+"'")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if CheckSQLErrCode(err, 1064 /** syntax error **/) {
|
if CheckSQLErrCode(err, 1064 /** syntax error **/) {
|
||||||
// continue to use seperated query
|
// continue to use seperated query
|
||||||
@@ -155,7 +166,11 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (
|
|||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return types.Int64(colValue), nil
|
var maxVersion = types.Int64(colValue)
|
||||||
|
var minVersion = maxVersion - sysLockerStep + 1
|
||||||
|
increment.Push(key, minVersion+1, maxVersion)
|
||||||
|
|
||||||
|
return minVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = this.Query(tx).
|
err = this.Query(tx).
|
||||||
|
|||||||
@@ -43,12 +43,35 @@ func TestSysLocker_Increase_SQL(t *testing.T) {
|
|||||||
t.Log("after:", v)
|
t.Log("after:", v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSysLocker_Increase_Cache(t *testing.T) {
|
||||||
|
var dao = NewSysLockerDAO()
|
||||||
|
for i := 0; i < 11; i++ {
|
||||||
|
v, err := dao.Increase(nil, "hello", 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Log("err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Log("hello", i, "after:", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 11; i++ {
|
||||||
|
v, err := dao.Increase(nil, "hello2", 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Log("err:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Log("hello2", i, "after:", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSysLocker_Increase(t *testing.T) {
|
func TestSysLocker_Increase(t *testing.T) {
|
||||||
dbs.NotifyReady()
|
dbs.NotifyReady()
|
||||||
|
|
||||||
|
var dao = NewSysLockerDAO()
|
||||||
|
dao.Instance.Raw().SetMaxOpenConns(64)
|
||||||
|
|
||||||
var count = 1000
|
var count = 1000
|
||||||
|
|
||||||
var dao = NewSysLockerDAO()
|
|
||||||
value, err := dao.Read(nil, "hello")
|
value, err := dao.Read(nil, "hello")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -133,6 +156,8 @@ func TestSysLocker_Increase_Performance(t *testing.T) {
|
|||||||
|
|
||||||
func BenchmarkSysLockerDAO_Increase(b *testing.B) {
|
func BenchmarkSysLockerDAO_Increase(b *testing.B) {
|
||||||
var dao = NewSysLockerDAO()
|
var dao = NewSysLockerDAO()
|
||||||
|
dao.Instance.Raw().SetMaxOpenConns(64)
|
||||||
|
|
||||||
_, _ = dao.Increase(nil, "hello", 0)
|
_, _ = dao.Increase(nil, "hello", 0)
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|||||||
110
internal/db/models/sys_locker_increment.go
Normal file
110
internal/db/models/sys_locker_increment.go
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SysLockerIncrementItem struct {
|
||||||
|
size int
|
||||||
|
c chan int64
|
||||||
|
maxValue int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSysLockerIncrementItem(size int) *SysLockerIncrementItem {
|
||||||
|
if size <= 0 {
|
||||||
|
size = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
return &SysLockerIncrementItem{
|
||||||
|
size: size,
|
||||||
|
c: make(chan int64, size),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *SysLockerIncrementItem) Pop() (result int64, ok bool) {
|
||||||
|
select {
|
||||||
|
case v := <-this.c:
|
||||||
|
result = v
|
||||||
|
ok = true
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *SysLockerIncrementItem) Push(value int64) {
|
||||||
|
if this.maxValue < value {
|
||||||
|
this.maxValue = value
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case this.c <- value:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *SysLockerIncrementItem) Reset() {
|
||||||
|
close(this.c)
|
||||||
|
this.c = make(chan int64, this.size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *SysLockerIncrementItem) MaxValue() int64 {
|
||||||
|
return this.maxValue
|
||||||
|
}
|
||||||
|
|
||||||
|
type SysLockerIncrement struct {
|
||||||
|
itemMap map[string]*SysLockerIncrementItem // key => item
|
||||||
|
size int
|
||||||
|
locker sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSysLockerIncrement(size int) *SysLockerIncrement {
|
||||||
|
if size <= 0 {
|
||||||
|
size = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
return &SysLockerIncrement{
|
||||||
|
itemMap: map[string]*SysLockerIncrementItem{},
|
||||||
|
size: size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *SysLockerIncrement) Pop(key string) (result int64, ok bool) {
|
||||||
|
this.locker.Lock()
|
||||||
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
|
item, itemOk := this.itemMap[key]
|
||||||
|
if itemOk {
|
||||||
|
result, ok = item.Pop()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *SysLockerIncrement) Push(key string, minValue int64, maxValue int64) {
|
||||||
|
this.locker.Lock()
|
||||||
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
|
item, itemOk := this.itemMap[key]
|
||||||
|
if itemOk {
|
||||||
|
item.Reset()
|
||||||
|
} else {
|
||||||
|
item = NewSysLockerIncrementItem(this.size)
|
||||||
|
this.itemMap[key] = item
|
||||||
|
}
|
||||||
|
for i := minValue; i <= maxValue; i++ {
|
||||||
|
item.Push(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *SysLockerIncrement) MaxValue(key string) int64 {
|
||||||
|
this.locker.RLock()
|
||||||
|
defer this.locker.RUnlock()
|
||||||
|
|
||||||
|
item, itemOk := this.itemMap[key]
|
||||||
|
if itemOk {
|
||||||
|
return item.MaxValue()
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
23
internal/db/models/sys_locker_increment_test.go
Normal file
23
internal/db/models/sys_locker_increment_test.go
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package models_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewSysLockerIncrement(t *testing.T) {
|
||||||
|
var increment = models.NewSysLockerIncrement(10)
|
||||||
|
increment.Push("key", 1, 10)
|
||||||
|
t.Log(increment.MaxValue("key"))
|
||||||
|
for i := 0; i < 11; i++ {
|
||||||
|
result, value := increment.Pop("key")
|
||||||
|
t.Log(i, "=>", result, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 11; i++ {
|
||||||
|
result, value := increment.Pop("key1")
|
||||||
|
t.Log(i, "=>", result, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user