优化指标统计写入数据逻辑

This commit is contained in:
刘祥超
2021-08-22 10:07:04 +08:00
parent be7de223af
commit 8161270f47
2 changed files with 40 additions and 35 deletions

View File

@@ -35,7 +35,6 @@ type Task struct {
db *sql.DB
statTableName string
statsChan chan *Stat
isStopped bool
cleanTicker *utils.Ticker
@@ -52,15 +51,19 @@ type Task struct {
serverIdMap map[int64]bool // 所有的服务Ids
timeMap map[string]bool // time => bool
serverIdMapLocker sync.Mutex
statsMap map[string]*Stat
statsLocker sync.Mutex
statsTicker *utils.Ticker
}
// NewTask 获取新任务
func NewTask(item *serverconfigs.MetricItemConfig) *Task {
return &Task{
item: item,
statsChan: make(chan *Stat, 40960),
serverIdMap: map[int64]bool{},
timeMap: map[string]bool{},
statsMap: map[string]*Stat{},
}
}
@@ -156,14 +159,19 @@ ON "` + this.statTableName + `" (
// Start 启动任务
func (this *Task) Start() error {
// 读取数据
this.statsTicker = utils.NewTicker(1 * time.Minute)
go func() {
for stat := range this.statsChan {
if stat == nil {
return
}
err := this.InsertStat(stat)
if err != nil {
remotelogs.Error("METRIC", "insert stat failed: "+err.Error())
for this.statsTicker.Next() {
this.statsLocker.Lock()
var statsMap = this.statsMap
this.statsMap = map[string]*Stat{}
this.statsLocker.Unlock()
for _, stat := range statsMap {
err := this.InsertStat(stat)
if err != nil {
remotelogs.Error("METRIC", "insert stat failed: "+err.Error())
}
}
}
}()
@@ -210,18 +218,22 @@ func (this *Task) Add(obj MetricInterface) {
return
}
var stat = &Stat{
ServerId: obj.MetricServerId(),
Keys: keys,
Value: v,
Time: this.item.CurrentTime(),
}
select {
case this.statsChan <- stat:
default:
// 丢弃
var hash = SumStat(obj.MetricServerId(), keys, this.item.CurrentTime(), this.item.Version, this.item.Id)
this.statsLocker.Lock()
oldStat, ok := this.statsMap[hash]
if ok {
oldStat.Value += v
oldStat.Hash = hash
} else {
this.statsMap[hash] = &Stat{
ServerId: obj.MetricServerId(),
Keys: keys,
Value: v,
Time: this.item.CurrentTime(),
Hash: hash,
}
}
this.statsLocker.Unlock()
}
// Stop 停止任务
@@ -234,6 +246,9 @@ func (this *Task) Stop() error {
if this.uploadTicker != nil {
this.uploadTicker.Stop()
}
if this.statsTicker != nil {
this.statsTicker.Stop()
}
_ = this.insertStatStmt.Close()
_ = this.deleteByVersionStmt.Close()
@@ -241,14 +256,6 @@ func (this *Task) Stop() error {
_ = this.selectTopStmt.Close()
_ = this.sumStmt.Close()
if this.statsChan != nil {
go func() {
// 延时关闭,防止关闭时写入
time.Sleep(5 * time.Second)
close(this.statsChan)
}()
}
if this.db != nil {
_ = this.db.Close()
}
@@ -274,10 +281,8 @@ func (this *Task) InsertStat(stat *Stat) error {
if err != nil {
return err
}
stat.keysData = keyData
stat.Sum(this.item.Version, this.item.Id)
_, err = this.insertStatStmt.Exec(stat.ServerId, stat.Hash, stat.keysData, stat.Value, stat.Time, this.item.Version, stat.Value)
_, err = this.insertStatStmt.Exec(stat.ServerId, stat.Hash, keyData, stat.Value, stat.Time, this.item.Version, stat.Value)
if err != nil {
return err
}