diff --git a/internal/db/models/metric_stat_dao.go b/internal/db/models/metric_stat_dao.go index 69089223..97c6d6d9 100644 --- a/internal/db/models/metric_stat_dao.go +++ b/internal/db/models/metric_stat_dao.go @@ -13,8 +13,10 @@ import ( "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" + "regexp" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -39,6 +41,8 @@ func init() { const MetricStatTablePartials = 20 // 表格Partial数量 +var metricHashRegexp = regexp.MustCompile(`^\w+$`) + func NewMetricStatDAO() *MetricStatDAO { return dbs.NewDAO(&MetricStatDAO{ DAOObject: dbs.DAOObject{ @@ -123,18 +127,30 @@ func (this *MetricStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error { } // DeleteNodeItemStats 删除某个节点的统计数据 -func (this *MetricStatDAO) DeleteNodeItemStats(tx *dbs.Tx, nodeId int64, serverId int64, itemId int64, time string) error { +func (this *MetricStatDAO) DeleteNodeItemStats(tx *dbs.Tx, nodeId int64, serverId int64, itemId int64, time string, keepKeys []string) error { if serverId > 0 { - _, err := this.Query(tx). + var query = this.Query(tx). Table(this.partialTable(serverId)). Attr("nodeId", nodeId). Attr("serverId", serverId). Attr("itemId", itemId). - Attr("time", time). - Delete() - if this.canIgnore(err) { + Attr("time", time) + if len(keepKeys) > 0 { + query.Reuse(false) + var s []string + for _, k := range keepKeys { + if metricHashRegexp.MatchString(k) { + s = append(s, "'"+k+"@"+types.String(nodeId)+"'") + } + } + query.Where("hash NOT IN (" + strings.Join(s, ",") + ")") + } + err := query. + DeleteQuickly() + if err == nil || this.canIgnore(err) { return nil } + return err } diff --git a/internal/db/models/metric_stat_dao_test.go b/internal/db/models/metric_stat_dao_test.go index 783e2b4f..d94b745c 100644 --- a/internal/db/models/metric_stat_dao_test.go +++ b/internal/db/models/metric_stat_dao_test.go @@ -41,7 +41,7 @@ func TestMetricStatDAO_DeleteNodeItemStats(t *testing.T) { defer func() { t.Log(time.Since(before).Seconds()*1000, "ms") }() - err := dao.DeleteNodeItemStats(nil, 1, 0, 1, timeutil.Format("Ymd")) + err := dao.DeleteNodeItemStats(nil, 1, 0, 1, timeutil.Format("Ymd"), nil) if err != nil { t.Fatal(err) } diff --git a/internal/rpc/services/service_metric_stat.go b/internal/rpc/services/service_metric_stat.go index 350c8995..aec7fed1 100644 --- a/internal/rpc/services/service_metric_stat.go +++ b/internal/rpc/services/service_metric_stat.go @@ -71,16 +71,18 @@ func init() { }() } - err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time) - if err != nil { - return err - } - - for _, stat := range req.MetricStats { - err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version) + if len(req.MetricStats) > 0 { + err = models.SharedMetricStatDAO.DeleteNodeItemStats(tx, nodeId, serverId, itemId, req.Time, req.KeepKeys) if err != nil { return err } + + for _, stat := range req.MetricStats { + err = models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, req.ServerId, req.ItemId, stat.Keys, float64(stat.Value), req.Time, req.Version) + if err != nil { + return err + } + } } // 保存总和