mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-03 23:20:26 +08:00
417 lines
12 KiB
Go
417 lines
12 KiB
Go
package models
|
||
|
||
import (
|
||
"encoding/json"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||
_ "github.com/go-sql-driver/mysql"
|
||
"github.com/iwind/TeaGo/Tea"
|
||
"github.com/iwind/TeaGo/dbs"
|
||
"github.com/iwind/TeaGo/maps"
|
||
"github.com/iwind/TeaGo/types"
|
||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
type NodeValueDAO dbs.DAO
|
||
|
||
func NewNodeValueDAO() *NodeValueDAO {
|
||
return dbs.NewDAO(&NodeValueDAO{
|
||
DAOObject: dbs.DAOObject{
|
||
DB: Tea.Env,
|
||
Table: "edgeNodeValues",
|
||
Model: new(NodeValue),
|
||
PkName: "id",
|
||
},
|
||
}).(*NodeValueDAO)
|
||
}
|
||
|
||
var SharedNodeValueDAO *NodeValueDAO
|
||
|
||
func init() {
|
||
dbs.OnReady(func() {
|
||
SharedNodeValueDAO = NewNodeValueDAO()
|
||
})
|
||
}
|
||
|
||
// CreateValue 创建值
|
||
func (this *NodeValueDAO) CreateValue(tx *dbs.Tx, clusterId int64, role nodeconfigs.NodeRole, nodeId int64, item string, valueJSON []byte, createdAt int64) error {
|
||
if len(valueJSON) == 0 {
|
||
return errors.New("'valueJSON' should not be nil")
|
||
}
|
||
|
||
var day = timeutil.FormatTime("Ymd", createdAt)
|
||
var hour = timeutil.FormatTime("YmdH", createdAt)
|
||
var minute = timeutil.FormatTime("YmdHi", createdAt)
|
||
|
||
err := this.Query(tx).
|
||
InsertOrUpdateQuickly(maps.Map{
|
||
"clusterId": clusterId,
|
||
"role": role,
|
||
"nodeId": nodeId,
|
||
"item": item,
|
||
"value": valueJSON,
|
||
"createdAt": createdAt,
|
||
"day": day,
|
||
"hour": hour,
|
||
"minute": minute,
|
||
}, maps.Map{
|
||
"value": valueJSON,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 触发钩子
|
||
err = this.nodeValueHook(tx, role, nodeId, item, valueJSON)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Clean 清除数据
|
||
func (this *NodeValueDAO) Clean(tx *dbs.Tx) error {
|
||
var hour = timeutil.Format("YmdH", time.Now().Add(-2*time.Hour))
|
||
_, err := this.Query(tx).
|
||
Where("hour<=:hour").
|
||
Param("hour", hour).
|
||
Delete()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ListValues 列出最近的的数据
|
||
func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
|
||
query := this.Query(tx).
|
||
Attr("role", role).
|
||
Attr("nodeId", nodeId).
|
||
Attr("item", item)
|
||
|
||
switch timeRange {
|
||
// TODO 支持更多的时间范围
|
||
case nodeconfigs.NodeValueRangeMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
|
||
query.Gte("minute", fromMinute)
|
||
default:
|
||
err = errors.New("invalid 'range' value: '" + timeRange + "'")
|
||
return
|
||
}
|
||
|
||
_, err = query.Slice(&result).
|
||
FindAll()
|
||
return
|
||
}
|
||
|
||
// ListValuesWithClusterId 列出集群最近的的平均数据
|
||
func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, role string, clusterId int64, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
|
||
query := this.Query(tx).
|
||
Attr("role", role).
|
||
Attr("item", item).
|
||
Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
|
||
|
||
switch role {
|
||
case nodeconfigs.NodeRoleNode:
|
||
query.Where("nodeId IN (SELECT id FROM " + SharedNodeDAO.Table + " WHERE (clusterId=:clusterId OR JSON_CONTAINS(secondaryClusterIds, :clusterIdString)) AND state=1)")
|
||
query.Param("clusterId", clusterId).
|
||
Param("clusterIdString", types.String(clusterId))
|
||
default:
|
||
query.Attr("clusterId", clusterId)
|
||
}
|
||
|
||
switch timeRange {
|
||
// TODO 支持更多的时间范围
|
||
case nodeconfigs.NodeValueRangeMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
|
||
query.Gte("minute", fromMinute)
|
||
query.Result("minute")
|
||
query.Group("minute")
|
||
default:
|
||
err = errors.New("invalid 'range' value: '" + timeRange + "'")
|
||
return
|
||
}
|
||
|
||
_, err = query.Slice(&result).
|
||
FindAll()
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
for _, nodeValue := range result {
|
||
nodeValue.Value, _ = json.Marshal(maps.Map{
|
||
key: types.Float32(string(nodeValue.Value)),
|
||
})
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// ListValuesForUserNodes 列出用户节点相关的平均数据
|
||
func (this *NodeValueDAO) ListValuesForUserNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
|
||
query := this.Query(tx).
|
||
Attr("role", "user").
|
||
Attr("item", item).
|
||
Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
|
||
|
||
switch timeRange {
|
||
// TODO 支持更多的时间范围
|
||
case nodeconfigs.NodeValueRangeMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
|
||
query.Gte("minute", fromMinute)
|
||
query.Result("minute")
|
||
query.Group("minute")
|
||
default:
|
||
err = errors.New("invalid 'range' value: '" + timeRange + "'")
|
||
return
|
||
}
|
||
|
||
_, err = query.Slice(&result).
|
||
FindAll()
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
for _, nodeValue := range result {
|
||
nodeValue.Value, _ = json.Marshal(maps.Map{
|
||
key: types.Float32(string(nodeValue.Value)),
|
||
})
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// ListValuesForNSNodes 列出用户节点相关的平均数据
|
||
func (this *NodeValueDAO) ListValuesForNSNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
|
||
query := this.Query(tx).
|
||
Attr("role", "dns").
|
||
Attr("item", item).
|
||
Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
|
||
|
||
switch timeRange {
|
||
// TODO 支持更多的时间范围
|
||
case nodeconfigs.NodeValueRangeMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
|
||
query.Gte("minute", fromMinute)
|
||
query.Result("minute")
|
||
query.Group("minute")
|
||
default:
|
||
err = errors.New("invalid 'range' value: '" + timeRange + "'")
|
||
return
|
||
}
|
||
|
||
_, err = query.Slice(&result).
|
||
FindAll()
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
for _, nodeValue := range result {
|
||
nodeValue.Value, _ = json.Marshal(maps.Map{
|
||
key: types.Float32(string(nodeValue.Value)),
|
||
})
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// SumAllNodeValues 计算所有节点的某项参数值
|
||
func (this *NodeValueDAO) SumAllNodeValues(tx *dbs.Tx, role string, item nodeconfigs.NodeValueItem, param string, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (total float64, avg float64, max float64, err error) {
|
||
if duration <= 0 {
|
||
return 0, 0, 0, nil
|
||
}
|
||
|
||
var query = this.Query(tx).
|
||
Result("SUM(JSON_EXTRACT(value, '$."+param+"')) AS sumValue", "AVG(JSON_EXTRACT(value, '$."+param+"')) AS avgValue", "MAX(JSON_EXTRACT(value, '$."+param+"')) AS maxValueResult"). // maxValue 是个MySQL Keyword,这里使用maxValueResult代替
|
||
Attr("role", role).
|
||
Attr("item", item)
|
||
|
||
switch durationUnit {
|
||
case nodeconfigs.NodeValueDurationUnitMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Attr("minute", fromMinute)
|
||
default:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Attr("minute", fromMinute)
|
||
}
|
||
|
||
m, _, err := query.FindOne()
|
||
if err != nil {
|
||
return 0, 0, 0, err
|
||
}
|
||
|
||
return m.GetFloat64("sumValue"), m.GetFloat64("avgValue"), m.GetFloat64("maxValueResult"), nil
|
||
}
|
||
|
||
// SumNodeValues 计算节点的某项参数值
|
||
func (this *NodeValueDAO) SumNodeValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
|
||
if duration <= 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
query := this.Query(tx).
|
||
Attr("role", role).
|
||
Attr("nodeId", nodeId).
|
||
Attr("item", item)
|
||
switch method {
|
||
case nodeconfigs.NodeValueSumMethodAvg:
|
||
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||
case nodeconfigs.NodeValueSumMethodSum:
|
||
query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
|
||
default:
|
||
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||
}
|
||
switch durationUnit {
|
||
case nodeconfigs.NodeValueDurationUnitMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Gte("minute", fromMinute)
|
||
default:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Gte("minute", fromMinute)
|
||
}
|
||
return query.FindFloat64Col(0)
|
||
}
|
||
|
||
// SumNodeGroupValues 计算节点分组的某项参数值
|
||
func (this *NodeValueDAO) SumNodeGroupValues(tx *dbs.Tx, role string, groupId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
|
||
if duration <= 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
query := this.Query(tx).
|
||
Attr("role", role).
|
||
Where("nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE groupId=:groupId AND state=1)").
|
||
Param("groupId", groupId).
|
||
Attr("item", item)
|
||
switch method {
|
||
case nodeconfigs.NodeValueSumMethodAvg:
|
||
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||
case nodeconfigs.NodeValueSumMethodSum:
|
||
query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
|
||
default:
|
||
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||
}
|
||
switch durationUnit {
|
||
case nodeconfigs.NodeValueDurationUnitMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Gte("minute", fromMinute)
|
||
default:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Gte("minute", fromMinute)
|
||
}
|
||
return query.FindFloat64Col(0)
|
||
}
|
||
|
||
// SumNodeClusterValues 计算节点集群的某项参数值
|
||
func (this *NodeValueDAO) SumNodeClusterValues(tx *dbs.Tx, role string, clusterId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
|
||
if duration <= 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
query := this.Query(tx).
|
||
Attr("role", role).
|
||
Attr("clusterId", clusterId).
|
||
Attr("item", item)
|
||
switch method {
|
||
case nodeconfigs.NodeValueSumMethodAvg:
|
||
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||
case nodeconfigs.NodeValueSumMethodSum:
|
||
query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
|
||
default:
|
||
query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
|
||
}
|
||
switch durationUnit {
|
||
case nodeconfigs.NodeValueDurationUnitMinute:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Gte("minute", fromMinute)
|
||
default:
|
||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
|
||
query.Gte("minute", fromMinute)
|
||
}
|
||
return query.FindFloat64Col(0)
|
||
}
|
||
|
||
// FindLatestNodeValue 获取最近一条数据
|
||
func (this *NodeValueDAO) FindLatestNodeValue(tx *dbs.Tx, role string, nodeId int64, item string) (*NodeValue, error) {
|
||
var fromMinute = timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(60))
|
||
|
||
one, err := this.Query(tx).
|
||
Attr("role", role).
|
||
Attr("nodeId", nodeId).
|
||
Attr("item", item).
|
||
Attr("minute", fromMinute).
|
||
Find()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if one == nil {
|
||
return nil, nil
|
||
}
|
||
return one.(*NodeValue), nil
|
||
}
|
||
|
||
// ComposeNodeStatus 组合节点状态值
|
||
func (this *NodeValueDAO) ComposeNodeStatus(tx *dbs.Tx, role string, nodeId int64, statusConfig *nodeconfigs.NodeStatus) error {
|
||
var items = []string{
|
||
nodeconfigs.NodeValueItemCPU,
|
||
nodeconfigs.NodeValueItemMemory,
|
||
nodeconfigs.NodeValueItemLoad,
|
||
nodeconfigs.NodeValueItemTrafficOut,
|
||
nodeconfigs.NodeValueItemTrafficIn,
|
||
}
|
||
ones, err := this.Query(tx).
|
||
Result("item", "value").
|
||
Attr("role", role).
|
||
Attr("nodeId", nodeId).
|
||
Attr("minute", timeutil.FormatTime("YmdHi", time.Now().Unix()-60)).
|
||
Where("item IN ('" + strings.Join(items, "', '") + "')").
|
||
FindAll()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, one := range ones {
|
||
var oneValue = one.(*NodeValue)
|
||
var valueMap = oneValue.DecodeMapValue()
|
||
switch oneValue.Item {
|
||
case nodeconfigs.NodeValueItemCPU:
|
||
statusConfig.CPUUsage = valueMap.GetFloat64("usage")
|
||
case nodeconfigs.NodeValueItemMemory:
|
||
statusConfig.MemoryUsage = valueMap.GetFloat64("usage")
|
||
case nodeconfigs.NodeValueItemLoad:
|
||
statusConfig.Load1m = valueMap.GetFloat64("load1m")
|
||
statusConfig.Load5m = valueMap.GetFloat64("load5m")
|
||
statusConfig.Load15m = valueMap.GetFloat64("load15m")
|
||
case nodeconfigs.NodeValueItemTrafficOut:
|
||
statusConfig.TrafficOutBytes = valueMap.GetUint64("total")
|
||
case nodeconfigs.NodeValueItemTrafficIn:
|
||
statusConfig.TrafficInBytes = valueMap.GetUint64("total")
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ComposeNodeStatusJSON 组合节点状态值,并转换为JSON数据
|
||
func (this *NodeValueDAO) ComposeNodeStatusJSON(tx *dbs.Tx, role string, nodeId int64, statusJSON []byte) ([]byte, error) {
|
||
var statusConfig = &nodeconfigs.NodeStatus{}
|
||
if len(statusJSON) > 0 {
|
||
err := json.Unmarshal(statusJSON, statusConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
err := this.ComposeNodeStatus(tx, role, nodeId, statusConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return json.Marshal(statusConfig)
|
||
}
|