diff --git a/build/data/.gitignore b/build/data/.gitignore index ed0f64e..f59ec20 100644 --- a/build/data/.gitignore +++ b/build/data/.gitignore @@ -1 +1 @@ -index.* \ No newline at end of file +* \ No newline at end of file diff --git a/go.mod b/go.mod index 2e7578e..2e6b17e 100644 --- a/go.mod +++ b/go.mod @@ -12,10 +12,13 @@ require ( github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-yaml/yaml v2.1.0+incompatible github.com/golang/protobuf v1.5.2 - github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f + github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060 github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 + github.com/json-iterator/go v1.1.11 // indirect github.com/lionsoul2014/ip2region v2.2.0-release+incompatible github.com/mattn/go-sqlite3 v1.14.7 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/mssola/user_agent v0.5.2 github.com/shirou/gopsutil v3.21.5+incompatible github.com/tklauser/go-sysconf v0.3.6 // indirect diff --git a/go.sum b/go.sum index dd8b53e..3ac5bc0 100644 --- a/go.sum +++ b/go.sum @@ -65,9 +65,13 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f h1:r2O8PONj/KiuZjJHVHn7KlCePUIjNtgAmvLfgRafQ8o= github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060 h1:qdLtK4PDXxk2vMKkTWl5Fl9xqYuRCukzWAgJbLHdfOo= +github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 h1:DaQjoWZhLNxjhIXedVg4/vFEtHkZhK4IjIwsWdyzBLg= github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= +github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= @@ -77,7 +81,11 @@ github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBo github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA= github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mssola/user_agent v0.5.2 h1:CZkTUahjL1+OcZ5zv3kZr8QiJ8jy2H08vZIEkBeRbxo= github.com/mssola/user_agent v0.5.2/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= diff --git a/internal/metrics/manager.go b/internal/metrics/manager.go new file mode 100644 index 0000000..972c264 --- /dev/null +++ b/internal/metrics/manager.go @@ -0,0 +1,121 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package metrics + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "strconv" + "sync" +) + +var SharedManager = NewManager() + +type Manager struct { + tasks map[int64]*Task // itemId => *Task + categoryTasks map[string][]*Task // category => []*Task + locker sync.RWMutex + + hasHTTPMetrics bool + hasTCPMetrics bool + hasUDPMetrics bool +} + +func NewManager() *Manager { + return &Manager{ + tasks: map[int64]*Task{}, + categoryTasks: map[string][]*Task{}, + } +} + +func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { + this.locker.Lock() + defer this.locker.Unlock() + + var newMap = map[int64]*serverconfigs.MetricItemConfig{} + for _, item := range items { + newMap[item.Id] = item + } + + // 停用以前的 或 修改现在的 + for itemId, task := range this.tasks { + newItem, ok := newMap[itemId] + if !ok || !newItem.IsOn { // 停用以前的 + remotelogs.Println("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"'") + err := task.Stop() + if err != nil { + remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error()) + } + delete(this.tasks, itemId) + } else { // 更新已存在的 + if newItem.Version != task.item.Version { + remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'") + task.item = newItem + } + } + } + + // 启动新的 + for _, newItem := range items { + if !newItem.IsOn { + continue + } + _, ok := this.tasks[newItem.Id] + if !ok { + remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'") + task := NewTask(newItem) + err := task.Init() + if err != nil { + remotelogs.Error("METRIC_MANAGER", "initialized task failed: "+err.Error()) + continue + } + err = task.Start() + if err != nil { + remotelogs.Error("METRIC_MANAGER", "start task failed: "+err.Error()) + continue + } + this.tasks[newItem.Id] = task + } + } + + // 按分类存放 + this.hasHTTPMetrics = false + this.hasTCPMetrics = false + this.hasUDPMetrics = false + this.categoryTasks = map[string][]*Task{} + for _, task := range this.tasks { + tasks := this.categoryTasks[task.item.Category] + tasks = append(tasks, task) + this.categoryTasks[task.item.Category] = tasks + + switch task.item.Category { + case serverconfigs.MetricItemCategoryHTTP: + this.hasHTTPMetrics = true + case serverconfigs.MetricItemCategoryTCP: + this.hasTCPMetrics = true + case serverconfigs.MetricItemCategoryUDP: + this.hasUDPMetrics = true + } + } +} + +// Add 添加数据 +func (this *Manager) Add(obj MetricInterface) { + this.locker.RLock() + for _, task := range this.categoryTasks[obj.MetricCategory()] { + task.Add(obj) + } + this.locker.RUnlock() +} + +func (this *Manager) HasHTTPMetrics() bool { + return this.hasHTTPMetrics +} + +func (this *Manager) HasTCPMetrics() bool { + return this.hasTCPMetrics +} + +func (this *Manager) HasUDPMetrics() bool { + return this.hasUDPMetrics +} diff --git a/internal/metrics/manager_test.go b/internal/metrics/manager_test.go new file mode 100644 index 0000000..8dc242f --- /dev/null +++ b/internal/metrics/manager_test.go @@ -0,0 +1,63 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package metrics + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "testing" +) + +func TestNewManager(t *testing.T) { + var manager = NewManager() + { + manager.Update([]*serverconfigs.MetricItemConfig{}) + for _, task := range manager.tasks { + t.Log(task.item.Id) + } + } + { + t.Log("====") + manager.Update([]*serverconfigs.MetricItemConfig{ + { + Id: 1, + }, + { + Id: 2, + }, + { + Id: 3, + }, + }) + for _, task := range manager.tasks { + t.Log("task:", task.item.Id) + } + } + + { + t.Log("====") + manager.Update([]*serverconfigs.MetricItemConfig{ + { + Id: 1, + }, + { + Id: 2, + }, + }) + for _, task := range manager.tasks { + t.Log("task:", task.item.Id) + } + } + + { + t.Log("====") + manager.Update([]*serverconfigs.MetricItemConfig{ + { + Id: 1, + Version: 1, + }, + }) + for _, task := range manager.tasks { + t.Log("task:", task.item.Id) + } + } +} diff --git a/internal/metrics/metric_interface.go b/internal/metrics/metric_interface.go new file mode 100644 index 0000000..7278369 --- /dev/null +++ b/internal/metrics/metric_interface.go @@ -0,0 +1,17 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package metrics + +type MetricInterface interface { + // MetricKey 指标对象 + MetricKey(key string) string + + // MetricValue 指标值 + MetricValue(value string) (result int64, ok bool) + + // MetricServerId 服务ID + MetricServerId() int64 + + // MetricCategory 指标分类 + MetricCategory() string +} diff --git a/internal/metrics/stat.go b/internal/metrics/stat.go new file mode 100644 index 0000000..f3ea3ce --- /dev/null +++ b/internal/metrics/stat.go @@ -0,0 +1,22 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package metrics + +import ( + "github.com/cespare/xxhash" + "strconv" +) + +type Stat struct { + ServerId int64 + Keys []string + Hash string + Value int64 + Time string + + keysData []byte +} + +func (this *Stat) Sum(version int, itemId int64) { + this.Hash = strconv.FormatUint(xxhash.Sum64String(strconv.FormatInt(this.ServerId, 10)+"@"+string(this.keysData)+"@"+this.Time+"@"+strconv.Itoa(version)+"@"+strconv.FormatInt(itemId, 10)), 10) +} diff --git a/internal/metrics/task.go b/internal/metrics/task.go new file mode 100644 index 0000000..b441da3 --- /dev/null +++ b/internal/metrics/task.go @@ -0,0 +1,414 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package metrics + +import ( + "database/sql" + "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/Tea" + _ "github.com/mattn/go-sqlite3" + "os" + "strconv" + "strings" + "sync" + "time" +) + +// Task 单个指标任务 +// 数据库存储: +// data/ +// metric.$ID.db +// stats +// id, keys, value, time, serverId, hash +// 原理: +// 添加或者有变更时 isUploaded = false +// 上传时检查 isUploaded 状态 +// 只上传每个服务中排序最前面的 N 个数据 +type Task struct { + item *serverconfigs.MetricItemConfig + isLoaded bool + + db *sql.DB + statTableName string + statsChan chan *Stat + isStopped bool + + cleanTicker *utils.Ticker + uploadTicker *utils.Ticker + + cleanVersion int + + insertStatStmt *sql.Stmt + deleteByVersionStmt *sql.Stmt + deleteByExpiresTimeStmt *sql.Stmt + selectTopStmt *sql.Stmt + + serverIdMap map[int64]bool // 所有的服务Ids + serverIdMapLocker sync.Mutex +} + +// NewTask 获取新任务 +func NewTask(item *serverconfigs.MetricItemConfig) *Task { + return &Task{ + item: item, + statsChan: make(chan *Stat, 40960), + serverIdMap: map[int64]bool{}, + } +} + +// Init 初始化 +func (this *Task) Init() error { + this.statTableName = "stats" + + // 检查目录是否存在 + var dir = Tea.Root + "/data" + _, err := os.Stat(dir) + if err != nil { + err = os.MkdirAll(dir, 0777) + if err != nil { + return err + } + remotelogs.Println("METRIC", "create data dir '"+dir+"'") + } + + db, err := sql.Open("sqlite3", "file:"+dir+"/metric."+strconv.FormatInt(this.item.Id, 10)+".db?cache=shared&mode=rwc&_journal_mode=WAL") + if err != nil { + return err + } + db.SetMaxOpenConns(1) + this.db = db + + //创建统计表 + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "hash" varchar(32), + "keys" varchar(1024), + "value" real DEFAULT 0, + "time" varchar(32), + "serverId" integer DEFAULT 0, + "version" integer DEFAULT 0, + "isUploaded" integer DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS "serverId" +ON "` + this.statTableName + `" ( + "serverId" ASC, + "version" ASC +); + +CREATE UNIQUE INDEX IF NOT EXISTS "hash" +ON "` + this.statTableName + `" ( + "hash" ASC +);`) + if err != nil { + return err + } + + // insert stat stmt + this.insertStatStmt, err = db.Prepare(`INSERT INTO "stats" ("serverId", "hash", "keys", "value", "time", "version", "isUploaded") VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT("hash") DO UPDATE SET "value"="value"+?, "isUploaded"=0`) + if err != nil { + return err + } + + // delete by version + this.deleteByVersionStmt, err = db.Prepare(`DELETE FROM "` + this.statTableName + `" WHERE "version" 0 { + err = json.Unmarshal(keysData, &pbStat.Keys) + if err != nil { + return nil, err + } + } + pbStats = append(pbStats, pbStat) + ids = append(ids, strconv.FormatInt(pbStat.Id, 10)) + } + + // 提前关闭 + _ = rows.Close() + isClosed = true + + // 上传 + if len(pbStats) > 0 { + _, err = rpcClient.MetricStatRPC().UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{MetricStats: pbStats}) + if err != nil { + return nil, err + } + } + + return + }(serverId) + if err != nil { + return err + } + + if len(idStrings) > 0 { + // 设置为已上传 + _, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) + if err != nil { + return err + } + } + + // 休息一下,防止短时间内上传数据过多 + if pauseDuration > 0 && len(idStrings) > 0 { + time.Sleep(pauseDuration) + } + } + + return nil +} + +// 加载服务ID +func (this *Task) loadServerIdMap() error { + rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + var serverId int64 + for rows.Next() { + err = rows.Scan(&serverId) + if err != nil { + return err + } + this.serverIdMapLocker.Lock() + this.serverIdMap[serverId] = true + this.serverIdMapLocker.Unlock() + } + return nil +} diff --git a/internal/metrics/task_test.go b/internal/metrics/task_test.go new file mode 100644 index 0000000..bb51623 --- /dev/null +++ b/internal/metrics/task_test.go @@ -0,0 +1,209 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package metrics_test + +import ( + "fmt" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/metrics" + _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/rands" + "testing" + "time" +) + +type testObj struct { + ip string +} + +func (this *testObj) MetricKey(key string) string { + return this.ip +} + +func (this *testObj) MetricValue(value string) (int64, bool) { + return 1, true +} + +func (this *testObj) MetricServerId() int64 { + return int64(rands.Int(1, 100)) +} + +func (this *testObj) MetricCategory() string { + return "http" +} + +func TestTask_Init(t *testing.T) { + var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 0, + PeriodUnit: "", + Keys: nil, + Value: "", + }) + err := task.Init() + if err != nil { + t.Fatal(err) + } + defer func() { + _ = task.Stop() + }() + t.Log("ok") +} + +func TestTask_Add(t *testing.T) { + var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + }) + err := task.Init() + if err != nil { + t.Fatal(err) + } + err = task.Start() + if err != nil { + t.Fatal(err) + } + defer func() { + _ = task.Stop() + }() + + task.Add(&testObj{ip: "127.0.0.2"}) + time.Sleep(1 * time.Second) // waiting for inserting +} + +func TestTask_Add_Many(t *testing.T) { + var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + Version: 1, + }) + err := task.Init() + if err != nil { + t.Fatal(err) + } + err = task.Start() + if err != nil { + t.Fatal(err) + } + defer func() { + _ = task.Stop() + }() + + for i := 0; i < 4_000_000; i++ { + task.Add(&testObj{ + ip: fmt.Sprintf("%d.%d.%d.%d", rands.Int(0, 255), rands.Int(0, 255), rands.Int(0, 255), rands.Int(0, 255)), + }) + if i%10000 == 0 { + time.Sleep(1 * time.Second) + } + } +} + +func TestTask_InsertStat(t *testing.T) { + var item = &serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + Version: 1, + } + var task = metrics.NewTask(item) + err := task.Init() + if err != nil { + t.Fatal(err) + } + err = task.Start() + if err != nil { + t.Fatal(err) + } + defer func() { + _ = task.Stop() + }() + + err = task.InsertStat(&metrics.Stat{ + ServerId: 1, + Keys: []string{"127.0.0.1"}, + Hash: "", + Value: 1, + Time: item.CurrentTime(), + }) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestTask_CleanExpired(t *testing.T) { + var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + Version: 1, + }) + err := task.Init() + if err != nil { + t.Fatal(err) + } + err = task.Start() + if err != nil { + t.Fatal(err) + } + defer func() { + _ = task.Stop() + }() + + err = task.CleanExpired() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestTask_Upload(t *testing.T) { + var task = metrics.NewTask(&serverconfigs.MetricItemConfig{ + Id: 1, + IsOn: false, + Category: "", + Period: 1, + PeriodUnit: serverconfigs.MetricItemPeriodUnitDay, + Keys: []string{"${remoteAddr}"}, + Value: "${countRequest}", + Version: 1, + }) + err := task.Init() + if err != nil { + t.Fatal(err) + } + err = task.Start() + if err != nil { + t.Fatal(err) + } + defer func() { + _ = task.Stop() + }() + + err = task.Upload(0) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index fd0c276..8bfde85 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/metrics" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/types" @@ -247,6 +248,11 @@ func (this *HTTPRequest) doEnd() { stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, 0, 1, 0) } } + + // 指标 + if metrics.SharedManager.HasHTTPMetrics() { + this.doMetricsResponse() + } } // RawURI 原始的请求URI diff --git a/internal/nodes/http_request_metrics.go b/internal/nodes/http_request_metrics.go new file mode 100644 index 0000000..6af856c --- /dev/null +++ b/internal/nodes/http_request_metrics.go @@ -0,0 +1,58 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/metrics" +) + +// 指标统计 - 响应 +// 只需要在结束时调用指标进行统计 +func (this *HTTPRequest) doMetricsResponse() { + metrics.SharedManager.Add(this) +} + +func (this *HTTPRequest) MetricKey(key string) string { + return this.Format(key) +} + +func (this *HTTPRequest) MetricValue(value string) (result int64, ok bool) { + // TODO 需要忽略健康检查的请求,但是同时也要防止攻击者模拟健康检查 + switch value { + case "${countRequest}": + return 1, true + case "${countTrafficOut}": + // 这里不包括Header长度 + return this.writer.SentBodyBytes(), true + case "${countTrafficIn}": + var hl int64 = 0 // header length + for k, values := range this.RawReq.Header { + for _, v := range values { + hl += int64(len(k) + len(v) + 2 /** k: v **/) + } + } + return this.RawReq.ContentLength + hl, true + case "${countConnection}": + metricNewConnMapLocker.Lock() + _, ok := metricNewConnMap[this.RawReq.RemoteAddr] + if ok { + delete(metricNewConnMap, this.RawReq.RemoteAddr) + } + metricNewConnMapLocker.Unlock() + if ok { + return 1, true + } else { + return 0, false + } + } + return 0, false +} + +func (this *HTTPRequest) MetricServerId() int64 { + return this.Server.Id +} + +func (this *HTTPRequest) MetricCategory() string { + return serverconfigs.MetricItemCategoryHTTP +} diff --git a/internal/nodes/http_request_stat.go b/internal/nodes/http_request_stat.go index 1f722b6..b6ea796 100644 --- a/internal/nodes/http_request_stat.go +++ b/internal/nodes/http_request_stat.go @@ -7,6 +7,8 @@ func (this *HTTPRequest) doStat() { if this.Server == nil { return } + + // 内置的统计 stats.SharedHTTPRequestStatManager.AddRemoteAddr(this.Server.Id, this.requestRemoteAddr()) stats.SharedHTTPRequestStatManager.AddUserAgent(this.Server.Id, this.requestHeader("User-Agent")) } diff --git a/internal/nodes/listener_http.go b/internal/nodes/listener_http.go index 3c68c7f..b8d8b7b 100644 --- a/internal/nodes/listener_http.go +++ b/internal/nodes/listener_http.go @@ -9,11 +9,14 @@ import ( "net" "net/http" "strings" + "sync" "sync/atomic" "time" ) var httpErrorLogger = log.New(io.Discard, "", 0) +var metricNewConnMap = map[string]bool{} // remoteAddr => bool +var metricNewConnMapLocker = &sync.Mutex{} type HTTPListener struct { BaseListener @@ -39,15 +42,27 @@ func (this *HTTPListener) Serve() error { this.httpServer = &http.Server{ Addr: this.addr, Handler: handler, - ReadHeaderTimeout: 3 * time.Second, // TODO 改成可以配置 + ReadHeaderTimeout: 2 * time.Second, // TODO 改成可以配置 IdleTimeout: 2 * time.Minute, // TODO 改成可以配置 ErrorLog: httpErrorLogger, ConnState: func(conn net.Conn, state http.ConnState) { switch state { case http.StateNew: atomic.AddInt64(&this.countActiveConnections, 1) + + // 为指标存储连接信息 + if sharedNodeConfig.HasHTTPConnectionMetrics() { + metricNewConnMapLocker.Lock() + metricNewConnMap[conn.RemoteAddr().String()] = true + metricNewConnMapLocker.Unlock() + } case http.StateClosed: atomic.AddInt64(&this.countActiveConnections, -1) + + // 移除指标存储连接信息 + metricNewConnMapLocker.Lock() + delete(metricNewConnMap, conn.RemoteAddr().String()) + metricNewConnMapLocker.Unlock() } }, } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index d65c2f1..d6f0e38 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -12,6 +12,7 @@ import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" + "github.com/TeaOSLab/EdgeNode/internal/metrics" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/stats" @@ -396,6 +397,8 @@ func (this *Node) syncConfig() error { iplibrary.SharedActionManager.UpdateActions(nodeConfig.FirewallActions) sharedNodeConfig = nodeConfig + metrics.SharedManager.Update(nodeConfig.MetricItems) + // 发送事件 events.Notify(events.EventReload) diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 28434a6..37f1a32 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -109,6 +109,10 @@ func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient { return pb.NewServerDailyStatServiceClient(this.pickConn()) } +func (this *RPCClient) MetricStatRPC() pb.MetricStatServiceClient { + return pb.NewMetricStatServiceClient(this.pickConn()) +} + // Context 节点上下文信息 func (this *RPCClient) Context() context.Context { ctx := context.Background()