From 4c30c28b4c7f1e9a1d9b93a2ec73e7b1cef92e00 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 2 Apr 2024 19:54:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8KV=E5=AD=98=E5=82=A8=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E6=8C=87=E6=A0=87=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/iplibrary/manager_ip_list.go | 2 +- internal/metrics/manager.go | 44 +- internal/metrics/manager_test.go | 21 +- internal/metrics/stat.go | 112 +++- internal/metrics/stat_test.go | 69 +++ internal/metrics/sum_test.go | 2 +- internal/metrics/task.go | 547 +----------------- internal/metrics/task_base.go | 75 +++ internal/metrics/task_kv.go | 534 +++++++++++++++++ internal/metrics/task_kv_objects.go | 24 + .../metrics/{task_test.go => task_kv_test.go} | 142 ++++- internal/metrics/task_sqlite.go | 505 ++++++++++++++++ internal/trackers/label.go | 4 + internal/trackers/manager_test.go | 27 +- internal/utils/agents/manager.go | 2 +- internal/utils/byte/utils.go | 4 +- internal/utils/byte/utils_test.go | 2 +- internal/utils/kvstore/db.go | 9 + internal/utils/kvstore/store.go | 14 +- internal/utils/kvstore/table.go | 12 +- internal/utils/kvstore/value_encoder_nil.go | 22 + 21 files changed, 1561 insertions(+), 612 deletions(-) create mode 100644 internal/metrics/stat_test.go create mode 100644 internal/metrics/task_base.go create mode 100644 internal/metrics/task_kv.go create mode 100644 internal/metrics/task_kv_objects.go rename internal/metrics/{task_test.go => task_kv_test.go} (64%) create mode 100644 internal/metrics/task_sqlite.go create mode 100644 internal/utils/kvstore/value_encoder_nil.go diff --git a/internal/iplibrary/manager_ip_list.go b/internal/iplibrary/manager_ip_list.go index 782e135..c6dffb9 100644 --- a/internal/iplibrary/manager_ip_list.go +++ b/internal/iplibrary/manager_ip_list.go @@ -119,7 +119,7 @@ func (this *IPListManager) init() { var db IPListDB var err error - if sqliteErr == nil { + if sqliteErr == nil || !teaconst.EnableKVCacheStore { db, err = NewSQLiteIPList() } else { db, err = NewKVIPList() diff --git a/internal/metrics/manager.go b/internal/metrics/manager.go index ad08f16..e503966 100644 --- a/internal/metrics/manager.go +++ b/internal/metrics/manager.go @@ -26,8 +26,8 @@ func init() { type Manager struct { isQuiting bool - taskMap map[int64]*Task // itemId => *Task - categoryTaskMap map[string][]*Task // category => []*Task + taskMap map[int64]Task // itemId => Task + categoryTaskMap map[string][]Task // category => []Task locker sync.RWMutex hasHTTPMetrics bool @@ -37,8 +37,8 @@ type Manager struct { func NewManager() *Manager { return &Manager{ - taskMap: map[int64]*Task{}, - categoryTaskMap: map[string][]*Task{}, + taskMap: map[int64]Task{}, + categoryTaskMap: map[string][]Task{}, } } @@ -64,11 +64,20 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { if err != nil { remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error()) } + + // deleted + if newItem != nil && !newItem.IsOn { + deleteErr := task.Delete() + if deleteErr != nil { + remotelogs.Error("METRIC_MANAGER", "delete task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error()) + } + } + delete(this.taskMap, itemId) } else { // 更新已存在的 - if newItem.Version != task.item.Version { + if newItem.Version != task.Item().Version { remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'") - task.item = newItem + task.SetItem(newItem) } } } @@ -81,7 +90,14 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { _, ok := this.taskMap[newItem.Id] if !ok { remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'") - task := NewTask(newItem) + var task Task + + if CheckSQLiteDB(newItem.Id) || !teaconst.EnableKVCacheStore { + task = NewSQLiteTask(newItem) + } else { + task = NewKVTask(newItem) + } + err := task.Init() if err != nil { remotelogs.Error("METRIC_MANAGER", "initialized task failed: "+err.Error()) @@ -100,13 +116,13 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { this.hasHTTPMetrics = false this.hasTCPMetrics = false this.hasUDPMetrics = false - this.categoryTaskMap = map[string][]*Task{} + this.categoryTaskMap = map[string][]Task{} for _, task := range this.taskMap { - var tasks = this.categoryTaskMap[task.item.Category] + var tasks = this.categoryTaskMap[task.Item().Category] tasks = append(tasks, task) - this.categoryTaskMap[task.item.Category] = tasks + this.categoryTaskMap[task.Item().Category] = tasks - switch task.item.Category { + switch task.Item().Category { case serverconfigs.MetricItemCategoryHTTP: this.hasHTTPMetrics = true case serverconfigs.MetricItemCategoryTCP: @@ -144,6 +160,10 @@ func (this *Manager) HasUDPMetrics() bool { return this.hasUDPMetrics } +func (this *Manager) TaskMap() map[int64]Task { + return this.taskMap +} + // Quit 退出管理器 func (this *Manager) Quit() { this.isQuiting = true @@ -154,6 +174,6 @@ func (this *Manager) Quit() { for _, task := range this.taskMap { _ = task.Stop() } - this.taskMap = map[int64]*Task{} + this.taskMap = map[int64]Task{} this.locker.Unlock() } diff --git a/internal/metrics/manager_test.go b/internal/metrics/manager_test.go index 600a4e9..bfb416b 100644 --- a/internal/metrics/manager_test.go +++ b/internal/metrics/manager_test.go @@ -1,18 +1,19 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -package metrics +package metrics_test import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/metrics" "testing" ) func TestNewManager(t *testing.T) { - var manager = NewManager() + var manager = metrics.NewManager() { manager.Update([]*serverconfigs.MetricItemConfig{}) - for _, task := range manager.taskMap { - t.Log(task.item.Id) + for _, task := range manager.TaskMap() { + t.Log(task.Item().Id) } } { @@ -28,8 +29,8 @@ func TestNewManager(t *testing.T) { Id: 3, }, }) - for _, task := range manager.taskMap { - t.Log("task:", task.item.Id) + for _, task := range manager.TaskMap() { + t.Log("task:", task.Item().Id) } } @@ -43,8 +44,8 @@ func TestNewManager(t *testing.T) { Id: 2, }, }) - for _, task := range manager.taskMap { - t.Log("task:", task.item.Id) + for _, task := range manager.TaskMap() { + t.Log("task:", task.Item().Id) } } @@ -56,8 +57,8 @@ func TestNewManager(t *testing.T) { Version: 1, }, }) - for _, task := range manager.taskMap { - t.Log("task:", task.item.Id) + for _, task := range manager.TaskMap() { + t.Log("task:", task.Item().Id) } } } diff --git a/internal/metrics/stat.go b/internal/metrics/stat.go index b48964a..f57b8bb 100644 --- a/internal/metrics/stat.go +++ b/internal/metrics/stat.go @@ -3,20 +3,120 @@ package metrics import ( + "bytes" + "encoding/binary" + "errors" + byteutils "github.com/TeaOSLab/EdgeNode/internal/utils/byte" "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" "strconv" "strings" ) type Stat struct { - ServerId int64 - Keys []string - Hash string - Value int64 - Time string + ServerId int64 `json:"serverId"` + Keys []string `json:"keys"` + Hash string `json:"hash"` + Value int64 `json:"value"` + Time string `json:"time"` } -func SumStat(serverId int64, keys []string, time string, version int32, itemId int64) string { +func UniqueKey(serverId int64, keys []string, time string, version int32, itemId int64) string { var keysData = strings.Join(keys, "$EDGE$") return strconv.FormatUint(fnv.HashString(strconv.FormatInt(serverId, 10)+"@"+keysData+"@"+time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10) } + +func (this *Stat) UniqueKey(version int32, itemId int64) string { + return UniqueKey(this.ServerId, this.Keys, this.Time, version, itemId) +} + +func (this *Stat) FullKey(version int32, itemId int64) string { + return this.Time + "_" + string(int32ToBigEndian(version)) + this.UniqueKey(version, itemId) +} + +func (this *Stat) EncodeValueKey(version int32) string { + if this.Value < 0 { + this.Value = 0 + } + + return string(byteutils.Concat([]byte(this.Time), []byte{'_'}, int32ToBigEndian(version), int64ToBigEndian(this.ServerId), int64ToBigEndian(this.Value), []byte(this.Hash))) +} + +func (this *Stat) EncodeSumKey(version int32) string { + return string(byteutils.Concat([]byte(this.Time), []byte{'_'}, int32ToBigEndian(version), int64ToBigEndian(this.ServerId))) +} + +func DecodeValueKey(valueKey string) (serverId int64, timeString string, version int32, value int64, hash string, err error) { + var b = []byte(valueKey) + var timeIndex = bytes.Index(b, []byte{'_'}) + if timeIndex < 0 { + return + } + + timeString = string(b[:timeIndex]) + b = b[timeIndex+1:] + + if len(b) < 20+1 { + err = errors.New("invalid value key") + return + } + + version = int32(binary.BigEndian.Uint32(b[0:4])) + serverId = int64(binary.BigEndian.Uint64(b[4:12])) + value = int64(binary.BigEndian.Uint64(b[12:20])) + hash = string(b[20:]) + return +} + +func DecodeSumKey(sumKey string) (serverId int64, timeString string, version int32, err error) { + var b = []byte(sumKey) + var timeIndex = bytes.Index(b, []byte{'_'}) + if timeIndex < 0 { + return + } + + timeString = string(b[:timeIndex]) + b = b[timeIndex+1:] + + if len(b) < 12 { + err = errors.New("invalid sum key") + return + } + + version = int32(binary.BigEndian.Uint32(b[:4])) + serverId = int64(binary.BigEndian.Uint64(b[4:12])) + return +} + +func EncodeSumValue(count uint64, total uint64) []byte { + var result [16]byte + binary.BigEndian.PutUint64(result[:8], count) + binary.BigEndian.PutUint64(result[8:], total) + return result[:] +} + +func DecodeSumValue(data []byte) (count uint64, total uint64) { + if len(data) != 16 { + return + } + count = binary.BigEndian.Uint64(data[:8]) + total = binary.BigEndian.Uint64(data[8:]) + return +} + +func int64ToBigEndian(i int64) []byte { + if i < 0 { + i = 0 + } + var b = make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + return b +} + +func int32ToBigEndian(i int32) []byte { + if i < 0 { + i = 0 + } + var b = make([]byte, 4) + binary.BigEndian.PutUint32(b, uint32(i)) + return b +} diff --git a/internal/metrics/stat_test.go b/internal/metrics/stat_test.go new file mode 100644 index 0000000..3e02026 --- /dev/null +++ b/internal/metrics/stat_test.go @@ -0,0 +1,69 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package metrics_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/metrics" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestStat_EncodeValueKey(t *testing.T) { + var a = assert.NewAssertion(t) + + var stat = &metrics.Stat{ + ServerId: 1, + Keys: []string{"${remoteAddr}"}, + Hash: "123456", + Value: 123, + Time: "20240101", + } + + var valueKey = stat.EncodeValueKey(100) + t.Log(valueKey) + + serverId, timeString, version, value, hash, err := metrics.DecodeValueKey(valueKey) + if err != nil { + t.Fatal(err) + } + t.Log(serverId, timeString, value, version, hash) + a.IsTrue(serverId == 1) + a.IsTrue(timeString == "20240101") + a.IsTrue(value == 123) + a.IsTrue(version == 100) + a.IsTrue(hash == "123456") +} + +func TestStat_EncodeSumKey(t *testing.T) { + var a = assert.NewAssertion(t) + + var stat = &metrics.Stat{ + ServerId: 1, + Keys: []string{"${remoteAddr}"}, + Hash: "123456", + Value: 123, + Time: "20240101", + } + var sumKey = stat.EncodeSumKey(100) + t.Log(sumKey) + + serverId, timeString, version, err := metrics.DecodeSumKey(sumKey) + if err != nil { + t.Fatal(err) + } + t.Log(serverId, timeString, version) + a.IsTrue(serverId == 1) + a.IsTrue(timeString == "20240101") + a.IsTrue(version == 100) +} + +func TestStat_EncodeSumValue(t *testing.T) { + var a = assert.NewAssertion(t) + + var b = metrics.EncodeSumValue(123, 456) + t.Log(b) + + count, sum := metrics.DecodeSumValue(b) + a.IsTrue(count == 123) + a.IsTrue(sum == 456) +} diff --git a/internal/metrics/sum_test.go b/internal/metrics/sum_test.go index 65b3c56..e9f2a89 100644 --- a/internal/metrics/sum_test.go +++ b/internal/metrics/sum_test.go @@ -14,7 +14,7 @@ func BenchmarkSumStat(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - metrics.SumStat(1, []string{"1.2.3.4"}, timeutil.Format("Ymd"), 1, 1) + metrics.UniqueKey(1, []string{"1.2.3.4"}, timeutil.Format("Ymd"), 1, 1) } }) } diff --git a/internal/metrics/task.go b/internal/metrics/task.go index a872ab7..c6207de 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -1,544 +1,21 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . package metrics import ( - "encoding/json" - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - "github.com/TeaOSLab/EdgeNode/internal/goman" - "github.com/TeaOSLab/EdgeNode/internal/remotelogs" - "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/TeaOSLab/EdgeNode/internal/trackers" - "github.com/TeaOSLab/EdgeNode/internal/utils" - "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" - "github.com/TeaOSLab/EdgeNode/internal/zero" - "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/types" - "os" - "strconv" - "sync" - "sync/atomic" "time" ) -const MaxQueueSize = 256 // TODO 可以配置,可以在单个任务里配置 - -// Task 单个指标任务 -// 数据库存储: -// -// data/ -// metric.$ID.db -// stats -// id, keys, value, time, serverId, hash -// 原理: -// 添加或者有变更时 isUploaded = false -// 上传时检查 isUploaded 状态 -// 只上传每个服务中排序最前面的 N 个数据 -type Task struct { - item *serverconfigs.MetricItemConfig - isLoaded bool - - db *dbs.DB - statTableName string - isStopped bool - - cleanTicker *utils.Ticker - uploadTicker *utils.Ticker - - cleanVersion int32 - - insertStatStmt *dbs.Stmt - deleteByVersionStmt *dbs.Stmt - deleteByExpiresTimeStmt *dbs.Stmt - selectTopStmt *dbs.Stmt - sumStmt *dbs.Stmt - - serverIdMap map[int64]zero.Zero // 所有的服务Ids - timeMap map[string]zero.Zero // time => bool - serverIdMapLocker sync.Mutex - - statsMap map[string]*Stat // 待写入队列,hash => *Stat - statsLocker sync.RWMutex - statsTicker *utils.Ticker -} - -// NewTask 获取新任务 -func NewTask(item *serverconfigs.MetricItemConfig) *Task { - return &Task{ - item: item, - serverIdMap: map[int64]zero.Zero{}, - timeMap: map[string]zero.Zero{}, - statsMap: map[string]*Stat{}, - } -} - -// Init 初始化 -func (this *Task) Init() error { - this.statTableName = "stats" - - // 检查目录是否存在 - var dir = Tea.Root + "/data" - _, err := os.Stat(dir) - if err != nil { - err = os.MkdirAll(dir, 0777) - if err != nil { - return err - } - remotelogs.Println("METRIC", "create data dir '"+dir+"'") - } - - var path = dir + "/metric." + types.String(this.item.Id) + ".db" - - db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE") - if err != nil { - return err - } - db.SetMaxOpenConns(1) - this.db = db - - // 恢复数据库 - var recoverEnv, _ = os.LookupEnv("EdgeRecover") - if len(recoverEnv) > 0 { - for _, indexName := range []string{"serverId", "hash"} { - _, _ = db.Exec(`REINDEX "` + indexName + `"`) - } - } - - if teaconst.EnableDBStat { - this.db.EnableStat(true) - } - - //创建统计表 - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" ( - "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, - "hash" varchar(32), - "keys" varchar(1024), - "value" real DEFAULT 0, - "time" varchar(32), - "serverId" integer DEFAULT 0, - "version" integer DEFAULT 0, - "isUploaded" integer DEFAULT 0 -); - -CREATE INDEX IF NOT EXISTS "serverId" -ON "` + this.statTableName + `" ( - "serverId" ASC, - "version" ASC -); - -CREATE UNIQUE INDEX IF NOT EXISTS "hash" -ON "` + this.statTableName + `" ( - "hash" ASC -);`) - if err != nil { - return err - } - - // insert stat stmt - this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`) - if err != nil { - return err - } - - // delete by version - this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version" 0 { - err = json.Unmarshal(keysData, &pbStat.Keys) - if err != nil { - return nil, err - } - } - pbStats = append(pbStats, pbStat) - ids = append(ids, strconv.FormatInt(pbStat.Id, 10)) - } - - // 提前关闭 - _ = rows.Close() - isClosed = true - - // 上传 - if len(pbStats) > 0 { - // 计算总和 - count, total, err := this.sum(serverId, currentTime) - if err != nil { - return nil, err - } - - _, err = rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{ - MetricStats: pbStats, - Time: currentTime, - ServerId: serverId, - ItemId: this.item.Id, - Version: this.item.Version, - Count: count, - Total: float32(total), - }) - if err != nil { - return nil, err - } - } - - return - }(serverId, currentTime) - if err != nil { - return err - } - - if len(idStrings) > 0 { - // 设置为已上传 - // TODO 先不判断是否已经上传,需要改造API进行配合 - /**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) - if err != nil { - return err - }**/ - } - } - - // 休息一下,防止短时间内上传数据过多 - if pauseDuration > 0 { - time.Sleep(pauseDuration) - } - } - - return nil -} - -// 加载服务ID -func (this *Task) loadServerIdMap() error { - { - rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - - var serverId int64 - for rows.Next() { - err = rows.Scan(&serverId) - if err != nil { - return err - } - this.serverIdMapLocker.Lock() - this.serverIdMap[serverId] = zero.New() - this.serverIdMapLocker.Unlock() - } - } - - { - rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.item.Version) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - - var timeString string - for rows.Next() { - err = rows.Scan(&timeString) - if err != nil { - return err - } - this.serverIdMapLocker.Lock() - this.timeMap[timeString] = zero.New() - this.serverIdMapLocker.Unlock() - } - } - - return nil -} - -// 计算数量和综合 -func (this *Task) sum(serverId int64, time string) (count int64, total float64, err error) { - rows, err := this.sumStmt.Query(serverId, this.item.Version, time) - if err != nil { - return 0, 0, err - } - defer func() { - _ = rows.Close() - }() - if rows.Next() { - err = rows.Scan(&count, &total) - if err != nil { - return 0, 0, err - } - } - return +type Task interface { + Init() error + Item() *serverconfigs.MetricItemConfig + SetItem(item *serverconfigs.MetricItemConfig) + Add(obj MetricInterface) + InsertStat(stat *Stat) error + Upload(pauseDuration time.Duration) error + Start() error + Stop() error + Delete() error + CleanExpired() error } diff --git a/internal/metrics/task_base.go b/internal/metrics/task_base.go new file mode 100644 index 0000000..766675e --- /dev/null +++ b/internal/metrics/task_base.go @@ -0,0 +1,75 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package metrics + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "sync" + "sync/atomic" +) + +type BaseTask struct { + itemConfig *serverconfigs.MetricItemConfig + isLoaded bool + isStopped bool + + statsMap map[string]*Stat // 待写入队列,hash => *Stat + statsLocker sync.RWMutex +} + +// Add 添加数据 +func (this *BaseTask) Add(obj MetricInterface) { + if this.isStopped || !this.isLoaded { + return + } + + var keys = []string{} + for _, key := range this.itemConfig.Keys { + var k = obj.MetricKey(key) + + // 忽略499状态 + if key == "${status}" && k == "499" { + return + } + + keys = append(keys, k) + } + + v, ok := obj.MetricValue(this.itemConfig.Value) + if !ok { + return + } + + var hash = UniqueKey(obj.MetricServerId(), keys, this.itemConfig.CurrentTime(), this.itemConfig.Version, this.itemConfig.Id) + var countItems int + this.statsLocker.RLock() + oldStat, ok := this.statsMap[hash] + if !ok { + countItems = len(this.statsMap) + } + this.statsLocker.RUnlock() + if ok { + atomic.AddInt64(&oldStat.Value, 1) + } else { + // 防止过载 + if countItems < MaxQueueSize { + this.statsLocker.Lock() + this.statsMap[hash] = &Stat{ + ServerId: obj.MetricServerId(), + Keys: keys, + Value: v, + Time: this.itemConfig.CurrentTime(), + Hash: hash, + } + this.statsLocker.Unlock() + } + } +} + +func (this *BaseTask) Item() *serverconfigs.MetricItemConfig { + return this.itemConfig +} + +func (this *BaseTask) SetItem(itemConfig *serverconfigs.MetricItemConfig) { + this.itemConfig = itemConfig +} diff --git a/internal/metrics/task_kv.go b/internal/metrics/task_kv.go new file mode 100644 index 0000000..ac6d49f --- /dev/null +++ b/internal/metrics/task_kv.go @@ -0,0 +1,534 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package metrics + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" + "github.com/TeaOSLab/EdgeNode/internal/utils" + byteutils "github.com/TeaOSLab/EdgeNode/internal/utils/byte" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/TeaOSLab/EdgeNode/internal/zero" + "github.com/cockroachdb/pebble" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" + "strings" + "sync" + "testing" + "time" +) + +// TODO sumValues不用每次insertStat的时候都保存 + +// KVTask KV存储实现的任务管理 +type KVTask struct { + BaseTask + + itemsTable *kvstore.Table[*Stat] // hash => *Stat + valuesTable *kvstore.Table[[]byte] // time_version_serverId_value_hash => []byte(nil) + sumTable *kvstore.Table[[]byte] // time_version_serverId => [count][total] + + serverTimeMap map[string]zero.Zero // 有变更的网站 serverId_time => Zero + serverIdMapLocker sync.Mutex + + statsTicker *utils.Ticker + cleanTicker *utils.Ticker + uploadTicker *utils.Ticker + + valuesCacheMap map[string]int64 // hash => value +} + +func NewKVTask(itemConfig *serverconfigs.MetricItemConfig) *KVTask { + return &KVTask{ + BaseTask: BaseTask{ + itemConfig: itemConfig, + statsMap: map[string]*Stat{}, + }, + + serverTimeMap: map[string]zero.Zero{}, + valuesCacheMap: map[string]int64{}, + } +} + +func (this *KVTask) Init() error { + store, err := kvstore.DefaultStore() + if err != nil { + return err + } + + db, err := store.NewDB("metrics" + types.String(this.itemConfig.Id)) + if err != nil { + return err + } + + { + table, tableErr := kvstore.NewTable[*Stat]("items", &ItemEncoder[*Stat]{}) + if tableErr != nil { + return tableErr + } + db.AddTable(table) + this.itemsTable = table + } + + { + table, tableErr := kvstore.NewTable[[]byte]("values", kvstore.NewNilValueEncoder()) + if tableErr != nil { + return tableErr + } + db.AddTable(table) + this.valuesTable = table + } + + { + table, tableErr := kvstore.NewTable[[]byte]("sumValues", kvstore.NewBytesValueEncoder()) + if tableErr != nil { + return tableErr + } + db.AddTable(table) + this.sumTable = table + } + + // 所有的服务IDs + err = this.loadServerIdMap() + if err != nil { + return err + } + + this.isLoaded = true + + return nil +} + +func (this *KVTask) InsertStat(stat *Stat) error { + if this.isStopped { + return nil + } + if stat == nil { + return nil + } + + var version = this.itemConfig.Version + + this.serverIdMapLocker.Lock() + this.serverTimeMap[types.String(stat.ServerId)+"_"+stat.Time] = zero.New() + this.serverIdMapLocker.Unlock() + + if len(stat.Hash) == 0 { + stat.Hash = stat.UniqueKey(version, this.itemConfig.Id) + } + + var isNew bool + var newValue = stat.Value + + // insert or update + { + var statKey = stat.FullKey(version, this.itemConfig.Id) + oldStat, err := this.itemsTable.Get(statKey) + var oldValue int64 + if err != nil { + if !kvstore.IsNotFound(err) { + return err + } + isNew = true + } else { + oldValue = oldStat.Value + + // delete old value from valuesTable + err = this.valuesTable.Delete(oldStat.EncodeValueKey(version)) + if err != nil { + return err + } + } + + oldValue += stat.Value + stat.Value = oldValue + err = this.itemsTable.Set(statKey, stat) + if err != nil { + return err + } + + // insert value into valuesTable + err = this.valuesTable.Insert(stat.EncodeValueKey(version), nil) + if err != nil { + return err + } + } + + // sum + { + var sumKey = stat.EncodeSumKey(version) + sumResult, err := this.sumTable.Get(sumKey) + var count uint64 + var total uint64 + if err != nil { + if !kvstore.IsNotFound(err) { + return err + } + } else { + count, total = DecodeSumValue(sumResult) + } + + if isNew { + count++ + } + total += uint64(newValue) + + err = this.sumTable.Set(sumKey, EncodeSumValue(count, total)) + if err != nil { + return err + } + } + + return nil +} + +func (this *KVTask) Upload(pauseDuration time.Duration) error { + var uploadTr = trackers.Begin("METRIC:UPLOAD_STATS") + defer uploadTr.End() + + if this.isStopped { + return nil + } + + this.serverIdMapLocker.Lock() + + // 服务IDs + var serverTimeMap = this.serverTimeMap + this.serverTimeMap = map[string]zero.Zero{} // 清空数据 + + this.serverIdMapLocker.Unlock() + + if len(serverTimeMap) == 0 { + return nil + } + + // 控制缓存map不要太长 + if len(this.valuesCacheMap) > 4096 { + var newMap = map[string]int64{} + var countElements int + for k, v := range this.valuesCacheMap { + newMap[k] = v + countElements++ + if countElements >= 2048 { + break + } + } + this.valuesCacheMap = newMap + } + + // 开始上传 + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + + var totalCount int + + for serverTime := range serverTimeMap { + count, uploadErr := func(serverTime string) (int, error) { + serverIdString, timeString, found := strings.Cut(serverTime, "_") + if !found { + return 0, nil + } + var serverId = types.Int64(serverIdString) + if serverId <= 0 { + return 0, nil + } + + return this.uploadServerStats(rpcClient, serverId, timeString) + }(serverTime) + if uploadErr != nil { + return uploadErr + } + + totalCount += count + + // 休息一下,防止短时间内上传数据过多 + if pauseDuration > 0 && totalCount >= 100 { + time.Sleep(pauseDuration) + uploadTr.Add(-pauseDuration) + } + } + + return nil +} + +func (this *KVTask) Start() error { + // 读取数据 + this.statsTicker = utils.NewTicker(1 * time.Minute) + if Tea.IsTesting() { + this.statsTicker = utils.NewTicker(10 * time.Second) + } + goman.New(func() { + for this.statsTicker.Next() { + var tr = trackers.Begin("METRIC:DUMP_STATS_TO_LOCAL_DATABASE") + + this.statsLocker.Lock() + var statsMap = this.statsMap + this.statsMap = map[string]*Stat{} + this.statsLocker.Unlock() + + for _, stat := range statsMap { + err := this.InsertStat(stat) + if err != nil { + remotelogs.Error("METRIC", "insert stat failed: "+err.Error()) + } + } + + tr.End() + } + }) + + // 清理 + this.cleanTicker = utils.NewTicker(24 * time.Hour) + goman.New(func() { + for this.cleanTicker.Next() { + var tr = trackers.Begin("METRIC:CLEAN_EXPIRED") + err := this.CleanExpired() + tr.End() + if err != nil { + remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error()) + } + } + }) + + // 上传 + this.uploadTicker = utils.NewTicker(this.itemConfig.UploadDuration()) + goman.New(func() { + for this.uploadTicker.Next() { + err := this.Upload(1 * time.Second) + if err != nil && !rpc.IsConnError(err) { + remotelogs.Error("METRIC", "upload stats failed: "+err.Error()) + } + } + }) + + return nil +} + +func (this *KVTask) Stop() error { + this.isStopped = true + + if this.cleanTicker != nil { + this.cleanTicker.Stop() + } + if this.uploadTicker != nil { + this.uploadTicker.Stop() + } + if this.statsTicker != nil { + this.statsTicker.Stop() + } + + return nil +} + +func (this *KVTask) Delete() error { + this.isStopped = true + + return this.itemsTable.DB().Truncate() +} + +func (this *KVTask) CleanExpired() error { + if this.isStopped { + return nil + } + + var versionBytes = int32ToBigEndian(this.itemConfig.Version) + var expiresTime = this.itemConfig.LocalExpiresTime() + + var rangeEnd = append([]byte(expiresTime+"_"), versionBytes...) + rangeEnd = append(rangeEnd, 0xFF, 0xFF) + + { + err := this.itemsTable.DeleteRange("", string(rangeEnd)) + if err != nil { + return err + } + } + + { + err := this.valuesTable.DeleteRange("", string(rangeEnd)) + if err != nil { + return err + } + } + + { + err := this.sumTable.DeleteRange("", string(rangeEnd)) + if err != nil { + return err + } + } + + return nil +} + +func (this *KVTask) Flush() error { + return this.itemsTable.DB().Store().Flush() +} + +func (this *KVTask) TestInspect(t *testing.T) { + var db = this.itemsTable.DB() + it, err := db.Store().RawDB().NewIter(&pebble.IterOptions{ + LowerBound: []byte(db.Namespace()), + UpperBound: append([]byte(db.Namespace()), 0xFF), + }) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = it.Close() + }() + + for it.First(); it.Valid(); it.Next() { + valueBytes, valueErr := it.ValueAndErr() + if valueErr != nil { + t.Fatal(valueErr) + } + var key = string(it.Key()[len(db.Namespace())-1:]) + t.Log(key, "=>", string(valueBytes)) + if strings.HasPrefix(key, "$values$K$") { + _, _, _, value, hash, _ := DecodeValueKey(key[len("$values$K$"):]) + t.Log(" |", hash, "=>", value) + } else if strings.HasPrefix(key, "$sumValues$K$") { + count, sum := DecodeSumValue(valueBytes) + t.Log(" |", count, sum) + } + } +} + +func (this *KVTask) Truncate() error { + var db = this.itemsTable.DB() + err := db.Truncate() + if err != nil { + return err + } + return db.Store().Flush() +} + +func (this *KVTask) uploadServerStats(rpcClient *rpc.RPCClient, serverId int64, currentTime string) (countValues int, uploadErr error) { + var pbStats []*pb.UploadingMetricStat + var keepKeys []string + + var prefix = string(byteutils.Concat([]byte(currentTime), []byte{'_'}, int32ToBigEndian(this.itemConfig.Version), int64ToBigEndian(serverId))) + var newCachedKeys = map[string]int64{} + queryErr := this.valuesTable. + Query(). + Prefix(prefix). + Desc(). + Limit(20). + FindAll(func(tx *kvstore.Tx[[]byte], item kvstore.Item[[]byte]) (goNext bool, err error) { + _, _, version, value, hash, decodeErr := DecodeValueKey(item.Key) + if decodeErr != nil { + return false, decodeErr + } + if value <= 0 { + return true, nil + } + + // value not changed for the key + if this.valuesCacheMap[hash] == value { + keepKeys = append(keepKeys, hash) + return true, nil + } + + newCachedKeys[hash] = value + + stat, valueErr := this.itemsTable.Get(string(byteutils.Concat([]byte(currentTime), []byte{'_'}, int32ToBigEndian(version), []byte(hash)))) + if valueErr != nil { + if kvstore.IsNotFound(valueErr) { + return true, nil + } + return false, valueErr + } + if stat == nil { + return true, nil + } + + pbStats = append(pbStats, &pb.UploadingMetricStat{ + Id: 0, // not used in node + Hash: hash, + Keys: stat.Keys, + Value: float32(value), + }) + + return true, nil + }) + if queryErr != nil { + return 0, queryErr + } + + // count & total + var count, total uint64 + { + sumValue, err := this.sumTable.Get(prefix) + if err != nil { + return 0, err + } + count, total = DecodeSumValue(sumValue) + } + + _, err := rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{ + MetricStats: pbStats, + Time: currentTime, + ServerId: serverId, + ItemId: this.itemConfig.Id, + Version: this.itemConfig.Version, + Count: int64(count), + Total: float32(total), + KeepKeys: keepKeys, + }) + if err != nil { + return 0, err + } + + // put into cache map MUST be after uploading success + for k, v := range newCachedKeys { + this.valuesCacheMap[k] = v + } + + return len(pbStats), nil +} + +func (this *KVTask) loadServerIdMap() error { + var offsetKey string + var currentTime = this.itemConfig.CurrentTime() + for { + var found bool + err := this.sumTable. + Query(). + Limit(1000). + Offset(offsetKey). + FindAll(func(tx *kvstore.Tx[[]byte], item kvstore.Item[[]byte]) (goNext bool, err error) { + offsetKey = item.Key + found = true + + serverId, timeString, version, decodeErr := DecodeSumKey(item.Key) + if decodeErr != nil { + return false, decodeErr + } + + if version != this.itemConfig.Version || timeString != currentTime { + return true, nil + } + + this.serverIdMapLocker.Lock() + this.serverTimeMap[types.String(serverId)+"_"+timeString] = zero.New() + this.serverIdMapLocker.Unlock() + + return true, nil + }) + if err != nil { + return err + } + if !found { + break + } + } + + return nil +} diff --git a/internal/metrics/task_kv_objects.go b/internal/metrics/task_kv_objects.go new file mode 100644 index 0000000..208a399 --- /dev/null +++ b/internal/metrics/task_kv_objects.go @@ -0,0 +1,24 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package metrics + +import ( + "encoding/json" + "errors" +) + +type ItemEncoder[T interface{ *Stat }] struct { +} + +func (this *ItemEncoder[T]) Encode(value T) ([]byte, error) { + return json.Marshal(value) +} + +func (this *ItemEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + return nil, errors.New("invalid field name '" + fieldName + "'") +} + +func (this *ItemEncoder[T]) Decode(valueBytes []byte) (value T, err error) { + err = json.Unmarshal(valueBytes, &value) + return +} diff --git a/internal/metrics/task_test.go b/internal/metrics/task_kv_test.go similarity index 64% rename from internal/metrics/task_test.go rename to internal/metrics/task_kv_test.go index e97de0f..4b1d2a0 100644 --- a/internal/metrics/task_test.go +++ b/internal/metrics/task_kv_test.go @@ -37,8 +37,8 @@ func (this *testObj) MetricCategory() string { return "http" } -func TestTask_Init(t *testing.T) { - var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ +func TestKVTask_Init(t *testing.T) { + var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ Id: 1, IsOn: false, Category: "", @@ -57,8 +57,8 @@ func TestTask_Init(t *testing.T) { t.Log("ok") } -func TestTask_Add(t *testing.T) { - var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ +func TestKVTask_Add(t *testing.T) { + var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ Id: 1, IsOn: false, Category: "", @@ -80,15 +80,18 @@ func TestTask_Add(t *testing.T) { }() task.Add(&testObj{ip: "127.0.0.2"}) - time.Sleep(1 * time.Second) // waiting for inserting + + if testutils.IsSingleTesting() { + time.Sleep(1 * time.Second) // waiting for inserting + } } -func TestTask_Add_Many(t *testing.T) { +func TestKVTask_Add_Many(t *testing.T) { if !testutils.IsSingleTesting() { return } - var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ + var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ Id: 1, IsOn: false, Category: "", @@ -120,7 +123,7 @@ func TestTask_Add_Many(t *testing.T) { } } -func TestTask_InsertStat(t *testing.T) { +func TestKVTask_InsertStat(t *testing.T) { var item = &serverconfigs.MetricItemConfig{ Id: 1, IsOn: false, @@ -131,11 +134,19 @@ func TestTask_InsertStat(t *testing.T) { Value: "${countRequest}", Version: 1, } - var task = metrics.NewTask(item) + var task = metrics.NewKVTask(item) err := task.Init() if err != nil { t.Fatal(err) } + + defer func() { + err = task.Flush() + if err != nil { + t.Fatal(err) + } + }() + err = task.Start() if err != nil { t.Fatal(err) @@ -144,21 +155,50 @@ func TestTask_InsertStat(t *testing.T) { _ = task.Stop() }() - err = task.InsertStat(&metrics.Stat{ - ServerId: 1, - Keys: []string{"127.0.0.1"}, - Hash: "", - Value: 1, - Time: item.CurrentTime(), - }) - if err != nil { - t.Fatal(err) + { + err = task.InsertStat(&metrics.Stat{ + ServerId: 1, + Keys: []string{"127.0.0.1"}, + Hash: "", + Value: 1, + Time: item.CurrentTime(), + }) + if err != nil { + t.Fatal(err) + } } - t.Log("ok") + + { + err = task.InsertStat(&metrics.Stat{ + ServerId: 2, + Keys: []string{"127.0.0.2"}, + Hash: "", + Value: 3, + Time: item.CurrentTime(), + }) + if err != nil { + t.Fatal(err) + } + } + + { + err = task.InsertStat(&metrics.Stat{ + ServerId: 1, + Keys: []string{"127.0.0.3"}, + Hash: "", + Value: 2, + Time: item.CurrentTime(), + }) + if err != nil { + t.Fatal(err) + } + } + + TestKVTask_TestInspect(t) } -func TestTask_CleanExpired(t *testing.T) { - var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ +func TestKVTask_CleanExpired(t *testing.T) { + var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ Id: 1, IsOn: false, Category: "", @@ -184,12 +224,18 @@ func TestTask_CleanExpired(t *testing.T) { if err != nil { t.Fatal(err) } - t.Log("ok") + + defer func() { + _ = task.Flush() + }() + + t.Log("=== inspect ===") + task.TestInspect(t) } -func TestTask_Upload(t *testing.T) { - var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ - Id: 1, +func TestKVTask_Upload(t *testing.T) { + var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ + Id: 31, IsOn: false, Category: "", Period: 1, @@ -218,11 +264,51 @@ func TestTask_Upload(t *testing.T) { t.Log("ok") } -var testingTask *metrics.Task +func TestKVTask_TestInspect(t *testing.T) { + var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + Version: 1, + }) + err := task.Init() + if err != nil { + t.Fatal(err) + } + + task.TestInspect(t) +} + +func TestKVTask_Truncate(t *testing.T) { + var task = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + Version: 1, + }) + err := task.Init() + if err != nil { + t.Fatal(err) + } + + if testutils.IsSingleTesting() { + _ = task.Truncate() + } +} + +var testingTask metrics.Task var testingTaskInitOnce = &sync.Once{} func initTestingTask() { - testingTask = metrics.NewTask(&serverconfigs.MetricItemConfig{ + testingTask = metrics.NewKVTask(&serverconfigs.MetricItemConfig{ Id: 1, IsOn: false, Category: "tcp", @@ -243,7 +329,7 @@ func initTestingTask() { } } -func BenchmarkTask_Add(b *testing.B) { +func BenchmarkKVTask_Add(b *testing.B) { runtime.GOMAXPROCS(1) testingTaskInitOnce.Do(func() { diff --git a/internal/metrics/task_sqlite.go b/internal/metrics/task_sqlite.go new file mode 100644 index 0000000..3ca3181 --- /dev/null +++ b/internal/metrics/task_sqlite.go @@ -0,0 +1,505 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package metrics + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" + "github.com/TeaOSLab/EdgeNode/internal/zero" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" + "os" + "strconv" + "sync" + "time" +) + +const MaxQueueSize = 256 // TODO 可以配置,可以在单个任务里配置 + +// SQLiteTask 单个指标任务 +// 数据库存储: +// +// data/ +// metric.$ID.db +// stats +// id, keys, value, time, serverId, hash +// 原理: +// 添加或者有变更时 isUploaded = false +// 上传时检查 isUploaded 状态 +// 只上传每个服务中排序最前面的 N 个数据 +type SQLiteTask struct { + BaseTask + + db *dbs.DB + statTableName string + + cleanTicker *utils.Ticker + uploadTicker *utils.Ticker + + cleanVersion int32 + + insertStatStmt *dbs.Stmt + deleteByVersionStmt *dbs.Stmt + deleteByExpiresTimeStmt *dbs.Stmt + selectTopStmt *dbs.Stmt + sumStmt *dbs.Stmt + + serverIdMap map[int64]zero.Zero // 所有的服务Ids + timeMap map[string]zero.Zero // time => bool + serverIdMapLocker sync.Mutex + + statsTicker *utils.Ticker +} + +// NewSQLiteTask 获取新任务 +func NewSQLiteTask(item *serverconfigs.MetricItemConfig) *SQLiteTask { + return &SQLiteTask{ + BaseTask: BaseTask{ + itemConfig: item, + statsMap: map[string]*Stat{}, + }, + + serverIdMap: map[int64]zero.Zero{}, + timeMap: map[string]zero.Zero{}, + } +} + +func CheckSQLiteDB(itemId int64) bool { + var path = Tea.Root + "/data/metric." + types.String(itemId) + ".db" + _, err := os.Stat(path) + return err == nil +} + +// Init 初始化 +func (this *SQLiteTask) Init() error { + this.statTableName = "stats" + + // 检查目录是否存在 + var dir = Tea.Root + "/data" + _, err := os.Stat(dir) + if err != nil { + err = os.MkdirAll(dir, 0777) + if err != nil { + return err + } + remotelogs.Println("METRIC", "create data dir '"+dir+"'") + } + + var path = dir + "/metric." + types.String(this.itemConfig.Id) + ".db" + + db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE") + if err != nil { + return err + } + db.SetMaxOpenConns(1) + this.db = db + + // 恢复数据库 + var recoverEnv, _ = os.LookupEnv("EdgeRecover") + if len(recoverEnv) > 0 { + for _, indexName := range []string{"serverId", "hash"} { + _, _ = db.Exec(`REINDEX "` + indexName + `"`) + } + } + + if teaconst.EnableDBStat { + this.db.EnableStat(true) + } + + //创建统计表 + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "hash" varchar(32), + "keys" varchar(1024), + "value" real DEFAULT 0, + "time" varchar(32), + "serverId" integer DEFAULT 0, + "version" integer DEFAULT 0, + "isUploaded" integer DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS "serverId" +ON "` + this.statTableName + `" ( + "serverId" ASC, + "version" ASC +); + +CREATE UNIQUE INDEX IF NOT EXISTS "hash" +ON "` + this.statTableName + `" ( + "hash" ASC +);`) + if err != nil { + return err + } + + // insert stat stmt + this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`) + if err != nil { + return err + } + + // delete by version + this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version" 0 { + err = json.Unmarshal(keysData, &pbStat.Keys) + if err != nil { + return nil, err + } + } + pbStats = append(pbStats, pbStat) + ids = append(ids, strconv.FormatInt(pbStat.Id, 10)) + } + + // 提前关闭 + _ = rows.Close() + isClosed = true + + // 上传 + if len(pbStats) > 0 { + // 计算总和 + count, total, err := this.sum(serverId, currentTime) + if err != nil { + return nil, err + } + + _, err = rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{ + MetricStats: pbStats, + Time: currentTime, + ServerId: serverId, + ItemId: this.itemConfig.Id, + Version: this.itemConfig.Version, + Count: count, + Total: float32(total), + }) + if err != nil { + return nil, err + } + } + + return + }(serverId, currentTime) + if err != nil { + return err + } + + if len(idStrings) > 0 { + // 设置为已上传 + // TODO 先不判断是否已经上传,需要改造API进行配合 + /**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) + if err != nil { + return err + }**/ + } + } + + // 休息一下,防止短时间内上传数据过多 + if pauseDuration > 0 { + time.Sleep(pauseDuration) + uploadTr.Add(-pauseDuration) + } + } + + return nil +} + +// 加载服务ID +func (this *SQLiteTask) loadServerIdMap() error { + { + rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.itemConfig.Version) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + var serverId int64 + for rows.Next() { + err = rows.Scan(&serverId) + if err != nil { + return err + } + this.serverIdMapLocker.Lock() + this.serverIdMap[serverId] = zero.New() + this.serverIdMapLocker.Unlock() + } + } + + { + rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.itemConfig.Version) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + var timeString string + for rows.Next() { + err = rows.Scan(&timeString) + if err != nil { + return err + } + this.serverIdMapLocker.Lock() + this.timeMap[timeString] = zero.New() + this.serverIdMapLocker.Unlock() + } + } + + return nil +} + +// 计算数量和综合 +func (this *SQLiteTask) sum(serverId int64, time string) (count int64, total float64, err error) { + rows, err := this.sumStmt.Query(serverId, this.itemConfig.Version, time) + if err != nil { + return 0, 0, err + } + defer func() { + _ = rows.Close() + }() + if rows.Next() { + err = rows.Scan(&count, &total) + if err != nil { + return 0, 0, err + } + } + return +} diff --git a/internal/trackers/label.go b/internal/trackers/label.go index 825ea33..4944b86 100644 --- a/internal/trackers/label.go +++ b/internal/trackers/label.go @@ -26,3 +26,7 @@ func (this *tracker) End() { func (this *tracker) Begin(subLabel string) *tracker { return Begin(this.label + ":" + subLabel) } + +func (this *tracker) Add(duration time.Duration) { + this.startTime = this.startTime.Add(-duration) +} diff --git a/internal/trackers/manager_test.go b/internal/trackers/manager_test.go index 743f0f7..c03650f 100644 --- a/internal/trackers/manager_test.go +++ b/internal/trackers/manager_test.go @@ -1,8 +1,9 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -package trackers +package trackers_test import ( + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/iwind/TeaGo/logs" "testing" "time" @@ -10,38 +11,46 @@ import ( func TestNewManager(t *testing.T) { { - var tr = Begin("a") + var tr = trackers.Begin("a") tr.End() } { - var tr = Begin("a") + var tr = trackers.Begin("a") time.Sleep(1 * time.Millisecond) tr.End() } { - var tr = Begin("a") + var tr = trackers.Begin("a") time.Sleep(2 * time.Millisecond) tr.End() } { - var tr = Begin("a") + var tr = trackers.Begin("a") time.Sleep(3 * time.Millisecond) tr.End() } { - var tr = Begin("a") + var tr = trackers.Begin("a") time.Sleep(4 * time.Millisecond) tr.End() } { - var tr = Begin("a") + var tr = trackers.Begin("a") time.Sleep(5 * time.Millisecond) tr.End() } { - var tr = Begin("b") + var tr = trackers.Begin("b") tr.End() } - logs.PrintAsJSON(SharedManager.Labels(), t) + logs.PrintAsJSON(trackers.SharedManager.Labels(), t) +} + +func TestTrackers_Add(t *testing.T) { + var tr = trackers.Begin("a") + time.Sleep(50 * time.Millisecond) + tr.Add(-10 * time.Millisecond) + tr.End() + t.Log(trackers.SharedManager.Labels()) } diff --git a/internal/utils/agents/manager.go b/internal/utils/agents/manager.go index e669d22..b319bd2 100644 --- a/internal/utils/agents/manager.go +++ b/internal/utils/agents/manager.go @@ -199,7 +199,7 @@ func (this *Manager) loadDB() error { var sqlitePath = Tea.Root + "/data/agents.db" _, sqliteErr := os.Stat(sqlitePath) var db DB - if sqliteErr == nil { + if sqliteErr == nil || !teaconst.EnableKVCacheStore { db = NewSQLiteDB(sqlitePath) } else { db = NewKVDB() diff --git a/internal/utils/byte/utils.go b/internal/utils/byte/utils.go index 24f7d94..d0eb611 100644 --- a/internal/utils/byte/utils.go +++ b/internal/utils/byte/utils.go @@ -18,8 +18,8 @@ func Append(b []byte, b2 ...byte) []byte { return append(Copy(b), b2...) } -// Contact bytes -func Contact(b []byte, b2 ...[]byte) []byte { +// Concat bytes +func Concat(b []byte, b2 ...[]byte) []byte { b = Copy(b) for _, b3 := range b2 { b = append(b, b3...) diff --git a/internal/utils/byte/utils_test.go b/internal/utils/byte/utils_test.go index 2794ca0..aa91104 100644 --- a/internal/utils/byte/utils_test.go +++ b/internal/utils/byte/utils_test.go @@ -39,7 +39,7 @@ func TestConcat(t *testing.T) { var prefix []byte prefix = append(prefix, 1, 2, 3) - var b = byteutils.Contact(prefix, []byte{4, 5, 6}, []byte{7}) + var b = byteutils.Concat(prefix, []byte{4, 5, 6}, []byte{7}) t.Log(b) a.IsTrue(bytes.Equal(b, []byte{1, 2, 3, 4, 5, 6, 7})) diff --git a/internal/utils/kvstore/db.go b/internal/utils/kvstore/db.go index 38c77a9..0de9bb0 100644 --- a/internal/utils/kvstore/db.go +++ b/internal/utils/kvstore/db.go @@ -52,6 +52,15 @@ func (this *DB) Store() *Store { return this.store } +// Truncate the database +func (this *DB) Truncate() error { + this.mu.Lock() + defer this.mu.Unlock() + + var start = []byte(this.Namespace()) + return this.store.rawDB.DeleteRange(start, append(start, 0xFF), DefaultWriteOptions) +} + func (this *DB) Close() error { this.mu.Lock() defer this.mu.Unlock() diff --git a/internal/utils/kvstore/store.go b/internal/utils/kvstore/store.go index 7b21e51..f3f7658 100644 --- a/internal/utils/kvstore/store.go +++ b/internal/utils/kvstore/store.go @@ -162,14 +162,22 @@ func (this *Store) Delete(keyBytes []byte) error { } func (this *Store) NewDB(dbName string) (*DB, error) { + this.mu.Lock() + defer this.mu.Unlock() + + // check existence + for _, db := range this.dbs { + if db.name == dbName { + return db, nil + } + } + + // create new db, err := NewDB(this, dbName) if err != nil { return nil, err } - this.mu.Lock() - defer this.mu.Unlock() - this.dbs = append(this.dbs, db) return db, nil } diff --git a/internal/utils/kvstore/table.go b/internal/utils/kvstore/table.go index b86b144..6ecfc0a 100644 --- a/internal/utils/kvstore/table.go +++ b/internal/utils/kvstore/table.go @@ -6,6 +6,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "github.com/cockroachdb/pebble" "github.com/iwind/TeaGo/types" "sync" @@ -213,6 +214,10 @@ func (this *Table[T]) Truncate() error { return this.db.store.rawDB.DeleteRange(this.Namespace(), append(this.Namespace(), 0xFF), DefaultWriteOptions) } +func (this *Table[T]) DeleteRange(start string, end string) error { + return this.db.store.rawDB.DeleteRange(this.FullKeyBytes([]byte(start)), this.FullKeyBytes([]byte(end)), DefaultWriteOptions) +} + func (this *Table[T]) Query() *Query[T] { var query = NewQuery[T]() query.SetTable(this) @@ -275,6 +280,7 @@ func (this *Table[T]) DecodeFieldKey(fieldName string, fieldKey []byte) (fieldVa } func (this *Table[T]) Close() error { + // nothing to do return nil } @@ -300,7 +306,7 @@ func (this *Table[T]) deleteKeys(tx *Tx[T], key ...string) error { value, decodeErr := this.encoder.Decode(valueBytes) if decodeErr != nil { - return decodeErr + return fmt.Errorf("decode value failed: %w", decodeErr) } for _, fieldName := range this.fieldNames { @@ -360,7 +366,7 @@ func (this *Table[T]) set(tx *Tx[T], key string, valueBytes []byte, value T, ins var decodeErr error oldValue, decodeErr = this.encoder.Decode(oldValueBytes) if decodeErr != nil { - return decodeErr + return fmt.Errorf("decode value failed: %w", decodeErr) } oldFound = true } @@ -431,7 +437,7 @@ func (this *Table[T]) getWithKeyBytes(tx *Tx[T], keyBytes []byte) (value T, err resultValue, decodeErr := this.encoder.Decode(valueBytes) if decodeErr != nil { - return value, decodeErr + return value, fmt.Errorf("decode value failed: %w", decodeErr) } value = resultValue return diff --git a/internal/utils/kvstore/value_encoder_nil.go b/internal/utils/kvstore/value_encoder_nil.go new file mode 100644 index 0000000..bc3a076 --- /dev/null +++ b/internal/utils/kvstore/value_encoder_nil.go @@ -0,0 +1,22 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type NilValueEncoder[T []byte] struct { +} + +func NewNilValueEncoder[T []byte]() *NilValueEncoder[T] { + return &NilValueEncoder[T]{} +} + +func (this *NilValueEncoder[T]) Encode(value T) ([]byte, error) { + return nil, nil +} + +func (this *NilValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + return nil, nil +} + +func (this *NilValueEncoder[T]) Decode(valueData []byte) (value T, err error) { + return +}