实现基础的阈值设置

This commit is contained in:
GoEdgeLab
2021-05-04 21:02:31 +08:00
parent 62cf0eb44f
commit 56665ba206
9 changed files with 362 additions and 7 deletions

View File

@@ -97,8 +97,8 @@ func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTar
return this.SaveInt64(tx, op) return this.SaveInt64(tx, op)
} }
// FindAllReceivers 查询接收人 // FindAllEnabledReceivers 查询接收人
func (this *MessageReceiverDAO) FindAllReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) { func (this *MessageReceiverDAO) FindAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) {
query := this.Query(tx) query := this.Query(tx)
if len(messageType) > 0 { if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的 query.Attr("type", []string{"*", messageType}) // *表示所有的
@@ -113,3 +113,17 @@ func (this *MessageReceiverDAO) FindAllReceivers(tx *dbs.Tx, target MessageTaskT
FindAll() FindAll()
return return
} }
// CountAllEnabledReceivers 计算接收人数量
func (this *MessageReceiverDAO) CountAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (int64, error) {
query := this.Query(tx)
if len(messageType) > 0 {
query.Attr("type", []string{"*", messageType}) // *表示所有的
}
return query.
Attr("clusterId", target.ClusterId).
Attr("nodeId", target.NodeId).
Attr("serverId", target.ServerId).
State(MessageReceiverStateEnabled).
Count()
}

View File

@@ -118,7 +118,7 @@ func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, st
// CreateMessageTasks 从集群、节点或者服务中创建任务 // CreateMessageTasks 从集群、节点或者服务中创建任务
func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error { func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error {
receivers, err := SharedMessageReceiverDAO.FindAllReceivers(tx, target, messageType) receivers, err := SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, target, messageType)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -0,0 +1,138 @@
package models
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
)
const (
NodeThresholdStateEnabled = 1 // 已启用
NodeThresholdStateDisabled = 0 // 已禁用
)
type NodeThresholdDAO dbs.DAO
func NewNodeThresholdDAO() *NodeThresholdDAO {
return dbs.NewDAO(&NodeThresholdDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeNodeThresholds",
Model: new(NodeThreshold),
PkName: "id",
},
}).(*NodeThresholdDAO)
}
var SharedNodeThresholdDAO *NodeThresholdDAO
func init() {
dbs.OnReady(func() {
SharedNodeThresholdDAO = NewNodeThresholdDAO()
})
}
// EnableNodeThreshold 启用条目
func (this *NodeThresholdDAO) EnableNodeThreshold(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
Set("state", NodeThresholdStateEnabled).
Update()
return err
}
// DisableNodeThreshold 禁用条目
func (this *NodeThresholdDAO) DisableNodeThreshold(tx *dbs.Tx, id int64) error {
_, err := this.Query(tx).
Pk(id).
Set("state", NodeThresholdStateDisabled).
Update()
return err
}
// FindEnabledNodeThreshold 查找启用中的条目
func (this *NodeThresholdDAO) FindEnabledNodeThreshold(tx *dbs.Tx, id int64) (*NodeThreshold, error) {
result, err := this.Query(tx).
Pk(id).
Attr("state", NodeThresholdStateEnabled).
Find()
if result == nil {
return nil, err
}
return result.(*NodeThreshold), err
}
// CreateThreshold 创建阈值
func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, clusterId int64, nodeId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (int64, error) {
op := NewNodeThresholdOperator()
op.ClusterId = clusterId
op.NodeId = nodeId
op.Item = item
op.Param = param
op.Operator = operator
op.Value = valueJSON
op.Message = message
op.SumMethod = sumMethod
op.Duration = duration
op.DurationUnit = durationUnit
op.IsOn = true
op.State = NodeThresholdStateEnabled
return this.SaveInt64(tx, op)
}
// UpdateThreshold 修改阈值
func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, isOn bool) error {
if thresholdId <= 0 {
return errors.New("invalid thresholdId")
}
op := NewNodeThresholdOperator()
op.Id = thresholdId
op.Item = item
op.Param = param
op.Operator = operator
op.Value = valueJSON
op.Message = message
op.SumMethod = sumMethod
op.Duration = duration
op.DurationUnit = durationUnit
op.IsOn = isOn
return this.Save(tx, op)
}
// FindAllEnabledThresholds 列出所有阈值
func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (result []*NodeThreshold, err error) {
if clusterId <= 0 && nodeId <= 0 {
return
}
query := this.Query(tx)
if clusterId > 0 {
query.Attr("clusterId", clusterId)
}
if nodeId > 0 {
query.Attr("nodeId", nodeId)
}
query.State(NodeThresholdStateEnabled)
query.Slice(&result)
_, err = query.
AscPk().
FindAll()
return
}
// CountAllEnabledThresholds 计算阈值的数量
func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (int64, error) {
if clusterId <= 0 && nodeId <= 0 {
return 0, nil
}
query := this.Query(tx)
if clusterId > 0 {
query.Attr("clusterId", clusterId)
}
if nodeId > 0 {
query.Attr("nodeId", nodeId)
}
query.State(NodeThresholdStateEnabled)
return query.Count()
}

View File

@@ -0,0 +1,6 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
)

View File

@@ -0,0 +1,40 @@
package models
// NodeThreshold 集群阈值设置
type NodeThreshold struct {
Id uint64 `field:"id"` // ID
ClusterId uint32 `field:"clusterId"` // 集群ID
NodeId uint32 `field:"nodeId"` // 节点ID
IsOn uint8 `field:"isOn"` // 是否启用
Item string `field:"item"` // 监控项
Param string `field:"param"` // 参数
Operator string `field:"operator"` // 操作符
Value string `field:"value"` // 对比值
Message string `field:"message"` // 消息内容
State uint8 `field:"state"` // 状态
Duration uint32 `field:"duration"` // 时间段
DurationUnit string `field:"durationUnit"` // 时间段单位
SumMethod string `field:"sumMethod"` // 聚合方法
Order uint32 `field:"order"` // 排序
}
type NodeThresholdOperator struct {
Id interface{} // ID
ClusterId interface{} // 集群ID
NodeId interface{} // 节点ID
IsOn interface{} // 是否启用
Item interface{} // 监控项
Param interface{} // 参数
Operator interface{} // 操作符
Value interface{} // 对比值
Message interface{} // 消息内容
State interface{} // 状态
Duration interface{} // 时间段
DurationUnit interface{} // 时间段单位
SumMethod interface{} // 聚合方法
Order interface{} // 排序
}
func NewNodeThresholdOperator() *NodeThresholdOperator {
return &NodeThresholdOperator{}
}

View File

@@ -0,0 +1 @@
package models

View File

@@ -265,6 +265,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err
pb.RegisterAuthorityKeyServiceServer(rpcServer, &services.AuthorityKeyService{}) pb.RegisterAuthorityKeyServiceServer(rpcServer, &services.AuthorityKeyService{})
pb.RegisterAuthorityNodeServiceServer(rpcServer, &services.AuthorityNodeService{}) pb.RegisterAuthorityNodeServiceServer(rpcServer, &services.AuthorityNodeService{})
pb.RegisterLatestItemServiceServer(rpcServer, &services.LatestItemService{}) pb.RegisterLatestItemServiceServer(rpcServer, &services.LatestItemService{})
pb.RegisterNodeThresholdServiceServer(rpcServer, &services.NodeThresholdService{})
err := rpcServer.Serve(listener) err := rpcServer.Serve(listener)
if err != nil { if err != nil {
return errors.New("[API_NODE]start rpc failed: " + err.Error()) return errors.New("[API_NODE]start rpc failed: " + err.Error())

View File

@@ -56,15 +56,15 @@ func (this *MessageReceiverService) UpdateMessageReceivers(ctx context.Context,
return this.Success() return this.Success()
} }
// FindAllMessageReceivers 查找接收者 // FindAllEnabledMessageReceivers 查找接收者
func (this *MessageReceiverService) FindAllMessageReceivers(ctx context.Context, req *pb.FindAllMessageReceiversRequest) (*pb.FindAllMessageReceiversResponse, error) { func (this *MessageReceiverService) FindAllEnabledMessageReceivers(ctx context.Context, req *pb.FindAllEnabledMessageReceiversRequest) (*pb.FindAllEnabledMessageReceiversResponse, error) {
_, err := this.ValidateAdmin(ctx, 0) _, err := this.ValidateAdmin(ctx, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tx = this.NullTx() var tx = this.NullTx()
receivers, err := models.SharedMessageReceiverDAO.FindAllReceivers(tx, models.MessageTaskTarget{ receivers, err := models.SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, models.MessageTaskTarget{
ClusterId: req.NodeClusterId, ClusterId: req.NodeClusterId,
NodeId: req.NodeId, NodeId: req.NodeId,
ServerId: req.ServerId, ServerId: req.ServerId,
@@ -152,7 +152,7 @@ func (this *MessageReceiverService) FindAllMessageReceivers(ctx context.Context,
MessageRecipientGroup: pbRecipientGroup, MessageRecipientGroup: pbRecipientGroup,
}) })
} }
return &pb.FindAllMessageReceiversResponse{MessageReceivers: pbReceivers}, nil return &pb.FindAllEnabledMessageReceiversResponse{MessageReceivers: pbReceivers}, nil
} }
// DeleteMessageReceiver 删除接收者 // DeleteMessageReceiver 删除接收者
@@ -169,3 +169,22 @@ func (this *MessageReceiverService) DeleteMessageReceiver(ctx context.Context, r
} }
return this.Success() return this.Success()
} }
// CountAllEnabledMessageReceivers 计算接收者数量
func (this *MessageReceiverService) CountAllEnabledMessageReceivers(ctx context.Context, req *pb.CountAllEnabledMessageReceiversRequest) (*pb.RPCCountResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
count, err := models.SharedMessageReceiverDAO.CountAllEnabledReceivers(tx, models.MessageTaskTarget{
ClusterId: req.NodeClusterId,
NodeId: req.NodeId,
ServerId: req.ServerId,
}, "")
if err != nil {
return nil, err
}
return this.SuccessCount(count)
}

View File

@@ -0,0 +1,136 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/types"
)
// NodeThresholdService 节点阈值服务
type NodeThresholdService struct {
BaseService
}
// CreateNodeThreshold 创建阈值
func (this *NodeThresholdService) CreateNodeThreshold(ctx context.Context, req *pb.CreateNodeThresholdRequest) (*pb.CreateNodeThresholdResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
thresholdId, err := models.SharedNodeThresholdDAO.CreateThreshold(tx, req.NodeClusterId, req.NodeId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit)
if err != nil {
return nil, err
}
return &pb.CreateNodeThresholdResponse{NodeThresholdId: thresholdId}, nil
}
// UpdateNodeThreshold 创建阈值
func (this *NodeThresholdService) UpdateNodeThreshold(ctx context.Context, req *pb.UpdateNodeThresholdRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
err = models.SharedNodeThresholdDAO.UpdateThreshold(tx, req.NodeThresholdId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.IsOn)
if err != nil {
return nil, err
}
return this.Success()
}
// DeleteNodeThreshold 删除阈值
func (this *NodeThresholdService) DeleteNodeThreshold(ctx context.Context, req *pb.DeleteNodeThresholdRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
err = models.SharedNodeThresholdDAO.DisableNodeThreshold(tx, req.NodeThresholdId)
if err != nil {
return nil, err
}
return this.Success()
}
// FindAllEnabledNodeThresholds 查询阈值
func (this *NodeThresholdService) FindAllEnabledNodeThresholds(ctx context.Context, req *pb.FindAllEnabledNodeThresholdsRequest) (*pb.FindAllEnabledNodeThresholdsResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
pbThresholds := []*pb.NodeThreshold{}
thresholds, err := models.SharedNodeThresholdDAO.FindAllEnabledThresholds(tx, req.NodeClusterId, req.NodeId)
if err != nil {
return nil, err
}
for _, threshold := range thresholds {
pbThresholds = append(pbThresholds, &pb.NodeThreshold{
Id: int64(threshold.Id),
ClusterId: int64(threshold.ClusterId),
NodeId: int64(threshold.NodeId),
Item: threshold.Item,
Param: threshold.Param,
Operator: threshold.Operator,
ValueJSON: []byte(threshold.Value),
Message: threshold.Message,
Duration: types.Int32(threshold.Duration),
DurationUnit: threshold.DurationUnit,
SumMethod: threshold.SumMethod,
IsOn: threshold.IsOn == 1,
})
}
return &pb.FindAllEnabledNodeThresholdsResponse{NodeThresholds: pbThresholds}, nil
}
// CountAllEnabledNodeThresholds 计算阈值数量
func (this *NodeThresholdService) CountAllEnabledNodeThresholds(ctx context.Context, req *pb.CountAllEnabledNodeThresholdsRequest) (*pb.RPCCountResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
count, err := models.SharedNodeThresholdDAO.CountAllEnabledThresholds(tx, req.NodeClusterId, req.NodeId)
if err != nil {
return nil, err
}
return this.SuccessCount(count)
}
// FindEnabledNodeThreshold 查询单个阈值详情
func (this *NodeThresholdService) FindEnabledNodeThreshold(ctx context.Context, req *pb.FindEnabledNodeThresholdRequest) (*pb.FindEnabledNodeThresholdResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
threshold, err := models.SharedNodeThresholdDAO.FindEnabledNodeThreshold(tx, req.NodeThresholdId)
if err != nil {
return nil, err
}
if threshold == nil {
return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: nil}, nil
}
return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: &pb.NodeThreshold{
Id: int64(threshold.Id),
ClusterId: int64(threshold.ClusterId),
NodeId: int64(threshold.NodeId),
Item: threshold.Item,
Param: threshold.Param,
Operator: threshold.Operator,
ValueJSON: []byte(threshold.Value),
Message: threshold.Message,
Duration: types.Int32(threshold.Duration),
DurationUnit: threshold.DurationUnit,
SumMethod: threshold.SumMethod,
IsOn: threshold.IsOn == 1,
}}, nil
}