diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index 7cf0046d..d912915c 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -40,6 +40,8 @@ const ( MessageTypeServerNamesAuditingFailed MessageType = "ServerNamesAuditingFailed" // 服务域名审核失败 MessageTypeThresholdSatisfied MessageType = "ThresholdSatisfied" // 满足阈值 MessageTypeFirewallEvent MessageType = "FirewallEvent" // 防火墙事件 + MessageTypeIPAddrUp MessageType = "IPAddrUp" // IP地址上线 + MessageTypeIPAddrDown MessageType = "IPAddrDown" // IP地址下线 MessageTypeNSNodeInactive MessageType = "NSNodeInactive" // 边缘节点不活跃 MessageTypeNSNodeActive MessageType = "NSNodeActive" // 边缘节点活跃 diff --git a/internal/db/models/node_ip_address_dao.go b/internal/db/models/node_ip_address_dao.go index 5032072a..dac31657 100644 --- a/internal/db/models/node_ip_address_dao.go +++ b/internal/db/models/node_ip_address_dao.go @@ -7,7 +7,10 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/types" + "math" + "strings" ) const ( @@ -101,7 +104,7 @@ func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, err } // CreateAddress 创建IP地址 -func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool) (addressId int64, err error) { +func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, thresholdsJSON []byte) (addressId int64, err error) { if len(role) == 0 { role = nodeconfigs.NodeRoleNode } @@ -112,6 +115,13 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodec op.Name = name op.Ip = ip op.CanAccess = canAccess + + if len(thresholdsJSON) > 0 { + op.Thresholds = thresholdsJSON + } else { + op.Thresholds = "[]" + } + op.State = NodeIPAddressStateEnabled err = this.Save(tx, op) if err != nil { @@ -127,7 +137,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, nodeId int64, role nodec } // UpdateAddress 修改IP地址 -func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, addressId int64, name string, ip string, canAccess bool, isOn bool) (err error) { +func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, addressId int64, name string, ip string, canAccess bool, isOn bool, thresholdsJSON []byte) (err error) { if addressId <= 0 { return errors.New("invalid addressId") } @@ -138,6 +148,13 @@ func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, addressId int64, name st op.Ip = ip op.CanAccess = canAccess op.IsOn = isOn + + if len(thresholdsJSON) > 0 { + op.Thresholds = thresholdsJSON + } else { + op.Thresholds = "[]" + } + op.State = NodeIPAddressStateEnabled // 恢复状态 err = this.Save(tx, op) if err != nil { @@ -247,6 +264,111 @@ func (this *NodeIPAddressDAO) FindNodeAccessAndUpIPAddresses(tx *dbs.Tx, nodeId return } +// FireThresholds 触发阈值 +func (this *NodeIPAddressDAO) FireThresholds(tx *dbs.Tx, role nodeconfigs.NodeRole, nodeId int64) error { + + ones, err := this.Query(tx). + Attr("state", NodeIPAddressStateEnabled). + Attr("role", role). + Attr("nodeId", nodeId). + Attr("canAccess", true). + Attr("isOn", true). + FindAll() + if err != nil { + return err + } + for _, one := range ones { + addr := one.(*NodeIPAddress) + var thresholds = addr.DecodeThresholds() + if len(thresholds) == 0 { + continue + } + var isOk = true + var summary = []string{} + for _, threshold := range thresholds { + if threshold.Value <= 0 || threshold.Duration <= 0 { + continue + } + + var value = float64(0) + switch threshold.Item { + case "avgRequests": + value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemRequests, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit) + value = math.Round(value / 60) + summary = append(summary, "平均请求数:"+types.String(value)+"/s") + case "avgTrafficOut": + value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficOut, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit) + value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数 + summary = append(summary, "平均下行流量:"+types.String(value)+"MB/s") + case "avgTrafficIn": + value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficIn, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit) + value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数 + summary = append(summary, "平均上行流量:"+types.String(value)+"MB/s") + default: + // TODO 支持更多 + err = errors.New("threshold item '" + threshold.Item + "' not supported") + } + if err != nil { + return err + } + if !nodeconfigs.CompareNodeValue(threshold.Operator, value, float64(threshold.Value)) { + isOk = false + } + } + if isOk && addr.IsUp == 0 { // 新上线 + _, err := this.Query(tx). + Pk(addr.Id). + Set("isUp", true). + Update() + if err != nil { + return err + } + + clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId) + if err != nil { + return err + } + err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrUp, MessageLevelSuccess, "节点IP'"+addr.Ip+"'因为达到阈值而上线", "节点IP'"+addr.Ip+"'因为达到阈值而上线。"+strings.Join(summary, ",") + "。", maps.Map{ + "addrId": addr.Id, + }.AsJSON()) + if err != nil { + return err + } + + err = this.NotifyUpdate(tx, int64(addr.Id)) + if err != nil { + return err + } + } else if !isOk && addr.IsUp == 1 { // 新离线 + _, err := this.Query(tx). + Pk(addr.Id). + Set("isUp", false). + Update() + if err != nil { + return err + } + + clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId) + if err != nil { + return err + } + err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrDown, MessageLevelWarning, "节点IP'"+addr.Ip+"'因为达到阈值而下线", "节点IP'"+addr.Ip+"'因为达到阈值而下线。"+strings.Join(summary, ",") + "。", maps.Map{ + "addrId": addr.Id, + }.AsJSON()) + if err != nil { + return err + } + + err = this.NotifyUpdate(tx, int64(addr.Id)) + if err != nil { + return err + } + } + } + + return nil +} + // NotifyUpdate 通知更新 func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error { address, err := this.Query(tx). diff --git a/internal/db/models/node_ip_address_model_ext.go b/internal/db/models/node_ip_address_model_ext.go index 2640e7f9..923de037 100644 --- a/internal/db/models/node_ip_address_model_ext.go +++ b/internal/db/models/node_ip_address_model_ext.go @@ -1 +1,20 @@ package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/iwind/TeaGo/logs" +) + +func (this *NodeIPAddress) DecodeThresholds() []*nodeconfigs.NodeValueThresholdConfig { + var result = []*nodeconfigs.NodeValueThresholdConfig{} + if len(this.Thresholds) == 0 { + return result + } + err := json.Unmarshal([]byte(this.Thresholds), &result) + if err != nil { + // 不处理错误 + logs.Error(err) + } + return result +} diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 95287a50..70943075 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -1377,7 +1377,7 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN return nil, err } } else { - _, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true) + _, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, nil) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_ip_address.go b/internal/rpc/services/service_node_ip_address.go index 80701ac4..5e331267 100644 --- a/internal/rpc/services/service_node_ip_address.go +++ b/internal/rpc/services/service_node_ip_address.go @@ -20,7 +20,7 @@ func (this *NodeIPAddressService) CreateNodeIPAddress(ctx context.Context, req * tx := this.NullTx() - addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess) + addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess, req.ThresholdsJSON) if err != nil { return nil, err } @@ -38,7 +38,7 @@ func (this *NodeIPAddressService) UpdateNodeIPAddress(ctx context.Context, req * tx := this.NullTx() - err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, req.AddressId, req.Name, req.Ip, req.CanAccess, req.IsOn) + err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, req.AddressId, req.Name, req.Ip, req.CanAccess, req.IsOn, req.ThresholdsJSON) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_value.go b/internal/rpc/services/service_node_value.go index 8c5b396a..60cdef8a 100644 --- a/internal/rpc/services/service_node_value.go +++ b/internal/rpc/services/service_node_value.go @@ -4,6 +4,7 @@ package services import ( "context" + teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" @@ -39,12 +40,21 @@ func (this *NodeValueService) CreateNodeValue(ctx context.Context, req *pb.Creat return nil, err } - // 触发阈值 + // 触发节点阈值 err = models.SharedNodeThresholdDAO.FireNodeThreshold(tx, role, nodeId, req.Item) if err != nil { return nil, err } + // 触发IP阈值 + // 企业版专有 + if teaconst.IsPlus { + err = models.SharedNodeIPAddressDAO.FireThresholds(tx, role, nodeId) + if err != nil { + return nil, err + } + } + return this.Success() }