实现基本的区域监控终端管理功能

This commit is contained in:
GoEdgeLab
2021-09-05 11:10:18 +08:00
parent bd30d4714d
commit 63bf680adb
14 changed files with 430 additions and 151 deletions

View File

@@ -8,10 +8,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
"math"
"strings"
)
const (
@@ -278,143 +275,6 @@ func (this *NodeIPAddressDAO) FindNodeAccessAndUpIPAddresses(tx *dbs.Tx, nodeId
return
}
// FireThresholds 触发阈值
func (this *NodeIPAddressDAO) FireThresholds(tx *dbs.Tx, role nodeconfigs.NodeRole, nodeId int64) error {
ones, err := this.Query(tx).
Attr("state", NodeIPAddressStateEnabled).
Attr("role", role).
Attr("nodeId", nodeId).
Attr("canAccess", true).
Attr("isOn", true).
FindAll()
if err != nil {
return err
}
for _, one := range ones {
addr := one.(*NodeIPAddress)
var thresholds = addr.DecodeThresholds()
if len(thresholds) == 0 {
continue
}
var isOk = true
var positiveSummarys = []string{}
var negativeSummarys = []string{}
for _, threshold := range thresholds {
if threshold.Value <= 0 || threshold.Duration <= 0 {
continue
}
var value = float64(0)
var summary = ""
var op = nodeconfigs.FindNodeValueOperatorName(threshold.Operator)
switch threshold.Item {
case "avgRequests":
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemRequests, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
value = math.Round(value / 60)
summary = "平均请求数:" + types.String(value) + "/s阈值" + op + " " + types.String(threshold.Value) + "/s"
case "avgTrafficOut":
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficOut, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数
summary = "平均下行流量:" + types.String(value) + "MB/s阈值" + op + " " + types.String(threshold.Value) + "MB/s"
case "avgTrafficIn":
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficIn, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数
summary = "平均上行流量:" + types.String(value) + "MB/s阈值" + op + " " + types.String(threshold.Value) + "MB/s"
default:
// TODO 支持更多
err = errors.New("threshold item '" + threshold.Item + "' not supported")
}
if err != nil {
return err
}
if !nodeconfigs.CompareNodeValue(threshold.Operator, value, float64(threshold.Value)) {
isOk = false
negativeSummarys = append(negativeSummarys, summary)
} else {
positiveSummarys = append(positiveSummarys, summary)
}
}
if isOk && addr.IsUp == 0 { // 新上线
_, err := this.Query(tx).
Pk(addr.Id).
Set("isUp", true).
Update()
if err != nil {
return err
}
// 增加日志
var description = ""
if len(negativeSummarys) > 0 {
description = "触发阈值:" + strings.Join(negativeSummarys, "")
} else {
description = "触发阈值:" + strings.Join(positiveSummarys, "")
}
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, int64(addr.Id), description)
if err != nil {
return err
}
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrUp, MessageLevelSuccess, "节点IP'"+addr.Ip+"'因为达到阈值而上线", "节点IP'"+addr.Ip+"'因为达到阈值而上线。"+description+"。", maps.Map{
"addrId": addr.Id,
}.AsJSON())
if err != nil {
return err
}
err = this.NotifyUpdate(tx, int64(addr.Id))
if err != nil {
return err
}
} else if !isOk && addr.IsUp == 1 { // 新离线
_, err := this.Query(tx).
Pk(addr.Id).
Set("isUp", false).
Update()
if err != nil {
return err
}
// 增加日志
var description = ""
if len(negativeSummarys) > 0 {
description = "触发阈值:" + strings.Join(negativeSummarys, "")
} else {
description = "触发阈值:" + strings.Join(positiveSummarys, "")
}
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, int64(addr.Id), description)
if err != nil {
return err
}
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrDown, MessageLevelWarning, "节点IP'"+addr.Ip+"'因为达到阈值而下线", "节点IP'"+addr.Ip+"'因为达到阈值而下线。"+description+"。", maps.Map{
"addrId": addr.Id,
}.AsJSON())
if err != nil {
return err
}
err = this.NotifyUpdate(tx, int64(addr.Id))
if err != nil {
return err
}
}
}
return nil
}
// CountAllEnabledIPAddresses 计算IP地址数量
// TODO 目前支持边缘节点将来支持NS节点
func (this *NodeIPAddressDAO) CountAllEnabledIPAddresses(tx *dbs.Tx, role string, nodeClusterId int64, upState configutils.BoolState, keyword string) (int64, error) {