diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index d68121d..d8699b8 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -31,12 +31,15 @@ type NodeStatusExecutor struct { cpuLogicalCount int cpuPhysicalCount int + apiCallStat *rpc.CallStat + ticker *time.Ticker } func NewNodeStatusExecutor() *NodeStatusExecutor { return &NodeStatusExecutor{ - ticker: time.NewTicker(30 * time.Second), + ticker: time.NewTicker(30 * time.Second), + apiCallStat: rpc.NewCallStat(10), } } @@ -78,6 +81,11 @@ func (this *NodeStatusExecutor) update() { status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize() status.TrafficInBytes = teaconst.InTrafficBytes status.TrafficOutBytes = teaconst.OutTrafficBytes + + apiSuccessPercent, apiAvgCostSeconds := this.apiCallStat.Sum() + status.APISuccessPercent = apiSuccessPercent + status.APIAvgCostSeconds = apiAvgCostSeconds + var localFirewall = firewalls.Firewall() if localFirewall != nil && !localFirewall.IsMock() { status.LocalFirewallName = localFirewall.Name() @@ -125,9 +133,13 @@ func (this *NodeStatusExecutor) update() { remotelogs.Error("NODE_STATUS", "failed to open rpc: "+err.Error()) return } + + var before = time.Now() _, err = rpcClient.NodeRPC.UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{ StatusJSON: jsonData, }) + var costSeconds = time.Since(before).Seconds() + this.apiCallStat.Add(err == nil, costSeconds) if err != nil { if rpc.IsConnError(err) { remotelogs.Warn("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error()) @@ -140,7 +152,7 @@ func (this *NodeStatusExecutor) update() { // 更新CPU func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) { - duration := time.Duration(0) + var duration = time.Duration(0) if this.isFirstTime { duration = 100 * time.Millisecond } diff --git a/internal/rpc/call_stat.go b/internal/rpc/call_stat.go new file mode 100644 index 0000000..faa15dc --- /dev/null +++ b/internal/rpc/call_stat.go @@ -0,0 +1,66 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package rpc + +import ( + "sync" +) + +type callStatItem struct { + ok bool + costSeconds float64 +} + +type CallStat struct { + size int + items []*callStatItem + + locker sync.Mutex +} + +func NewCallStat(size int) *CallStat { + return &CallStat{ + size: size, + } +} + +func (this *CallStat) Add(ok bool, costSeconds float64) { + var size = this.size + if size <= 0 { + size = 10 + } + + this.locker.Lock() + this.items = append(this.items, &callStatItem{ + ok: ok, + costSeconds: costSeconds, + }) + if len(this.items) > size { + this.items = this.items[1:] + } + this.locker.Unlock() +} + +func (this *CallStat) Sum() (successPercent float64, avgCostSeconds float64) { + this.locker.Lock() + defer this.locker.Unlock() + + var totalItems = len(this.items) + if totalItems == 0 { + successPercent = 100 + return + } + + var totalOkItems = 0 + var totalCostSeconds float64 + for _, item := range this.items { + if item.ok { + totalOkItems++ + } + totalCostSeconds += item.costSeconds + } + successPercent = float64(totalOkItems) * 100 / float64(totalItems) + avgCostSeconds = totalCostSeconds / float64(totalItems) + + return +} diff --git a/internal/rpc/call_stat_test.go b/internal/rpc/call_stat_test.go new file mode 100644 index 0000000..9f2107c --- /dev/null +++ b/internal/rpc/call_stat_test.go @@ -0,0 +1,19 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package rpc_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "testing" +) + +func TestNewCallStat(t *testing.T) { + var stat = rpc.NewCallStat(10) + stat.Add(true, 1) + stat.Add(true, 2) + stat.Add(true, 3) + stat.Add(false, 4) + stat.Add(true, 0) + stat.Add(true, 1) + t.Log(stat.Sum()) +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 652d464..0975b72 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -287,14 +287,15 @@ func (this *RPCClient) pickConn() *grpc.ClientConn { func (this *RPCClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error { var conn = this.pickConn() if conn == nil { - return errors.New("can not get available grpc connection") + return errors.New("could not get available grpc connection") } return conn.Invoke(ctx, method, args, reply, opts...) } + func (this *RPCClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { var conn = this.pickConn() if conn == nil { - return nil, errors.New("can not get available grpc connection") + return nil, errors.New("could not get available grpc connection") } return conn.NewStream(ctx, desc, method, opts...) }