所有数据库相关的操作支持事务

This commit is contained in:
GoEdgeLab
2021-01-01 23:31:30 +08:00
parent a478b82779
commit 155dd5b798
146 changed files with 2845 additions and 2068 deletions

View File

@@ -61,8 +61,8 @@ func init() {
}
// 启用条目
func (this *MessageDAO) EnableMessage(id int64) error {
_, err := this.Query().
func (this *MessageDAO) EnableMessage(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
Set("state", MessageStateEnabled).
Update()
@@ -70,8 +70,8 @@ func (this *MessageDAO) EnableMessage(id int64) error {
}
// 禁用条目
func (this *MessageDAO) DisableMessage(id int64) error {
_, err := this.Query().
func (this *MessageDAO) DisableMessage(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
Set("state", MessageStateDisabled).
Update()
@@ -79,8 +79,8 @@ func (this *MessageDAO) DisableMessage(id int64) error {
}
// 查找启用中的条目
func (this *MessageDAO) FindEnabledMessage(id int64) (*Message, error) {
result, err := this.Query().
func (this *MessageDAO) FindEnabledMessage(tx *dbs.Tx, id int64) (*Message, error) {
result, err := this.Query(tx).
Pk(id).
Attr("state", MessageStateEnabled).
Find()
@@ -91,19 +91,19 @@ func (this *MessageDAO) FindEnabledMessage(id int64) (*Message, error) {
}
// 创建集群消息
func (this *MessageDAO) CreateClusterMessage(clusterId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
_, err := this.createMessage(clusterId, 0, messageType, level, body, paramsJSON)
func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, clusterId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
_, err := this.createMessage(tx, clusterId, 0, messageType, level, body, paramsJSON)
return err
}
// 创建节点消息
func (this *MessageDAO) CreateNodeMessage(clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
_, err := this.createMessage(clusterId, nodeId, messageType, level, body, paramsJSON)
func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
_, err := this.createMessage(tx, clusterId, nodeId, messageType, level, body, paramsJSON)
return err
}
// 创建普通消息
func (this *MessageDAO) CreateMessage(adminId int64, userId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
h := md5.New()
h.Write([]byte(body))
h.Write(paramsJSON)
@@ -122,14 +122,14 @@ func (this *MessageDAO) CreateMessage(adminId int64, userId int64, messageType M
op.IsRead = false
op.Day = timeutil.Format("Ymd")
op.Hash = hash
err := this.Save(op)
err := this.Save(tx, op)
return err
}
// 删除某天之前的消息
func (this *MessageDAO) DeleteMessagesBeforeDay(dayTime time.Time) error {
func (this *MessageDAO) DeleteMessagesBeforeDay(tx *dbs.Tx, dayTime time.Time) error {
day := timeutil.Format("Ymd", dayTime)
_, err := this.Query().
_, err := this.Query(tx).
Where("day<:day").
Param("day", day).
Delete()
@@ -137,8 +137,8 @@ func (this *MessageDAO) DeleteMessagesBeforeDay(dayTime time.Time) error {
}
// 计算未读消息数量
func (this *MessageDAO) CountUnreadMessages(adminId int64, userId int64) (int64, error) {
query := this.Query().
func (this *MessageDAO) CountUnreadMessages(tx *dbs.Tx, adminId int64, userId int64) (int64, error) {
query := this.Query(tx).
Attr("isRead", false)
if adminId > 0 {
query.Where("(adminId=:adminId OR (adminId=0 AND userId=0))").
@@ -150,8 +150,8 @@ func (this *MessageDAO) CountUnreadMessages(adminId int64, userId int64) (int64,
}
// 列出单页未读消息
func (this *MessageDAO) ListUnreadMessages(adminId int64, userId int64, offset int64, size int64) (result []*Message, err error) {
query := this.Query().
func (this *MessageDAO) ListUnreadMessages(tx *dbs.Tx, adminId int64, userId int64, offset int64, size int64) (result []*Message, err error) {
query := this.Query(tx).
Attr("isRead", false)
if adminId > 0 {
query.Where("(adminId=:adminId OR (adminId=0 AND userId=0))").
@@ -169,22 +169,22 @@ func (this *MessageDAO) ListUnreadMessages(adminId int64, userId int64, offset i
}
// 设置消息已读状态
func (this *MessageDAO) UpdateMessageRead(messageId int64, b bool) error {
func (this *MessageDAO) UpdateMessageRead(tx *dbs.Tx, messageId int64, b bool) error {
if messageId <= 0 {
return errors.New("invalid messageId")
}
op := NewMessageOperator()
op.Id = messageId
op.IsRead = b
err := this.Save(op)
err := this.Save(tx, op)
return err
}
// 设置一组消息为已读状态
func (this *MessageDAO) UpdateMessagesRead(messageIds []int64, b bool) error {
func (this *MessageDAO) UpdateMessagesRead(tx *dbs.Tx, messageIds []int64, b bool) error {
// 这里我们一个一个更改因为In语句不容易Prepare且效率不高
for _, messageId := range messageIds {
err := this.UpdateMessageRead(messageId, b)
err := this.UpdateMessageRead(tx, messageId, b)
if err != nil {
return err
}
@@ -193,8 +193,8 @@ func (this *MessageDAO) UpdateMessagesRead(messageIds []int64, b bool) error {
}
// 设置所有消息为已读
func (this *MessageDAO) UpdateAllMessagesRead(adminId int64, userId int64) error {
query := this.Query().
func (this *MessageDAO) UpdateAllMessagesRead(tx *dbs.Tx, adminId int64, userId int64) error {
query := this.Query(tx).
Attr("isRead", false)
if adminId > 0 {
query.Where("(adminId=:adminId OR (adminId=0 AND userId=0))").
@@ -209,11 +209,11 @@ func (this *MessageDAO) UpdateAllMessagesRead(adminId int64, userId int64) error
}
// 检查消息权限
func (this *MessageDAO) CheckMessageUser(messageId int64, adminId int64, userId int64) (bool, error) {
func (this *MessageDAO) CheckMessageUser(tx *dbs.Tx, messageId int64, adminId int64, userId int64) (bool, error) {
if messageId <= 0 || (adminId <= 0 && userId <= 0) {
return false, nil
}
query := this.Query().
query := this.Query(tx).
Pk(messageId)
if adminId > 0 {
query.Where("(adminId=:adminId OR (adminId=0 AND userId=0))").
@@ -225,7 +225,7 @@ func (this *MessageDAO) CheckMessageUser(messageId int64, adminId int64, userId
}
// 创建消息
func (this *MessageDAO) createMessage(clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) (int64, error) {
func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) (int64, error) {
h := md5.New()
h.Write([]byte(body))
h.Write(paramsJSON)
@@ -251,7 +251,7 @@ func (this *MessageDAO) createMessage(clusterId int64, nodeId int64, messageType
op.Day = timeutil.Format("Ymd")
op.Hash = hash
err := this.Save(op)
err := this.Save(tx, op)
if err != nil {
return 0, err
}