Files
EdgeNode/internal/metrics/task.go

545 lines
12 KiB
Go
Raw Normal View History

2021-06-30 20:01:00 +08:00
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package metrics
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/zero"
2021-06-30 20:01:00 +08:00
"github.com/iwind/TeaGo/Tea"
2022-09-02 16:12:58 +08:00
"github.com/iwind/TeaGo/types"
2021-06-30 20:01:00 +08:00
"os"
"strconv"
"sync"
2023-08-15 20:12:09 +08:00
"sync/atomic"
2021-06-30 20:01:00 +08:00
"time"
)
2022-08-14 16:28:40 +08:00
// MaxQueueSize caps how many distinct stats a Task buffers in memory; Task.Add
// drops new entries once the buffer holds this many.
const MaxQueueSize = 256 // TODO: make this configurable, possibly per individual task
2021-06-30 20:01:00 +08:00
// Task 单个指标任务
// 数据库存储:
2022-08-14 16:28:40 +08:00
//
// data/
// metric.$ID.db
// stats
// id, keys, value, time, serverId, hash
// 原理:
// 添加或者有变更时 isUploaded = false
// 上传时检查 isUploaded 状态
// 只上传每个服务中排序最前面的 N 个数据
2021-06-30 20:01:00 +08:00
type Task struct {
item *serverconfigs.MetricItemConfig
isLoaded bool
db *dbs.DB
2021-06-30 20:01:00 +08:00
statTableName string
isStopped bool
cleanTicker *utils.Ticker
uploadTicker *utils.Ticker
2021-07-01 10:39:56 +08:00
cleanVersion int32
2021-06-30 20:01:00 +08:00
2023-03-07 16:22:32 +08:00
insertStatStmt *dbs.Stmt
deleteByVersionStmt *dbs.Stmt
deleteByExpiresTimeStmt *dbs.Stmt
selectTopStmt *dbs.Stmt
sumStmt *dbs.Stmt
2021-06-30 20:01:00 +08:00
serverIdMap map[int64]zero.Zero // 所有的服务Ids
timeMap map[string]zero.Zero // time => bool
2021-06-30 20:01:00 +08:00
serverIdMapLocker sync.Mutex
2021-08-22 10:07:04 +08:00
2022-05-18 21:41:34 +08:00
statsMap map[string]*Stat // 待写入队列hash => *Stat
2023-08-15 20:12:09 +08:00
statsLocker sync.RWMutex
2021-08-22 10:07:04 +08:00
statsTicker *utils.Ticker
2021-06-30 20:01:00 +08:00
}
// NewTask 获取新任务
func NewTask(item *serverconfigs.MetricItemConfig) *Task {
return &Task{
item: item,
serverIdMap: map[int64]zero.Zero{},
timeMap: map[string]zero.Zero{},
2021-08-22 10:07:04 +08:00
statsMap: map[string]*Stat{},
2021-06-30 20:01:00 +08:00
}
}
// Init 初始化
func (this *Task) Init() error {
this.statTableName = "stats"
// 检查目录是否存在
var dir = Tea.Root + "/data"
_, err := os.Stat(dir)
if err != nil {
err = os.MkdirAll(dir, 0777)
if err != nil {
return err
}
remotelogs.Println("METRIC", "create data dir '"+dir+"'")
}
2022-09-02 16:12:58 +08:00
var path = dir + "/metric." + types.String(this.item.Id) + ".db"
2023-03-07 16:22:32 +08:00
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
db.SetMaxOpenConns(1)
2023-03-07 16:22:32 +08:00
this.db = db
// 恢复数据库
var recoverEnv, _ = os.LookupEnv("EdgeRecover")
if len(recoverEnv) > 0 {
for _, indexName := range []string{"serverId", "hash"} {
_, _ = db.Exec(`REINDEX "` + indexName + `"`)
}
}
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
2021-06-30 20:01:00 +08:00
//创建统计表
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"keys" varchar(1024),
"value" real DEFAULT 0,
"time" varchar(32),
"serverId" integer DEFAULT 0,
"version" integer DEFAULT 0,
"isUploaded" integer DEFAULT 0
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.statTableName + `" (
"serverId" ASC,
"version" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.statTableName + `" (
"hash" ASC
);`)
if err != nil {
return err
}
// insert stat stmt
this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`)
if err != nil {
return err
}
// delete by version
this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version"<?`)
if err != nil {
return err
}
// delete by expires time
this.deleteByExpiresTimeStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "time"<?`)
if err != nil {
return err
}
// select topN stmt
this.selectTopStmt, err = db.Prepare(`SELECT "id", "hash", "keys", "value", "isUploaded" FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=? ORDER BY "value" DESC LIMIT 20`)
2021-07-01 10:39:56 +08:00
if err != nil {
return err
}
// sum stmt
this.sumStmt, err = db.Prepare(`SELECT COUNT(*), IFNULL(SUM(value), 0) FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=?`)
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
// 所有的服务IDs
err = this.loadServerIdMap()
if err != nil {
return err
}
this.isLoaded = true
return nil
}
// Start 启动任务
func (this *Task) Start() error {
// 读取数据
2021-08-22 10:07:04 +08:00
this.statsTicker = utils.NewTicker(1 * time.Minute)
goman.New(func() {
2021-08-22 10:07:04 +08:00
for this.statsTicker.Next() {
var tr = trackers.Begin("[METRIC]DUMP_STATS_TO_LOCAL_DATABASE")
2021-08-22 10:07:04 +08:00
this.statsLocker.Lock()
var statsMap = this.statsMap
this.statsMap = map[string]*Stat{}
this.statsLocker.Unlock()
for _, stat := range statsMap {
err := this.InsertStat(stat)
if err != nil {
remotelogs.Error("METRIC", "insert stat failed: "+err.Error())
}
2021-06-30 20:01:00 +08:00
}
tr.End()
2021-06-30 20:01:00 +08:00
}
})
2021-06-30 20:01:00 +08:00
// 清理
this.cleanTicker = utils.NewTicker(24 * time.Hour)
goman.New(func() {
2021-07-01 10:39:56 +08:00
for this.cleanTicker.Next() {
var tr = trackers.Begin("[METRIC]CLEAN_EXPIRED")
2021-06-30 20:01:00 +08:00
err := this.CleanExpired()
tr.End()
2021-06-30 20:01:00 +08:00
if err != nil {
remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error())
}
}
})
2021-06-30 20:01:00 +08:00
// 上传
this.uploadTicker = utils.NewTicker(this.item.UploadDuration())
goman.New(func() {
2021-07-01 10:39:56 +08:00
for this.uploadTicker.Next() {
var tr = trackers.Begin("[METRIC]UPLOAD_STATS")
2021-06-30 20:01:00 +08:00
err := this.Upload(1 * time.Second)
tr.End()
2022-07-05 20:37:00 +08:00
if err != nil && !rpc.IsConnError(err) {
2021-06-30 20:01:00 +08:00
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
}
}
})
2021-06-30 20:01:00 +08:00
return nil
}
// Add 添加数据
func (this *Task) Add(obj MetricInterface) {
if this.isStopped || !this.isLoaded {
return
}
var keys = []string{}
for _, key := range this.item.Keys {
2023-08-15 20:12:09 +08:00
var k = obj.MetricKey(key)
// 忽略499状态
if key == "${status}" && k == "499" {
return
}
2021-06-30 20:01:00 +08:00
keys = append(keys, k)
}
v, ok := obj.MetricValue(this.item.Value)
if !ok {
return
}
2021-08-22 10:07:04 +08:00
var hash = SumStat(obj.MetricServerId(), keys, this.item.CurrentTime(), this.item.Version, this.item.Id)
2023-08-15 20:12:09 +08:00
var countItems int
this.statsLocker.RLock()
2021-08-22 10:07:04 +08:00
oldStat, ok := this.statsMap[hash]
2023-08-15 20:12:09 +08:00
if !ok {
countItems = len(this.statsMap)
}
this.statsLocker.RUnlock()
2021-08-22 10:07:04 +08:00
if ok {
2023-08-15 20:12:09 +08:00
atomic.AddInt64(&oldStat.Value, 1)
2021-08-22 10:07:04 +08:00
} else {
// 防止过载
2023-08-15 20:12:09 +08:00
if countItems < MaxQueueSize {
this.statsLocker.Lock()
this.statsMap[hash] = &Stat{
ServerId: obj.MetricServerId(),
Keys: keys,
Value: v,
Time: this.item.CurrentTime(),
Hash: hash,
}
2023-08-15 20:12:09 +08:00
this.statsLocker.Unlock()
2021-08-22 10:07:04 +08:00
}
2021-06-30 20:01:00 +08:00
}
}
// Stop 停止任务
func (this *Task) Stop() error {
this.isStopped = true
if this.cleanTicker != nil {
this.cleanTicker.Stop()
}
if this.uploadTicker != nil {
this.uploadTicker.Stop()
}
2021-08-22 10:07:04 +08:00
if this.statsTicker != nil {
this.statsTicker.Stop()
}
2021-06-30 20:01:00 +08:00
_ = this.insertStatStmt.Close()
_ = this.deleteByVersionStmt.Close()
_ = this.deleteByExpiresTimeStmt.Close()
_ = this.selectTopStmt.Close()
2021-07-01 10:39:56 +08:00
_ = this.sumStmt.Close()
2021-06-30 20:01:00 +08:00
if this.db != nil {
_ = this.db.Close()
}
return nil
}
// InsertStat 写入数据
func (this *Task) InsertStat(stat *Stat) error {
if this.isStopped {
return nil
}
if stat == nil {
return nil
}
this.serverIdMapLocker.Lock()
this.serverIdMap[stat.ServerId] = zero.New()
this.timeMap[stat.Time] = zero.New()
2021-06-30 20:01:00 +08:00
this.serverIdMapLocker.Unlock()
keyData, err := json.Marshal(stat.Keys)
if err != nil {
return err
}
2021-08-22 10:07:04 +08:00
_, err = this.insertStatStmt.Exec(stat.ServerId, stat.Hash, keyData, stat.Value, stat.Time, this.item.Version, stat.Value)
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
return nil
}
// CleanExpired 清理数据
func (this *Task) CleanExpired() error {
if this.isStopped {
return nil
}
// 清除低版本数据
if this.cleanVersion < this.item.Version {
_, err := this.deleteByVersionStmt.Exec(this.item.Version)
if err != nil {
return err
}
this.cleanVersion = this.item.Version
}
// 清除过期的数据
2021-07-01 10:39:56 +08:00
_, err := this.deleteByExpiresTimeStmt.Exec(this.item.LocalExpiresTime())
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
return nil
}
// Upload 上传数据
func (this *Task) Upload(pauseDuration time.Duration) error {
if this.isStopped {
return nil
}
this.serverIdMapLocker.Lock()
2021-07-01 10:39:56 +08:00
// 服务IDs
2021-06-30 20:01:00 +08:00
var serverIds []int64
for serverId := range this.serverIdMap {
serverIds = append(serverIds, serverId)
}
this.serverIdMap = map[int64]zero.Zero{} // 清空数据
2021-07-01 10:39:56 +08:00
// 时间
var times = []string{}
for t := range this.timeMap {
times = append(times, t)
}
this.timeMap = map[string]zero.Zero{} // 清空数据
2021-07-01 10:39:56 +08:00
2021-06-30 20:01:00 +08:00
this.serverIdMapLocker.Unlock()
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
for _, serverId := range serverIds {
2021-07-01 10:39:56 +08:00
for _, currentTime := range times {
idStrings, err := func(serverId int64, currentTime string) (ids []string, err error) {
2022-08-14 16:28:40 +08:00
var t = trackers.Begin("[METRIC]SELECT_TOP_STMT")
2021-07-01 10:39:56 +08:00
rows, err := this.selectTopStmt.Query(serverId, this.item.Version, currentTime)
2022-08-14 16:28:40 +08:00
t.End()
2021-06-30 20:01:00 +08:00
if err != nil {
return nil, err
}
2021-07-01 10:39:56 +08:00
var isClosed bool
defer func() {
if isClosed {
return
}
_ = rows.Close()
}()
var pbStats []*pb.UploadingMetricStat
for rows.Next() {
var pbStat = &pb.UploadingMetricStat{}
2021-07-01 10:39:56 +08:00
// "id", "hash", "keys", "value", "isUploaded"
var isUploaded int
var keysData []byte
err = rows.Scan(&pbStat.Id, &pbStat.Hash, &keysData, &pbStat.Value, &isUploaded)
if err != nil {
return nil, err
}
// TODO 先不判断是否已经上传需要改造API进行配合
/**if isUploaded == 1 {
2021-07-01 10:39:56 +08:00
continue
}**/
2021-07-01 10:39:56 +08:00
if len(keysData) > 0 {
err = json.Unmarshal(keysData, &pbStat.Keys)
if err != nil {
return nil, err
}
}
pbStats = append(pbStats, pbStat)
ids = append(ids, strconv.FormatInt(pbStat.Id, 10))
2021-06-30 20:01:00 +08:00
}
2021-07-01 10:39:56 +08:00
// 提前关闭
_ = rows.Close()
isClosed = true
// 上传
if len(pbStats) > 0 {
// 计算总和
count, total, err := this.sum(serverId, currentTime)
if err != nil {
return nil, err
}
2022-08-24 20:04:46 +08:00
_, err = rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{
2021-07-01 10:39:56 +08:00
MetricStats: pbStats,
Time: currentTime,
ServerId: serverId,
ItemId: this.item.Id,
Version: this.item.Version,
Count: count,
Total: float32(total),
})
2021-06-30 20:01:00 +08:00
if err != nil {
return nil, err
}
}
2021-07-01 10:39:56 +08:00
return
}(serverId, currentTime)
if err != nil {
return err
}
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
if len(idStrings) > 0 {
// 设置为已上传
// TODO 先不判断是否已经上传需要改造API进行配合
/**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`)
2021-06-30 20:01:00 +08:00
if err != nil {
2021-07-01 10:39:56 +08:00
return err
}**/
2021-06-30 20:01:00 +08:00
}
2021-07-01 10:39:56 +08:00
}
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
// 休息一下,防止短时间内上传数据过多
if pauseDuration > 0 {
time.Sleep(pauseDuration)
}
}
return nil
}
// 加载服务ID
func (this *Task) loadServerIdMap() error {
{
rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version)
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
2021-07-01 10:39:56 +08:00
defer func() {
_ = rows.Close()
}()
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
var serverId int64
for rows.Next() {
err = rows.Scan(&serverId)
2021-06-30 20:01:00 +08:00
if err != nil {
return err
}
2021-07-01 10:39:56 +08:00
this.serverIdMapLocker.Lock()
this.serverIdMap[serverId] = zero.New()
2021-07-01 10:39:56 +08:00
this.serverIdMapLocker.Unlock()
2021-06-30 20:01:00 +08:00
}
2021-07-01 10:39:56 +08:00
}
2021-06-30 20:01:00 +08:00
2021-07-01 10:39:56 +08:00
{
rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.item.Version)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var timeString string
for rows.Next() {
err = rows.Scan(&timeString)
if err != nil {
return err
}
this.serverIdMapLocker.Lock()
this.timeMap[timeString] = zero.New()
2021-07-01 10:39:56 +08:00
this.serverIdMapLocker.Unlock()
2021-06-30 20:01:00 +08:00
}
}
return nil
}
2021-07-01 10:39:56 +08:00
// 计算数量和综合
func (this *Task) sum(serverId int64, time string) (count int64, total float64, err error) {
rows, err := this.sumStmt.Query(serverId, this.item.Version, time)
2021-06-30 20:01:00 +08:00
if err != nil {
2021-07-01 10:39:56 +08:00
return 0, 0, err
2021-06-30 20:01:00 +08:00
}
defer func() {
_ = rows.Close()
}()
2021-07-01 10:39:56 +08:00
if rows.Next() {
err = rows.Scan(&count, &total)
2021-06-30 20:01:00 +08:00
if err != nil {
2021-07-01 10:39:56 +08:00
return 0, 0, err
2021-06-30 20:01:00 +08:00
}
}
2021-07-01 10:39:56 +08:00
return
2021-06-30 20:01:00 +08:00
}