指标统计数据分表

This commit is contained in:
刘祥超
2022-03-30 15:35:42 +08:00
parent 4b3a9cedfa
commit 1c1e82ee38
5 changed files with 136 additions and 60 deletions

View File

@@ -9,12 +9,16 @@ import (
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"sync"
"time" "time"
) )
type MetricSumStatDAO dbs.DAO type MetricSumStatDAO dbs.DAO
const MetricSumStatTablePartials = 20 // 表格Partial数量
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
// 清理数据任务 // 清理数据任务
@@ -52,6 +56,7 @@ func init() {
// UpdateSum 更新统计数据 // UpdateSum 更新统计数据
func (this *MetricSumStatDAO) UpdateSum(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64, time string, itemId int64, version int32, count int64, total float32) error { func (this *MetricSumStatDAO) UpdateSum(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64, time string, itemId int64, version int32, count int64, total float32) error {
return this.Query(tx). return this.Query(tx).
Table(this.partialTable(serverId)).
InsertOrUpdateQuickly(maps.Map{ InsertOrUpdateQuickly(maps.Map{
"clusterId": clusterId, "clusterId": clusterId,
"nodeId": nodeId, "nodeId": nodeId,
@@ -71,6 +76,7 @@ func (this *MetricSumStatDAO) UpdateSum(tx *dbs.Tx, clusterId int64, nodeId int6
// FindNodeServerSum 查找某个服务在某个节点上的统计数据 // FindNodeServerSum 查找某个服务在某个节点上的统计数据
func (this *MetricSumStatDAO) FindNodeServerSum(tx *dbs.Tx, nodeId int64, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { func (this *MetricSumStatDAO) FindNodeServerSum(tx *dbs.Tx, nodeId int64, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
one, err := this.Query(tx). one, err := this.Query(tx).
Table(this.partialTable(serverId)).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Attr("serverId", serverId). Attr("serverId", serverId).
Attr("time", time). Attr("time", time).
@@ -83,29 +89,42 @@ func (this *MetricSumStatDAO) FindNodeServerSum(tx *dbs.Tx, nodeId int64, server
if one == nil { if one == nil {
return return
} }
return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil
count = int64(one.(*MetricSumStat).Count)
total = float32(one.(*MetricSumStat).Total)
return
} }
// FindSumAtTime 查找某个时间的统计数据 // FindSumAtTime 查找某个时间的统计数据
func (this *MetricSumStatDAO) FindSumAtTime(tx *dbs.Tx, time string, itemId int64, version int32) (count int64, total float32, err error) { func (this *MetricSumStatDAO) FindSumAtTime(tx *dbs.Tx, time string, itemId int64, version int32) (count int64, total float32, err error) {
err = this.runBatch(func(table string, locker *sync.Mutex) error {
one, err := this.Query(tx). one, err := this.Query(tx).
Table(table).
Attr("time", time). Attr("time", time).
Attr("itemId", itemId). Attr("itemId", itemId).
Attr("version", version). Attr("version", version).
Result("SUM(count) AS `count`, SUM(total) AS total"). Result("SUM(count) AS `count`, SUM(total) AS total").
Find() Find()
if err != nil { if err != nil {
return 0, 0, err return err
} }
if one == nil { if one == nil {
return return nil
} }
return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil locker.Lock()
count += int64(one.(*MetricSumStat).Count)
total += float32(one.(*MetricSumStat).Total)
locker.Unlock()
return nil
})
return
} }
// FindServerSum 查找某个服务的统计数据 // FindServerSum 查找某个服务的统计数据
func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, serverId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
one, err := this.Query(tx). one, err := this.Query(tx).
Table(this.partialTable(serverId)).
UseIndex("server_item_time"). UseIndex("server_item_time").
Attr("serverId", serverId). Attr("serverId", serverId).
Attr("time", time). Attr("time", time).
@@ -124,7 +143,9 @@ func (this *MetricSumStatDAO) FindServerSum(tx *dbs.Tx, serverId int64, time str
// FindClusterSum 查找集群上的统计数据 // FindClusterSum 查找集群上的统计数据
func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
err = this.runBatch(func(table string, locker *sync.Mutex) error {
one, err := this.Query(tx). one, err := this.Query(tx).
Table(table).
UseIndex("cluster_item_time"). UseIndex("cluster_item_time").
Attr("clusterId", clusterId). Attr("clusterId", clusterId).
Attr("time", time). Attr("time", time).
@@ -133,17 +154,26 @@ func (this *MetricSumStatDAO) FindClusterSum(tx *dbs.Tx, clusterId int64, time s
Result("SUM(count) AS `count`, SUM(total) AS total"). Result("SUM(count) AS `count`, SUM(total) AS total").
Find() Find()
if err != nil { if err != nil {
return 0, 0, err return err
} }
if one == nil { if one == nil {
return return nil
} }
return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil locker.Lock()
count += int64(one.(*MetricSumStat).Count)
total += float32(one.(*MetricSumStat).Total)
locker.Unlock()
return nil
})
return
} }
// FindNodeSum 查找节点上的统计数据 // FindNodeSum 查找节点上的统计数据
func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, time string, itemId int64, version int32) (count int64, total float32, err error) { func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, time string, itemId int64, version int32) (count int64, total float32, err error) {
err = this.runBatch(func(table string, locker *sync.Mutex) error {
one, err := this.Query(tx). one, err := this.Query(tx).
Table(table).
UseIndex("node_item_time"). UseIndex("node_item_time").
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Attr("time", time). Attr("time", time).
@@ -152,20 +182,30 @@ func (this *MetricSumStatDAO) FindNodeSum(tx *dbs.Tx, nodeId int64, time string,
Result("SUM(count) AS `count`, SUM(total) AS total"). Result("SUM(count) AS `count`, SUM(total) AS total").
Find() Find()
if err != nil { if err != nil {
return 0, 0, err return err
} }
if one == nil { if one == nil {
return return nil
} }
return int64(one.(*MetricSumStat).Count), float32(one.(*MetricSumStat).Total), nil locker.Lock()
count += int64(one.(*MetricSumStat).Count)
total += float32(one.(*MetricSumStat).Total)
locker.Unlock()
return nil
})
return
} }
// DeleteItemStats 删除某个指标相关的统计数据 // DeleteItemStats 删除某个指标相关的统计数据
func (this *MetricSumStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error { func (this *MetricSumStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error {
return this.runBatch(func(table string, locker *sync.Mutex) error {
_, err := this.Query(tx). _, err := this.Query(tx).
Table(table).
Attr("itemId", itemId). Attr("itemId", itemId).
Delete() Delete()
return err return err
})
} }
// Clean 清理数据 // Clean 清理数据
@@ -186,13 +226,17 @@ func (this *MetricSumStatDAO) Clean(tx *dbs.Tx) error {
ExpiresPeriod: int(item.ExpiresPeriod), ExpiresPeriod: int(item.ExpiresPeriod),
} }
var expiresDay = config.ServerExpiresDay() var expiresDay = config.ServerExpiresDay()
err = this.runBatch(func(table string, locker *sync.Mutex) error {
_, err := this.Query(tx). _, err := this.Query(tx).
Table(table).
Attr("itemId", item.Id). Attr("itemId", item.Id).
Where("(createdDay IS NULL OR createdDay<:day)"). Where("(createdDay IS NULL OR createdDay<:day)").
Param("day", expiresDay). Param("day", expiresDay).
UseIndex("createdDay"). UseIndex("createdDay").
Limit(100_000). // 一次性不要删除太多,防止阻塞其他操作 Limit(10_000). // 一次性不要删除太多,防止阻塞其他操作
Delete() Delete()
return err
})
if err != nil { if err != nil {
return err return err
} }
@@ -207,3 +251,29 @@ func (this *MetricSumStatDAO) Clean(tx *dbs.Tx) error {
} }
return nil return nil
} }
// 获取分区表
func (this *MetricSumStatDAO) partialTable(serverId int64) string {
return this.Table + "_" + types.String(serverId%int64(MetricSumStatTablePartials))
}
// 批量执行
func (this *MetricSumStatDAO) runBatch(f func(table string, locker *sync.Mutex) error) error {
var locker = &sync.Mutex{}
var wg = sync.WaitGroup{}
wg.Add(MetricSumStatTablePartials)
var resultErr error
for i := 0; i < MetricSumStatTablePartials; i++ {
var table = this.partialTable(int64(i))
go func(table string) {
defer wg.Done()
err := f(table, locker)
if err != nil {
resultErr = err
}
}(table)
}
wg.Wait()
return resultErr
}

View File

@@ -1,16 +1,22 @@
package models package models_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap" _ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
timeutil "github.com/iwind/TeaGo/utils/time"
"testing" "testing"
) )
func TestMetricSumStatDAO_FindNodeSum(t *testing.T) {
t.Log(models.NewMetricSumStatDAO().FindNodeSum(nil, 46, timeutil.Format("Ymd"), 1, 1))
}
func TestMetricSumStatDAO_Clean(t *testing.T) { func TestMetricSumStatDAO_Clean(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
err := NewMetricSumStatDAO().Clean(nil) err := models.NewMetricSumStatDAO().Clean(nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -37,7 +37,7 @@ func (this *DBService) FindAllDBTables(ctx context.Context, req *pb.FindAllDBTab
if strings.HasPrefix(lowerTableName, "edgehttpaccesslogs_") { if strings.HasPrefix(lowerTableName, "edgehttpaccesslogs_") {
canDelete = true canDelete = true
canClean = true canClean = true
} else if lists.ContainsString([]string{"edgemessages", "edgelogs", "edgenodelogs", "edgemetricstats", "edgemetricsumstats", "edgeserverdomainhourlystats", "edgeserverregionprovincemonthlystats", "edgeserverregionprovidermonthlystats", "edgeserverregioncountrymonthlystats", "edgeserverregioncountrydailystats", "edgeserverregioncitymonthlystats", "edgeserverhttpfirewallhourlystats", "edgeserverhttpfirewalldailystats", "edgenodeclustertrafficdailystats", "edgenodetrafficdailystats", "edgenodetraffichourlystats", "edgensrecordhourlystats", "edgeserverclientbrowsermonthlystats", "edgeserverclientsystemmonthlystats"}, lowerTableName) || strings.HasPrefix(lowerTableName, "edgeserverdomainhourlystats_") || strings.HasPrefix(lowerTableName, "edgemetricstats_") { } else if lists.ContainsString([]string{"edgemessages", "edgelogs", "edgenodelogs", "edgemetricstats", "edgemetricsumstats", "edgeserverdomainhourlystats", "edgeserverregionprovincemonthlystats", "edgeserverregionprovidermonthlystats", "edgeserverregioncountrymonthlystats", "edgeserverregioncountrydailystats", "edgeserverregioncitymonthlystats", "edgeserverhttpfirewallhourlystats", "edgeserverhttpfirewalldailystats", "edgenodeclustertrafficdailystats", "edgenodetrafficdailystats", "edgenodetraffichourlystats", "edgensrecordhourlystats", "edgeserverclientbrowsermonthlystats", "edgeserverclientsystemmonthlystats"}, lowerTableName) || strings.HasPrefix(lowerTableName, "edgeserverdomainhourlystats_") || strings.HasPrefix(lowerTableName, "edgemetricstats_") || strings.HasPrefix(lowerTableName, "edgemetricsumstats_") {
canClean = true canClean = true
} }

View File

@@ -211,7 +211,7 @@ func (this *DBNodeService) FindAllDBNodeTables(ctx context.Context, req *pb.Find
if strings.HasPrefix(lowerTableName, "edgehttpaccesslogs_") || strings.HasPrefix(lowerTableName, "edgensaccesslogs_") { if strings.HasPrefix(lowerTableName, "edgehttpaccesslogs_") || strings.HasPrefix(lowerTableName, "edgensaccesslogs_") {
canDelete = true canDelete = true
canClean = true canClean = true
} else if lists.ContainsString([]string{"edgemessages", "edgelogs", "edgenodelogs", "edgemetricstats", "edgemetricsumstats", "edgeserverdomainhourlystats", "edgeserverregionprovincemonthlystats", "edgeserverregionprovidermonthlystats", "edgeserverregioncountrymonthlystats", "edgeserverregioncountrydailystats", "edgeserverregioncitymonthlystats", "edgeserverhttpfirewallhourlystats", "edgeserverhttpfirewalldailystats", "edgenodeclustertrafficdailystats", "edgenodetrafficdailystats", "edgenodetraffichourlystats", "edgensrecordhourlystats", "edgeserverclientbrowsermonthlystats", "edgeserverclientsystemmonthlystats"}, lowerTableName) || strings.HasPrefix(lowerTableName, "edgeserverdomainhourlystats_") || strings.HasPrefix(lowerTableName, "edgemetricstats_") { } else if lists.ContainsString([]string{"edgemessages", "edgelogs", "edgenodelogs", "edgemetricstats", "edgemetricsumstats", "edgeserverdomainhourlystats", "edgeserverregionprovincemonthlystats", "edgeserverregionprovidermonthlystats", "edgeserverregioncountrymonthlystats", "edgeserverregioncountrydailystats", "edgeserverregioncitymonthlystats", "edgeserverhttpfirewallhourlystats", "edgeserverhttpfirewalldailystats", "edgenodeclustertrafficdailystats", "edgenodetrafficdailystats", "edgenodetraffichourlystats", "edgensrecordhourlystats", "edgeserverclientbrowsermonthlystats", "edgeserverclientsystemmonthlystats"}, lowerTableName) || strings.HasPrefix(lowerTableName, "edgeserverdomainhourlystats_") || strings.HasPrefix(lowerTableName, "edgemetricstats_") || strings.HasPrefix(lowerTableName, "edgemetricsumstats_") {
canClean = true canClean = true
} }

File diff suppressed because one or more lines are too long