节点上传指标数据时只上传变更的部分

This commit is contained in:
GoEdgeLab
2024-04-03 08:15:20 +08:00
parent 739dfc7d1c
commit adc66afa86
3 changed files with 31 additions and 13 deletions

View File

@@ -13,8 +13,10 @@ import (
"github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"regexp"
"sort" "sort"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -39,6 +41,8 @@ func init() {
const MetricStatTablePartials = 20 // 表格Partial数量 const MetricStatTablePartials = 20 // 表格Partial数量
var metricHashRegexp = regexp.MustCompile(`^\w+$`)
func NewMetricStatDAO() *MetricStatDAO { func NewMetricStatDAO() *MetricStatDAO {
return dbs.NewDAO(&MetricStatDAO{ return dbs.NewDAO(&MetricStatDAO{
DAOObject: dbs.DAOObject{ DAOObject: dbs.DAOObject{
@@ -123,18 +127,30 @@ func (this *MetricStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error {
} }
// DeleteNodeItemStats 删除某个节点的统计数据 // DeleteNodeItemStats 删除某个节点的统计数据
func (this *MetricStatDAO) DeleteNodeItemStats(tx *dbs.Tx, nodeId int64, serverId int64, itemId int64, time string) error { func (this *MetricStatDAO) DeleteNodeItemStats(tx *dbs.Tx, nodeId int64, serverId int64, itemId int64, time string, keepKeys []string) error {
if serverId > 0 { if serverId > 0 {
_, err := this.Query(tx). var query = this.Query(tx).
Table(this.partialTable(serverId)). Table(this.partialTable(serverId)).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Attr("serverId", serverId). Attr("serverId", serverId).
Attr("itemId", itemId). Attr("itemId", itemId).
Attr("time", time). Attr("time", time)
Delete() if len(keepKeys) > 0 {
if this.canIgnore(err) { query.Reuse(false)
var s []string
for _, k := range keepKeys {
if metricHashRegexp.MatchString(k) {
s = append(s, "'"+k+"@"+types.String(nodeId)+"'")
}
}
query.Where("hash NOT IN (" + strings.Join(s, ",") + ")")
}
err := query.
DeleteQuickly()
if err == nil || this.canIgnore(err) {
return nil return nil
} }
return err return err
} }

View File

@@ -41,7 +41,7 @@ func TestMetricStatDAO_DeleteNodeItemStats(t *testing.T) {
defer func() { defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms") t.Log(time.Since(before).Seconds()*1000, "ms")
}() }()
err := dao.DeleteNodeItemStats(nil, 1, 0, 1, timeutil.Format("Ymd")) err := dao.DeleteNodeItemStats(nil, 1, 0, 1, timeutil.Format("Ymd"), nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -71,16 +71,18 @@ func init() {
}() }()
} }
err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time) if len(req.MetricStats) > 0 {
if err != nil { err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time, req.KeepKeys)
return err
}
for _, stat := range req.MetricStats {
err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version)
if err != nil { if err != nil {
return err return err
} }
for _, stat := range req.MetricStats {
err = models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version)
if err != nil {
return err
}
}
} }
// 保存总和 // 保存总和