通知媒介增加任务队列查看功能

This commit is contained in:
GoEdgeLab
2021-08-24 14:22:44 +08:00
parent 55fe8f5707
commit 17b9eee78d
23 changed files with 489 additions and 165 deletions

View File

@@ -106,11 +106,7 @@ func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, role string, clusterId
}
// 发送给媒介接收人
err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{
ClusterId: clusterId,
NodeId: 0,
ServerId: 0,
}, messageType, subject, body)
err = SharedMessageTaskDAO.CreateMessageTasks(tx, role, 0, 0, 0, messageType, subject, body)
if err != nil {
return err
}
@@ -138,29 +134,10 @@ func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, role string, clusterId int
return err
}
// TODO 目前只支持边缘节点发送消息将来要支持NS节点
if role == nodeconfigs.NodeRoleNode {
// 发送给媒介接收人 - 集群
err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{
ClusterId: clusterId,
NodeId: 0,
ServerId: 0,
}, messageType, subject, body)
if err != nil {
return err
}
// 发送给媒介接收人 - 节点
if nodeId > 0 {
err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{
ClusterId: clusterId,
NodeId: nodeId,
ServerId: 0,
}, messageType, subject, body)
if err != nil {
return err
}
}
// 发送给媒介接收人 - 集群
err = SharedMessageTaskDAO.CreateMessageTasks(tx, role, clusterId, nodeId, 0, messageType, subject, body)
if err != nil {
return err
}
return nil

View File

@@ -7,6 +7,7 @@ import (
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
)
const (
@@ -35,7 +36,7 @@ func init() {
})
}
// 启用条目
// EnableMessageMediaInstance 启用条目
func (this *MessageMediaInstanceDAO) EnableMessageMediaInstance(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
@@ -44,7 +45,7 @@ func (this *MessageMediaInstanceDAO) EnableMessageMediaInstance(tx *dbs.Tx, id i
return err
}
// 禁用条目
// DisableMessageMediaInstance 禁用条目
func (this *MessageMediaInstanceDAO) DisableMessageMediaInstance(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
@@ -53,19 +54,31 @@ func (this *MessageMediaInstanceDAO) DisableMessageMediaInstance(tx *dbs.Tx, id
return err
}
// 查找启用中的条目
func (this *MessageMediaInstanceDAO) FindEnabledMessageMediaInstance(tx *dbs.Tx, id int64) (*MessageMediaInstance, error) {
// FindEnabledMessageMediaInstance 查找启用中的条目
func (this *MessageMediaInstanceDAO) FindEnabledMessageMediaInstance(tx *dbs.Tx, instanceId int64, cacheMap maps.Map) (*MessageMediaInstance, error) {
if cacheMap == nil {
cacheMap = maps.Map{}
}
var cacheKey = this.Table + ":record:" + types.String(instanceId)
var cache = cacheMap.Get(cacheKey)
if cache != nil {
return cache.(*MessageMediaInstance), nil
}
result, err := this.Query(tx).
Pk(id).
Pk(instanceId).
Attr("state", MessageMediaInstanceStateEnabled).
Find()
if result == nil {
return nil, err
}
cacheMap[cacheKey] = result
return result.(*MessageMediaInstance), err
}
// 创建媒介实例
// CreateMediaInstance 创建媒介实例
func (this *MessageMediaInstanceDAO) CreateMediaInstance(tx *dbs.Tx, name string, mediaType string, params maps.Map, description string) (int64, error) {
op := NewMessageMediaInstanceOperator()
op.Name = name
@@ -88,7 +101,7 @@ func (this *MessageMediaInstanceDAO) CreateMediaInstance(tx *dbs.Tx, name string
return this.SaveInt64(tx, op)
}
// 修改媒介实例
// UpdateMediaInstance 修改媒介实例
func (this *MessageMediaInstanceDAO) UpdateMediaInstance(tx *dbs.Tx, instanceId int64, name string, mediaType string, params maps.Map, description string, isOn bool) error {
if instanceId <= 0 {
return errors.New("invalid instanceId")
@@ -114,7 +127,7 @@ func (this *MessageMediaInstanceDAO) UpdateMediaInstance(tx *dbs.Tx, instanceId
return this.Save(tx, op)
}
// 计算接收人数量
// CountAllEnabledMediaInstances 计算接收人数量
func (this *MessageMediaInstanceDAO) CountAllEnabledMediaInstances(tx *dbs.Tx, mediaType string, keyword string) (int64, error) {
query := this.Query(tx)
if len(mediaType) > 0 {
@@ -130,7 +143,7 @@ func (this *MessageMediaInstanceDAO) CountAllEnabledMediaInstances(tx *dbs.Tx, m
Count()
}
// 列出单页接收人
// ListAllEnabledMediaInstances 列出单页接收人
func (this *MessageMediaInstanceDAO) ListAllEnabledMediaInstances(tx *dbs.Tx, mediaType string, keyword string, offset int64, size int64) (result []*MessageMediaInstance, err error) {
query := this.Query(tx)
if len(mediaType) > 0 {

View File

@@ -75,11 +75,12 @@ func (this *MessageReceiverDAO) DisableReceivers(tx *dbs.Tx, clusterId int64, no
}
// CreateReceiver 创建接收人
func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, params maps.Map, recipientId int64, recipientGroupId int64) (int64, error) {
func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType MessageType, params maps.Map, recipientId int64, recipientGroupId int64) (int64, error) {
op := NewMessageReceiverOperator()
op.ClusterId = target.ClusterId
op.NodeId = target.NodeId
op.ServerId = target.ServerId
op.Role = role
op.ClusterId = clusterId
op.NodeId = nodeId
op.ServerId = serverId
op.Type = messageType
if params == nil {
@@ -98,63 +99,120 @@ func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTar
}
// FindAllEnabledReceivers 查询接收人
func (this *MessageReceiverDAO) FindAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) {
func (this *MessageReceiverDAO) FindAllEnabledReceivers(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType string) (result []*MessageReceiver, err error) {
query := this.Query(tx)
if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的
}
_, err = query.
Attr("clusterId", target.ClusterId).
Attr("nodeId", target.NodeId).
Attr("serverId", target.ServerId).
Attr("role", role).
Attr("clusterId", clusterId).
Attr("nodeId", nodeId).
Attr("serverId", serverId).
State(MessageReceiverStateEnabled).
AscPk().
Slice(&result).
FindAll()
if err != nil {
return nil, err
}
if len(result) == 0 {
// 去掉类型再试试
query := this.Query(tx)
_, err = query.
Attr("clusterId", target.ClusterId).
Attr("nodeId", target.NodeId).
Attr("serverId", target.ServerId).
State(MessageReceiverStateEnabled).
AscPk().
Slice(&result).
FindAll()
if err != nil {
return nil, err
}
// 去掉服务和节点再试试
if len(result) == 0 {
query := this.Query(tx)
_, err = query.
Attr("clusterId", target.ClusterId).
State(MessageReceiverStateEnabled).
AscPk().
Slice(&result).
FindAll()
}
}
return
}
// CountAllEnabledReceivers 计算接收人数量
func (this *MessageReceiverDAO) CountAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (int64, error) {
func (this *MessageReceiverDAO) CountAllEnabledReceivers(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType string) (int64, error) {
query := this.Query(tx)
if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的
}
return query.
Attr("clusterId", target.ClusterId).
Attr("nodeId", target.NodeId).
Attr("serverId", target.ServerId).
Attr("role", role).
Attr("clusterId", clusterId).
Attr("nodeId", nodeId).
Attr("serverId", serverId).
State(MessageReceiverStateEnabled).
Count()
}
// FindEnabledBestFitReceivers 查询最适合的接收人
func (this *MessageReceiverDAO) FindEnabledBestFitReceivers(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType string) (result []*MessageReceiver, err error) {
// serverId优先
query := this.Query(tx)
if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的
}
if len(role) > 0 {
query.Attr("role", role)
}
if serverId > 0 {
query.Attr("serverId", serverId)
} else if nodeId > 0 {
query.Attr("nodeId", nodeId)
} else if clusterId > 0 {
query.Attr("clusterId", clusterId)
}
_, err = query.
State(MessageReceiverStateEnabled).
AscPk().
Slice(&result).
FindAll()
if err != nil || len(result) > 0 {
return
}
// nodeId优先
if serverId > 0 && nodeId > 0 {
query = this.Query(tx)
if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的
}
if len(role) > 0 {
query.Attr("role", role)
}
query.Attr("nodeId", nodeId)
_, err = query.
State(MessageReceiverStateEnabled).
AscPk().
Slice(&result).
FindAll()
if err != nil || len(result) > 0 {
return
}
}
// clusterId优先
if (serverId > 0 || nodeId > 0) && clusterId > 0 {
query = this.Query(tx)
if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的
}
if len(role) > 0 {
query.Attr("role", role)
}
query.Attr("clusterId", clusterId)
_, err = query.
State(MessageReceiverStateEnabled).
AscPk().
Slice(&result).
FindAll()
if err != nil || len(result) > 0 {
return
}
}
// 去掉集群ID
query = this.Query(tx)
if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的
}
if len(role) > 0 {
query.Attr("role", role)
}
_, err = query.
State(MessageReceiverStateEnabled).
AscPk().
Slice(&result).
FindAll()
if err != nil || len(result) > 0 {
return
}
return
}

View File

@@ -1,6 +1,30 @@
package models
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"testing"
)
func TestMessageReceiverDAO_FindEnabledBestFitReceivers(t *testing.T) {
var tx *dbs.Tx
{
receivers, err := NewMessageReceiverDAO().FindEnabledBestFitReceivers(tx, nodeconfigs.NodeRoleNode, 18, 1, 2, "*")
if err != nil {
t.Fatal(err)
}
logs.PrintAsJSON(receivers, t)
}
{
receivers, err := NewMessageReceiverDAO().FindEnabledBestFitReceivers(tx, nodeconfigs.NodeRoleNode, 30, 1, 2, "*")
if err != nil {
t.Fatal(err)
}
logs.PrintAsJSON(receivers, t)
}
}

View File

@@ -3,6 +3,7 @@ package models
// MessageReceiver 消息通知接收人
type MessageReceiver struct {
Id uint32 `field:"id"` // ID
Role string `field:"role"` // 节点角色
ClusterId uint32 `field:"clusterId"` // 集群ID
NodeId uint32 `field:"nodeId"` // 节点ID
ServerId uint32 `field:"serverId"` // 服务ID
@@ -15,6 +16,7 @@ type MessageReceiver struct {
type MessageReceiverOperator struct {
Id interface{} // ID
Role interface{} // 节点角色
ClusterId interface{} // 集群ID
NodeId interface{} // 节点ID
ServerId interface{} // 服务ID

View File

@@ -7,6 +7,8 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
)
const (
@@ -54,14 +56,27 @@ func (this *MessageRecipientDAO) DisableMessageRecipient(tx *dbs.Tx, id int64) e
}
// FindEnabledMessageRecipient 查找启用中的条目
func (this *MessageRecipientDAO) FindEnabledMessageRecipient(tx *dbs.Tx, id int64) (*MessageRecipient, error) {
func (this *MessageRecipientDAO) FindEnabledMessageRecipient(tx *dbs.Tx, recipientId int64, cacheMap maps.Map,
) (*MessageRecipient, error) {
if cacheMap == nil {
cacheMap = maps.Map{}
}
var cacheKey = this.Table + ":record:" + types.String(recipientId)
var cache = cacheMap.Get(cacheKey)
if cache != nil {
return cache.(*MessageRecipient), nil
}
result, err := this.Query(tx).
Pk(id).
Pk(recipientId).
Attr("state", MessageRecipientStateEnabled).
Find()
if result == nil {
return nil, err
}
cacheMap[cacheKey] = result
return result.(*MessageRecipient), err
}

View File

@@ -1,11 +0,0 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package models
// MessageTaskTarget 消息接收对象
// 每个字段不一定都有值
type MessageTaskTarget struct {
ClusterId int64 // 集群ID
NodeId int64 // 节点ID
ServerId int64 // 服务ID
}

View File

@@ -2,9 +2,13 @@ package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/rands"
timeutil "github.com/iwind/TeaGo/utils/time"
"time"
)
@@ -35,6 +39,21 @@ func NewMessageTaskDAO() *MessageTaskDAO {
var SharedMessageTaskDAO *MessageTaskDAO
func init() {
dbs.OnReadyDone(func() {
// 清理数据任务
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
go func() {
for range ticker.C {
err := SharedMessageTaskDAO.CleanExpiredMessageTasks(nil, 30) // 只保留30天
if err != nil {
remotelogs.Error("SharedMessageTaskDAO", "clean expired data failed: "+err.Error())
}
}
}()
})
}
func init() {
dbs.OnReady(func() {
SharedMessageTaskDAO = NewMessageTaskDAO()
@@ -80,6 +99,7 @@ func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, ins
op.Subject = subject
op.Body = body
op.IsPrimary = isPrimary
op.Day = timeutil.Format("Ymd")
op.Status = MessageTaskStatusNone
op.State = MessageTaskStateEnabled
return this.SaveInt64(tx, op)
@@ -101,6 +121,28 @@ func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (res
return
}
// CountMessageTasksWithStatus 根据状态计算任务数量
func (this *MessageTaskDAO) CountMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus) (int64, error) {
return this.Query(tx).
State(MessageTaskStateEnabled).
Attr("status", status).
Count()
}
// ListMessageTasksWithStatus 根据状态列出单页任务
func (this *MessageTaskDAO) ListMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus, offset int64, size int64) (result []*MessageTask, err error) {
_, err = this.Query(tx).
State(MessageTaskStateEnabled).
Attr("status", status).
Desc("isPrimary").
AscPk().
Offset(offset).
Limit(size).
Slice(&result).
FindAll()
return
}
// UpdateMessageTaskStatus 设置发送的状态
func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error {
if taskId <= 0 {
@@ -117,8 +159,8 @@ func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, st
}
// CreateMessageTasks 从集群、节点或者服务中创建任务
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error {
receivers, err := SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, target, messageType)
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, role nodeconfigs.NodeRole, clusterId int64, nodeId int64, serverId int64, messageType MessageType, subject string, body string) error {
receivers, err := SharedMessageReceiverDAO.FindEnabledBestFitReceivers(tx, role, clusterId, nodeId, serverId, messageType)
if err != nil {
return err
}
@@ -150,3 +192,16 @@ func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTar
return nil
}
// CleanExpiredMessageTasks 清理
func (this *MessageTaskDAO) CleanExpiredMessageTasks(tx *dbs.Tx, days int) error {
if days <= 0 {
days = 30
}
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days))
_, err := this.Query(tx).
Where("(day IS NULL OR day<:day)").
Param("day", day).
Delete()
return err
}

View File

@@ -1,13 +1,32 @@
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/rands"
timeutil "github.com/iwind/TeaGo/utils/time"
"time"
)
type MessageTaskLogDAO dbs.DAO
func init() {
dbs.OnReadyDone(func() {
// 清理数据任务
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
go func() {
for range ticker.C {
err := SharedMessageTaskLogDAO.CleanExpiredLogs(nil, 30) // 只保留30天
if err != nil {
remotelogs.Error("SharedMessageTaskLogDAO", "clean expired data failed: "+err.Error())
}
}
}()
})
}
func NewMessageTaskLogDAO() *MessageTaskLogDAO {
return dbs.NewDAO(&MessageTaskLogDAO{
DAOObject: dbs.DAOObject{
@@ -27,25 +46,28 @@ func init() {
})
}
// 创建日志
// CreateLog 创建日志
func (this *MessageTaskLogDAO) CreateLog(tx *dbs.Tx, taskId int64, isOk bool, errMsg string, response string) error {
op := NewMessageTaskLogOperator()
op.TaskId = taskId
op.IsOk = isOk
op.Error = errMsg
op.Response = response
op.Day = timeutil.Format("Ymd")
return this.Save(tx, op)
}
// 计算日志数量
// CountLogs 计算日志数量
func (this *MessageTaskLogDAO) CountLogs(tx *dbs.Tx) (int64, error) {
return this.Query(tx).
Where("taskId IN (SELECT id FROM " + SharedMessageTaskDAO.Table + ")").
Count()
}
// 列出单页日志
// ListLogs 列出单页日志
func (this *MessageTaskLogDAO) ListLogs(tx *dbs.Tx, offset int64, size int64) (result []*MessageTaskLog, err error) {
_, err = this.Query(tx).
Where("taskId IN (SELECT id FROM " + SharedMessageTaskDAO.Table + ")").
Offset(offset).
Limit(size).
DescPk().
@@ -53,3 +75,16 @@ func (this *MessageTaskLogDAO) ListLogs(tx *dbs.Tx, offset int64, size int64) (r
FindAll()
return
}
// CleanExpiredLogs 清理
func (this *MessageTaskLogDAO) CleanExpiredLogs(tx *dbs.Tx, days int) error {
if days <= 0 {
days = 30
}
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days))
_, err := this.Query(tx).
Where("(day IS NULL OR day<:day)").
Param("day", day).
Delete()
return err
}

View File

@@ -1,6 +1,6 @@
package models
// 消息发送日志
// MessageTaskLog 消息发送日志
type MessageTaskLog struct {
Id uint64 `field:"id"` // ID
TaskId uint64 `field:"taskId"` // 任务ID
@@ -8,6 +8,7 @@ type MessageTaskLog struct {
IsOk uint8 `field:"isOk"` // 是否成功
Error string `field:"error"` // 错误信息
Response string `field:"response"` // 响应信息
Day string `field:"day"` // YYYYMMDD
}
type MessageTaskLogOperator struct {
@@ -17,6 +18,7 @@ type MessageTaskLogOperator struct {
IsOk interface{} // 是否成功
Error interface{} // 错误信息
Response interface{} // 响应信息
Day interface{} // YYYYMMDD
}
func NewMessageTaskLogOperator() *MessageTaskLogOperator {

View File

@@ -1,6 +1,6 @@
package models
//
// MessageTask 消息发送相关任务
type MessageTask struct {
Id uint64 `field:"id"` // ID
RecipientId uint32 `field:"recipientId"` // 接收人ID
@@ -13,6 +13,7 @@ type MessageTask struct {
SentAt uint64 `field:"sentAt"` // 最后一次发送时间
State uint8 `field:"state"` // 状态
Result string `field:"result"` // 结果
Day string `field:"day"` // YYYYMMDD
IsPrimary uint8 `field:"isPrimary"` // 是否优先
}
@@ -28,6 +29,7 @@ type MessageTaskOperator struct {
SentAt interface{} // 最后一次发送时间
State interface{} // 状态
Result interface{} // 结果
Day interface{} // YYYYMMDD
IsPrimary interface{} // 是否优先
}

View File

@@ -2,6 +2,7 @@ package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
@@ -33,6 +34,9 @@ var SharedNodeLogDAO *NodeLogDAO
func init() {
dbs.OnReady(func() {
SharedNodeLogDAO = NewNodeLogDAO()
// 设置日志存储
remotelogs.SetDAO(SharedNodeLogDAO)
})
}