mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-03 23:20:26 +08:00
节点IP地址可以设置阈值
This commit is contained in:
@@ -40,6 +40,8 @@ const (
|
||||
MessageTypeServerNamesAuditingFailed MessageType = "ServerNamesAuditingFailed" // 服务域名审核失败
|
||||
MessageTypeThresholdSatisfied MessageType = "ThresholdSatisfied" // 满足阈值
|
||||
MessageTypeFirewallEvent MessageType = "FirewallEvent" // 防火墙事件
|
||||
MessageTypeIPAddrUp MessageType = "IPAddrUp" // IP地址上线
|
||||
MessageTypeIPAddrDown MessageType = "IPAddrDown" // IP地址下线
|
||||
|
||||
MessageTypeNSNodeInactive MessageType = "NSNodeInactive" // 边缘节点不活跃
|
||||
MessageTypeNSNodeActive MessageType = "NSNodeActive" // 边缘节点活跃
|
||||
|
||||
@@ -7,7 +7,10 @@ import (
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"math"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -101,7 +104,7 @@ func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, err
|
||||
}
|
||||
|
||||
// CreateAddress 创建IP地址
|
||||
func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool) (addressId int64, err error) {
|
||||
func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, thresholdsJSON []byte) (addressId int64, err error) {
|
||||
if len(role) == 0 {
|
||||
role = nodeconfigs.NodeRoleNode
|
||||
}
|
||||
@@ -112,6 +115,13 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodec
|
||||
op.Name = name
|
||||
op.Ip = ip
|
||||
op.CanAccess = canAccess
|
||||
|
||||
if len(thresholdsJSON) > 0 {
|
||||
op.Thresholds = thresholdsJSON
|
||||
} else {
|
||||
op.Thresholds = "[]"
|
||||
}
|
||||
|
||||
op.State = NodeIPAddressStateEnabled
|
||||
err = this.Save(tx, op)
|
||||
if err != nil {
|
||||
@@ -127,7 +137,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodec
|
||||
}
|
||||
|
||||
// UpdateAddress 修改IP地址
|
||||
func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, addressId int64, name string, ip string, canAccess bool, isOn bool) (err error) {
|
||||
func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, addressId int64, name string, ip string, canAccess bool, isOn bool, thresholdsJSON []byte) (err error) {
|
||||
if addressId <= 0 {
|
||||
return errors.New("invalid addressId")
|
||||
}
|
||||
@@ -138,6 +148,13 @@ func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, addressId int64, name st
|
||||
op.Ip = ip
|
||||
op.CanAccess = canAccess
|
||||
op.IsOn = isOn
|
||||
|
||||
if len(thresholdsJSON) > 0 {
|
||||
op.Thresholds = thresholdsJSON
|
||||
} else {
|
||||
op.Thresholds = "[]"
|
||||
}
|
||||
|
||||
op.State = NodeIPAddressStateEnabled // 恢复状态
|
||||
err = this.Save(tx, op)
|
||||
if err != nil {
|
||||
@@ -247,6 +264,111 @@ func (this *NodeIPAddressDAO) FindNodeAccessAndUpIPAddresses(tx *dbs.Tx, nodeId
|
||||
return
|
||||
}
|
||||
|
||||
// FireThresholds 触发阈值
|
||||
func (this *NodeIPAddressDAO) FireThresholds(tx *dbs.Tx, role nodeconfigs.NodeRole, nodeId int64) error {
|
||||
|
||||
ones, err := this.Query(tx).
|
||||
Attr("state", NodeIPAddressStateEnabled).
|
||||
Attr("role", role).
|
||||
Attr("nodeId", nodeId).
|
||||
Attr("canAccess", true).
|
||||
Attr("isOn", true).
|
||||
FindAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, one := range ones {
|
||||
addr := one.(*NodeIPAddress)
|
||||
var thresholds = addr.DecodeThresholds()
|
||||
if len(thresholds) == 0 {
|
||||
continue
|
||||
}
|
||||
var isOk = true
|
||||
var summary = []string{}
|
||||
for _, threshold := range thresholds {
|
||||
if threshold.Value <= 0 || threshold.Duration <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var value = float64(0)
|
||||
switch threshold.Item {
|
||||
case "avgRequests":
|
||||
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemRequests, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
|
||||
value = math.Round(value / 60)
|
||||
summary = append(summary, "平均请求数:"+types.String(value)+"/s")
|
||||
case "avgTrafficOut":
|
||||
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficOut, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
|
||||
value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数
|
||||
summary = append(summary, "平均下行流量:"+types.String(value)+"MB/s")
|
||||
case "avgTrafficIn":
|
||||
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficIn, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
|
||||
value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数
|
||||
summary = append(summary, "平均上行流量:"+types.String(value)+"MB/s")
|
||||
default:
|
||||
// TODO 支持更多
|
||||
err = errors.New("threshold item '" + threshold.Item + "' not supported")
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !nodeconfigs.CompareNodeValue(threshold.Operator, value, float64(threshold.Value)) {
|
||||
isOk = false
|
||||
}
|
||||
}
|
||||
if isOk && addr.IsUp == 0 { // 新上线
|
||||
_, err := this.Query(tx).
|
||||
Pk(addr.Id).
|
||||
Set("isUp", true).
|
||||
Update()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrUp, MessageLevelSuccess, "节点IP'"+addr.Ip+"'因为达到阈值而上线", "节点IP'"+addr.Ip+"'因为达到阈值而上线。"+strings.Join(summary, ",") + "。", maps.Map{
|
||||
"addrId": addr.Id,
|
||||
}.AsJSON())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = this.NotifyUpdate(tx, int64(addr.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !isOk && addr.IsUp == 1 { // 新离线
|
||||
_, err := this.Query(tx).
|
||||
Pk(addr.Id).
|
||||
Set("isUp", false).
|
||||
Update()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrDown, MessageLevelWarning, "节点IP'"+addr.Ip+"'因为达到阈值而下线", "节点IP'"+addr.Ip+"'因为达到阈值而下线。"+strings.Join(summary, ",") + "。", maps.Map{
|
||||
"addrId": addr.Id,
|
||||
}.AsJSON())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = this.NotifyUpdate(tx, int64(addr.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NotifyUpdate 通知更新
|
||||
func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error {
|
||||
address, err := this.Query(tx).
|
||||
|
||||
@@ -1 +1,20 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
)
|
||||
|
||||
func (this *NodeIPAddress) DecodeThresholds() []*nodeconfigs.NodeValueThresholdConfig {
|
||||
var result = []*nodeconfigs.NodeValueThresholdConfig{}
|
||||
if len(this.Thresholds) == 0 {
|
||||
return result
|
||||
}
|
||||
err := json.Unmarshal([]byte(this.Thresholds), &result)
|
||||
if err != nil {
|
||||
// 不处理错误
|
||||
logs.Error(err)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -1377,7 +1377,7 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
_, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true)
|
||||
_, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ func (this *NodeIPAddressService) CreateNodeIPAddress(ctx context.Context, req *
|
||||
|
||||
tx := this.NullTx()
|
||||
|
||||
addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess)
|
||||
addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess, req.ThresholdsJSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -38,7 +38,7 @@ func (this *NodeIPAddressService) UpdateNodeIPAddress(ctx context.Context, req *
|
||||
|
||||
tx := this.NullTx()
|
||||
|
||||
err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, req.AddressId, req.Name, req.Ip, req.CanAccess, req.IsOn)
|
||||
err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, req.AddressId, req.Name, req.Ip, req.CanAccess, req.IsOn, req.ThresholdsJSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
@@ -39,12 +40,21 @@ func (this *NodeValueService) CreateNodeValue(ctx context.Context, req *pb.Creat
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 触发阈值
|
||||
// 触发节点阈值
|
||||
err = models.SharedNodeThresholdDAO.FireNodeThreshold(tx, role, nodeId, req.Item)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 触发IP阈值
|
||||
// 企业版专有
|
||||
if teaconst.IsPlus {
|
||||
err = models.SharedNodeIPAddressDAO.FireThresholds(tx, role, nodeId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user