// Source: EdgeAPI/internal/db/models/metric_stat_dao.go

package models
import (
	"encoding/json"
	"strconv"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/iwind/TeaGo/Tea"
	"github.com/iwind/TeaGo/dbs"
	"github.com/iwind/TeaGo/lists"
	"github.com/iwind/TeaGo/logs"
	"github.com/iwind/TeaGo/maps"
	"github.com/iwind/TeaGo/rands"
	timeutil "github.com/iwind/TeaGo/utils/time"
)
// MetricStatDAO is the data access object type for metric statistics. It is
// declared on top of dbs.DAO and carries the query methods defined in this file.
type MetricStatDAO dbs.DAO
// init registers, through dbs.OnReadyDone, a background job that repeatedly
// purges expired metric statistics.
func init() {
	dbs.OnReadyDone(func() {
		// Data-cleaning task. The tick interval is chosen once, as a whole
		// number of hours obtained from rands.Int(24, 48).
		var interval = time.Duration(rands.Int(24, 48)) * time.Hour
		var ticker = time.NewTicker(interval)

		go func() {
			for range ticker.C {
				// Keep only the last 120 days of data.
				if err := SharedMetricStatDAO.Clean(nil, 120); err != nil {
					logs.Println("SharedMetricStatDAO: clean expired data failed: " + err.Error())
				}
			}
		}()
	})
}
// NewMetricStatDAO builds a MetricStatDAO bound to the "edgeMetricStats" table,
// with MetricStat as its model and "id" as its primary key name, and passes it
// through dbs.NewDAO before returning it.
func NewMetricStatDAO() *MetricStatDAO {
	var object = dbs.DAOObject{
		DB:     Tea.Env,
		Table:  "edgeMetricStats",
		Model:  new(MetricStat),
		PkName: "id",
	}
	var dao = dbs.NewDAO(&MetricStatDAO{DAOObject: object})
	return dao.(*MetricStatDAO)
}
// SharedMetricStatDAO is the package-level MetricStatDAO instance. It is nil
// until the callback registered with dbs.OnReady below has run.
var SharedMetricStatDAO *MetricStatDAO

// init registers the callback that assigns SharedMetricStatDAO.
func init() {
	var assignShared = func() {
		SharedMetricStatDAO = NewMetricStatDAO()
	}
	dbs.OnReady(assignShared)
}
// CreateStat 创建统计数据
2021-07-01 10:39:42 +08:00
func (this *MetricStatDAO) CreateStat(tx *dbs.Tx, hash string, clusterId int64, nodeId int64, serverId int64, itemId int64, keys []string, value float64, time string, version int32) error {
2021-06-30 19:59:49 +08:00
hash += "@" + strconv.FormatInt(nodeId, 10)
var keysString string
if len(keys) > 0 {
keysJSON, err := json.Marshal(keys)
if err != nil {
return err
}
keysString = string(keysJSON)
} else {
keysString = "[]"
}
return this.Query(tx).
Param("value", value).
InsertOrUpdateQuickly(maps.Map{
2021-07-19 21:01:26 +08:00
"hash": hash,
"clusterId": clusterId,
"nodeId": nodeId,
"serverId": serverId,
"itemId": itemId,
"value": value,
"time": time,
"version": version,
"keys": keysString,
"createdDay": timeutil.Format("Ymd"),
2021-06-30 19:59:49 +08:00
}, maps.Map{
"value": value,
})
}
// DeleteOldItemStats 删除以前版本的统计数据
func (this *MetricStatDAO) DeleteOldItemStats(tx *dbs.Tx, itemId int64, version int32) error {
_, err := this.Query(tx).
Attr("itemId", itemId).
Where("version<:version").
Param("version", version).
2021-06-30 19:59:49 +08:00
Delete()
return err
}
// DeleteItemStats deletes every statistic row that belongs to the given item,
// whatever its version. It returns the error of the delete query, if any.
func (this *MetricStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error {
	var query = this.Query(tx).Attr("itemId", itemId)
	if _, err := query.Delete(); err != nil {
		return err
	}
	return nil
}
// CountItemStats counts the statistic rows stored for the given item and
// version.
func (this *MetricStatDAO) CountItemStats(tx *dbs.Tx, itemId int64, version int32) (int64, error) {
	var query = this.Query(tx).
		Attr("itemId", itemId).
		Attr("version", version)
	return query.Count()
}
// ListItemStats 列出单页统计数据
func (this *MetricStatDAO) ListItemStats(tx *dbs.Tx, itemId int64, version int32, offset int64, size int64) (result []*MetricStat, err error) {
_, err = this.Query(tx).
Attr("itemId", itemId).
Attr("version", version).
Offset(offset).
Limit(size).
2021-07-01 10:39:42 +08:00
Desc("time").
Desc("serverId").
Desc("value").
2021-06-30 19:59:49 +08:00
Slice(&result).
FindAll()
return
}
// FindItemStatsAtLastTime 取得所有集群最近一次计时前 N 个数据
// 适合每条数据中包含不同的Key的场景
func (this *MetricStatDAO) FindItemStatsAtLastTime(tx *dbs.Tx, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
2021-07-19 17:58:16 +08:00
// 最近一次时间
statOne, err := this.Query(tx).
Attr("itemId", itemId).
Attr("version", version).
DescPk().
Find()
if err != nil {
return nil, err
}
if statOne == nil {
return nil, nil
}
var lastStat = statOne.(*MetricStat)
var lastTime = lastStat.Time
var query = this.Query(tx).
2021-07-19 17:58:16 +08:00
Attr("itemId", itemId).
Attr("version", version).
Attr("time", lastTime).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Desc("value").
Group("keys").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
2021-07-19 17:58:16 +08:00
FindAll()
return
}
// FindItemStatsWithClusterIdAndLastTime 取得集群最近一次计时前 N 个数据
2021-07-05 16:37:53 +08:00
// 适合每条数据中包含不同的Key的场景
func (this *MetricStatDAO) FindItemStatsWithClusterIdAndLastTime(tx *dbs.Tx, clusterId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
2021-07-05 16:37:53 +08:00
// 最近一次时间
statOne, err := this.Query(tx).
Attr("itemId", itemId).
Attr("version", version).
DescPk().
Find()
if err != nil {
return nil, err
}
if statOne == nil {
return nil, nil
}
var lastStat = statOne.(*MetricStat)
var lastTime = lastStat.Time
var query = this.Query(tx).
2021-07-05 16:37:53 +08:00
Attr("clusterId", clusterId).
Attr("itemId", itemId).
Attr("version", version).
Attr("time", lastTime).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Desc("value").
Group("keys").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
2021-07-05 16:37:53 +08:00
FindAll()
return
}
// FindItemStatsWithNodeIdAndLastTime returns the top N statistics of one node
// recorded at the most recent time of the given item.
// It suits the case where each row carries different keys.
//
// The most recent time comes from the row returned by DescPk().Find() for
// itemId and version; that lookup is not filtered by nodeId. Rows of nodeId at
// that time are grouped by keys, their values summed, and ordered by the
// summed value descending; size is passed to Limit.
//
// ignoreEmptyKeys excludes rows whose keys JSON contains an empty string, and
// a non-empty ignoreKeys excludes rows whose first key ($[0]) is in that list.
// It returns (nil, nil) when no row exists for the item and version.
func (this *MetricStatDAO) FindItemStatsWithNodeIdAndLastTime(tx *dbs.Tx, nodeId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
	// Look up the most recent time first.
	latest, err := this.Query(tx).
		Attr("itemId", itemId).
		Attr("version", version).
		DescPk().
		Find()
	switch {
	case err != nil:
		return nil, err
	case latest == nil:
		return nil, nil
	}
	var latestTime = latest.(*MetricStat).Time

	var finder = this.Query(tx).
		Attr("nodeId", nodeId).
		Attr("itemId", itemId).
		Attr("version", version).
		Attr("time", latestTime).
		// TODO: add more aggregations, such as AVG, MEDIAN, MIN and MAX.
		// TODO: the MIN(`keys`) mentioned here could be replaced by FIRST_VALUE on MySQL 8.
		Result("MIN(time) AS time", "SUM(value) AS value", "keys").
		Desc("value").
		Group("keys").
		Limit(size).
		Slice(&result)

	if ignoreEmptyKeys {
		finder.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
	}

	if len(ignoreKeys) != 0 {
		encoded, encodeErr := json.Marshal(ignoreKeys)
		if encodeErr != nil {
			return nil, encodeErr
		}
		// TODO: $[0] should become the position of the primary key inside keys.
		finder.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))")
		finder.Param("ignoredKeys", string(encoded))
	}

	_, err = finder.FindAll()
	return result, err
}
// FindItemStatsWithServerIdAndLastTime 取得节点最近一次计时前 N 个数据
// 适合每条数据中包含不同的Key的场景
func (this *MetricStatDAO) FindItemStatsWithServerIdAndLastTime(tx *dbs.Tx, serverId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
2021-07-07 19:55:37 +08:00
// 最近一次时间
statOne, err := this.Query(tx).
Attr("itemId", itemId).
Attr("version", version).
DescPk().
Find()
if err != nil {
return nil, err
}
if statOne == nil {
return nil, nil
}
var lastStat = statOne.(*MetricStat)
var lastTime = lastStat.Time
var query = this.Query(tx).
2021-07-07 19:55:37 +08:00
Attr("serverId", serverId).
Attr("itemId", itemId).
Attr("version", version).
Attr("time", lastTime).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Desc("value").
Group("keys").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
2021-07-07 19:55:37 +08:00
FindAll()
return
}
// FindLatestItemStats 取得所有集群上最近 N 个时间的数据
// 适合同个Key在不同时间段的变化场景
func (this *MetricStatDAO) FindLatestItemStats(tx *dbs.Tx, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
var query = this.Query(tx).
2021-07-19 17:58:16 +08:00
Attr("itemId", itemId).
Attr("version", version).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`").
Desc("time").
Group("time").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
2021-07-19 17:58:16 +08:00
FindAll()
if err != nil {
return nil, err
}
lists.Reverse(result)
return
}
// FindLatestItemStatsWithClusterId 取得集群最近 N 个时间的数据
// 适合同个Key在不同时间段的变化场景
func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
var query = this.Query(tx).
2021-07-05 16:37:53 +08:00
Attr("clusterId", clusterId).
Attr("itemId", itemId).
Attr("version", version).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`").
Desc("time").
Group("time").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
2021-07-05 16:37:53 +08:00
FindAll()
if err != nil {
return nil, err
}
lists.Reverse(result)
return
}
// FindLatestItemStatsWithNodeId returns the statistics of the latest N times
// of one node for the given item.
// It suits the case where the same key is followed over successive periods.
//
// Rows of nodeId, itemId and version are grouped by time with their values
// summed (MIN(`keys`) is selected as keys). The query orders by time
// descending with size passed to Limit, and the slice is then handed to
// lists.Reverse before being returned.
//
// ignoreEmptyKeys excludes rows whose keys JSON contains an empty string, and
// a non-empty ignoreKeys excludes rows whose first key ($[0]) is in that list.
// On error it returns a nil slice together with the error.
func (this *MetricStatDAO) FindLatestItemStatsWithNodeId(tx *dbs.Tx, nodeId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
	var finder = this.Query(tx).
		Attr("nodeId", nodeId).
		Attr("itemId", itemId).
		Attr("version", version).
		// TODO: add more aggregations, such as AVG, MEDIAN, MIN and MAX.
		// TODO: on MySQL 8 the MIN(`keys`) below could be replaced by FIRST_VALUE.
		Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`").
		Desc("time").
		Group("time").
		Limit(size).
		Slice(&result)

	if ignoreEmptyKeys {
		finder.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
	}

	if len(ignoreKeys) != 0 {
		encoded, encodeErr := json.Marshal(ignoreKeys)
		if encodeErr != nil {
			return nil, encodeErr
		}
		// TODO: $[0] should become the position of the primary key inside keys.
		finder.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))")
		finder.Param("ignoredKeys", string(encoded))
	}

	if _, err = finder.FindAll(); err != nil {
		return nil, err
	}

	// The query sorted by time descending; reverse before returning.
	lists.Reverse(result)
	return result, nil
}
// FindLatestItemStatsWithServerId 取得服务最近 N 个时间的数据
// 适合同个Key在不同时间段的变化场景
func (this *MetricStatDAO) FindLatestItemStatsWithServerId(tx *dbs.Tx, serverId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
var query = this.Query(tx).
2021-07-07 19:55:37 +08:00
Attr("serverId", serverId).
Attr("itemId", itemId).
Attr("version", version).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`").
Desc("time").
Group("time").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
2021-07-07 19:55:37 +08:00
FindAll()
if err != nil {
return nil, err
}
lists.Reverse(result)
return
}
// Clean deletes the statistics whose createdDay is lower than the day that
// lies `days` days (days*86400 seconds) before now, formatted with the "Ymd"
// layout of timeutil.FormatTime. It returns the error of the delete query, if
// any.
func (this *MetricStatDAO) Clean(tx *dbs.Tx, days int64) error {
	var query = this.Query(tx)
	var expireDay = timeutil.FormatTime("Ymd", time.Now().Unix()-days*86400)
	_, err := query.
		Lt("createdDay", expireDay).
		Delete()
	return err
}