mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 07:50:25 +08:00 
			
		
		
		
	优化节点阈值设置
This commit is contained in:
		@@ -1,17 +1,11 @@
 | 
				
			|||||||
package models
 | 
					package models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
						"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
	_ "github.com/go-sql-driver/mysql"
 | 
						_ "github.com/go-sql-driver/mysql"
 | 
				
			||||||
	"github.com/iwind/TeaGo/Tea"
 | 
						"github.com/iwind/TeaGo/Tea"
 | 
				
			||||||
	"github.com/iwind/TeaGo/dbs"
 | 
						"github.com/iwind/TeaGo/dbs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/maps"
 | 
					 | 
				
			||||||
	"github.com/iwind/TeaGo/types"
 | 
					 | 
				
			||||||
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
					 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
@@ -153,12 +147,13 @@ func (this *NodeThresholdDAO) FindAllEnabledAndOnClusterThresholds(tx *dbs.Tx, r
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// FindAllEnabledAndOnNodeThresholds 查询节点专属的阈值设置
 | 
					// FindAllEnabledAndOnNodeThresholds 查询节点专属的阈值设置
 | 
				
			||||||
func (this *NodeThresholdDAO) FindAllEnabledAndOnNodeThresholds(tx *dbs.Tx, role string, nodeId int64, item string) (result []*NodeThreshold, err error) {
 | 
					func (this *NodeThresholdDAO) FindAllEnabledAndOnNodeThresholds(tx *dbs.Tx, role string, clusterId int64, nodeId int64, item string) (result []*NodeThreshold, err error) {
 | 
				
			||||||
	if nodeId <= 0 {
 | 
						if clusterId <= 0 || nodeId <= 0 {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	_, err = this.Query(tx).
 | 
						_, err = this.Query(tx).
 | 
				
			||||||
		Attr("role", role).
 | 
							Attr("role", role).
 | 
				
			||||||
 | 
							Attr("clusterId", clusterId).
 | 
				
			||||||
		Attr("nodeId", nodeId).
 | 
							Attr("nodeId", nodeId).
 | 
				
			||||||
		Attr("item", item).
 | 
							Attr("item", item).
 | 
				
			||||||
		Attr("isOn", true).
 | 
							Attr("isOn", true).
 | 
				
			||||||
@@ -186,87 +181,3 @@ func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, role string,
 | 
				
			|||||||
	query.State(NodeThresholdStateEnabled)
 | 
						query.State(NodeThresholdStateEnabled)
 | 
				
			||||||
	return query.Count()
 | 
						return query.Count()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// FireNodeThreshold 触发相关阈值设置
 | 
					 | 
				
			||||||
func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId int64, item string) error {
 | 
					 | 
				
			||||||
	clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if clusterId == 0 {
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 集群相关阈值
 | 
					 | 
				
			||||||
	var thresholds []*NodeThreshold
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		clusterThresholds, err := this.FindAllEnabledAndOnClusterThresholds(tx, role, clusterId, item)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		thresholds = append(thresholds, clusterThresholds...)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 节点相关阈值
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		nodeThresholds, err := this.FindAllEnabledAndOnNodeThresholds(tx, role, nodeId, item)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		thresholds = append(thresholds, nodeThresholds...)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if len(thresholds) > 0 {
 | 
					 | 
				
			||||||
		for _, threshold := range thresholds {
 | 
					 | 
				
			||||||
			if len(threshold.Param) == 0 || threshold.Duration <= 0 {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			paramValue, err := SharedNodeValueDAO.SumNodeValues(tx, role, nodeId, item, threshold.Param, threshold.SumMethod, types.Int32(threshold.Duration), threshold.DurationUnit)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				return err
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			originValue := nodeconfigs.UnmarshalNodeValue(threshold.Value)
 | 
					 | 
				
			||||||
			thresholdValue := types.Float64(originValue)
 | 
					 | 
				
			||||||
			isMatched := nodeconfigs.CompareNodeValue(threshold.Operator, paramValue, thresholdValue)
 | 
					 | 
				
			||||||
			if isMatched {
 | 
					 | 
				
			||||||
				// TODO 执行其他动作
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				// 是否已经通知过
 | 
					 | 
				
			||||||
				if threshold.NotifyDuration > 0 && threshold.NotifiedAt > 0 && time.Now().Unix()-int64(threshold.NotifiedAt) < int64(threshold.NotifyDuration*60) {
 | 
					 | 
				
			||||||
					continue
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				// 创建消息
 | 
					 | 
				
			||||||
				nodeName, err := SharedNodeDAO.FindNodeName(tx, nodeId)
 | 
					 | 
				
			||||||
				if err != nil {
 | 
					 | 
				
			||||||
					return err
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				itemName := nodeconfigs.FindNodeValueItemName(threshold.Item)
 | 
					 | 
				
			||||||
				paramName := nodeconfigs.FindNodeValueItemParamName(threshold.Item, threshold.Param)
 | 
					 | 
				
			||||||
				operatorName := nodeconfigs.FindNodeValueOperatorName(threshold.Operator)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				subject := "节点 \"" + nodeName + "\" " + itemName + " 达到阈值"
 | 
					 | 
				
			||||||
				body := "节点 \"" + nodeName + "\" " + itemName + " 达到阈值\n阈值设置:" + paramName + " " + operatorName + " " + originValue + "\n当前值:" + fmt.Sprintf("%.2f", paramValue) + "\n触发时间:" + timeutil.Format("Y-m-d H:i:s")
 | 
					 | 
				
			||||||
				if len(threshold.Message) > 0 {
 | 
					 | 
				
			||||||
					body = threshold.Message
 | 
					 | 
				
			||||||
					body = strings.Replace(body, "${item.name}", itemName, -1)
 | 
					 | 
				
			||||||
					body = strings.Replace(body, "${value}", fmt.Sprintf("%.2f", paramValue), -1)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeThresholdSatisfied, MessageLevelWarning, subject, body, maps.Map{}.AsJSON(), true)
 | 
					 | 
				
			||||||
				if err != nil {
 | 
					 | 
				
			||||||
					return err
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				// 设置通知时间
 | 
					 | 
				
			||||||
				_, err = this.Query(tx).
 | 
					 | 
				
			||||||
					Pk(threshold.Id).
 | 
					 | 
				
			||||||
					Set("notifiedAt", time.Now().Unix()).
 | 
					 | 
				
			||||||
					Update()
 | 
					 | 
				
			||||||
				if err != nil {
 | 
					 | 
				
			||||||
					return err
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										12
									
								
								internal/db/models/node_threshold_dao_ext.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								internal/db/models/node_threshold_dao_ext.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,12 @@
 | 
				
			|||||||
 | 
					// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
				
			||||||
 | 
					//go:build !plus
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "github.com/iwind/TeaGo/dbs"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// FireNodeThreshold 触发相关阈值设置
 | 
				
			||||||
 | 
					func (this *NodeThresholdDAO) FireNodeThreshold(tx *dbs.Tx, role string, nodeId int64, item string) error {
 | 
				
			||||||
 | 
						// stub
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -2066,7 +2066,7 @@ func (this *NodeService) FindEnabledNodeConfigInfo(ctx context.Context, req *pb.
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// thresholds
 | 
						// thresholds
 | 
				
			||||||
	countThresholds, err := models.SharedNodeThresholdDAO.CountAllEnabledThresholds(tx, nodeconfigs.NodeRoleNode, 0, req.NodeId)
 | 
						countThresholds, err := models.SharedNodeThresholdDAO.CountAllEnabledThresholds(tx, nodeconfigs.NodeRoleNode, int64(node.ClusterId), req.NodeId)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user