// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.

package metrics

import (
	"encoding/json"
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
	"github.com/TeaOSLab/EdgeNode/internal/utils"
	"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
	"github.com/TeaOSLab/EdgeNode/internal/utils/goman"
	"github.com/TeaOSLab/EdgeNode/internal/utils/idles"
	"github.com/TeaOSLab/EdgeNode/internal/utils/trackers"
	"github.com/TeaOSLab/EdgeNode/internal/utils/zero"
	"github.com/iwind/TeaGo/Tea"
	"github.com/iwind/TeaGo/types"
	"os"
	"strconv"
	"sync"
	"time"
)

const MaxQueueSize = 256 // TODO make this configurable, possibly per task

// SQLiteTask is a single metric task backed by a SQLite database.
//
// Database layout (columns as created by Init):
//
//	data/
//	   metric.$ID.db
//	      stats
//	         id, hash, keys, value, time, serverId, version, isUploaded
//
// How it works (translated from the original author's notes):
//
//	isUploaded is set to 0 whenever a row is added or changed
//	the isUploaded state is meant to be checked when uploading
//	only the top N rows of each server are uploaded
//
// NOTE(review): the visible upload code carries a TODO saying the isUploaded
// check is not performed yet — confirm before relying on the second point.
type SQLiteTask struct {
	BaseTask

	db            *dbs.DB
	statTableName string // set to "stats" by Init

	cleanTicker  *time.Ticker
	uploadTicker *utils.Ticker

	cleanVersion int32

	// Statements prepared by Init.
	insertStatStmt          *dbs.Stmt
	deleteByVersionStmt     *dbs.Stmt
	deleteByExpiresTimeStmt *dbs.Stmt
	selectTopStmt           *dbs.Stmt
	sumStmt                 *dbs.Stmt

	serverIdMap       map[int64]zero.Zero  // set of all server IDs
	timeMap           map[string]zero.Zero // set of time strings (time => present)
	serverIdMapLocker sync.Mutex           // locked around writes to serverIdMap and timeMap

	statsTicker *utils.Ticker
}

// NewSQLiteTask creates a task for the given metric item config with empty
// stats, server ID and time maps. It does not open the database; Init does.
func NewSQLiteTask(item *serverconfigs.MetricItemConfig) *SQLiteTask {
	return &SQLiteTask{
		BaseTask: BaseTask{
			itemConfig: item,
			statsMap:   map[string]*Stat{},
		},

		serverIdMap: map[int64]zero.Zero{},
		timeMap:     map[string]zero.Zero{},
	}
}

// CheckSQLiteDB reports whether os.Stat succeeds on the database file of the
// given metric item (Tea.Root + "/data/metric.<itemId>.db"). Any Stat error,
// not only "does not exist", yields false.
func CheckSQLiteDB(itemId int64) bool {
	var path = Tea.Root + "/data/metric." + types.String(itemId) + ".db"
	_, err := os.Stat(path)
	return err == nil
}

// Init prepares the task's storage: it makes sure the data directory exists,
// opens the item's SQLite database, creates the stats table with its indexes
// and prepares the statements used by the task.
//
// It returns the first error hit while creating the directory, opening the
// database, creating the schema or preparing a statement.
//
// NOTE(review): only the head of this function is intact in this span; see the
// note above the "delete by version" statement.
func (this *SQLiteTask) Init() error {
	this.statTableName = "stats"

	// Make sure the data directory exists; create it when Stat fails.
	var dir = Tea.Root + "/data"
	_, err := os.Stat(dir)
	if err != nil {
		err = os.MkdirAll(dir, 0777)
		if err != nil {
			return err
		}
		remotelogs.Println("METRIC", "create data dir '"+dir+"'")
	}

	// The DSN asks for a shared cache, read/write/create mode, WAL journaling,
	// the sync mode from dbs.SyncMode and exclusive locking.
	var path = dir + "/metric." + types.String(this.itemConfig.Id) + ".db"
	db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE")
	if err != nil {
		return err
	}
	// Cap the pool at one open connection (presumably mirrors
	// database/sql.DB.SetMaxOpenConns — confirm against the dbs package).
	db.SetMaxOpenConns(1)
	this.db = db

	// Database recovery: when the EdgeRecover environment variable is
	// non-empty, rebuild both indexes. Errors are deliberately ignored here.
	var recoverEnv, _ = os.LookupEnv("EdgeRecover")
	if len(recoverEnv) > 0 {
		for _, indexName := range []string{"serverId", "hash"} {
			_, _ = db.Exec(`REINDEX "` + indexName + `"`)
		}
	}

	if teaconst.EnableDBStat {
		this.db.EnableStat(true)
	}

	// Create the stats table, the (serverId, version) index and the unique
	// index on hash.
	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName +
		`" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "hash" varchar(32), "keys" varchar(1024), "value" real DEFAULT 0, "time" varchar(32), "serverId" integer DEFAULT 0, "version" integer DEFAULT 0, "isUploaded" integer DEFAULT 0 ); CREATE INDEX IF NOT EXISTS "serverId" ON "` + this.statTableName +
		`" ( "serverId" ASC, "version" ASC ); CREATE UNIQUE INDEX IF NOT EXISTS "hash" ON "` + this.statTableName +
		`" ( "hash" ASC );`)
	if err != nil {
		return err
	}

	// Insert statement: inserts a new row with isUploaded=0; when the hash
	// already exists it adds the last bound value to "value" and resets
	// isUploaded to 0.
	// NOTE(review): the table name is hard-coded as "stats" here while the
	// other statements use this.statTableName.
	this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`)
	if err != nil {
		return err
	}

	// NOTE(review): the source text is damaged from the next statement onward.
	// The raw string after `WHERE "version"` never closes and the text jumps
	// into the middle of a row-scanning loop, so everything in between is
	// missing from this span. The remainder is reproduced verbatim; restore it
	// from version control before building.
	// delete by version
	this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version" 0 { err = json.Unmarshal(keysData, &pbStat.Keys) if err != nil { return nil, err } } pbStats = append(pbStats, pbStat) ids = append(ids, strconv.FormatInt(pbStat.Id, 10)) } // 提前关闭 _ = 
rows.Close() isClosed = true // 上传 if len(pbStats) > 0 { // 计算总和 count, total, err := this.sum(serverId, currentTime) if err != nil { return nil, err } _, err = rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{ MetricStats: pbStats, Time: currentTime, ServerId: serverId, ItemId: this.itemConfig.Id, Version: this.itemConfig.Version, Count: count, Total: float32(total), }) if err != nil { return nil, err } } return }(serverId, currentTime) if err != nil { return err } if len(idStrings) > 0 { // 设置为已上传 // TODO 先不判断是否已经上传,需要改造API进行配合 /**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) if err != nil { return err }**/ } } // 休息一下,防止短时间内上传数据过多 if pauseDuration > 0 { time.Sleep(pauseDuration) uploadTr.Add(-pauseDuration) } } return nil } // 加载服务ID func (this *SQLiteTask) loadServerIdMap() error { { rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.itemConfig.Version) if err != nil { return err } defer func() { _ = rows.Close() }() var serverId int64 for rows.Next() { err = rows.Scan(&serverId) if err != nil { return err } this.serverIdMapLocker.Lock() this.serverIdMap[serverId] = zero.New() this.serverIdMapLocker.Unlock() } } { rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.itemConfig.Version) if err != nil { return err } defer func() { _ = rows.Close() }() var timeString string for rows.Next() { err = rows.Scan(&timeString) if err != nil { return err } this.serverIdMapLocker.Lock() this.timeMap[timeString] = zero.New() this.serverIdMapLocker.Unlock() } } return nil } // 计算数量和综合 func (this *SQLiteTask) sum(serverId int64, time string) (count int64, total float64, err error) { rows, err := this.sumStmt.Query(serverId, this.itemConfig.Version, time) if err != nil { return 0, 0, err } defer func() { _ = rows.Close() }() if rows.Next() { err = 
rows.Scan(&count, &total) if err != nil { return 0, 0, err } } return }