实现基础的IP地址阈值

This commit is contained in:
刘祥超
2021-09-12 20:21:42 +08:00
parent 1ea7fe0213
commit 7fcc2b7dba
15 changed files with 616 additions and 100 deletions

View File

@@ -103,7 +103,7 @@ func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, err
}
// CreateAddress 创建IP地址
func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, thresholdsJSON []byte) (addressId int64, err error) {
func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool) (addressId int64, err error) {
if len(role) == 0 {
role = nodeconfigs.NodeRoleNode
}
@@ -115,12 +115,6 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in
op.Ip = ip
op.CanAccess = canAccess
if len(thresholdsJSON) > 0 {
op.Thresholds = thresholdsJSON
} else {
op.Thresholds = "[]"
}
op.State = NodeIPAddressStateEnabled
addressId, err = this.SaveInt64(tx, op)
if err != nil {
@@ -142,7 +136,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in
}
// UpdateAddress 修改IP地址
func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool, thresholdsJSON []byte) (err error) {
func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool) (err error) {
if addressId <= 0 {
return errors.New("invalid addressId")
}
@@ -154,12 +148,6 @@ func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId
op.CanAccess = canAccess
op.IsOn = isOn
if len(thresholdsJSON) > 0 {
op.Thresholds = thresholdsJSON
} else {
op.Thresholds = "[]"
}
op.State = NodeIPAddressStateEnabled // 恢复状态
err = this.Save(tx, op)
if err != nil {
@@ -401,6 +389,18 @@ func (this *NodeIPAddressDAO) UpdateAddressConnectivity(tx *dbs.Tx, addrId int64
UpdateQuickly()
}
// UpdateAddressIsUp 设置IP地址在线状态
func (this *NodeIPAddressDAO) UpdateAddressIsUp(tx *dbs.Tx, addressId int64, isUp bool) error {
var err = this.Query(tx).
Pk(addressId).
Set("isUp", isUp).
UpdateQuickly()
if err != nil {
return err
}
return this.NotifyUpdate(tx, addressId)
}
// NotifyUpdate 通知更新
func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error {
address, err := this.Query(tx).

View File

@@ -1,5 +1,31 @@
//go:build plus
// +build plus
package models
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/dbs"
"testing"
)
func TestNodeIPAddressDAO_FireThresholds(t *testing.T) {
dbs.NotifyReady()
var tx *dbs.Tx
var nodeId int64 = 126
node, err := SharedNodeDAO.FindEnabledNode(tx, nodeId)
if err != nil {
t.Fatal(err)
}
if node == nil {
t.Log("node not found")
return
}
err = SharedNodeIPAddressDAO.FireThresholds(tx, nodeconfigs.NodeRoleNode, nodeId)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -2,34 +2,34 @@ package models
// NodeIPAddress 节点IP地址
type NodeIPAddress struct {
Id uint32 `field:"id"` // ID
NodeId uint32 `field:"nodeId"` // 节点ID
Role string `field:"role"` // 节点角色
Name string `field:"name"` // 名称
Ip string `field:"ip"` // IP地址
Description string `field:"description"` // 描述
State uint8 `field:"state"` // 状态
Order uint32 `field:"order"` // 排序
CanAccess uint8 `field:"canAccess"` // 是否可以访问
IsOn uint8 `field:"isOn"` // 是否启用
IsUp uint8 `field:"isUp"` // 是否上线
Thresholds string `field:"thresholds"` // 上线阈值
Id uint32 `field:"id"` // ID
NodeId uint32 `field:"nodeId"` // 节点ID
Role string `field:"role"` // 节点角色
Name string `field:"name"` // 名称
Ip string `field:"ip"` // IP地址
Description string `field:"description"` // 描述
State uint8 `field:"state"` // 状态
Order uint32 `field:"order"` // 排序
CanAccess uint8 `field:"canAccess"` // 是否可以访问
IsOn uint8 `field:"isOn"` // 是否启用
IsUp uint8 `field:"isUp"` // 是否上线
//Thresholds string `field:"thresholds"` // 上线阈值
Connectivity string `field:"connectivity"` // 连通性状态
}
type NodeIPAddressOperator struct {
Id interface{} // ID
NodeId interface{} // 节点ID
Role interface{} // 节点角色
Name interface{} // 名称
Ip interface{} // IP地址
Description interface{} // 描述
State interface{} // 状态
Order interface{} // 排序
CanAccess interface{} // 是否可以访问
IsOn interface{} // 是否启用
IsUp interface{} // 是否上线
Thresholds interface{} // 上线阈值
Id interface{} // ID
NodeId interface{} // 节点ID
Role interface{} // 节点角色
Name interface{} // 名称
Ip interface{} // IP地址
Description interface{} // 描述
State interface{} // 状态
Order interface{} // 排序
CanAccess interface{} // 是否可以访问
IsOn interface{} // 是否启用
IsUp interface{} // 是否上线
//Thresholds interface{} // 上线阈值
Connectivity interface{} // 连通性状态
}

View File

@@ -4,22 +4,8 @@ import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/logs"
)
func (this *NodeIPAddress) DecodeThresholds() []*nodeconfigs.NodeValueThresholdConfig {
var result = []*nodeconfigs.NodeValueThresholdConfig{}
if len(this.Thresholds) == 0 {
return result
}
err := json.Unmarshal([]byte(this.Thresholds), &result)
if err != nil {
// 不处理错误
logs.Error(err)
}
return result
}
func (this *NodeIPAddress) DecodeConnectivity() *nodeconfigs.Connectivity {
var connectivity = &nodeconfigs.Connectivity{}
if len(this.Connectivity) > 0 {

View File

@@ -0,0 +1,236 @@
package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
)
const (
NodeIPAddressThresholdStateEnabled = 1 // 已启用
NodeIPAddressThresholdStateDisabled = 0 // 已禁用
)
type NodeIPAddressThresholdDAO dbs.DAO
func NewNodeIPAddressThresholdDAO() *NodeIPAddressThresholdDAO {
return dbs.NewDAO(&NodeIPAddressThresholdDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeNodeIPAddressThresholds",
Model: new(NodeIPAddressThreshold),
PkName: "id",
},
}).(*NodeIPAddressThresholdDAO)
}
var SharedNodeIPAddressThresholdDAO *NodeIPAddressThresholdDAO
func init() {
dbs.OnReady(func() {
SharedNodeIPAddressThresholdDAO = NewNodeIPAddressThresholdDAO()
})
}
// EnableNodeIPAddressThreshold 启用条目
func (this *NodeIPAddressThresholdDAO) EnableNodeIPAddressThreshold(tx *dbs.Tx, id uint64) error {
_, err := this.Query(tx).
Pk(id).
Set("state", NodeIPAddressThresholdStateEnabled).
Update()
return err
}
// DisableNodeIPAddressThreshold 禁用条目
func (this *NodeIPAddressThresholdDAO) DisableNodeIPAddressThreshold(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
Set("state", NodeIPAddressThresholdStateDisabled).
Update()
return err
}
// FindEnabledNodeIPAddressThreshold 查找启用中的条目
func (this *NodeIPAddressThresholdDAO) FindEnabledNodeIPAddressThreshold(tx *dbs.Tx, id uint64) (*NodeIPAddressThreshold, error) {
result, err := this.Query(tx).
Pk(id).
Attr("state", NodeIPAddressThresholdStateEnabled).
Find()
if result == nil {
return nil, err
}
return result.(*NodeIPAddressThreshold), err
}
// FindAllEnabledThresholdsWithAddrId 查找所有阈值
func (this *NodeIPAddressThresholdDAO) FindAllEnabledThresholdsWithAddrId(tx *dbs.Tx, addrId int64) (result []*NodeIPAddressThreshold, err error) {
_, err = this.Query(tx).
Attr("addressId", addrId).
State(NodeIPAddressThresholdStateEnabled).
AscPk().
Desc("order").
Slice(&result).
FindAll()
if err != nil {
return nil, err
}
// 过滤参数
for _, threshold := range result {
err := this.formatThreshold(tx, threshold)
if err != nil {
return nil, err
}
}
return
}
// CountAllEnabledThresholdsWithAddrId 计算所有阈值数量
func (this *NodeIPAddressThresholdDAO) CountAllEnabledThresholdsWithAddrId(tx *dbs.Tx, addrId int64) (int64, error) {
return this.Query(tx).
Attr("addressId", addrId).
State(NodeIPAddressThresholdStateEnabled).
Count()
}
// FindThresholdNotifiedAt 查找上次通知时间
func (this *NodeIPAddressThresholdDAO) FindThresholdNotifiedAt(tx *dbs.Tx, thresholdId int64) (int64, error) {
return this.Query(tx).
Pk(thresholdId).
Result("notifiedAt").
FindInt64Col(0)
}
// UpdateThresholdNotifiedAt 设置上次通知时间
func (this *NodeIPAddressThresholdDAO) UpdateThresholdNotifiedAt(tx *dbs.Tx, thresholdId int64, timestamp int64) error {
return this.Query(tx).
Pk(thresholdId).
Set("notifiedAt", timestamp).
UpdateQuickly()
}
// CreateThreshold 创建阈值
func (this *NodeIPAddressThresholdDAO) CreateThreshold(tx *dbs.Tx, addressId int64, items []*nodeconfigs.NodeValueThresholdItemConfig, actions []*nodeconfigs.NodeValueThresholdActionConfig, order int) (int64, error) {
if addressId <= 0 {
return 0, errors.New("invalid addressId")
}
var op = NewNodeIPAddressThresholdOperator()
op.Order = order
op.AddressId = addressId
if len(items) > 0 {
itemsJSON, err := json.Marshal(items)
if err != nil {
return 0, err
}
op.Items = itemsJSON
} else {
op.Items = "[]"
}
if len(actions) > 0 {
actionsJSON, err := json.Marshal(actions)
if err != nil {
return 0, err
}
op.Actions = actionsJSON
} else {
op.Actions = "[]"
}
op.State = NodeIPAddressThresholdStateEnabled
return this.SaveInt64(tx, op)
}
// UpdateThreshold 修改阈值
func (this *NodeIPAddressThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, items []*nodeconfigs.NodeValueThresholdItemConfig, actions []*nodeconfigs.NodeValueThresholdActionConfig, order int) error {
if thresholdId <= 0 {
return errors.New("invalid thresholdId")
}
var op = NewNodeIPAddressThresholdOperator()
op.State = NodeIPAddressThresholdStateEnabled // 恢复状态
if order >= 0 {
op.Order = order
}
op.Id = thresholdId
if len(items) > 0 {
itemsJSON, err := json.Marshal(items)
if err != nil {
return err
}
op.Items = itemsJSON
} else {
op.Items = "[]"
}
if len(actions) > 0 {
actionsJSON, err := json.Marshal(actions)
if err != nil {
return err
}
op.Actions = actionsJSON
} else {
op.Actions = "[]"
}
return this.Save(tx, op)
}
// DisableAllThresholdsWithAddrId 禁用所有阈值
func (this *NodeIPAddressThresholdDAO) DisableAllThresholdsWithAddrId(tx *dbs.Tx, addrId int64) error {
return this.Query(tx).
Attr("addressId", addrId).
Set("state", NodeIPAddressThresholdStateDisabled).
UpdateQuickly()
}
// 格式化阈值
func (this *NodeIPAddressThresholdDAO) formatThreshold(tx *dbs.Tx, threshold *NodeIPAddressThreshold) error {
if len(threshold.Items) == 0 {
return nil
}
var items = threshold.DecodeItems()
for _, item := range items {
if item.Item == nodeconfigs.IPAddressThresholdItemConnectivity {
if item.Options == nil {
continue
}
var groups = item.Options.GetSlice("groups")
if len(groups) > 0 {
var newGroups = []maps.Map{}
for _, groupOne := range groups {
var groupMap = maps.NewMap(groupOne)
var groupId = groupMap.GetInt64("id")
group, err := SharedReportNodeGroupDAO.FindEnabledReportNodeGroup(tx, groupId)
if err != nil {
return err
}
if group == nil {
continue
}
newGroups = append(newGroups, maps.Map{
"id": group.Id,
"name": group.Name,
})
}
item.Options["groups"] = newGroups
}
}
}
itemsJSON, err := json.Marshal(items)
if err != nil {
return err
}
threshold.Items = string(itemsJSON)
return nil
}

View File

@@ -0,0 +1,6 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
)

View File

@@ -0,0 +1,26 @@
package models
// NodeIPAddressThreshold IP地址阈值
type NodeIPAddressThreshold struct {
Id uint64 `field:"id"` // ID
AddressId uint64 `field:"addressId"` // IP地址ID
Items string `field:"items"` // 阈值条目
Actions string `field:"actions"` // 动作
NotifiedAt uint64 `field:"notifiedAt"` // 上次通知时间
State uint8 `field:"state"` // 状态
Order uint32 `field:"order"` // 排序
}
type NodeIPAddressThresholdOperator struct {
Id interface{} // ID
AddressId interface{} // IP地址ID
Items interface{} // 阈值条目
Actions interface{} // 动作
NotifiedAt interface{} // 上次通知时间
State interface{} // 状态
Order interface{} // 排序
}
func NewNodeIPAddressThresholdOperator() *NodeIPAddressThresholdOperator {
return &NodeIPAddressThresholdOperator{}
}

View File

@@ -0,0 +1,31 @@
package models
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
)
func (this *NodeIPAddressThreshold) DecodeItems() (result []*nodeconfigs.NodeValueThresholdItemConfig) {
if len(this.Items) == 0 {
return
}
err := json.Unmarshal([]byte(this.Items), &result)
if err != nil {
remotelogs.Error("NodeIPAddressThreshold", "decode items: "+err.Error())
}
return
}
func (this *NodeIPAddressThreshold) DecodeActions() (result []*nodeconfigs.NodeValueThresholdActionConfig) {
if len(this.Actions) == 0 {
return
}
err := json.Unmarshal([]byte(this.Actions), &result)
if err != nil {
remotelogs.Error("NodeIPAddressThreshold", "decode actions: "+err.Error())
}
return
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
"time"
)
@@ -163,26 +164,41 @@ func (this *ReportResultDAO) FindAvgLevelWithTarget(tx *dbs.Tx, taskType reporte
}
// FindConnectivityWithTarget 获取某个对象的连通率
func (this *ReportResultDAO) FindConnectivityWithTarget(tx *dbs.Tx, taskType reporterconfigs.TaskType, targetId int64) (float64, error) {
// 已汇报数据的数量
total, err := this.Query(tx).
func (this *ReportResultDAO) FindConnectivityWithTarget(tx *dbs.Tx, taskType reporterconfigs.TaskType, targetId int64, groupId int64) (float64, error) {
var query = this.Query(tx).
Attr("type", taskType).
Attr("targetId", targetId).
Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1)").
Attr("targetId", targetId)
if groupId > 0 {
query.Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1 AND JSON_CONTAINS(groupIds, :groupIdString))").
Param("groupIdString", types.String(groupId))
} else {
query.Where("reportNodeId IN (SELECT id FROM " + SharedReportNodeDAO.Table + " WHERE state=1 AND isOn=1)")
}
// 已汇报数据的数量
total, err := query.
Gt("updatedAt", time.Now().Unix()-600).
Count()
if err != nil {
return 0, err
}
if total == 0 {
return 0, nil
return 1, nil
}
// 连通的数量
countConnected, err := this.Query(tx).
var connectedQuery = this.Query(tx).
Attr("type", taskType).
Attr("targetId", targetId).
Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1)").
Attr("targetId", targetId)
if groupId > 0 {
connectedQuery.Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1 AND JSON_CONTAINS(groupIds, :groupIdString))").
Param("groupIdString", types.String(groupId))
} else {
connectedQuery.Where("reportNodeId IN (SELECT id FROM " + SharedReportNodeDAO.Table + " WHERE state=1 AND isOn=1)")
}
countConnected, err := connectedQuery.
Attr("isOk", true).
Gt("updatedAt", time.Now().Unix()-600).
Count()