2021-04-05 20:48:33 +08:00
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-08-24 14:22:44 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2021-04-05 20:48:33 +08:00
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2021-08-24 14:22:44 +08:00
"github.com/iwind/TeaGo/rands"
timeutil "github.com/iwind/TeaGo/utils/time"
2021-04-05 20:48:33 +08:00
"time"
)
// MessageTaskStatus is the sending status of a message task. It is an alias
// of int, so plain int values are interchangeable with it.
type MessageTaskStatus = int

// Values of the "state" column.
const (
	MessageTaskStateDisabled = 0 // disabled
	MessageTaskStateEnabled  = 1 // enabled
)

// Values of the "status" column.
const (
	MessageTaskStatusNone    MessageTaskStatus = 0 // normal state, not sent yet
	MessageTaskStatusSending MessageTaskStatus = 1 // sending in progress
	MessageTaskStatusSuccess MessageTaskStatus = 2 // sent successfully
	MessageTaskStatusFailed  MessageTaskStatus = 3 // sending failed
)
// MessageTaskDAO is the data access object for message tasks; its underlying
// type is dbs.DAO.
type MessageTaskDAO dbs . DAO
// NewMessageTaskDAO creates a MessageTaskDAO configured for the
// "edgeMessageTasks" table (model MessageTask, primary key "id", database
// selected by Tea.Env) and passes it through dbs.NewDAO.
func NewMessageTaskDAO() *MessageTaskDAO {
	dao := &MessageTaskDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeMessageTasks",
			Model:  new(MessageTask),
			PkName: "id",
		},
	}
	return dbs.NewDAO(dao).(*MessageTaskDAO)
}
// SharedMessageTaskDAO is the package-level MessageTaskDAO instance. It is nil
// until the dbs.OnReady callback registered in init assigns it.
var SharedMessageTaskDAO * MessageTaskDAO
2021-08-24 14:22:44 +08:00
// init registers a dbs.OnReadyDone callback that starts a background
// goroutine which periodically deletes old message tasks.
func init() {
	dbs.OnReadyDone(func() {
		// Data cleanup task. The interval is picked once, as
		// rands.Int(24, 48) hours, and reused for every tick.
		interval := time.Duration(rands.Int(24, 48)) * time.Hour
		cleanupTicker := time.NewTicker(interval)

		go func() {
			for range cleanupTicker.C {
				// Keep only the last 30 days of tasks.
				if err := SharedMessageTaskDAO.CleanExpiredMessageTasks(nil, 30); err != nil {
					remotelogs.Error("SharedMessageTaskDAO", "clean expired data failed: "+err.Error())
				}
			}
		}()
	})
}
2021-04-05 20:48:33 +08:00
func init ( ) {
dbs . OnReady ( func ( ) {
SharedMessageTaskDAO = NewMessageTaskDAO ( )
} )
}
2021-04-12 19:19:15 +08:00
// EnableMessageTask 启用条目
2021-04-05 20:48:33 +08:00
func ( this * MessageTaskDAO ) EnableMessageTask ( tx * dbs . Tx , id int64 ) error {
_ , err := this . Query ( tx ) .
Pk ( id ) .
Set ( "state" , MessageTaskStateEnabled ) .
Update ( )
return err
}
2021-04-12 19:19:15 +08:00
// DisableMessageTask 禁用条目
2021-04-05 20:48:33 +08:00
func ( this * MessageTaskDAO ) DisableMessageTask ( tx * dbs . Tx , id int64 ) error {
_ , err := this . Query ( tx ) .
Pk ( id ) .
Set ( "state" , MessageTaskStateDisabled ) .
Update ( )
return err
}
2021-04-12 19:19:15 +08:00
// FindEnabledMessageTask 查找启用中的条目
2021-04-05 20:48:33 +08:00
func ( this * MessageTaskDAO ) FindEnabledMessageTask ( tx * dbs . Tx , id int64 ) ( * MessageTask , error ) {
result , err := this . Query ( tx ) .
Pk ( id ) .
Attr ( "state" , MessageTaskStateEnabled ) .
Find ( )
if result == nil {
return nil , err
}
return result . ( * MessageTask ) , err
}
2021-04-12 19:19:15 +08:00
// CreateMessageTask 创建任务
2021-04-05 20:48:33 +08:00
func ( this * MessageTaskDAO ) CreateMessageTask ( tx * dbs . Tx , recipientId int64 , instanceId int64 , user string , subject string , body string , isPrimary bool ) ( int64 , error ) {
op := NewMessageTaskOperator ( )
op . RecipientId = recipientId
op . InstanceId = instanceId
op . User = user
op . Subject = subject
op . Body = body
op . IsPrimary = isPrimary
2021-08-24 14:22:44 +08:00
op . Day = timeutil . Format ( "Ymd" )
2021-04-05 20:48:33 +08:00
op . Status = MessageTaskStatusNone
op . State = MessageTaskStateEnabled
return this . SaveInt64 ( tx , op )
}
2021-04-12 19:19:15 +08:00
// FindSendingMessageTasks 查找需要发送的任务
2021-04-05 20:48:33 +08:00
func ( this * MessageTaskDAO ) FindSendingMessageTasks ( tx * dbs . Tx , size int64 ) ( result [ ] * MessageTask , err error ) {
if size <= 0 {
return nil , nil
}
_ , err = this . Query ( tx ) .
State ( MessageTaskStateEnabled ) .
Attr ( "status" , MessageTaskStatusNone ) .
2021-08-24 17:46:11 +08:00
Where ( "(recipientId=0 OR recipientId IN (SELECT id FROM " + SharedMessageRecipientDAO . Table + " WHERE state=1 AND isOn=1 AND (timeFrom IS NULL OR timeTo IS NULL OR :time BETWEEN timeFrom AND timeTo)))" ) .
Param ( "time" , timeutil . Format ( "H:i:s" ) ) .
2021-04-05 20:48:33 +08:00
Desc ( "isPrimary" ) .
AscPk ( ) .
Limit ( size ) .
Slice ( & result ) .
FindAll ( )
return
}
2021-08-24 14:22:44 +08:00
// CountMessageTasksWithStatus counts the enabled tasks whose "status" column
// equals status.
func (this *MessageTaskDAO) CountMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus) (int64, error) {
	query := this.Query(tx)
	query = query.State(MessageTaskStateEnabled)
	query = query.Attr("status", status)
	return query.Count()
}
// ListMessageTasksWithStatus lists one page of enabled tasks whose "status"
// column equals status.
// Rows are ordered by isPrimary descending, then primary key ascending;
// offset and size select the page.
func (this *MessageTaskDAO) ListMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus, offset int64, size int64) (result []*MessageTask, err error) {
	query := this.Query(tx).
		State(MessageTaskStateEnabled).
		Attr("status", status)

	_, err = query.
		Desc("isPrimary").
		AscPk().
		Offset(offset).
		Limit(size).
		Slice(&result).
		FindAll()
	return result, err
}
2021-04-12 19:19:15 +08:00
// UpdateMessageTaskStatus 设置发送的状态
2021-04-05 20:48:33 +08:00
func ( this * MessageTaskDAO ) UpdateMessageTaskStatus ( tx * dbs . Tx , taskId int64 , status MessageTaskStatus , result [ ] byte ) error {
if taskId <= 0 {
return errors . New ( "invalid taskId" )
}
op := NewMessageTaskOperator ( )
op . Id = taskId
op . Status = status
op . SentAt = time . Now ( ) . Unix ( )
if len ( result ) > 0 {
op . Result = result
}
return this . Save ( tx , op )
}
2021-04-12 19:19:15 +08:00
// CreateMessageTasks 从集群、节点或者服务中创建任务
2021-08-24 14:22:44 +08:00
func ( this * MessageTaskDAO ) CreateMessageTasks ( tx * dbs . Tx , role nodeconfigs . NodeRole , clusterId int64 , nodeId int64 , serverId int64 , messageType MessageType , subject string , body string ) error {
receivers , err := SharedMessageReceiverDAO . FindEnabledBestFitReceivers ( tx , role , clusterId , nodeId , serverId , messageType )
2021-04-12 19:19:15 +08:00
if err != nil {
return err
}
2021-04-13 21:30:40 +08:00
allRecipientIds := [ ] int64 { }
2021-04-12 19:19:15 +08:00
for _ , receiver := range receivers {
if receiver . RecipientId > 0 {
2021-04-13 21:30:40 +08:00
allRecipientIds = append ( allRecipientIds , int64 ( receiver . RecipientId ) )
2021-04-12 19:19:15 +08:00
} else if receiver . RecipientGroupId > 0 {
recipientIds , err := SharedMessageRecipientDAO . FindAllEnabledAndOnRecipientIdsWithGroup ( tx , int64 ( receiver . RecipientGroupId ) )
if err != nil {
return err
}
2021-04-13 21:30:40 +08:00
allRecipientIds = append ( allRecipientIds , recipientIds ... )
2021-04-12 19:19:15 +08:00
}
}
2021-04-13 21:30:40 +08:00
sentMap := map [ int64 ] bool { } // recipientId => bool 用来检查是否已经发送,防止重复发送给某个接收人
for _ , recipientId := range allRecipientIds {
_ , ok := sentMap [ recipientId ]
if ok {
continue
}
sentMap [ recipientId ] = true
_ , err := this . CreateMessageTask ( tx , recipientId , 0 , "" , subject , body , false )
if err != nil {
return err
}
}
2021-04-12 19:19:15 +08:00
return nil
}
2021-08-24 14:22:44 +08:00
// CleanExpiredMessageTasks deletes old tasks.
// It removes every row whose "day" is NULL or earlier than the date that lies
// days days before now (formatted with "Ymd"). A days value of zero or less
// falls back to 30.
func (this *MessageTaskDAO) CleanExpiredMessageTasks(tx *dbs.Tx, days int) error {
	if days <= 0 {
		days = 30
	}

	cutoff := time.Now().AddDate(0, 0, -days)
	_, err := this.Query(tx).
		Where("(day IS NULL OR day<:day)").
		Param("day", timeutil.Format("Ymd", cutoff)).
		Delete()
	return err
}