From 56665ba206bd9699a8dd838374e06c0fa0e65128 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 4 May 2021 21:02:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=9F=BA=E7=A1=80=E7=9A=84?= =?UTF-8?q?=E9=98=88=E5=80=BC=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/message_receiver_dao.go | 18 ++- internal/db/models/message_task_dao.go | 2 +- internal/db/models/node_threshold_dao.go | 138 ++++++++++++++++++ internal/db/models/node_threshold_dao_test.go | 6 + internal/db/models/node_threshold_model.go | 40 +++++ .../db/models/node_threshold_model_ext.go | 1 + internal/nodes/api_node.go | 1 + .../rpc/services/service_message_receiver.go | 27 +++- .../rpc/services/service_node_threshold.go | 136 +++++++++++++++++ 9 files changed, 362 insertions(+), 7 deletions(-) create mode 100644 internal/db/models/node_threshold_dao.go create mode 100644 internal/db/models/node_threshold_dao_test.go create mode 100644 internal/db/models/node_threshold_model.go create mode 100644 internal/db/models/node_threshold_model_ext.go create mode 100644 internal/rpc/services/service_node_threshold.go diff --git a/internal/db/models/message_receiver_dao.go b/internal/db/models/message_receiver_dao.go index 30353243..dc3b21b1 100644 --- a/internal/db/models/message_receiver_dao.go +++ b/internal/db/models/message_receiver_dao.go @@ -97,8 +97,8 @@ func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTar return this.SaveInt64(tx, op) } -// FindAllReceivers 查询接收人 -func (this *MessageReceiverDAO) FindAllReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) { +// FindAllEnabledReceivers 查询接收人 +func (this *MessageReceiverDAO) FindAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) { query := this.Query(tx) if len(messageType) > 0 { query.Attr("type", []string{"*", messageType}) // *表示所有的 @@ -113,3 +113,17 @@ func (this *MessageReceiverDAO) FindAllReceivers(tx *dbs.Tx, target MessageTaskT FindAll() return } + +// CountAllEnabledReceivers 计算接收人数量 +func (this *MessageReceiverDAO) CountAllEnabledReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (int64, error) { + query := this.Query(tx) + if len(messageType) > 0 { + query.Attr("type", []string{"*", messageType}) // *表示所有的 + } + return query. + Attr("clusterId", target.ClusterId). + Attr("nodeId", target.NodeId). + Attr("serverId", target.ServerId). + State(MessageReceiverStateEnabled). + Count() +} diff --git a/internal/db/models/message_task_dao.go b/internal/db/models/message_task_dao.go index ca4fdd1a..6d120cc6 100644 --- a/internal/db/models/message_task_dao.go +++ b/internal/db/models/message_task_dao.go @@ -118,7 +118,7 @@ func (this *MessageTaskDAO) UpdateMessageTaskStatus(tx *dbs.Tx, taskId int64, st // CreateMessageTasks 从集群、节点或者服务中创建任务 func (this *MessageTaskDAO) CreateMessageTasks(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, subject string, body string) error { - receivers, err := SharedMessageReceiverDAO.FindAllReceivers(tx, target, messageType) + receivers, err := SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, target, messageType) if err != nil { return err } diff --git a/internal/db/models/node_threshold_dao.go b/internal/db/models/node_threshold_dao.go new file mode 100644 index 00000000..2b91ba87 --- /dev/null +++ b/internal/db/models/node_threshold_dao.go @@ -0,0 +1,138 @@ +package models + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" +) + +const ( + NodeThresholdStateEnabled = 1 // 已启用 + NodeThresholdStateDisabled = 0 // 已禁用 +) + +type NodeThresholdDAO dbs.DAO + +func NewNodeThresholdDAO() *NodeThresholdDAO { + return dbs.NewDAO(&NodeThresholdDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeNodeThresholds", + Model: new(NodeThreshold), + PkName: "id", + }, + }).(*NodeThresholdDAO) +} + +var SharedNodeThresholdDAO *NodeThresholdDAO + +func init() { + dbs.OnReady(func() { + SharedNodeThresholdDAO = NewNodeThresholdDAO() + }) +} + +// EnableNodeThreshold 启用条目 +func (this *NodeThresholdDAO) EnableNodeThreshold(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", NodeThresholdStateEnabled). + Update() + return err +} + +// DisableNodeThreshold 禁用条目 +func (this *NodeThresholdDAO) DisableNodeThreshold(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", NodeThresholdStateDisabled). + Update() + return err +} + +// FindEnabledNodeThreshold 查找启用中的条目 +func (this *NodeThresholdDAO) FindEnabledNodeThreshold(tx *dbs.Tx, id int64) (*NodeThreshold, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", NodeThresholdStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*NodeThreshold), err +} + +// CreateThreshold 创建阈值 +func (this *NodeThresholdDAO) CreateThreshold(tx *dbs.Tx, clusterId int64, nodeId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (int64, error) { + op := NewNodeThresholdOperator() + op.ClusterId = clusterId + op.NodeId = nodeId + op.Item = item + op.Param = param + op.Operator = operator + op.Value = valueJSON + op.Message = message + op.SumMethod = sumMethod + op.Duration = duration + op.DurationUnit = durationUnit + op.IsOn = true + op.State = NodeThresholdStateEnabled + return this.SaveInt64(tx, op) +} + +// UpdateThreshold 修改阈值 +func (this *NodeThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, item nodeconfigs.NodeValueItem, param string, operator nodeconfigs.NodeValueOperator, valueJSON []byte, message string, sumMethod nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit, isOn bool) error { + if thresholdId <= 0 { + return errors.New("invalid thresholdId") + } + op := NewNodeThresholdOperator() + op.Id = thresholdId + op.Item = item + op.Param = param + op.Operator = operator + op.Value = valueJSON + op.Message = message + op.SumMethod = sumMethod + op.Duration = duration + op.DurationUnit = durationUnit + op.IsOn = isOn + return this.Save(tx, op) +} + +// FindAllEnabledThresholds 列出所有阈值 +func (this *NodeThresholdDAO) FindAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (result []*NodeThreshold, err error) { + if clusterId <= 0 && nodeId <= 0 { + return + } + query := this.Query(tx) + if clusterId > 0 { + query.Attr("clusterId", clusterId) + } + if nodeId > 0 { + query.Attr("nodeId", nodeId) + } + query.State(NodeThresholdStateEnabled) + query.Slice(&result) + _, err = query. + AscPk(). + FindAll() + return +} + +// CountAllEnabledThresholds 计算阈值的数量 +func (this *NodeThresholdDAO) CountAllEnabledThresholds(tx *dbs.Tx, clusterId int64, nodeId int64) (int64, error) { + if clusterId <= 0 && nodeId <= 0 { + return 0, nil + } + query := this.Query(tx) + if clusterId > 0 { + query.Attr("clusterId", clusterId) + } + if nodeId > 0 { + query.Attr("nodeId", nodeId) + } + query.State(NodeThresholdStateEnabled) + return query.Count() +} diff --git a/internal/db/models/node_threshold_dao_test.go b/internal/db/models/node_threshold_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/node_threshold_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/node_threshold_model.go b/internal/db/models/node_threshold_model.go new file mode 100644 index 00000000..44468767 --- /dev/null +++ b/internal/db/models/node_threshold_model.go @@ -0,0 +1,40 @@ +package models + +// NodeThreshold 集群阈值设置 +type NodeThreshold struct { + Id uint64 `field:"id"` // ID + ClusterId uint32 `field:"clusterId"` // 集群ID + NodeId uint32 `field:"nodeId"` // 节点ID + IsOn uint8 `field:"isOn"` // 是否启用 + Item string `field:"item"` // 监控项 + Param string `field:"param"` // 参数 + Operator string `field:"operator"` // 操作符 + Value string `field:"value"` // 对比值 + Message string `field:"message"` // 消息内容 + State uint8 `field:"state"` // 状态 + Duration uint32 `field:"duration"` // 时间段 + DurationUnit string `field:"durationUnit"` // 时间段单位 + SumMethod string `field:"sumMethod"` // 聚合方法 + Order uint32 `field:"order"` // 排序 +} + +type NodeThresholdOperator struct { + Id interface{} // ID + ClusterId interface{} // 集群ID + NodeId interface{} // 节点ID + IsOn interface{} // 是否启用 + Item interface{} // 监控项 + Param interface{} // 参数 + Operator interface{} // 操作符 + Value interface{} // 对比值 + Message interface{} // 消息内容 + State interface{} // 状态 + Duration interface{} // 时间段 + DurationUnit interface{} // 时间段单位 + SumMethod interface{} // 聚合方法 + Order interface{} // 排序 +} + +func NewNodeThresholdOperator() *NodeThresholdOperator { + return &NodeThresholdOperator{} +} diff --git a/internal/db/models/node_threshold_model_ext.go b/internal/db/models/node_threshold_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/node_threshold_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index 2aeec805..739fb771 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -265,6 +265,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err pb.RegisterAuthorityKeyServiceServer(rpcServer, &services.AuthorityKeyService{}) pb.RegisterAuthorityNodeServiceServer(rpcServer, &services.AuthorityNodeService{}) pb.RegisterLatestItemServiceServer(rpcServer, &services.LatestItemService{}) + pb.RegisterNodeThresholdServiceServer(rpcServer, &services.NodeThresholdService{}) err := rpcServer.Serve(listener) if err != nil { return errors.New("[API_NODE]start rpc failed: " + err.Error()) diff --git a/internal/rpc/services/service_message_receiver.go b/internal/rpc/services/service_message_receiver.go index 06c2b5ab..6f0f87dc 100644 --- a/internal/rpc/services/service_message_receiver.go +++ b/internal/rpc/services/service_message_receiver.go @@ -56,15 +56,15 @@ func (this *MessageReceiverService) UpdateMessageReceivers(ctx context.Context, return this.Success() } -// FindAllMessageReceivers 查找接收者 -func (this *MessageReceiverService) FindAllMessageReceivers(ctx context.Context, req *pb.FindAllMessageReceiversRequest) (*pb.FindAllMessageReceiversResponse, error) { +// FindAllEnabledMessageReceivers 查找接收者 +func (this *MessageReceiverService) FindAllEnabledMessageReceivers(ctx context.Context, req *pb.FindAllEnabledMessageReceiversRequest) (*pb.FindAllEnabledMessageReceiversResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { return nil, err } var tx = this.NullTx() - receivers, err := models.SharedMessageReceiverDAO.FindAllReceivers(tx, models.MessageTaskTarget{ + receivers, err := models.SharedMessageReceiverDAO.FindAllEnabledReceivers(tx, models.MessageTaskTarget{ ClusterId: req.NodeClusterId, NodeId: req.NodeId, ServerId: req.ServerId, @@ -152,7 +152,7 @@ func (this *MessageReceiverService) FindAllMessageReceivers(ctx context.Context, MessageRecipientGroup: pbRecipientGroup, }) } - return &pb.FindAllMessageReceiversResponse{MessageReceivers: pbReceivers}, nil + return &pb.FindAllEnabledMessageReceiversResponse{MessageReceivers: pbReceivers}, nil } // DeleteMessageReceiver 删除接收者 @@ -169,3 +169,22 @@ func (this *MessageReceiverService) DeleteMessageReceiver(ctx context.Context, r } return this.Success() } + +// CountAllEnabledMessageReceivers 计算接收者数量 +func (this *MessageReceiverService) CountAllEnabledMessageReceivers(ctx context.Context, req *pb.CountAllEnabledMessageReceiversRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + count, err := models.SharedMessageReceiverDAO.CountAllEnabledReceivers(tx, models.MessageTaskTarget{ + ClusterId: req.NodeClusterId, + NodeId: req.NodeId, + ServerId: req.ServerId, + }, "") + if err != nil { + return nil, err + } + return this.SuccessCount(count) +} diff --git a/internal/rpc/services/service_node_threshold.go b/internal/rpc/services/service_node_threshold.go new file mode 100644 index 00000000..dbf7f6c6 --- /dev/null +++ b/internal/rpc/services/service_node_threshold.go @@ -0,0 +1,136 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/types" +) + +// NodeThresholdService 节点阈值服务 +type NodeThresholdService struct { + BaseService +} + +// CreateNodeThreshold 创建阈值 +func (this *NodeThresholdService) CreateNodeThreshold(ctx context.Context, req *pb.CreateNodeThresholdRequest) (*pb.CreateNodeThresholdResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + thresholdId, err := models.SharedNodeThresholdDAO.CreateThreshold(tx, req.NodeClusterId, req.NodeId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit) + if err != nil { + return nil, err + } + return &pb.CreateNodeThresholdResponse{NodeThresholdId: thresholdId}, nil +} + +// UpdateNodeThreshold 创建阈值 +func (this *NodeThresholdService) UpdateNodeThreshold(ctx context.Context, req *pb.UpdateNodeThresholdRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + err = models.SharedNodeThresholdDAO.UpdateThreshold(tx, req.NodeThresholdId, req.Item, req.Param, req.Operator, req.ValueJSON, req.Message, req.SumMethod, req.Duration, req.DurationUnit, req.IsOn) + if err != nil { + return nil, err + } + return this.Success() +} + +// DeleteNodeThreshold 删除阈值 +func (this *NodeThresholdService) DeleteNodeThreshold(ctx context.Context, req *pb.DeleteNodeThresholdRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + err = models.SharedNodeThresholdDAO.DisableNodeThreshold(tx, req.NodeThresholdId) + if err != nil { + return nil, err + } + return this.Success() +} + +// FindAllEnabledNodeThresholds 查询阈值 +func (this *NodeThresholdService) FindAllEnabledNodeThresholds(ctx context.Context, req *pb.FindAllEnabledNodeThresholdsRequest) (*pb.FindAllEnabledNodeThresholdsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + pbThresholds := []*pb.NodeThreshold{} + thresholds, err := models.SharedNodeThresholdDAO.FindAllEnabledThresholds(tx, req.NodeClusterId, req.NodeId) + if err != nil { + return nil, err + } + for _, threshold := range thresholds { + pbThresholds = append(pbThresholds, &pb.NodeThreshold{ + Id: int64(threshold.Id), + ClusterId: int64(threshold.ClusterId), + NodeId: int64(threshold.NodeId), + Item: threshold.Item, + Param: threshold.Param, + Operator: threshold.Operator, + ValueJSON: []byte(threshold.Value), + Message: threshold.Message, + Duration: types.Int32(threshold.Duration), + DurationUnit: threshold.DurationUnit, + SumMethod: threshold.SumMethod, + IsOn: threshold.IsOn == 1, + }) + } + return &pb.FindAllEnabledNodeThresholdsResponse{NodeThresholds: pbThresholds}, nil +} + +// CountAllEnabledNodeThresholds 计算阈值数量 +func (this *NodeThresholdService) CountAllEnabledNodeThresholds(ctx context.Context, req *pb.CountAllEnabledNodeThresholdsRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + count, err := models.SharedNodeThresholdDAO.CountAllEnabledThresholds(tx, req.NodeClusterId, req.NodeId) + if err != nil { + return nil, err + } + return this.SuccessCount(count) +} + +// FindEnabledNodeThreshold 查询单个阈值详情 +func (this *NodeThresholdService) FindEnabledNodeThreshold(ctx context.Context, req *pb.FindEnabledNodeThresholdRequest) (*pb.FindEnabledNodeThresholdResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + threshold, err := models.SharedNodeThresholdDAO.FindEnabledNodeThreshold(tx, req.NodeThresholdId) + if err != nil { + return nil, err + } + if threshold == nil { + return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: nil}, nil + } + + return &pb.FindEnabledNodeThresholdResponse{NodeThreshold: &pb.NodeThreshold{ + Id: int64(threshold.Id), + ClusterId: int64(threshold.ClusterId), + NodeId: int64(threshold.NodeId), + Item: threshold.Item, + Param: threshold.Param, + Operator: threshold.Operator, + ValueJSON: []byte(threshold.Value), + Message: threshold.Message, + Duration: types.Int32(threshold.Duration), + DurationUnit: threshold.DurationUnit, + SumMethod: threshold.SumMethod, + IsOn: threshold.IsOn == 1, + }}, nil +}