From 7f80e32448dd77b171fcd2874f70694d0dffbbf4 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 14 Nov 2021 10:55:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=9A=84=E6=89=A7=E8=A1=8C=E6=97=B6=E9=97=B4=E8=BF=BD=E8=B8=AA?= =?UTF-8?q?=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/edge-node/main.go | 28 ++++++++++++ internal/caches/storage_file.go | 3 ++ internal/caches/storage_file_test.go | 13 ++++++ internal/caches/storage_memory.go | 11 ++++- internal/caches/storage_memory_test.go | 16 +++---- internal/metrics/task.go | 9 ++++ internal/nodes/node.go | 10 +++++ internal/nodes/node_status_executor.go | 19 +++++++++ internal/nodes/origin_state_manager.go | 4 ++ internal/nodes/task_sync_api_nodes.go | 4 ++ internal/remotelogs/utils.go | 3 ++ internal/stats/http_request_stat_manager.go | 3 ++ internal/trackers/label.go | 22 ++++++++++ internal/trackers/manager.go | 47 +++++++++++++++++++++ internal/trackers/manager_test.go | 47 +++++++++++++++++++++ 15 files changed, 229 insertions(+), 10 deletions(-) create mode 100644 internal/trackers/label.go create mode 100644 internal/trackers/manager.go create mode 100644 internal/trackers/manager_test.go diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index 5634125..734908f 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -11,6 +11,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "sort" ) func main() { @@ -60,6 +61,33 @@ func main() { node := nodes.NewNode() node.Start() }) + app.On("trackers", func() { + var sock = gosock.NewTmpSock(teaconst.ProcessName) + reply, err := sock.Send(&gosock.Command{Code: "trackers"}) + if err != nil { + fmt.Println("[ERROR]" + err.Error()) + } else { + labelsMap, ok := reply.Params["labels"] + if ok { + labels, ok := labelsMap.(map[string]interface{}) + if ok { + if len(labels) == 0 { + fmt.Println("no labels yet") + } else { + var labelNames = []string{} + for label := range labels { + labelNames = append(labelNames, label) + } + sort.Strings(labelNames) + + for _, labelName := range labelNames { + fmt.Println(labelName + ": " + fmt.Sprintf("%.6f", labels[labelName])) + } + } + } + } + } + }) app.Run(func() { node := nodes.NewNode() node.Start() diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 7ffa42f..a5fc25c 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -9,6 +9,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/rands" @@ -654,7 +655,9 @@ func (this *FileStorage) initList() error { }) go func() { for this.ticker.Next() { + var tr = trackers.Begin("FILE_CACHE_STORAGE_PURGE_LOOP") this.purgeLoop() + tr.End() } }() diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index f0999fb..2418a46 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -517,3 +517,16 @@ func BenchmarkFileStorage_Read(b *testing.B) { _ = reader.Close() } } + +func BenchmarkFileStorage_KeyPath(b *testing.B) { + runtime.GOMAXPROCS(1) + + var storage = &FileStorage{ + cacheConfig: &serverconfigs.HTTPFileCacheStorage{}, + policy: &serverconfigs.HTTPCachePolicy{Id: 1}, + } + + for i := 0; i < b.N; i++ { + _, _ = storage.keyPath(strconv.Itoa(i)) + } +} diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index ac5eebb..b2b9a7a 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -4,11 +4,13 @@ import ( "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/cespare/xxhash" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" "math" + "runtime" "strconv" "sync" "sync/atomic" @@ -84,7 +86,9 @@ func (this *MemoryStorage) Init() error { this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) go func() { for this.purgeTicker.Next() { + var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP") this.purgeLoop() + tr.End() } }() @@ -260,6 +264,9 @@ func (this *MemoryStorage) Stop() { this.locker.Unlock() + // 回收内存 + runtime.GC() + remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'") } @@ -327,14 +334,14 @@ func (this *MemoryStorage) purgeLoop() { if startLFU { var total, _ = this.list.Count() if total > 0 { - var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent * 2) / 100)) + var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100)) if count > 0 { // 限制单次清理的条数,防止占用太多系统资源 if count > 2000 { count = 2000 } - remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count)) + // 这里不提示LFU,因为此事件将会非常频繁 err := this.list.PurgeLFU(count, func(hash string) error { uintHash, err := strconv.ParseUint(hash, 10, 64) diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 071c54b..11441cc 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -13,7 +13,7 @@ import ( ) func TestMemoryStorage_OpenWriter(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200) if err != nil { @@ -88,7 +88,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) { } func TestMemoryStorage_OpenReaderLock(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) _ = storage.Init() var h = storage.hash("test") @@ -101,7 +101,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) { } func TestMemoryStorage_Delete(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) { writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200) if err != nil { @@ -123,7 +123,7 @@ func TestMemoryStorage_Delete(t *testing.T) { } func TestMemoryStorage_Stat(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { writer, err := storage.OpenWriter("abc", expiredAt, 200) @@ -160,7 +160,7 @@ func TestMemoryStorage_Stat(t *testing.T) { } func TestMemoryStorage_CleanAll(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { writer, err := storage.OpenWriter("abc", expiredAt, 200) @@ -195,7 +195,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { } func TestMemoryStorage_Purge(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { writer, err := storage.OpenWriter("abc", expiredAt, 200) @@ -232,7 +232,7 @@ func TestMemoryStorage_Purge(t *testing.T) { func TestMemoryStorage_Expire(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{ MemoryAutoPurgeInterval: 5, - }) + }, nil) err := storage.Init() if err != nil { t.Fatal(err) @@ -256,7 +256,7 @@ func TestMemoryStorage_Expire(t *testing.T) { } func TestMemoryStorage_Locker(t *testing.T) { - storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) + storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) err := storage.Init() if err != nil { t.Fatal(err) diff --git a/internal/metrics/task.go b/internal/metrics/task.go index 2ed7279..66b2682 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -9,6 +9,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" _ "github.com/mattn/go-sqlite3" @@ -164,6 +165,8 @@ func (this *Task) Start() error { this.statsTicker = utils.NewTicker(1 * time.Minute) go func() { for this.statsTicker.Next() { + var tr = trackers.Begin("[METRIC]DUMP_STATS_TO_LOCAL_DATABASE") + this.statsLocker.Lock() var statsMap = this.statsMap this.statsMap = map[string]*Stat{} @@ -175,6 +178,8 @@ func (this *Task) Start() error { remotelogs.Error("METRIC", "insert stat failed: "+err.Error()) } } + + tr.End() } }() @@ -182,7 +187,9 @@ func (this *Task) Start() error { this.cleanTicker = utils.NewTicker(24 * time.Hour) go func() { for this.cleanTicker.Next() { + var tr = trackers.Begin("[METRIC]CLEAN_EXPIRED") err := this.CleanExpired() + tr.End() if err != nil { remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error()) } @@ -193,7 +200,9 @@ func (this *Task) Start() error { this.uploadTicker = utils.NewTicker(this.item.UploadDuration()) go func() { for this.uploadTicker.Next() { + var tr = trackers.Begin("[METRIC]UPLOAD_STATS") err := this.Upload(1 * time.Second) + tr.End() if err != nil { remotelogs.Error("METRIC", "upload stats failed: "+err.Error()) } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index c13b17d..6fa72b1 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -16,6 +16,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/stats" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/andybalholm/brotli" "github.com/go-yaml/yaml" @@ -231,6 +232,9 @@ func (this *Node) InstallSystemService() error { // 循环 func (this *Node) loop() error { + var tr = trackers.Begin("CHECK_NODE_CONFIG_CHANGES") + defer tr.End() + // 检查api.yaml是否存在 apiConfigFile := Tea.ConfigFile("api.yaml") _, err := os.Stat(apiConfigFile) @@ -554,6 +558,12 @@ func (this *Node) listenSock() error { time.Sleep(1 * time.Second) } }() + case "trackers": + _ = cmd.Reply(&gosock.Command{ + Params: map[string]interface{}{ + "labels": trackers.SharedManager.Labels(), + }, + }) } }) diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index f7f6ef2..e724940 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -10,6 +10,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/maps" @@ -58,6 +59,9 @@ func (this *NodeStatusExecutor) update() { return } + var tr = trackers.Begin("UPLOAD_NODE_STATUS") + defer tr.End() + status := &nodeconfigs.NodeStatus{} status.BuildVersion = teaconst.Version status.BuildVersionCode = utils.VersionToLong(teaconst.Version) @@ -79,11 +83,26 @@ func (this *NodeStatusExecutor) update() { hostname, _ := os.Hostname() status.Hostname = hostname + var cpuTR = tr.Begin("cpu") this.updateCPU(status) + cpuTR.End() + + var memTR = tr.Begin("memory") this.updateMem(status) + memTR.End() + + var loadTR = tr.Begin("load") this.updateLoad(status) + loadTR.End() + + var diskTR = tr.Begin("disk") this.updateDisk(status) + diskTR.End() + + var cacheSpaceTR = tr.Begin("cache space") this.updateCacheSpace(status) + cacheSpaceTR.End() + status.UpdatedAt = time.Now().Unix() // 发送数据 diff --git a/internal/nodes/origin_state_manager.go b/internal/nodes/origin_state_manager.go index db13e2c..a52d3bd 100644 --- a/internal/nodes/origin_state_manager.go +++ b/internal/nodes/origin_state_manager.go @@ -6,6 +6,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/iwind/TeaGo/Tea" "sync" "time" @@ -60,6 +61,9 @@ func (this *OriginStateManager) Loop() error { return nil } + var tr = trackers.Begin("CHECK_ORIGIN_STATES") + defer tr.End() + var currentStates = []*OriginState{} this.locker.Lock() for originId, state := range this.stateMap { diff --git a/internal/nodes/task_sync_api_nodes.go b/internal/nodes/task_sync_api_nodes.go index 4db9265..3e19536 100644 --- a/internal/nodes/task_sync_api_nodes.go +++ b/internal/nodes/task_sync_api_nodes.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/logs" "google.golang.org/grpc" @@ -53,6 +54,9 @@ func (this *SyncAPINodesTask) Start() { } func (this *SyncAPINodesTask) Loop() error { + var tr = trackers.Begin("SYNC_API_NODES") + defer tr.End() + // 获取所有可用的节点 rpcClient, err := rpc.SharedRPC() if err != nil { diff --git a/internal/remotelogs/utils.go b/internal/remotelogs/utils.go index 0832d16..e9f339e 100644 --- a/internal/remotelogs/utils.go +++ b/internal/remotelogs/utils.go @@ -5,6 +5,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/iwind/TeaGo/logs" "time" ) @@ -16,7 +17,9 @@ func init() { ticker := time.NewTicker(60 * time.Second) go func() { for range ticker.C { + var tr = trackers.Begin("UPLOAD_REMOTE_LOGS") err := uploadLogs() + tr.End() if err != nil { logs.Println("[LOG]" + err.Error()) } diff --git a/internal/stats/http_request_stat_manager.go b/internal/stats/http_request_stat_manager.go index 843cb39..e2459e1 100644 --- a/internal/stats/http_request_stat_manager.go +++ b/internal/stats/http_request_stat_manager.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/waf" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/maps" @@ -89,7 +90,9 @@ func (this *HTTPRequestStatManager) Start() { } select { case <-uploadTicker.C: + var tr = trackers.Begin("UPLOAD_REQUEST_STATS") err := this.Upload() + tr.End() if err != nil { if !rpc.IsConnError(err) { remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error()) diff --git a/internal/trackers/label.go b/internal/trackers/label.go new file mode 100644 index 0000000..4579179 --- /dev/null +++ b/internal/trackers/label.go @@ -0,0 +1,22 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package trackers + +import "time" + +type tracker struct { + label string + startTime time.Time +} + +func Begin(label string) *tracker { + return &tracker{label: label, startTime: time.Now()} +} + +func (this *tracker) End() { + SharedManager.Add(this.label, time.Since(this.startTime).Seconds()*1000) +} + +func (this *tracker) Begin(subLabel string) *tracker { + return Begin(this.label + ":" + subLabel) +} diff --git a/internal/trackers/manager.go b/internal/trackers/manager.go new file mode 100644 index 0000000..0831f84 --- /dev/null +++ b/internal/trackers/manager.go @@ -0,0 +1,47 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package trackers + +import ( + "sync" +) + +var SharedManager = NewManager() + +type Manager struct { + m map[string][]float64 // label => time costs ms + locker sync.Mutex +} + +func NewManager() *Manager { + return &Manager{m: map[string][]float64{}} +} + +func (this *Manager) Add(label string, costMs float64) { + this.locker.Lock() + costs, ok := this.m[label] + if ok { + costs = append(costs, costMs) + if len(costs) > 5 { // 只取最近的N条 + costs = costs[1:] + } + this.m[label] = costs + } else { + this.m[label] = []float64{costMs} + } + this.locker.Unlock() +} + +func (this *Manager) Labels() map[string]float64 { + var result = map[string]float64{} + this.locker.Lock() + for label, costs := range this.m { + var sum float64 + for _, cost := range costs { + sum += cost + } + result[label] = sum / float64(len(costs)) + } + this.locker.Unlock() + return result +} diff --git a/internal/trackers/manager_test.go b/internal/trackers/manager_test.go new file mode 100644 index 0000000..743f0f7 --- /dev/null +++ b/internal/trackers/manager_test.go @@ -0,0 +1,47 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package trackers + +import ( + "github.com/iwind/TeaGo/logs" + "testing" + "time" +) + +func TestNewManager(t *testing.T) { + { + var tr = Begin("a") + tr.End() + } + { + var tr = Begin("a") + time.Sleep(1 * time.Millisecond) + tr.End() + } + { + var tr = Begin("a") + time.Sleep(2 * time.Millisecond) + tr.End() + } + { + var tr = Begin("a") + time.Sleep(3 * time.Millisecond) + tr.End() + } + { + var tr = Begin("a") + time.Sleep(4 * time.Millisecond) + tr.End() + } + { + var tr = Begin("a") + time.Sleep(5 * time.Millisecond) + tr.End() + } + { + var tr = Begin("b") + tr.End() + } + + logs.PrintAsJSON(SharedManager.Labels(), t) +}