// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. package metrics import ( "database/sql" "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" _ "github.com/mattn/go-sqlite3" "os" "strconv" "strings" "sync" "time" ) // Task 单个指标任务 // 数据库存储: // data/ // metric.$ID.db // stats // id, keys, value, time, serverId, hash // 原理: // 添加或者有变更时 isUploaded = false // 上传时检查 isUploaded 状态 // 只上传每个服务中排序最前面的 N 个数据 type Task struct { item *serverconfigs.MetricItemConfig isLoaded bool db *sql.DB statTableName string statsChan chan *Stat isStopped bool cleanTicker *utils.Ticker uploadTicker *utils.Ticker cleanVersion int insertStatStmt *sql.Stmt deleteByVersionStmt *sql.Stmt deleteByExpiresTimeStmt *sql.Stmt selectTopStmt *sql.Stmt serverIdMap map[int64]bool // 所有的服务Ids serverIdMapLocker sync.Mutex } // NewTask 获取新任务 func NewTask(item *serverconfigs.MetricItemConfig) *Task { return &Task{ item: item, statsChan: make(chan *Stat, 40960), serverIdMap: map[int64]bool{}, } } // Init 初始化 func (this *Task) Init() error { this.statTableName = "stats" // 检查目录是否存在 var dir = Tea.Root + "/data" _, err := os.Stat(dir) if err != nil { err = os.MkdirAll(dir, 0777) if err != nil { return err } remotelogs.Println("METRIC", "create data dir '"+dir+"'") } db, err := sql.Open("sqlite3", "file:"+dir+"/metric."+strconv.FormatInt(this.item.Id, 10)+".db?cache=shared&mode=rwc&_journal_mode=WAL") if err != nil { return err } db.SetMaxOpenConns(1) this.db = db //创建统计表 _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "hash" varchar(32), "keys" varchar(1024), "value" real DEFAULT 0, "time" varchar(32), "serverId" integer DEFAULT 0, "version" integer DEFAULT 0, "isUploaded" integer DEFAULT 0 ); CREATE INDEX IF NOT EXISTS "serverId" ON "` + this.statTableName + `" ( "serverId" ASC, "version" ASC ); CREATE UNIQUE INDEX IF NOT EXISTS "hash" ON "` + this.statTableName + `" ( "hash" ASC );`) if err != nil { return err } // insert stat stmt this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`) if err != nil { return err } // delete by version this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version" 0 { err = json.Unmarshal(keysData, &pbStat.Keys) if err != nil { return nil, err } } pbStats = append(pbStats, pbStat) ids = append(ids, strconv.FormatInt(pbStat.Id, 10)) } // 提前关闭 _ = rows.Close() isClosed = true // 上传 if len(pbStats) > 0 { _, err = rpcClient.MetricStatRPC().UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{MetricStats: pbStats}) if err != nil { return nil, err } } return }(serverId) if err != nil { return err } if len(idStrings) > 0 { // 设置为已上传 _, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) if err != nil { return err } } // 休息一下,防止短时间内上传数据过多 if pauseDuration > 0 && len(idStrings) > 0 { time.Sleep(pauseDuration) } } return nil } // 加载服务ID func (this *Task) loadServerIdMap() error { rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version) if err != nil { return err } defer func() { _ = rows.Close() }() var serverId int64 for rows.Next() { err = rows.Scan(&serverId) if err != nil { return err } this.serverIdMapLocker.Lock() this.serverIdMap[serverId] = true this.serverIdMapLocker.Unlock() } return nil }