diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index 42942733..2cf16a2e 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -37,6 +37,7 @@ const ( MessageTypeLogCapacityOverflow MessageType = "LogCapacityOverflow" // 日志超出最大限制 MessageTypeServerNamesAuditingSuccess MessageType = "ServerNamesAuditingSuccess" // 服务域名审核成功 MessageTypeServerNamesAuditingFailed MessageType = "ServerNamesAuditingFailed" // 服务域名审核失败 + MessageTypeThresholdSatisfied MessageType = "ThresholdSatisfied" // 满足阈值 ) type MessageDAO dbs.DAO @@ -112,7 +113,20 @@ func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, clusterId int64, messag // CreateNodeMessage 创建节点消息 func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error { - _, err := this.createMessage(tx, clusterId, nodeId, messageType, level, subject, body, paramsJSON) + // 检查N分钟内是否已经发送过 + hash := this.calHash(subject, body, paramsJSON) + exists, err := this.Query(tx). + Attr("hash", hash). + Gt("createdAt", time.Now().Unix()-10*60). // 10分钟 + Exist() + if err != nil { + return err + } + if exists { + return nil + } + + _, err = this.createMessage(tx, clusterId, nodeId, messageType, level, subject, body, paramsJSON) if err != nil { return err } @@ -144,11 +158,6 @@ func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId in // CreateMessage 创建普通消息 func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error { - h := md5.New() - h.Write([]byte(body)) - h.Write(paramsJSON) - hash := fmt.Sprintf("%x", h.Sum(nil)) - op := NewMessageOperator() op.AdminId = adminId op.UserId = userId @@ -169,7 +178,7 @@ func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, m op.State = MessageStateEnabled op.IsRead = false op.Day = timeutil.Format("Ymd") - op.Hash = hash + op.Hash = this.calHash(subject, body, paramsJSON) err := this.Save(tx, op) if err != nil { return err @@ -278,11 +287,6 @@ func (this *MessageDAO) CheckMessageUser(tx *dbs.Tx, messageId int64, adminId in // 创建消息 func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) (int64, error) { - h := md5.New() - h.Write([]byte(body)) - h.Write(paramsJSON) - hash := fmt.Sprintf("%x", h.Sum(nil)) - // TODO 检查同样的消息最近是否发送过 // 创建新消息 @@ -309,7 +313,7 @@ func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, op.State = MessageStateEnabled op.CreatedAt = time.Now().Unix() op.Day = timeutil.Format("Ymd") - op.Hash = hash + op.Hash = this.calHash(subject, body, paramsJSON) err := this.Save(tx, op) if err != nil { @@ -317,3 +321,12 @@ func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, } return types.Int64(op.Id), nil } + +// 计算Hash +func (this *MessageDAO) calHash(subject string, body string, paramsJSON []byte) string { + h := md5.New() + h.Write([]byte(subject)) + h.Write([]byte(body)) + h.Write(paramsJSON) + return fmt.Sprintf("%x", h.Sum(nil)) +} diff --git a/internal/db/models/node_threshold_dao.go b/internal/db/models/node_threshold_dao.go index 2b91ba87..cd780f3b 100644 --- a/internal/db/models/node_threshold_dao.go +++ b/internal/db/models/node_threshold_dao.go @@ -1,11 +1,17 @@ package models import ( + "fmt" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" + "strings" + "time" ) const ( @@ -65,8 +71,9 @@ func (this *NodeThresholdDAO) FindEnabledNodeThreshold(tx *dbs.Tx, id int64) (*N } // CreateThreshold 创建阈值 -func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, clusterId int64, nodeId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (int64, error) { +func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, role string, clusterId int64, nodeId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, notifyDuration int32) (int64, error) { op := NewNodeThresholdOperator() + op.Role = role op.ClusterId = clusterId op.NodeId = nodeId op.Item = item @@ -77,13 +84,14 @@ func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, clusterId int64, nodeI op.SumMethod = sumMethod op.Duration = duration op.DurationUnit = durationUnit + op.NotifyDuration = notifyDuration op.IsOn = true op.State = NodeThresholdStateEnabled return this.SaveInt64(tx, op) } // UpdateThreshold 修改阈值 -func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, isOn bool) error { +func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, notifyDuration int32, isOn bool) error { if thresholdId <= 0 { return errors.New("invalid thresholdId") } @@ -97,12 +105,13 @@ func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, ite op.SumMethod = sumMethod op.Duration = duration op.DurationUnit = durationUnit + op.NotifyDuration = notifyDuration op.IsOn = isOn return this.Save(tx, op) } // FindAllEnabledThresholds 列出所有阈值 -func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (result []*NodeThreshold, err error) { +func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, role string, clusterId int64, nodeId int64) (result []*NodeThreshold, err error) { if clusterId <= 0 && nodeId <= 0 { return } @@ -116,11 +125,51 @@ func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, clusterId int query.State(NodeThresholdStateEnabled) query.Slice(&result) _, err = query. + Attr("role", role). + Asc("IF(nodeId>0, 1, 0)"). + Desc("order"). AscPk(). FindAll() return } +// FindAllEnabledAndOnClusterThresholds 查询集群专属的阈值设置 +func (this *NodeThresholdDAO) FindAllEnabledAndOnClusterThresholds(tx *dbs.Tx, role string, clusterId int64, item string) (result []*NodeThreshold, err error) { + if clusterId <= 0 { + return + } + _, err = this.Query(tx). + Attr("role", role). + Attr("clusterId", clusterId). + Attr("nodeId", 0). + Attr("item", item). + Attr("isOn", true). + State(NodeThresholdStateEnabled). + Desc("order"). + AscPk(). + Slice(&result). + FindAll() + return +} + +// FindAllEnabledAndOnNodeThresholds 查询节点专属的阈值设置 +func (this *NodeThresholdDAO) FindAllEnabledAndOnNodeThresholds(tx *dbs.Tx, role string, nodeId int64, item string) (result []*NodeThreshold, err error) { + if nodeId <= 0 { + return + } + _, err = this.Query(tx). + Attr("role", role). + Attr("nodeId", nodeId). + Attr("item", item). + Attr("isOn", true). + State(NodeThresholdStateEnabled). + Desc("order"). + AscPk(). + Slice(&result). + FindAll() + return +} + // CountAllEnabledThresholds 计算阈值的数量 func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (int64, error) { if clusterId <= 0 && nodeId <= 0 { @@ -136,3 +185,87 @@ func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, clusterId in query.State(NodeThresholdStateEnabled) return query.Count() } + +// FireNodeThreshold 触发相关阈值设置 +func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId int64, item string) error { + clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId) + if err != nil { + return err + } + if clusterId == 0 { + return nil + } + + // 集群相关阈值 + var thresholds []*NodeThreshold + { + clusterThresholds, err := this.FindAllEnabledAndOnClusterThresholds(tx, role, clusterId, item) + if err != nil { + return err + } + thresholds = append(thresholds, clusterThresholds...) + } + + // 节点相关阈值 + { + nodeThresholds, err := this.FindAllEnabledAndOnNodeThresholds(tx, role, nodeId, item) + if err != nil { + return err + } + thresholds = append(thresholds, nodeThresholds...) + } + + if len(thresholds) > 0 { + for _, threshold := range thresholds { + if len(threshold.Param) == 0 || threshold.Duration <= 0 { + continue + } + paramValue, err := SharedNodeValueDAO.SumValues(tx, role, nodeId, item, threshold.Param, threshold.SumMethod, types.Int32(threshold.Duration), threshold.DurationUnit) + if err != nil { + return err + } + originValue := nodeconfigs.UnmarshalNodeValue([]byte(threshold.Value)) + thresholdValue := types.Float64(originValue) + isMatched := nodeconfigs.CompareNodeValue(threshold.Operator, paramValue, thresholdValue) + if isMatched { + // TODO 执行其他动作 + + // 是否已经通知过 + if threshold.NotifyDuration > 0 && threshold.NotifiedAt > 0 && time.Now().Unix()-int64(threshold.NotifiedAt) < int64(threshold.NotifyDuration*60) { + continue + } + + // 创建消息 + nodeName, err := SharedNodeDAO.FindNodeName(tx, nodeId) + if err != nil { + return err + } + itemName := nodeconfigs.FindNodeValueItemName(threshold.Item) + paramName := nodeconfigs.FindNodeValueItemParamName(threshold.Item, threshold.Param) + operatorName := nodeconfigs.FindNodeValueOperatorName(threshold.Operator) + + subject := "节点 \"" + nodeName + "\" " + itemName + " 达到阈值" + body := "节点 \"" + nodeName + "\" " + itemName + " 达到阈值\n阈值设置:" + paramName + " " + operatorName + " " + originValue + "\n当前值:" + fmt.Sprintf("%.2f", paramValue) + "\n触发时间:" + timeutil.Format("Y-m-d H:i:s") + if len(threshold.Message) > 0 { + body = threshold.Message + body = strings.Replace(body, "${item.name}", itemName, -1) + body = strings.Replace(body, "${value}", fmt.Sprintf("%.2f", paramValue), -1) + } + err = SharedMessageDAO.CreateNodeMessage(tx, clusterId, nodeId, MessageTypeThresholdSatisfied, MessageLevelWarning, subject, body, maps.Map{}.AsJSON()) + if err != nil { + return err + } + + // 设置通知时间 + _, err = this.Query(tx). + Pk(threshold.Id). + Set("notifiedAt", time.Now().Unix()). + Update() + if err != nil { + return err + } + } + } + } + return nil +} diff --git a/internal/db/models/node_threshold_model.go b/internal/db/models/node_threshold_model.go index 44468767..5f33c379 100644 --- a/internal/db/models/node_threshold_model.go +++ b/internal/db/models/node_threshold_model.go @@ -2,37 +2,43 @@ package models // NodeThreshold 集群阈值设置 type NodeThreshold struct { - Id uint64 `field:"id"` // ID - ClusterId uint32 `field:"clusterId"` // 集群ID - NodeId uint32 `field:"nodeId"` // 节点ID - IsOn uint8 `field:"isOn"` // 是否启用 - Item string `field:"item"` // 监控项 - Param string `field:"param"` // 参数 - Operator string `field:"operator"` // 操作符 - Value string `field:"value"` // 对比值 - Message string `field:"message"` // 消息内容 - State uint8 `field:"state"` // 状态 - Duration uint32 `field:"duration"` // 时间段 - DurationUnit string `field:"durationUnit"` // 时间段单位 - SumMethod string `field:"sumMethod"` // 聚合方法 - Order uint32 `field:"order"` // 排序 + Id uint64 `field:"id"` // ID + Role string `field:"role"` // 节点角色 + ClusterId uint32 `field:"clusterId"` // 集群ID + NodeId uint32 `field:"nodeId"` // 节点ID + IsOn uint8 `field:"isOn"` // 是否启用 + Item string `field:"item"` // 监控项 + Param string `field:"param"` // 参数 + Operator string `field:"operator"` // 操作符 + Value string `field:"value"` // 对比值 + Message string `field:"message"` // 消息内容 + NotifyDuration uint32 `field:"notifyDuration"` // 通知间隔 + NotifiedAt uint32 `field:"notifiedAt"` // 上次通知时间 + Duration uint32 `field:"duration"` // 时间段 + DurationUnit string `field:"durationUnit"` // 时间段单位 + SumMethod string `field:"sumMethod"` // 聚合方法 + Order uint32 `field:"order"` // 排序 + State uint8 `field:"state"` // 状态 } type NodeThresholdOperator struct { - Id interface{} // ID - ClusterId interface{} // 集群ID - NodeId interface{} // 节点ID - IsOn interface{} // 是否启用 - Item interface{} // 监控项 - Param interface{} // 参数 - Operator interface{} // 操作符 - Value interface{} // 对比值 - Message interface{} // 消息内容 - State interface{} // 状态 - Duration interface{} // 时间段 - DurationUnit interface{} // 时间段单位 - SumMethod interface{} // 聚合方法 - Order interface{} // 排序 + Id interface{} // ID + Role interface{} // 节点角色 + ClusterId interface{} // 集群ID + NodeId interface{} // 节点ID + IsOn interface{} // 是否启用 + Item interface{} // 监控项 + Param interface{} // 参数 + Operator interface{} // 操作符 + Value interface{} // 对比值 + Message interface{} // 消息内容 + NotifyDuration interface{} // 通知间隔 + NotifiedAt interface{} // 上次通知时间 + Duration interface{} // 时间段 + DurationUnit interface{} // 时间段单位 + SumMethod interface{} // 聚合方法 + Order interface{} // 排序 + State interface{} // 状态 } func NewNodeThresholdOperator() *NodeThresholdOperator { diff --git a/internal/db/models/node_value_dao.go b/internal/db/models/node_value_dao.go index 01651dc5..ffef4f46 100644 --- a/internal/db/models/node_value_dao.go +++ b/internal/db/models/node_value_dao.go @@ -89,3 +89,32 @@ func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item FindAll() return } + +// SumValues 计算某项参数值 +func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) { + if duration <= 0 { + return 0, nil + } + + query := this.Query(tx). + Attr("role", role). + Attr("nodeId", nodeId). + Attr("item", item) + switch method { + case nodeconfigs.NodeValueSumMethodAvg: + query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))") + case nodeconfigs.NodeValueSumMethodSum: + query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))") + default: + query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))") + } + switch durationUnit { + case nodeconfigs.NodeValueDurationUnitMinute: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration * 60)) + query.Gte("minute", fromMinute) + default: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration * 60)) + query.Gte("minute", fromMinute) + } + return query.FindFloat64Col(0) +} diff --git a/internal/rpc/services/service_node_threshold.go b/internal/rpc/services/service_node_threshold.go index dbf7f6c6..b0ec7c47 100644 --- a/internal/rpc/services/service_node_threshold.go +++ b/internal/rpc/services/service_node_threshold.go @@ -5,6 +5,7 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/types" ) @@ -21,7 +22,7 @@ func (this *NodeThresholdService) CreateNodeThreshold(ctx context.Context, req * return nil, err } var tx = this.NullTx() - thresholdId, err := models.SharedNodeThresholdDAO.CreateThreshold(tx, req.NodeClusterId, req.NodeId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit) + thresholdId, err := models.SharedNodeThresholdDAO.CreateThreshold(tx, rpcutils.UserTypeNode, req.NodeClusterId, req.NodeId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.NotifyDuration) if err != nil { return nil, err } @@ -35,7 +36,7 @@ func (this *NodeThresholdService) UpdateNodeThreshold(ctx context.Context, req * return nil, err } var tx = this.NullTx() - err = models.SharedNodeThresholdDAO.UpdateThreshold(tx, req.NodeThresholdId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.IsOn) + err = models.SharedNodeThresholdDAO.UpdateThreshold(tx, req.NodeThresholdId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.NotifyDuration, req.IsOn) if err != nil { return nil, err } @@ -65,24 +66,41 @@ func (this *NodeThresholdService) FindAllEnabledNodeThresholds(ctx context.Conte var tx = this.NullTx() pbThresholds := []*pb.NodeThreshold{} - thresholds, err := models.SharedNodeThresholdDAO.FindAllEnabledThresholds(tx, req.NodeClusterId, req.NodeId) + thresholds, err := models.SharedNodeThresholdDAO.FindAllEnabledThresholds(tx, req.Role, req.NodeClusterId, req.NodeId) if err != nil { return nil, err } for _, threshold := range thresholds { + // 节点信息 + var pbNode *pb.Node = nil + if threshold.NodeId > 0 { + nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(threshold.NodeId)) + if err != nil { + return nil, err + } + if len(nodeName) == 0 { + continue + } + pbNode = &pb.Node{ + Id: int64(threshold.NodeId), + Name: nodeName, + } + } + pbThresholds = append(pbThresholds, &pb.NodeThreshold{ - Id: int64(threshold.Id), - ClusterId: int64(threshold.ClusterId), - NodeId: int64(threshold.NodeId), - Item: threshold.Item, - Param: threshold.Param, - Operator: threshold.Operator, - ValueJSON: []byte(threshold.Value), - Message: threshold.Message, - Duration: types.Int32(threshold.Duration), - DurationUnit: threshold.DurationUnit, - SumMethod: threshold.SumMethod, - IsOn: threshold.IsOn == 1, + Id: int64(threshold.Id), + ClusterId: int64(threshold.ClusterId), + Node: pbNode, + Item: threshold.Item, + Param: threshold.Param, + Operator: threshold.Operator, + ValueJSON: []byte(threshold.Value), + Message: threshold.Message, + Duration: types.Int32(threshold.Duration), + DurationUnit: threshold.DurationUnit, + SumMethod: threshold.SumMethod, + NotifyDuration: int32(threshold.NotifyDuration), + IsOn: threshold.IsOn == 1, }) } return &pb.FindAllEnabledNodeThresholdsResponse{NodeThresholds: pbThresholds}, nil @@ -119,18 +137,35 @@ func (this *NodeThresholdService) FindEnabledNodeThreshold(ctx context.Context, return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: nil}, nil } + // 节点信息 + var pbNode *pb.Node = nil + if threshold.NodeId > 0 { + nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(threshold.NodeId)) + if err != nil { + return nil, err + } + if len(nodeName) == 0 { + return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: nil}, nil + } + pbNode = &pb.Node{ + Id: int64(threshold.NodeId), + Name: nodeName, + } + } + return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: &pb.NodeThreshold{ - Id: int64(threshold.Id), - ClusterId: int64(threshold.ClusterId), - NodeId: int64(threshold.NodeId), - Item: threshold.Item, - Param: threshold.Param, - Operator: threshold.Operator, - ValueJSON: []byte(threshold.Value), - Message: threshold.Message, - Duration: types.Int32(threshold.Duration), - DurationUnit: threshold.DurationUnit, - SumMethod: threshold.SumMethod, - IsOn: threshold.IsOn == 1, + Id: int64(threshold.Id), + ClusterId: int64(threshold.ClusterId), + Node: pbNode, + Item: threshold.Item, + Param: threshold.Param, + Operator: threshold.Operator, + ValueJSON: []byte(threshold.Value), + Message: threshold.Message, + Duration: types.Int32(threshold.Duration), + DurationUnit: threshold.DurationUnit, + SumMethod: threshold.SumMethod, + NotifyDuration: int32(threshold.NotifyDuration), + IsOn: threshold.IsOn == 1, }}, nil } diff --git a/internal/rpc/services/service_node_value.go b/internal/rpc/services/service_node_value.go index 4fdde4c6..8eb00c38 100644 --- a/internal/rpc/services/service_node_value.go +++ b/internal/rpc/services/service_node_value.go @@ -26,6 +26,12 @@ func (this *NodeValueService) CreateNodeValue(ctx context.Context, req *pb.Creat return nil, err } + // 触发阈值 + err = models.SharedNodeThresholdDAO.FireNodeThreshold(tx, role, nodeId, req.Item) + if err != nil { + return nil, err + } + return this.Success() }