Files
EdgeAPI/internal/db/models/message_task_dao.go

146 lines
3.8 KiB
Go
Raw Normal View History

2021-04-05 20:48:33 +08:00
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"time"
)
// MessageTaskStatus describes the delivery progress of a message task.
// It is an alias of int, so plain integers can be used interchangeably.
type MessageTaskStatus = int

// Values of the "state" column of a message task.
const (
	// MessageTaskStateEnabled marks a task as enabled.
	MessageTaskStateEnabled = 1
	// MessageTaskStateDisabled marks a task as disabled.
	MessageTaskStateDisabled = 0
)

// Values of the "status" column of a message task.
const (
	// MessageTaskStatusNone is the normal (initial) status.
	MessageTaskStatusNone MessageTaskStatus = 0
	// MessageTaskStatusSending means the task is being sent.
	MessageTaskStatusSending MessageTaskStatus = 1
	// MessageTaskStatusSuccess means the task was sent successfully.
	MessageTaskStatusSuccess MessageTaskStatus = 2
	// MessageTaskStatusFailed means sending the task failed.
	MessageTaskStatusFailed MessageTaskStatus = 3
)
// MessageTaskDAO is the data access object type for message tasks.
// Its underlying type is dbs.DAO.
type MessageTaskDAO dbs.DAO
// NewMessageTaskDAO builds a MessageTaskDAO bound to the "edgeMessageTasks"
// table, using MessageTask as its model and "id" as the primary key name,
// and passes it through dbs.NewDAO before returning it.
func NewMessageTaskDAO() *MessageTaskDAO {
	dao := &MessageTaskDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeMessageTasks",
			Model:  &MessageTask{},
			PkName: "id",
		},
	}
	return dbs.NewDAO(dao).(*MessageTaskDAO)
}
// SharedMessageTaskDAO is the package-level MessageTaskDAO instance.
// It stays nil until the callback registered with dbs.OnReady has run.
var SharedMessageTaskDAO *MessageTaskDAO

func init() {
	// Defer construction of the shared DAO to the dbs ready hook.
	setup := func() {
		SharedMessageTaskDAO = NewMessageTaskDAO()
	}
	dbs.OnReady(setup)
}
2021-04-12 19:19:15 +08:00
// EnableMessageTask 启用条目
2021-04-05 20:48:33 +08:00
// EnableMessageTask sets the "state" column of the task whose primary key is
// id to MessageTaskStateEnabled and returns any error from the update.
func (this *MessageTaskDAO) EnableMessageTask(tx *dbs.Tx, id int64) error {
	query := this.Query(tx).Pk(id).Set("state", MessageTaskStateEnabled)
	if _, err := query.Update(); err != nil {
		return err
	}
	return nil
}
2021-04-12 19:19:15 +08:00
// DisableMessageTask 禁用条目
2021-04-05 20:48:33 +08:00
// DisableMessageTask sets the "state" column of the task whose primary key is
// id to MessageTaskStateDisabled and returns any error from the update.
func (this *MessageTaskDAO) DisableMessageTask(tx *dbs.Tx, id int64) error {
	query := this.Query(tx).Pk(id).Set("state", MessageTaskStateDisabled)
	if _, err := query.Update(); err != nil {
		return err
	}
	return nil
}
2021-04-12 19:19:15 +08:00
// FindEnabledMessageTask 查找启用中的条目
2021-04-05 20:48:33 +08:00
// FindEnabledMessageTask looks up the task with primary key id whose "state"
// is MessageTaskStateEnabled.
//
// When the query yields no result the returned task is nil; the error from
// the query is passed through unchanged in every case.
func (this *MessageTaskDAO) FindEnabledMessageTask(tx *dbs.Tx, id int64) (*MessageTask, error) {
	query := this.Query(tx).
		Pk(id).
		Attr("state", MessageTaskStateEnabled)

	found, err := query.Find()
	if found != nil {
		return found.(*MessageTask), err
	}
	return nil, err
}
2021-04-12 19:19:15 +08:00
// CreateMessageTask 创建任务
2021-04-05 20:48:33 +08:00
// CreateMessageTask stores a new message task built from the given recipient,
// instance, user, subject, body and priority flag.
//
// The task is created enabled (MessageTaskStateEnabled) and with status
// MessageTaskStatusNone, which is the status FindSendingMessageTasks selects.
// The int64 result comes straight from SaveInt64 (presumably the new row ID;
// confirm against the dbs package).
func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, instanceId int64, user string, subject string, body string, isPrimary bool) (int64, error) {
	task := NewMessageTaskOperator()

	// Bookkeeping fields.
	task.State = MessageTaskStateEnabled
	task.Status = MessageTaskStatusNone

	// Addressing.
	task.RecipientId = recipientId
	task.InstanceId = instanceId
	task.User = user
	task.IsPrimary = isPrimary

	// Content.
	task.Subject = subject
	task.Body = body

	taskId, err := this.SaveInt64(tx, task)
	return taskId, err
}
2021-04-12 19:19:15 +08:00
// FindSendingMessageTasks 查找需要发送的任务
2021-04-05 20:48:33 +08:00
// FindSendingMessageTasks returns up to size enabled tasks whose status is
// MessageTaskStatusNone, i.e. the tasks that still need to be sent.
//
// Results are ordered by "isPrimary" descending, then by primary key
// ascending. A non-positive size yields (nil, nil) without running a query.
func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (result []*MessageTask, err error) {
	if size <= 0 {
		return nil, nil
	}

	query := this.Query(tx).
		State(MessageTaskStateEnabled).
		Attr("status", MessageTaskStatusNone).
		Desc("isPrimary").
		AscPk().
		Limit(size)

	// FindAll fills result through the slice pointer registered by Slice.
	_, err = query.Slice(&result).FindAll()
	return result, err
}
2021-04-12 19:19:15 +08:00
// UpdateMessageTaskStatus 设置发送的状态
2021-04-05 20:48:33 +08:00
// UpdateMessageTaskStatus records the sending status of a task.
//
// It saves the given status together with the current Unix time as SentAt.
// result is written only when it is non-empty. A non-positive taskId is
// rejected with an "invalid taskId" error before anything is saved.
func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error {
	if taskId <= 0 {
		return errors.New("invalid taskId")
	}

	update := NewMessageTaskOperator()
	update.Id = taskId
	update.Status = status
	update.SentAt = time.Now().Unix()

	// Leave the Result field untouched when the caller has nothing to record.
	if len(result) != 0 {
		update.Result = result
	}

	err := this.Save(tx, update)
	return err
}
2021-04-12 19:19:15 +08:00
// CreateMessageTasks 从集群、节点或者服务中创建任务
// CreateMessageTasks creates message tasks for the receivers configured for
// the given target (cluster, node or server) and message type.
//
// Receivers are loaded with SharedMessageReceiverDAO.FindAllReceivers. A
// receiver that names a single recipient produces a task for that recipient;
// a receiver that names a recipient group produces a task for each recipient
// ID returned by FindAllEnabledAndOnRecipientIdsWithGroup. Receivers with
// neither are skipped.
//
// Each recipient gets at most one task per call: a recipient referenced both
// directly and through a group, or through several groups, is only notified
// once. Tasks are created in the order recipients are first encountered.
//
// The first error from a lookup or from CreateMessageTask aborts the call and
// is returned; tasks created before the error are not undone here.
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error {
	receivers, err := SharedMessageReceiverDAO.FindAllReceivers(tx, target, messageType)
	if err != nil {
		return err
	}

	// Recipient IDs already handled in this call, used to avoid sending the
	// same message to one recipient more than once.
	notified := map[int64]struct{}{}

	// createOnce creates a task for recipientId unless one was already
	// created earlier in this call.
	createOnce := func(recipientId int64) error {
		if _, done := notified[recipientId]; done {
			return nil
		}
		notified[recipientId] = struct{}{}
		_, err := this.CreateMessageTask(tx, recipientId, 0, "", subject, body, false)
		return err
	}

	for _, receiver := range receivers {
		switch {
		case receiver.RecipientId > 0:
			err := createOnce(int64(receiver.RecipientId))
			if err != nil {
				return err
			}
		case receiver.RecipientGroupId > 0:
			recipientIds, err := SharedMessageRecipientDAO.FindAllEnabledAndOnRecipientIdsWithGroup(tx, int64(receiver.RecipientGroupId))
			if err != nil {
				return err
			}
			for _, recipientId := range recipientIds {
				err := createOnce(recipientId)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}