package services

import (
	"context"
	"encoding/json"

	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
	"github.com/TeaOSLab/EdgeAPI/internal/utils"
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
	"github.com/iwind/TeaGo/types"
)

// MessageTaskService is the RPC service for message sending tasks.
type MessageTaskService struct {
	BaseService
}

// CreateMessageTask creates a message task from the request fields and
// returns the id of the new task.
//
// The call fails with the validation error when ValidateAdmin rejects ctx.
func (this *MessageTaskService) CreateMessageTask(ctx context.Context, req *pb.CreateMessageTaskRequest) (*pb.CreateMessageTaskResponse, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	taskId, err := models.SharedMessageTaskDAO.CreateMessageTask(tx, req.MessageRecipientId, req.MessageMediaInstanceId, req.User, req.Subject, req.Body, req.IsPrimary)
	if err != nil {
		return nil, err
	}
	return &pb.CreateMessageTaskResponse{MessageTaskId: taskId}, nil
}

// DeleteMessageTask deletes a message task by disabling it through
// SharedMessageTaskDAO.DisableMessageTask.
//
// The call fails with the validation error when ValidateAdmin rejects ctx.
func (this *MessageTaskService) DeleteMessageTask(ctx context.Context, req *pb.DeleteMessageTaskRequest) (*pb.RPCSuccess, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	err = models.SharedMessageTaskDAO.DisableMessageTask(tx, req.MessageTaskId)
	if err != nil {
		return nil, err
	}
	return this.Success()
}

// FindEnabledMessageTask reads a single message task together with its
// recipient, media instance and decoded sending result.
//
// The response carries a nil MessageTask when the task does not exist, or when
// the task's recipient or media instance is missing or switched off. In the
// latter case the task itself is disabled before returning, so this read has a
// write side effect.
func (this *MessageTaskService) FindEnabledMessageTask(ctx context.Context, req *pb.FindEnabledMessageTaskRequest) (*pb.FindEnabledMessageTaskResponse, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	var cacheMap = utils.NewCacheMap()

	task, err := models.SharedMessageTaskDAO.FindEnabledMessageTask(tx, req.MessageTaskId)
	if err != nil {
		return nil, err
	}
	if task == nil {
		return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil
	}

	// Without a designated recipient the media instance comes from the task
	// itself and the recipient Id stays 0.
	var pbRecipient = &pb.MessageRecipient{}
	var instanceId = int64(task.InstanceId)
	if task.RecipientId > 0 {
		recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap)
		if err != nil {
			return nil, err
		}
		if recipient == nil || !recipient.IsOn {
			// The recipient has been deleted or switched off: drop the task.
			err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id))
			if err != nil {
				return nil, err
			}
			return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil
		}

		// Report the recipient's identity so callers can tell it apart from the
		// "no designated recipient" case, where Id stays 0. This matches what
		// ListMessageTasksWithStatus returns for the same task.
		pbRecipient.Id = int64(recipient.Id)
		pbRecipient.User = recipient.User

		// A designated recipient decides which media instance is used.
		instanceId = int64(recipient.InstanceId)
	}

	// Media instance
	instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, instanceId, cacheMap)
	if err != nil {
		return nil, err
	}
	if instance == nil || !instance.IsOn {
		// The media instance has been deleted or switched off: drop the task.
		err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id))
		if err != nil {
			return nil, err
		}
		return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil
	}
	pbRecipient.MessageMediaInstance = &pb.MessageMediaInstance{
		Id: int64(instance.Id),
		MessageMedia: &pb.MessageMedia{
			Type: instance.MediaType,
		},
		ParamsJSON: instance.Params,
	}

	// An empty stored result yields a zero-valued MessageTaskResult, not nil.
	var result = &pb.MessageTaskResult{}
	if len(task.Result) > 0 {
		err = json.Unmarshal(task.Result, result)
		if err != nil {
			return nil, err
		}
	}

	return &pb.FindEnabledMessageTaskResponse{MessageTask: &pb.MessageTask{
		Id:               int64(task.Id),
		MessageRecipient: pbRecipient,
		User:             task.User,
		Subject:          task.Subject,
		Body:             task.Body,
		CreatedAt:        int64(task.CreatedAt),
		Status:           int32(task.Status),
		SentAt:           int64(task.SentAt),
		Result:           result,
	}}, nil
}

// CountMessageTasksWithStatus counts the message tasks that are in the
// requested status.
//
// The call fails with the validation error when ValidateAdmin rejects ctx.
func (this *MessageTaskService) CountMessageTasksWithStatus(ctx context.Context, req *pb.CountMessageTasksWithStatusRequest) (*pb.RPCCountResponse, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	count, err := models.SharedMessageTaskDAO.CountMessageTasksWithStatus(tx, types.Int(req.Status))
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(count)
}

// ListMessageTasksWithStatus lists one page (req.Offset, req.Size) of message
// tasks in the requested status, each with its recipient, media instance and
// decoded sending result.
//
// Tasks whose recipient or media instance is missing or switched off are
// disabled and left out of the page, so the returned slice may be shorter than
// what the DAO returned. Any lookup, disable or result-decoding error aborts
// the whole call.
func (this *MessageTaskService) ListMessageTasksWithStatus(ctx context.Context, req *pb.ListMessageTasksWithStatusRequest) (*pb.ListMessageTasksWithStatusResponse, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	var cacheMap = utils.NewCacheMap()

	tasks, err := models.SharedMessageTaskDAO.ListMessageTasksWithStatus(tx, types.Int(req.Status), req.Offset, req.Size)
	if err != nil {
		return nil, err
	}

	// At most one entry per task, so the capacity is known up front.
	var pbTasks = make([]*pb.MessageTask, 0, len(tasks))
	for _, task := range tasks {
		// Without a designated recipient the media instance comes from the task
		// itself and the recipient Id stays 0.
		var pbRecipient = &pb.MessageRecipient{}
		var instanceId = int64(task.InstanceId)
		if task.RecipientId > 0 {
			recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap)
			if err != nil {
				return nil, err
			}
			if recipient == nil || !recipient.IsOn {
				// The recipient has been deleted or switched off: drop the task.
				err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id))
				if err != nil {
					return nil, err
				}
				continue
			}
			pbRecipient.Id = int64(recipient.Id)
			pbRecipient.User = recipient.User

			// A designated recipient decides which media instance is used.
			instanceId = int64(recipient.InstanceId)
		}

		// Media instance
		instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, instanceId, cacheMap)
		if err != nil {
			return nil, err
		}
		if instance == nil || !instance.IsOn {
			// The media instance has been deleted or switched off: drop the task.
			err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id))
			if err != nil {
				return nil, err
			}
			continue
		}
		pbRecipient.MessageMediaInstance = &pb.MessageMediaInstance{
			Id:   int64(instance.Id),
			Name: instance.Name,
			MessageMedia: &pb.MessageMedia{
				Type: instance.MediaType,
			},
			ParamsJSON: instance.Params,
			RateJSON:   instance.Rate,
		}

		// An empty stored result yields a zero-valued MessageTaskResult, not nil.
		var result = &pb.MessageTaskResult{}
		if len(task.Result) > 0 {
			err = json.Unmarshal(task.Result, result)
			if err != nil {
				return nil, err
			}
		}

		pbTasks = append(pbTasks, &pb.MessageTask{
			Id:               int64(task.Id),
			MessageRecipient: pbRecipient,
			User:             task.User,
			Subject:          task.Subject,
			Body:             task.Body,
			CreatedAt:        int64(task.CreatedAt),
			Status:           types.Int32(task.Status),
			SentAt:           int64(task.SentAt),
			Result:           result,
		})
	}

	return &pb.ListMessageTasksWithStatusResponse{MessageTasks: pbTasks}, nil
}