mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-06 18:10:25 +08:00
优化指标数据清理
This commit is contained in:
@@ -78,6 +78,12 @@ func (this *MetricItemDAO) DisableMetricItem(tx *dbs.Tx, itemId int64) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = SharedMetricSumStatDAO.DeleteItemStats(tx, itemId)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package models
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
"github.com/iwind/TeaGo/Tea"
|
||||||
"github.com/iwind/TeaGo/dbs"
|
"github.com/iwind/TeaGo/dbs"
|
||||||
@@ -22,7 +23,7 @@ func init() {
|
|||||||
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
||||||
go func() {
|
go func() {
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
err := SharedMetricStatDAO.Clean(nil, 30) // 只保留30天
|
err := SharedMetricStatDAO.Clean(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("SharedMetricStatDAO: clean expired data failed: " + err.Error())
|
logs.Println("SharedMetricStatDAO: clean expired data failed: " + err.Error())
|
||||||
}
|
}
|
||||||
@@ -461,12 +462,38 @@ func (this *MetricStatDAO) FindLatestItemStatsWithServerId(tx *dbs.Tx, serverId
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Clean 清理数据
|
// Clean 清理数据
|
||||||
func (this *MetricStatDAO) Clean(tx *dbs.Tx, days int64) error {
|
func (this *MetricStatDAO) Clean(tx *dbs.Tx) error {
|
||||||
_, err := this.Query(tx).
|
for _, category := range serverconfigs.FindAllMetricItemCategoryCodes() {
|
||||||
Lt("createdDay", timeutil.FormatTime("Ymd", time.Now().Unix()-days*86400)).
|
var offset int64 = 0
|
||||||
Delete()
|
var size int64 = 100
|
||||||
if err != nil {
|
for {
|
||||||
return err
|
items, err := SharedMetricItemDAO.ListEnabledItems(tx, category, offset, size)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, item := range items {
|
||||||
|
var config = &serverconfigs.MetricItemConfig{
|
||||||
|
Id: int64(item.Id),
|
||||||
|
Period: int(item.Period),
|
||||||
|
PeriodUnit: item.PeriodUnit,
|
||||||
|
}
|
||||||
|
var expiresDay = config.ServerExpiresDay()
|
||||||
|
_, err := this.Query(tx).
|
||||||
|
Attr("itemId", item.Id).
|
||||||
|
Lte("createdDay", expiresDay).
|
||||||
|
Limit(100_000). // 一次性不要删除太多,防止阻塞其他操作
|
||||||
|
Delete()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(items) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += size
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package models
|
|||||||
import (
|
import (
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
_ "github.com/iwind/TeaGo/bootstrap"
|
_ "github.com/iwind/TeaGo/bootstrap"
|
||||||
|
"github.com/iwind/TeaGo/dbs"
|
||||||
"github.com/iwind/TeaGo/rands"
|
"github.com/iwind/TeaGo/rands"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||||
@@ -10,14 +11,24 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestNewMetricStatDAO_InsertMany(t *testing.T) {
|
func TestNewMetricStatDAO_InsertMany(t *testing.T) {
|
||||||
for i := 0; i <= 10_000_000; i++ {
|
for i := 0; i <= 1; i++ {
|
||||||
err := NewMetricStatDAO().CreateStat(nil, types.String(i)+"_v1", 18, int64(rands.Int(0, 10000)), int64(rands.Int(0, 10000)), int64(rands.Int(0, 100)), []string{"/html" + types.String(i)}, 1, timeutil.Format("Ymd"), 0)
|
err := NewMetricStatDAO().CreateStat(nil, types.String(i)+"_v1", 18, int64(rands.Int(0, 10000)), int64(rands.Int(0, 10000)), int64(rands.Int(0, 100)), []string{"/html" + types.String(i)}, 1, timeutil.Format("Ymd"), 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if i % 10000 == 0 {
|
if i%10000 == 0 {
|
||||||
t.Log(i)
|
t.Log(i)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.Log("done")
|
t.Log("done")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetricStatDAO_Clean(t *testing.T) {
|
||||||
|
dbs.NotifyReady()
|
||||||
|
|
||||||
|
err := NewMetricStatDAO().Clean(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Log("ok")
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package models
|
package models
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
"github.com/iwind/TeaGo/Tea"
|
||||||
"github.com/iwind/TeaGo/dbs"
|
"github.com/iwind/TeaGo/dbs"
|
||||||
@@ -19,7 +20,7 @@ func init() {
|
|||||||
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
||||||
go func() {
|
go func() {
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
err := SharedMetricSumStatDAO.Clean(nil, 30) // 只保留30天
|
err := SharedMetricSumStatDAO.Clean(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logs.Println("SharedMetricSumStatDAO: clean expired data failed: " + err.Error())
|
logs.Println("SharedMetricSumStatDAO: clean expired data failed: " + err.Error())
|
||||||
}
|
}
|
||||||
@@ -104,6 +105,7 @@ func (this *MetricSumStatDAO) FindSumAtTime(tx *dbs.Tx, time string, itemId int6
|
|||||||
// FindServerSum 查找某个服务的统计数据
|
// FindServerSum 查找某个服务的统计数据
|
||||||
func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
|
func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
|
||||||
one, err := this.Query(tx).
|
one, err := this.Query(tx).
|
||||||
|
UseIndex("server_item_time").
|
||||||
Attr("serverId", serverId).
|
Attr("serverId", serverId).
|
||||||
Attr("time", time).
|
Attr("time", time).
|
||||||
Attr("itemId", itemId).
|
Attr("itemId", itemId).
|
||||||
@@ -122,6 +124,7 @@ func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, serverId int64, time str
|
|||||||
// FindClusterSum 查找集群上的统计数据
|
// FindClusterSum 查找集群上的统计数据
|
||||||
func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
|
func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
|
||||||
one, err := this.Query(tx).
|
one, err := this.Query(tx).
|
||||||
|
UseIndex("cluster_item_time").
|
||||||
Attr("clusterId", clusterId).
|
Attr("clusterId", clusterId).
|
||||||
Attr("time", time).
|
Attr("time", time).
|
||||||
Attr("itemId", itemId).
|
Attr("itemId", itemId).
|
||||||
@@ -140,6 +143,7 @@ func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time s
|
|||||||
// FindNodeSum 查找节点上的统计数据
|
// FindNodeSum 查找节点上的统计数据
|
||||||
func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
|
func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
|
||||||
one, err := this.Query(tx).
|
one, err := this.Query(tx).
|
||||||
|
UseIndex("node_item_time").
|
||||||
Attr("nodeId", nodeId).
|
Attr("nodeId", nodeId).
|
||||||
Attr("time", time).
|
Attr("time", time).
|
||||||
Attr("itemId", itemId).
|
Attr("itemId", itemId).
|
||||||
@@ -155,14 +159,48 @@ func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, time string,
|
|||||||
return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil
|
return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean 清理数据
|
// DeleteItemStats 删除某个指标相关的统计数据
|
||||||
func (this *MetricSumStatDAO) Clean(tx *dbs.Tx, days int64) error {
|
func (this *MetricSumStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error {
|
||||||
_, err := this.Query(tx).
|
_, err := this.Query(tx).
|
||||||
Where("(createdDay IS NULL OR createdDay<:day)").
|
Attr("itemId", itemId).
|
||||||
Param("day", timeutil.FormatTime("Ymd", time.Now().Unix()-days*86400)).
|
|
||||||
Delete()
|
Delete()
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
}
|
||||||
|
|
||||||
|
// Clean 清理数据
|
||||||
|
func (this *MetricSumStatDAO) Clean(tx *dbs.Tx) error {
|
||||||
|
for _, category := range serverconfigs.FindAllMetricItemCategoryCodes() {
|
||||||
|
var offset int64 = 0
|
||||||
|
var size int64 = 100
|
||||||
|
for {
|
||||||
|
items, err := SharedMetricItemDAO.ListEnabledItems(tx, category, offset, size)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, item := range items {
|
||||||
|
var config = &serverconfigs.MetricItemConfig{
|
||||||
|
Id: int64(item.Id),
|
||||||
|
Period: int(item.Period),
|
||||||
|
PeriodUnit: item.PeriodUnit,
|
||||||
|
}
|
||||||
|
var expiresDay = config.ServerExpiresDay()
|
||||||
|
_, err := this.Query(tx).
|
||||||
|
Attr("itemId", item.Id).
|
||||||
|
Where("(createdDay IS NULL OR createdDay<:day)").
|
||||||
|
Param("day", expiresDay).
|
||||||
|
Limit(100_000). // 一次性不要删除太多,防止阻塞其他操作
|
||||||
|
Delete()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(items) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += size
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,11 +3,14 @@ package models
|
|||||||
import (
|
import (
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
_ "github.com/iwind/TeaGo/bootstrap"
|
_ "github.com/iwind/TeaGo/bootstrap"
|
||||||
|
"github.com/iwind/TeaGo/dbs"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMetricSumStatDAO_Clean(t *testing.T) {
|
func TestMetricSumStatDAO_Clean(t *testing.T) {
|
||||||
err := NewMetricSumStatDAO().Clean(nil, 20)
|
dbs.NotifyReady()
|
||||||
|
|
||||||
|
err := NewMetricSumStatDAO().Clean(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user