From 0eec199ff69329da1716e3859eca574038982e04 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Mon, 12 Apr 2021 19:19:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=8F=91=E9=80=81=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=88=B0=E5=AA=92=E4=BB=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 2 + internal/db/models/message_dao.go | 116 +++++++++--- internal/db/models/message_model.go | 4 +- internal/db/models/message_receiver_dao.go | 115 ++++++++++++ .../db/models/message_receiver_dao_test.go | 6 + internal/db/models/message_receiver_model.go | 30 +++ .../db/models/message_receiver_model_ext.go | 1 + internal/db/models/message_recipient_dao.go | 39 +++- .../db/models/message_recipient_dao_test.go | 11 ++ internal/db/models/message_target.go | 9 + internal/db/models/message_task_dao.go | 40 +++- internal/db/models/node_dao_test.go | 4 +- internal/nodes/api_node.go | 1 + .../rpc/services/service_message_receiver.go | 171 ++++++++++++++++++ .../rpc/services/service_message_recipient.go | 8 +- .../rpc/services/service_message_task_log.go | 3 +- internal/rpc/services/service_node_stream.go | 12 +- internal/rpc/services/service_server.go | 8 +- internal/tasks/health_check_cluster_task.go | 4 +- internal/tasks/health_check_executor.go | 6 +- internal/tasks/log_task.go | 2 +- internal/tasks/node_monitor_task.go | 6 +- .../tasks/ssl_cert_expire_check_executor.go | 19 +- 24 files changed, 548 insertions(+), 71 deletions(-) create mode 100644 internal/db/models/message_receiver_dao.go create mode 100644 internal/db/models/message_receiver_dao_test.go create mode 100644 internal/db/models/message_receiver_model.go create mode 100644 internal/db/models/message_receiver_model_ext.go create mode 100644 internal/db/models/message_target.go create mode 100644 internal/rpc/services/service_message_receiver.go diff --git a/go.mod b/go.mod index 8803750b..459d6ac2 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/go-yaml/yaml v2.1.0+incompatible github.com/golang/protobuf v1.4.2 - github.com/iwind/TeaGo v0.0.0-20210325033016-3279bdaa087d + github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f github.com/lionsoul2014/ip2region v2.2.0-release+incompatible github.com/mozillazg/go-pinyin v0.18.0 github.com/pkg/sftp v1.12.0 diff --git a/go.sum b/go.sum index 71d31b2e..526b102d 100644 --- a/go.sum +++ b/go.sum @@ -181,6 +181,8 @@ github.com/iwind/TeaGo v0.0.0-20210302120856-7588e79bdbe3 h1:k9K3HHMmkF7HYyIHz21 github.com/iwind/TeaGo v0.0.0-20210302120856-7588e79bdbe3/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20210325033016-3279bdaa087d h1:FQTYJmZeCMdwM0Bz+C4h31SDBt04ap6A4JOjm+FfYwk= github.com/iwind/TeaGo v0.0.0-20210325033016-3279bdaa087d/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f h1:r2O8PONj/KiuZjJHVHn7KlCePUIjNtgAmvLfgRafQ8o= +github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index 9080c137..42942733 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -25,12 +25,12 @@ const ( type MessageType = string const ( - MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed" - MessageTypeHealthCheckNodeUp MessageType = "HealthCheckNodeUp" - MessageTypeHealthCheckNodeDown MessageType = "HealthCheckNodeDown" - MessageTypeNodeInactive MessageType = "NodeInactive" - MessageTypeNodeActive MessageType = "NodeActive" - MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed" + MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed" // 节点健康检查失败 + MessageTypeHealthCheckNodeUp MessageType = "HealthCheckNodeUp" // 因健康检查节点上线 + MessageTypeHealthCheckNodeDown MessageType = "HealthCheckNodeDown" // 因健康检查节点下线 + MessageTypeNodeInactive MessageType = "NodeInactive" // 节点不活跃 + MessageTypeNodeActive MessageType = "NodeActive" // 节点活跃 + MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed" // DNS同步失败 MessageTypeSSLCertExpiring MessageType = "SSLCertExpiring" // SSL证书即将过期 MessageTypeSSLCertACMETaskFailed MessageType = "SSLCertACMETaskFailed" // SSL证书任务执行失败 MessageTypeSSLCertACMETaskSuccess MessageType = "SSLCertACMETaskSuccess" // SSL证书任务执行成功 @@ -60,7 +60,7 @@ func init() { }) } -// 启用条目 +// EnableMessage 启用条目 func (this *MessageDAO) EnableMessage(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -69,7 +69,7 @@ func (this *MessageDAO) EnableMessage(tx *dbs.Tx, id int64) error { return err } -// 禁用条目 +// DisableMessage 禁用条目 func (this *MessageDAO) DisableMessage(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -78,7 +78,7 @@ func (this *MessageDAO) DisableMessage(tx *dbs.Tx, id int64) error { return err } -// 查找启用中的条目 +// FindEnabledMessage 查找启用中的条目 func (this *MessageDAO) FindEnabledMessage(tx *dbs.Tx, id int64) (*Message, error) { result, err := this.Query(tx). Pk(id). @@ -90,20 +90,60 @@ func (this *MessageDAO) FindEnabledMessage(tx *dbs.Tx, id int64) (*Message, erro return result.(*Message), err } -// 创建集群消息 -func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, clusterId int64, messageType MessageType, level string, body string, paramsJSON []byte) error { - _, err := this.createMessage(tx, clusterId, 0, messageType, level, body, paramsJSON) - return err +// CreateClusterMessage 创建集群消息 +func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, clusterId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error { + _, err := this.createMessage(tx, clusterId, 0, messageType, level, subject, body, paramsJSON) + if err != nil { + return err + } + + // 发送给媒介接收人 + err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{ + ClusterId: clusterId, + NodeId: 0, + ServerId: 0, + }, messageType, subject, body) + if err != nil { + return err + } + + return nil } -// 创建节点消息 -func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) error { - _, err := this.createMessage(tx, clusterId, nodeId, messageType, level, body, paramsJSON) - return err +// CreateNodeMessage 创建节点消息 +func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error { + _, err := this.createMessage(tx, clusterId, nodeId, messageType, level, subject, body, paramsJSON) + if err != nil { + return err + } + + // 发送给媒介接收人 - 集群 + err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{ + ClusterId: clusterId, + NodeId: 0, + ServerId: 0, + }, messageType, subject, body) + if err != nil { + return err + } + + // 发送给媒介接收人 - 节点 + if nodeId > 0 { + err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{ + ClusterId: clusterId, + NodeId: nodeId, + ServerId: 0, + }, messageType, subject, body) + if err != nil { + return err + } + } + + return nil } -// 创建普通消息 -func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, body string, paramsJSON []byte) error { +// CreateMessage 创建普通消息 +func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error { h := md5.New() h.Write([]byte(body)) h.Write(paramsJSON) @@ -114,6 +154,14 @@ func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, m op.UserId = userId op.Type = messageType op.Level = level + + subjectRunes := []rune(subject) + if len(subjectRunes) > 100 { + op.Subject = string(subjectRunes[:100]) + "..." + } else { + op.Subject = subject + } + op.Body = body if len(paramsJSON) > 0 { op.Params = paramsJSON @@ -123,10 +171,14 @@ func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, m op.Day = timeutil.Format("Ymd") op.Hash = hash err := this.Save(tx, op) - return err + if err != nil { + return err + } + + return nil } -// 删除某天之前的消息 +// DeleteMessagesBeforeDay 删除某天之前的消息 func (this *MessageDAO) DeleteMessagesBeforeDay(tx *dbs.Tx, dayTime time.Time) error { day := timeutil.Format("Ymd", dayTime) _, err := this.Query(tx). @@ -136,7 +188,7 @@ func (this *MessageDAO) DeleteMessagesBeforeDay(tx *dbs.Tx, dayTime time.Time) e return err } -// 计算未读消息数量 +// CountUnreadMessages 计算未读消息数量 func (this *MessageDAO) CountUnreadMessages(tx *dbs.Tx, adminId int64, userId int64) (int64, error) { query := this.Query(tx). Attr("isRead", false) @@ -149,7 +201,7 @@ func (this *MessageDAO) CountUnreadMessages(tx *dbs.Tx, adminId int64, userId in return query.Count() } -// 列出单页未读消息 +// ListUnreadMessages 列出单页未读消息 func (this *MessageDAO) ListUnreadMessages(tx *dbs.Tx, adminId int64, userId int64, offset int64, size int64) (result []*Message, err error) { query := this.Query(tx). Attr("isRead", false) @@ -168,7 +220,7 @@ func (this *MessageDAO) ListUnreadMessages(tx *dbs.Tx, adminId int64, userId int return } -// 设置消息已读状态 +// UpdateMessageRead 设置消息已读状态 func (this *MessageDAO) UpdateMessageRead(tx *dbs.Tx, messageId int64, b bool) error { if messageId <= 0 { return errors.New("invalid messageId") @@ -180,7 +232,7 @@ func (this *MessageDAO) UpdateMessageRead(tx *dbs.Tx, messageId int64, b bool) e return err } -// 设置一组消息为已读状态 +// UpdateMessagesRead 设置一组消息为已读状态 func (this *MessageDAO) UpdateMessagesRead(tx *dbs.Tx, messageIds []int64, b bool) error { // 这里我们一个一个更改,因为In语句不容易Prepare,且效率不高 for _, messageId := range messageIds { @@ -192,7 +244,7 @@ func (this *MessageDAO) UpdateMessagesRead(tx *dbs.Tx, messageIds []int64, b boo return nil } -// 设置所有消息为已读 +// UpdateAllMessagesRead 设置所有消息为已读 func (this *MessageDAO) UpdateAllMessagesRead(tx *dbs.Tx, adminId int64, userId int64) error { query := this.Query(tx). Attr("isRead", false) @@ -208,7 +260,7 @@ func (this *MessageDAO) UpdateAllMessagesRead(tx *dbs.Tx, adminId int64, userId return err } -// 检查消息权限 +// CheckMessageUser 检查消息权限 func (this *MessageDAO) CheckMessageUser(tx *dbs.Tx, messageId int64, adminId int64, userId int64) (bool, error) { if messageId <= 0 || (adminId <= 0 && userId <= 0) { return false, nil @@ -225,7 +277,7 @@ func (this *MessageDAO) CheckMessageUser(tx *dbs.Tx, messageId int64, adminId in } // 创建消息 -func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) (int64, error) { +func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) (int64, error) { h := md5.New() h.Write([]byte(body)) h.Write(paramsJSON) @@ -241,6 +293,14 @@ func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, op.NodeId = nodeId op.Type = messageType op.Level = level + + subjectRunes := []rune(subject) + if len(subjectRunes) > 100 { + op.Subject = string(subjectRunes[:100]) + "..." + } else { + op.Subject = subject + } + op.Body = body if len(paramsJSON) > 0 { op.Params = paramsJSON diff --git a/internal/db/models/message_model.go b/internal/db/models/message_model.go index c79305cc..8db81a78 100644 --- a/internal/db/models/message_model.go +++ b/internal/db/models/message_model.go @@ -1,6 +1,6 @@ package models -// 消息通知 +// Message 消息通知 type Message struct { Id uint64 `field:"id"` // ID AdminId uint32 `field:"adminId"` // 管理员ID @@ -8,6 +8,7 @@ type Message struct { ClusterId uint32 `field:"clusterId"` // 集群ID NodeId uint32 `field:"nodeId"` // 节点ID Level string `field:"level"` // 级别 + Subject string `field:"subject"` // 标题 Body string `field:"body"` // 内容 Type string `field:"type"` // 消息类型 Params string `field:"params"` // 额外的参数 @@ -25,6 +26,7 @@ type MessageOperator struct { ClusterId interface{} // 集群ID NodeId interface{} // 节点ID Level interface{} // 级别 + Subject interface{} // 标题 Body interface{} // 内容 Type interface{} // 消息类型 Params interface{} // 额外的参数 diff --git a/internal/db/models/message_receiver_dao.go b/internal/db/models/message_receiver_dao.go new file mode 100644 index 00000000..30353243 --- /dev/null +++ b/internal/db/models/message_receiver_dao.go @@ -0,0 +1,115 @@ +package models + +import ( + "encoding/json" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" +) + +const ( + MessageReceiverStateEnabled = 1 // 已启用 + MessageReceiverStateDisabled = 0 // 已禁用 +) + +type MessageReceiverDAO dbs.DAO + +func NewMessageReceiverDAO() *MessageReceiverDAO { + return dbs.NewDAO(&MessageReceiverDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessageReceivers", + Model: new(MessageReceiver), + PkName: "id", + }, + }).(*MessageReceiverDAO) +} + +var SharedMessageReceiverDAO *MessageReceiverDAO + +func init() { + dbs.OnReady(func() { + SharedMessageReceiverDAO = NewMessageReceiverDAO() + }) +} + +// EnableMessageReceiver 启用条目 +func (this *MessageReceiverDAO) EnableMessageReceiver(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageReceiverStateEnabled). + Update() + return err +} + +// DisableMessageReceiver 禁用条目 +func (this *MessageReceiverDAO) DisableMessageReceiver(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", MessageReceiverStateDisabled). + Update() + return err +} + +// FindEnabledMessageReceiver 查找启用中的条目 +func (this *MessageReceiverDAO) FindEnabledMessageReceiver(tx *dbs.Tx, id int64) (*MessageReceiver, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", MessageReceiverStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*MessageReceiver), err +} + +// DisableReceivers 禁用一组接收人 +func (this *MessageReceiverDAO) DisableReceivers(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64) error { + return this.Query(tx). + Attr("clusterId", clusterId). + Attr("nodeId", nodeId). + Attr("serverId", serverId). + Set("state", MessageReceiverStateDisabled). + UpdateQuickly() +} + +// CreateReceiver 创建接收人 +func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, params maps.Map, recipientId int64, recipientGroupId int64) (int64, error) { + op := NewMessageReceiverOperator() + op.ClusterId = target.ClusterId + op.NodeId = target.NodeId + op.ServerId = target.ServerId + op.Type = messageType + + if params == nil { + params = maps.Map{} + } + paramsJSON, err := json.Marshal(params) + if err != nil { + return 0, err + } + op.Params = paramsJSON + + op.RecipientId = recipientId + op.RecipientGroupId = recipientGroupId + op.State = MessageReceiverStateEnabled + return this.SaveInt64(tx, op) +} + +// FindAllReceivers 查询接收人 +func (this *MessageReceiverDAO) FindAllReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) { + query := this.Query(tx) + if len(messageType) > 0 { + query.Attr("type", []string{"*", messageType}) // *表示所有的 + } + _, err = query. + Attr("clusterId", target.ClusterId). + Attr("nodeId", target.NodeId). + Attr("serverId", target.ServerId). + State(MessageReceiverStateEnabled). + AscPk(). + Slice(&result). + FindAll() + return +} diff --git a/internal/db/models/message_receiver_dao_test.go b/internal/db/models/message_receiver_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/message_receiver_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/message_receiver_model.go b/internal/db/models/message_receiver_model.go new file mode 100644 index 00000000..752f8f39 --- /dev/null +++ b/internal/db/models/message_receiver_model.go @@ -0,0 +1,30 @@ +package models + +// MessageReceiver 消息通知接收人 +type MessageReceiver struct { + Id uint32 `field:"id"` // ID + ClusterId uint32 `field:"clusterId"` // 集群ID + NodeId uint32 `field:"nodeId"` // 节点ID + ServerId uint32 `field:"serverId"` // 服务ID + Type string `field:"type"` // 类型 + Params string `field:"params"` // 参数 + RecipientId uint32 `field:"recipientId"` // 接收人ID + RecipientGroupId uint32 `field:"recipientGroupId"` // 接收人分组ID + State uint8 `field:"state"` // 状态 +} + +type MessageReceiverOperator struct { + Id interface{} // ID + ClusterId interface{} // 集群ID + NodeId interface{} // 节点ID + ServerId interface{} // 服务ID + Type interface{} // 类型 + Params interface{} // 参数 + RecipientId interface{} // 接收人ID + RecipientGroupId interface{} // 接收人分组ID + State interface{} // 状态 +} + +func NewMessageReceiverOperator() *MessageReceiverOperator { + return &MessageReceiverOperator{} +} diff --git a/internal/db/models/message_receiver_model_ext.go b/internal/db/models/message_receiver_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/message_receiver_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/message_recipient_dao.go b/internal/db/models/message_recipient_dao.go index 1430cea4..0d407f4a 100644 --- a/internal/db/models/message_recipient_dao.go +++ b/internal/db/models/message_recipient_dao.go @@ -3,6 +3,7 @@ package models import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" @@ -34,7 +35,7 @@ func init() { }) } -// 启用条目 +// EnableMessageRecipient 启用条目 func (this *MessageRecipientDAO) EnableMessageRecipient(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -43,7 +44,7 @@ func (this *MessageRecipientDAO) EnableMessageRecipient(tx *dbs.Tx, id int64) er return err } -// 禁用条目 +// DisableMessageRecipient 禁用条目 func (this *MessageRecipientDAO) DisableMessageRecipient(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -52,7 +53,7 @@ func (this *MessageRecipientDAO) DisableMessageRecipient(tx *dbs.Tx, id int64) e return err } -// 查找启用中的条目 +// FindEnabledMessageRecipient 查找启用中的条目 func (this *MessageRecipientDAO) FindEnabledMessageRecipient(tx *dbs.Tx, id int64) (*MessageRecipient, error) { result, err := this.Query(tx). Pk(id). @@ -64,7 +65,7 @@ func (this *MessageRecipientDAO) FindEnabledMessageRecipient(tx *dbs.Tx, id int6 return result.(*MessageRecipient), err } -// 创建接收人 +// CreateRecipient 创建接收人 func (this *MessageRecipientDAO) CreateRecipient(tx *dbs.Tx, adminId int64, instanceId int64, user string, groupIds []int64, description string) (int64, error) { op := NewMessageRecipientOperator() op.AdminId = adminId @@ -87,7 +88,7 @@ func (this *MessageRecipientDAO) CreateRecipient(tx *dbs.Tx, adminId int64, inst return this.SaveInt64(tx, op) } -// 修改接收人 +// UpdateRecipient 修改接收人 func (this *MessageRecipientDAO) UpdateRecipient(tx *dbs.Tx, recipientId int64, adminId int64, instanceId int64, user string, groupIds []int64, description string, isOn bool) error { if recipientId <= 0 { return errors.New("invalid recipientId") @@ -114,14 +115,15 @@ func (this *MessageRecipientDAO) UpdateRecipient(tx *dbs.Tx, recipientId int64, return this.Save(tx, op) } -// 计算接收人数量 +// CountAllEnabledRecipients 计算接收人数量 func (this *MessageRecipientDAO) CountAllEnabledRecipients(tx *dbs.Tx, adminId int64, groupId int64, mediaType string, keyword string) (int64, error) { query := this.Query(tx) if adminId > 0 { query.Attr("adminId", adminId) } if groupId > 0 { - query.Attr("groupId", groupId) + query.Where("JSON_CONTAINS(groupIds, :groupId)"). + Param("groupId", numberutils.FormatInt64(groupId)) } if len(mediaType) > 0 { query.Where("instanceId IN (SELECT id FROM "+SharedMessageMediaInstanceDAO.Table+" WHERE state=1 AND mediaType=:mediaType)"). @@ -138,14 +140,15 @@ func (this *MessageRecipientDAO) CountAllEnabledRecipients(tx *dbs.Tx, adminId i Count() } -// 列出单页接收人 +// ListAllEnabledRecipients 列出单页接收人 func (this *MessageRecipientDAO) ListAllEnabledRecipients(tx *dbs.Tx, adminId int64, groupId int64, mediaType string, keyword string, offset int64, size int64) (result []*MessageRecipient, err error) { query := this.Query(tx) if adminId > 0 { query.Attr("adminId", adminId) } if groupId > 0 { - query.Attr("groupId", groupId) + query.Where("JSON_CONTAINS(groupIds, :groupId)"). + Param("groupId", numberutils.FormatInt64(groupId)) } if len(mediaType) > 0 { query.Where("instanceId IN (SELECT id FROM "+SharedMessageMediaInstanceDAO.Table+" WHERE state=1 AND mediaType=:mediaType)"). @@ -166,3 +169,21 @@ func (this *MessageRecipientDAO) ListAllEnabledRecipients(tx *dbs.Tx, adminId in FindAll() return } + +// FindAllEnabledAndOnRecipientIdsWithGroup 查找某个分组下的所有可用接收人ID +func (this *MessageRecipientDAO) FindAllEnabledAndOnRecipientIdsWithGroup(tx *dbs.Tx, groupId int64) ([]int64, error) { + ones, err := this.Query(tx). + Where("JSON_CONTAINS(groupIds, :groupId)"). + Param("groupId", numberutils.FormatInt64(groupId)). + State(MessageRecipientStateEnabled). + ResultPk(). + FindAll() + if err != nil { + return nil, err + } + result := []int64{} + for _, one := range ones { + result = append(result, int64(one.(*MessageRecipient).Id)) + } + return result, nil +} diff --git a/internal/db/models/message_recipient_dao_test.go b/internal/db/models/message_recipient_dao_test.go index 224e9db7..2c56ac48 100644 --- a/internal/db/models/message_recipient_dao_test.go +++ b/internal/db/models/message_recipient_dao_test.go @@ -3,4 +3,15 @@ package models import ( _ "github.com/go-sql-driver/mysql" _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/dbs" + "testing" ) + +func TestMessageRecipientDAO_FindAllEnabledAndOnRecipientIdsWithGroup(t *testing.T) { + dbs.NotifyReady() + recipientIds, err := SharedMessageRecipientDAO.FindAllEnabledAndOnRecipientIdsWithGroup(nil, 4) + if err != nil { + t.Fatal(err) + } + t.Log(recipientIds) +} diff --git a/internal/db/models/message_target.go b/internal/db/models/message_target.go new file mode 100644 index 00000000..550871ae --- /dev/null +++ b/internal/db/models/message_target.go @@ -0,0 +1,9 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package models + +type MessageTaskTarget struct { + ClusterId int64 + NodeId int64 + ServerId int64 +} diff --git a/internal/db/models/message_task_dao.go b/internal/db/models/message_task_dao.go index 34db17ac..f4e47d26 100644 --- a/internal/db/models/message_task_dao.go +++ b/internal/db/models/message_task_dao.go @@ -41,7 +41,7 @@ func init() { }) } -// 启用条目 +// EnableMessageTask 启用条目 func (this *MessageTaskDAO) EnableMessageTask(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -50,7 +50,7 @@ func (this *MessageTaskDAO) EnableMessageTask(tx *dbs.Tx, id int64) error { return err } -// 禁用条目 +// DisableMessageTask 禁用条目 func (this *MessageTaskDAO) DisableMessageTask(tx *dbs.Tx, id int64) error { _, err := this.Query(tx). Pk(id). @@ -59,7 +59,7 @@ func (this *MessageTaskDAO) DisableMessageTask(tx *dbs.Tx, id int64) error { return err } -// 查找启用中的条目 +// FindEnabledMessageTask 查找启用中的条目 func (this *MessageTaskDAO) FindEnabledMessageTask(tx *dbs.Tx, id int64) (*MessageTask, error) { result, err := this.Query(tx). Pk(id). @@ -71,7 +71,7 @@ func (this *MessageTaskDAO) FindEnabledMessageTask(tx *dbs.Tx, id int64) (*Messa return result.(*MessageTask), err } -// 创建任务 +// CreateMessageTask 创建任务 func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, instanceId int64, user string, subject string, body string, isPrimary bool) (int64, error) { op := NewMessageTaskOperator() op.RecipientId = recipientId @@ -85,7 +85,7 @@ func (this *MessageTaskDAO) CreateMessageTask(tx *dbs.Tx, recipientId int64, ins return this.SaveInt64(tx, op) } -// 查找需要发送的任务 +// FindSendingMessageTasks 查找需要发送的任务 func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (result []*MessageTask, err error) { if size <= 0 { return nil, nil @@ -101,7 +101,7 @@ func (this *MessageTaskDAO) FindSendingMessageTasks(tx *dbs.Tx, size int64) (res return } -// 设置发送的状态 +// UpdateMessageTaskStatus 设置发送的状态 func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, status MessageTaskStatus, result []byte) error { if taskId <= 0 { return errors.New("invalid taskId") @@ -115,3 +115,31 @@ func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, st } return this.Save(tx, op) } + +// CreateMessageTasks 从集群、节点或者服务中创建任务 +func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error { + receivers, err := SharedMessageReceiverDAO.FindAllReceivers(tx, target, messageType) + if err != nil { + return err + } + for _, receiver := range receivers { + if receiver.RecipientId > 0 { + _, err := this.CreateMessageTask(tx, int64(receiver.RecipientId), 0, "", subject, body, false) + if err != nil { + return err + } + } else if receiver.RecipientGroupId > 0 { + recipientIds, err := SharedMessageRecipientDAO.FindAllEnabledAndOnRecipientIdsWithGroup(tx, int64(receiver.RecipientGroupId)) + if err != nil { + return err + } + for _, recipientId := range recipientIds { + _, err := this.CreateMessageTask(tx, recipientId, 0, "", subject, body, false) + if err != nil { + return err + } + } + } + } + return nil +} diff --git a/internal/db/models/node_dao_test.go b/internal/db/models/node_dao_test.go index e98ef6d5..38da5fe9 100644 --- a/internal/db/models/node_dao_test.go +++ b/internal/db/models/node_dao_test.go @@ -18,9 +18,9 @@ func TestNodeDAO_FindAllNodeIdsMatch(t *testing.T) { func TestNodeDAO_UpdateNodeUp(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - isChanged, err := SharedNodeDAO.UpdateNodeUp(tx, 57, false, 3, 3) + err := SharedNodeDAO.UpdateNodeUp(tx, 57, false) if err != nil { t.Fatal(err) } - t.Log("changed:", isChanged) + t.Log("ok") } diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index d804f46e..8789f4a5 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -210,6 +210,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err pb.RegisterHTTPAccessLogServiceServer(rpcServer, &services.HTTPAccessLogService{}) pb.RegisterMessageServiceServer(rpcServer, &services.MessageService{}) pb.RegisterMessageRecipientServiceServer(rpcServer, &services.MessageRecipientService{}) + pb.RegisterMessageReceiverServiceServer(rpcServer, &services.MessageReceiverService{}) pb.RegisterMessageMediaServiceServer(rpcServer, &services.MessageMediaService{}) pb.RegisterMessageRecipientGroupServiceServer(rpcServer, &services.MessageRecipientGroupService{}) pb.RegisterMessageMediaInstanceServiceServer(rpcServer, &services.MessageMediaInstanceService{}) diff --git a/internal/rpc/services/service_message_receiver.go b/internal/rpc/services/service_message_receiver.go new file mode 100644 index 00000000..06c2b5ab --- /dev/null +++ b/internal/rpc/services/service_message_receiver.go @@ -0,0 +1,171 @@ +package services + +import ( + "context" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" +) + +// MessageReceiverService 消息对象接收人 +type MessageReceiverService struct { + BaseService +} + +// UpdateMessageReceivers 创建接收者 +func (this *MessageReceiverService) UpdateMessageReceivers(ctx context.Context, req *pb.UpdateMessageReceiversRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + params := maps.Map{} + if len(req.ParamsJSON) > 0 { + err = json.Unmarshal(req.ParamsJSON, ¶ms) + if err != nil { + return nil, err + } + } + + err = this.RunTx(func(tx *dbs.Tx) error { + err = models.SharedMessageReceiverDAO.DisableReceivers(tx, req.NodeClusterId, req.NodeId, req.ServerId) + if err != nil { + return err + } + + for messageType, options := range req.RecipientOptions { + for _, option := range options.RecipientOptions { + _, err := models.SharedMessageReceiverDAO.CreateReceiver(tx, models.MessageTaskTarget{ + ClusterId: req.NodeClusterId, + NodeId: req.NodeId, + ServerId: req.ServerId, + }, messageType, params, option.MessageRecipientId, option.MessageRecipientGroupId) + if err != nil { + return err + } + } + } + return nil + }) + if err != nil { + return nil, err + } + + return this.Success() +} + +// FindAllMessageReceivers 查找接收者 +func (this *MessageReceiverService) FindAllMessageReceivers(ctx context.Context, req *pb.FindAllMessageReceiversRequest) (*pb.FindAllMessageReceiversResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + receivers, err := models.SharedMessageReceiverDAO.FindAllReceivers(tx, models.MessageTaskTarget{ + ClusterId: req.NodeClusterId, + NodeId: req.NodeId, + ServerId: req.ServerId, + }, "") + if err != nil { + return nil, err + } + pbReceivers := []*pb.MessageReceiver{} + for _, receiver := range receivers { + var pbRecipient *pb.MessageRecipient = nil + + // 接收人 + if receiver.RecipientId > 0 { + recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(receiver.RecipientId)) + if err != nil { + return nil, err + } + if recipient == nil { + continue + } + + // 管理员 + admin, err := models.SharedAdminDAO.FindEnabledAdmin(tx, int64(recipient.AdminId)) + if err != nil { + return nil, err + } + if admin == nil { + continue + } + + // 接收人 + instance, err := models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, int64(recipient.InstanceId)) + if err != nil { + return nil, err + } + if instance == nil { + continue + } + + pbRecipient = &pb.MessageRecipient{ + Id: int64(recipient.Id), + Admin: &pb.Admin{ + Id: int64(admin.Id), + Fullname: admin.Fullname, + Username: admin.Username, + IsOn: admin.IsOn == 1, + }, + MessageMediaInstance: &pb.MessageMediaInstance{ + Id: int64(instance.Id), + Name: instance.Name, + IsOn: instance.IsOn == 1, + }, + IsOn: recipient.IsOn == 1, + MessageRecipientGroups: nil, + Description: "", + User: "", + } + } + + // 接收人分组 + var pbRecipientGroup *pb.MessageRecipientGroup = nil + if receiver.RecipientGroupId > 0 { + group, err := models.SharedMessageRecipientGroupDAO.FindEnabledMessageRecipientGroup(tx, int64(receiver.RecipientGroupId)) + if err != nil { + return nil, err + } + if group == nil { + continue + } + pbRecipientGroup = &pb.MessageRecipientGroup{ + Id: int64(group.Id), + Name: group.Name, + IsOn: group.IsOn == 1, + } + } + + pbReceivers = append(pbReceivers, &pb.MessageReceiver{ + Id: int64(receiver.Id), + ClusterId: int64(receiver.ClusterId), + NodeId: int64(receiver.NodeId), + ServerId: int64(receiver.ServerId), + Type: receiver.Type, + ParamsJSON: []byte(receiver.Params), + MessageRecipient: pbRecipient, + MessageRecipientGroup: pbRecipientGroup, + }) + } + return &pb.FindAllMessageReceiversResponse{MessageReceivers: pbReceivers}, nil +} + +// DeleteMessageReceiver 删除接收者 +func (this *MessageReceiverService) DeleteMessageReceiver(ctx context.Context, req *pb.DeleteMessageReceiverRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedMessageReceiverDAO.DisableMessageReceiver(tx, req.MessageReceiverId) + if err != nil { + return nil, err + } + return this.Success() +} diff --git a/internal/rpc/services/service_message_recipient.go b/internal/rpc/services/service_message_recipient.go index 6507dc7b..8d92c31d 100644 --- a/internal/rpc/services/service_message_recipient.go +++ b/internal/rpc/services/service_message_recipient.go @@ -19,7 +19,7 @@ func (this *MessageRecipientService) CreateMessageRecipient(ctx context.Context, } var tx = this.NullTx() - recipientId, err := models.SharedMessageRecipientDAO.CreateRecipient(tx, req.AdminId, req.InstanceId, req.User, req.GroupIds, req.Description) + recipientId, err := models.SharedMessageRecipientDAO.CreateRecipient(tx, req.AdminId, req.MessageMediaInstanceId, req.User, req.MessageRecipientGroupIds, req.Description) if err != nil { return nil, err } @@ -35,7 +35,7 @@ func (this *MessageRecipientService) UpdateMessageRecipient(ctx context.Context, } var tx = this.NullTx() - err = models.SharedMessageRecipientDAO.UpdateRecipient(tx, req.MessageRecipientId, req.AdminId, req.InstanceId, req.User, req.GroupIds, req.Description, req.IsOn) + err = models.SharedMessageRecipientDAO.UpdateRecipient(tx, req.MessageRecipientId, req.AdminId, req.MessageMediaInstanceId, req.User, req.MessageRecipientGroupIds, req.Description, req.IsOn) if err != nil { return nil, err } @@ -66,7 +66,7 @@ func (this *MessageRecipientService) CountAllEnabledMessageRecipients(ctx contex } var tx = this.NullTx() - count, err := models.SharedMessageRecipientDAO.CountAllEnabledRecipients(tx, req.AdminId, req.GroupId, req.MediaType, req.Keyword) + count, err := models.SharedMessageRecipientDAO.CountAllEnabledRecipients(tx, req.AdminId, req.MessageRecipientGroupId, req.MediaType, req.Keyword) if err != nil { return nil, err } @@ -82,7 +82,7 @@ func (this *MessageRecipientService) ListEnabledMessageRecipients(ctx context.Co } var tx = this.NullTx() - recipients, err := models.SharedMessageRecipientDAO.ListAllEnabledRecipients(tx, req.AdminId, req.GroupId, req.MediaType, req.Keyword, req.Offset, req.Size) + recipients, err := models.SharedMessageRecipientDAO.ListAllEnabledRecipients(tx, req.AdminId, req.MessageRecipientGroupId, req.MediaType, req.Keyword, req.Offset, req.Size) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_message_task_log.go b/internal/rpc/services/service_message_task_log.go index 7eb69e3d..08818201 100644 --- a/internal/rpc/services/service_message_task_log.go +++ b/internal/rpc/services/service_message_task_log.go @@ -25,7 +25,7 @@ func (this *MessageTaskLogService) CountMessageTaskLogs(ctx context.Context, req return this.SuccessCount(count) } -// 列出当页日志 +// ListMessageTaskLogs 列出当页日志 func (this *MessageTaskLogService) ListMessageTaskLogs(ctx context.Context, req *pb.ListMessageTaskLogsRequest) (*pb.ListMessageTaskLogsResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -58,6 +58,7 @@ func (this *MessageTaskLogService) ListMessageTaskLogs(ctx context.Context, req Id: int64(recipient.Id), User: recipient.User, } + task.InstanceId = recipient.InstanceId } } diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index d7052335..3c4758ad 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -65,7 +65,7 @@ func init() { }() } -// 节点stream +// NodeStream 节点stream func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) error { // TODO 使用此stream快速通知边缘节点更新 // 校验节点 @@ -114,7 +114,13 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro if err != nil { return err } - err = models.SharedMessageDAO.CreateNodeMessage(tx, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, "节点已经恢复在线", nil) + nodeName, err := models.SharedNodeDAO.FindNodeName(tx, nodeId) + if err != nil { + return err + } + subject := "节点\"" + nodeName + "\"已经恢复在线" + msg := "节点\"" + nodeName + "\"已经恢复在线" + err = models.SharedMessageDAO.CreateNodeMessage(tx, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil) if err != nil { return err } @@ -190,7 +196,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } } -// 向节点发送命令 +// SendCommandToNode 向节点发送命令 func (this *NodeService) SendCommandToNode(ctx context.Context, req *pb.NodeStreamMessage) (*pb.NodeStreamMessage, error) { // 校验请求 _, _, err := this.ValidateAdminAndUser(ctx, 0, 0) diff --git a/internal/rpc/services/service_server.go b/internal/rpc/services/service_server.go index 969c9cbc..e701ae0b 100644 --- a/internal/rpc/services/service_server.go +++ b/internal/rpc/services/service_server.go @@ -446,14 +446,18 @@ func (this *ServerService) UpdateServerNamesAuditing(ctx context.Context, req *p _, userId, err := models.SharedServerDAO.FindServerAdminIdAndUserId(tx, req.ServerId) if userId > 0 { if req.AuditingResult.IsOk { - err = models.SharedMessageDAO.CreateMessage(tx, 0, userId, models.MessageTypeServerNamesAuditingSuccess, models.MessageLevelSuccess, "服务域名审核通过", maps.Map{ + subject := "服务域名审核通过" + msg := "服务域名审核通过" + err = models.SharedMessageDAO.CreateMessage(tx, 0, userId, models.MessageTypeServerNamesAuditingSuccess, models.MessageLevelSuccess, subject, msg, maps.Map{ "serverId": req.ServerId, }.AsJSON()) if err != nil { return nil, err } } else { - err = models.SharedMessageDAO.CreateMessage(tx, 0, userId, models.MessageTypeServerNamesAuditingFailed, models.LevelError, "服务域名审核失败,原因:"+req.AuditingResult.Reason, maps.Map{ + subject := "服务域名审核失败" + msg := "服务域名审核失败,原因:" + req.AuditingResult.Reason + err = models.SharedMessageDAO.CreateMessage(tx, 0, userId, models.MessageTypeServerNamesAuditingFailed, models.LevelError, subject, msg, maps.Map{ "serverId": req.ServerId, }.AsJSON()) if err != nil { diff --git a/internal/tasks/health_check_cluster_task.go b/internal/tasks/health_check_cluster_task.go index 33606646..257b5443 100644 --- a/internal/tasks/health_check_cluster_task.go +++ b/internal/tasks/health_check_cluster_task.go @@ -132,7 +132,7 @@ func (this *HealthCheckClusterTask) loop(seconds int64) error { return err } message := "有" + numberutils.FormatInt(len(failedResults)) + "个节点在健康检查中出现问题" - err = models.NewMessageDAO().CreateClusterMessage(nil, this.clusterId, models.MessageTypeHealthCheckFailed, models.MessageLevelError, message, failedResultsJSON) + err = models.NewMessageDAO().CreateClusterMessage(nil, this.clusterId, models.MessageTypeHealthCheckFailed, models.MessageLevelError, message, message, failedResultsJSON) if err != nil { return err } @@ -141,7 +141,7 @@ func (this *HealthCheckClusterTask) loop(seconds int64) error { return nil } -// 获取当前配置 +// Config 获取当前配置 func (this *HealthCheckClusterTask) Config() *serverconfigs.HealthCheckConfig { return this.config } diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go index 77155b43..442bec1b 100644 --- a/internal/tasks/health_check_executor.go +++ b/internal/tasks/health_check_executor.go @@ -136,9 +136,11 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { } else if isChanged { // 通知恢复或下线 if result.IsOk { - err = models.NewMessageDAO().CreateNodeMessage(nil, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, "健康检查成功,节点\""+result.Node.Name+"\"已恢复上线", nil) + message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线" + err = models.NewMessageDAO().CreateNodeMessage(nil, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil) } else { - err = models.NewMessageDAO().CreateNodeMessage(nil, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, "健康检查失败,节点\""+result.Node.Name+"\"已自动下线", nil) + message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线" + err = models.NewMessageDAO().CreateNodeMessage(nil, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil) } } } diff --git a/internal/tasks/log_task.go b/internal/tasks/log_task.go index 88a1eb38..d896f505 100644 --- a/internal/tasks/log_task.go +++ b/internal/tasks/log_task.go @@ -132,7 +132,7 @@ func (this *LogTask) loopMonitor(seconds int64) error { return err } if sumBytes > capacityBytes { - err := models.SharedMessageDAO.CreateMessage(nil, 0, 0, models.MessageTypeLogCapacityOverflow, models.MessageLevelError, "日志用量已经超出最大限制,当前的用量为"+this.formatBytes(sumBytes)+",而设置的最大容量为"+this.formatBytes(capacityBytes)+"。", nil) + err := models.SharedMessageDAO.CreateMessage(nil, 0, 0, models.MessageTypeLogCapacityOverflow, models.MessageLevelError, "日志用量已经超出最大限制", "日志用量已经超出最大限制,当前的用量为"+this.formatBytes(sumBytes)+",而设置的最大容量为"+this.formatBytes(capacityBytes)+"。", nil) if err != nil { return err } diff --git a/internal/tasks/node_monitor_task.go b/internal/tasks/node_monitor_task.go index 51be4999..42d23204 100644 --- a/internal/tasks/node_monitor_task.go +++ b/internal/tasks/node_monitor_task.go @@ -24,7 +24,7 @@ func init() { }) } -// 健康节点任务 +// NodeMonitorTask 健康节点任务 type NodeMonitorTask struct { intervalSeconds int } @@ -80,7 +80,9 @@ func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error { return err } for _, node := range inactiveNodes { - err = models.SharedMessageDAO.CreateNodeMessage(nil, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, "节点已处于离线状态", nil) + subject := "节点\"" + node.Name + "\"已处于离线状态" + msg := "节点\"" + node.Name + "\"已处于离线状态" + err = models.SharedMessageDAO.CreateNodeMessage(nil, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil) if err != nil { return err } diff --git a/internal/tasks/ssl_cert_expire_check_executor.go b/internal/tasks/ssl_cert_expire_check_executor.go index 10fa068e..84fa6c0b 100644 --- a/internal/tasks/ssl_cert_expire_check_executor.go +++ b/internal/tasks/ssl_cert_expire_check_executor.go @@ -18,7 +18,7 @@ func init() { }) } -// 证书检查任务 +// SSLCertExpireCheckExecutor 证书检查任务 type SSLCertExpireCheckExecutor struct { } @@ -26,7 +26,7 @@ func NewSSLCertExpireCheckExecutor() *SSLCertExpireCheckExecutor { return &SSLCertExpireCheckExecutor{} } -// 启动任务 +// Start 启动任务 func (this *SSLCertExpireCheckExecutor) Start() { seconds := int64(3600) ticker := time.NewTicker(time.Duration(seconds) * time.Second) @@ -66,6 +66,7 @@ func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { } for _, cert := range certs { // 发送消息 + subject := "SSL证书\"" + cert.Name + "\"在" + strconv.Itoa(days) + "天后将到期," msg := "SSL证书\"" + cert.Name + "\"(" + cert.DnsNames + ")在" + strconv.Itoa(days) + "天后将到期," // 是否有自动更新任务 @@ -85,7 +86,7 @@ func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { msg += "请及时更新证书。" } - err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertExpiring, models.MessageLevelWarning, msg, maps.Map{ + err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertExpiring, models.MessageLevelWarning, subject, msg, maps.Map{ "certId": cert.Id, "acmeTaskId": cert.AcmeTaskId, }.AsJSON()) @@ -109,6 +110,7 @@ func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { } for _, cert := range certs { // 发送消息 + subject := "SSL证书\"" + cert.Name + "\"在" + strconv.Itoa(days) + "天后将到期," msg := "SSL证书\"" + cert.Name + "\"(" + cert.DnsNames + ")在" + strconv.Itoa(days) + "天后将到期," // 是否有自动更新任务 @@ -122,8 +124,9 @@ func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { isOk, errMsg, _ := acme.SharedACMETaskDAO.RunTask(nil, int64(cert.AcmeTaskId)) if isOk { // 发送成功通知 + subject := "系统已成功为你自动更新了证书\"" + cert.Name + "\"" msg = "系统已成功为你自动更新了证书\"" + cert.Name + "\"(" + cert.DnsNames + ")。" - err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertACMETaskSuccess, models.MessageLevelSuccess, msg, maps.Map{ + err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertACMETaskSuccess, models.MessageLevelSuccess, subject, msg, maps.Map{ "certId": cert.Id, "acmeTaskId": cert.AcmeTaskId, }.AsJSON()) @@ -135,8 +138,9 @@ func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { } } else { // 发送失败通知 + subject := "系统在尝试自动更新证书\"" + cert.Name + "\"时发生错误" msg = "系统在尝试自动更新证书\"" + cert.Name + "\"(" + cert.DnsNames + ")时发生错误:" + errMsg + "。请检查系统设置并修复错误。" - err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertACMETaskFailed, models.MessageLevelError, msg, maps.Map{ + err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertACMETaskFailed, models.MessageLevelError, subject, msg, maps.Map{ "certId": cert.Id, "acmeTaskId": cert.AcmeTaskId, }.AsJSON()) @@ -159,7 +163,7 @@ func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { msg += "请及时更新证书。" } - err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertExpiring, models.MessageLevelWarning, msg, maps.Map{ + err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertExpiring, models.MessageLevelWarning, subject, msg, maps.Map{ "certId": cert.Id, "acmeTaskId": cert.AcmeTaskId, }.AsJSON()) @@ -184,8 +188,9 @@ func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { for _, cert := range certs { // 发送消息 today := timeutil.Format("Y-m-d") + subject := "SSL证书\"" + cert.Name + "\"在今天(" + today + ")过期" msg := "SSL证书\"" + cert.Name + "\"(" + cert.DnsNames + ")在今天(" + today + ")过期,请及时更新证书,之后将不再重复提醒。" - err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertExpiring, models.MessageLevelWarning, msg, maps.Map{ + err = models.SharedMessageDAO.CreateMessage(nil, int64(cert.AdminId), int64(cert.UserId), models.MessageTypeSSLCertExpiring, models.MessageLevelWarning, subject, msg, maps.Map{ "certId": cert.Id, "acmeTaskId": cert.AcmeTaskId, }.AsJSON())