mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 16:00:24 +08:00 
			
		
		
		
	实现基础的IP地址阈值
This commit is contained in:
		@@ -103,7 +103,7 @@ func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, err
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// CreateAddress 创建IP地址
 | 
					// CreateAddress 创建IP地址
 | 
				
			||||||
func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, thresholdsJSON []byte) (addressId int64, err error) {
 | 
					func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool) (addressId int64, err error) {
 | 
				
			||||||
	if len(role) == 0 {
 | 
						if len(role) == 0 {
 | 
				
			||||||
		role = nodeconfigs.NodeRoleNode
 | 
							role = nodeconfigs.NodeRoleNode
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -115,12 +115,6 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in
 | 
				
			|||||||
	op.Ip = ip
 | 
						op.Ip = ip
 | 
				
			||||||
	op.CanAccess = canAccess
 | 
						op.CanAccess = canAccess
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(thresholdsJSON) > 0 {
 | 
					 | 
				
			||||||
		op.Thresholds = thresholdsJSON
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		op.Thresholds = "[]"
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	op.State = NodeIPAddressStateEnabled
 | 
						op.State = NodeIPAddressStateEnabled
 | 
				
			||||||
	addressId, err = this.SaveInt64(tx, op)
 | 
						addressId, err = this.SaveInt64(tx, op)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -142,7 +136,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UpdateAddress 修改IP地址
 | 
					// UpdateAddress 修改IP地址
 | 
				
			||||||
func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool, thresholdsJSON []byte) (err error) {
 | 
					func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool) (err error) {
 | 
				
			||||||
	if addressId <= 0 {
 | 
						if addressId <= 0 {
 | 
				
			||||||
		return errors.New("invalid addressId")
 | 
							return errors.New("invalid addressId")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -154,12 +148,6 @@ func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId
 | 
				
			|||||||
	op.CanAccess = canAccess
 | 
						op.CanAccess = canAccess
 | 
				
			||||||
	op.IsOn = isOn
 | 
						op.IsOn = isOn
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(thresholdsJSON) > 0 {
 | 
					 | 
				
			||||||
		op.Thresholds = thresholdsJSON
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		op.Thresholds = "[]"
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	op.State = NodeIPAddressStateEnabled // 恢复状态
 | 
						op.State = NodeIPAddressStateEnabled // 恢复状态
 | 
				
			||||||
	err = this.Save(tx, op)
 | 
						err = this.Save(tx, op)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -401,6 +389,18 @@ func (this *NodeIPAddressDAO) UpdateAddressConnectivity(tx *dbs.Tx, addrId int64
 | 
				
			|||||||
		UpdateQuickly()
 | 
							UpdateQuickly()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// UpdateAddressIsUp 设置IP地址在线状态
 | 
				
			||||||
 | 
					func (this *NodeIPAddressDAO) UpdateAddressIsUp(tx *dbs.Tx, addressId int64, isUp bool) error {
 | 
				
			||||||
 | 
						var err = this.Query(tx).
 | 
				
			||||||
 | 
							Pk(addressId).
 | 
				
			||||||
 | 
							Set("isUp", isUp).
 | 
				
			||||||
 | 
							UpdateQuickly()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return this.NotifyUpdate(tx, addressId)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NotifyUpdate 通知更新
 | 
					// NotifyUpdate 通知更新
 | 
				
			||||||
func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error {
 | 
					func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error {
 | 
				
			||||||
	address, err := this.Query(tx).
 | 
						address, err := this.Query(tx).
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,5 +1,31 @@
 | 
				
			|||||||
 | 
					//go:build plus
 | 
				
			||||||
 | 
					// +build plus
 | 
				
			||||||
 | 
					
 | 
				
			||||||
package models
 | 
					package models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
	_ "github.com/go-sql-driver/mysql"
 | 
						_ "github.com/go-sql-driver/mysql"
 | 
				
			||||||
 | 
						"github.com/iwind/TeaGo/dbs"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestNodeIPAddressDAO_FireThresholds(t *testing.T) {
 | 
				
			||||||
 | 
						dbs.NotifyReady()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var tx *dbs.Tx
 | 
				
			||||||
 | 
						var nodeId int64 = 126
 | 
				
			||||||
 | 
						node, err := SharedNodeDAO.FindEnabledNode(tx, nodeId)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if node == nil {
 | 
				
			||||||
 | 
							t.Log("node not found")
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err = SharedNodeIPAddressDAO.FireThresholds(tx, nodeconfigs.NodeRoleNode, nodeId)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Log("ok")
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,34 +2,34 @@ package models
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// NodeIPAddress 节点IP地址
 | 
					// NodeIPAddress 节点IP地址
 | 
				
			||||||
type NodeIPAddress struct {
 | 
					type NodeIPAddress struct {
 | 
				
			||||||
	Id           uint32 `field:"id"`           // ID
 | 
						Id          uint32 `field:"id"`          // ID
 | 
				
			||||||
	NodeId       uint32 `field:"nodeId"`       // 节点ID
 | 
						NodeId      uint32 `field:"nodeId"`      // 节点ID
 | 
				
			||||||
	Role         string `field:"role"`         // 节点角色
 | 
						Role        string `field:"role"`        // 节点角色
 | 
				
			||||||
	Name         string `field:"name"`         // 名称
 | 
						Name        string `field:"name"`        // 名称
 | 
				
			||||||
	Ip           string `field:"ip"`           // IP地址
 | 
						Ip          string `field:"ip"`          // IP地址
 | 
				
			||||||
	Description  string `field:"description"`  // 描述
 | 
						Description string `field:"description"` // 描述
 | 
				
			||||||
	State        uint8  `field:"state"`        // 状态
 | 
						State       uint8  `field:"state"`       // 状态
 | 
				
			||||||
	Order        uint32 `field:"order"`        // 排序
 | 
						Order       uint32 `field:"order"`       // 排序
 | 
				
			||||||
	CanAccess    uint8  `field:"canAccess"`    // 是否可以访问
 | 
						CanAccess   uint8  `field:"canAccess"`   // 是否可以访问
 | 
				
			||||||
	IsOn         uint8  `field:"isOn"`         // 是否启用
 | 
						IsOn        uint8  `field:"isOn"`        // 是否启用
 | 
				
			||||||
	IsUp         uint8  `field:"isUp"`         // 是否上线
 | 
						IsUp        uint8  `field:"isUp"`        // 是否上线
 | 
				
			||||||
	Thresholds   string `field:"thresholds"`   // 上线阈值
 | 
						//Thresholds   string `field:"thresholds"`   // 上线阈值
 | 
				
			||||||
	Connectivity string `field:"connectivity"` // 连通性状态
 | 
						Connectivity string `field:"connectivity"` // 连通性状态
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type NodeIPAddressOperator struct {
 | 
					type NodeIPAddressOperator struct {
 | 
				
			||||||
	Id           interface{} // ID
 | 
						Id          interface{} // ID
 | 
				
			||||||
	NodeId       interface{} // 节点ID
 | 
						NodeId      interface{} // 节点ID
 | 
				
			||||||
	Role         interface{} // 节点角色
 | 
						Role        interface{} // 节点角色
 | 
				
			||||||
	Name         interface{} // 名称
 | 
						Name        interface{} // 名称
 | 
				
			||||||
	Ip           interface{} // IP地址
 | 
						Ip          interface{} // IP地址
 | 
				
			||||||
	Description  interface{} // 描述
 | 
						Description interface{} // 描述
 | 
				
			||||||
	State        interface{} // 状态
 | 
						State       interface{} // 状态
 | 
				
			||||||
	Order        interface{} // 排序
 | 
						Order       interface{} // 排序
 | 
				
			||||||
	CanAccess    interface{} // 是否可以访问
 | 
						CanAccess   interface{} // 是否可以访问
 | 
				
			||||||
	IsOn         interface{} // 是否启用
 | 
						IsOn        interface{} // 是否启用
 | 
				
			||||||
	IsUp         interface{} // 是否上线
 | 
						IsUp        interface{} // 是否上线
 | 
				
			||||||
	Thresholds   interface{} // 上线阈值
 | 
						//Thresholds   interface{} // 上线阈值
 | 
				
			||||||
	Connectivity interface{} // 连通性状态
 | 
						Connectivity interface{} // 连通性状态
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -4,22 +4,8 @@ import (
 | 
				
			|||||||
	"encoding/json"
 | 
						"encoding/json"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
 | 
						"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
 | 
				
			||||||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/logs"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *NodeIPAddress) DecodeThresholds() []*nodeconfigs.NodeValueThresholdConfig {
 | 
					 | 
				
			||||||
	var result = []*nodeconfigs.NodeValueThresholdConfig{}
 | 
					 | 
				
			||||||
	if len(this.Thresholds) == 0 {
 | 
					 | 
				
			||||||
		return result
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	err := json.Unmarshal([]byte(this.Thresholds), &result)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		// 不处理错误
 | 
					 | 
				
			||||||
		logs.Error(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return result
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (this *NodeIPAddress) DecodeConnectivity() *nodeconfigs.Connectivity {
 | 
					func (this *NodeIPAddress) DecodeConnectivity() *nodeconfigs.Connectivity {
 | 
				
			||||||
	var connectivity = &nodeconfigs.Connectivity{}
 | 
						var connectivity = &nodeconfigs.Connectivity{}
 | 
				
			||||||
	if len(this.Connectivity) > 0 {
 | 
						if len(this.Connectivity) > 0 {
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										236
									
								
								internal/db/models/node_ip_address_threshold_dao.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										236
									
								
								internal/db/models/node_ip_address_threshold_dao.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,236 @@
 | 
				
			|||||||
 | 
					package models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
 | 
						_ "github.com/go-sql-driver/mysql"
 | 
				
			||||||
 | 
						"github.com/iwind/TeaGo/Tea"
 | 
				
			||||||
 | 
						"github.com/iwind/TeaGo/dbs"
 | 
				
			||||||
 | 
						"github.com/iwind/TeaGo/maps"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						NodeIPAddressThresholdStateEnabled  = 1 // 已启用
 | 
				
			||||||
 | 
						NodeIPAddressThresholdStateDisabled = 0 // 已禁用
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type NodeIPAddressThresholdDAO dbs.DAO
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewNodeIPAddressThresholdDAO() *NodeIPAddressThresholdDAO {
 | 
				
			||||||
 | 
						return dbs.NewDAO(&NodeIPAddressThresholdDAO{
 | 
				
			||||||
 | 
							DAOObject: dbs.DAOObject{
 | 
				
			||||||
 | 
								DB:     Tea.Env,
 | 
				
			||||||
 | 
								Table:  "edgeNodeIPAddressThresholds",
 | 
				
			||||||
 | 
								Model:  new(NodeIPAddressThreshold),
 | 
				
			||||||
 | 
								PkName: "id",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}).(*NodeIPAddressThresholdDAO)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var SharedNodeIPAddressThresholdDAO *NodeIPAddressThresholdDAO
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func init() {
 | 
				
			||||||
 | 
						dbs.OnReady(func() {
 | 
				
			||||||
 | 
							SharedNodeIPAddressThresholdDAO = NewNodeIPAddressThresholdDAO()
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// EnableNodeIPAddressThreshold 启用条目
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) EnableNodeIPAddressThreshold(tx *dbs.Tx, id uint64) error {
 | 
				
			||||||
 | 
						_, err := this.Query(tx).
 | 
				
			||||||
 | 
							Pk(id).
 | 
				
			||||||
 | 
							Set("state", NodeIPAddressThresholdStateEnabled).
 | 
				
			||||||
 | 
							Update()
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// DisableNodeIPAddressThreshold 禁用条目
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) DisableNodeIPAddressThreshold(tx *dbs.Tx, id int64) error {
 | 
				
			||||||
 | 
						_, err := this.Query(tx).
 | 
				
			||||||
 | 
							Pk(id).
 | 
				
			||||||
 | 
							Set("state", NodeIPAddressThresholdStateDisabled).
 | 
				
			||||||
 | 
							Update()
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// FindEnabledNodeIPAddressThreshold 查找启用中的条目
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) FindEnabledNodeIPAddressThreshold(tx *dbs.Tx, id uint64) (*NodeIPAddressThreshold, error) {
 | 
				
			||||||
 | 
						result, err := this.Query(tx).
 | 
				
			||||||
 | 
							Pk(id).
 | 
				
			||||||
 | 
							Attr("state", NodeIPAddressThresholdStateEnabled).
 | 
				
			||||||
 | 
							Find()
 | 
				
			||||||
 | 
						if result == nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return result.(*NodeIPAddressThreshold), err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// FindAllEnabledThresholdsWithAddrId 查找所有阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) FindAllEnabledThresholdsWithAddrId(tx *dbs.Tx, addrId int64) (result []*NodeIPAddressThreshold, err error) {
 | 
				
			||||||
 | 
						_, err = this.Query(tx).
 | 
				
			||||||
 | 
							Attr("addressId", addrId).
 | 
				
			||||||
 | 
							State(NodeIPAddressThresholdStateEnabled).
 | 
				
			||||||
 | 
							AscPk().
 | 
				
			||||||
 | 
							Desc("order").
 | 
				
			||||||
 | 
							Slice(&result).
 | 
				
			||||||
 | 
							FindAll()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 过滤参数
 | 
				
			||||||
 | 
						for _, threshold := range result {
 | 
				
			||||||
 | 
							err := this.formatThreshold(tx, threshold)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// CountAllEnabledThresholdsWithAddrId 计算所有阈值数量
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) CountAllEnabledThresholdsWithAddrId(tx *dbs.Tx, addrId int64) (int64, error) {
 | 
				
			||||||
 | 
						return this.Query(tx).
 | 
				
			||||||
 | 
							Attr("addressId", addrId).
 | 
				
			||||||
 | 
							State(NodeIPAddressThresholdStateEnabled).
 | 
				
			||||||
 | 
							Count()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// FindThresholdNotifiedAt 查找上次通知时间
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) FindThresholdNotifiedAt(tx *dbs.Tx, thresholdId int64) (int64, error) {
 | 
				
			||||||
 | 
						return this.Query(tx).
 | 
				
			||||||
 | 
							Pk(thresholdId).
 | 
				
			||||||
 | 
							Result("notifiedAt").
 | 
				
			||||||
 | 
							FindInt64Col(0)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// UpdateThresholdNotifiedAt 设置上次通知时间
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) UpdateThresholdNotifiedAt(tx *dbs.Tx, thresholdId int64, timestamp int64) error {
 | 
				
			||||||
 | 
						return this.Query(tx).
 | 
				
			||||||
 | 
							Pk(thresholdId).
 | 
				
			||||||
 | 
							Set("notifiedAt", timestamp).
 | 
				
			||||||
 | 
							UpdateQuickly()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// CreateThreshold 创建阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) CreateThreshold(tx *dbs.Tx, addressId int64, items []*nodeconfigs.NodeValueThresholdItemConfig, actions []*nodeconfigs.NodeValueThresholdActionConfig, order int) (int64, error) {
 | 
				
			||||||
 | 
						if addressId <= 0 {
 | 
				
			||||||
 | 
							return 0, errors.New("invalid addressId")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var op = NewNodeIPAddressThresholdOperator()
 | 
				
			||||||
 | 
						op.Order = order
 | 
				
			||||||
 | 
						op.AddressId = addressId
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(items) > 0 {
 | 
				
			||||||
 | 
							itemsJSON, err := json.Marshal(items)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return 0, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							op.Items = itemsJSON
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							op.Items = "[]"
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(actions) > 0 {
 | 
				
			||||||
 | 
							actionsJSON, err := json.Marshal(actions)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return 0, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							op.Actions = actionsJSON
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							op.Actions = "[]"
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						op.State = NodeIPAddressThresholdStateEnabled
 | 
				
			||||||
 | 
						return this.SaveInt64(tx, op)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// UpdateThreshold 修改阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, items []*nodeconfigs.NodeValueThresholdItemConfig, actions []*nodeconfigs.NodeValueThresholdActionConfig, order int) error {
 | 
				
			||||||
 | 
						if thresholdId <= 0 {
 | 
				
			||||||
 | 
							return errors.New("invalid thresholdId")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var op = NewNodeIPAddressThresholdOperator()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						op.State = NodeIPAddressThresholdStateEnabled // 恢复状态
 | 
				
			||||||
 | 
						if order >= 0 {
 | 
				
			||||||
 | 
							op.Order = order
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						op.Id = thresholdId
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(items) > 0 {
 | 
				
			||||||
 | 
							itemsJSON, err := json.Marshal(items)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							op.Items = itemsJSON
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							op.Items = "[]"
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(actions) > 0 {
 | 
				
			||||||
 | 
							actionsJSON, err := json.Marshal(actions)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							op.Actions = actionsJSON
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							op.Actions = "[]"
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return this.Save(tx, op)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// DisableAllThresholdsWithAddrId 禁用所有阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) DisableAllThresholdsWithAddrId(tx *dbs.Tx, addrId int64) error {
 | 
				
			||||||
 | 
						return this.Query(tx).
 | 
				
			||||||
 | 
							Attr("addressId", addrId).
 | 
				
			||||||
 | 
							Set("state", NodeIPAddressThresholdStateDisabled).
 | 
				
			||||||
 | 
							UpdateQuickly()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 格式化阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdDAO) formatThreshold(tx *dbs.Tx, threshold *NodeIPAddressThreshold) error {
 | 
				
			||||||
 | 
						if len(threshold.Items) == 0 {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var items = threshold.DecodeItems()
 | 
				
			||||||
 | 
						for _, item := range items {
 | 
				
			||||||
 | 
							if item.Item == nodeconfigs.IPAddressThresholdItemConnectivity {
 | 
				
			||||||
 | 
								if item.Options == nil {
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								var groups = item.Options.GetSlice("groups")
 | 
				
			||||||
 | 
								if len(groups) > 0 {
 | 
				
			||||||
 | 
									var newGroups = []maps.Map{}
 | 
				
			||||||
 | 
									for _, groupOne := range groups {
 | 
				
			||||||
 | 
										var groupMap = maps.NewMap(groupOne)
 | 
				
			||||||
 | 
										var groupId = groupMap.GetInt64("id")
 | 
				
			||||||
 | 
										group, err := SharedReportNodeGroupDAO.FindEnabledReportNodeGroup(tx, groupId)
 | 
				
			||||||
 | 
										if err != nil {
 | 
				
			||||||
 | 
											return err
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										if group == nil {
 | 
				
			||||||
 | 
											continue
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										newGroups = append(newGroups, maps.Map{
 | 
				
			||||||
 | 
											"id":   group.Id,
 | 
				
			||||||
 | 
											"name": group.Name,
 | 
				
			||||||
 | 
										})
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									item.Options["groups"] = newGroups
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						itemsJSON, err := json.Marshal(items)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						threshold.Items = string(itemsJSON)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										6
									
								
								internal/db/models/node_ip_address_threshold_dao_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								internal/db/models/node_ip_address_threshold_dao_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,6 @@
 | 
				
			|||||||
 | 
					package models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						_ "github.com/go-sql-driver/mysql"
 | 
				
			||||||
 | 
						_ "github.com/iwind/TeaGo/bootstrap"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
							
								
								
									
										26
									
								
								internal/db/models/node_ip_address_threshold_model.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										26
									
								
								internal/db/models/node_ip_address_threshold_model.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,26 @@
 | 
				
			|||||||
 | 
					package models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NodeIPAddressThreshold IP地址阈值
 | 
				
			||||||
 | 
					type NodeIPAddressThreshold struct {
 | 
				
			||||||
 | 
						Id         uint64 `field:"id"`         // ID
 | 
				
			||||||
 | 
						AddressId  uint64 `field:"addressId"`  // IP地址ID
 | 
				
			||||||
 | 
						Items      string `field:"items"`      // 阈值条目
 | 
				
			||||||
 | 
						Actions    string `field:"actions"`    // 动作
 | 
				
			||||||
 | 
						NotifiedAt uint64 `field:"notifiedAt"` // 上次通知时间
 | 
				
			||||||
 | 
						State      uint8  `field:"state"`      // 状态
 | 
				
			||||||
 | 
						Order      uint32 `field:"order"`      // 排序
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type NodeIPAddressThresholdOperator struct {
 | 
				
			||||||
 | 
						Id         interface{} // ID
 | 
				
			||||||
 | 
						AddressId  interface{} // IP地址ID
 | 
				
			||||||
 | 
						Items      interface{} // 阈值条目
 | 
				
			||||||
 | 
						Actions    interface{} // 动作
 | 
				
			||||||
 | 
						NotifiedAt interface{} // 上次通知时间
 | 
				
			||||||
 | 
						State      interface{} // 状态
 | 
				
			||||||
 | 
						Order      interface{} // 排序
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewNodeIPAddressThresholdOperator() *NodeIPAddressThresholdOperator {
 | 
				
			||||||
 | 
						return &NodeIPAddressThresholdOperator{}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										31
									
								
								internal/db/models/node_ip_address_threshold_model_ext.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								internal/db/models/node_ip_address_threshold_model_ext.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,31 @@
 | 
				
			|||||||
 | 
					package models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThreshold) DecodeItems() (result []*nodeconfigs.NodeValueThresholdItemConfig) {
 | 
				
			||||||
 | 
						if len(this.Items) == 0 {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err := json.Unmarshal([]byte(this.Items), &result)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							remotelogs.Error("NodeIPAddressThreshold", "decode items: "+err.Error())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThreshold) DecodeActions() (result []*nodeconfigs.NodeValueThresholdActionConfig) {
 | 
				
			||||||
 | 
						if len(this.Actions) == 0 {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err := json.Unmarshal([]byte(this.Actions), &result)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							remotelogs.Error("NodeIPAddressThreshold", "decode actions: "+err.Error())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -7,6 +7,7 @@ import (
 | 
				
			|||||||
	"github.com/iwind/TeaGo/Tea"
 | 
						"github.com/iwind/TeaGo/Tea"
 | 
				
			||||||
	"github.com/iwind/TeaGo/dbs"
 | 
						"github.com/iwind/TeaGo/dbs"
 | 
				
			||||||
	"github.com/iwind/TeaGo/maps"
 | 
						"github.com/iwind/TeaGo/maps"
 | 
				
			||||||
 | 
						"github.com/iwind/TeaGo/types"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -163,26 +164,41 @@ func (this *ReportResultDAO) FindAvgLevelWithTarget(tx *dbs.Tx, taskType reporte
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// FindConnectivityWithTarget 获取某个对象的连通率
 | 
					// FindConnectivityWithTarget 获取某个对象的连通率
 | 
				
			||||||
func (this *ReportResultDAO) FindConnectivityWithTarget(tx *dbs.Tx, taskType reporterconfigs.TaskType, targetId int64) (float64, error) {
 | 
					func (this *ReportResultDAO) FindConnectivityWithTarget(tx *dbs.Tx, taskType reporterconfigs.TaskType, targetId int64, groupId int64) (float64, error) {
 | 
				
			||||||
	// 已汇报数据的数量
 | 
						var query = this.Query(tx).
 | 
				
			||||||
	total, err := this.Query(tx).
 | 
					 | 
				
			||||||
		Attr("type", taskType).
 | 
							Attr("type", taskType).
 | 
				
			||||||
		Attr("targetId", targetId).
 | 
							Attr("targetId", targetId)
 | 
				
			||||||
		Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1)").
 | 
						if groupId > 0 {
 | 
				
			||||||
 | 
							query.Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1 AND JSON_CONTAINS(groupIds, :groupIdString))").
 | 
				
			||||||
 | 
								Param("groupIdString", types.String(groupId))
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							query.Where("reportNodeId IN (SELECT id FROM " + SharedReportNodeDAO.Table + " WHERE state=1 AND isOn=1)")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 已汇报数据的数量
 | 
				
			||||||
 | 
						total, err := query.
 | 
				
			||||||
		Gt("updatedAt", time.Now().Unix()-600).
 | 
							Gt("updatedAt", time.Now().Unix()-600).
 | 
				
			||||||
		Count()
 | 
							Count()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return 0, err
 | 
							return 0, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if total == 0 {
 | 
						if total == 0 {
 | 
				
			||||||
		return 0, nil
 | 
							return 1, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 连通的数量
 | 
						// 连通的数量
 | 
				
			||||||
	countConnected, err := this.Query(tx).
 | 
						var connectedQuery = this.Query(tx).
 | 
				
			||||||
		Attr("type", taskType).
 | 
							Attr("type", taskType).
 | 
				
			||||||
		Attr("targetId", targetId).
 | 
							Attr("targetId", targetId)
 | 
				
			||||||
		Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1)").
 | 
					
 | 
				
			||||||
 | 
						if groupId > 0 {
 | 
				
			||||||
 | 
							connectedQuery.Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1 AND JSON_CONTAINS(groupIds, :groupIdString))").
 | 
				
			||||||
 | 
								Param("groupIdString", types.String(groupId))
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							connectedQuery.Where("reportNodeId IN (SELECT id FROM " + SharedReportNodeDAO.Table + " WHERE state=1 AND isOn=1)")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						countConnected, err := connectedQuery.
 | 
				
			||||||
		Attr("isOk", true).
 | 
							Attr("isOk", true).
 | 
				
			||||||
		Gt("updatedAt", time.Now().Unix()-600).
 | 
							Gt("updatedAt", time.Now().Unix()-600).
 | 
				
			||||||
		Count()
 | 
							Count()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -53,6 +53,11 @@ func (this *APINode) registerServices(server *grpc.Server) {
 | 
				
			|||||||
		pb.RegisterNodeIPAddressLogServiceServer(server, instance)
 | 
							pb.RegisterNodeIPAddressLogServiceServer(server, instance)
 | 
				
			||||||
		this.rest(instance)
 | 
							this.rest(instance)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							instance := this.serviceInstance(&services.NodeIPAddressThresholdService{}).(*services.NodeIPAddressThresholdService)
 | 
				
			||||||
 | 
							pb.RegisterNodeIPAddressThresholdServiceServer(server, instance)
 | 
				
			||||||
 | 
							this.rest(instance)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		instance := this.serviceInstance(&services.APINodeService{}).(*services.APINodeService)
 | 
							instance := this.serviceInstance(&services.APINodeService{}).(*services.APINodeService)
 | 
				
			||||||
		pb.RegisterAPINodeServiceServer(server, instance)
 | 
							pb.RegisterAPINodeServiceServer(server, instance)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1412,7 +1412,7 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN
 | 
				
			|||||||
				return nil, err
 | 
									return nil, err
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			_, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, nil)
 | 
								_, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				return nil, err
 | 
									return nil, err
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,7 +21,7 @@ func (this *NodeIPAddressService) CreateNodeIPAddress(ctx context.Context, req *
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	tx := this.NullTx()
 | 
						tx := this.NullTx()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess, req.ThresholdsJSON)
 | 
						addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -39,7 +39,7 @@ func (this *NodeIPAddressService) UpdateNodeIPAddress(ctx context.Context, req *
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	tx := this.NullTx()
 | 
						tx := this.NullTx()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, adminId, req.NodeIPAddressId, req.Name, req.Ip, req.CanAccess, req.IsOn, req.ThresholdsJSON)
 | 
						err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, adminId, req.NodeIPAddressId, req.Name, req.Ip, req.CanAccess, req.IsOn)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -119,18 +119,17 @@ func (this *NodeIPAddressService) FindEnabledNodeIPAddress(ctx context.Context,
 | 
				
			|||||||
	var result *pb.NodeIPAddress = nil
 | 
						var result *pb.NodeIPAddress = nil
 | 
				
			||||||
	if address != nil {
 | 
						if address != nil {
 | 
				
			||||||
		result = &pb.NodeIPAddress{
 | 
							result = &pb.NodeIPAddress{
 | 
				
			||||||
			Id:             int64(address.Id),
 | 
								Id:          int64(address.Id),
 | 
				
			||||||
			NodeId:         int64(address.NodeId),
 | 
								NodeId:      int64(address.NodeId),
 | 
				
			||||||
			Role:           address.Role,
 | 
								Role:        address.Role,
 | 
				
			||||||
			Name:           address.Name,
 | 
								Name:        address.Name,
 | 
				
			||||||
			Ip:             address.Ip,
 | 
								Ip:          address.Ip,
 | 
				
			||||||
			Description:    address.Description,
 | 
								Description: address.Description,
 | 
				
			||||||
			State:          int64(address.State),
 | 
								State:       int64(address.State),
 | 
				
			||||||
			Order:          int64(address.Order),
 | 
								Order:       int64(address.Order),
 | 
				
			||||||
			CanAccess:      address.CanAccess == 1,
 | 
								CanAccess:   address.CanAccess == 1,
 | 
				
			||||||
			IsOn:           address.IsOn == 1,
 | 
								IsOn:        address.IsOn == 1,
 | 
				
			||||||
			IsUp:           address.IsUp == 1,
 | 
								IsUp:        address.IsUp == 1,
 | 
				
			||||||
			ThresholdsJSON: []byte(address.Thresholds),
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -155,18 +154,17 @@ func (this *NodeIPAddressService) FindAllEnabledIPAddressesWithNodeId(ctx contex
 | 
				
			|||||||
	result := []*pb.NodeIPAddress{}
 | 
						result := []*pb.NodeIPAddress{}
 | 
				
			||||||
	for _, address := range addresses {
 | 
						for _, address := range addresses {
 | 
				
			||||||
		result = append(result, &pb.NodeIPAddress{
 | 
							result = append(result, &pb.NodeIPAddress{
 | 
				
			||||||
			Id:             int64(address.Id),
 | 
								Id:          int64(address.Id),
 | 
				
			||||||
			NodeId:         int64(address.NodeId),
 | 
								NodeId:      int64(address.NodeId),
 | 
				
			||||||
			Role:           address.Role,
 | 
								Role:        address.Role,
 | 
				
			||||||
			Name:           address.Name,
 | 
								Name:        address.Name,
 | 
				
			||||||
			Ip:             address.Ip,
 | 
								Ip:          address.Ip,
 | 
				
			||||||
			Description:    address.Description,
 | 
								Description: address.Description,
 | 
				
			||||||
			State:          int64(address.State),
 | 
								State:       int64(address.State),
 | 
				
			||||||
			Order:          int64(address.Order),
 | 
								Order:       int64(address.Order),
 | 
				
			||||||
			CanAccess:      address.CanAccess == 1,
 | 
								CanAccess:   address.CanAccess == 1,
 | 
				
			||||||
			IsOn:           address.IsOn == 1,
 | 
								IsOn:        address.IsOn == 1,
 | 
				
			||||||
			IsUp:           address.IsUp == 1,
 | 
								IsUp:        address.IsUp == 1,
 | 
				
			||||||
			ThresholdsJSON: []byte(address.Thresholds),
 | 
					 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -207,16 +205,15 @@ func (this *NodeIPAddressService) ListEnabledIPAddresses(ctx context.Context, re
 | 
				
			|||||||
	var pbAddrs = []*pb.NodeIPAddress{}
 | 
						var pbAddrs = []*pb.NodeIPAddress{}
 | 
				
			||||||
	for _, addr := range addresses {
 | 
						for _, addr := range addresses {
 | 
				
			||||||
		pbAddrs = append(pbAddrs, &pb.NodeIPAddress{
 | 
							pbAddrs = append(pbAddrs, &pb.NodeIPAddress{
 | 
				
			||||||
			Id:             int64(addr.Id),
 | 
								Id:          int64(addr.Id),
 | 
				
			||||||
			NodeId:         int64(addr.NodeId),
 | 
								NodeId:      int64(addr.NodeId),
 | 
				
			||||||
			Role:           addr.Role,
 | 
								Role:        addr.Role,
 | 
				
			||||||
			Name:           addr.Name,
 | 
								Name:        addr.Name,
 | 
				
			||||||
			Ip:             addr.Ip,
 | 
								Ip:          addr.Ip,
 | 
				
			||||||
			Description:    addr.Description,
 | 
								Description: addr.Description,
 | 
				
			||||||
			CanAccess:      addr.CanAccess == 1,
 | 
								CanAccess:   addr.CanAccess == 1,
 | 
				
			||||||
			IsOn:           addr.IsOn == 1,
 | 
								IsOn:        addr.IsOn == 1,
 | 
				
			||||||
			IsUp:           addr.IsUp == 1,
 | 
								IsUp:        addr.IsUp == 1,
 | 
				
			||||||
			ThresholdsJSON: []byte(addr.Thresholds),
 | 
					 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &pb.ListEnabledIPAddressesResponse{NodeIPAddresses: pbAddrs}, nil
 | 
						return &pb.ListEnabledIPAddressesResponse{NodeIPAddresses: pbAddrs}, nil
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										171
									
								
								internal/rpc/services/service_node_ip_address_threshold.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										171
									
								
								internal/rpc/services/service_node_ip_address_threshold.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,171 @@
 | 
				
			|||||||
 | 
					// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package services
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeAPI/internal/db/models"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NodeIPAddressThresholdService IP阈值相关服务
 | 
				
			||||||
 | 
					type NodeIPAddressThresholdService struct {
 | 
				
			||||||
 | 
						BaseService
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// CreateNodeIPAddressThreshold 创建阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdService) CreateNodeIPAddressThreshold(ctx context.Context, req *pb.CreateNodeIPAddressThresholdRequest) (*pb.CreateNodeIPAddressThresholdResponse, error) {
 | 
				
			||||||
 | 
						_, err := this.ValidateAdmin(ctx, 0)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var tx = this.NullTx()
 | 
				
			||||||
 | 
						var items = []*nodeconfigs.NodeValueThresholdItemConfig{}
 | 
				
			||||||
 | 
						if len(req.ItemsJSON) > 0 {
 | 
				
			||||||
 | 
							err = json.Unmarshal(req.ItemsJSON, &items)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, errors.New("decode items failed: " + err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var actions = []*nodeconfigs.NodeValueThresholdActionConfig{}
 | 
				
			||||||
 | 
						if len(req.ActionsJSON) > 0 {
 | 
				
			||||||
 | 
							err = json.Unmarshal(req.ActionsJSON, &actions)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, errors.New("decode actions failed: " + err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						thresholdId, err := models.SharedNodeIPAddressThresholdDAO.CreateThreshold(tx, req.NodeIPAddressId, items, actions, 0)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &pb.CreateNodeIPAddressThresholdResponse{NodeIPAddressThresholdId: thresholdId}, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// UpdateNodeIPAddressThreshold 修改阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdService) UpdateNodeIPAddressThreshold(ctx context.Context, req *pb.UpdateNodeIPAddressThresholdRequest) (*pb.RPCSuccess, error) {
 | 
				
			||||||
 | 
						_, err := this.ValidateAdmin(ctx, 0)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var tx = this.NullTx()
 | 
				
			||||||
 | 
						var items = []*nodeconfigs.NodeValueThresholdItemConfig{}
 | 
				
			||||||
 | 
						if len(req.ItemsJSON) > 0 {
 | 
				
			||||||
 | 
							err = json.Unmarshal(req.ItemsJSON, &items)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, errors.New("decode items failed: " + err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var actions = []*nodeconfigs.NodeValueThresholdActionConfig{}
 | 
				
			||||||
 | 
						if len(req.ActionsJSON) > 0 {
 | 
				
			||||||
 | 
							err = json.Unmarshal(req.ActionsJSON, &actions)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, errors.New("decode actions failed: " + err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err = models.SharedNodeIPAddressThresholdDAO.UpdateThreshold(tx, req.NodeIPAddressThresholdId, items, actions, -1)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return this.Success()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// DeleteNodeIPAddressThreshold 删除阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdService) DeleteNodeIPAddressThreshold(ctx context.Context, req *pb.DeleteNodeIPAddressThresholdRequest) (*pb.RPCSuccess, error) {
 | 
				
			||||||
 | 
						_, err := this.ValidateAdmin(ctx, 0)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var tx = this.NullTx()
 | 
				
			||||||
 | 
						err = models.SharedNodeIPAddressThresholdDAO.DisableNodeIPAddressThreshold(tx, req.NodeIPAddressThresholdId)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return this.Success()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// FindAllEnabledNodeIPAddressThresholds 查找IP的所有阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdService) FindAllEnabledNodeIPAddressThresholds(ctx context.Context, req *pb.FindAllEnabledNodeIPAddressThresholdsRequest) (*pb.FindAllEnabledNodeIPAddressThresholdsResponse, error) {
 | 
				
			||||||
 | 
						_, err := this.ValidateAdmin(ctx, 0)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var tx = this.NullTx()
 | 
				
			||||||
 | 
						thresholds, err := models.SharedNodeIPAddressThresholdDAO.FindAllEnabledThresholdsWithAddrId(tx, req.NodeIPAddressId)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var pbThresholds = []*pb.NodeIPAddressThreshold{}
 | 
				
			||||||
 | 
						for _, threshold := range thresholds {
 | 
				
			||||||
 | 
							pbThresholds = append(pbThresholds, &pb.NodeIPAddressThreshold{
 | 
				
			||||||
 | 
								Id:          int64(threshold.Id),
 | 
				
			||||||
 | 
								ItemsJSON:   []byte(threshold.Items),
 | 
				
			||||||
 | 
								ActionsJSON: []byte(threshold.Actions),
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &pb.FindAllEnabledNodeIPAddressThresholdsResponse{NodeIPAddressThresholds: pbThresholds}, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// CountAllEnabledNodeIPAddressThresholds 计算IP阈值的数量
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdService) CountAllEnabledNodeIPAddressThresholds(ctx context.Context, req *pb.CountAllEnabledNodeIPAddressThresholdsRequest) (*pb.RPCCountResponse, error) {
 | 
				
			||||||
 | 
						_, err := this.ValidateAdmin(ctx, 0)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var tx = this.NullTx()
 | 
				
			||||||
 | 
						count, err := models.SharedNodeIPAddressThresholdDAO.CountAllEnabledThresholdsWithAddrId(tx, req.NodeIPAddressId)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return this.SuccessCount(count)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// UpdateAllNodeIPAddressThresholds 批量更新阈值
 | 
				
			||||||
 | 
					func (this *NodeIPAddressThresholdService) UpdateAllNodeIPAddressThresholds(ctx context.Context, req *pb.UpdateAllNodeIPAddressThresholdsRequest) (*pb.RPCSuccess, error) {
 | 
				
			||||||
 | 
						_, err := this.ValidateAdmin(ctx, 0)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var tx = this.NullTx()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var thresholds = []*nodeconfigs.NodeValueThresholdConfig{}
 | 
				
			||||||
 | 
						err = json.Unmarshal(req.NodeIPAddressThresholdsJSON, &thresholds)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, errors.New("decode thresholds failed: " + err.Error())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err = models.SharedNodeIPAddressThresholdDAO.DisableAllThresholdsWithAddrId(tx, req.NodeIPAddressId)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(thresholds) == 0 {
 | 
				
			||||||
 | 
							return this.Success()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var count = len(thresholds)
 | 
				
			||||||
 | 
						for index, threshold := range thresholds {
 | 
				
			||||||
 | 
							var order = count - index
 | 
				
			||||||
 | 
							if threshold.Id > 0 {
 | 
				
			||||||
 | 
								err = models.SharedNodeIPAddressThresholdDAO.UpdateThreshold(tx, threshold.Id, threshold.Items, threshold.Actions, order)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								_, err = models.SharedNodeIPAddressThresholdDAO.CreateThreshold(tx, req.NodeIPAddressId, threshold.Items, threshold.Actions, order)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return this.Success()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -367,8 +367,7 @@ func upgradeV0_3_0(db *dbs.DB) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// v0.3.1
 | 
					// v0.3.1
 | 
				
			||||||
func upgradeV0_3_1(db *dbs.DB) error {
 | 
					func upgradeV0_3_1(db *dbs.DB) error {
 | 
				
			||||||
	// 忽略错误
 | 
						// 清空域名统计,已使用分表代替
 | 
				
			||||||
	_, _ = db.Exec("TRUNCATE table edgeServerDomainHourlyStats")
 | 
						_, _ = db.Exec("TRUNCATE table edgeServerDomainHourlyStats")
 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,3 +20,20 @@ func TestUpgradeSQLData(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	t.Log("ok")
 | 
						t.Log("ok")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestUpgradeSQLData_v1_3_1(t *testing.T) {
 | 
				
			||||||
 | 
						db, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{
 | 
				
			||||||
 | 
							Driver: "mysql",
 | 
				
			||||||
 | 
							Dsn:    "root:123456@tcp(127.0.0.1:3306)/db_edge_new?charset=utf8mb4&timeout=30s",
 | 
				
			||||||
 | 
							Prefix: "edge",
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err = upgradeV0_3_1(db)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Log("ok")
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user