mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-11 04:50:25 +08:00
实现阈值通知消息
This commit is contained in:
@@ -37,6 +37,7 @@ const (
|
|||||||
MessageTypeLogCapacityOverflow MessageType = "LogCapacityOverflow" // 日志超出最大限制
|
MessageTypeLogCapacityOverflow MessageType = "LogCapacityOverflow" // 日志超出最大限制
|
||||||
MessageTypeServerNamesAuditingSuccess MessageType = "ServerNamesAuditingSuccess" // 服务域名审核成功
|
MessageTypeServerNamesAuditingSuccess MessageType = "ServerNamesAuditingSuccess" // 服务域名审核成功
|
||||||
MessageTypeServerNamesAuditingFailed MessageType = "ServerNamesAuditingFailed" // 服务域名审核失败
|
MessageTypeServerNamesAuditingFailed MessageType = "ServerNamesAuditingFailed" // 服务域名审核失败
|
||||||
|
MessageTypeThresholdSatisfied MessageType = "ThresholdSatisfied" // 满足阈值
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageDAO dbs.DAO
|
type MessageDAO dbs.DAO
|
||||||
@@ -112,7 +113,20 @@ func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, clusterId int64, messag
|
|||||||
|
|
||||||
// CreateNodeMessage 创建节点消息
|
// CreateNodeMessage 创建节点消息
|
||||||
func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
|
func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
|
||||||
_, err := this.createMessage(tx, clusterId, nodeId, messageType, level, subject, body, paramsJSON)
|
// 检查N分钟内是否已经发送过
|
||||||
|
hash := this.calHash(subject, body, paramsJSON)
|
||||||
|
exists, err := this.Query(tx).
|
||||||
|
Attr("hash", hash).
|
||||||
|
Gt("createdAt", time.Now().Unix()-10*60). // 10分钟
|
||||||
|
Exist()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if exists {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = this.createMessage(tx, clusterId, nodeId, messageType, level, subject, body, paramsJSON)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -144,11 +158,6 @@ func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId in
|
|||||||
|
|
||||||
// CreateMessage 创建普通消息
|
// CreateMessage 创建普通消息
|
||||||
func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
|
func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
|
||||||
h := md5.New()
|
|
||||||
h.Write([]byte(body))
|
|
||||||
h.Write(paramsJSON)
|
|
||||||
hash := fmt.Sprintf("%x", h.Sum(nil))
|
|
||||||
|
|
||||||
op := NewMessageOperator()
|
op := NewMessageOperator()
|
||||||
op.AdminId = adminId
|
op.AdminId = adminId
|
||||||
op.UserId = userId
|
op.UserId = userId
|
||||||
@@ -169,7 +178,7 @@ func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, m
|
|||||||
op.State = MessageStateEnabled
|
op.State = MessageStateEnabled
|
||||||
op.IsRead = false
|
op.IsRead = false
|
||||||
op.Day = timeutil.Format("Ymd")
|
op.Day = timeutil.Format("Ymd")
|
||||||
op.Hash = hash
|
op.Hash = this.calHash(subject, body, paramsJSON)
|
||||||
err := this.Save(tx, op)
|
err := this.Save(tx, op)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -278,11 +287,6 @@ func (this *MessageDAO) CheckMessageUser(tx *dbs.Tx, messageId int64, adminId in
|
|||||||
|
|
||||||
// 创建消息
|
// 创建消息
|
||||||
func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) (int64, error) {
|
func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) (int64, error) {
|
||||||
h := md5.New()
|
|
||||||
h.Write([]byte(body))
|
|
||||||
h.Write(paramsJSON)
|
|
||||||
hash := fmt.Sprintf("%x", h.Sum(nil))
|
|
||||||
|
|
||||||
// TODO 检查同样的消息最近是否发送过
|
// TODO 检查同样的消息最近是否发送过
|
||||||
|
|
||||||
// 创建新消息
|
// 创建新消息
|
||||||
@@ -309,7 +313,7 @@ func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64,
|
|||||||
op.State = MessageStateEnabled
|
op.State = MessageStateEnabled
|
||||||
op.CreatedAt = time.Now().Unix()
|
op.CreatedAt = time.Now().Unix()
|
||||||
op.Day = timeutil.Format("Ymd")
|
op.Day = timeutil.Format("Ymd")
|
||||||
op.Hash = hash
|
op.Hash = this.calHash(subject, body, paramsJSON)
|
||||||
|
|
||||||
err := this.Save(tx, op)
|
err := this.Save(tx, op)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -317,3 +321,12 @@ func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64,
|
|||||||
}
|
}
|
||||||
return types.Int64(op.Id), nil
|
return types.Int64(op.Id), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 计算Hash
|
||||||
|
func (this *MessageDAO) calHash(subject string, body string, paramsJSON []byte) string {
|
||||||
|
h := md5.New()
|
||||||
|
h.Write([]byte(subject))
|
||||||
|
h.Write([]byte(body))
|
||||||
|
h.Write(paramsJSON)
|
||||||
|
return fmt.Sprintf("%x", h.Sum(nil))
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,11 +1,17 @@
|
|||||||
package models
|
package models
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
"github.com/iwind/TeaGo/Tea"
|
||||||
"github.com/iwind/TeaGo/dbs"
|
"github.com/iwind/TeaGo/dbs"
|
||||||
|
"github.com/iwind/TeaGo/maps"
|
||||||
|
"github.com/iwind/TeaGo/types"
|
||||||
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -65,8 +71,9 @@ func (this *NodeThresholdDAO) FindEnabledNodeThreshold(tx *dbs.Tx, id int64) (*N
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreateThreshold 创建阈值
|
// CreateThreshold 创建阈值
|
||||||
func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, clusterId int64, nodeId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (int64, error) {
|
func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, role string, clusterId int64, nodeId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, notifyDuration int32) (int64, error) {
|
||||||
op := NewNodeThresholdOperator()
|
op := NewNodeThresholdOperator()
|
||||||
|
op.Role = role
|
||||||
op.ClusterId = clusterId
|
op.ClusterId = clusterId
|
||||||
op.NodeId = nodeId
|
op.NodeId = nodeId
|
||||||
op.Item = item
|
op.Item = item
|
||||||
@@ -77,13 +84,14 @@ func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, clusterId int64, nodeI
|
|||||||
op.SumMethod = sumMethod
|
op.SumMethod = sumMethod
|
||||||
op.Duration = duration
|
op.Duration = duration
|
||||||
op.DurationUnit = durationUnit
|
op.DurationUnit = durationUnit
|
||||||
|
op.NotifyDuration = notifyDuration
|
||||||
op.IsOn = true
|
op.IsOn = true
|
||||||
op.State = NodeThresholdStateEnabled
|
op.State = NodeThresholdStateEnabled
|
||||||
return this.SaveInt64(tx, op)
|
return this.SaveInt64(tx, op)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateThreshold 修改阈值
|
// UpdateThreshold 修改阈值
|
||||||
func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, isOn bool) error {
|
func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, notifyDuration int32, isOn bool) error {
|
||||||
if thresholdId <= 0 {
|
if thresholdId <= 0 {
|
||||||
return errors.New("invalid thresholdId")
|
return errors.New("invalid thresholdId")
|
||||||
}
|
}
|
||||||
@@ -97,12 +105,13 @@ func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, ite
|
|||||||
op.SumMethod = sumMethod
|
op.SumMethod = sumMethod
|
||||||
op.Duration = duration
|
op.Duration = duration
|
||||||
op.DurationUnit = durationUnit
|
op.DurationUnit = durationUnit
|
||||||
|
op.NotifyDuration = notifyDuration
|
||||||
op.IsOn = isOn
|
op.IsOn = isOn
|
||||||
return this.Save(tx, op)
|
return this.Save(tx, op)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindAllEnabledThresholds 列出所有阈值
|
// FindAllEnabledThresholds 列出所有阈值
|
||||||
func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (result []*NodeThreshold, err error) {
|
func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, role string, clusterId int64, nodeId int64) (result []*NodeThreshold, err error) {
|
||||||
if clusterId <= 0 && nodeId <= 0 {
|
if clusterId <= 0 && nodeId <= 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -116,11 +125,51 @@ func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, clusterId int
|
|||||||
query.State(NodeThresholdStateEnabled)
|
query.State(NodeThresholdStateEnabled)
|
||||||
query.Slice(&result)
|
query.Slice(&result)
|
||||||
_, err = query.
|
_, err = query.
|
||||||
|
Attr("role", role).
|
||||||
|
Asc("IF(nodeId>0, 1, 0)").
|
||||||
|
Desc("order").
|
||||||
AscPk().
|
AscPk().
|
||||||
FindAll()
|
FindAll()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindAllEnabledAndOnClusterThresholds 查询集群专属的阈值设置
|
||||||
|
func (this *NodeThresholdDAO) FindAllEnabledAndOnClusterThresholds(tx *dbs.Tx, role string, clusterId int64, item string) (result []*NodeThreshold, err error) {
|
||||||
|
if clusterId <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, err = this.Query(tx).
|
||||||
|
Attr("role", role).
|
||||||
|
Attr("clusterId", clusterId).
|
||||||
|
Attr("nodeId", 0).
|
||||||
|
Attr("item", item).
|
||||||
|
Attr("isOn", true).
|
||||||
|
State(NodeThresholdStateEnabled).
|
||||||
|
Desc("order").
|
||||||
|
AscPk().
|
||||||
|
Slice(&result).
|
||||||
|
FindAll()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAllEnabledAndOnNodeThresholds 查询节点专属的阈值设置
|
||||||
|
func (this *NodeThresholdDAO) FindAllEnabledAndOnNodeThresholds(tx *dbs.Tx, role string, nodeId int64, item string) (result []*NodeThreshold, err error) {
|
||||||
|
if nodeId <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, err = this.Query(tx).
|
||||||
|
Attr("role", role).
|
||||||
|
Attr("nodeId", nodeId).
|
||||||
|
Attr("item", item).
|
||||||
|
Attr("isOn", true).
|
||||||
|
State(NodeThresholdStateEnabled).
|
||||||
|
Desc("order").
|
||||||
|
AscPk().
|
||||||
|
Slice(&result).
|
||||||
|
FindAll()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// CountAllEnabledThresholds 计算阈值的数量
|
// CountAllEnabledThresholds 计算阈值的数量
|
||||||
func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (int64, error) {
|
func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (int64, error) {
|
||||||
if clusterId <= 0 && nodeId <= 0 {
|
if clusterId <= 0 && nodeId <= 0 {
|
||||||
@@ -136,3 +185,87 @@ func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, clusterId in
|
|||||||
query.State(NodeThresholdStateEnabled)
|
query.State(NodeThresholdStateEnabled)
|
||||||
return query.Count()
|
return query.Count()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FireNodeThreshold 触发相关阈值设置
|
||||||
|
func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId int64, item string) error {
|
||||||
|
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if clusterId == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 集群相关阈值
|
||||||
|
var thresholds []*NodeThreshold
|
||||||
|
{
|
||||||
|
clusterThresholds, err := this.FindAllEnabledAndOnClusterThresholds(tx, role, clusterId, item)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
thresholds = append(thresholds, clusterThresholds...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 节点相关阈值
|
||||||
|
{
|
||||||
|
nodeThresholds, err := this.FindAllEnabledAndOnNodeThresholds(tx, role, nodeId, item)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
thresholds = append(thresholds, nodeThresholds...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(thresholds) > 0 {
|
||||||
|
for _, threshold := range thresholds {
|
||||||
|
if len(threshold.Param) == 0 || threshold.Duration <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
paramValue, err := SharedNodeValueDAO.SumValues(tx, role, nodeId, item, threshold.Param, threshold.SumMethod, types.Int32(threshold.Duration), threshold.DurationUnit)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
originValue := nodeconfigs.UnmarshalNodeValue([]byte(threshold.Value))
|
||||||
|
thresholdValue := types.Float64(originValue)
|
||||||
|
isMatched := nodeconfigs.CompareNodeValue(threshold.Operator, paramValue, thresholdValue)
|
||||||
|
if isMatched {
|
||||||
|
// TODO 执行其他动作
|
||||||
|
|
||||||
|
// 是否已经通知过
|
||||||
|
if threshold.NotifyDuration > 0 && threshold.NotifiedAt > 0 && time.Now().Unix()-int64(threshold.NotifiedAt) < int64(threshold.NotifyDuration*60) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建消息
|
||||||
|
nodeName, err := SharedNodeDAO.FindNodeName(tx, nodeId)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
itemName := nodeconfigs.FindNodeValueItemName(threshold.Item)
|
||||||
|
paramName := nodeconfigs.FindNodeValueItemParamName(threshold.Item, threshold.Param)
|
||||||
|
operatorName := nodeconfigs.FindNodeValueOperatorName(threshold.Operator)
|
||||||
|
|
||||||
|
subject := "节点 \"" + nodeName + "\" " + itemName + " 达到阈值"
|
||||||
|
body := "节点 \"" + nodeName + "\" " + itemName + " 达到阈值\n阈值设置:" + paramName + " " + operatorName + " " + originValue + "\n当前值:" + fmt.Sprintf("%.2f", paramValue) + "\n触发时间:" + timeutil.Format("Y-m-d H:i:s")
|
||||||
|
if len(threshold.Message) > 0 {
|
||||||
|
body = threshold.Message
|
||||||
|
body = strings.Replace(body, "${item.name}", itemName, -1)
|
||||||
|
body = strings.Replace(body, "${value}", fmt.Sprintf("%.2f", paramValue), -1)
|
||||||
|
}
|
||||||
|
err = SharedMessageDAO.CreateNodeMessage(tx, clusterId, nodeId, MessageTypeThresholdSatisfied, MessageLevelWarning, subject, body, maps.Map{}.AsJSON())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置通知时间
|
||||||
|
_, err = this.Query(tx).
|
||||||
|
Pk(threshold.Id).
|
||||||
|
Set("notifiedAt", time.Now().Unix()).
|
||||||
|
Update()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package models
|
|||||||
// NodeThreshold 集群阈值设置
|
// NodeThreshold 集群阈值设置
|
||||||
type NodeThreshold struct {
|
type NodeThreshold struct {
|
||||||
Id uint64 `field:"id"` // ID
|
Id uint64 `field:"id"` // ID
|
||||||
|
Role string `field:"role"` // 节点角色
|
||||||
ClusterId uint32 `field:"clusterId"` // 集群ID
|
ClusterId uint32 `field:"clusterId"` // 集群ID
|
||||||
NodeId uint32 `field:"nodeId"` // 节点ID
|
NodeId uint32 `field:"nodeId"` // 节点ID
|
||||||
IsOn uint8 `field:"isOn"` // 是否启用
|
IsOn uint8 `field:"isOn"` // 是否启用
|
||||||
@@ -11,15 +12,18 @@ type NodeThreshold struct {
|
|||||||
Operator string `field:"operator"` // 操作符
|
Operator string `field:"operator"` // 操作符
|
||||||
Value string `field:"value"` // 对比值
|
Value string `field:"value"` // 对比值
|
||||||
Message string `field:"message"` // 消息内容
|
Message string `field:"message"` // 消息内容
|
||||||
State uint8 `field:"state"` // 状态
|
NotifyDuration uint32 `field:"notifyDuration"` // 通知间隔
|
||||||
|
NotifiedAt uint32 `field:"notifiedAt"` // 上次通知时间
|
||||||
Duration uint32 `field:"duration"` // 时间段
|
Duration uint32 `field:"duration"` // 时间段
|
||||||
DurationUnit string `field:"durationUnit"` // 时间段单位
|
DurationUnit string `field:"durationUnit"` // 时间段单位
|
||||||
SumMethod string `field:"sumMethod"` // 聚合方法
|
SumMethod string `field:"sumMethod"` // 聚合方法
|
||||||
Order uint32 `field:"order"` // 排序
|
Order uint32 `field:"order"` // 排序
|
||||||
|
State uint8 `field:"state"` // 状态
|
||||||
}
|
}
|
||||||
|
|
||||||
type NodeThresholdOperator struct {
|
type NodeThresholdOperator struct {
|
||||||
Id interface{} // ID
|
Id interface{} // ID
|
||||||
|
Role interface{} // 节点角色
|
||||||
ClusterId interface{} // 集群ID
|
ClusterId interface{} // 集群ID
|
||||||
NodeId interface{} // 节点ID
|
NodeId interface{} // 节点ID
|
||||||
IsOn interface{} // 是否启用
|
IsOn interface{} // 是否启用
|
||||||
@@ -28,11 +32,13 @@ type NodeThresholdOperator struct {
|
|||||||
Operator interface{} // 操作符
|
Operator interface{} // 操作符
|
||||||
Value interface{} // 对比值
|
Value interface{} // 对比值
|
||||||
Message interface{} // 消息内容
|
Message interface{} // 消息内容
|
||||||
State interface{} // 状态
|
NotifyDuration interface{} // 通知间隔
|
||||||
|
NotifiedAt interface{} // 上次通知时间
|
||||||
Duration interface{} // 时间段
|
Duration interface{} // 时间段
|
||||||
DurationUnit interface{} // 时间段单位
|
DurationUnit interface{} // 时间段单位
|
||||||
SumMethod interface{} // 聚合方法
|
SumMethod interface{} // 聚合方法
|
||||||
Order interface{} // 排序
|
Order interface{} // 排序
|
||||||
|
State interface{} // 状态
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNodeThresholdOperator() *NodeThresholdOperator {
|
func NewNodeThresholdOperator() *NodeThresholdOperator {
|
||||||
|
|||||||
@@ -89,3 +89,32 @@ func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item
|
|||||||
FindAll()
|
FindAll()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SumValues 计算某项参数值
|
||||||
|
func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
|
||||||
|
if duration <= 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
query := this.Query(tx).
|
||||||
|
Attr("role", role).
|
||||||
|
Attr("nodeId", nodeId).
|
||||||
|
Attr("item", item)
|
||||||
|
switch method {
|
||||||
|
case nodeconfigs.NodeValueSumMethodAvg:
|
||||||
|
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||||||
|
case nodeconfigs.NodeValueSumMethodSum:
|
||||||
|
query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
|
||||||
|
default:
|
||||||
|
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||||||
|
}
|
||||||
|
switch durationUnit {
|
||||||
|
case nodeconfigs.NodeValueDurationUnitMinute:
|
||||||
|
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration * 60))
|
||||||
|
query.Gte("minute", fromMinute)
|
||||||
|
default:
|
||||||
|
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration * 60))
|
||||||
|
query.Gte("minute", fromMinute)
|
||||||
|
}
|
||||||
|
return query.FindFloat64Col(0)
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ package services
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||||
|
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
)
|
)
|
||||||
@@ -21,7 +22,7 @@ func (this *NodeThresholdService) CreateNodeThreshold(ctx context.Context, req *
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var tx = this.NullTx()
|
var tx = this.NullTx()
|
||||||
thresholdId, err := models.SharedNodeThresholdDAO.CreateThreshold(tx, req.NodeClusterId, req.NodeId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit)
|
thresholdId, err := models.SharedNodeThresholdDAO.CreateThreshold(tx, rpcutils.UserTypeNode, req.NodeClusterId, req.NodeId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.NotifyDuration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -35,7 +36,7 @@ func (this *NodeThresholdService) UpdateNodeThreshold(ctx context.Context, req *
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var tx = this.NullTx()
|
var tx = this.NullTx()
|
||||||
err = models.SharedNodeThresholdDAO.UpdateThreshold(tx, req.NodeThresholdId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.IsOn)
|
err = models.SharedNodeThresholdDAO.UpdateThreshold(tx, req.NodeThresholdId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.NotifyDuration, req.IsOn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -65,15 +66,31 @@ func (this *NodeThresholdService) FindAllEnabledNodeThresholds(ctx context.Conte
|
|||||||
|
|
||||||
var tx = this.NullTx()
|
var tx = this.NullTx()
|
||||||
pbThresholds := []*pb.NodeThreshold{}
|
pbThresholds := []*pb.NodeThreshold{}
|
||||||
thresholds, err := models.SharedNodeThresholdDAO.FindAllEnabledThresholds(tx, req.NodeClusterId, req.NodeId)
|
thresholds, err := models.SharedNodeThresholdDAO.FindAllEnabledThresholds(tx, req.Role, req.NodeClusterId, req.NodeId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, threshold := range thresholds {
|
for _, threshold := range thresholds {
|
||||||
|
// 节点信息
|
||||||
|
var pbNode *pb.Node = nil
|
||||||
|
if threshold.NodeId > 0 {
|
||||||
|
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(threshold.NodeId))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(nodeName) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pbNode = &pb.Node{
|
||||||
|
Id: int64(threshold.NodeId),
|
||||||
|
Name: nodeName,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pbThresholds = append(pbThresholds, &pb.NodeThreshold{
|
pbThresholds = append(pbThresholds, &pb.NodeThreshold{
|
||||||
Id: int64(threshold.Id),
|
Id: int64(threshold.Id),
|
||||||
ClusterId: int64(threshold.ClusterId),
|
ClusterId: int64(threshold.ClusterId),
|
||||||
NodeId: int64(threshold.NodeId),
|
Node: pbNode,
|
||||||
Item: threshold.Item,
|
Item: threshold.Item,
|
||||||
Param: threshold.Param,
|
Param: threshold.Param,
|
||||||
Operator: threshold.Operator,
|
Operator: threshold.Operator,
|
||||||
@@ -82,6 +99,7 @@ func (this *NodeThresholdService) FindAllEnabledNodeThresholds(ctx context.Conte
|
|||||||
Duration: types.Int32(threshold.Duration),
|
Duration: types.Int32(threshold.Duration),
|
||||||
DurationUnit: threshold.DurationUnit,
|
DurationUnit: threshold.DurationUnit,
|
||||||
SumMethod: threshold.SumMethod,
|
SumMethod: threshold.SumMethod,
|
||||||
|
NotifyDuration: int32(threshold.NotifyDuration),
|
||||||
IsOn: threshold.IsOn == 1,
|
IsOn: threshold.IsOn == 1,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -119,10 +137,26 @@ func (this *NodeThresholdService) FindEnabledNodeThreshold(ctx context.Context,
|
|||||||
return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: nil}, nil
|
return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: nil}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 节点信息
|
||||||
|
var pbNode *pb.Node = nil
|
||||||
|
if threshold.NodeId > 0 {
|
||||||
|
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(threshold.NodeId))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(nodeName) == 0 {
|
||||||
|
return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: nil}, nil
|
||||||
|
}
|
||||||
|
pbNode = &pb.Node{
|
||||||
|
Id: int64(threshold.NodeId),
|
||||||
|
Name: nodeName,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: &pb.NodeThreshold{
|
return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: &pb.NodeThreshold{
|
||||||
Id: int64(threshold.Id),
|
Id: int64(threshold.Id),
|
||||||
ClusterId: int64(threshold.ClusterId),
|
ClusterId: int64(threshold.ClusterId),
|
||||||
NodeId: int64(threshold.NodeId),
|
Node: pbNode,
|
||||||
Item: threshold.Item,
|
Item: threshold.Item,
|
||||||
Param: threshold.Param,
|
Param: threshold.Param,
|
||||||
Operator: threshold.Operator,
|
Operator: threshold.Operator,
|
||||||
@@ -131,6 +165,7 @@ func (this *NodeThresholdService) FindEnabledNodeThreshold(ctx context.Context,
|
|||||||
Duration: types.Int32(threshold.Duration),
|
Duration: types.Int32(threshold.Duration),
|
||||||
DurationUnit: threshold.DurationUnit,
|
DurationUnit: threshold.DurationUnit,
|
||||||
SumMethod: threshold.SumMethod,
|
SumMethod: threshold.SumMethod,
|
||||||
|
NotifyDuration: int32(threshold.NotifyDuration),
|
||||||
IsOn: threshold.IsOn == 1,
|
IsOn: threshold.IsOn == 1,
|
||||||
}}, nil
|
}}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,6 +26,12 @@ func (this *NodeValueService) CreateNodeValue(ctx context.Context, req *pb.Creat
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 触发阈值
|
||||||
|
err = models.SharedNodeThresholdDAO.FireNodeThreshold(tx, role, nodeId, req.Item)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return this.Success()
|
return this.Success()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user