IP阈值增加节点分组和集群相关统计项目

This commit is contained in:
GoEdgeLab
2021-09-14 11:36:22 +08:00
parent 187bb3bc0c
commit c2831398c8
9 changed files with 83 additions and 22 deletions

View File

@@ -118,21 +118,23 @@ func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, role string, clusterId
}
// CreateNodeMessage 创建节点消息
func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, role string, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, role string, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte, force bool) error {
// 检查N分钟内是否已经发送过
hash := this.calHash(role, clusterId, nodeId, subject, body, paramsJSON)
exists, err := this.Query(tx).
Attr("hash", hash).
Gt("createdAt", time.Now().Unix()-10*60). // 10分钟
Exist()
if err != nil {
return err
}
if exists {
return nil
if !force {
exists, err := this.Query(tx).
Attr("hash", hash).
Gt("createdAt", time.Now().Unix()-10*60). // 10分钟
Exist()
if err != nil {
return err
}
if exists {
return nil
}
}
_, err = this.createMessage(tx, role, clusterId, nodeId, messageType, level, subject, body, paramsJSON)
_, err := this.createMessage(tx, role, clusterId, nodeId, messageType, level, subject, body, paramsJSON)
if err != nil {
return err
}

View File

@@ -113,7 +113,7 @@ func (this *NodeDAO) FindEnabledBasicNode(tx *dbs.Tx, nodeId int64) (*Node, erro
one, err := this.Query(tx).
State(NodeStateEnabled).
Pk(nodeId).
Result("id", "name", "clusterId", "isOn", "isUp").
Result("id", "name", "clusterId", "groupId", "isOn", "isUp").
Find()
if one == nil {
return nil, err

View File

@@ -221,7 +221,7 @@ func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId
if len(threshold.Param) == 0 || threshold.Duration <= 0 {
continue
}
paramValue, err := SharedNodeValueDAO.SumValues(tx, role, nodeId, item, threshold.Param, threshold.SumMethod, types.Int32(threshold.Duration), threshold.DurationUnit)
paramValue, err := SharedNodeValueDAO.SumNodeValues(tx, role, nodeId, item, threshold.Param, threshold.SumMethod, types.Int32(threshold.Duration), threshold.DurationUnit)
if err != nil {
return err
}
@@ -252,7 +252,7 @@ func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId
body = strings.Replace(body, "${item.name}", itemName, -1)
body = strings.Replace(body, "${value}", fmt.Sprintf("%.2f", paramValue), -1)
}
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeThresholdSatisfied, MessageLevelWarning, subject, body, maps.Map{}.AsJSON())
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeThresholdSatisfied, MessageLevelWarning, subject, body, maps.Map{}.AsJSON(), true)
if err != nil {
return err
}

View File

@@ -173,8 +173,8 @@ func (this *NodeValueDAO) ListValuesForNSNodes(tx *dbs.Tx, item string, key stri
return
}
// SumValues 计算某项参数值
func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
// SumNodeValues 计算节点的某项参数值
func (this *NodeValueDAO) SumNodeValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
if duration <= 0 {
return 0, nil
}
@@ -202,6 +202,65 @@ func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item
return query.FindFloat64Col(0)
}
// SumNodeGroupValues 计算节点分组的某项参数值
func (this *NodeValueDAO) SumNodeGroupValues(tx *dbs.Tx, role string, groupId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
if duration <= 0 {
return 0, nil
}
query := this.Query(tx).
Attr("role", role).
Where("nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE groupId=:groupId AND state=1)").
Param("groupId", groupId).
Attr("item", item)
switch method {
case nodeconfigs.NodeValueSumMethodAvg:
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
case nodeconfigs.NodeValueSumMethodSum:
query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
default:
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
}
switch durationUnit {
case nodeconfigs.NodeValueDurationUnitMinute:
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
query.Gte("minute", fromMinute)
default:
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
query.Gte("minute", fromMinute)
}
return query.FindFloat64Col(0)
}
// SumNodeClusterValues 计算节点集群的某项参数值
func (this *NodeValueDAO) SumNodeClusterValues(tx *dbs.Tx, role string, clusterId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
if duration <= 0 {
return 0, nil
}
query := this.Query(tx).
Attr("role", role).
Attr("clusterId", clusterId).
Attr("item", item)
switch method {
case nodeconfigs.NodeValueSumMethodAvg:
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
case nodeconfigs.NodeValueSumMethodSum:
query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
default:
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
}
switch durationUnit {
case nodeconfigs.NodeValueDurationUnitMinute:
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
query.Gte("minute", fromMinute)
default:
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
query.Gte("minute", fromMinute)
}
return query.FindFloat64Col(0)
}
// FindLatestNodeValue 获取最近一条数据
func (this *NodeValueDAO) FindLatestNodeValue(tx *dbs.Tx, role string, nodeId int64, item string) (*NodeValue, error) {
one, err := this.Query(tx).