mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-19 04:10:30 +08:00
指标数据增加总和数据
This commit is contained in:
@@ -41,14 +41,16 @@ type Task struct {
|
||||
cleanTicker *utils.Ticker
|
||||
uploadTicker *utils.Ticker
|
||||
|
||||
cleanVersion int
|
||||
cleanVersion int32
|
||||
|
||||
insertStatStmt *sql.Stmt
|
||||
deleteByVersionStmt *sql.Stmt
|
||||
deleteByExpiresTimeStmt *sql.Stmt
|
||||
selectTopStmt *sql.Stmt
|
||||
sumStmt *sql.Stmt
|
||||
|
||||
serverIdMap map[int64]bool // 所有的服务Ids
|
||||
serverIdMap map[int64]bool // 所有的服务Ids
|
||||
timeMap map[string]bool // time => bool
|
||||
serverIdMapLocker sync.Mutex
|
||||
}
|
||||
|
||||
@@ -58,6 +60,7 @@ func NewTask(item *serverconfigs.MetricItemConfig) *Task {
|
||||
item: item,
|
||||
statsChan: make(chan *Stat, 40960),
|
||||
serverIdMap: map[int64]bool{},
|
||||
timeMap: map[string]bool{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,7 +131,13 @@ ON "` + this.statTableName + `" (
|
||||
}
|
||||
|
||||
// select topN stmt
|
||||
this.selectTopStmt, err = db.Prepare(`SELECT "id", "serverId", "hash", "keys", "value", "time", "version", "isUploaded" FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? ORDER BY "value" DESC LIMIT 100`)
|
||||
this.selectTopStmt, err = db.Prepare(`SELECT "id", "hash", "keys", "value", "isUploaded" FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=? ORDER BY "value" DESC LIMIT 100`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// sum stmt
|
||||
this.sumStmt, err = db.Prepare(`SELECT COUNT(*), IFNULL(SUM(value), 0) FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=?`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -162,7 +171,7 @@ func (this *Task) Start() error {
|
||||
// 清理
|
||||
this.cleanTicker = utils.NewTicker(24 * time.Hour)
|
||||
go func() {
|
||||
if this.cleanTicker.Next() {
|
||||
for this.cleanTicker.Next() {
|
||||
err := this.CleanExpired()
|
||||
if err != nil {
|
||||
remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error())
|
||||
@@ -173,7 +182,7 @@ func (this *Task) Start() error {
|
||||
// 上传
|
||||
this.uploadTicker = utils.NewTicker(this.item.UploadDuration())
|
||||
go func() {
|
||||
if this.uploadTicker.Next() {
|
||||
for this.uploadTicker.Next() {
|
||||
err := this.Upload(1 * time.Second)
|
||||
if err != nil {
|
||||
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
|
||||
@@ -230,6 +239,7 @@ func (this *Task) Stop() error {
|
||||
_ = this.deleteByVersionStmt.Close()
|
||||
_ = this.deleteByExpiresTimeStmt.Close()
|
||||
_ = this.selectTopStmt.Close()
|
||||
_ = this.sumStmt.Close()
|
||||
|
||||
if this.statsChan != nil {
|
||||
go func() {
|
||||
@@ -257,6 +267,7 @@ func (this *Task) InsertStat(stat *Stat) error {
|
||||
|
||||
this.serverIdMapLocker.Lock()
|
||||
this.serverIdMap[stat.ServerId] = true
|
||||
this.timeMap[stat.Time] = true
|
||||
this.serverIdMapLocker.Unlock()
|
||||
|
||||
keyData, err := json.Marshal(stat.Keys)
|
||||
@@ -289,7 +300,7 @@ func (this *Task) CleanExpired() error {
|
||||
}
|
||||
|
||||
// 清除过期的数据
|
||||
_, err := this.deleteByExpiresTimeStmt.Exec(this.item.ExpiresTime())
|
||||
_, err := this.deleteByExpiresTimeStmt.Exec(this.item.LocalExpiresTime())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -304,11 +315,21 @@ func (this *Task) Upload(pauseDuration time.Duration) error {
|
||||
}
|
||||
|
||||
this.serverIdMapLocker.Lock()
|
||||
|
||||
// 服务IDs
|
||||
var serverIds []int64
|
||||
for serverId := range this.serverIdMap {
|
||||
serverIds = append(serverIds, serverId)
|
||||
}
|
||||
this.serverIdMap = map[int64]bool{} // 清空数据
|
||||
|
||||
// 时间
|
||||
var times = []string{}
|
||||
for t := range this.timeMap {
|
||||
times = append(times, t)
|
||||
}
|
||||
this.timeMap = map[string]bool{} // 清空数据
|
||||
|
||||
this.serverIdMapLocker.Unlock()
|
||||
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
@@ -317,72 +338,87 @@ func (this *Task) Upload(pauseDuration time.Duration) error {
|
||||
}
|
||||
|
||||
for _, serverId := range serverIds {
|
||||
idStrings, err := func(serverId int64) (ids []string, err error) {
|
||||
rows, err := this.selectTopStmt.Query(serverId, this.item.Version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var isClosed bool
|
||||
defer func() {
|
||||
if isClosed {
|
||||
return
|
||||
}
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
var pbStats []*pb.MetricStat
|
||||
for rows.Next() {
|
||||
var pbStat = &pb.MetricStat{
|
||||
ItemId: this.item.Id,
|
||||
}
|
||||
// "id", "serverId", "hash", "keys", "value", "time", "version", "isUploaded"
|
||||
var isUploaded int
|
||||
var keysData []byte
|
||||
err = rows.Scan(&pbStat.Id, &pbStat.ServerId, &pbStat.Hash, &keysData, &pbStat.Value, &pbStat.Time, &pbStat.Version, &isUploaded)
|
||||
for _, currentTime := range times {
|
||||
idStrings, err := func(serverId int64, currentTime string) (ids []string, err error) {
|
||||
rows, err := this.selectTopStmt.Query(serverId, this.item.Version, currentTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if isUploaded == 1 {
|
||||
continue
|
||||
var isClosed bool
|
||||
defer func() {
|
||||
if isClosed {
|
||||
return
|
||||
}
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
var pbStats []*pb.UploadingMetricStat
|
||||
for rows.Next() {
|
||||
var pbStat = &pb.UploadingMetricStat{
|
||||
}
|
||||
// "id", "hash", "keys", "value", "isUploaded"
|
||||
var isUploaded int
|
||||
var keysData []byte
|
||||
err = rows.Scan(&pbStat.Id, &pbStat.Hash, &keysData, &pbStat.Value, &isUploaded)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if isUploaded == 1 {
|
||||
continue
|
||||
}
|
||||
if len(keysData) > 0 {
|
||||
err = json.Unmarshal(keysData, &pbStat.Keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
pbStats = append(pbStats, pbStat)
|
||||
ids = append(ids, strconv.FormatInt(pbStat.Id, 10))
|
||||
}
|
||||
if len(keysData) > 0 {
|
||||
err = json.Unmarshal(keysData, &pbStat.Keys)
|
||||
|
||||
// 提前关闭
|
||||
_ = rows.Close()
|
||||
isClosed = true
|
||||
|
||||
// 上传
|
||||
if len(pbStats) > 0 {
|
||||
// 计算总和
|
||||
count, total, err := this.sum(serverId, currentTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = rpcClient.MetricStatRPC().UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{
|
||||
MetricStats: pbStats,
|
||||
Time: currentTime,
|
||||
ServerId: serverId,
|
||||
ItemId: this.item.Id,
|
||||
Version: this.item.Version,
|
||||
Count: count,
|
||||
Total: float32(total),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
pbStats = append(pbStats, pbStat)
|
||||
ids = append(ids, strconv.FormatInt(pbStat.Id, 10))
|
||||
}
|
||||
|
||||
// 提前关闭
|
||||
_ = rows.Close()
|
||||
isClosed = true
|
||||
|
||||
// 上传
|
||||
if len(pbStats) > 0 {
|
||||
_, err = rpcClient.MetricStatRPC().UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{MetricStats: pbStats})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}(serverId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(idStrings) > 0 {
|
||||
// 设置为已上传
|
||||
_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`)
|
||||
return
|
||||
}(serverId, currentTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(idStrings) > 0 {
|
||||
// 设置为已上传
|
||||
_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 休息一下,防止短时间内上传数据过多
|
||||
if pauseDuration > 0 && len(idStrings) > 0 {
|
||||
if pauseDuration > 0 {
|
||||
time.Sleep(pauseDuration)
|
||||
}
|
||||
}
|
||||
@@ -392,23 +428,65 @@ func (this *Task) Upload(pauseDuration time.Duration) error {
|
||||
|
||||
// 加载服务ID
|
||||
func (this *Task) loadServerIdMap() error {
|
||||
rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version)
|
||||
{
|
||||
rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
var serverId int64
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&serverId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.serverIdMapLocker.Lock()
|
||||
this.serverIdMap[serverId] = true
|
||||
this.serverIdMapLocker.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.item.Version)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
var timeString string
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&timeString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.serverIdMapLocker.Lock()
|
||||
this.timeMap[timeString] = true
|
||||
this.serverIdMapLocker.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 计算数量和综合
|
||||
func (this *Task) sum(serverId int64, time string) (count int64, total float64, err error) {
|
||||
rows, err := this.sumStmt.Query(serverId, this.item.Version, time)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, 0, err
|
||||
}
|
||||
defer func() {
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
var serverId int64
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&serverId)
|
||||
if rows.Next() {
|
||||
err = rows.Scan(&count, &total)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, 0, err
|
||||
}
|
||||
this.serverIdMapLocker.Lock()
|
||||
this.serverIdMap[serverId] = true
|
||||
this.serverIdMapLocker.Unlock()
|
||||
}
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user