Files
EdgeAPI/internal/db/models/node_ip_address_dao.go

515 lines
14 KiB
Go
Raw Normal View History

2020-08-30 16:12:00 +08:00
package models
import (
"errors"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
2021-08-31 17:24:52 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-08-30 16:12:00 +08:00
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2021-08-18 16:19:16 +08:00
"github.com/iwind/TeaGo/maps"
2020-08-30 16:12:00 +08:00
"github.com/iwind/TeaGo/types"
2021-08-18 16:19:16 +08:00
"math"
"strings"
2020-08-30 16:12:00 +08:00
)
const (
NodeIPAddressStateEnabled = 1 // 已启用
NodeIPAddressStateDisabled = 0 // 已禁用
)
type NodeIPAddressDAO dbs.DAO
func NewNodeIPAddressDAO() *NodeIPAddressDAO {
return dbs.NewDAO(&NodeIPAddressDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeNodeIPAddresses",
Model: new(NodeIPAddress),
PkName: "id",
},
}).(*NodeIPAddressDAO)
}
2020-10-13 20:05:13 +08:00
var SharedNodeIPAddressDAO *NodeIPAddressDAO
func init() {
dbs.OnReady(func() {
SharedNodeIPAddressDAO = NewNodeIPAddressDAO()
})
}
2020-08-30 16:12:00 +08:00
// EnableAddress 启用条目
func (this *NodeIPAddressDAO) EnableAddress(tx *dbs.Tx, addressId int64) (err error) {
_, err = this.Query(tx).
Pk(addressId).
2020-08-30 16:12:00 +08:00
Set("state", NodeIPAddressStateEnabled).
Update()
if err != nil {
return err
}
return this.NotifyUpdate(tx, addressId)
2020-08-30 16:12:00 +08:00
}
// DisableAddress 禁用IP地址
func (this *NodeIPAddressDAO) DisableAddress(tx *dbs.Tx, addressId int64) (err error) {
_, err = this.Query(tx).
Pk(addressId).
2020-08-30 16:12:00 +08:00
Set("state", NodeIPAddressStateDisabled).
Update()
if err != nil {
return err
}
return this.NotifyUpdate(tx, addressId)
2020-08-30 16:12:00 +08:00
}
// DisableAllAddressesWithNodeId 禁用节点的所有的IP地址
func (this *NodeIPAddressDAO) DisableAllAddressesWithNodeId(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) error {
2020-08-30 16:12:00 +08:00
if nodeId <= 0 {
return errors.New("invalid nodeId")
}
if len(role) == 0 {
role = nodeconfigs.NodeRoleNode
}
_, err := this.Query(tx).
2020-08-30 16:12:00 +08:00
Attr("nodeId", nodeId).
Attr("role", role).
2020-08-30 16:12:00 +08:00
Set("state", NodeIPAddressStateDisabled).
Update()
if err != nil {
return err
}
return SharedNodeDAO.NotifyDNSUpdate(tx, nodeId)
2020-08-30 16:12:00 +08:00
}
// FindEnabledAddress 查找启用中的IP地址
func (this *NodeIPAddressDAO) FindEnabledAddress(tx *dbs.Tx, id int64) (*NodeIPAddress, error) {
result, err := this.Query(tx).
2020-08-30 16:12:00 +08:00
Pk(id).
Attr("state", NodeIPAddressStateEnabled).
Find()
if result == nil {
return nil, err
}
return result.(*NodeIPAddress), err
}
// FindAddressName 根据主键查找名称
func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, error) {
return this.Query(tx).
2020-08-30 16:12:00 +08:00
Pk(id).
Result("name").
FindStringCol("")
}
// CreateAddress 创建IP地址
2021-08-31 17:24:52 +08:00
func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, thresholdsJSON []byte) (addressId int64, err error) {
if len(role) == 0 {
role = nodeconfigs.NodeRoleNode
}
2020-08-30 16:12:00 +08:00
op := NewNodeIPAddressOperator()
op.NodeId = nodeId
op.Role = role
2020-08-30 16:12:00 +08:00
op.Name = name
op.Ip = ip
op.CanAccess = canAccess
2021-08-18 16:19:16 +08:00
if len(thresholdsJSON) > 0 {
op.Thresholds = thresholdsJSON
} else {
op.Thresholds = "[]"
}
2020-08-30 16:12:00 +08:00
op.State = NodeIPAddressStateEnabled
2021-08-31 17:24:52 +08:00
addressId, err = this.SaveInt64(tx, op)
2020-08-30 16:12:00 +08:00
if err != nil {
return 0, err
}
2021-01-27 23:00:02 +08:00
err = SharedNodeDAO.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return 0, err
}
2021-08-31 17:24:52 +08:00
// 创建日志
err = SharedNodeIPAddressLogDAO.CreateLog(tx, adminId, addressId, "创建IP")
if err != nil {
return 0, err
}
return addressId, nil
2020-08-30 16:12:00 +08:00
}
// UpdateAddress 修改IP地址
2021-08-31 17:24:52 +08:00
func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool, thresholdsJSON []byte) (err error) {
2020-08-30 16:12:00 +08:00
if addressId <= 0 {
return errors.New("invalid addressId")
}
op := NewNodeIPAddressOperator()
op.Id = addressId
op.Name = name
op.Ip = ip
op.CanAccess = canAccess
op.IsOn = isOn
2021-08-18 16:19:16 +08:00
if len(thresholdsJSON) > 0 {
op.Thresholds = thresholdsJSON
} else {
op.Thresholds = "[]"
}
op.State = NodeIPAddressStateEnabled // 恢复状态
err = this.Save(tx, op)
if err != nil {
return err
}
2021-08-31 17:24:52 +08:00
// 创建日志
err = SharedNodeIPAddressLogDAO.CreateLog(tx, adminId, addressId, "修改IP")
if err != nil {
return err
}
return this.NotifyUpdate(tx, addressId)
2020-08-30 16:12:00 +08:00
}
// UpdateAddressIP 修改IP地址中的IP
func (this *NodeIPAddressDAO) UpdateAddressIP(tx *dbs.Tx, addressId int64, ip string) error {
2020-11-14 09:41:58 +08:00
if addressId <= 0 {
return errors.New("invalid addressId")
}
op := NewNodeIPAddressOperator()
op.Id = addressId
op.Ip = ip
err := this.Save(tx, op)
if err != nil {
return err
}
return this.NotifyUpdate(tx, addressId)
2020-11-14 09:41:58 +08:00
}
// UpdateAddressNodeId 修改IP地址所属节点
func (this *NodeIPAddressDAO) UpdateAddressNodeId(tx *dbs.Tx, addressId int64, nodeId int64) error {
_, err := this.Query(tx).
2020-08-30 16:12:00 +08:00
Pk(addressId).
Set("nodeId", nodeId).
Set("state", NodeIPAddressStateEnabled). // 恢复状态
Update()
2021-01-27 23:00:02 +08:00
if err != nil {
return err
}
err = SharedNodeDAO.NotifyDNSUpdate(tx, nodeId)
if err != nil {
return err
}
return nil
2020-08-30 16:12:00 +08:00
}
// FindAllEnabledAddressesWithNode 查找节点的所有的IP地址
func (this *NodeIPAddressDAO) FindAllEnabledAddressesWithNode(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (result []*NodeIPAddress, err error) {
if len(role) == 0 {
role = nodeconfigs.NodeRoleNode
}
_, err = this.Query(tx).
2020-08-30 16:12:00 +08:00
Attr("nodeId", nodeId).
Attr("role", role).
2020-08-30 16:12:00 +08:00
State(NodeIPAddressStateEnabled).
Desc("order").
AscPk().
Slice(&result).
FindAll()
return
}
// FindFirstNodeAccessIPAddress 查找节点的第一个可访问的IP地址
func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (string, error) {
if len(role) == 0 {
role = nodeconfigs.NodeRoleNode
}
return this.Query(tx).
Attr("nodeId", nodeId).
Attr("role", role).
State(NodeIPAddressStateEnabled).
Attr("canAccess", true).
Desc("order").
AscPk().
Result("ip").
FindStringCol("")
}
2020-11-14 09:41:58 +08:00
// FindFirstNodeAccessIPAddressId 查找节点的第一个可访问的IP地址ID
func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddressId(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (int64, error) {
if len(role) == 0 {
role = nodeconfigs.NodeRoleNode
}
return this.Query(tx).
2020-11-14 09:41:58 +08:00
Attr("nodeId", nodeId).
Attr("role", role).
2020-11-14 09:41:58 +08:00
State(NodeIPAddressStateEnabled).
Attr("canAccess", true).
Desc("order").
AscPk().
Result("id").
FindInt64Col(0)
}
2021-01-27 23:00:02 +08:00
// FindNodeAccessAndUpIPAddresses 查找节点所有的可访问的IP地址
func (this *NodeIPAddressDAO) FindNodeAccessAndUpIPAddresses(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (result []*NodeIPAddress, err error) {
if len(role) == 0 {
role = nodeconfigs.NodeRoleNode
}
2021-01-27 23:00:02 +08:00
_, err = this.Query(tx).
2021-06-27 21:59:37 +08:00
Attr("role", role).
2021-01-27 23:00:02 +08:00
Attr("nodeId", nodeId).
State(NodeIPAddressStateEnabled).
Attr("canAccess", true).
Attr("isOn", true).
Attr("isUp", true).
2021-01-27 23:00:02 +08:00
Desc("order").
AscPk().
Slice(&result).
FindAll()
return
}
2021-08-18 16:19:16 +08:00
// FireThresholds 触发阈值
func (this *NodeIPAddressDAO) FireThresholds(tx *dbs.Tx, role nodeconfigs.NodeRole, nodeId int64) error {
ones, err := this.Query(tx).
Attr("state", NodeIPAddressStateEnabled).
Attr("role", role).
Attr("nodeId", nodeId).
Attr("canAccess", true).
Attr("isOn", true).
FindAll()
if err != nil {
return err
}
for _, one := range ones {
addr := one.(*NodeIPAddress)
var thresholds = addr.DecodeThresholds()
if len(thresholds) == 0 {
continue
}
var isOk = true
2021-08-31 17:24:52 +08:00
var positiveSummarys = []string{}
var negativeSummarys = []string{}
2021-08-18 16:19:16 +08:00
for _, threshold := range thresholds {
if threshold.Value <= 0 || threshold.Duration <= 0 {
continue
}
var value = float64(0)
2021-08-31 17:24:52 +08:00
var summary = ""
var op = nodeconfigs.FindNodeValueOperatorName(threshold.Operator)
2021-08-18 16:19:16 +08:00
switch threshold.Item {
case "avgRequests":
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemRequests, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
value = math.Round(value / 60)
2021-08-31 17:24:52 +08:00
summary = "平均请求数:" + types.String(value) + "/s阈值" + op + " " + types.String(threshold.Value) + "/s"
2021-08-18 16:19:16 +08:00
case "avgTrafficOut":
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficOut, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数
2021-08-31 17:24:52 +08:00
summary = "平均下行流量:" + types.String(value) + "MB/s阈值" + op + " " + types.String(threshold.Value) + "MB/s"
2021-08-18 16:19:16 +08:00
case "avgTrafficIn":
value, err = SharedNodeValueDAO.SumValues(tx, role, nodeId, nodeconfigs.NodeValueItemTrafficIn, "total", nodeconfigs.NodeValueSumMethodAvg, types.Int32(threshold.Duration), threshold.DurationUnit)
value = math.Round(value*100/1024/1024/60) / 100 // 100 = 两位小数
2021-08-31 17:24:52 +08:00
summary = "平均上行流量:" + types.String(value) + "MB/s阈值" + op + " " + types.String(threshold.Value) + "MB/s"
2021-08-18 16:19:16 +08:00
default:
// TODO 支持更多
err = errors.New("threshold item '" + threshold.Item + "' not supported")
}
if err != nil {
return err
}
if !nodeconfigs.CompareNodeValue(threshold.Operator, value, float64(threshold.Value)) {
isOk = false
2021-08-31 17:24:52 +08:00
negativeSummarys = append(negativeSummarys, summary)
} else {
positiveSummarys = append(positiveSummarys, summary)
2021-08-18 16:19:16 +08:00
}
}
if isOk && addr.IsUp == 0 { // 新上线
_, err := this.Query(tx).
Pk(addr.Id).
Set("isUp", true).
Update()
if err != nil {
return err
}
2021-08-31 17:24:52 +08:00
// 增加日志
var description = ""
if len(negativeSummarys) > 0 {
description = "触发阈值:" + strings.Join(negativeSummarys, "")
} else {
description = "触发阈值:" + strings.Join(positiveSummarys, "")
}
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, int64(addr.Id), description)
if err != nil {
return err
}
2021-08-18 16:19:16 +08:00
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
2021-08-31 17:24:52 +08:00
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrUp, MessageLevelSuccess, "节点IP'"+addr.Ip+"'因为达到阈值而上线", "节点IP'"+addr.Ip+"'因为达到阈值而上线。"+description+"。", maps.Map{
2021-08-18 16:19:16 +08:00
"addrId": addr.Id,
}.AsJSON())
if err != nil {
return err
}
err = this.NotifyUpdate(tx, int64(addr.Id))
if err != nil {
return err
}
} else if !isOk && addr.IsUp == 1 { // 新离线
_, err := this.Query(tx).
Pk(addr.Id).
Set("isUp", false).
Update()
if err != nil {
return err
}
2021-08-31 17:24:52 +08:00
// 增加日志
var description = ""
if len(negativeSummarys) > 0 {
description = "触发阈值:" + strings.Join(negativeSummarys, "")
} else {
description = "触发阈值:" + strings.Join(positiveSummarys, "")
}
err = SharedNodeIPAddressLogDAO.CreateLog(tx, 0, int64(addr.Id), description)
if err != nil {
return err
}
2021-08-18 16:19:16 +08:00
clusterId, err := SharedNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return err
}
2021-08-31 17:24:52 +08:00
err = SharedMessageDAO.CreateNodeMessage(tx, role, clusterId, nodeId, MessageTypeIPAddrDown, MessageLevelWarning, "节点IP'"+addr.Ip+"'因为达到阈值而下线", "节点IP'"+addr.Ip+"'因为达到阈值而下线。"+description+"。", maps.Map{
2021-08-18 16:19:16 +08:00
"addrId": addr.Id,
}.AsJSON())
if err != nil {
return err
}
err = this.NotifyUpdate(tx, int64(addr.Id))
if err != nil {
return err
}
}
}
return nil
}
2021-08-31 17:24:52 +08:00
// CountAllEnabledIPAddresses 计算IP地址数量
// TODO 目前支持边缘节点将来支持NS节点
func (this *NodeIPAddressDAO) CountAllEnabledIPAddresses(tx *dbs.Tx, role string, nodeClusterId int64, upState configutils.BoolState, keyword string) (int64, error) {
var query = this.Query(tx).
State(NodeIPAddressStateEnabled).
Attr("role", role)
// 集群
if nodeClusterId > 0 {
query.Where("nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE (clusterId=:clusterId OR JSON_CONTAINS(secondaryClusterIds, :clusterIdString)) AND state=1)").
Param("clusterId", nodeClusterId).
Param("clusterIdString", types.String(nodeClusterId))
} else {
query.Where("nodeId IN (SELECT id FROM " + SharedNodeDAO.Table + " WHERE state=1 AND clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1))")
}
// 在线状态
switch upState {
case configutils.BoolStateYes:
query.Attr("isUp", 1)
case configutils.BoolStateNo:
query.Attr("isUp", 0)
}
// 关键词
if len(keyword) > 0 {
query.Where("(ip LIKE :keyword OR name LIKE :keyword OR description LIKE :keyword OR nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE state=1 AND name LIKE :keyword))").
Param("keyword", "%"+keyword+"%")
}
return query.Count()
}
// ListEnabledIPAddresses 列出单页的IP地址
func (this *NodeIPAddressDAO) ListEnabledIPAddresses(tx *dbs.Tx, role string, nodeClusterId int64, upState configutils.BoolState, keyword string, offset int64, size int64) (result []*NodeIPAddress, err error) {
var query = this.Query(tx).
State(NodeIPAddressStateEnabled).
Attr("role", role)
// 集群
if nodeClusterId > 0 {
query.Where("nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE (clusterId=:clusterId OR JSON_CONTAINS(secondaryClusterIds, :clusterIdString)) AND state=1)").
Param("clusterId", nodeClusterId).
Param("clusterIdString", types.String(nodeClusterId))
} else {
query.Where("nodeId IN (SELECT id FROM " + SharedNodeDAO.Table + " WHERE state=1 AND clusterId IN (SELECT id FROM " + SharedNodeClusterDAO.Table + " WHERE state=1))")
}
// 在线状态
switch upState {
case configutils.BoolStateYes:
query.Attr("isUp", 1)
case configutils.BoolStateNo:
query.Attr("isUp", 0)
}
// 关键词
if len(keyword) > 0 {
query.Where("(ip LIKE :keyword OR name LIKE :keyword OR description LIKE :keyword OR nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE state=1 AND name LIKE :keyword))").
Param("keyword", "%"+keyword+"%")
}
_, err = query.Offset(offset).
Limit(size).
Asc("isUp").
Desc("nodeId").
Slice(&result).
FindAll()
return
}
// NotifyUpdate 通知更新
func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error {
address, err := this.Query(tx).
Pk(addressId).
Result("nodeId", "role").
Find()
if err != nil {
return err
}
if address == nil {
return nil
}
var nodeId = int64(address.(*NodeIPAddress).NodeId)
if nodeId == 0 {
return nil
}
var role = address.(*NodeIPAddress).Role
switch role {
case nodeconfigs.NodeRoleNode:
err = dns.SharedDNSTaskDAO.CreateNodeTask(tx, nodeId, dns.DNSTaskTypeNodeChange)
}
if err != nil {
return err
}
return nil
}