From 17b9eee78d25b080f09b85531e6e7fcbec581b31 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 24 Aug 2021 14:22:44 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E7=9F=A5=E5=AA=92=E4=BB=8B=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E4=BB=BB=E5=8A=A1=E9=98=9F=E5=88=97=E6=9F=A5=E7=9C=8B?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 4 +- go.sum | 2 + internal/db/models/message_dao.go | 33 +--- .../db/models/message_media_instance_dao.go | 31 ++-- internal/db/models/message_receiver_dao.go | 144 ++++++++++++------ .../db/models/message_receiver_dao_test.go | 24 +++ internal/db/models/message_receiver_model.go | 2 + internal/db/models/message_recipient_dao.go | 19 ++- internal/db/models/message_target.go | 11 -- internal/db/models/message_task_dao.go | 59 ++++++- internal/db/models/message_task_log_dao.go | 41 ++++- internal/db/models/message_task_log_model.go | 4 +- internal/db/models/message_task_model.go | 4 +- internal/db/models/node_log_dao.go | 4 + internal/remotelogs/dao_interface.go | 12 ++ internal/remotelogs/utils.go | 19 ++- internal/rpc/services/service_firewall.go | 7 +- .../service_message_media_instance.go | 17 ++- .../rpc/services/service_message_receiver.go | 36 ++--- .../rpc/services/service_message_recipient.go | 23 +-- internal/rpc/services/service_message_task.go | 144 ++++++++++++++++-- .../rpc/services/service_message_task_log.go | 10 +- internal/rpc/services/service_node_cluster.go | 4 +- 23 files changed, 489 insertions(+), 165 deletions(-) delete mode 100644 internal/db/models/message_target.go create mode 100644 internal/remotelogs/dao_interface.go diff --git a/go.mod b/go.mod index 6485566b..445cc6a6 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.15 replace github.com/TeaOSLab/EdgeCommon => ../EdgeCommon + + require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/TeaOSLab/EdgeCommon v0.0.0-00010101000000-000000000000 @@ -14,7 +16,7 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/go-yaml/yaml v2.1.0+incompatible github.com/golang/protobuf v1.5.2 - github.com/iwind/TeaGo v0.0.0-20210809112119-a57ed0e84e34 + github.com/iwind/TeaGo v0.0.0-20210824034952-1a56ad7d0b5e github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3 github.com/json-iterator/go v1.1.11 // indirect github.com/lionsoul2014/ip2region v2.2.0-release+incompatible diff --git a/go.sum b/go.sum index 3885e0c2..480af9f0 100644 --- a/go.sum +++ b/go.sum @@ -186,6 +186,8 @@ github.com/iwind/TeaGo v0.0.0-20210806054428-5534da0db9d1 h1:AZKkwTNEZYrpyv62zIk github.com/iwind/TeaGo v0.0.0-20210806054428-5534da0db9d1/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20210809112119-a57ed0e84e34 h1:ZCNQXLiGF5Z1cV3Pi03zCWzwwjPfsI5XhcrNhTvCFIU= github.com/iwind/TeaGo v0.0.0-20210809112119-a57ed0e84e34/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20210824034952-1a56ad7d0b5e h1:GDCU57lQD6W9u5KT2834MmK022FSeAbskb7H0p2eaJY= +github.com/iwind/TeaGo v0.0.0-20210824034952-1a56ad7d0b5e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3 h1:aBSonas7vFcgTj9u96/bWGILGv1ZbUSTLiOzcI1ZT6c= github.com/iwind/gosock v0.0.0-20210722083328-12b2d66abec3/go.mod h1:H5Q7SXwbx3a97ecJkaS2sD77gspzE7HFUafBO0peEyA= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index d912915c..872958e3 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -106,11 +106,7 @@ func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, role string, clusterId } // 发送给媒介接收人 - err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{ - ClusterId: clusterId, - NodeId: 0, - ServerId: 0, - }, messageType, subject, body) + err = SharedMessageTaskDAO.CreateMessageTasks(tx, role, 0, 0, 0, messageType, subject, body) if err != nil { return err } @@ -138,29 +134,10 @@ func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, role string, clusterId int return err } - // TODO 目前只支持边缘节点发送消息,将来要支持NS节点 - if role == nodeconfigs.NodeRoleNode { - // 发送给媒介接收人 - 集群 - err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{ - ClusterId: clusterId, - NodeId: 0, - ServerId: 0, - }, messageType, subject, body) - if err != nil { - return err - } - - // 发送给媒介接收人 - 节点 - if nodeId > 0 { - err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{ - ClusterId: clusterId, - NodeId: nodeId, - ServerId: 0, - }, messageType, subject, body) - if err != nil { - return err - } - } + // 发送给媒介接收人 - 集群 + err = SharedMessageTaskDAO.CreateMessageTasks(tx, role, clusterId, nodeId, 0, messageType, subject, body) + if err != nil { + return err } return nil diff --git a/internal/db/models/message_media_instance_dao.go b/internal/db/models/message_media_instance_dao.go index 7c8b1565..d32ef86d 100644 --- a/internal/db/models/message_media_instance_dao.go +++ b/internal/db/models/message_media_instance_dao.go @@ -7,6 +7,7 @@ import ( "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/types" ) const ( @@ -35,7 +36,7 @@ func init() { }) } -// 启用条目 +// EnableMessageMediaInstance 启用条目 func (this *MessageMediaInstanceDAO) EnableMessageMediaInstance(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -44,7 +45,7 @@ func (this *MessageMediaInstanceDAO) EnableMessageMediaInstance(tx *dbs.Tx, id i return err } -// 禁用条目 +// DisableMessageMediaInstance 禁用条目 func (this *MessageMediaInstanceDAO) DisableMessageMediaInstance(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -53,19 +54,31 @@ func (this *MessageMediaInstanceDAO) DisableMessageMediaInstance(tx *dbs.Tx, id return err } -// 查找启用中的条目 -func (this *MessageMediaInstanceDAO) FindEnabledMessageMediaInstance(tx *dbs.Tx, id int64) (*MessageMediaInstance, error) { +// FindEnabledMessageMediaInstance 查找启用中的条目 +func (this *MessageMediaInstanceDAO) FindEnabledMessageMediaInstance(tx *dbs.Tx, instanceId int64, cacheMap maps.Map) (*MessageMediaInstance, error) { + if cacheMap == nil { + cacheMap = maps.Map{} + } + var cacheKey = this.Table + ":record:" + types.String(instanceId) + var cache = cacheMap.Get(cacheKey) + if cache != nil { + return cache.(*MessageMediaInstance), nil + } + result, err := this.Query(tx). - Pk(id). + Pk(instanceId). Attr("state", MessageMediaInstanceStateEnabled). Find() if result == nil { return nil, err } + + cacheMap[cacheKey] = result + return result.(*MessageMediaInstance), err } -// 创建媒介实例 +// CreateMediaInstance 创建媒介实例 func (this *MessageMediaInstanceDAO) CreateMediaInstance(tx *dbs.Tx, name string, mediaType string, params maps.Map, description string) (int64, error) { op := NewMessageMediaInstanceOperator() op.Name = name @@ -88,7 +101,7 @@ func (this *MessageMediaInstanceDAO) CreateMediaInstance(tx *dbs.Tx, name string return this.SaveInt64(tx, op) } -// 修改媒介实例 +// UpdateMediaInstance 修改媒介实例 func (this *MessageMediaInstanceDAO) UpdateMediaInstance(tx *dbs.Tx, instanceId int64, name string, mediaType string, params maps.Map, description string, isOn bool) error { if instanceId <= 0 { return errors.New("invalid instanceId") @@ -114,7 +127,7 @@ func (this *MessageMediaInstanceDAO) UpdateMediaInstance(tx *dbs.Tx, instanceId return this.Save(tx, op) } -// 计算接收人数量 +// CountAllEnabledMediaInstances 计算接收人数量 func (this *MessageMediaInstanceDAO) CountAllEnabledMediaInstances(tx *dbs.Tx, mediaType string, keyword string) (int64, error) { query := this.Query(tx) if len(mediaType) > 0 { @@ -130,7 +143,7 @@ func (this *MessageMediaInstanceDAO) CountAllEnabledMediaInstances(tx *dbs.Tx, m Count() } -// 列出单页接收人 +// ListAllEnabledMediaInstances 列出单页接收人 func (this *MessageMediaInstanceDAO) ListAllEnabledMediaInstances(tx *dbs.Tx, mediaType string, keyword string, offset int64, size int64) (result []*MessageMediaInstance, err error) { query := this.Query(tx) if len(mediaType) > 0 { diff --git a/internal/db/models/message_receiver_dao.go b/internal/db/models/message_receiver_dao.go index b1cbd1c7..87fc6743 100644 --- a/internal/db/models/message_receiver_dao.go +++ b/internal/db/models/message_receiver_dao.go @@ -75,11 +75,12 @@ func (this *MessageReceiverDAO) DisableReceivers(tx *dbs.Tx, clusterId int64, no } // CreateReceiver 创建接收人 -func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, params maps.Map, recipientId int64, recipientGroupId int64) (int64, error) { +func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType MessageType, params maps.Map, recipientId int64, recipientGroupId int64) (int64, error) { op := NewMessageReceiverOperator() - op.ClusterId = target.ClusterId - op.NodeId = target.NodeId - op.ServerId = target.ServerId + op.Role = role + op.ClusterId = clusterId + op.NodeId = nodeId + op.ServerId = serverId op.Type = messageType if params == nil { @@ -98,63 +99,120 @@ func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTar } // FindAllEnabledReceivers 查询接收人 -func (this *MessageReceiverDAO) FindAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) { +func (this *MessageReceiverDAO) FindAllEnabledReceivers(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType string) (result []*MessageReceiver, err error) { query := this.Query(tx) if len(messageType) > 0 { query.Attr("type", []string{"*", messageType}) // *表示所有的 } _, err = query. - Attr("clusterId", target.ClusterId). - Attr("nodeId", target.NodeId). - Attr("serverId", target.ServerId). + Attr("role", role). + Attr("clusterId", clusterId). + Attr("nodeId", nodeId). + Attr("serverId", serverId). State(MessageReceiverStateEnabled). AscPk(). Slice(&result). FindAll() - if err != nil { - return nil, err - } - - if len(result) == 0 { - // 去掉类型再试试 - query := this.Query(tx) - _, err = query. - Attr("clusterId", target.ClusterId). - Attr("nodeId", target.NodeId). - Attr("serverId", target.ServerId). - State(MessageReceiverStateEnabled). - AscPk(). - Slice(&result). - FindAll() - if err != nil { - return nil, err - } - - // 去掉服务和节点再试试 - if len(result) == 0 { - query := this.Query(tx) - _, err = query. - Attr("clusterId", target.ClusterId). - State(MessageReceiverStateEnabled). - AscPk(). - Slice(&result). - FindAll() - } - } - return } // CountAllEnabledReceivers 计算接收人数量 -func (this *MessageReceiverDAO) CountAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (int64, error) { +func (this *MessageReceiverDAO) CountAllEnabledReceivers(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType string) (int64, error) { query := this.Query(tx) if len(messageType) > 0 { query.Attr("type", []string{"*", messageType}) // *表示所有的 } return query. - Attr("clusterId", target.ClusterId). - Attr("nodeId", target.NodeId). - Attr("serverId", target.ServerId). + Attr("role", role). + Attr("clusterId", clusterId). + Attr("nodeId", nodeId). + Attr("serverId", serverId). State(MessageReceiverStateEnabled). Count() } + +// FindEnabledBestFitReceivers 查询最适合的接收人 +func (this *MessageReceiverDAO) FindEnabledBestFitReceivers(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, messageType string) (result []*MessageReceiver, err error) { + // serverId优先 + query := this.Query(tx) + if len(messageType) > 0 { + query.Attr("type", []string{"*", messageType}) // *表示所有的 + } + if len(role) > 0 { + query.Attr("role", role) + } + if serverId > 0 { + query.Attr("serverId", serverId) + } else if nodeId > 0 { + query.Attr("nodeId", nodeId) + } else if clusterId > 0 { + query.Attr("clusterId", clusterId) + } + _, err = query. + State(MessageReceiverStateEnabled). + AscPk(). + Slice(&result). + FindAll() + if err != nil || len(result) > 0 { + return + } + + // nodeId优先 + if serverId > 0 && nodeId > 0 { + query = this.Query(tx) + if len(messageType) > 0 { + query.Attr("type", []string{"*", messageType}) // *表示所有的 + } + if len(role) > 0 { + query.Attr("role", role) + } + query.Attr("nodeId", nodeId) + _, err = query. + State(MessageReceiverStateEnabled). + AscPk(). + Slice(&result). + FindAll() + if err != nil || len(result) > 0 { + return + } + } + + // clusterId优先 + if (serverId > 0 || nodeId > 0) && clusterId > 0 { + query = this.Query(tx) + if len(messageType) > 0 { + query.Attr("type", []string{"*", messageType}) // *表示所有的 + } + if len(role) > 0 { + query.Attr("role", role) + } + query.Attr("clusterId", clusterId) + _, err = query. + State(MessageReceiverStateEnabled). + AscPk(). + Slice(&result). + FindAll() + if err != nil || len(result) > 0 { + return + } + } + + // 去掉集群ID + query = this.Query(tx) + if len(messageType) > 0 { + query.Attr("type", []string{"*", messageType}) // *表示所有的 + } + if len(role) > 0 { + query.Attr("role", role) + } + _, err = query. + State(MessageReceiverStateEnabled). + AscPk(). + Slice(&result). + FindAll() + if err != nil || len(result) > 0 { + return + } + + return +} diff --git a/internal/db/models/message_receiver_dao_test.go b/internal/db/models/message_receiver_dao_test.go index 224e9db7..52934a74 100644 --- a/internal/db/models/message_receiver_dao_test.go +++ b/internal/db/models/message_receiver_dao_test.go @@ -1,6 +1,30 @@ package models import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/logs" + "testing" ) + +func TestMessageReceiverDAO_FindEnabledBestFitReceivers(t *testing.T) { + var tx *dbs.Tx + + { + receivers, err := NewMessageReceiverDAO().FindEnabledBestFitReceivers(tx, nodeconfigs.NodeRoleNode, 18, 1, 2, "*") + if err != nil { + t.Fatal(err) + } + logs.PrintAsJSON(receivers, t) + } + + { + receivers, err := NewMessageReceiverDAO().FindEnabledBestFitReceivers(tx, nodeconfigs.NodeRoleNode, 30, 1, 2, "*") + if err != nil { + t.Fatal(err) + } + logs.PrintAsJSON(receivers, t) + } +} diff --git a/internal/db/models/message_receiver_model.go b/internal/db/models/message_receiver_model.go index 752f8f39..44bfba8f 100644 --- a/internal/db/models/message_receiver_model.go +++ b/internal/db/models/message_receiver_model.go @@ -3,6 +3,7 @@ package models // MessageReceiver 消息通知接收人 type MessageReceiver struct { Id uint32 `field:"id"` // ID + Role string `field:"role"` // 节点角色 ClusterId uint32 `field:"clusterId"` // 集群ID NodeId uint32 `field:"nodeId"` // 节点ID ServerId uint32 `field:"serverId"` // 服务ID @@ -15,6 +16,7 @@ type MessageReceiver struct { type MessageReceiverOperator struct { Id interface{} // ID + Role interface{} // 节点角色 ClusterId interface{} // 集群ID NodeId interface{} // 节点ID ServerId interface{} // 服务ID diff --git a/internal/db/models/message_recipient_dao.go b/internal/db/models/message_recipient_dao.go index 0d407f4a..dc7d7a25 100644 --- a/internal/db/models/message_recipient_dao.go +++ b/internal/db/models/message_recipient_dao.go @@ -7,6 +7,8 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/types" ) const ( @@ -54,14 +56,27 @@ func (this *MessageRecipientDAO) DisableMessageRecipient(tx *dbs.Tx, id int64) e } // FindEnabledMessageRecipient 查找启用中的条目 -func (this *MessageRecipientDAO) FindEnabledMessageRecipient(tx *dbs.Tx, id int64) (*MessageRecipient, error) { +func (this *MessageRecipientDAO) FindEnabledMessageRecipient(tx *dbs.Tx, recipientId int64, cacheMap maps.Map, +) (*MessageRecipient, error) { + if cacheMap == nil { + cacheMap = maps.Map{} + } + var cacheKey = this.Table + ":record:" + types.String(recipientId) + var cache = cacheMap.Get(cacheKey) + if cache != nil { + return cache.(*MessageRecipient), nil + } + result, err := this.Query(tx). - Pk(id). + Pk(recipientId). Attr("state", MessageRecipientStateEnabled). Find() if result == nil { return nil, err } + + cacheMap[cacheKey] = result + return result.(*MessageRecipient), err } diff --git a/internal/db/models/message_target.go b/internal/db/models/message_target.go deleted file mode 100644 index 987f854b..00000000 --- a/internal/db/models/message_target.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. - -package models - -// MessageTaskTarget 消息接收对象 -// 每个字段不一定都有值 -type MessageTaskTarget struct { - ClusterId int64 // 集群ID - NodeId int64 // 节点ID - ServerId int64 // 服务ID -} diff --git a/internal/db/models/message_task_dao.go b/internal/db/models/message_task_dao.go index 6d120cc6..e5d08b54 100644 --- a/internal/db/models/message_task_dao.go +++ b/internal/db/models/message_task_dao.go @@ -2,9 +2,13 @@ package models import ( "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/rands" + timeutil "github.com/iwind/TeaGo/utils/time" "time" ) @@ -35,6 +39,21 @@ func NewMessageTaskDAO() *MessageTaskDAO { var SharedMessageTaskDAO *MessageTaskDAO +func init() { + dbs.OnReadyDone(func() { + // 清理数据任务 + var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour) + go func() { + for range ticker.C { + err := SharedMessageTaskDAO.CleanExpiredMessageTasks(nil, 30) // 只保留30天 + if err != nil { + remotelogs.Error("SharedMessageTaskDAO", "clean expired data failed: "+err.Error()) + } + } + }() + }) +} + func init() { dbs.OnReady(func() { SharedMessageTaskDAO = NewMessageTaskDAO() @@ -80,6 +99,7 @@ func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, ins op.Subject = subject op.Body = body op.IsPrimary = isPrimary + op.Day = timeutil.Format("Ymd") op.Status = MessageTaskStatusNone op.State = MessageTaskStateEnabled return this.SaveInt64(tx, op) @@ -101,6 +121,28 @@ func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (res return } +// CountMessageTasksWithStatus 根据状态计算任务数量 +func (this *MessageTaskDAO) CountMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus) (int64, error) { + return this.Query(tx). + State(MessageTaskStateEnabled). + Attr("status", status). + Count() +} + +// ListMessageTasksWithStatus 根据状态列出单页任务 +func (this *MessageTaskDAO) ListMessageTasksWithStatus(tx *dbs.Tx, status MessageTaskStatus, offset int64, size int64) (result []*MessageTask, err error) { + _, err = this.Query(tx). + State(MessageTaskStateEnabled). + Attr("status", status). + Desc("isPrimary"). + AscPk(). + Offset(offset). + Limit(size). + Slice(&result). + FindAll() + return +} + // UpdateMessageTaskStatus 设置发送的状态 func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error { if taskId <= 0 { @@ -117,8 +159,8 @@ func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, st } // CreateMessageTasks 从集群、节点或者服务中创建任务 -func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error { - receivers, err := SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, target, messageType) +func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, role nodeconfigs.NodeRole, clusterId int64, nodeId int64, serverId int64, messageType MessageType, subject string, body string) error { + receivers, err := SharedMessageReceiverDAO.FindEnabledBestFitReceivers(tx, role, clusterId, nodeId, serverId, messageType) if err != nil { return err } @@ -150,3 +192,16 @@ func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTar return nil } + +// CleanExpiredMessageTasks 清理 +func (this *MessageTaskDAO) CleanExpiredMessageTasks(tx *dbs.Tx, days int) error { + if days <= 0 { + days = 30 + } + var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days)) + _, err := this.Query(tx). + Where("(day IS NULL OR day<:day)"). + Param("day", day). + Delete() + return err +} diff --git a/internal/db/models/message_task_log_dao.go b/internal/db/models/message_task_log_dao.go index a3fdeebd..a34394d0 100644 --- a/internal/db/models/message_task_log_dao.go +++ b/internal/db/models/message_task_log_dao.go @@ -1,13 +1,32 @@ package models import ( + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/rands" + timeutil "github.com/iwind/TeaGo/utils/time" + "time" ) type MessageTaskLogDAO dbs.DAO +func init() { + dbs.OnReadyDone(func() { + // 清理数据任务 + var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour) + go func() { + for range ticker.C { + err := SharedMessageTaskLogDAO.CleanExpiredLogs(nil, 30) // 只保留30天 + if err != nil { + remotelogs.Error("SharedMessageTaskLogDAO", "clean expired data failed: "+err.Error()) + } + } + }() + }) +} + func NewMessageTaskLogDAO() *MessageTaskLogDAO { return dbs.NewDAO(&MessageTaskLogDAO{ DAOObject: dbs.DAOObject{ @@ -27,25 +46,28 @@ func init() { }) } -// 创建日志 +// CreateLog 创建日志 func (this *MessageTaskLogDAO) CreateLog(tx *dbs.Tx, taskId int64, isOk bool, errMsg string, response string) error { op := NewMessageTaskLogOperator() op.TaskId = taskId op.IsOk = isOk op.Error = errMsg op.Response = response + op.Day = timeutil.Format("Ymd") return this.Save(tx, op) } -// 计算日志数量 +// CountLogs 计算日志数量 func (this *MessageTaskLogDAO) CountLogs(tx *dbs.Tx) (int64, error) { return this.Query(tx). + Where("taskId IN (SELECT id FROM " + SharedMessageTaskDAO.Table + ")"). Count() } -// 列出单页日志 +// ListLogs 列出单页日志 func (this *MessageTaskLogDAO) ListLogs(tx *dbs.Tx, offset int64, size int64) (result []*MessageTaskLog, err error) { _, err = this.Query(tx). + Where("taskId IN (SELECT id FROM " + SharedMessageTaskDAO.Table + ")"). Offset(offset). Limit(size). DescPk(). @@ -53,3 +75,16 @@ func (this *MessageTaskLogDAO) ListLogs(tx *dbs.Tx, offset int64, size int64) (r FindAll() return } + +// CleanExpiredLogs 清理 +func (this *MessageTaskLogDAO) CleanExpiredLogs(tx *dbs.Tx, days int) error { + if days <= 0 { + days = 30 + } + var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days)) + _, err := this.Query(tx). + Where("(day IS NULL OR day<:day)"). + Param("day", day). + Delete() + return err +} diff --git a/internal/db/models/message_task_log_model.go b/internal/db/models/message_task_log_model.go index 042158ad..1f97e83e 100644 --- a/internal/db/models/message_task_log_model.go +++ b/internal/db/models/message_task_log_model.go @@ -1,6 +1,6 @@ package models -// 消息发送日志 +// MessageTaskLog 消息发送日志 type MessageTaskLog struct { Id uint64 `field:"id"` // ID TaskId uint64 `field:"taskId"` // 任务ID @@ -8,6 +8,7 @@ type MessageTaskLog struct { IsOk uint8 `field:"isOk"` // 是否成功 Error string `field:"error"` // 错误信息 Response string `field:"response"` // 响应信息 + Day string `field:"day"` // YYYYMMDD } type MessageTaskLogOperator struct { @@ -17,6 +18,7 @@ type MessageTaskLogOperator struct { IsOk interface{} // 是否成功 Error interface{} // 错误信息 Response interface{} // 响应信息 + Day interface{} // YYYYMMDD } func NewMessageTaskLogOperator() *MessageTaskLogOperator { diff --git a/internal/db/models/message_task_model.go b/internal/db/models/message_task_model.go index c691253f..2bbe1f58 100644 --- a/internal/db/models/message_task_model.go +++ b/internal/db/models/message_task_model.go @@ -1,6 +1,6 @@ package models -// +// MessageTask 消息发送相关任务 type MessageTask struct { Id uint64 `field:"id"` // ID RecipientId uint32 `field:"recipientId"` // 接收人ID @@ -13,6 +13,7 @@ type MessageTask struct { SentAt uint64 `field:"sentAt"` // 最后一次发送时间 State uint8 `field:"state"` // 状态 Result string `field:"result"` // 结果 + Day string `field:"day"` // YYYYMMDD IsPrimary uint8 `field:"isPrimary"` // 是否优先 } @@ -28,6 +29,7 @@ type MessageTaskOperator struct { SentAt interface{} // 最后一次发送时间 State interface{} // 状态 Result interface{} // 结果 + Day interface{} // YYYYMMDD IsPrimary interface{} // 是否优先 } diff --git a/internal/db/models/node_log_dao.go b/internal/db/models/node_log_dao.go index d198e2f8..60b4a895 100644 --- a/internal/db/models/node_log_dao.go +++ b/internal/db/models/node_log_dao.go @@ -2,6 +2,7 @@ package models import ( "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" @@ -33,6 +34,9 @@ var SharedNodeLogDAO *NodeLogDAO func init() { dbs.OnReady(func() { SharedNodeLogDAO = NewNodeLogDAO() + + // 设置日志存储 + remotelogs.SetDAO(SharedNodeLogDAO) }) } diff --git a/internal/remotelogs/dao_interface.go b/internal/remotelogs/dao_interface.go new file mode 100644 index 00000000..6b0c07e4 --- /dev/null +++ b/internal/remotelogs/dao_interface.go @@ -0,0 +1,12 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package remotelogs + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/iwind/TeaGo/dbs" +) + +type DAOInterface interface { + CreateLog(tx *dbs.Tx, nodeRole nodeconfigs.NodeRole, nodeId int64, serverId int64, originId int64, level string, tag string, description string, createdAt int64) error +} diff --git a/internal/remotelogs/utils.go b/internal/remotelogs/utils.go index 529bff41..18fed804 100644 --- a/internal/remotelogs/utils.go +++ b/internal/remotelogs/utils.go @@ -3,7 +3,6 @@ package remotelogs import ( "github.com/TeaOSLab/EdgeAPI/internal/configs" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" - "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/logs" @@ -11,6 +10,7 @@ import ( ) var logChan = make(chan *pb.NodeLog, 1024) +var sharedDAO DAOInterface func init() { // 定期上传日志 @@ -25,7 +25,7 @@ func init() { }() } -// 打印普通信息 +// Println 打印普通信息 func Println(tag string, description string) { logs.Println("[" + tag + "]" + description) @@ -48,7 +48,7 @@ func Println(tag string, description string) { } } -// 打印警告信息 +// Warn 打印警告信息 func Warn(tag string, description string) { logs.Println("[" + tag + "]" + description) @@ -71,7 +71,7 @@ func Warn(tag string, description string) { } } -// 打印错误信息 +// Error 打印错误信息 func Error(tag string, description string) { logs.Println("[" + tag + "]" + description) @@ -94,13 +94,22 @@ func Error(tag string, description string) { } } +// SetDAO 设置存储接口 +func SetDAO(dao DAOInterface) { + sharedDAO = dao +} + // 上传日志 func uploadLogs() error { + if sharedDAO == nil { + return nil + } + Loop: for { select { case log := <-logChan: - err := models.SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleAPI, log.NodeId, log.ServerId, log.OriginId, log.Level, log.Tag, log.Description, log.CreatedAt) + err := sharedDAO.CreateLog(nil, nodeconfigs.NodeRoleAPI, log.NodeId, log.ServerId, log.OriginId, log.Level, log.Tag, log.Description, log.CreatedAt) if err != nil { return err } diff --git a/internal/rpc/services/service_firewall.go b/internal/rpc/services/service_firewall.go index 32d49d3c..48e43480 100644 --- a/internal/rpc/services/service_firewall.go +++ b/internal/rpc/services/service_firewall.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" @@ -224,11 +225,7 @@ func (this *FirewallService) NotifyHTTPFirewallEvent(ctx context.Context, req *p "\n规则分组:" + ruleGroupName + "\n规则集:" + ruleSetName + "\n时间:" + timeutil.FormatTime("Y-m-d H:i:s", req.CreatedAt) - err = models.SharedMessageTaskDAO.CreateMessageTasks(tx, models.MessageTaskTarget{ - ClusterId: clusterId, - NodeId: nodeId, - ServerId: req.ServerId, - }, models.MessageTypeFirewallEvent, "发生防火墙事件", msg) + err = models.SharedMessageTaskDAO.CreateMessageTasks(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, req.ServerId, models.MessageTypeFirewallEvent, "触发防火墙事件", msg) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_message_media_instance.go b/internal/rpc/services/service_message_media_instance.go index da39c2f8..c33c297a 100644 --- a/internal/rpc/services/service_message_media_instance.go +++ b/internal/rpc/services/service_message_media_instance.go @@ -8,12 +8,12 @@ import ( "github.com/iwind/TeaGo/maps" ) -// 消息媒介实例服务 +// MessageMediaInstanceService 消息媒介实例服务 type MessageMediaInstanceService struct { BaseService } -// 创建消息媒介实例 +// CreateMessageMediaInstance 创建消息媒介实例 func (this *MessageMediaInstanceService) CreateMessageMediaInstance(ctx context.Context, req *pb.CreateMessageMediaInstanceRequest) (*pb.CreateMessageMediaInstanceResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -38,7 +38,7 @@ func (this *MessageMediaInstanceService) CreateMessageMediaInstance(ctx context. return &pb.CreateMessageMediaInstanceResponse{MessageMediaInstanceId: instanceId}, nil } -// 修改消息实例 +// UpdateMessageMediaInstance 修改消息实例 func (this *MessageMediaInstanceService) UpdateMessageMediaInstance(ctx context.Context, req *pb.UpdateMessageMediaInstanceRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -62,7 +62,7 @@ func (this *MessageMediaInstanceService) UpdateMessageMediaInstance(ctx context. return this.Success() } -// 删除媒介实例 +// DeleteMessageMediaInstance 删除媒介实例 func (this *MessageMediaInstanceService) DeleteMessageMediaInstance(ctx context.Context, req *pb.DeleteMessageMediaInstanceRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -77,7 +77,7 @@ func (this *MessageMediaInstanceService) DeleteMessageMediaInstance(ctx context. return this.Success() } -// 计算媒介实例数量 +// CountAllEnabledMessageMediaInstances 计算媒介实例数量 func (this *MessageMediaInstanceService) CountAllEnabledMessageMediaInstances(ctx context.Context, req *pb.CountAllEnabledMessageMediaInstancesRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -93,7 +93,7 @@ func (this *MessageMediaInstanceService) CountAllEnabledMessageMediaInstances(ct return this.SuccessCount(count) } -// 列出单页媒介实例 +// ListEnabledMessageMediaInstances 列出单页媒介实例 func (this *MessageMediaInstanceService) ListEnabledMessageMediaInstances(ctx context.Context, req *pb.ListEnabledMessageMediaInstancesRequest) (*pb.ListEnabledMessageMediaInstancesResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -137,7 +137,7 @@ func (this *MessageMediaInstanceService) ListEnabledMessageMediaInstances(ctx co return &pb.ListEnabledMessageMediaInstancesResponse{MessageMediaInstances: pbInstances}, nil } -// 查找单个媒介实例信息 +// FindEnabledMessageMediaInstance 查找单个媒介实例信息 func (this *MessageMediaInstanceService) FindEnabledMessageMediaInstance(ctx context.Context, req *pb.FindEnabledMessageMediaInstanceRequest) (*pb.FindEnabledMessageMediaInstanceResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -145,7 +145,8 @@ func (this *MessageMediaInstanceService) FindEnabledMessageMediaInstance(ctx con } var tx = this.NullTx() - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, req.MessageMediaInstanceId) + var cacheMap = maps.Map{} + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, req.MessageMediaInstanceId, cacheMap) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_message_receiver.go b/internal/rpc/services/service_message_receiver.go index 6f0f87dc..02332f15 100644 --- a/internal/rpc/services/service_message_receiver.go +++ b/internal/rpc/services/service_message_receiver.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/maps" @@ -21,6 +22,10 @@ func (this *MessageReceiverService) UpdateMessageReceivers(ctx context.Context, return nil, err } + if len(req.Role) == 0 { + req.Role = nodeconfigs.NodeRoleNode + } + params := maps.Map{} if len(req.ParamsJSON) > 0 { err = json.Unmarshal(req.ParamsJSON, ¶ms) @@ -37,11 +42,7 @@ func (this *MessageReceiverService) UpdateMessageReceivers(ctx context.Context, for messageType, options := range req.RecipientOptions { for _, option := range options.RecipientOptions { - _, err := models.SharedMessageReceiverDAO.CreateReceiver(tx, models.MessageTaskTarget{ - ClusterId: req.NodeClusterId, - NodeId: req.NodeId, - ServerId: req.ServerId, - }, messageType, params, option.MessageRecipientId, option.MessageRecipientGroupId) + _, err := models.SharedMessageReceiverDAO.CreateReceiver(tx, req.Role, req.NodeClusterId, req.NodeId, req.ServerId, messageType, params, option.MessageRecipientId, option.MessageRecipientGroupId) if err != nil { return err } @@ -63,12 +64,13 @@ func (this *MessageReceiverService) FindAllEnabledMessageReceivers(ctx context.C return nil, err } + if len(req.Role) == 0 { + req.Role = nodeconfigs.NodeRoleNode + } + var tx = this.NullTx() - receivers, err := models.SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, models.MessageTaskTarget{ - ClusterId: req.NodeClusterId, - NodeId: req.NodeId, - ServerId: req.ServerId, - }, "") + var cacheMap = maps.Map{} + receivers, err := models.SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, req.Role, req.NodeClusterId, req.NodeId, req.ServerId, "") if err != nil { return nil, err } @@ -78,7 +80,7 @@ func (this *MessageReceiverService) FindAllEnabledMessageReceivers(ctx context.C // 接收人 if receiver.RecipientId > 0 { - recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(receiver.RecipientId)) + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(receiver.RecipientId), cacheMap) if err != nil { return nil, err } @@ -96,7 +98,7 @@ func (this *MessageReceiverService) FindAllEnabledMessageReceivers(ctx context.C } // 接收人 - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId), cacheMap) if err != nil { return nil, err } @@ -177,12 +179,12 @@ func (this *MessageReceiverService) CountAllEnabledMessageReceivers(ctx context. return nil, err } + if len(req.Role) == 0 { + req.Role = nodeconfigs.NodeRoleNode + } + var tx = this.NullTx() - count, err := models.SharedMessageReceiverDAO.CountAllEnabledReceivers(tx, models.MessageTaskTarget{ - ClusterId: req.NodeClusterId, - NodeId: req.NodeId, - ServerId: req.ServerId, - }, "") + count, err := models.SharedMessageReceiverDAO.CountAllEnabledReceivers(tx, req.Role, req.NodeClusterId, req.NodeId, req.ServerId, "") if err != nil { return nil, err } diff --git a/internal/rpc/services/service_message_recipient.go b/internal/rpc/services/service_message_recipient.go index 8d92c31d..7c3a20b2 100644 --- a/internal/rpc/services/service_message_recipient.go +++ b/internal/rpc/services/service_message_recipient.go @@ -4,14 +4,15 @@ import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/maps" ) -// 消息接收人服务 +// MessageRecipientService 消息接收人服务 type MessageRecipientService struct { BaseService } -// 创建接收人 +// CreateMessageRecipient 创建接收人 func (this *MessageRecipientService) CreateMessageRecipient(ctx context.Context, req *pb.CreateMessageRecipientRequest) (*pb.CreateMessageRecipientResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -27,7 +28,7 @@ func (this *MessageRecipientService) CreateMessageRecipient(ctx context.Context, return &pb.CreateMessageRecipientResponse{MessageRecipientId: recipientId}, nil } -// 修改接收人 +// UpdateMessageRecipient 修改接收人 func (this *MessageRecipientService) UpdateMessageRecipient(ctx context.Context, req *pb.UpdateMessageRecipientRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -43,7 +44,7 @@ func (this *MessageRecipientService) UpdateMessageRecipient(ctx context.Context, return this.Success() } -// 删除接收人 +// DeleteMessageRecipient 删除接收人 func (this *MessageRecipientService) DeleteMessageRecipient(ctx context.Context, req *pb.DeleteMessageRecipientRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -58,7 +59,7 @@ func (this *MessageRecipientService) DeleteMessageRecipient(ctx context.Context, return this.Success() } -// 计算接收人数量 +// CountAllEnabledMessageRecipients 计算接收人数量 func (this *MessageRecipientService) CountAllEnabledMessageRecipients(ctx context.Context, req *pb.CountAllEnabledMessageRecipientsRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -74,7 +75,7 @@ func (this *MessageRecipientService) CountAllEnabledMessageRecipients(ctx contex return this.SuccessCount(count) } -// 列出单页接收人 +// ListEnabledMessageRecipients 列出单页接收人 func (this *MessageRecipientService) ListEnabledMessageRecipients(ctx context.Context, req *pb.ListEnabledMessageRecipientsRequest) (*pb.ListEnabledMessageRecipientsResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -82,6 +83,7 @@ func (this *MessageRecipientService) ListEnabledMessageRecipients(ctx context.Co } var tx = this.NullTx() + var cacheMap = maps.Map{} recipients, err := models.SharedMessageRecipientDAO.ListAllEnabledRecipients(tx, req.AdminId, req.MessageRecipientGroupId, req.MediaType, req.Keyword, req.Offset, req.Size) if err != nil { return nil, err @@ -104,7 +106,7 @@ func (this *MessageRecipientService) ListEnabledMessageRecipients(ctx context.Co } // 媒介实例 - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId), cacheMap) if err != nil { return nil, err } @@ -151,7 +153,7 @@ func (this *MessageRecipientService) ListEnabledMessageRecipients(ctx context.Co return &pb.ListEnabledMessageRecipientsResponse{MessageRecipients: pbRecipients}, nil } -// 查找单个接收人信息 +// FindEnabledMessageRecipient 查找单个接收人信息 func (this *MessageRecipientService) FindEnabledMessageRecipient(ctx context.Context, req *pb.FindEnabledMessageRecipientRequest) (*pb.FindEnabledMessageRecipientResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -159,7 +161,8 @@ func (this *MessageRecipientService) FindEnabledMessageRecipient(ctx context.Con } var tx = this.NullTx() - recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, req.MessageRecipientId) + var cacheMap = maps.Map{} + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, req.MessageRecipientId, cacheMap) if err != nil { return nil, err } @@ -183,7 +186,7 @@ func (this *MessageRecipientService) FindEnabledMessageRecipient(ctx context.Con } // 媒介实例 - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId), cacheMap) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_message_task.go b/internal/rpc/services/service_message_task.go index bad65f5f..a3ebb810 100644 --- a/internal/rpc/services/service_message_task.go +++ b/internal/rpc/services/service_message_task.go @@ -37,6 +37,7 @@ func (this *MessageTaskService) FindSendingMessageTasks(ctx context.Context, req } var tx = this.NullTx() + var cacheMap = maps.Map{} tasks, err := models.SharedMessageTaskDAO.FindSendingMessageTasks(tx, req.Size) if err != nil { return nil, err @@ -45,8 +46,7 @@ func (this *MessageTaskService) FindSendingMessageTasks(ctx context.Context, req for _, task := range tasks { var pbRecipient *pb.MessageRecipient if task.RecipientId > 0 { - // TODO 需要缓存以提升性能 - recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId)) + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap) if err != nil { return nil, err } @@ -60,8 +60,7 @@ func (this *MessageTaskService) FindSendingMessageTasks(ctx context.Context, req } // 媒介 - // TODO 需要缓存以提升性能 - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId), cacheMap) if err != nil { return nil, err } @@ -87,8 +86,7 @@ func (this *MessageTaskService) FindSendingMessageTasks(ctx context.Context, req } } else { // 没有指定既定的接收人 // 媒介 - // TODO 需要缓存以提升性能 - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId), cacheMap) if err != nil { return nil, err } @@ -186,6 +184,7 @@ func (this *MessageTaskService) FindEnabledMessageTask(ctx context.Context, req } var tx = this.NullTx() + var cacheMap = maps.Map{} task, err := models.SharedMessageTaskDAO.FindEnabledMessageTask(tx, req.MessageTaskId) if err != nil { return nil, err @@ -194,10 +193,9 @@ func (this *MessageTaskService) FindEnabledMessageTask(ctx context.Context, req return &pb.FindEnabledMessageTaskResponse{MessageTask: nil}, nil } - // TODO 需要缓存以提升性能 var pbRecipient *pb.MessageRecipient if task.RecipientId > 0 { - recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId)) + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap) if err != nil { return nil, err } @@ -211,8 +209,7 @@ func (this *MessageTaskService) FindEnabledMessageTask(ctx context.Context, req } // 媒介 - // TODO 需要缓存以提升性能 - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId), cacheMap) if err != nil { return nil, err } @@ -237,8 +234,7 @@ func (this *MessageTaskService) FindEnabledMessageTask(ctx context.Context, req } } else { // 没有指定既定的接收人 // 媒介 - // TODO 需要缓存以提升性能 - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId), cacheMap) if err != nil { return nil, err } @@ -282,3 +278,127 @@ func (this *MessageTaskService) FindEnabledMessageTask(ctx context.Context, req Result: result, }}, nil } + +// CountMessageTasksWithStatus 计算某个状态的消息任务数量 +func (this *MessageTaskService) CountMessageTasksWithStatus(ctx context.Context, req *pb.CountMessageTasksWithStatusRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + count, err := models.SharedMessageTaskDAO.CountMessageTasksWithStatus(tx, types.Int(req.Status)) + if err != nil { + return nil, err + } + + return this.SuccessCount(count) +} + +// ListMessageTasksWithStatus 根据状态列出某页任务 +func (this *MessageTaskService) ListMessageTasksWithStatus(ctx context.Context, req *pb.ListMessageTasksWithStatusRequest) (*pb.ListMessageTasksWithStatusResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + var cacheMap = maps.Map{} + tasks, err := models.SharedMessageTaskDAO.ListMessageTasksWithStatus(tx, types.Int(req.Status), req.Offset, req.Size) + if err != nil { + return nil, err + } + + var pbTasks = []*pb.MessageTask{} + for _, task := range tasks { + var pbRecipient *pb.MessageRecipient + if task.RecipientId > 0 { + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap) + if err != nil { + return nil, err + } + if recipient == nil || recipient.IsOn == 0 { + // 如果发送人已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + continue + } + + // 媒介 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId), cacheMap) + if err != nil { + return nil, err + } + if instance == nil || instance.IsOn == 0 { + // 如果媒介实例已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + continue + } + + pbRecipient = &pb.MessageRecipient{ + Id: int64(recipient.Id), + User: recipient.User, + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + Name: instance.Name, + MessageMedia: &pb.MessageMedia{ + Type: instance.MediaType, + }, + ParamsJSON: []byte(instance.Params), + }, + } + } else { // 没有指定既定的接收人 + // 媒介 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId), cacheMap) + if err != nil { + return nil, err + } + if instance == nil || instance.IsOn == 0 { + // 如果媒介实例已经删除或者禁用,则删除此消息 + err = models.SharedMessageTaskDAO.DisableMessageTask(tx, int64(task.Id)) + if err != nil { + return nil, err + } + continue + } + pbRecipient = &pb.MessageRecipient{ + Id: 0, + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + Name: instance.Name, + MessageMedia: &pb.MessageMedia{ + Type: instance.MediaType, + }, + ParamsJSON: []byte(instance.Params), + }, + } + } + + var result = &pb.MessageTaskResult{} + if len(task.Result) > 0 { + err = json.Unmarshal([]byte(task.Result), result) + if err != nil { + return nil, err + } + } + + pbTasks = append(pbTasks, &pb.MessageTask{ + Id: int64(task.Id), + MessageRecipient: pbRecipient, + User: task.User, + Subject: task.Subject, + Body: task.Body, + CreatedAt: int64(task.CreatedAt), + Status: types.Int32(task.Status), + SentAt: int64(task.SentAt), + Result: result, + }) + } + + return &pb.ListMessageTasksWithStatusResponse{MessageTasks: pbTasks}, nil +} diff --git a/internal/rpc/services/service_message_task_log.go b/internal/rpc/services/service_message_task_log.go index 08818201..a9b61ec9 100644 --- a/internal/rpc/services/service_message_task_log.go +++ b/internal/rpc/services/service_message_task_log.go @@ -4,14 +4,15 @@ import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/maps" ) -// 消息发送日志相关服务 +// MessageTaskLogService 消息发送日志相关服务 type MessageTaskLogService struct { BaseService } -// 计算日志数量 +// CountMessageTaskLogs 计算日志数量 func (this *MessageTaskLogService) CountMessageTaskLogs(ctx context.Context, req *pb.CountMessageTaskLogsRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -32,6 +33,7 @@ func (this *MessageTaskLogService) ListMessageTaskLogs(ctx context.Context, req return nil, err } var tx = this.NullTx() + var cacheMap = maps.Map{} logs, err := models.SharedMessageTaskLogDAO.ListLogs(tx, req.Offset, req.Size) if err != nil { return nil, err @@ -49,7 +51,7 @@ func (this *MessageTaskLogService) ListMessageTaskLogs(ctx context.Context, req var pbRecipient *pb.MessageRecipient if task.RecipientId > 0 { - recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId)) + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap) if err != nil { return nil, err } @@ -62,7 +64,7 @@ func (this *MessageTaskLogService) ListMessageTaskLogs(ctx context.Context, req } } - instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId)) + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(task.InstanceId), cacheMap) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_cluster.go b/internal/rpc/services/service_node_cluster.go index 494fc7b1..5ee2e3a8 100644 --- a/internal/rpc/services/service_node_cluster.go +++ b/internal/rpc/services/service_node_cluster.go @@ -951,9 +951,7 @@ func (this *NodeClusterService) FindEnabledNodeClusterConfigInfo(ctx context.Con result.HasThresholds = countThresholds > 0 // message receivers - countReceivers, err := models.SharedMessageReceiverDAO.CountAllEnabledReceivers(tx, models.MessageTaskTarget{ - ClusterId: req.NodeClusterId, - }, "") + countReceivers, err := models.SharedMessageReceiverDAO.CountAllEnabledReceivers(tx, nodeconfigs.NodeRoleNode, req.NodeClusterId, 0, 0, "") if err != nil { return nil, err }