2021-04-29 16:48:09 +08:00
package models
import (
2022-04-07 18:31:38 +08:00
"encoding/json"
2024-07-27 14:15:25 +08:00
"strings"
"time"
2021-04-29 16:48:09 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
2021-08-01 11:13:46 +08:00
"github.com/iwind/TeaGo/types"
2021-04-29 16:48:09 +08:00
timeutil "github.com/iwind/TeaGo/utils/time"
)
// NodeValueDAO is the DAO type used by this file to read and write rows of the
// "edgeNodeValues" table; it is declared on top of dbs.DAO.
type NodeValueDAO dbs . DAO
// NewNodeValueDAO builds a NodeValueDAO bound to the "edgeNodeValues" table,
// using NodeValue as the row model and "id" as the primary key, and registers
// it through dbs.NewDAO.
func NewNodeValueDAO() *NodeValueDAO {
	var dao = &NodeValueDAO{
		DAOObject: dbs.DAOObject{
			DB:     Tea.Env,
			Table:  "edgeNodeValues",
			Model:  new(NodeValue),
			PkName: "id",
		},
	}
	return dbs.NewDAO(dao).(*NodeValueDAO)
}
// SharedNodeValueDAO is the package-level NodeValueDAO instance. It is nil
// until the callback registered with dbs.OnReady in init has been invoked.
var SharedNodeValueDAO *NodeValueDAO

// init registers the callback that creates SharedNodeValueDAO.
func init() {
	var onReady = func() {
		SharedNodeValueDAO = NewNodeValueDAO()
	}
	dbs.OnReady(onReady)
}
// CreateValue 创建值
2021-07-05 11:37:22 +08:00
func ( this * NodeValueDAO ) CreateValue ( tx * dbs . Tx , clusterId int64 , role nodeconfigs . NodeRole , nodeId int64 , item string , valueJSON [ ] byte , createdAt int64 ) error {
2023-05-17 18:42:21 +08:00
if len ( valueJSON ) == 0 {
return errors . New ( "'valueJSON' should not be nil" )
}
2022-04-07 18:31:38 +08:00
var day = timeutil . FormatTime ( "Ymd" , createdAt )
var hour = timeutil . FormatTime ( "YmdH" , createdAt )
var minute = timeutil . FormatTime ( "YmdHi" , createdAt )
2021-04-29 16:48:09 +08:00
2023-05-17 18:42:21 +08:00
err := this . Query ( tx ) .
2021-04-29 16:48:09 +08:00
InsertOrUpdateQuickly ( maps . Map {
2021-07-05 11:37:22 +08:00
"clusterId" : clusterId ,
2021-04-29 16:48:09 +08:00
"role" : role ,
"nodeId" : nodeId ,
"item" : item ,
"value" : valueJSON ,
"createdAt" : createdAt ,
"day" : day ,
"hour" : hour ,
"minute" : minute ,
} , maps . Map {
"value" : valueJSON ,
} )
2023-05-17 18:42:21 +08:00
if err != nil {
return err
}
// 触发钩子
err = this . nodeValueHook ( tx , role , nodeId , item , valueJSON )
if err != nil {
return err
}
return nil
2021-04-29 16:48:09 +08:00
}
2021-12-06 10:15:32 +08:00
// Clean 清除数据
func ( this * NodeValueDAO ) Clean ( tx * dbs . Tx ) error {
2022-01-09 11:14:05 +08:00
var hour = timeutil . Format ( "YmdH" , time . Now ( ) . Add ( - 2 * time . Hour ) )
2021-04-29 16:48:09 +08:00
_ , err := this . Query ( tx ) .
2022-01-09 11:14:05 +08:00
Where ( "hour<=:hour" ) .
Param ( "hour" , hour ) .
2021-04-29 16:48:09 +08:00
Delete ( )
if err != nil {
return err
}
return nil
}
// ListValues lists the recent values of one item for a single node.
//
// Only nodeconfigs.NodeValueRangeMinute is supported: it returns the rows whose
// "minute" column is at or after the minute one hour before now. Any other
// timeRange yields an "invalid 'range' value" error.
func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
	var query = this.Query(tx).
		Attr("role", role).
		Attr("nodeId", nodeId).
		Attr("item", item)

	// TODO support more time ranges
	if timeRange != nodeconfigs.NodeValueRangeMinute {
		return nil, errors.New("invalid 'range' value: '" + timeRange + "'")
	}

	// start from one hour ago
	var fromTimestamp = time.Now().Unix() - 3600
	query.Gte("minute", timeutil.FormatTime("YmdHi", fromTimestamp))

	_, err = query.Slice(&result).FindAll()
	return result, err
}
2021-05-05 19:50:55 +08:00
2021-07-05 11:37:22 +08:00
// ListValuesWithClusterId 列出集群最近的的平均数据
2021-07-06 20:06:34 +08:00
func ( this * NodeValueDAO ) ListValuesWithClusterId ( tx * dbs . Tx , role string , clusterId int64 , item string , key string , timeRange nodeconfigs . NodeValueRange ) ( result [ ] * NodeValue , err error ) {
2021-07-05 11:37:22 +08:00
query := this . Query ( tx ) .
Attr ( "role" , role ) .
Attr ( "item" , item ) .
Result ( "AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt" )
2021-08-01 11:13:46 +08:00
switch role {
case nodeconfigs . NodeRoleNode :
query . Where ( "nodeId IN (SELECT id FROM " + SharedNodeDAO . Table + " WHERE (clusterId=:clusterId OR JSON_CONTAINS(secondaryClusterIds, :clusterIdString)) AND state=1)" )
query . Param ( "clusterId" , clusterId ) .
Param ( "clusterIdString" , types . String ( clusterId ) )
default :
query . Attr ( "clusterId" , clusterId )
}
2021-07-05 11:37:22 +08:00
switch timeRange {
// TODO 支持更多的时间范围
case nodeconfigs . NodeValueRangeMinute :
fromMinute := timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - 3600 ) // 一个小时之前的
query . Gte ( "minute" , fromMinute )
query . Result ( "minute" )
query . Group ( "minute" )
default :
err = errors . New ( "invalid 'range' value: '" + timeRange + "'" )
return
}
_ , err = query . Slice ( & result ) .
FindAll ( )
2022-04-16 22:22:05 +08:00
if err != nil {
return nil , err
}
for _ , nodeValue := range result {
nodeValue . Value , _ = json . Marshal ( maps . Map {
key : types . Float32 ( string ( nodeValue . Value ) ) ,
} )
}
2021-07-05 11:37:22 +08:00
return
}
2021-07-11 18:05:57 +08:00
// ListValuesForUserNodes 列出用户节点相关的平均数据
func ( this * NodeValueDAO ) ListValuesForUserNodes ( tx * dbs . Tx , item string , key string , timeRange nodeconfigs . NodeValueRange ) ( result [ ] * NodeValue , err error ) {
query := this . Query ( tx ) .
Attr ( "role" , "user" ) .
Attr ( "item" , item ) .
Result ( "AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt" )
switch timeRange {
// TODO 支持更多的时间范围
case nodeconfigs . NodeValueRangeMinute :
fromMinute := timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - 3600 ) // 一个小时之前的
query . Gte ( "minute" , fromMinute )
query . Result ( "minute" )
query . Group ( "minute" )
default :
err = errors . New ( "invalid 'range' value: '" + timeRange + "'" )
return
}
_ , err = query . Slice ( & result ) .
FindAll ( )
2022-04-16 22:22:05 +08:00
if err != nil {
return nil , err
}
for _ , nodeValue := range result {
nodeValue . Value , _ = json . Marshal ( maps . Map {
key : types . Float32 ( string ( nodeValue . Value ) ) ,
} )
}
2021-07-11 18:05:57 +08:00
return
}
2021-07-11 21:44:08 +08:00
// ListValuesForNSNodes 列出用户节点相关的平均数据
func ( this * NodeValueDAO ) ListValuesForNSNodes ( tx * dbs . Tx , item string , key string , timeRange nodeconfigs . NodeValueRange ) ( result [ ] * NodeValue , err error ) {
query := this . Query ( tx ) .
Attr ( "role" , "dns" ) .
Attr ( "item" , item ) .
Result ( "AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt" )
switch timeRange {
// TODO 支持更多的时间范围
case nodeconfigs . NodeValueRangeMinute :
fromMinute := timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - 3600 ) // 一个小时之前的
query . Gte ( "minute" , fromMinute )
query . Result ( "minute" )
query . Group ( "minute" )
default :
err = errors . New ( "invalid 'range' value: '" + timeRange + "'" )
return
}
_ , err = query . Slice ( & result ) .
FindAll ( )
2022-04-16 22:22:05 +08:00
if err != nil {
return nil , err
}
for _ , nodeValue := range result {
nodeValue . Value , _ = json . Marshal ( maps . Map {
key : types . Float32 ( string ( nodeValue . Value ) ) ,
} )
}
2021-07-11 21:44:08 +08:00
return
}
2022-04-07 18:31:38 +08:00
// SumAllNodeValues aggregates one JSON field of an item over all nodes of a role.
//
// It selects SUM, AVG and MAX of JSON_EXTRACT(value, '$.<param>') from the rows
// matching role, item and the single "minute" value that lies `duration`
// minutes before now, and returns them as total, avg and max. A non-positive
// duration returns zeros without running a query.
//
// durationUnit is not distinguished yet: every unit is handled as minutes.
//
// NOTE(review): param is concatenated into the SQL text as-is, so it has to
// come from a trusted source.
func (this *NodeValueDAO) SumAllNodeValues(tx *dbs.Tx, role string, item nodeconfigs.NodeValueItem, param string, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (total float64, avg float64, max float64, err error) {
	if duration <= 0 {
		return 0, 0, 0, nil
	}

	// "maxValue" is a MySQL keyword, so the MAX column is aliased as "maxValueResult"
	var query = this.Query(tx).
		Result("SUM(JSON_EXTRACT(value, '$."+param+"')) AS sumValue", "AVG(JSON_EXTRACT(value, '$."+param+"')) AS avgValue", "MAX(JSON_EXTRACT(value, '$."+param+"')) AS maxValueResult").
		Attr("role", role).
		Attr("item", item)

	// widen to int64 BEFORE multiplying: `duration * 60` in int32 wraps around
	// for large durations and would move the time window into the future
	var fromTimestamp = time.Now().Unix() - int64(duration)*60
	query.Attr("minute", timeutil.FormatTime("YmdHi", fromTimestamp))

	m, _, err := query.FindOne()
	if err != nil {
		return 0, 0, 0, err
	}

	return m.GetFloat64("sumValue"), m.GetFloat64("avgValue"), m.GetFloat64("maxValueResult"), nil
}
2021-09-14 11:36:22 +08:00
// SumNodeValues 计算节点的某项参数值
func ( this * NodeValueDAO ) SumNodeValues ( tx * dbs . Tx , role string , nodeId int64 , item string , param string , method nodeconfigs . NodeValueSumMethod , duration int32 , durationUnit nodeconfigs . NodeValueDurationUnit ) ( float64 , error ) {
2021-05-05 19:50:55 +08:00
if duration <= 0 {
return 0 , nil
}
query := this . Query ( tx ) .
Attr ( "role" , role ) .
Attr ( "nodeId" , nodeId ) .
Attr ( "item" , item )
switch method {
case nodeconfigs . NodeValueSumMethodAvg :
2021-09-14 11:36:22 +08:00
query . Result ( "AVG(JSON_EXTRACT(value, '$." + param + "'))" )
case nodeconfigs . NodeValueSumMethodSum :
query . Result ( "SUM(JSON_EXTRACT(value, '$." + param + "'))" )
default :
query . Result ( "AVG(JSON_EXTRACT(value, '$." + param + "'))" )
}
switch durationUnit {
case nodeconfigs . NodeValueDurationUnitMinute :
fromMinute := timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - int64 ( duration * 60 ) )
query . Gte ( "minute" , fromMinute )
default :
fromMinute := timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - int64 ( duration * 60 ) )
query . Gte ( "minute" , fromMinute )
}
return query . FindFloat64Col ( 0 )
}
// SumNodeGroupValues aggregates one JSON field of an item over the nodes of a
// node group.
//
// Nodes are selected through a sub-query on the node table (groupId matches
// and state=1). method selects the aggregate: nodeconfigs.NodeValueSumMethodSum
// uses SUM, everything else (including NodeValueSumMethodAvg) uses AVG. The
// rows taken into account are those whose "minute" column is at or after the
// minute that lies `duration` minutes before now. A non-positive duration
// returns 0 without running a query.
//
// durationUnit is not distinguished yet: every unit is handled as minutes.
//
// NOTE(review): param is concatenated into the SQL text as-is, so it has to
// come from a trusted source.
func (this *NodeValueDAO) SumNodeGroupValues(tx *dbs.Tx, role string, groupId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
	if duration <= 0 {
		return 0, nil
	}

	var query = this.Query(tx).
		Attr("role", role).
		Where("nodeId IN (SELECT id FROM " + SharedNodeDAO.Table + " WHERE groupId=:groupId AND state=1)").
		Param("groupId", groupId).
		Attr("item", item)

	// AVG is both the explicit "avg" method and the fallback for unknown methods
	var aggregate = "AVG"
	if method == nodeconfigs.NodeValueSumMethodSum {
		aggregate = "SUM"
	}
	query.Result(aggregate + "(JSON_EXTRACT(value, '$." + param + "'))")

	// widen to int64 BEFORE multiplying: `duration * 60` in int32 wraps around
	// for large durations and would move the time window into the future
	var fromTimestamp = time.Now().Unix() - int64(duration)*60
	query.Gte("minute", timeutil.FormatTime("YmdHi", fromTimestamp))

	return query.FindFloat64Col(0)
}
// SumNodeClusterValues 计算节点集群的某项参数值
func ( this * NodeValueDAO ) SumNodeClusterValues ( tx * dbs . Tx , role string , clusterId int64 , item string , param string , method nodeconfigs . NodeValueSumMethod , duration int32 , durationUnit nodeconfigs . NodeValueDurationUnit ) ( float64 , error ) {
if duration <= 0 {
return 0 , nil
}
query := this . Query ( tx ) .
Attr ( "role" , role ) .
Attr ( "clusterId" , clusterId ) .
Attr ( "item" , item )
switch method {
case nodeconfigs . NodeValueSumMethodAvg :
2021-05-05 19:50:55 +08:00
query . Result ( "AVG(JSON_EXTRACT(value, '$." + param + "'))" )
case nodeconfigs . NodeValueSumMethodSum :
query . Result ( "SUM(JSON_EXTRACT(value, '$." + param + "'))" )
default :
query . Result ( "AVG(JSON_EXTRACT(value, '$." + param + "'))" )
}
switch durationUnit {
case nodeconfigs . NodeValueDurationUnitMinute :
2021-07-05 11:37:22 +08:00
fromMinute := timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - int64 ( duration * 60 ) )
2021-05-05 19:50:55 +08:00
query . Gte ( "minute" , fromMinute )
default :
2021-07-05 11:37:22 +08:00
fromMinute := timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - int64 ( duration * 60 ) )
2021-05-05 19:50:55 +08:00
query . Gte ( "minute" , fromMinute )
}
return query . FindFloat64Col ( 0 )
}
2021-07-06 20:06:34 +08:00
// FindLatestNodeValue 获取最近一条数据
func ( this * NodeValueDAO ) FindLatestNodeValue ( tx * dbs . Tx , role string , nodeId int64 , item string ) ( * NodeValue , error ) {
2023-05-17 18:42:21 +08:00
var fromMinute = timeutil . FormatTime ( "YmdHi" , time . Now ( ) . Unix ( ) - int64 ( 60 ) )
2022-04-07 18:31:38 +08:00
2021-07-06 20:06:34 +08:00
one , err := this . Query ( tx ) .
Attr ( "role" , role ) .
Attr ( "nodeId" , nodeId ) .
Attr ( "item" , item ) .
2022-04-07 18:31:38 +08:00
Attr ( "minute" , fromMinute ) .
2021-07-06 20:06:34 +08:00
Find ( )
if err != nil {
return nil , err
}
if one == nil {
return nil , nil
}
return one . ( * NodeValue ) , nil
}
2022-04-07 18:31:38 +08:00
// ComposeNodeStatus copies the stored CPU, memory, load and traffic values of a
// node into statusConfig.
//
// It reads the rows of the node for the "minute" value that lies 60 seconds
// before now, restricted to the CPU, memory, load, traffic-out and traffic-in
// items. Fields for which no row is found are left unchanged.
func (this *NodeValueDAO) ComposeNodeStatus(tx *dbs.Tx, role string, nodeId int64, statusConfig *nodeconfigs.NodeStatus) error {
	// the items are package constants, so joining them into the IN (...) list is safe
	var itemList = strings.Join([]string{
		nodeconfigs.NodeValueItemCPU,
		nodeconfigs.NodeValueItemMemory,
		nodeconfigs.NodeValueItemLoad,
		nodeconfigs.NodeValueItemTrafficOut,
		nodeconfigs.NodeValueItemTrafficIn,
	}, "', '")

	rows, err := this.Query(tx).
		Result("item", "value").
		Attr("role", role).
		Attr("nodeId", nodeId).
		Attr("minute", timeutil.FormatTime("YmdHi", time.Now().Unix()-60)).
		Where("item IN ('" + itemList + "')").
		FindAll()
	if err != nil {
		return err
	}

	for _, row := range rows {
		var nodeValue = row.(*NodeValue)
		var fields = nodeValue.DecodeMapValue()

		switch nodeValue.Item {
		case nodeconfigs.NodeValueItemCPU:
			statusConfig.CPUUsage = fields.GetFloat64("usage")
		case nodeconfigs.NodeValueItemMemory:
			statusConfig.MemoryUsage = fields.GetFloat64("usage")
		case nodeconfigs.NodeValueItemLoad:
			statusConfig.Load1m = fields.GetFloat64("load1m")
			statusConfig.Load5m = fields.GetFloat64("load5m")
			statusConfig.Load15m = fields.GetFloat64("load15m")
		case nodeconfigs.NodeValueItemTrafficOut:
			statusConfig.TrafficOutBytes = fields.GetUint64("total")
		case nodeconfigs.NodeValueItemTrafficIn:
			statusConfig.TrafficInBytes = fields.GetUint64("total")
		}
	}

	return nil
}
// ComposeNodeStatusJSON decodes statusJSON (when it is not empty) into a
// nodeconfigs.NodeStatus, fills it through ComposeNodeStatus and returns the
// result encoded as JSON.
//
// It returns an error when decoding, composing or encoding fails.
func (this *NodeValueDAO) ComposeNodeStatusJSON(tx *dbs.Tx, role string, nodeId int64, statusJSON []byte) ([]byte, error) {
	var status = new(nodeconfigs.NodeStatus)

	if len(statusJSON) > 0 {
		if err := json.Unmarshal(statusJSON, status); err != nil {
			return nil, err
		}
	}

	if err := this.ComposeNodeStatus(tx, role, nodeId, status); err != nil {
		return nil, err
	}

	return json.Marshal(status)
}