diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index 2bf9ff52..e88b9837 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -118,21 +118,23 @@ func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, role string, clusterId } // CreateNodeMessage 创建节点消息 -func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, role string, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error { +func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, role string, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte, force bool) error { // 检查N分钟内是否已经发送过 hash := this.calHash(role, clusterId, nodeId, subject, body, paramsJSON) - exists, err := this.Query(tx). - Attr("hash", hash). - Gt("createdAt", time.Now().Unix()-10*60). // 10分钟 - Exist() - if err != nil { - return err - } - if exists { - return nil + if !force { + exists, err := this.Query(tx). + Attr("hash", hash). + Gt("createdAt", time.Now().Unix()-10*60). // 10分钟 + Exist() + if err != nil { + return err + } + if exists { + return nil + } } - _, err = this.createMessage(tx, role, clusterId, nodeId, messageType, level, subject, body, paramsJSON) + _, err := this.createMessage(tx, role, clusterId, nodeId, messageType, level, subject, body, paramsJSON) if err != nil { return err } diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index ebb869f4..efd27b41 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -113,7 +113,7 @@ func (this *NodeDAO) FindEnabledBasicNode(tx *dbs.Tx, nodeId int64) (*Node, erro one, err := this.Query(tx). State(NodeStateEnabled). Pk(nodeId). - Result("id", "name", "clusterId", "isOn", "isUp"). + Result("id", "name", "clusterId", "groupId", "isOn", "isUp"). Find() if one == nil { return nil, err diff --git a/internal/db/models/node_threshold_dao.go b/internal/db/models/node_threshold_dao.go index 19628fe8..655a071f 100644 --- a/internal/db/models/node_threshold_dao.go +++ b/internal/db/models/node_threshold_dao.go @@ -221,7 +221,7 @@ func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId if len(threshold.Param) == 0 || threshold.Duration <= 0 { continue } - paramValue, err := SharedNodeValueDAO.SumValues(tx, role, nodeId, item, threshold.Param, threshold.SumMethod, types.Int32(threshold.Duration), threshold.DurationUnit) + paramValue, err := SharedNodeValueDAO.SumNodeValues(tx, role, nodeId, item, threshold.Param, threshold.SumMethod, types.Int32(threshold.Duration), threshold.DurationUnit) if err != nil { return err } @@ -252,7 +252,7 @@ func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId body = strings.Replace(body, "${item.name}", itemName, -1) body = strings.Replace(body, "${value}", fmt.Sprintf("%.2f", paramValue), -1) } - err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeThresholdSatisfied, MessageLevelWarning, subject, body, maps.Map{}.AsJSON()) + err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeThresholdSatisfied, MessageLevelWarning, subject, body, maps.Map{}.AsJSON(), true) if err != nil { return err } diff --git a/internal/db/models/node_value_dao.go b/internal/db/models/node_value_dao.go index 39b76390..b33f85ba 100644 --- a/internal/db/models/node_value_dao.go +++ b/internal/db/models/node_value_dao.go @@ -173,8 +173,8 @@ func (this *NodeValueDAO) ListValuesForNSNodes(tx *dbs.Tx, item string, key stri return } -// SumValues 计算某项参数值 -func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) { +// SumNodeValues 计算节点的某项参数值 +func (this *NodeValueDAO) SumNodeValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) { if duration <= 0 { return 0, nil } @@ -202,6 +202,65 @@ func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item return query.FindFloat64Col(0) } +// SumNodeGroupValues 计算节点分组的某项参数值 +func (this *NodeValueDAO) SumNodeGroupValues(tx *dbs.Tx, role string, groupId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) { + if duration <= 0 { + return 0, nil + } + + query := this.Query(tx). + Attr("role", role). + Where("nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE groupId=:groupId AND state=1)"). + Param("groupId", groupId). + Attr("item", item) + switch method { + case nodeconfigs.NodeValueSumMethodAvg: + query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))") + case nodeconfigs.NodeValueSumMethodSum: + query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))") + default: + query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))") + } + switch durationUnit { + case nodeconfigs.NodeValueDurationUnitMinute: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60)) + query.Gte("minute", fromMinute) + default: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60)) + query.Gte("minute", fromMinute) + } + return query.FindFloat64Col(0) +} + +// SumNodeClusterValues 计算节点集群的某项参数值 +func (this *NodeValueDAO) SumNodeClusterValues(tx *dbs.Tx, role string, clusterId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) { + if duration <= 0 { + return 0, nil + } + + query := this.Query(tx). + Attr("role", role). + Attr("clusterId", clusterId). + Attr("item", item) + switch method { + case nodeconfigs.NodeValueSumMethodAvg: + query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))") + case nodeconfigs.NodeValueSumMethodSum: + query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))") + default: + query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))") + } + switch durationUnit { + case nodeconfigs.NodeValueDurationUnitMinute: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60)) + query.Gte("minute", fromMinute) + default: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60)) + query.Gte("minute", fromMinute) + } + return query.FindFloat64Col(0) +} + // FindLatestNodeValue 获取最近一条数据 func (this *NodeValueDAO) FindLatestNodeValue(tx *dbs.Tx, role string, nodeId int64, item string) (*NodeValue, error) { one, err := this.Query(tx). diff --git a/internal/rpc/services/nameservers/service_ns_node_stream.go b/internal/rpc/services/nameservers/service_ns_node_stream.go index 51ffe605..48a9292e 100644 --- a/internal/rpc/services/nameservers/service_ns_node_stream.go +++ b/internal/rpc/services/nameservers/service_ns_node_stream.go @@ -156,7 +156,7 @@ func (this *NSNodeService) NsNodeStream(server pb.NSNodeService_NsNodeStreamServ } subject := "DNS节点\"" + nodeName + "\"已经恢复在线" msg := "DNS节点\"" + nodeName + "\"已经恢复在线" - err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, models.MessageTypeNSNodeActive, models.MessageLevelSuccess, subject, msg, nil) + err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, models.MessageTypeNSNodeActive, models.MessageLevelSuccess, subject, msg, nil, false) if err != nil { return err } diff --git a/internal/rpc/services/service_node_stream.go b/internal/rpc/services/service_node_stream.go index b86b6e0f..2ddf4d2c 100644 --- a/internal/rpc/services/service_node_stream.go +++ b/internal/rpc/services/service_node_stream.go @@ -120,7 +120,7 @@ func (this *NodeService) NodeStream(server pb.NodeService_NodeStreamServer) erro } subject := "节点\"" + nodeName + "\"已经恢复在线" msg := "节点\"" + nodeName + "\"已经恢复在线" - err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil) + err = models.SharedMessageDAO.CreateNodeMessage(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, models.MessageTypeNodeActive, models.MessageLevelSuccess, subject, msg, nil, false) if err != nil { return err } diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go index f3dd1f37..8fd5be64 100644 --- a/internal/tasks/health_check_executor.go +++ b/internal/tasks/health_check_executor.go @@ -140,10 +140,10 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { // 通知恢复或下线 if result.IsOk { message := "健康检查成功,节点\"" + result.Node.Name + "\"已恢复上线" - err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil) + err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeUp, models.MessageLevelSuccess, message, message, nil, false) } else { message := "健康检查失败,节点\"" + result.Node.Name + "\"已自动下线" - err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil) + err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), models.MessageTypeHealthCheckNodeDown, models.MessageLevelError, message, message, nil, false) } } } diff --git a/internal/tasks/node_monitor_task.go b/internal/tasks/node_monitor_task.go index a29775f6..1835c0cf 100644 --- a/internal/tasks/node_monitor_task.go +++ b/internal/tasks/node_monitor_task.go @@ -83,7 +83,7 @@ func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error { for _, node := range inactiveNodes { subject := "节点\"" + node.Name + "\"已处于离线状态" msg := "节点\"" + node.Name + "\"已处于离线状态" - err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil) + err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false) if err != nil { return err } diff --git a/internal/tasks/ns_node_monitor_task.go b/internal/tasks/ns_node_monitor_task.go index 537317bf..11c2c4a0 100644 --- a/internal/tasks/ns_node_monitor_task.go +++ b/internal/tasks/ns_node_monitor_task.go @@ -83,7 +83,7 @@ func (this *NSNodeMonitorTask) monitorCluster(cluster *models.NSCluster) error { for _, node := range inactiveNodes { subject := "DNS节点\"" + node.Name + "\"已处于离线状态" msg := "DNS节点\"" + node.Name + "\"已处于离线状态" - err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleDNS, clusterId, int64(node.Id), models.MessageTypeNSNodeInactive, models.LevelError, subject, msg, nil) + err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleDNS, clusterId, int64(node.Id), models.MessageTypeNSNodeInactive, models.LevelError, subject, msg, nil, false) if err != nil { return err }