diff --git a/internal/metrics/stat.go b/internal/metrics/stat.go index 698e958..759872d 100644 --- a/internal/metrics/stat.go +++ b/internal/metrics/stat.go @@ -3,6 +3,7 @@ package metrics import ( + "encoding/json" "github.com/cespare/xxhash" "strconv" ) @@ -13,10 +14,9 @@ type Stat struct { Hash string Value int64 Time string - - keysData []byte } -func (this *Stat) Sum(version int32, itemId int64) { - this.Hash = strconv.FormatUint(xxhash.Sum64String(strconv.FormatInt(this.ServerId, 10)+"@"+string(this.keysData)+"@"+this.Time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10) +func SumStat(serverId int64, keys []string, time string, version int32, itemId int64) string { + keysData, _ := json.Marshal(keys) + return strconv.FormatUint(xxhash.Sum64String(strconv.FormatInt(serverId, 10)+"@"+string(keysData)+"@"+time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10) } diff --git a/internal/metrics/task.go b/internal/metrics/task.go index 525179e..57e5202 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -35,7 +35,6 @@ type Task struct { db *sql.DB statTableName string - statsChan chan *Stat isStopped bool cleanTicker *utils.Ticker @@ -52,15 +51,19 @@ type Task struct { serverIdMap map[int64]bool // 所有的服务Ids timeMap map[string]bool // time => bool serverIdMapLocker sync.Mutex + + statsMap map[string]*Stat + statsLocker sync.Mutex + statsTicker *utils.Ticker } // NewTask 获取新任务 func NewTask(item *serverconfigs.MetricItemConfig) *Task { return &Task{ item: item, - statsChan: make(chan *Stat, 40960), serverIdMap: map[int64]bool{}, timeMap: map[string]bool{}, + statsMap: map[string]*Stat{}, } } @@ -156,14 +159,19 @@ ON "` + this.statTableName + `" ( // Start 启动任务 func (this *Task) Start() error { // 读取数据 + this.statsTicker = utils.NewTicker(1 * time.Minute) go func() { - for stat := range this.statsChan { - if stat == nil { - return - } - err := this.InsertStat(stat) - if err != nil { - remotelogs.Error("METRIC", "insert stat failed: "+err.Error()) + for this.statsTicker.Next() { + this.statsLocker.Lock() + var statsMap = this.statsMap + this.statsMap = map[string]*Stat{} + this.statsLocker.Unlock() + + for _, stat := range statsMap { + err := this.InsertStat(stat) + if err != nil { + remotelogs.Error("METRIC", "insert stat failed: "+err.Error()) + } } } }() @@ -210,18 +218,22 @@ func (this *Task) Add(obj MetricInterface) { return } - var stat = &Stat{ - ServerId: obj.MetricServerId(), - Keys: keys, - Value: v, - Time: this.item.CurrentTime(), - } - - select { - case this.statsChan <- stat: - default: - // 丢弃 + var hash = SumStat(obj.MetricServerId(), keys, this.item.CurrentTime(), this.item.Version, this.item.Id) + this.statsLocker.Lock() + oldStat, ok := this.statsMap[hash] + if ok { + oldStat.Value += v + oldStat.Hash = hash + } else { + this.statsMap[hash] = &Stat{ + ServerId: obj.MetricServerId(), + Keys: keys, + Value: v, + Time: this.item.CurrentTime(), + Hash: hash, + } } + this.statsLocker.Unlock() } // Stop 停止任务 @@ -234,6 +246,9 @@ func (this *Task) Stop() error { if this.uploadTicker != nil { this.uploadTicker.Stop() } + if this.statsTicker != nil { + this.statsTicker.Stop() + } _ = this.insertStatStmt.Close() _ = this.deleteByVersionStmt.Close() @@ -241,14 +256,6 @@ func (this *Task) Stop() error { _ = this.selectTopStmt.Close() _ = this.sumStmt.Close() - if this.statsChan != nil { - go func() { - // 延时关闭,防止关闭时写入 - time.Sleep(5 * time.Second) - close(this.statsChan) - }() - } - if this.db != nil { _ = this.db.Close() } @@ -274,10 +281,8 @@ func (this *Task) InsertStat(stat *Stat) error { if err != nil { return err } - stat.keysData = keyData - stat.Sum(this.item.Version, this.item.Id) - _, err = this.insertStatStmt.Exec(stat.ServerId, stat.Hash, stat.keysData, stat.Value, stat.Time, this.item.Version, stat.Value) + _, err = this.insertStatStmt.Exec(stat.ServerId, stat.Hash, keyData, stat.Value, stat.Time, this.item.Version, stat.Value) if err != nil { return err }