2021-06-30 20:01:00 +08:00
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package metrics
import (
"database/sql"
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2022-03-13 19:27:38 +08:00
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2021-12-08 15:17:45 +08:00
"github.com/TeaOSLab/EdgeNode/internal/goman"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2021-11-14 10:55:09 +08:00
"github.com/TeaOSLab/EdgeNode/internal/trackers"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
2022-03-13 19:27:38 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
2021-12-09 12:07:46 +08:00
"github.com/TeaOSLab/EdgeNode/internal/zero"
2021-06-30 20:01:00 +08:00
"github.com/iwind/TeaGo/Tea"
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
"sync"
"time"
)
2021-09-03 15:38:56 +08:00
// MaxQueueSize caps how many distinct stats (by hash) a Task buffers in memory
// between flushes to its local database. While the buffer is full, samples for
// new hashes are dropped; samples for hashes already buffered keep accumulating.
const MaxQueueSize = 10 * 1024
// Task 单个指标任务
// 数据库存储:
// data/
// metric.$ID.db
// stats
// id, keys, value, time, serverId, hash
// 原理:
// 添加或者有变更时 isUploaded = false
// 上传时检查 isUploaded 状态
// 只上传每个服务中排序最前面的 N 个数据
type Task struct {
item * serverconfigs . MetricItemConfig
isLoaded bool
2022-03-13 19:27:38 +08:00
db * dbs . DB
2021-06-30 20:01:00 +08:00
statTableName string
isStopped bool
cleanTicker * utils . Ticker
uploadTicker * utils . Ticker
2021-07-01 10:39:56 +08:00
cleanVersion int32
2021-06-30 20:01:00 +08:00
insertStatStmt * sql . Stmt
deleteByVersionStmt * sql . Stmt
deleteByExpiresTimeStmt * sql . Stmt
selectTopStmt * sql . Stmt
2021-07-01 10:39:56 +08:00
sumStmt * sql . Stmt
2021-06-30 20:01:00 +08:00
2021-12-09 12:07:46 +08:00
serverIdMap map [ int64 ] zero . Zero // 所有的服务Ids
timeMap map [ string ] zero . Zero // time => bool
2021-06-30 20:01:00 +08:00
serverIdMapLocker sync . Mutex
2021-08-22 10:07:04 +08:00
statsMap map [ string ] * Stat
statsLocker sync . Mutex
statsTicker * utils . Ticker
2021-06-30 20:01:00 +08:00
}
// NewTask 获取新任务
func NewTask ( item * serverconfigs . MetricItemConfig ) * Task {
return & Task {
item : item ,
2021-12-09 12:07:46 +08:00
serverIdMap : map [ int64 ] zero . Zero { } ,
timeMap : map [ string ] zero . Zero { } ,
2021-08-22 10:07:04 +08:00
statsMap : map [ string ] * Stat { } ,
2021-06-30 20:01:00 +08:00
}
}
// Init 初始化
func ( this * Task ) Init ( ) error {
this . statTableName = "stats"
// 检查目录是否存在
var dir = Tea . Root + "/data"
_ , err := os . Stat ( dir )
if err != nil {
err = os . MkdirAll ( dir , 0777 )
if err != nil {
return err
}
remotelogs . Println ( "METRIC" , "create data dir '" + dir + "'" )
}
db , err := sql . Open ( "sqlite3" , "file:" + dir + "/metric." + strconv . FormatInt ( this . item . Id , 10 ) + ".db?cache=shared&mode=rwc&_journal_mode=WAL" )
if err != nil {
return err
}
db . SetMaxOpenConns ( 1 )
2022-03-13 19:27:38 +08:00
this . db = dbs . NewDB ( db )
if teaconst . EnableDBStat {
this . db . EnableStat ( true )
}
2021-06-30 20:01:00 +08:00
//创建统计表
_ , err = db . Exec ( ` CREATE TABLE IF NOT EXISTS " ` + this . statTableName + ` " (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT ,
"hash" varchar ( 32 ) ,
"keys" varchar ( 1024 ) ,
"value" real DEFAULT 0 ,
"time" varchar ( 32 ) ,
"serverId" integer DEFAULT 0 ,
"version" integer DEFAULT 0 ,
"isUploaded" integer DEFAULT 0
) ;
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.statTableName + `" (
"serverId" ASC ,
"version" ASC
) ;
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.statTableName + `" (
"hash" ASC
) ; ` )
if err != nil {
return err
}
// insert stat stmt
this . insertStatStmt , err = db . Prepare ( ` INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0 ` )
if err != nil {
return err
}
// delete by version
this . deleteByVersionStmt , err = db . Prepare ( ` DELETE FROM " ` + this . statTableName + ` " WHERE "version"<? ` )
if err != nil {
return err
}
// delete by expires time
this . deleteByExpiresTimeStmt , err = db . Prepare ( ` DELETE FROM " ` + this . statTableName + ` " WHERE "time"<? ` )
if err != nil {
return err
}
// select topN stmt
2021-08-03 14:02:15 +08:00
this . selectTopStmt , err = db . Prepare ( ` SELECT "id", "hash", "keys", "value", "isUploaded" FROM " ` + this . statTableName + ` " WHERE "serverId"=? AND "version"=? AND time=? ORDER BY "value" DESC LIMIT 20 ` )
2021-07-01 10:39:56 +08:00
if err != nil {
return err
}
// sum stmt
this . sumStmt , err = db . Prepare ( ` SELECT COUNT(*), IFNULL(SUM(value), 0) FROM " ` + this . statTableName + ` " WHERE "serverId"=? AND "version"=? AND time=? ` )
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
// 所有的服务IDs
err = this . loadServerIdMap ( )
if err != nil {
return err
}
this . isLoaded = true
return nil
}
// Start 启动任务
func ( this * Task ) Start ( ) error {
// 读取数据
2021-08-22 10:07:04 +08:00
this . statsTicker = utils . NewTicker ( 1 * time . Minute )
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
2021-08-22 10:07:04 +08:00
for this . statsTicker . Next ( ) {
2021-11-14 10:55:09 +08:00
var tr = trackers . Begin ( "[METRIC]DUMP_STATS_TO_LOCAL_DATABASE" )
2021-08-22 10:07:04 +08:00
this . statsLocker . Lock ( )
var statsMap = this . statsMap
this . statsMap = map [ string ] * Stat { }
this . statsLocker . Unlock ( )
for _ , stat := range statsMap {
err := this . InsertStat ( stat )
if err != nil {
remotelogs . Error ( "METRIC" , "insert stat failed: " + err . Error ( ) )
}
2021-06-30 20:01:00 +08:00
}
2021-11-14 10:55:09 +08:00
tr . End ( )
2021-06-30 20:01:00 +08:00
}
2021-12-08 15:17:45 +08:00
} )
2021-06-30 20:01:00 +08:00
// 清理
this . cleanTicker = utils . NewTicker ( 24 * time . Hour )
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
2021-07-01 10:39:56 +08:00
for this . cleanTicker . Next ( ) {
2021-11-14 10:55:09 +08:00
var tr = trackers . Begin ( "[METRIC]CLEAN_EXPIRED" )
2021-06-30 20:01:00 +08:00
err := this . CleanExpired ( )
2021-11-14 10:55:09 +08:00
tr . End ( )
2021-06-30 20:01:00 +08:00
if err != nil {
remotelogs . Error ( "METRIC" , "clean expired stats failed: " + err . Error ( ) )
}
}
2021-12-08 15:17:45 +08:00
} )
2021-06-30 20:01:00 +08:00
// 上传
this . uploadTicker = utils . NewTicker ( this . item . UploadDuration ( ) )
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
2021-07-01 10:39:56 +08:00
for this . uploadTicker . Next ( ) {
2021-11-14 10:55:09 +08:00
var tr = trackers . Begin ( "[METRIC]UPLOAD_STATS" )
2021-06-30 20:01:00 +08:00
err := this . Upload ( 1 * time . Second )
2021-11-14 10:55:09 +08:00
tr . End ( )
2021-06-30 20:01:00 +08:00
if err != nil {
remotelogs . Error ( "METRIC" , "upload stats failed: " + err . Error ( ) )
}
}
2021-12-08 15:17:45 +08:00
} )
2021-06-30 20:01:00 +08:00
return nil
}
// Add 添加数据
func ( this * Task ) Add ( obj MetricInterface ) {
if this . isStopped || ! this . isLoaded {
return
}
var keys = [ ] string { }
for _ , key := range this . item . Keys {
k := obj . MetricKey ( key )
2021-10-04 08:41:13 +08:00
// 忽略499状态
if key == "${status}" && k == "499" {
return
}
2021-06-30 20:01:00 +08:00
keys = append ( keys , k )
}
v , ok := obj . MetricValue ( this . item . Value )
if ! ok {
return
}
2021-08-22 10:07:04 +08:00
var hash = SumStat ( obj . MetricServerId ( ) , keys , this . item . CurrentTime ( ) , this . item . Version , this . item . Id )
this . statsLocker . Lock ( )
oldStat , ok := this . statsMap [ hash ]
if ok {
oldStat . Value += v
oldStat . Hash = hash
} else {
2021-09-03 15:38:56 +08:00
// 防止过载
if len ( this . statsMap ) < MaxQueueSize {
this . statsMap [ hash ] = & Stat {
ServerId : obj . MetricServerId ( ) ,
Keys : keys ,
Value : v ,
Time : this . item . CurrentTime ( ) ,
Hash : hash ,
}
2021-08-22 10:07:04 +08:00
}
2021-06-30 20:01:00 +08:00
}
2021-08-22 10:07:04 +08:00
this . statsLocker . Unlock ( )
2021-06-30 20:01:00 +08:00
}
// Stop 停止任务
func ( this * Task ) Stop ( ) error {
this . isStopped = true
if this . cleanTicker != nil {
this . cleanTicker . Stop ( )
}
if this . uploadTicker != nil {
this . uploadTicker . Stop ( )
}
2021-08-22 10:07:04 +08:00
if this . statsTicker != nil {
this . statsTicker . Stop ( )
}
2021-06-30 20:01:00 +08:00
_ = this . insertStatStmt . Close ( )
_ = this . deleteByVersionStmt . Close ( )
_ = this . deleteByExpiresTimeStmt . Close ( )
_ = this . selectTopStmt . Close ( )
2021-07-01 10:39:56 +08:00
_ = this . sumStmt . Close ( )
2021-06-30 20:01:00 +08:00
if this . db != nil {
_ = this . db . Close ( )
}
return nil
}
// InsertStat 写入数据
func ( this * Task ) InsertStat ( stat * Stat ) error {
if this . isStopped {
return nil
}
if stat == nil {
return nil
}
this . serverIdMapLocker . Lock ( )
2021-12-09 12:07:46 +08:00
this . serverIdMap [ stat . ServerId ] = zero . New ( )
this . timeMap [ stat . Time ] = zero . New ( )
2021-06-30 20:01:00 +08:00
this . serverIdMapLocker . Unlock ( )
keyData , err := json . Marshal ( stat . Keys )
if err != nil {
return err
}
2021-08-22 10:07:04 +08:00
_ , err = this . insertStatStmt . Exec ( stat . ServerId , stat . Hash , keyData , stat . Value , stat . Time , this . item . Version , stat . Value )
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
return nil
}
// CleanExpired 清理数据
func ( this * Task ) CleanExpired ( ) error {
if this . isStopped {
return nil
}
// 清除低版本数据
if this . cleanVersion < this . item . Version {
_ , err := this . deleteByVersionStmt . Exec ( this . item . Version )
if err != nil {
return err
}
this . cleanVersion = this . item . Version
}
// 清除过期的数据
2021-07-01 10:39:56 +08:00
_ , err := this . deleteByExpiresTimeStmt . Exec ( this . item . LocalExpiresTime ( ) )
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
return nil
}
// Upload 上传数据
func ( this * Task ) Upload ( pauseDuration time . Duration ) error {
if this . isStopped {
return nil
}
this . serverIdMapLocker . Lock ( )
2021-07-01 10:39:56 +08:00
// 服务IDs
2021-06-30 20:01:00 +08:00
var serverIds [ ] int64
for serverId := range this . serverIdMap {
serverIds = append ( serverIds , serverId )
}
2021-12-09 12:07:46 +08:00
this . serverIdMap = map [ int64 ] zero . Zero { } // 清空数据
2021-07-01 10:39:56 +08:00
// 时间
var times = [ ] string { }
for t := range this . timeMap {
times = append ( times , t )
}
2021-12-09 12:07:46 +08:00
this . timeMap = map [ string ] zero . Zero { } // 清空数据
2021-07-01 10:39:56 +08:00
2021-06-30 20:01:00 +08:00
this . serverIdMapLocker . Unlock ( )
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return err
}
for _ , serverId := range serverIds {
2021-07-01 10:39:56 +08:00
for _ , currentTime := range times {
idStrings , err := func ( serverId int64 , currentTime string ) ( ids [ ] string , err error ) {
rows , err := this . selectTopStmt . Query ( serverId , this . item . Version , currentTime )
2021-06-30 20:01:00 +08:00
if err != nil {
return nil , err
}
2021-07-01 10:39:56 +08:00
var isClosed bool
defer func ( ) {
if isClosed {
return
}
_ = rows . Close ( )
} ( )
var pbStats [ ] * pb . UploadingMetricStat
for rows . Next ( ) {
2021-08-03 14:02:15 +08:00
var pbStat = & pb . UploadingMetricStat { }
2021-07-01 10:39:56 +08:00
// "id", "hash", "keys", "value", "isUploaded"
var isUploaded int
var keysData [ ] byte
err = rows . Scan ( & pbStat . Id , & pbStat . Hash , & keysData , & pbStat . Value , & isUploaded )
if err != nil {
return nil , err
}
2021-08-03 14:02:15 +08:00
// TODO 先不判断是否已经上传, 需要改造API进行配合
/ * * if isUploaded == 1 {
2021-07-01 10:39:56 +08:00
continue
2021-08-03 14:02:15 +08:00
} * * /
2021-07-01 10:39:56 +08:00
if len ( keysData ) > 0 {
err = json . Unmarshal ( keysData , & pbStat . Keys )
if err != nil {
return nil , err
}
}
pbStats = append ( pbStats , pbStat )
ids = append ( ids , strconv . FormatInt ( pbStat . Id , 10 ) )
2021-06-30 20:01:00 +08:00
}
2021-07-01 10:39:56 +08:00
// 提前关闭
_ = rows . Close ( )
isClosed = true
// 上传
if len ( pbStats ) > 0 {
// 计算总和
count , total , err := this . sum ( serverId , currentTime )
if err != nil {
return nil , err
}
_ , err = rpcClient . MetricStatRPC ( ) . UploadMetricStats ( rpcClient . Context ( ) , & pb . UploadMetricStatsRequest {
MetricStats : pbStats ,
Time : currentTime ,
ServerId : serverId ,
ItemId : this . item . Id ,
Version : this . item . Version ,
Count : count ,
Total : float32 ( total ) ,
} )
2021-06-30 20:01:00 +08:00
if err != nil {
return nil , err
}
}
2021-07-01 10:39:56 +08:00
return
} ( serverId , currentTime )
if err != nil {
return err
}
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
if len ( idStrings ) > 0 {
// 设置为已上传
2022-03-13 19:27:38 +08:00
// TODO 先不判断是否已经上传, 需要改造API进行配合
/ * * _ , err = this . db . Exec ( ` UPDATE " ` + this . statTableName + ` " SET isUploaded=1 WHERE id IN ( ` + strings . Join ( idStrings , "," ) + ` ) ` )
2021-06-30 20:01:00 +08:00
if err != nil {
2021-07-01 10:39:56 +08:00
return err
2022-03-13 19:27:38 +08:00
} * * /
2021-06-30 20:01:00 +08:00
}
2021-07-01 10:39:56 +08:00
}
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
// 休息一下,防止短时间内上传数据过多
if pauseDuration > 0 {
time . Sleep ( pauseDuration )
}
}
return nil
}
// 加载服务ID
func ( this * Task ) loadServerIdMap ( ) error {
{
rows , err := this . db . Query ( ` SELECT DISTINCT "serverId" FROM ` + this . statTableName + " WHERE version=?" , this . item . Version )
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
2021-07-01 10:39:56 +08:00
defer func ( ) {
_ = rows . Close ( )
} ( )
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
var serverId int64
for rows . Next ( ) {
err = rows . Scan ( & serverId )
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
2021-07-01 10:39:56 +08:00
this . serverIdMapLocker . Lock ( )
2021-12-09 12:07:46 +08:00
this . serverIdMap [ serverId ] = zero . New ( )
2021-07-01 10:39:56 +08:00
this . serverIdMapLocker . Unlock ( )
2021-06-30 20:01:00 +08:00
}
2021-07-01 10:39:56 +08:00
}
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
{
rows , err := this . db . Query ( ` SELECT DISTINCT "time" FROM ` + this . statTableName + " WHERE version=?" , this . item . Version )
if err != nil {
return err
}
defer func ( ) {
_ = rows . Close ( )
} ( )
var timeString string
for rows . Next ( ) {
err = rows . Scan ( & timeString )
if err != nil {
return err
}
this . serverIdMapLocker . Lock ( )
2021-12-09 12:07:46 +08:00
this . timeMap [ timeString ] = zero . New ( )
2021-07-01 10:39:56 +08:00
this . serverIdMapLocker . Unlock ( )
2021-06-30 20:01:00 +08:00
}
}
return nil
}
2021-07-01 10:39:56 +08:00
// 计算数量和综合
func ( this * Task ) sum ( serverId int64 , time string ) ( count int64 , total float64 , err error ) {
rows , err := this . sumStmt . Query ( serverId , this . item . Version , time )
2021-06-30 20:01:00 +08:00
if err != nil {
2021-07-01 10:39:56 +08:00
return 0 , 0 , err
2021-06-30 20:01:00 +08:00
}
defer func ( ) {
_ = rows . Close ( )
} ( )
2021-07-01 10:39:56 +08:00
if rows . Next ( ) {
err = rows . Scan ( & count , & total )
2021-06-30 20:01:00 +08:00
if err != nil {
2021-07-01 10:39:56 +08:00
return 0 , 0 , err
2021-06-30 20:01:00 +08:00
}
}
2021-07-01 10:39:56 +08:00
return
2021-06-30 20:01:00 +08:00
}