增加对任务的执行时间追踪工具

This commit is contained in:
刘祥超
2021-11-14 10:55:09 +08:00
parent 7e43324b53
commit 4daeca912a
15 changed files with 229 additions and 10 deletions

View File

@@ -11,6 +11,7 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"sort"
)
func main() {
@@ -60,6 +61,33 @@ func main() {
node := nodes.NewNode()
node.Start()
})
app.On("trackers", func() {
var sock = gosock.NewTmpSock(teaconst.ProcessName)
reply, err := sock.Send(&gosock.Command{Code: "trackers"})
if err != nil {
fmt.Println("[ERROR]" + err.Error())
} else {
labelsMap, ok := reply.Params["labels"]
if ok {
labels, ok := labelsMap.(map[string]interface{})
if ok {
if len(labels) == 0 {
fmt.Println("no labels yet")
} else {
var labelNames = []string{}
for label := range labels {
labelNames = append(labelNames, label)
}
sort.Strings(labelNames)
for _, labelName := range labelNames {
fmt.Println(labelName + ": " + fmt.Sprintf("%.6f", labels[labelName]))
}
}
}
}
}
})
app.Run(func() {
node := nodes.NewNode()
node.Start()

View File

@@ -9,6 +9,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
@@ -654,7 +655,9 @@ func (this *FileStorage) initList() error {
})
go func() {
for this.ticker.Next() {
var tr = trackers.Begin("FILE_CACHE_STORAGE_PURGE_LOOP")
this.purgeLoop()
tr.End()
}
}()

View File

@@ -517,3 +517,16 @@ func BenchmarkFileStorage_Read(b *testing.B) {
_ = reader.Close()
}
}
func BenchmarkFileStorage_KeyPath(b *testing.B) {
runtime.GOMAXPROCS(1)
var storage = &FileStorage{
cacheConfig: &serverconfigs.HTTPFileCacheStorage{},
policy: &serverconfigs.HTTPCachePolicy{Id: 1},
}
for i := 0; i < b.N; i++ {
_, _ = storage.keyPath(strconv.Itoa(i))
}
}

View File

@@ -4,11 +4,13 @@ import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"math"
"runtime"
"strconv"
"sync"
"sync/atomic"
@@ -84,7 +86,9 @@ func (this *MemoryStorage) Init() error {
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
go func() {
for this.purgeTicker.Next() {
var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP")
this.purgeLoop()
tr.End()
}
}()
@@ -260,6 +264,9 @@ func (this *MemoryStorage) Stop() {
this.locker.Unlock()
// 回收内存
runtime.GC()
remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'")
}
@@ -327,14 +334,14 @@ func (this *MemoryStorage) purgeLoop() {
if startLFU {
var total, _ = this.list.Count()
if total > 0 {
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent * 2) / 100))
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count > 0 {
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
// 这里不提示LFU因为此事件将会非常频繁
err := this.list.PurgeLFU(count, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)

View File

@@ -13,7 +13,7 @@ import (
)
func TestMemoryStorage_OpenWriter(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
if err != nil {
@@ -88,7 +88,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
}
func TestMemoryStorage_OpenReaderLock(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
_ = storage.Init()
var h = storage.hash("test")
@@ -101,7 +101,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) {
}
func TestMemoryStorage_Delete(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
{
writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
if err != nil {
@@ -123,7 +123,7 @@ func TestMemoryStorage_Delete(t *testing.T) {
}
func TestMemoryStorage_Stat(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.OpenWriter("abc", expiredAt, 200)
@@ -160,7 +160,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
}
func TestMemoryStorage_CleanAll(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.OpenWriter("abc", expiredAt, 200)
@@ -195,7 +195,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
}
func TestMemoryStorage_Purge(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
expiredAt := time.Now().Unix() + 60
{
writer, err := storage.OpenWriter("abc", expiredAt, 200)
@@ -232,7 +232,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
func TestMemoryStorage_Expire(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{
MemoryAutoPurgeInterval: 5,
})
}, nil)
err := storage.Init()
if err != nil {
t.Fatal(err)
@@ -256,7 +256,7 @@ func TestMemoryStorage_Expire(t *testing.T) {
}
func TestMemoryStorage_Locker(t *testing.T) {
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{})
storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
err := storage.Init()
if err != nil {
t.Fatal(err)

View File

@@ -9,6 +9,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/mattn/go-sqlite3"
@@ -164,6 +165,8 @@ func (this *Task) Start() error {
this.statsTicker = utils.NewTicker(1 * time.Minute)
go func() {
for this.statsTicker.Next() {
var tr = trackers.Begin("[METRIC]DUMP_STATS_TO_LOCAL_DATABASE")
this.statsLocker.Lock()
var statsMap = this.statsMap
this.statsMap = map[string]*Stat{}
@@ -175,6 +178,8 @@ func (this *Task) Start() error {
remotelogs.Error("METRIC", "insert stat failed: "+err.Error())
}
}
tr.End()
}
}()
@@ -182,7 +187,9 @@ func (this *Task) Start() error {
this.cleanTicker = utils.NewTicker(24 * time.Hour)
go func() {
for this.cleanTicker.Next() {
var tr = trackers.Begin("[METRIC]CLEAN_EXPIRED")
err := this.CleanExpired()
tr.End()
if err != nil {
remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error())
}
@@ -193,7 +200,9 @@ func (this *Task) Start() error {
this.uploadTicker = utils.NewTicker(this.item.UploadDuration())
go func() {
for this.uploadTicker.Next() {
var tr = trackers.Begin("[METRIC]UPLOAD_STATS")
err := this.Upload(1 * time.Second)
tr.End()
if err != nil {
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/andybalholm/brotli"
"github.com/go-yaml/yaml"
@@ -231,6 +232,9 @@ func (this *Node) InstallSystemService() error {
// 循环
func (this *Node) loop() error {
var tr = trackers.Begin("CHECK_NODE_CONFIG_CHANGES")
defer tr.End()
// 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
@@ -554,6 +558,12 @@ func (this *Node) listenSock() error {
time.Sleep(1 * time.Second)
}
}()
case "trackers":
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"labels": trackers.SharedManager.Labels(),
},
})
}
})

View File

@@ -10,6 +10,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/maps"
@@ -58,6 +59,9 @@ func (this *NodeStatusExecutor) update() {
return
}
var tr = trackers.Begin("UPLOAD_NODE_STATUS")
defer tr.End()
status := &nodeconfigs.NodeStatus{}
status.BuildVersion = teaconst.Version
status.BuildVersionCode = utils.VersionToLong(teaconst.Version)
@@ -79,11 +83,26 @@ func (this *NodeStatusExecutor) update() {
hostname, _ := os.Hostname()
status.Hostname = hostname
var cpuTR = tr.Begin("cpu")
this.updateCPU(status)
cpuTR.End()
var memTR = tr.Begin("memory")
this.updateMem(status)
memTR.End()
var loadTR = tr.Begin("load")
this.updateLoad(status)
loadTR.End()
var diskTR = tr.Begin("disk")
this.updateDisk(status)
diskTR.End()
var cacheSpaceTR = tr.Begin("cache space")
this.updateCacheSpace(status)
cacheSpaceTR.End()
status.UpdatedAt = time.Now().Unix()
// 发送数据

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/iwind/TeaGo/Tea"
"sync"
"time"
@@ -60,6 +61,9 @@ func (this *OriginStateManager) Loop() error {
return nil
}
var tr = trackers.Begin("CHECK_ORIGIN_STATES")
defer tr.End()
var currentStates = []*OriginState{}
this.locker.Lock()
for originId, state := range this.stateMap {

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"google.golang.org/grpc"
@@ -53,6 +54,9 @@ func (this *SyncAPINodesTask) Start() {
}
func (this *SyncAPINodesTask) Loop() error {
var tr = trackers.Begin("SYNC_API_NODES")
defer tr.End()
// 获取所有可用的节点
rpcClient, err := rpc.SharedRPC()
if err != nil {

View File

@@ -5,6 +5,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/iwind/TeaGo/logs"
"time"
)
@@ -16,7 +17,9 @@ func init() {
ticker := time.NewTicker(60 * time.Second)
go func() {
for range ticker.C {
var tr = trackers.Begin("UPLOAD_REMOTE_LOGS")
err := uploadLogs()
tr.End()
if err != nil {
logs.Println("[LOG]" + err.Error())
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/waf"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/maps"
@@ -89,7 +90,9 @@ func (this *HTTPRequestStatManager) Start() {
}
select {
case <-uploadTicker.C:
var tr = trackers.Begin("UPLOAD_REQUEST_STATS")
err := this.Upload()
tr.End()
if err != nil {
if !rpc.IsConnError(err) {
remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error())

View File

@@ -0,0 +1,22 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package trackers
import "time"
type tracker struct {
label string
startTime time.Time
}
func Begin(label string) *tracker {
return &tracker{label: label, startTime: time.Now()}
}
func (this *tracker) End() {
SharedManager.Add(this.label, time.Since(this.startTime).Seconds()*1000)
}
func (this *tracker) Begin(subLabel string) *tracker {
return Begin(this.label + ":" + subLabel)
}

View File

@@ -0,0 +1,47 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package trackers
import (
"sync"
)
var SharedManager = NewManager()
type Manager struct {
m map[string][]float64 // label => time costs ms
locker sync.Mutex
}
func NewManager() *Manager {
return &Manager{m: map[string][]float64{}}
}
func (this *Manager) Add(label string, costMs float64) {
this.locker.Lock()
costs, ok := this.m[label]
if ok {
costs = append(costs, costMs)
if len(costs) > 5 { // 只取最近的N条
costs = costs[1:]
}
this.m[label] = costs
} else {
this.m[label] = []float64{costMs}
}
this.locker.Unlock()
}
func (this *Manager) Labels() map[string]float64 {
var result = map[string]float64{}
this.locker.Lock()
for label, costs := range this.m {
var sum float64
for _, cost := range costs {
sum += cost
}
result[label] = sum / float64(len(costs))
}
this.locker.Unlock()
return result
}

View File

@@ -0,0 +1,47 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package trackers
import (
"github.com/iwind/TeaGo/logs"
"testing"
"time"
)
func TestNewManager(t *testing.T) {
{
var tr = Begin("a")
tr.End()
}
{
var tr = Begin("a")
time.Sleep(1 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
time.Sleep(2 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
time.Sleep(3 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
time.Sleep(4 * time.Millisecond)
tr.End()
}
{
var tr = Begin("a")
time.Sleep(5 * time.Millisecond)
tr.End()
}
{
var tr = Begin("b")
tr.End()
}
logs.PrintAsJSON(SharedManager.Labels(), t)
}