优化消息任务相关代码

This commit is contained in:
刘祥超
2023-10-15 09:39:46 +08:00
parent 500d72aaf3
commit 2f361c5bcc
2 changed files with 14 additions and 150 deletions

View File

@@ -1,17 +1,12 @@
package models
import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
stringutil "github.com/iwind/TeaGo/utils/string"
timeutil "github.com/iwind/TeaGo/utils/time"
"time"
)
@@ -87,151 +82,6 @@ func (this *MessageTaskDAO) FindEnabledMessageTask(tx *dbs.Tx, id int64) (*Messa
return result.(*MessageTask), err
}
// CreateMessageTask 创建任务
func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, instanceId int64, user string, subject string, body string, isPrimary bool) (int64, error) {
if !teaconst.IsPlus {
return 0, nil
}
var hash = stringutil.Md5(types.String(recipientId) + "@" + types.String(instanceId) + "@" + user + "@" + subject + "@" + types.String(isPrimary))
recipientInstanceId, err := SharedMessageRecipientDAO.FindRecipientInstanceId(tx, recipientId)
if err != nil {
return 0, err
}
if recipientInstanceId > 0 {
hashLifeSeconds, err := SharedMessageMediaInstanceDAO.FindInstanceHashLifeSeconds(tx, recipientInstanceId)
if err != nil {
return 0, err
}
if hashLifeSeconds >= 0 { // 意味着此值如果小于0则不做判断
lastMessageAt, err := this.Query(tx).
Attr("hash", hash).
Result("createdAt").
DescPk().
FindInt64Col(0)
if err != nil {
return 0, err
}
// 对于同一个人N分钟内消息不重复发送
if hashLifeSeconds <= 0 {
hashLifeSeconds = 60
}
if lastMessageAt > 0 && time.Now().Unix()-lastMessageAt < int64(hashLifeSeconds) {
return 0, nil
}
}
}
var op = NewMessageTaskOperator()
op.RecipientId = recipientId
op.InstanceId = instanceId
op.Hash = hash
op.User = user
op.Subject = subject
op.Body = body
op.IsPrimary = isPrimary
op.Day = timeutil.Format("Ymd")
op.Status = MessageTaskStatusNone
op.State = MessageTaskStateEnabled
return this.SaveInt64(tx, op)
}
// FindSendingMessageTasks 查找需要发送的任务
func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (result []*MessageTask, err error) {
if size <= 0 {
return nil, nil
}
_, err = this.Query(tx).
State(MessageTaskStateEnabled).
Attr("status", MessageTaskStatusNone).
Where("(recipientId=0 OR recipientId IN (SELECT id FROM "+SharedMessageRecipientDAO.Table+" WHERE state=1 AND isOn=1 AND (timeFrom IS NULL OR timeTo IS NULL OR :time BETWEEN timeFrom AND timeTo)))").
Param("time", timeutil.Format("H:i:s")).
Desc("isPrimary").
AscPk().
Limit(size).
Slice(&result).
FindAll()
return
}
// CountMessageTasksWithStatus 根据状态计算任务数量
func (this *MessageTaskDAO) CountMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus) (int64, error) {
return this.Query(tx).
State(MessageTaskStateEnabled).
Attr("status", status).
Count()
}
// ListMessageTasksWithStatus 根据状态列出单页任务
func (this *MessageTaskDAO) ListMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus, offset int64, size int64) (result []*MessageTask, err error) {
_, err = this.Query(tx).
State(MessageTaskStateEnabled).
Attr("status", status).
Desc("isPrimary").
AscPk().
Offset(offset).
Limit(size).
Slice(&result).
FindAll()
return
}
// UpdateMessageTaskStatus 设置发送的状态
func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error {
if taskId <= 0 {
return errors.New("invalid taskId")
}
var op = NewMessageTaskOperator()
op.Id = taskId
op.Status = status
op.SentAt = time.Now().Unix()
if len(result) > 0 {
op.Result = result
}
return this.Save(tx, op)
}
// CreateMessageTasks 从集群、节点或者服务中创建任务
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, role nodeconfigs.NodeRole, clusterId int64, nodeId int64, serverId int64, messageType MessageType, subject string, body string) error {
if !teaconst.IsPlus {
return nil
}
receivers, err := SharedMessageReceiverDAO.FindEnabledBestFitReceivers(tx, role, clusterId, nodeId, serverId, messageType)
if err != nil {
return err
}
var allRecipientIds = []int64{}
for _, receiver := range receivers {
if receiver.RecipientId > 0 {
allRecipientIds = append(allRecipientIds, int64(receiver.RecipientId))
} else if receiver.RecipientGroupId > 0 {
recipientIds, err := SharedMessageRecipientDAO.FindAllEnabledAndOnRecipientIdsWithGroup(tx, int64(receiver.RecipientGroupId))
if err != nil {
return err
}
allRecipientIds = append(allRecipientIds, recipientIds...)
}
}
var sentMap = map[int64]bool{} // recipientId => bool 用来检查是否已经发送,防止重复发送给某个接收人
for _, recipientId := range allRecipientIds {
_, ok := sentMap[recipientId]
if ok {
continue
}
sentMap[recipientId] = true
_, err := this.CreateMessageTask(tx, recipientId, 0, "", subject, body, false)
if err != nil {
return err
}
}
return nil
}
// CleanExpiredMessageTasks 清理
func (this *MessageTaskDAO) CleanExpiredMessageTasks(tx *dbs.Tx, days int) error {
if days <= 0 {

View File

@@ -0,0 +1,14 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !plus
package models
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/dbs"
)
// CreateMessageTasks 从集群、节点或者服务中创建任务
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, role nodeconfigs.NodeRole, clusterId int64, nodeId int64, serverId int64, messageType MessageType, subject string, body string) error {
return nil
}