对统计指标进行分表

This commit is contained in:
刘祥超
2022-03-28 16:25:16 +08:00
parent 7c4a01137b
commit 6d2ecb9af3
6 changed files with 503 additions and 228 deletions

View File

@@ -26,5 +26,5 @@ const (
ReportNodeVersion = "0.1.0" ReportNodeVersion = "0.1.0"
// SQLVersion SQL版本号 // SQLVersion SQL版本号
SQLVersion = "5" SQLVersion = "6"
) )

View File

@@ -12,6 +12,7 @@ import (
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"sort" "sort"
"strings" "strings"
"sync"
) )
const ( const (
@@ -34,6 +35,9 @@ func NewMetricItemDAO() *MetricItemDAO {
var SharedMetricItemDAO *MetricItemDAO var SharedMetricItemDAO *MetricItemDAO
var metricItemLastTimeCacheMap = map[int64]string{} // itemId => time
var metricItemLastTimeCacheLocker = &sync.Mutex{}
func init() { func init() {
dbs.OnReady(func() { dbs.OnReady(func() {
SharedMetricItemDAO = NewMetricItemDAO() SharedMetricItemDAO = NewMetricItemDAO()
@@ -330,6 +334,32 @@ func (this *MetricItemDAO) FindItemVersion(tx *dbs.Tx, itemId int64) (int32, err
return types.Int32(version), nil return types.Int32(version), nil
} }
// UpdateMetricLastTime 更新指标最新数据的时间
func (this *MetricItemDAO) UpdateMetricLastTime(tx *dbs.Tx, itemId int64, lastTime string) error {
metricItemLastTimeCacheLocker.Lock()
cachedTime, ok := metricItemLastTimeCacheMap[itemId]
if ok && cachedTime == lastTime {
metricItemLastTimeCacheLocker.Unlock()
return nil
}
metricItemLastTimeCacheMap[itemId] = lastTime
metricItemLastTimeCacheLocker.Unlock()
return this.Query(tx).
Pk(itemId).
Set("lastTime", lastTime).
UpdateQuickly()
}
// FindMetricLastTime 读取指标最新数据的时间
func (this *MetricItemDAO) FindMetricLastTime(tx *dbs.Tx, itemId int64) (string, error) {
return this.Query(tx).
Result("lastTime").
Pk(itemId).
FindStringCol("")
}
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool) error { func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool) error {
if isPublic { if isPublic {

View File

@@ -19,6 +19,7 @@ type MetricItem struct {
State uint8 `field:"state"` // 状态 State uint8 `field:"state"` // 状态
Version uint32 `field:"version"` // 版本号 Version uint32 `field:"version"` // 版本号
IsPublic bool `field:"isPublic"` // 是否为公用 IsPublic bool `field:"isPublic"` // 是否为公用
LastTime string `field:"lastTime"` // 最新时间
} }
type MetricItemOperator struct { type MetricItemOperator struct {
@@ -37,6 +38,7 @@ type MetricItemOperator struct {
State interface{} // 状态 State interface{} // 状态
Version interface{} // 版本号 Version interface{} // 版本号
IsPublic interface{} // 是否为公用 IsPublic interface{} // 是否为公用
LastTime interface{} // 最新时间
} }
func NewMetricItemOperator() *MetricItemOperator { func NewMetricItemOperator() *MetricItemOperator {

View File

@@ -11,8 +11,12 @@ import (
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"sort"
"strconv" "strconv"
"sync"
"sync/atomic"
"time" "time"
) )
@@ -33,6 +37,8 @@ func init() {
}) })
} }
const MetricStatTablePartials = 20 // 表格Partial数量
func NewMetricStatDAO() *MetricStatDAO { func NewMetricStatDAO() *MetricStatDAO {
return dbs.NewDAO(&MetricStatDAO{ return dbs.NewDAO(&MetricStatDAO{
DAOObject: dbs.DAOObject{ DAOObject: dbs.DAOObject{
@@ -65,7 +71,8 @@ func (this *MetricStatDAO) CreateStat(tx *dbs.Tx, hash string, clusterId int64,
} else { } else {
keysString = "[]" keysString = "[]"
} }
return this.Query(tx). err := this.Query(tx).
Table(this.partialTable(serverId)).
Param("value", value). Param("value", value).
InsertOrUpdateQuickly(maps.Map{ InsertOrUpdateQuickly(maps.Map{
"hash": hash, "hash": hash,
@@ -81,57 +88,127 @@ func (this *MetricStatDAO) CreateStat(tx *dbs.Tx, hash string, clusterId int64,
}, maps.Map{ }, maps.Map{
"value": value, "value": value,
}) })
if err != nil {
return err
}
return SharedMetricItemDAO.UpdateMetricLastTime(tx, itemId, time)
} }
// DeleteOldVersionItemStats 删除以前版本的统计数据 // DeleteOldVersionItemStats 删除以前版本的统计数据
func (this *MetricStatDAO) DeleteOldVersionItemStats(tx *dbs.Tx, itemId int64, version int32) error { func (this *MetricStatDAO) DeleteOldVersionItemStats(tx *dbs.Tx, itemId int64, version int32) error {
_, err := this.Query(tx). return this.runBatch(func(table string, locker *sync.Mutex) error {
Attr("itemId", itemId). _, err := this.Query(tx).
Where("version<:version"). Table(table).
Param("version", version). Attr("itemId", itemId).
Delete() Where("version<:version").
return err Param("version", version).
Delete()
return err
})
} }
// DeleteItemStats 删除某个指标相关的统计数据 // DeleteItemStats 删除某个指标相关的统计数据
func (this *MetricStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error { func (this *MetricStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error {
_, err := this.Query(tx). return this.runBatch(func(table string, locker *sync.Mutex) error {
Attr("itemId", itemId). _, err := this.Query(tx).
Delete() Table(table).
return err Attr("itemId", itemId).
Delete()
return err
})
} }
// DeleteNodeItemStats 删除某个节点的统计数据 // DeleteNodeItemStats 删除某个节点的统计数据
func (this *MetricStatDAO) DeleteNodeItemStats(tx *dbs.Tx, nodeId int64, serverId int64, itemId int64, time string) error { func (this *MetricStatDAO) DeleteNodeItemStats(tx *dbs.Tx, nodeId int64, serverId int64, itemId int64, time string) error {
_, err := this.Query(tx). if serverId > 0 {
Attr("nodeId", nodeId). _, err := this.Query(tx).
Attr("serverId", serverId). Table(this.partialTable(serverId)).
Attr("itemId", itemId). Attr("nodeId", nodeId).
Attr("time", time). Attr("serverId", serverId).
Delete() Attr("itemId", itemId).
Attr("time", time).
Delete()
return err
}
err := this.runBatch(func(table string, locker *sync.Mutex) error {
_, err := this.Query(tx).
Table(table).
Attr("nodeId", nodeId).
Attr("serverId", serverId).
Attr("itemId", itemId).
Attr("time", time).
Delete()
return err
})
return err return err
} }
// CountItemStats 计算统计数据数量 // CountItemStats 计算统计数据数量
func (this *MetricStatDAO) CountItemStats(tx *dbs.Tx, itemId int64, version int32) (int64, error) { func (this *MetricStatDAO) CountItemStats(tx *dbs.Tx, itemId int64, version int32) (int64, error) {
return this.Query(tx). var total int64 = 0
Attr("itemId", itemId).
Attr("version", version). err := this.runBatch(func(table string, locker *sync.Mutex) error {
Count() count, err := this.Query(tx).
Table(table).
Attr("itemId", itemId).
Attr("version", version).
Count()
if err != nil {
return err
}
atomic.AddInt64(&total, count)
return nil
})
if err != nil {
return 0, err
}
return total, nil
} }
// ListItemStats 列出单页统计数据 // ListItemStats 列出单页统计数据
func (this *MetricStatDAO) ListItemStats(tx *dbs.Tx, itemId int64, version int32, offset int64, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) ListItemStats(tx *dbs.Tx, itemId int64, version int32, offset int64, size int64) (result []*MetricStat, err error) {
_, err = this.Query(tx). err = this.runBatch(func(table string, locker *sync.Mutex) error {
Attr("itemId", itemId). var partialResult = []*MetricStat{}
Attr("version", version). _, err = this.Query(tx).
Offset(offset). Table(table).
Limit(size). Attr("itemId", itemId).
Desc("time"). Attr("version", version).
Desc("serverId"). Offset(offset).
Desc("value"). Limit(size).
Slice(&result). Desc("time").
FindAll() Desc("serverId").
Desc("value").
Slice(&partialResult).
FindAll()
if err != nil {
return err
}
locker.Lock()
result = append(result, partialResult...)
locker.Unlock()
return nil
})
if err != nil {
return nil, err
}
sort.Slice(result, func(i, j int) bool {
if result[i].Time > result[j].Time {
return true
}
if result[i].ServerId > result[j].ServerId {
return true
}
if result[i].Value > result[j].Value {
return true
}
return false
})
return return
} }
@@ -139,92 +216,123 @@ func (this *MetricStatDAO) ListItemStats(tx *dbs.Tx, itemId int64, version int32
// 适合每条数据中包含不同的Key的场景 // 适合每条数据中包含不同的Key的场景
func (this *MetricStatDAO) FindItemStatsAtLastTime(tx *dbs.Tx, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindItemStatsAtLastTime(tx *dbs.Tx, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
// 最近一次时间 // 最近一次时间
statOne, err := this.Query(tx). lastTime, err := SharedMetricItemDAO.FindMetricLastTime(tx, itemId)
Attr("itemId", itemId). if err != nil || len(lastTime) == 0 {
Attr("version", version). return nil, err
DescPk(). }
Find()
err = this.runBatch(func(table string, locker *sync.Mutex) error {
var partialResult = []*MetricStat{}
var query = this.Query(tx).
Table(table).
Attr("itemId", itemId).
Attr("version", version).
Attr("time", lastTime).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Desc("value").
Group("keys").
Limit(size).
Slice(&partialResult)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
FindAll()
if err != nil {
return err
}
locker.Lock()
result = append(result, partialResult...)
locker.Unlock()
return nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if statOne == nil { sort.Slice(result, func(i, j int) bool {
return nil, nil return result[i].Value > result[j].Value
})
if len(result) > types.Int(size) {
result = result[:types.Int(size)]
} }
var lastStat = statOne.(*MetricStat)
var lastTime = lastStat.Time
var query = this.Query(tx).
Attr("itemId", itemId).
Attr("version", version).
Attr("time", lastTime).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Desc("value").
Group("keys").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
FindAll()
return return
} }
// FindItemStatsWithClusterIdAndLastTime 取得集群最近一次计时前 N 个数据 // FindItemStatsWithClusterIdAndLastTime 取得集群最近一次计时前 N 个数据
// 适合每条数据中包含不同的Key的场景 // 适合每条数据中包含不同的Key的场景
func (this *MetricStatDAO) FindItemStatsWithClusterIdAndLastTime(tx *dbs.Tx, clusterId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindItemStatsWithClusterIdAndLastTime(tx *dbs.Tx, clusterId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
// 最近一次时间 lastTime, err := SharedMetricItemDAO.FindMetricLastTime(tx, itemId)
statOne, err := this.Query(tx). if err != nil || len(lastTime) == 0 {
Attr("itemId", itemId). return nil, err
Attr("version", version). }
DescPk().
Find() err = this.runBatch(func(table string, locker *sync.Mutex) error {
var partialResult = []*MetricStat{}
var query = this.Query(tx).
Table(table).
UseIndex("cluster_item_time").
Attr("clusterId", clusterId).
Attr("itemId", itemId).
Attr("version", version).
Attr("time", lastTime).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Desc("value").
Group("keys").
Limit(size).
Slice(&partialResult)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
FindAll()
if err != nil {
return err
}
locker.Lock()
result = append(result, partialResult...)
locker.Unlock()
return nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if statOne == nil {
return nil, nil
}
var lastStat = statOne.(*MetricStat)
var lastTime = lastStat.Time
var query = this.Query(tx). sort.Slice(result, func(i, j int) bool {
UseIndex("cluster_item_time"). return result[i].Value > result[j].Value
Attr("clusterId", clusterId). })
Attr("itemId", itemId).
Attr("version", version). if len(result) > types.Int(size) {
Attr("time", lastTime). result = result[:types.Int(size)]
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Desc("value").
Group("keys").
Limit(size).
Slice(&result)
if ignoreEmptyKeys {
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
} }
_, err = query.
FindAll()
return return
} }
@@ -232,68 +340,78 @@ func (this *MetricStatDAO) FindItemStatsWithClusterIdAndLastTime(tx *dbs.Tx, clu
// 适合每条数据中包含不同的Key的场景 // 适合每条数据中包含不同的Key的场景
func (this *MetricStatDAO) FindItemStatsWithNodeIdAndLastTime(tx *dbs.Tx, nodeId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindItemStatsWithNodeIdAndLastTime(tx *dbs.Tx, nodeId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
// 最近一次时间 // 最近一次时间
statOne, err := this.Query(tx). lastTime, err := SharedMetricItemDAO.FindMetricLastTime(tx, itemId)
Attr("itemId", itemId).
Attr("version", version).
DescPk().
Find()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if statOne == nil {
return nil, nil err = this.runBatch(func(table string, locker *sync.Mutex) error {
} var partialResult = []*MetricStat{}
var lastStat = statOne.(*MetricStat) var query = this.Query(tx).
var lastTime = lastStat.Time Table(table).
var query = this.Query(tx). UseIndex("node_item_time").
UseIndex("node_item_time"). Attr("nodeId", nodeId).
Attr("nodeId", nodeId). Attr("itemId", itemId).
Attr("itemId", itemId). Attr("version", version).
Attr("version", version). Attr("time", lastTime).
Attr("time", lastTime). // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE Result("MIN(time) AS time", "SUM(value) AS value", "keys").
Result("MIN(time) AS time", "SUM(value) AS value", "keys"). Desc("value").
Desc("value"). Group("keys").
Group("keys"). Limit(size).
Limit(size). Slice(&partialResult)
Slice(&result) if ignoreEmptyKeys {
if ignoreEmptyKeys { query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
}
if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return nil, err
} }
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置 if len(ignoreKeys) > 0 {
query.Param("ignoredKeys", string(ignoreKeysJSON)) ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
FindAll()
if err != nil {
return err
}
locker.Lock()
result = append(result, partialResult...)
locker.Unlock()
return nil
})
if err != nil {
return nil, err
}
sort.Slice(result, func(i, j int) bool {
return result[i].Value > result[j].Value
})
if len(result) > types.Int(size) {
result = result[:types.Int(size)]
} }
_, err = query.
FindAll()
return return
} }
// FindItemStatsWithServerIdAndLastTime 取得节点最近一次计时前 N 个数据 // FindItemStatsWithServerIdAndLastTime 取得服务最近一次计时前 N 个数据
// 适合每条数据中包含不同的Key的场景 // 适合每条数据中包含不同的Key的场景
func (this *MetricStatDAO) FindItemStatsWithServerIdAndLastTime(tx *dbs.Tx, serverId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindItemStatsWithServerIdAndLastTime(tx *dbs.Tx, serverId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
// 最近一次时间 // 最近一次时间
statOne, err := this.Query(tx). lastTime, err := SharedMetricItemDAO.FindMetricLastTime(tx, itemId)
Attr("itemId", itemId). if err != nil || len(lastTime) == 0 {
Attr("version", version).
DescPk().
Find()
if err != nil {
return nil, err return nil, err
} }
if statOne == nil {
return nil, nil
}
var lastStat = statOne.(*MetricStat)
var lastTime = lastStat.Time
var query = this.Query(tx). var query = this.Query(tx).
Table(this.partialTable(serverId)).
UseIndex("server_item_time"). UseIndex("server_item_time").
Attr("serverId", serverId). Attr("serverId", serverId).
Attr("itemId", itemId). Attr("itemId", itemId).
@@ -326,68 +444,115 @@ func (this *MetricStatDAO) FindItemStatsWithServerIdAndLastTime(tx *dbs.Tx, serv
// FindLatestItemStats 取得所有集群上最近 N 个时间的数据 // FindLatestItemStats 取得所有集群上最近 N 个时间的数据
// 适合同个Key在不同时间段的变化场景 // 适合同个Key在不同时间段的变化场景
func (this *MetricStatDAO) FindLatestItemStats(tx *dbs.Tx, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindLatestItemStats(tx *dbs.Tx, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
var query = this.Query(tx). err = this.runBatch(func(table string, locker *sync.Mutex) error {
Attr("itemId", itemId). var partialResult = []*MetricStat{}
Attr("version", version). var query = this.Query(tx).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 Table(table).
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE Attr("itemId", itemId).
Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`"). Attr("version", version).
Desc("time"). // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
Group("time"). // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Limit(size). Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`").
Slice(&result) Desc("time").
if ignoreEmptyKeys { Group("time").
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')") Limit(size).
} Slice(&partialResult)
if len(ignoreKeys) > 0 { if ignoreEmptyKeys {
ignoreKeysJSON, err := json.Marshal(ignoreKeys) query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
if err != nil { }
return nil, err if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
} }
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query. _, err = query.
FindAll() FindAll()
if err != nil {
return err
}
locker.Lock()
result = append(result, partialResult...)
locker.Unlock()
return nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
sort.Slice(result, func(i, j int) bool {
return result[i].Time > result[j].Time
})
if len(result) > types.Int(size) {
result = result[:types.Int(size)]
}
lists.Reverse(result) lists.Reverse(result)
return return
} }
// FindLatestItemStatsWithClusterId 取得集群最近 N 个时间的数据 // FindLatestItemStatsWithClusterId 取得集群最近 N 个时间的数据
// 适合同个Key在不同时间段的变化场景 // 适合同个Key在不同时间段的变化场景
func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
var query = this.Query(tx). err = this.runBatch(func(table string, locker *sync.Mutex) error {
Attr("clusterId", clusterId). var partialResult = []*MetricStat{}
Attr("itemId", itemId). var query = this.Query(tx).
Attr("version", version). Table(table).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 Attr("clusterId", clusterId).
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE Attr("itemId", itemId).
Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`"). Attr("version", version).
Desc("time"). // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
Group("time"). // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Limit(size). Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`").
Slice(&result) Desc("time").
if ignoreEmptyKeys { Group("time").
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')") Limit(size).
} Slice(&partialResult)
if len(ignoreKeys) > 0 { if ignoreEmptyKeys {
ignoreKeysJSON, err := json.Marshal(ignoreKeys) query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
if err != nil { }
return nil, err if len(ignoreKeys) > 0 {
ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
} }
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query. _, err = query.
FindAll() FindAll()
if err != nil {
return err
}
locker.Lock()
result = append(result, partialResult...)
locker.Unlock()
return nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
sort.Slice(result, func(i, j int) bool {
return result[i].Time > result[j].Time
})
if len(result) > types.Int(size) {
result = result[:types.Int(size)]
}
lists.Reverse(result) lists.Reverse(result)
return return
} }
@@ -395,34 +560,53 @@ func (this *MetricStatDAO) FindLatestItemStatsWithClusterId(tx *dbs.Tx, clusterI
// FindLatestItemStatsWithNodeId 取得节点最近 N 个时间的数据 // FindLatestItemStatsWithNodeId 取得节点最近 N 个时间的数据
// 适合同个Key在不同时间段的变化场景 // 适合同个Key在不同时间段的变化场景
func (this *MetricStatDAO) FindLatestItemStatsWithNodeId(tx *dbs.Tx, nodeId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindLatestItemStatsWithNodeId(tx *dbs.Tx, nodeId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
var query = this.Query(tx). err = this.runBatch(func(table string, locker *sync.Mutex) error {
Attr("nodeId", nodeId). var partialResult = []*MetricStat{}
Attr("itemId", itemId). var query = this.Query(tx).
Attr("version", version). Table(table).
// TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等 Attr("nodeId", nodeId).
// TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE Attr("itemId", itemId).
Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`"). Attr("version", version).
Desc("time"). // TODO 增加更多聚合算法,比如 AVG、MEDIAN、MIN、MAX 等
Group("time"). // TODO 这里的 MIN(`keys`) 在MySQL8中可以换成FIRST_VALUE
Limit(size). Result("time", "SUM(value) AS value", "MIN(`keys`) AS `keys`").
Slice(&result) Desc("time").
if ignoreEmptyKeys { Group("time").
query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')") Limit(size).
} Slice(&partialResult)
if len(ignoreKeys) > 0 { if ignoreEmptyKeys {
ignoreKeysJSON, err := json.Marshal(ignoreKeys) query.Where("NOT JSON_CONTAINS(`keys`, '\"\"')")
if err != nil {
return nil, err
} }
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置 if len(ignoreKeys) > 0 {
query.Param("ignoredKeys", string(ignoreKeysJSON)) ignoreKeysJSON, err := json.Marshal(ignoreKeys)
if err != nil {
return err
}
query.Where("NOT JSON_CONTAINS(:ignoredKeys, JSON_EXTRACT(`keys`, '$[0]'))") // TODO $[0] 需要换成keys中的primary key位置
query.Param("ignoredKeys", string(ignoreKeysJSON))
}
_, err = query.
FindAll()
if err != nil {
return err
}
locker.Lock()
result = append(result, partialResult...)
locker.Unlock()
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i].Time > result[j].Time
})
if len(result) > types.Int(size) {
result = result[:types.Int(size)]
} }
_, err = query.
FindAll()
if err != nil {
return nil, err
}
lists.Reverse(result) lists.Reverse(result)
return return
} }
@@ -431,6 +615,7 @@ func (this *MetricStatDAO) FindLatestItemStatsWithNodeId(tx *dbs.Tx, nodeId int6
// 适合同个Key在不同时间段的变化场景 // 适合同个Key在不同时间段的变化场景
func (this *MetricStatDAO) FindLatestItemStatsWithServerId(tx *dbs.Tx, serverId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) { func (this *MetricStatDAO) FindLatestItemStatsWithServerId(tx *dbs.Tx, serverId int64, itemId int64, ignoreEmptyKeys bool, ignoreKeys []string, version int32, size int64) (result []*MetricStat, err error) {
var query = this.Query(tx). var query = this.Query(tx).
Table(this.partialTable(serverId)).
Attr("serverId", serverId). Attr("serverId", serverId).
Attr("itemId", itemId). Attr("itemId", itemId).
Attr("version", version). Attr("version", version).
@@ -480,12 +665,16 @@ func (this *MetricStatDAO) Clean(tx *dbs.Tx) error {
ExpiresPeriod: int(item.ExpiresPeriod), ExpiresPeriod: int(item.ExpiresPeriod),
} }
var expiresDay = config.ServerExpiresDay() var expiresDay = config.ServerExpiresDay()
_, err := this.Query(tx). err := this.runBatch(func(table string, locker *sync.Mutex) error {
Attr("itemId", item.Id). _, err := this.Query(tx).
Lte("createdDay", expiresDay). Table(table).
UseIndex("createdDay"). Attr("itemId", item.Id).
Limit(100_000). // 一次性不要删除太多,防止阻塞其他操作 Lte("createdDay", expiresDay).
Delete() UseIndex("createdDay").
Limit(10_000). // 一次性不要删除太多,防止阻塞其他操作
Delete()
return err
})
if err != nil { if err != nil {
return err return err
} }
@@ -500,3 +689,29 @@ func (this *MetricStatDAO) Clean(tx *dbs.Tx) error {
} }
return nil return nil
} }
// 获取分区表
func (this *MetricStatDAO) partialTable(serverId int64) string {
return this.Table + "_" + types.String(serverId%int64(MetricStatTablePartials))
}
// 批量执行
func (this *MetricStatDAO) runBatch(f func(table string, locker *sync.Mutex) error) error {
var locker = &sync.Mutex{}
var wg = sync.WaitGroup{}
wg.Add(MetricStatTablePartials)
var resultErr error
for i := 0; i < MetricStatTablePartials; i++ {
var table = this.partialTable(int64(i))
go func(table string) {
defer wg.Done()
err := f(table, locker)
if err != nil {
resultErr = err
}
}(table)
}
wg.Wait()
return resultErr
}

View File

@@ -1,6 +1,7 @@
package models package models_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap" _ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
@@ -8,11 +9,12 @@ import (
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"testing" "testing"
"time"
) )
func TestNewMetricStatDAO_InsertMany(t *testing.T) { func TestNewMetricStatDAO_InsertMany(t *testing.T) {
for i := 0; i <= 1; i++ { for i := 0; i <= 1; i++ {
err := NewMetricStatDAO().CreateStat(nil, types.String(i)+"_v1", 18, int64(rands.Int(0, 10000)), int64(rands.Int(0, 10000)), int64(rands.Int(0, 100)), []string{"/html" + types.String(i)}, 1, timeutil.Format("Ymd"), 0) err := models.NewMetricStatDAO().CreateStat(nil, types.String(i)+"_v1", 18, int64(rands.Int(0, 10000)), int64(rands.Int(0, 10000)), int64(rands.Int(0, 100)), []string{"/html" + types.String(i)}, 1, timeutil.Format("Ymd"), 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -23,12 +25,38 @@ func TestNewMetricStatDAO_InsertMany(t *testing.T) {
t.Log("done") t.Log("done")
} }
func TestMetricStatDAO_Clean(t *testing.T) { func TestMetricStatDAO_Clean2(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
err := NewMetricStatDAO().Clean(nil) err := models.NewMetricStatDAO().Clean(nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log("ok") t.Log("ok")
} }
func TestMetricStatDAO_DeleteNodeItemStats(t *testing.T) {
var dao = models.NewMetricStatDAO()
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
err := dao.DeleteNodeItemStats(nil, 1, 0, 1, timeutil.Format("Ymd"))
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestMetricStatDAO_CountItemStats(t *testing.T) {
var dao = models.NewMetricStatDAO()
var before = time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
count, err := dao.CountItemStats(nil, 1, 1)
if err != nil {
t.Fatal(err)
}
t.Log("count:", count)
}

File diff suppressed because one or more lines are too long