2020-09-26 08:06:40 +08:00
package models
import (
2023-07-04 22:02:17 +08:00
"errors"
2023-07-02 15:27:49 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/zero"
2020-09-26 08:06:40 +08:00
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2021-02-02 15:25:40 +08:00
"github.com/iwind/TeaGo/maps"
2020-09-26 08:06:40 +08:00
"github.com/iwind/TeaGo/types"
2023-07-04 22:02:17 +08:00
"strings"
2020-09-26 08:06:40 +08:00
"time"
)
// SysLockerDAO is the data-access object for the "edgeSysLockers" table.
// It provides key-based locking (Lock/Unlock) and per-key version counters
// (Increase/Read).
type SysLockerDAO dbs . DAO
2023-07-02 15:27:49 +08:00
// sysLockerConcurrentLimiter bounds the number of concurrent transactions.
// Increase pushes a value into it before opening its own transaction and pops
// the value when it is done, so at most 8 such transactions run at once in
// this process.
// Because several API nodes may be running at the same time, the capacity must
// not be too large, and a mutex cannot be used either.
var sysLockerConcurrentLimiter = make ( chan zero . Zero , 8 )
2020-09-26 08:06:40 +08:00
// NewSysLockerDAO creates a DAO bound to the "edgeSysLockers" table, using
// SysLocker as the row model and "id" as the primary key.
func NewSysLockerDAO() *SysLockerDAO {
	var dao = &SysLockerDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeSysLockers",
			Model:  new(SysLocker),
			PkName: "id",
		},
	}

	// dbs.NewDAO hands the object back as an interface value, so it is
	// asserted back to the concrete type here.
	return dbs.NewDAO(dao).(*SysLockerDAO)
}
2020-10-13 20:05:13 +08:00
// SharedSysLockerDAO is the package-wide SysLockerDAO instance. It stays nil
// until the callback registered with dbs.OnReady in init has run.
var SharedSysLockerDAO *SysLockerDAO

func init() {
	// Creation is deferred to the dbs.OnReady callback instead of being done
	// directly in init.
	var setup = func() {
		SharedSysLockerDAO = NewSysLockerDAO()
	}
	dbs.OnReady(setup)
}
2020-09-26 08:06:40 +08:00
2022-02-14 09:00:24 +08:00
// Lock 开锁
2021-01-14 16:34:52 +08:00
func ( this * SysLockerDAO ) Lock ( tx * dbs . Tx , key string , timeout int64 ) ( ok bool , err error ) {
2020-09-26 08:06:40 +08:00
maxErrors := 5
for {
2021-01-01 23:31:30 +08:00
one , err := this . Query ( tx ) .
2020-09-26 08:06:40 +08:00
Attr ( "key" , key ) .
Find ( )
if err != nil {
maxErrors --
if maxErrors < 0 {
return false , err
}
continue
}
// 如果没有锁,则创建
if one == nil {
2022-07-24 09:56:27 +08:00
var op = NewSysLockerOperator ( )
2020-09-26 08:06:40 +08:00
op . Key = key
op . TimeoutAt = time . Now ( ) . Unix ( ) + timeout
op . Version = 1
2021-01-01 23:31:30 +08:00
err := this . Save ( tx , op )
2020-09-26 08:06:40 +08:00
if err != nil {
maxErrors --
if maxErrors < 0 {
return false , err
}
continue
}
return true , nil
}
// 如果已经有锁
locker := one . ( * SysLocker )
if time . Now ( ) . Unix ( ) <= int64 ( locker . TimeoutAt ) {
return false , nil
}
// 修改
2022-07-24 09:56:27 +08:00
var op = NewSysLockerOperator ( )
2020-09-26 08:06:40 +08:00
op . Id = locker . Id
op . Version = locker . Version + 1
op . TimeoutAt = time . Now ( ) . Unix ( ) + timeout
2021-01-01 23:31:30 +08:00
err = this . Save ( tx , op )
2020-09-26 08:06:40 +08:00
if err != nil {
maxErrors --
if maxErrors < 0 {
return false , err
}
continue
}
// 再次查询版本
2021-01-01 23:31:30 +08:00
version , err := this . Query ( tx ) .
2020-09-26 08:06:40 +08:00
Attr ( "key" , key ) .
Result ( "version" ) .
FindCol ( "0" )
if err != nil {
maxErrors --
if maxErrors < 0 {
return false , err
}
continue
}
if types . Int64 ( version ) != int64 ( locker . Version ) + 1 {
return false , nil
}
return true , nil
}
}
2022-02-14 09:00:24 +08:00
// Unlock 解锁
2021-01-01 23:31:30 +08:00
func ( this * SysLockerDAO ) Unlock ( tx * dbs . Tx , key string ) error {
_ , err := this . Query ( tx ) .
2020-09-26 08:06:40 +08:00
Attr ( "key" , key ) .
Set ( "timeoutAt" , time . Now ( ) . Unix ( ) - 86400 * 365 ) .
Update ( )
return err
}
2021-02-02 15:25:40 +08:00
2022-02-14 09:00:24 +08:00
// Increase 增加版本号
2021-02-02 15:25:40 +08:00
func ( this * SysLockerDAO ) Increase ( tx * dbs . Tx , key string , defaultValue int64 ) ( int64 , error ) {
2023-07-04 22:02:17 +08:00
// validate key
if strings . Contains ( key , "'" ) {
return 0 , errors . New ( "invalid key '" + key + "'" )
}
2021-02-02 15:25:40 +08:00
if tx == nil {
var result int64
var err error
2023-07-02 15:27:49 +08:00
sysLockerConcurrentLimiter <- zero . Zero { } // push
defer func ( ) {
<- sysLockerConcurrentLimiter // pop
} ( )
2021-02-02 15:25:40 +08:00
err = this . Instance . RunTx ( func ( tx * dbs . Tx ) error {
result , err = this . Increase ( tx , key , defaultValue )
if err != nil {
return err
}
return nil
} )
return result , err
}
2023-07-04 22:02:17 +08:00
// combine statements to make increasing faster
colValue , err := tx . FindCol ( 0 , "INSERT INTO `edgeSysLockers` (`key`, `version`) VALUES ('" + key + "', " + types . String ( defaultValue ) + ") ON DUPLICATE KEY UPDATE `version`=`version`+1; SELECT `version` FROM edgeSysLockers WHERE `key`='" + key + "'" )
if err != nil {
if CheckSQLErrCode ( err , 1064 /** syntax error **/ ) {
// continue to use seperated query
err = nil
} else {
return 0 , err
}
} else {
return types . Int64 ( colValue ) , nil
}
err = this . Query ( tx ) .
2023-07-04 14:42:14 +08:00
Reuse ( false ) . // no need to prepare statement in every transaction
2021-02-02 15:25:40 +08:00
InsertOrUpdateQuickly ( maps . Map {
"key" : key ,
"version" : defaultValue ,
} , maps . Map {
"version" : dbs . SQL ( "version+1" ) ,
} )
if err != nil {
return 0 , err
}
return this . Query ( tx ) .
2023-07-04 14:42:14 +08:00
Reuse ( false ) . // no need to prepare statement in every transaction
2021-02-02 15:25:40 +08:00
Attr ( "key" , key ) .
Result ( "version" ) .
FindInt64Col ( 0 )
}
2022-10-20 21:47:21 +08:00
// 读取当前版本号
func ( this * SysLockerDAO ) Read ( tx * dbs . Tx , key string ) ( int64 , error ) {
return this . Query ( tx ) .
Attr ( "key" , key ) .
Result ( "version" ) .
FindInt64Col ( 0 )
2023-07-02 15:27:49 +08:00
}