diff --git a/internal/db/models/message_task_dao.go b/internal/db/models/message_task_dao.go index e58e592e..67a05c6d 100644 --- a/internal/db/models/message_task_dao.go +++ b/internal/db/models/message_task_dao.go @@ -1,17 +1,12 @@ package models import ( - teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" - "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" - "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/rands" - "github.com/iwind/TeaGo/types" - stringutil "github.com/iwind/TeaGo/utils/string" timeutil "github.com/iwind/TeaGo/utils/time" "time" ) @@ -87,151 +82,6 @@ func (this *MessageTaskDAO) FindEnabledMessageTask(tx *dbs.Tx, id int64) (*Messa return result.(*MessageTask), err } -// CreateMessageTask 创建任务 -func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, instanceId int64, user string, subject string, body string, isPrimary bool) (int64, error) { - if !teaconst.IsPlus { - return 0, nil - } - - var hash = stringutil.Md5(types.String(recipientId) + "@" + types.String(instanceId) + "@" + user + "@" + subject + "@" + types.String(isPrimary)) - recipientInstanceId, err := SharedMessageRecipientDAO.FindRecipientInstanceId(tx, recipientId) - if err != nil { - return 0, err - } - if recipientInstanceId > 0 { - hashLifeSeconds, err := SharedMessageMediaInstanceDAO.FindInstanceHashLifeSeconds(tx, recipientInstanceId) - if err != nil { - return 0, err - } - - if hashLifeSeconds >= 0 { // 意味着此值如果小于0,则不做判断 - lastMessageAt, err := this.Query(tx). - Attr("hash", hash). - Result("createdAt"). - DescPk(). - FindInt64Col(0) - if err != nil { - return 0, err - } - - // 对于同一个人N分钟内消息不重复发送 - if hashLifeSeconds <= 0 { - hashLifeSeconds = 60 - } - if lastMessageAt > 0 && time.Now().Unix()-lastMessageAt < int64(hashLifeSeconds) { - return 0, nil - } - } - } - - var op = NewMessageTaskOperator() - op.RecipientId = recipientId - op.InstanceId = instanceId - op.Hash = hash - op.User = user - op.Subject = subject - op.Body = body - op.IsPrimary = isPrimary - op.Day = timeutil.Format("Ymd") - op.Status = MessageTaskStatusNone - op.State = MessageTaskStateEnabled - return this.SaveInt64(tx, op) -} - -// FindSendingMessageTasks 查找需要发送的任务 -func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (result []*MessageTask, err error) { - if size <= 0 { - return nil, nil - } - _, err = this.Query(tx). - State(MessageTaskStateEnabled). - Attr("status", MessageTaskStatusNone). - Where("(recipientId=0 OR recipientId IN (SELECT id FROM "+SharedMessageRecipientDAO.Table+" WHERE state=1 AND isOn=1 AND (timeFrom IS NULL OR timeTo IS NULL OR :time BETWEEN timeFrom AND timeTo)))"). - Param("time", timeutil.Format("H:i:s")). - Desc("isPrimary"). - AscPk(). - Limit(size). - Slice(&result). - FindAll() - return -} - -// CountMessageTasksWithStatus 根据状态计算任务数量 -func (this *MessageTaskDAO) CountMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus) (int64, error) { - return this.Query(tx). - State(MessageTaskStateEnabled). - Attr("status", status). - Count() -} - -// ListMessageTasksWithStatus 根据状态列出单页任务 -func (this *MessageTaskDAO) ListMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus, offset int64, size int64) (result []*MessageTask, err error) { - _, err = this.Query(tx). - State(MessageTaskStateEnabled). - Attr("status", status). - Desc("isPrimary"). - AscPk(). - Offset(offset). - Limit(size). - Slice(&result). - FindAll() - return -} - -// UpdateMessageTaskStatus 设置发送的状态 -func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error { - if taskId <= 0 { - return errors.New("invalid taskId") - } - var op = NewMessageTaskOperator() - op.Id = taskId - op.Status = status - op.SentAt = time.Now().Unix() - if len(result) > 0 { - op.Result = result - } - return this.Save(tx, op) -} - -// CreateMessageTasks 从集群、节点或者服务中创建任务 -func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, role nodeconfigs.NodeRole, clusterId int64, nodeId int64, serverId int64, messageType MessageType, subject string, body string) error { - if !teaconst.IsPlus { - return nil - } - - receivers, err := SharedMessageReceiverDAO.FindEnabledBestFitReceivers(tx, role, clusterId, nodeId, serverId, messageType) - if err != nil { - return err - } - var allRecipientIds = []int64{} - for _, receiver := range receivers { - if receiver.RecipientId > 0 { - allRecipientIds = append(allRecipientIds, int64(receiver.RecipientId)) - } else if receiver.RecipientGroupId > 0 { - recipientIds, err := SharedMessageRecipientDAO.FindAllEnabledAndOnRecipientIdsWithGroup(tx, int64(receiver.RecipientGroupId)) - if err != nil { - return err - } - allRecipientIds = append(allRecipientIds, recipientIds...) - } - } - - var sentMap = map[int64]bool{} // recipientId => bool 用来检查是否已经发送,防止重复发送给某个接收人 - for _, recipientId := range allRecipientIds { - _, ok := sentMap[recipientId] - if ok { - continue - } - sentMap[recipientId] = true - _, err := this.CreateMessageTask(tx, recipientId, 0, "", subject, body, false) - if err != nil { - return err - } - } - - return nil -} - // CleanExpiredMessageTasks 清理 func (this *MessageTaskDAO) CleanExpiredMessageTasks(tx *dbs.Tx, days int) error { if days <= 0 { diff --git a/internal/db/models/message_task_dao_ext.go b/internal/db/models/message_task_dao_ext.go new file mode 100644 index 00000000..fcae45a2 --- /dev/null +++ b/internal/db/models/message_task_dao_ext.go @@ -0,0 +1,14 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . +//go:build !plus + +package models + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/iwind/TeaGo/dbs" +) + +// CreateMessageTasks 从集群、节点或者服务中创建任务 +func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, role nodeconfigs.NodeRole, clusterId int64, nodeId int64, serverId int64, messageType MessageType, subject string, body string) error { + return nil +}