mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-17 02:20:26 +08:00
通知媒介增加任务队列查看功能
This commit is contained in:
@@ -2,9 +2,13 @@ package models
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -35,6 +39,21 @@ func NewMessageTaskDAO() *MessageTaskDAO {
|
||||
|
||||
var SharedMessageTaskDAO *MessageTaskDAO
|
||||
|
||||
func init() {
|
||||
dbs.OnReadyDone(func() {
|
||||
// 清理数据任务
|
||||
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
err := SharedMessageTaskDAO.CleanExpiredMessageTasks(nil, 30) // 只保留30天
|
||||
if err != nil {
|
||||
remotelogs.Error("SharedMessageTaskDAO", "clean expired data failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
func init() {
|
||||
dbs.OnReady(func() {
|
||||
SharedMessageTaskDAO = NewMessageTaskDAO()
|
||||
@@ -80,6 +99,7 @@ func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, ins
|
||||
op.Subject = subject
|
||||
op.Body = body
|
||||
op.IsPrimary = isPrimary
|
||||
op.Day = timeutil.Format("Ymd")
|
||||
op.Status = MessageTaskStatusNone
|
||||
op.State = MessageTaskStateEnabled
|
||||
return this.SaveInt64(tx, op)
|
||||
@@ -101,6 +121,28 @@ func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (res
|
||||
return
|
||||
}
|
||||
|
||||
// CountMessageTasksWithStatus 根据状态计算任务数量
|
||||
func (this *MessageTaskDAO) CountMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus) (int64, error) {
|
||||
return this.Query(tx).
|
||||
State(MessageTaskStateEnabled).
|
||||
Attr("status", status).
|
||||
Count()
|
||||
}
|
||||
|
||||
// ListMessageTasksWithStatus 根据状态列出单页任务
|
||||
func (this *MessageTaskDAO) ListMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus, offset int64, size int64) (result []*MessageTask, err error) {
|
||||
_, err = this.Query(tx).
|
||||
State(MessageTaskStateEnabled).
|
||||
Attr("status", status).
|
||||
Desc("isPrimary").
|
||||
AscPk().
|
||||
Offset(offset).
|
||||
Limit(size).
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateMessageTaskStatus 设置发送的状态
|
||||
func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error {
|
||||
if taskId <= 0 {
|
||||
@@ -117,8 +159,8 @@ func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, st
|
||||
}
|
||||
|
||||
// CreateMessageTasks 从集群、节点或者服务中创建任务
|
||||
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error {
|
||||
receivers, err := SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, target, messageType)
|
||||
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, role nodeconfigs.NodeRole, clusterId int64, nodeId int64, serverId int64, messageType MessageType, subject string, body string) error {
|
||||
receivers, err := SharedMessageReceiverDAO.FindEnabledBestFitReceivers(tx, role, clusterId, nodeId, serverId, messageType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -150,3 +192,16 @@ func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTar
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanExpiredMessageTasks 清理
|
||||
func (this *MessageTaskDAO) CleanExpiredMessageTasks(tx *dbs.Tx, days int) error {
|
||||
if days <= 0 {
|
||||
days = 30
|
||||
}
|
||||
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days))
|
||||
_, err := this.Query(tx).
|
||||
Where("(day IS NULL OR day<:day)").
|
||||
Param("day", day).
|
||||
Delete()
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user