使用KV存储实现指标统计

This commit is contained in:
GoEdgeLab
2024-04-02 19:54:04 +08:00
parent d7de2bd167
commit 4c30c28b4c
21 changed files with 1561 additions and 612 deletions

View File

@@ -119,7 +119,7 @@ func (this *IPListManager) init() {
var db IPListDB
var err error
if sqliteErr == nil {
if sqliteErr == nil || !teaconst.EnableKVCacheStore {
db, err = NewSQLiteIPList()
} else {
db, err = NewKVIPList()

View File

@@ -26,8 +26,8 @@ func init() {
type Manager struct {
isQuiting bool
taskMap map[int64]*Task // itemId => *Task
categoryTaskMap map[string][]*Task // category => []*Task
taskMap map[int64]Task // itemId => Task
categoryTaskMap map[string][]Task // category => []Task
locker sync.RWMutex
hasHTTPMetrics bool
@@ -37,8 +37,8 @@ type Manager struct {
func NewManager() *Manager {
return &Manager{
taskMap: map[int64]*Task{},
categoryTaskMap: map[string][]*Task{},
taskMap: map[int64]Task{},
categoryTaskMap: map[string][]Task{},
}
}
@@ -64,11 +64,20 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
if err != nil {
remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error())
}
// deleted
if newItem != nil && !newItem.IsOn {
deleteErr := task.Delete()
if deleteErr != nil {
remotelogs.Error("METRIC_MANAGER", "delete task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error())
}
}
delete(this.taskMap, itemId)
} else { // 更新已存在的
if newItem.Version != task.item.Version {
if newItem.Version != task.Item().Version {
remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'")
task.item = newItem
task.SetItem(newItem)
}
}
}
@@ -81,7 +90,14 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
_, ok := this.taskMap[newItem.Id]
if !ok {
remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'")
task := NewTask(newItem)
var task Task
if CheckSQLiteDB(newItem.Id) || !teaconst.EnableKVCacheStore {
task = NewSQLiteTask(newItem)
} else {
task = NewKVTask(newItem)
}
err := task.Init()
if err != nil {
remotelogs.Error("METRIC_MANAGER", "initialized task failed: "+err.Error())
@@ -100,13 +116,13 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
this.hasHTTPMetrics = false
this.hasTCPMetrics = false
this.hasUDPMetrics = false
this.categoryTaskMap = map[string][]*Task{}
this.categoryTaskMap = map[string][]Task{}
for _, task := range this.taskMap {
var tasks = this.categoryTaskMap[task.item.Category]
var tasks = this.categoryTaskMap[task.Item().Category]
tasks = append(tasks, task)
this.categoryTaskMap[task.item.Category] = tasks
this.categoryTaskMap[task.Item().Category] = tasks
switch task.item.Category {
switch task.Item().Category {
case serverconfigs.MetricItemCategoryHTTP:
this.hasHTTPMetrics = true
case serverconfigs.MetricItemCategoryTCP:
@@ -144,6 +160,10 @@ func (this *Manager) HasUDPMetrics() bool {
return this.hasUDPMetrics
}
func (this *Manager) TaskMap() map[int64]Task {
return this.taskMap
}
// Quit 退出管理器
func (this *Manager) Quit() {
this.isQuiting = true
@@ -154,6 +174,6 @@ func (this *Manager) Quit() {
for _, task := range this.taskMap {
_ = task.Stop()
}
this.taskMap = map[int64]*Task{}
this.taskMap = map[int64]Task{}
this.locker.Unlock()
}

View File

@@ -1,18 +1,19 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package metrics
package metrics_test
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/metrics"
"testing"
)
func TestNewManager(t *testing.T) {
var manager = NewManager()
var manager = metrics.NewManager()
{
manager.Update([]*serverconfigs.MetricItemConfig{})
for _, task := range manager.taskMap {
t.Log(task.item.Id)
for _, task := range manager.TaskMap() {
t.Log(task.Item().Id)
}
}
{
@@ -28,8 +29,8 @@ func TestNewManager(t *testing.T) {
Id: 3,
},
})
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
for _, task := range manager.TaskMap() {
t.Log("task:", task.Item().Id)
}
}
@@ -43,8 +44,8 @@ func TestNewManager(t *testing.T) {
Id: 2,
},
})
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
for _, task := range manager.TaskMap() {
t.Log("task:", task.Item().Id)
}
}
@@ -56,8 +57,8 @@ func TestNewManager(t *testing.T) {
Version: 1,
},
})
for _, task := range manager.taskMap {
t.Log("task:", task.item.Id)
for _, task := range manager.TaskMap() {
t.Log("task:", task.Item().Id)
}
}
}

View File

@@ -3,20 +3,120 @@
package metrics
import (
"bytes"
"encoding/binary"
"errors"
byteutils "github.com/TeaOSLab/EdgeNode/internal/utils/byte"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
"strconv"
"strings"
)
type Stat struct {
ServerId int64
Keys []string
Hash string
Value int64
Time string
ServerId int64 `json:"serverId"`
Keys []string `json:"keys"`
Hash string `json:"hash"`
Value int64 `json:"value"`
Time string `json:"time"`
}
func SumStat(serverId int64, keys []string, time string, version int32, itemId int64) string {
func UniqueKey(serverId int64, keys []string, time string, version int32, itemId int64) string {
var keysData = strings.Join(keys, "$EDGE$")
return strconv.FormatUint(fnv.HashString(strconv.FormatInt(serverId, 10)+"@"+keysData+"@"+time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10)
}
func (this *Stat) UniqueKey(version int32, itemId int64) string {
return UniqueKey(this.ServerId, this.Keys, this.Time, version, itemId)
}
func (this *Stat) FullKey(version int32, itemId int64) string {
return this.Time + "_" + string(int32ToBigEndian(version)) + this.UniqueKey(version, itemId)
}
func (this *Stat) EncodeValueKey(version int32) string {
if this.Value < 0 {
this.Value = 0
}
return string(byteutils.Concat([]byte(this.Time), []byte{'_'}, int32ToBigEndian(version), int64ToBigEndian(this.ServerId), int64ToBigEndian(this.Value), []byte(this.Hash)))
}
func (this *Stat) EncodeSumKey(version int32) string {
return string(byteutils.Concat([]byte(this.Time), []byte{'_'}, int32ToBigEndian(version), int64ToBigEndian(this.ServerId)))
}
func DecodeValueKey(valueKey string) (serverId int64, timeString string, version int32, value int64, hash string, err error) {
var b = []byte(valueKey)
var timeIndex = bytes.Index(b, []byte{'_'})
if timeIndex < 0 {
return
}
timeString = string(b[:timeIndex])
b = b[timeIndex+1:]
if len(b) < 20+1 {
err = errors.New("invalid value key")
return
}
version = int32(binary.BigEndian.Uint32(b[0:4]))
serverId = int64(binary.BigEndian.Uint64(b[4:12]))
value = int64(binary.BigEndian.Uint64(b[12:20]))
hash = string(b[20:])
return
}
func DecodeSumKey(sumKey string) (serverId int64, timeString string, version int32, err error) {
var b = []byte(sumKey)
var timeIndex = bytes.Index(b, []byte{'_'})
if timeIndex < 0 {
return
}
timeString = string(b[:timeIndex])
b = b[timeIndex+1:]
if len(b) < 12 {
err = errors.New("invalid sum key")
return
}
version = int32(binary.BigEndian.Uint32(b[:4]))
serverId = int64(binary.BigEndian.Uint64(b[4:12]))
return
}
func EncodeSumValue(count uint64, total uint64) []byte {
var result [16]byte
binary.BigEndian.PutUint64(result[:8], count)
binary.BigEndian.PutUint64(result[8:], total)
return result[:]
}
func DecodeSumValue(data []byte) (count uint64, total uint64) {
if len(data) != 16 {
return
}
count = binary.BigEndian.Uint64(data[:8])
total = binary.BigEndian.Uint64(data[8:])
return
}
func int64ToBigEndian(i int64) []byte {
if i < 0 {
i = 0
}
var b = make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(i))
return b
}
func int32ToBigEndian(i int32) []byte {
if i < 0 {
i = 0
}
var b = make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(i))
return b
}

View File

@@ -0,0 +1,69 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package metrics_test
import (
"github.com/TeaOSLab/EdgeNode/internal/metrics"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestStat_EncodeValueKey(t *testing.T) {
var a = assert.NewAssertion(t)
var stat = &metrics.Stat{
ServerId: 1,
Keys: []string{"${remoteAddr}"},
Hash: "123456",
Value: 123,
Time: "20240101",
}
var valueKey = stat.EncodeValueKey(100)
t.Log(valueKey)
serverId, timeString, version, value, hash, err := metrics.DecodeValueKey(valueKey)
if err != nil {
t.Fatal(err)
}
t.Log(serverId, timeString, value, version, hash)
a.IsTrue(serverId == 1)
a.IsTrue(timeString == "20240101")
a.IsTrue(value == 123)
a.IsTrue(version == 100)
a.IsTrue(hash == "123456")
}
func TestStat_EncodeSumKey(t *testing.T) {
var a = assert.NewAssertion(t)
var stat = &metrics.Stat{
ServerId: 1,
Keys: []string{"${remoteAddr}"},
Hash: "123456",
Value: 123,
Time: "20240101",
}
var sumKey = stat.EncodeSumKey(100)
t.Log(sumKey)
serverId, timeString, version, err := metrics.DecodeSumKey(sumKey)
if err != nil {
t.Fatal(err)
}
t.Log(serverId, timeString, version)
a.IsTrue(serverId == 1)
a.IsTrue(timeString == "20240101")
a.IsTrue(version == 100)
}
func TestStat_EncodeSumValue(t *testing.T) {
var a = assert.NewAssertion(t)
var b = metrics.EncodeSumValue(123, 456)
t.Log(b)
count, sum := metrics.DecodeSumValue(b)
a.IsTrue(count == 123)
a.IsTrue(sum == 456)
}

View File

@@ -14,7 +14,7 @@ func BenchmarkSumStat(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
metrics.SumStat(1, []string{"1.2.3.4"}, timeutil.Format("Ymd"), 1, 1)
metrics.UniqueKey(1, []string{"1.2.3.4"}, timeutil.Format("Ymd"), 1, 1)
}
})
}

View File

@@ -1,544 +1,21 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package metrics
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
)
const MaxQueueSize = 256 // TODO 可以配置,可以在单个任务里配置
// Task 单个指标任务
// 数据库存储:
//
// data/
// metric.$ID.db
// stats
// id, keys, value, time, serverId, hash
// 原理:
// 添加或者有变更时 isUploaded = false
// 上传时检查 isUploaded 状态
// 只上传每个服务中排序最前面的 N 个数据
type Task struct {
item *serverconfigs.MetricItemConfig
isLoaded bool
db *dbs.DB
statTableName string
isStopped bool
cleanTicker *utils.Ticker
uploadTicker *utils.Ticker
cleanVersion int32
insertStatStmt *dbs.Stmt
deleteByVersionStmt *dbs.Stmt
deleteByExpiresTimeStmt *dbs.Stmt
selectTopStmt *dbs.Stmt
sumStmt *dbs.Stmt
serverIdMap map[int64]zero.Zero // 所有的服务Ids
timeMap map[string]zero.Zero // time => bool
serverIdMapLocker sync.Mutex
statsMap map[string]*Stat // 待写入队列hash => *Stat
statsLocker sync.RWMutex
statsTicker *utils.Ticker
}
// NewTask 获取新任务
func NewTask(item *serverconfigs.MetricItemConfig) *Task {
return &Task{
item: item,
serverIdMap: map[int64]zero.Zero{},
timeMap: map[string]zero.Zero{},
statsMap: map[string]*Stat{},
}
}
// Init 初始化
func (this *Task) Init() error {
this.statTableName = "stats"
// 检查目录是否存在
var dir = Tea.Root + "/data"
_, err := os.Stat(dir)
if err != nil {
err = os.MkdirAll(dir, 0777)
if err != nil {
return err
}
remotelogs.Println("METRIC", "create data dir '"+dir+"'")
}
var path = dir + "/metric." + types.String(this.item.Id) + ".db"
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE")
if err != nil {
return err
}
db.SetMaxOpenConns(1)
this.db = db
// 恢复数据库
var recoverEnv, _ = os.LookupEnv("EdgeRecover")
if len(recoverEnv) > 0 {
for _, indexName := range []string{"serverId", "hash"} {
_, _ = db.Exec(`REINDEX "` + indexName + `"`)
}
}
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
//创建统计表
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"keys" varchar(1024),
"value" real DEFAULT 0,
"time" varchar(32),
"serverId" integer DEFAULT 0,
"version" integer DEFAULT 0,
"isUploaded" integer DEFAULT 0
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.statTableName + `" (
"serverId" ASC,
"version" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.statTableName + `" (
"hash" ASC
);`)
if err != nil {
return err
}
// insert stat stmt
this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`)
if err != nil {
return err
}
// delete by version
this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version"<?`)
if err != nil {
return err
}
// delete by expires time
this.deleteByExpiresTimeStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "time"<?`)
if err != nil {
return err
}
// select topN stmt
this.selectTopStmt, err = db.Prepare(`SELECT "id", "hash", "keys", "value", "isUploaded" FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=? ORDER BY "value" DESC LIMIT 20`)
if err != nil {
return err
}
// sum stmt
this.sumStmt, err = db.Prepare(`SELECT COUNT(*), IFNULL(SUM(value), 0) FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=?`)
if err != nil {
return err
}
// 所有的服务IDs
err = this.loadServerIdMap()
if err != nil {
return err
}
this.isLoaded = true
return nil
}
// Start 启动任务
func (this *Task) Start() error {
// 读取数据
this.statsTicker = utils.NewTicker(1 * time.Minute)
goman.New(func() {
for this.statsTicker.Next() {
var tr = trackers.Begin("[METRIC]DUMP_STATS_TO_LOCAL_DATABASE")
this.statsLocker.Lock()
var statsMap = this.statsMap
this.statsMap = map[string]*Stat{}
this.statsLocker.Unlock()
for _, stat := range statsMap {
err := this.InsertStat(stat)
if err != nil {
remotelogs.Error("METRIC", "insert stat failed: "+err.Error())
}
}
tr.End()
}
})
// 清理
this.cleanTicker = utils.NewTicker(24 * time.Hour)
goman.New(func() {
for this.cleanTicker.Next() {
var tr = trackers.Begin("[METRIC]CLEAN_EXPIRED")
err := this.CleanExpired()
tr.End()
if err != nil {
remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error())
}
}
})
// 上传
this.uploadTicker = utils.NewTicker(this.item.UploadDuration())
goman.New(func() {
for this.uploadTicker.Next() {
var tr = trackers.Begin("[METRIC]UPLOAD_STATS")
err := this.Upload(1 * time.Second)
tr.End()
if err != nil && !rpc.IsConnError(err) {
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
}
}
})
return nil
}
// Add 添加数据
func (this *Task) Add(obj MetricInterface) {
if this.isStopped || !this.isLoaded {
return
}
var keys = []string{}
for _, key := range this.item.Keys {
var k = obj.MetricKey(key)
// 忽略499状态
if key == "${status}" && k == "499" {
return
}
keys = append(keys, k)
}
v, ok := obj.MetricValue(this.item.Value)
if !ok {
return
}
var hash = SumStat(obj.MetricServerId(), keys, this.item.CurrentTime(), this.item.Version, this.item.Id)
var countItems int
this.statsLocker.RLock()
oldStat, ok := this.statsMap[hash]
if !ok {
countItems = len(this.statsMap)
}
this.statsLocker.RUnlock()
if ok {
atomic.AddInt64(&oldStat.Value, 1)
} else {
// 防止过载
if countItems < MaxQueueSize {
this.statsLocker.Lock()
this.statsMap[hash] = &Stat{
ServerId: obj.MetricServerId(),
Keys: keys,
Value: v,
Time: this.item.CurrentTime(),
Hash: hash,
}
this.statsLocker.Unlock()
}
}
}
// Stop 停止任务
func (this *Task) Stop() error {
this.isStopped = true
if this.cleanTicker != nil {
this.cleanTicker.Stop()
}
if this.uploadTicker != nil {
this.uploadTicker.Stop()
}
if this.statsTicker != nil {
this.statsTicker.Stop()
}
_ = this.insertStatStmt.Close()
_ = this.deleteByVersionStmt.Close()
_ = this.deleteByExpiresTimeStmt.Close()
_ = this.selectTopStmt.Close()
_ = this.sumStmt.Close()
if this.db != nil {
_ = this.db.Close()
}
return nil
}
// InsertStat 写入数据
func (this *Task) InsertStat(stat *Stat) error {
if this.isStopped {
return nil
}
if stat == nil {
return nil
}
this.serverIdMapLocker.Lock()
this.serverIdMap[stat.ServerId] = zero.New()
this.timeMap[stat.Time] = zero.New()
this.serverIdMapLocker.Unlock()
keyData, err := json.Marshal(stat.Keys)
if err != nil {
return err
}
_, err = this.insertStatStmt.Exec(stat.ServerId, stat.Hash, keyData, stat.Value, stat.Time, this.item.Version, stat.Value)
if err != nil {
return err
}
return nil
}
// CleanExpired 清理数据
func (this *Task) CleanExpired() error {
if this.isStopped {
return nil
}
// 清除低版本数据
if this.cleanVersion < this.item.Version {
_, err := this.deleteByVersionStmt.Exec(this.item.Version)
if err != nil {
return err
}
this.cleanVersion = this.item.Version
}
// 清除过期的数据
_, err := this.deleteByExpiresTimeStmt.Exec(this.item.LocalExpiresTime())
if err != nil {
return err
}
return nil
}
// Upload 上传数据
func (this *Task) Upload(pauseDuration time.Duration) error {
if this.isStopped {
return nil
}
this.serverIdMapLocker.Lock()
// 服务IDs
var serverIds []int64
for serverId := range this.serverIdMap {
serverIds = append(serverIds, serverId)
}
this.serverIdMap = map[int64]zero.Zero{} // 清空数据
// 时间
var times = []string{}
for t := range this.timeMap {
times = append(times, t)
}
this.timeMap = map[string]zero.Zero{} // 清空数据
this.serverIdMapLocker.Unlock()
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
for _, serverId := range serverIds {
for _, currentTime := range times {
idStrings, err := func(serverId int64, currentTime string) (ids []string, err error) {
var t = trackers.Begin("[METRIC]SELECT_TOP_STMT")
rows, err := this.selectTopStmt.Query(serverId, this.item.Version, currentTime)
t.End()
if err != nil {
return nil, err
}
var isClosed bool
defer func() {
if isClosed {
return
}
_ = rows.Close()
}()
var pbStats []*pb.UploadingMetricStat
for rows.Next() {
var pbStat = &pb.UploadingMetricStat{}
// "id", "hash", "keys", "value", "isUploaded"
var isUploaded int
var keysData []byte
err = rows.Scan(&pbStat.Id, &pbStat.Hash, &keysData, &pbStat.Value, &isUploaded)
if err != nil {
return nil, err
}
// TODO 先不判断是否已经上传需要改造API进行配合
/**if isUploaded == 1 {
continue
}**/
if len(keysData) > 0 {
err = json.Unmarshal(keysData, &pbStat.Keys)
if err != nil {
return nil, err
}
}
pbStats = append(pbStats, pbStat)
ids = append(ids, strconv.FormatInt(pbStat.Id, 10))
}
// 提前关闭
_ = rows.Close()
isClosed = true
// 上传
if len(pbStats) > 0 {
// 计算总和
count, total, err := this.sum(serverId, currentTime)
if err != nil {
return nil, err
}
_, err = rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{
MetricStats: pbStats,
Time: currentTime,
ServerId: serverId,
ItemId: this.item.Id,
Version: this.item.Version,
Count: count,
Total: float32(total),
})
if err != nil {
return nil, err
}
}
return
}(serverId, currentTime)
if err != nil {
return err
}
if len(idStrings) > 0 {
// 设置为已上传
// TODO 先不判断是否已经上传需要改造API进行配合
/**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`)
if err != nil {
return err
}**/
}
}
// 休息一下,防止短时间内上传数据过多
if pauseDuration > 0 {
time.Sleep(pauseDuration)
}
}
return nil
}
// 加载服务ID
func (this *Task) loadServerIdMap() error {
{
rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var serverId int64
for rows.Next() {
err = rows.Scan(&serverId)
if err != nil {
return err
}
this.serverIdMapLocker.Lock()
this.serverIdMap[serverId] = zero.New()
this.serverIdMapLocker.Unlock()
}
}
{
rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.item.Version)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var timeString string
for rows.Next() {
err = rows.Scan(&timeString)
if err != nil {
return err
}
this.serverIdMapLocker.Lock()
this.timeMap[timeString] = zero.New()
this.serverIdMapLocker.Unlock()
}
}
return nil
}
// 计算数量和综合
func (this *Task) sum(serverId int64, time string) (count int64, total float64, err error) {
rows, err := this.sumStmt.Query(serverId, this.item.Version, time)
if err != nil {
return 0, 0, err
}
defer func() {
_ = rows.Close()
}()
if rows.Next() {
err = rows.Scan(&count, &total)
if err != nil {
return 0, 0, err
}
}
return
type Task interface {
Init() error
Item() *serverconfigs.MetricItemConfig
SetItem(item *serverconfigs.MetricItemConfig)
Add(obj MetricInterface)
InsertStat(stat *Stat) error
Upload(pauseDuration time.Duration) error
Start() error
Stop() error
Delete() error
CleanExpired() error
}

View File

@@ -0,0 +1,75 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package metrics
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"sync"
"sync/atomic"
)
type BaseTask struct {
itemConfig *serverconfigs.MetricItemConfig
isLoaded bool
isStopped bool
statsMap map[string]*Stat // 待写入队列hash => *Stat
statsLocker sync.RWMutex
}
// Add 添加数据
func (this *BaseTask) Add(obj MetricInterface) {
if this.isStopped || !this.isLoaded {
return
}
var keys = []string{}
for _, key := range this.itemConfig.Keys {
var k = obj.MetricKey(key)
// 忽略499状态
if key == "${status}" && k == "499" {
return
}
keys = append(keys, k)
}
v, ok := obj.MetricValue(this.itemConfig.Value)
if !ok {
return
}
var hash = UniqueKey(obj.MetricServerId(), keys, this.itemConfig.CurrentTime(), this.itemConfig.Version, this.itemConfig.Id)
var countItems int
this.statsLocker.RLock()
oldStat, ok := this.statsMap[hash]
if !ok {
countItems = len(this.statsMap)
}
this.statsLocker.RUnlock()
if ok {
atomic.AddInt64(&oldStat.Value, 1)
} else {
// 防止过载
if countItems < MaxQueueSize {
this.statsLocker.Lock()
this.statsMap[hash] = &Stat{
ServerId: obj.MetricServerId(),
Keys: keys,
Value: v,
Time: this.itemConfig.CurrentTime(),
Hash: hash,
}
this.statsLocker.Unlock()
}
}
}
func (this *BaseTask) Item() *serverconfigs.MetricItemConfig {
return this.itemConfig
}
func (this *BaseTask) SetItem(itemConfig *serverconfigs.MetricItemConfig) {
this.itemConfig = itemConfig
}

534
internal/metrics/task_kv.go Normal file
View File

@@ -0,0 +1,534 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package metrics
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
byteutils "github.com/TeaOSLab/EdgeNode/internal/utils/byte"
"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/cockroachdb/pebble"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
"strings"
"sync"
"testing"
"time"
)
// TODO sumValues不用每次insertStat的时候都保存
// KVTask KV存储实现的任务管理
type KVTask struct {
BaseTask
itemsTable *kvstore.Table[*Stat] // hash => *Stat
valuesTable *kvstore.Table[[]byte] // time_version_serverId_value_hash => []byte(nil)
sumTable *kvstore.Table[[]byte] // time_version_serverId => [count][total]
serverTimeMap map[string]zero.Zero // 有变更的网站 serverId_time => Zero
serverIdMapLocker sync.Mutex
statsTicker *utils.Ticker
cleanTicker *utils.Ticker
uploadTicker *utils.Ticker
valuesCacheMap map[string]int64 // hash => value
}
func NewKVTask(itemConfig *serverconfigs.MetricItemConfig) *KVTask {
return &KVTask{
BaseTask: BaseTask{
itemConfig: itemConfig,
statsMap: map[string]*Stat{},
},
serverTimeMap: map[string]zero.Zero{},
valuesCacheMap: map[string]int64{},
}
}
func (this *KVTask) Init() error {
store, err := kvstore.DefaultStore()
if err != nil {
return err
}
db, err := store.NewDB("metrics" + types.String(this.itemConfig.Id))
if err != nil {
return err
}
{
table, tableErr := kvstore.NewTable[*Stat]("items", &ItemEncoder[*Stat]{})
if tableErr != nil {
return tableErr
}
db.AddTable(table)
this.itemsTable = table
}
{
table, tableErr := kvstore.NewTable[[]byte]("values", kvstore.NewNilValueEncoder())
if tableErr != nil {
return tableErr
}
db.AddTable(table)
this.valuesTable = table
}
{
table, tableErr := kvstore.NewTable[[]byte]("sumValues", kvstore.NewBytesValueEncoder())
if tableErr != nil {
return tableErr
}
db.AddTable(table)
this.sumTable = table
}
// 所有的服务IDs
err = this.loadServerIdMap()
if err != nil {
return err
}
this.isLoaded = true
return nil
}
func (this *KVTask) InsertStat(stat *Stat) error {
if this.isStopped {
return nil
}
if stat == nil {
return nil
}
var version = this.itemConfig.Version
this.serverIdMapLocker.Lock()
this.serverTimeMap[types.String(stat.ServerId)+"_"+stat.Time] = zero.New()
this.serverIdMapLocker.Unlock()
if len(stat.Hash) == 0 {
stat.Hash = stat.UniqueKey(version, this.itemConfig.Id)
}
var isNew bool
var newValue = stat.Value
// insert or update
{
var statKey = stat.FullKey(version, this.itemConfig.Id)
oldStat, err := this.itemsTable.Get(statKey)
var oldValue int64
if err != nil {
if !kvstore.IsNotFound(err) {
return err
}
isNew = true
} else {
oldValue = oldStat.Value
// delete old value from valuesTable
err = this.valuesTable.Delete(oldStat.EncodeValueKey(version))
if err != nil {
return err
}
}
oldValue += stat.Value
stat.Value = oldValue
err = this.itemsTable.Set(statKey, stat)
if err != nil {
return err
}
// insert value into valuesTable
err = this.valuesTable.Insert(stat.EncodeValueKey(version), nil)
if err != nil {
return err
}
}
// sum
{
var sumKey = stat.EncodeSumKey(version)
sumResult, err := this.sumTable.Get(sumKey)
var count uint64
var total uint64
if err != nil {
if !kvstore.IsNotFound(err) {
return err
}
} else {
count, total = DecodeSumValue(sumResult)
}
if isNew {
count++
}
total += uint64(newValue)
err = this.sumTable.Set(sumKey, EncodeSumValue(count, total))
if err != nil {
return err
}
}
return nil
}
func (this *KVTask) Upload(pauseDuration time.Duration) error {
var uploadTr = trackers.Begin("METRIC:UPLOAD_STATS")
defer uploadTr.End()
if this.isStopped {
return nil
}
this.serverIdMapLocker.Lock()
// 服务IDs
var serverTimeMap = this.serverTimeMap
this.serverTimeMap = map[string]zero.Zero{} // 清空数据
this.serverIdMapLocker.Unlock()
if len(serverTimeMap) == 0 {
return nil
}
// 控制缓存map不要太长
if len(this.valuesCacheMap) > 4096 {
var newMap = map[string]int64{}
var countElements int
for k, v := range this.valuesCacheMap {
newMap[k] = v
countElements++
if countElements >= 2048 {
break
}
}
this.valuesCacheMap = newMap
}
// 开始上传
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
var totalCount int
for serverTime := range serverTimeMap {
count, uploadErr := func(serverTime string) (int, error) {
serverIdString, timeString, found := strings.Cut(serverTime, "_")
if !found {
return 0, nil
}
var serverId = types.Int64(serverIdString)
if serverId <= 0 {
return 0, nil
}
return this.uploadServerStats(rpcClient, serverId, timeString)
}(serverTime)
if uploadErr != nil {
return uploadErr
}
totalCount += count
// 休息一下,防止短时间内上传数据过多
if pauseDuration > 0 && totalCount >= 100 {
time.Sleep(pauseDuration)
uploadTr.Add(-pauseDuration)
}
}
return nil
}
func (this *KVTask) Start() error {
// 读取数据
this.statsTicker = utils.NewTicker(1 * time.Minute)
if Tea.IsTesting() {
this.statsTicker = utils.NewTicker(10 * time.Second)
}
goman.New(func() {
for this.statsTicker.Next() {
var tr = trackers.Begin("METRIC:DUMP_STATS_TO_LOCAL_DATABASE")
this.statsLocker.Lock()
var statsMap = this.statsMap
this.statsMap = map[string]*Stat{}
this.statsLocker.Unlock()
for _, stat := range statsMap {
err := this.InsertStat(stat)
if err != nil {
remotelogs.Error("METRIC", "insert stat failed: "+err.Error())
}
}
tr.End()
}
})
// 清理
this.cleanTicker = utils.NewTicker(24 * time.Hour)
goman.New(func() {
for this.cleanTicker.Next() {
var tr = trackers.Begin("METRIC:CLEAN_EXPIRED")
err := this.CleanExpired()
tr.End()
if err != nil {
remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error())
}
}
})
// 上传
this.uploadTicker = utils.NewTicker(this.itemConfig.UploadDuration())
goman.New(func() {
for this.uploadTicker.Next() {
err := this.Upload(1 * time.Second)
if err != nil && !rpc.IsConnError(err) {
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
}
}
})
return nil
}
func (this *KVTask) Stop() error {
this.isStopped = true
if this.cleanTicker != nil {
this.cleanTicker.Stop()
}
if this.uploadTicker != nil {
this.uploadTicker.Stop()
}
if this.statsTicker != nil {
this.statsTicker.Stop()
}
return nil
}
func (this *KVTask) Delete() error {
this.isStopped = true
return this.itemsTable.DB().Truncate()
}
func (this *KVTask) CleanExpired() error {
if this.isStopped {
return nil
}
var versionBytes = int32ToBigEndian(this.itemConfig.Version)
var expiresTime = this.itemConfig.LocalExpiresTime()
var rangeEnd = append([]byte(expiresTime+"_"), versionBytes...)
rangeEnd = append(rangeEnd, 0xFF, 0xFF)
{
err := this.itemsTable.DeleteRange("", string(rangeEnd))
if err != nil {
return err
}
}
{
err := this.valuesTable.DeleteRange("", string(rangeEnd))
if err != nil {
return err
}
}
{
err := this.sumTable.DeleteRange("", string(rangeEnd))
if err != nil {
return err
}
}
return nil
}
func (this *KVTask) Flush() error {
return this.itemsTable.DB().Store().Flush()
}
func (this *KVTask) TestInspect(t *testing.T) {
var db = this.itemsTable.DB()
it, err := db.Store().RawDB().NewIter(&pebble.IterOptions{
LowerBound: []byte(db.Namespace()),
UpperBound: append([]byte(db.Namespace()), 0xFF),
})
if err != nil {
t.Fatal(err)
}
defer func() {
_ = it.Close()
}()
for it.First(); it.Valid(); it.Next() {
valueBytes, valueErr := it.ValueAndErr()
if valueErr != nil {
t.Fatal(valueErr)
}
var key = string(it.Key()[len(db.Namespace())-1:])
t.Log(key, "=>", string(valueBytes))
if strings.HasPrefix(key, "$values$K$") {
_, _, _, value, hash, _ := DecodeValueKey(key[len("$values$K$"):])
t.Log(" |", hash, "=>", value)
} else if strings.HasPrefix(key, "$sumValues$K$") {
count, sum := DecodeSumValue(valueBytes)
t.Log(" |", count, sum)
}
}
}
func (this *KVTask) Truncate() error {
var db = this.itemsTable.DB()
err := db.Truncate()
if err != nil {
return err
}
return db.Store().Flush()
}
func (this *KVTask) uploadServerStats(rpcClient *rpc.RPCClient, serverId int64, currentTime string) (countValues int, uploadErr error) {
var pbStats []*pb.UploadingMetricStat
var keepKeys []string
var prefix = string(byteutils.Concat([]byte(currentTime), []byte{'_'}, int32ToBigEndian(this.itemConfig.Version), int64ToBigEndian(serverId)))
var newCachedKeys = map[string]int64{}
queryErr := this.valuesTable.
Query().
Prefix(prefix).
Desc().
Limit(20).
FindAll(func(tx *kvstore.Tx[[]byte], item kvstore.Item[[]byte]) (goNext bool, err error) {
_, _, version, value, hash, decodeErr := DecodeValueKey(item.Key)
if decodeErr != nil {
return false, decodeErr
}
if value <= 0 {
return true, nil
}
// value not changed for the key
if this.valuesCacheMap[hash] == value {
keepKeys = append(keepKeys, hash)
return true, nil
}
newCachedKeys[hash] = value
stat, valueErr := this.itemsTable.Get(string(byteutils.Concat([]byte(currentTime), []byte{'_'}, int32ToBigEndian(version), []byte(hash))))
if valueErr != nil {
if kvstore.IsNotFound(valueErr) {
return true, nil
}
return false, valueErr
}
if stat == nil {
return true, nil
}
pbStats = append(pbStats, &pb.UploadingMetricStat{
Id: 0, // not used in node
Hash: hash,
Keys: stat.Keys,
Value: float32(value),
})
return true, nil
})
if queryErr != nil {
return 0, queryErr
}
// count & total
var count, total uint64
{
sumValue, err := this.sumTable.Get(prefix)
if err != nil {
return 0, err
}
count, total = DecodeSumValue(sumValue)
}
_, err := rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{
MetricStats: pbStats,
Time: currentTime,
ServerId: serverId,
ItemId: this.itemConfig.Id,
Version: this.itemConfig.Version,
Count: int64(count),
Total: float32(total),
KeepKeys: keepKeys,
})
if err != nil {
return 0, err
}
// put into cache map MUST be after uploading success
for k, v := range newCachedKeys {
this.valuesCacheMap[k] = v
}
return len(pbStats), nil
}
func (this *KVTask) loadServerIdMap() error {
var offsetKey string
var currentTime = this.itemConfig.CurrentTime()
for {
var found bool
err := this.sumTable.
Query().
Limit(1000).
Offset(offsetKey).
FindAll(func(tx *kvstore.Tx[[]byte], item kvstore.Item[[]byte]) (goNext bool, err error) {
offsetKey = item.Key
found = true
serverId, timeString, version, decodeErr := DecodeSumKey(item.Key)
if decodeErr != nil {
return false, decodeErr
}
if version != this.itemConfig.Version || timeString != currentTime {
return true, nil
}
this.serverIdMapLocker.Lock()
this.serverTimeMap[types.String(serverId)+"_"+timeString] = zero.New()
this.serverIdMapLocker.Unlock()
return true, nil
})
if err != nil {
return err
}
if !found {
break
}
}
return nil
}

View File

@@ -0,0 +1,24 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package metrics
import (
"encoding/json"
"errors"
)
type ItemEncoder[T interface{ *Stat }] struct {
}
func (this *ItemEncoder[T]) Encode(value T) ([]byte, error) {
return json.Marshal(value)
}
func (this *ItemEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) {
return nil, errors.New("invalid field name '" + fieldName + "'")
}
func (this *ItemEncoder[T]) Decode(valueBytes []byte) (value T, err error) {
err = json.Unmarshal(valueBytes, &value)
return
}

View File

@@ -37,8 +37,8 @@ func (this *testObj) MetricCategory() string {
return "http"
}
func TestTask_Init(t *testing.T) {
var task = metrics.NewTask(&serverconfigs.MetricItemConfig{
func TestKVTask_Init(t *testing.T) {
var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "",
@@ -57,8 +57,8 @@ func TestTask_Init(t *testing.T) {
t.Log("ok")
}
func TestTask_Add(t *testing.T) {
var task = metrics.NewTask(&serverconfigs.MetricItemConfig{
func TestKVTask_Add(t *testing.T) {
var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "",
@@ -80,15 +80,18 @@ func TestTask_Add(t *testing.T) {
}()
task.Add(&testObj{ip: "127.0.0.2"})
time.Sleep(1 * time.Second) // waiting for inserting
if testutils.IsSingleTesting() {
time.Sleep(1 * time.Second) // waiting for inserting
}
}
func TestTask_Add_Many(t *testing.T) {
func TestKVTask_Add_Many(t *testing.T) {
if !testutils.IsSingleTesting() {
return
}
var task = metrics.NewTask(&serverconfigs.MetricItemConfig{
var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "",
@@ -120,7 +123,7 @@ func TestTask_Add_Many(t *testing.T) {
}
}
func TestTask_InsertStat(t *testing.T) {
func TestKVTask_InsertStat(t *testing.T) {
var item = &serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
@@ -131,11 +134,19 @@ func TestTask_InsertStat(t *testing.T) {
Value: "${countRequest}",
Version: 1,
}
var task = metrics.NewTask(item)
var task = metrics.NewKVTask(item)
err := task.Init()
if err != nil {
t.Fatal(err)
}
defer func() {
err = task.Flush()
if err != nil {
t.Fatal(err)
}
}()
err = task.Start()
if err != nil {
t.Fatal(err)
@@ -144,21 +155,50 @@ func TestTask_InsertStat(t *testing.T) {
_ = task.Stop()
}()
err = task.InsertStat(&metrics.Stat{
ServerId: 1,
Keys: []string{"127.0.0.1"},
Hash: "",
Value: 1,
Time: item.CurrentTime(),
})
if err != nil {
t.Fatal(err)
{
err = task.InsertStat(&metrics.Stat{
ServerId: 1,
Keys: []string{"127.0.0.1"},
Hash: "",
Value: 1,
Time: item.CurrentTime(),
})
if err != nil {
t.Fatal(err)
}
}
t.Log("ok")
{
err = task.InsertStat(&metrics.Stat{
ServerId: 2,
Keys: []string{"127.0.0.2"},
Hash: "",
Value: 3,
Time: item.CurrentTime(),
})
if err != nil {
t.Fatal(err)
}
}
{
err = task.InsertStat(&metrics.Stat{
ServerId: 1,
Keys: []string{"127.0.0.3"},
Hash: "",
Value: 2,
Time: item.CurrentTime(),
})
if err != nil {
t.Fatal(err)
}
}
TestKVTask_TestInspect(t)
}
func TestTask_CleanExpired(t *testing.T) {
var task = metrics.NewTask(&serverconfigs.MetricItemConfig{
func TestKVTask_CleanExpired(t *testing.T) {
var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "",
@@ -184,12 +224,18 @@ func TestTask_CleanExpired(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log("ok")
defer func() {
_ = task.Flush()
}()
t.Log("=== inspect ===")
task.TestInspect(t)
}
func TestTask_Upload(t *testing.T) {
var task = metrics.NewTask(&serverconfigs.MetricItemConfig{
Id: 1,
func TestKVTask_Upload(t *testing.T) {
var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 31,
IsOn: false,
Category: "",
Period: 1,
@@ -218,11 +264,51 @@ func TestTask_Upload(t *testing.T) {
t.Log("ok")
}
var testingTask *metrics.Task
func TestKVTask_TestInspect(t *testing.T) {
var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "",
Period: 1,
PeriodUnit: serverconfigs.MetricItemPeriodUnitDay,
Keys: []string{"${remoteAddr}"},
Value: "${countRequest}",
Version: 1,
})
err := task.Init()
if err != nil {
t.Fatal(err)
}
task.TestInspect(t)
}
func TestKVTask_Truncate(t *testing.T) {
var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "",
Period: 1,
PeriodUnit: serverconfigs.MetricItemPeriodUnitDay,
Keys: []string{"${remoteAddr}"},
Value: "${countRequest}",
Version: 1,
})
err := task.Init()
if err != nil {
t.Fatal(err)
}
if testutils.IsSingleTesting() {
_ = task.Truncate()
}
}
var testingTask metrics.Task
var testingTaskInitOnce = &sync.Once{}
func initTestingTask() {
testingTask = metrics.NewTask(&serverconfigs.MetricItemConfig{
testingTask = metrics.NewKVTask(&serverconfigs.MetricItemConfig{
Id: 1,
IsOn: false,
Category: "tcp",
@@ -243,7 +329,7 @@ func initTestingTask() {
}
}
func BenchmarkTask_Add(b *testing.B) {
func BenchmarkKVTask_Add(b *testing.B) {
runtime.GOMAXPROCS(1)
testingTaskInitOnce.Do(func() {

View File

@@ -0,0 +1,505 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package metrics
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
"os"
"strconv"
"sync"
"time"
)
const MaxQueueSize = 256 // TODO 可以配置,可以在单个任务里配置
// SQLiteTask 单个指标任务
// 数据库存储:
//
// data/
// metric.$ID.db
// stats
// id, keys, value, time, serverId, hash
// 原理:
// 添加或者有变更时 isUploaded = false
// 上传时检查 isUploaded 状态
// 只上传每个服务中排序最前面的 N 个数据
type SQLiteTask struct {
BaseTask
db *dbs.DB
statTableName string
cleanTicker *utils.Ticker
uploadTicker *utils.Ticker
cleanVersion int32
insertStatStmt *dbs.Stmt
deleteByVersionStmt *dbs.Stmt
deleteByExpiresTimeStmt *dbs.Stmt
selectTopStmt *dbs.Stmt
sumStmt *dbs.Stmt
serverIdMap map[int64]zero.Zero // 所有的服务Ids
timeMap map[string]zero.Zero // time => bool
serverIdMapLocker sync.Mutex
statsTicker *utils.Ticker
}
// NewSQLiteTask 获取新任务
func NewSQLiteTask(item *serverconfigs.MetricItemConfig) *SQLiteTask {
return &SQLiteTask{
BaseTask: BaseTask{
itemConfig: item,
statsMap: map[string]*Stat{},
},
serverIdMap: map[int64]zero.Zero{},
timeMap: map[string]zero.Zero{},
}
}
func CheckSQLiteDB(itemId int64) bool {
var path = Tea.Root + "/data/metric." + types.String(itemId) + ".db"
_, err := os.Stat(path)
return err == nil
}
// Init 初始化
func (this *SQLiteTask) Init() error {
this.statTableName = "stats"
// 检查目录是否存在
var dir = Tea.Root + "/data"
_, err := os.Stat(dir)
if err != nil {
err = os.MkdirAll(dir, 0777)
if err != nil {
return err
}
remotelogs.Println("METRIC", "create data dir '"+dir+"'")
}
var path = dir + "/metric." + types.String(this.itemConfig.Id) + ".db"
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE")
if err != nil {
return err
}
db.SetMaxOpenConns(1)
this.db = db
// 恢复数据库
var recoverEnv, _ = os.LookupEnv("EdgeRecover")
if len(recoverEnv) > 0 {
for _, indexName := range []string{"serverId", "hash"} {
_, _ = db.Exec(`REINDEX "` + indexName + `"`)
}
}
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
//创建统计表
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"hash" varchar(32),
"keys" varchar(1024),
"value" real DEFAULT 0,
"time" varchar(32),
"serverId" integer DEFAULT 0,
"version" integer DEFAULT 0,
"isUploaded" integer DEFAULT 0
);
CREATE INDEX IF NOT EXISTS "serverId"
ON "` + this.statTableName + `" (
"serverId" ASC,
"version" ASC
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "` + this.statTableName + `" (
"hash" ASC
);`)
if err != nil {
return err
}
// insert stat stmt
this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`)
if err != nil {
return err
}
// delete by version
this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version"<?`)
if err != nil {
return err
}
// delete by expires time
this.deleteByExpiresTimeStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "time"<?`)
if err != nil {
return err
}
// select topN stmt
this.selectTopStmt, err = db.Prepare(`SELECT "id", "hash", "keys", "value", "isUploaded" FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=? ORDER BY "value" DESC LIMIT 20`)
if err != nil {
return err
}
// sum stmt
this.sumStmt, err = db.Prepare(`SELECT COUNT(*), IFNULL(SUM(value), 0) FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=?`)
if err != nil {
return err
}
// 所有的服务IDs
err = this.loadServerIdMap()
if err != nil {
return err
}
this.isLoaded = true
return nil
}
// Start 启动任务
func (this *SQLiteTask) Start() error {
// 读取数据
this.statsTicker = utils.NewTicker(1 * time.Minute)
goman.New(func() {
for this.statsTicker.Next() {
var tr = trackers.Begin("METRIC:DUMP_STATS_TO_LOCAL_DATABASE")
this.statsLocker.Lock()
var statsMap = this.statsMap
this.statsMap = map[string]*Stat{}
this.statsLocker.Unlock()
for _, stat := range statsMap {
err := this.InsertStat(stat)
if err != nil {
remotelogs.Error("METRIC", "insert stat failed: "+err.Error())
}
}
tr.End()
}
})
// 清理
this.cleanTicker = utils.NewTicker(24 * time.Hour)
goman.New(func() {
for this.cleanTicker.Next() {
var tr = trackers.Begin("METRIC:CLEAN_EXPIRED")
err := this.CleanExpired()
tr.End()
if err != nil {
remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error())
}
}
})
// 上传
this.uploadTicker = utils.NewTicker(this.itemConfig.UploadDuration())
goman.New(func() {
for this.uploadTicker.Next() {
err := this.Upload(1 * time.Second)
if err != nil && !rpc.IsConnError(err) {
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
}
}
})
return nil
}
// Stop 停止任务
func (this *SQLiteTask) Stop() error {
this.isStopped = true
if this.cleanTicker != nil {
this.cleanTicker.Stop()
}
if this.uploadTicker != nil {
this.uploadTicker.Stop()
}
if this.statsTicker != nil {
this.statsTicker.Stop()
}
_ = this.insertStatStmt.Close()
_ = this.deleteByVersionStmt.Close()
_ = this.deleteByExpiresTimeStmt.Close()
_ = this.selectTopStmt.Close()
_ = this.sumStmt.Close()
if this.db != nil {
_ = this.db.Close()
}
return nil
}
func (this *SQLiteTask) Delete() error {
return nil
}
// InsertStat 写入数据
func (this *SQLiteTask) InsertStat(stat *Stat) error {
if this.isStopped {
return nil
}
if stat == nil {
return nil
}
this.serverIdMapLocker.Lock()
this.serverIdMap[stat.ServerId] = zero.New()
this.timeMap[stat.Time] = zero.New()
this.serverIdMapLocker.Unlock()
keyData, err := json.Marshal(stat.Keys)
if err != nil {
return err
}
_, err = this.insertStatStmt.Exec(stat.ServerId, stat.Hash, keyData, stat.Value, stat.Time, this.itemConfig.Version, stat.Value)
if err != nil {
return err
}
return nil
}
// CleanExpired 清理数据
func (this *SQLiteTask) CleanExpired() error {
if this.isStopped {
return nil
}
// 清除低版本数据
if this.cleanVersion < this.itemConfig.Version {
_, err := this.deleteByVersionStmt.Exec(this.itemConfig.Version)
if err != nil {
return err
}
this.cleanVersion = this.itemConfig.Version
}
// 清除过期的数据
_, err := this.deleteByExpiresTimeStmt.Exec(this.itemConfig.LocalExpiresTime())
if err != nil {
return err
}
return nil
}
// Upload 上传数据
func (this *SQLiteTask) Upload(pauseDuration time.Duration) error {
var uploadTr = trackers.Begin("METRIC:UPLOAD_STATS")
defer uploadTr.End()
if this.isStopped {
return nil
}
this.serverIdMapLocker.Lock()
// 服务IDs
var serverIds []int64
for serverId := range this.serverIdMap {
serverIds = append(serverIds, serverId)
}
this.serverIdMap = map[int64]zero.Zero{} // 清空数据
// 时间
var times = []string{}
for t := range this.timeMap {
times = append(times, t)
}
this.timeMap = map[string]zero.Zero{} // 清空数据
this.serverIdMapLocker.Unlock()
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
for _, serverId := range serverIds {
for _, currentTime := range times {
idStrings, err := func(serverId int64, currentTime string) (ids []string, err error) {
var t = trackers.Begin("METRIC:SELECT_TOP_STMT")
rows, err := this.selectTopStmt.Query(serverId, this.itemConfig.Version, currentTime)
t.End()
if err != nil {
return nil, err
}
var isClosed bool
defer func() {
if isClosed {
return
}
_ = rows.Close()
}()
var pbStats []*pb.UploadingMetricStat
for rows.Next() {
var pbStat = &pb.UploadingMetricStat{}
// "id", "hash", "keys", "value", "isUploaded"
var isUploaded int
var keysData []byte
err = rows.Scan(&pbStat.Id, &pbStat.Hash, &keysData, &pbStat.Value, &isUploaded)
if err != nil {
return nil, err
}
// TODO 先不判断是否已经上传需要改造API进行配合
/**if isUploaded == 1 {
continue
}**/
if len(keysData) > 0 {
err = json.Unmarshal(keysData, &pbStat.Keys)
if err != nil {
return nil, err
}
}
pbStats = append(pbStats, pbStat)
ids = append(ids, strconv.FormatInt(pbStat.Id, 10))
}
// 提前关闭
_ = rows.Close()
isClosed = true
// 上传
if len(pbStats) > 0 {
// 计算总和
count, total, err := this.sum(serverId, currentTime)
if err != nil {
return nil, err
}
_, err = rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{
MetricStats: pbStats,
Time: currentTime,
ServerId: serverId,
ItemId: this.itemConfig.Id,
Version: this.itemConfig.Version,
Count: count,
Total: float32(total),
})
if err != nil {
return nil, err
}
}
return
}(serverId, currentTime)
if err != nil {
return err
}
if len(idStrings) > 0 {
// 设置为已上传
// TODO 先不判断是否已经上传需要改造API进行配合
/**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`)
if err != nil {
return err
}**/
}
}
// 休息一下,防止短时间内上传数据过多
if pauseDuration > 0 {
time.Sleep(pauseDuration)
uploadTr.Add(-pauseDuration)
}
}
return nil
}
// 加载服务ID
func (this *SQLiteTask) loadServerIdMap() error {
{
rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.itemConfig.Version)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var serverId int64
for rows.Next() {
err = rows.Scan(&serverId)
if err != nil {
return err
}
this.serverIdMapLocker.Lock()
this.serverIdMap[serverId] = zero.New()
this.serverIdMapLocker.Unlock()
}
}
{
rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.itemConfig.Version)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
var timeString string
for rows.Next() {
err = rows.Scan(&timeString)
if err != nil {
return err
}
this.serverIdMapLocker.Lock()
this.timeMap[timeString] = zero.New()
this.serverIdMapLocker.Unlock()
}
}
return nil
}
// 计算数量和综合
func (this *SQLiteTask) sum(serverId int64, time string) (count int64, total float64, err error) {
rows, err := this.sumStmt.Query(serverId, this.itemConfig.Version, time)
if err != nil {
return 0, 0, err
}
defer func() {
_ = rows.Close()
}()
if rows.Next() {
err = rows.Scan(&count, &total)
if err != nil {
return 0, 0, err
}
}
return
}

View File

@@ -26,3 +26,7 @@ func (this *tracker) End() {
func (this *tracker) Begin(subLabel string) *tracker {
return Begin(this.label + ":" + subLabel)
}
func (this *tracker) Add(duration time.Duration) {
this.startTime = this.startTime.Add(-duration)
}

View File

@@ -1,8 +1,9 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package trackers
package trackers_test
import (
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/iwind/TeaGo/logs"
"testing"
"time"
@@ -10,38 +11,46 @@ import (
func TestNewManager(t *testing.T) {
{
var tr = Begin("a")
var tr = trackers.Begin("a")
tr.End()
}
{
var tr = Begin("a")
var tr = trackers.Begin("a")
time.Sleep(1 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
var tr = trackers.Begin("a")
time.Sleep(2 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
var tr = trackers.Begin("a")
time.Sleep(3 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
var tr = trackers.Begin("a")
time.Sleep(4 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
var tr = trackers.Begin("a")
time.Sleep(5 * time.Millisecond)
tr.End()
}
{
var tr = Begin("b")
var tr = trackers.Begin("b")
tr.End()
}
logs.PrintAsJSON(SharedManager.Labels(), t)
logs.PrintAsJSON(trackers.SharedManager.Labels(), t)
}
func TestTrackers_Add(t *testing.T) {
var tr = trackers.Begin("a")
time.Sleep(50 * time.Millisecond)
tr.Add(-10 * time.Millisecond)
tr.End()
t.Log(trackers.SharedManager.Labels())
}

View File

@@ -199,7 +199,7 @@ func (this *Manager) loadDB() error {
var sqlitePath = Tea.Root + "/data/agents.db"
_, sqliteErr := os.Stat(sqlitePath)
var db DB
if sqliteErr == nil {
if sqliteErr == nil || !teaconst.EnableKVCacheStore {
db = NewSQLiteDB(sqlitePath)
} else {
db = NewKVDB()

View File

@@ -18,8 +18,8 @@ func Append(b []byte, b2 ...byte) []byte {
return append(Copy(b), b2...)
}
// Contact bytes
func Contact(b []byte, b2 ...[]byte) []byte {
// Concat bytes
func Concat(b []byte, b2 ...[]byte) []byte {
b = Copy(b)
for _, b3 := range b2 {
b = append(b, b3...)

View File

@@ -39,7 +39,7 @@ func TestConcat(t *testing.T) {
var prefix []byte
prefix = append(prefix, 1, 2, 3)
var b = byteutils.Contact(prefix, []byte{4, 5, 6}, []byte{7})
var b = byteutils.Concat(prefix, []byte{4, 5, 6}, []byte{7})
t.Log(b)
a.IsTrue(bytes.Equal(b, []byte{1, 2, 3, 4, 5, 6, 7}))

View File

@@ -52,6 +52,15 @@ func (this *DB) Store() *Store {
return this.store
}
// Truncate the database
func (this *DB) Truncate() error {
this.mu.Lock()
defer this.mu.Unlock()
var start = []byte(this.Namespace())
return this.store.rawDB.DeleteRange(start, append(start, 0xFF), DefaultWriteOptions)
}
func (this *DB) Close() error {
this.mu.Lock()
defer this.mu.Unlock()

View File

@@ -162,14 +162,22 @@ func (this *Store) Delete(keyBytes []byte) error {
}
func (this *Store) NewDB(dbName string) (*DB, error) {
this.mu.Lock()
defer this.mu.Unlock()
// check existence
for _, db := range this.dbs {
if db.name == dbName {
return db, nil
}
}
// create new
db, err := NewDB(this, dbName)
if err != nil {
return nil, err
}
this.mu.Lock()
defer this.mu.Unlock()
this.dbs = append(this.dbs, db)
return db, nil
}

View File

@@ -6,6 +6,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/iwind/TeaGo/types"
"sync"
@@ -213,6 +214,10 @@ func (this *Table[T]) Truncate() error {
return this.db.store.rawDB.DeleteRange(this.Namespace(), append(this.Namespace(), 0xFF), DefaultWriteOptions)
}
func (this *Table[T]) DeleteRange(start string, end string) error {
return this.db.store.rawDB.DeleteRange(this.FullKeyBytes([]byte(start)), this.FullKeyBytes([]byte(end)), DefaultWriteOptions)
}
func (this *Table[T]) Query() *Query[T] {
var query = NewQuery[T]()
query.SetTable(this)
@@ -275,6 +280,7 @@ func (this *Table[T]) DecodeFieldKey(fieldName string, fieldKey []byte) (fieldVa
}
func (this *Table[T]) Close() error {
// nothing to do
return nil
}
@@ -300,7 +306,7 @@ func (this *Table[T]) deleteKeys(tx *Tx[T], key ...string) error {
value, decodeErr := this.encoder.Decode(valueBytes)
if decodeErr != nil {
return decodeErr
return fmt.Errorf("decode value failed: %w", decodeErr)
}
for _, fieldName := range this.fieldNames {
@@ -360,7 +366,7 @@ func (this *Table[T]) set(tx *Tx[T], key string, valueBytes []byte, value T, ins
var decodeErr error
oldValue, decodeErr = this.encoder.Decode(oldValueBytes)
if decodeErr != nil {
return decodeErr
return fmt.Errorf("decode value failed: %w", decodeErr)
}
oldFound = true
}
@@ -431,7 +437,7 @@ func (this *Table[T]) getWithKeyBytes(tx *Tx[T], keyBytes []byte) (value T, err
resultValue, decodeErr := this.encoder.Decode(valueBytes)
if decodeErr != nil {
return value, decodeErr
return value, fmt.Errorf("decode value failed: %w", decodeErr)
}
value = resultValue
return

View File

@@ -0,0 +1,22 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package kvstore
type NilValueEncoder[T []byte] struct {
}
func NewNilValueEncoder[T []byte]() *NilValueEncoder[T] {
return &NilValueEncoder[T]{}
}
func (this *NilValueEncoder[T]) Encode(value T) ([]byte, error) {
return nil, nil
}
func (this *NilValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) {
return nil, nil
}
func (this *NilValueEncoder[T]) Decode(valueData []byte) (value T, err error) {
return
}