mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 07:50:25 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			417 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			417 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package models
 | 
						||
 | 
						||
import (
 | 
						||
	"encoding/json"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
						||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
						||
	_ "github.com/go-sql-driver/mysql"
 | 
						||
	"github.com/iwind/TeaGo/Tea"
 | 
						||
	"github.com/iwind/TeaGo/dbs"
 | 
						||
	"github.com/iwind/TeaGo/maps"
 | 
						||
	"github.com/iwind/TeaGo/types"
 | 
						||
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
						||
	"strings"
 | 
						||
	"time"
 | 
						||
)
 | 
						||
 | 
						||
type NodeValueDAO dbs.DAO
 | 
						||
 | 
						||
func NewNodeValueDAO() *NodeValueDAO {
 | 
						||
	return dbs.NewDAO(&NodeValueDAO{
 | 
						||
		DAOObject: dbs.DAOObject{
 | 
						||
			DB:     Tea.Env,
 | 
						||
			Table:  "edgeNodeValues",
 | 
						||
			Model:  new(NodeValue),
 | 
						||
			PkName: "id",
 | 
						||
		},
 | 
						||
	}).(*NodeValueDAO)
 | 
						||
}
 | 
						||
 | 
						||
var SharedNodeValueDAO *NodeValueDAO
 | 
						||
 | 
						||
func init() {
 | 
						||
	dbs.OnReady(func() {
 | 
						||
		SharedNodeValueDAO = NewNodeValueDAO()
 | 
						||
	})
 | 
						||
}
 | 
						||
 | 
						||
// CreateValue 创建值
 | 
						||
func (this *NodeValueDAO) CreateValue(tx *dbs.Tx, clusterId int64, role nodeconfigs.NodeRole, nodeId int64, item string, valueJSON []byte, createdAt int64) error {
 | 
						||
	if len(valueJSON) == 0 {
 | 
						||
		return errors.New("'valueJSON' should not be nil")
 | 
						||
	}
 | 
						||
 | 
						||
	var day = timeutil.FormatTime("Ymd", createdAt)
 | 
						||
	var hour = timeutil.FormatTime("YmdH", createdAt)
 | 
						||
	var minute = timeutil.FormatTime("YmdHi", createdAt)
 | 
						||
 | 
						||
	err := this.Query(tx).
 | 
						||
		InsertOrUpdateQuickly(maps.Map{
 | 
						||
			"clusterId": clusterId,
 | 
						||
			"role":      role,
 | 
						||
			"nodeId":    nodeId,
 | 
						||
			"item":      item,
 | 
						||
			"value":     valueJSON,
 | 
						||
			"createdAt": createdAt,
 | 
						||
			"day":       day,
 | 
						||
			"hour":      hour,
 | 
						||
			"minute":    minute,
 | 
						||
		}, maps.Map{
 | 
						||
			"value": valueJSON,
 | 
						||
		})
 | 
						||
	if err != nil {
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	// 触发钩子
 | 
						||
	err = this.nodeValueHook(tx, role, nodeId, item, valueJSON)
 | 
						||
	if err != nil {
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// Clean 清除数据
 | 
						||
func (this *NodeValueDAO) Clean(tx *dbs.Tx) error {
 | 
						||
	var hour = timeutil.Format("YmdH", time.Now().Add(-2*time.Hour))
 | 
						||
	_, err := this.Query(tx).
 | 
						||
		Where("hour<=:hour").
 | 
						||
		Param("hour", hour).
 | 
						||
		Delete()
 | 
						||
	if err != nil {
 | 
						||
		return err
 | 
						||
	}
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// ListValues 列出最近的的数据
 | 
						||
func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
 | 
						||
	query := this.Query(tx).
 | 
						||
		Attr("role", role).
 | 
						||
		Attr("nodeId", nodeId).
 | 
						||
		Attr("item", item)
 | 
						||
 | 
						||
	switch timeRange {
 | 
						||
	// TODO 支持更多的时间范围
 | 
						||
	case nodeconfigs.NodeValueRangeMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
	default:
 | 
						||
		err = errors.New("invalid 'range' value: '" + timeRange + "'")
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	_, err = query.Slice(&result).
 | 
						||
		FindAll()
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// ListValuesWithClusterId 列出集群最近的的平均数据
 | 
						||
func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, role string, clusterId int64, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
 | 
						||
	query := this.Query(tx).
 | 
						||
		Attr("role", role).
 | 
						||
		Attr("item", item).
 | 
						||
		Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
 | 
						||
 | 
						||
	switch role {
 | 
						||
	case nodeconfigs.NodeRoleNode:
 | 
						||
		query.Where("nodeId IN (SELECT id FROM " + SharedNodeDAO.Table + " WHERE (clusterId=:clusterId OR JSON_CONTAINS(secondaryClusterIds, :clusterIdString)) AND state=1)")
 | 
						||
		query.Param("clusterId", clusterId).
 | 
						||
			Param("clusterIdString", types.String(clusterId))
 | 
						||
	default:
 | 
						||
		query.Attr("clusterId", clusterId)
 | 
						||
	}
 | 
						||
 | 
						||
	switch timeRange {
 | 
						||
	// TODO 支持更多的时间范围
 | 
						||
	case nodeconfigs.NodeValueRangeMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
		query.Result("minute")
 | 
						||
		query.Group("minute")
 | 
						||
	default:
 | 
						||
		err = errors.New("invalid 'range' value: '" + timeRange + "'")
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	_, err = query.Slice(&result).
 | 
						||
		FindAll()
 | 
						||
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	for _, nodeValue := range result {
 | 
						||
		nodeValue.Value, _ = json.Marshal(maps.Map{
 | 
						||
			key: types.Float32(string(nodeValue.Value)),
 | 
						||
		})
 | 
						||
	}
 | 
						||
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// ListValuesForUserNodes 列出用户节点相关的平均数据
 | 
						||
func (this *NodeValueDAO) ListValuesForUserNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
 | 
						||
	query := this.Query(tx).
 | 
						||
		Attr("role", "user").
 | 
						||
		Attr("item", item).
 | 
						||
		Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
 | 
						||
 | 
						||
	switch timeRange {
 | 
						||
	// TODO 支持更多的时间范围
 | 
						||
	case nodeconfigs.NodeValueRangeMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
		query.Result("minute")
 | 
						||
		query.Group("minute")
 | 
						||
	default:
 | 
						||
		err = errors.New("invalid 'range' value: '" + timeRange + "'")
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	_, err = query.Slice(&result).
 | 
						||
		FindAll()
 | 
						||
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	for _, nodeValue := range result {
 | 
						||
		nodeValue.Value, _ = json.Marshal(maps.Map{
 | 
						||
			key: types.Float32(string(nodeValue.Value)),
 | 
						||
		})
 | 
						||
	}
 | 
						||
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// ListValuesForNSNodes 列出用户节点相关的平均数据
 | 
						||
func (this *NodeValueDAO) ListValuesForNSNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
 | 
						||
	query := this.Query(tx).
 | 
						||
		Attr("role", "dns").
 | 
						||
		Attr("item", item).
 | 
						||
		Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
 | 
						||
 | 
						||
	switch timeRange {
 | 
						||
	// TODO 支持更多的时间范围
 | 
						||
	case nodeconfigs.NodeValueRangeMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
		query.Result("minute")
 | 
						||
		query.Group("minute")
 | 
						||
	default:
 | 
						||
		err = errors.New("invalid 'range' value: '" + timeRange + "'")
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	_, err = query.Slice(&result).
 | 
						||
		FindAll()
 | 
						||
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	for _, nodeValue := range result {
 | 
						||
		nodeValue.Value, _ = json.Marshal(maps.Map{
 | 
						||
			key: types.Float32(string(nodeValue.Value)),
 | 
						||
		})
 | 
						||
	}
 | 
						||
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// SumAllNodeValues 计算所有节点的某项参数值
 | 
						||
func (this *NodeValueDAO) SumAllNodeValues(tx *dbs.Tx, role string, item nodeconfigs.NodeValueItem, param string, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (total float64, avg float64, max float64, err error) {
 | 
						||
	if duration <= 0 {
 | 
						||
		return 0, 0, 0, nil
 | 
						||
	}
 | 
						||
 | 
						||
	var query = this.Query(tx).
 | 
						||
		Result("SUM(JSON_EXTRACT(value, '$."+param+"')) AS sumValue", "AVG(JSON_EXTRACT(value, '$."+param+"')) AS avgValue", "MAX(JSON_EXTRACT(value, '$."+param+"')) AS maxValueResult"). // maxValue 是个MySQL Keyword,这里使用maxValueResult代替
 | 
						||
		Attr("role", role).
 | 
						||
		Attr("item", item)
 | 
						||
 | 
						||
	switch durationUnit {
 | 
						||
	case nodeconfigs.NodeValueDurationUnitMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Attr("minute", fromMinute)
 | 
						||
	default:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Attr("minute", fromMinute)
 | 
						||
	}
 | 
						||
 | 
						||
	m, _, err := query.FindOne()
 | 
						||
	if err != nil {
 | 
						||
		return 0, 0, 0, err
 | 
						||
	}
 | 
						||
 | 
						||
	return m.GetFloat64("sumValue"), m.GetFloat64("avgValue"), m.GetFloat64("maxValueResult"), nil
 | 
						||
}
 | 
						||
 | 
						||
// SumNodeValues 计算节点的某项参数值
 | 
						||
func (this *NodeValueDAO) SumNodeValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
 | 
						||
	if duration <= 0 {
 | 
						||
		return 0, nil
 | 
						||
	}
 | 
						||
 | 
						||
	query := this.Query(tx).
 | 
						||
		Attr("role", role).
 | 
						||
		Attr("nodeId", nodeId).
 | 
						||
		Attr("item", item)
 | 
						||
	switch method {
 | 
						||
	case nodeconfigs.NodeValueSumMethodAvg:
 | 
						||
		query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	case nodeconfigs.NodeValueSumMethodSum:
 | 
						||
		query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	default:
 | 
						||
		query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	}
 | 
						||
	switch durationUnit {
 | 
						||
	case nodeconfigs.NodeValueDurationUnitMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
	default:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
	}
 | 
						||
	return query.FindFloat64Col(0)
 | 
						||
}
 | 
						||
 | 
						||
// SumNodeGroupValues 计算节点分组的某项参数值
 | 
						||
func (this *NodeValueDAO) SumNodeGroupValues(tx *dbs.Tx, role string, groupId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
 | 
						||
	if duration <= 0 {
 | 
						||
		return 0, nil
 | 
						||
	}
 | 
						||
 | 
						||
	query := this.Query(tx).
 | 
						||
		Attr("role", role).
 | 
						||
		Where("nodeId IN (SELECT id FROM "+SharedNodeDAO.Table+" WHERE groupId=:groupId AND state=1)").
 | 
						||
		Param("groupId", groupId).
 | 
						||
		Attr("item", item)
 | 
						||
	switch method {
 | 
						||
	case nodeconfigs.NodeValueSumMethodAvg:
 | 
						||
		query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	case nodeconfigs.NodeValueSumMethodSum:
 | 
						||
		query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	default:
 | 
						||
		query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	}
 | 
						||
	switch durationUnit {
 | 
						||
	case nodeconfigs.NodeValueDurationUnitMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
	default:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
	}
 | 
						||
	return query.FindFloat64Col(0)
 | 
						||
}
 | 
						||
 | 
						||
// SumNodeClusterValues 计算节点集群的某项参数值
 | 
						||
func (this *NodeValueDAO) SumNodeClusterValues(tx *dbs.Tx, role string, clusterId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
 | 
						||
	if duration <= 0 {
 | 
						||
		return 0, nil
 | 
						||
	}
 | 
						||
 | 
						||
	query := this.Query(tx).
 | 
						||
		Attr("role", role).
 | 
						||
		Attr("clusterId", clusterId).
 | 
						||
		Attr("item", item)
 | 
						||
	switch method {
 | 
						||
	case nodeconfigs.NodeValueSumMethodAvg:
 | 
						||
		query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	case nodeconfigs.NodeValueSumMethodSum:
 | 
						||
		query.Result("SUM(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	default:
 | 
						||
		query.Result("AVG(JSON_EXTRACT(value, '$." + param + "'))")
 | 
						||
	}
 | 
						||
	switch durationUnit {
 | 
						||
	case nodeconfigs.NodeValueDurationUnitMinute:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
	default:
 | 
						||
		fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(duration*60))
 | 
						||
		query.Gte("minute", fromMinute)
 | 
						||
	}
 | 
						||
	return query.FindFloat64Col(0)
 | 
						||
}
 | 
						||
 | 
						||
// FindLatestNodeValue 获取最近一条数据
 | 
						||
func (this *NodeValueDAO) FindLatestNodeValue(tx *dbs.Tx, role string, nodeId int64, item string) (*NodeValue, error) {
 | 
						||
	var fromMinute = timeutil.FormatTime("YmdHi", time.Now().Unix()-int64(60))
 | 
						||
 | 
						||
	one, err := this.Query(tx).
 | 
						||
		Attr("role", role).
 | 
						||
		Attr("nodeId", nodeId).
 | 
						||
		Attr("item", item).
 | 
						||
		Attr("minute", fromMinute).
 | 
						||
		Find()
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	if one == nil {
 | 
						||
		return nil, nil
 | 
						||
	}
 | 
						||
	return one.(*NodeValue), nil
 | 
						||
}
 | 
						||
 | 
						||
// ComposeNodeStatus 组合节点状态值
 | 
						||
func (this *NodeValueDAO) ComposeNodeStatus(tx *dbs.Tx, role string, nodeId int64, statusConfig *nodeconfigs.NodeStatus) error {
 | 
						||
	var items = []string{
 | 
						||
		nodeconfigs.NodeValueItemCPU,
 | 
						||
		nodeconfigs.NodeValueItemMemory,
 | 
						||
		nodeconfigs.NodeValueItemLoad,
 | 
						||
		nodeconfigs.NodeValueItemTrafficOut,
 | 
						||
		nodeconfigs.NodeValueItemTrafficIn,
 | 
						||
	}
 | 
						||
	ones, err := this.Query(tx).
 | 
						||
		Result("item", "value").
 | 
						||
		Attr("role", role).
 | 
						||
		Attr("nodeId", nodeId).
 | 
						||
		Attr("minute", timeutil.FormatTime("YmdHi", time.Now().Unix()-60)).
 | 
						||
		Where("item IN ('" + strings.Join(items, "', '") + "')").
 | 
						||
		FindAll()
 | 
						||
	if err != nil {
 | 
						||
		return err
 | 
						||
	}
 | 
						||
	for _, one := range ones {
 | 
						||
		var oneValue = one.(*NodeValue)
 | 
						||
		var valueMap = oneValue.DecodeMapValue()
 | 
						||
		switch oneValue.Item {
 | 
						||
		case nodeconfigs.NodeValueItemCPU:
 | 
						||
			statusConfig.CPUUsage = valueMap.GetFloat64("usage")
 | 
						||
		case nodeconfigs.NodeValueItemMemory:
 | 
						||
			statusConfig.MemoryUsage = valueMap.GetFloat64("usage")
 | 
						||
		case nodeconfigs.NodeValueItemLoad:
 | 
						||
			statusConfig.Load1m = valueMap.GetFloat64("load1m")
 | 
						||
			statusConfig.Load5m = valueMap.GetFloat64("load5m")
 | 
						||
			statusConfig.Load15m = valueMap.GetFloat64("load15m")
 | 
						||
		case nodeconfigs.NodeValueItemTrafficOut:
 | 
						||
			statusConfig.TrafficOutBytes = valueMap.GetUint64("total")
 | 
						||
		case nodeconfigs.NodeValueItemTrafficIn:
 | 
						||
			statusConfig.TrafficInBytes = valueMap.GetUint64("total")
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// ComposeNodeStatusJSON 组合节点状态值,并转换为JSON数据
 | 
						||
func (this *NodeValueDAO) ComposeNodeStatusJSON(tx *dbs.Tx, role string, nodeId int64, statusJSON []byte) ([]byte, error) {
 | 
						||
	var statusConfig = &nodeconfigs.NodeStatus{}
 | 
						||
	if len(statusJSON) > 0 {
 | 
						||
		err := json.Unmarshal(statusJSON, statusConfig)
 | 
						||
		if err != nil {
 | 
						||
			return nil, err
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	err := this.ComposeNodeStatus(tx, role, nodeId, statusConfig)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	return json.Marshal(statusConfig)
 | 
						||
}
 |