在监控系统运行时上报API连接状况

This commit is contained in:
GoEdgeLab
2022-11-22 11:23:39 +08:00
parent 2c4c64d0f5
commit 33dd1e9305
4 changed files with 102 additions and 4 deletions

View File

@@ -31,12 +31,15 @@ type NodeStatusExecutor struct {
cpuLogicalCount int cpuLogicalCount int
cpuPhysicalCount int cpuPhysicalCount int
apiCallStat *rpc.CallStat
ticker *time.Ticker ticker *time.Ticker
} }
func NewNodeStatusExecutor() *NodeStatusExecutor { func NewNodeStatusExecutor() *NodeStatusExecutor {
return &NodeStatusExecutor{ return &NodeStatusExecutor{
ticker: time.NewTicker(30 * time.Second), ticker: time.NewTicker(30 * time.Second),
apiCallStat: rpc.NewCallStat(10),
} }
} }
@@ -78,6 +81,11 @@ func (this *NodeStatusExecutor) update() {
status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize() status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize()
status.TrafficInBytes = teaconst.InTrafficBytes status.TrafficInBytes = teaconst.InTrafficBytes
status.TrafficOutBytes = teaconst.OutTrafficBytes status.TrafficOutBytes = teaconst.OutTrafficBytes
apiSuccessPercent, apiAvgCostSeconds := this.apiCallStat.Sum()
status.APISuccessPercent = apiSuccessPercent
status.APIAvgCostSeconds = apiAvgCostSeconds
var localFirewall = firewalls.Firewall() var localFirewall = firewalls.Firewall()
if localFirewall != nil && !localFirewall.IsMock() { if localFirewall != nil && !localFirewall.IsMock() {
status.LocalFirewallName = localFirewall.Name() status.LocalFirewallName = localFirewall.Name()
@@ -125,9 +133,13 @@ func (this *NodeStatusExecutor) update() {
remotelogs.Error("NODE_STATUS", "failed to open rpc: "+err.Error()) remotelogs.Error("NODE_STATUS", "failed to open rpc: "+err.Error())
return return
} }
var before = time.Now()
_, err = rpcClient.NodeRPC.UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{ _, err = rpcClient.NodeRPC.UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{
StatusJSON: jsonData, StatusJSON: jsonData,
}) })
var costSeconds = time.Since(before).Seconds()
this.apiCallStat.Add(err == nil, costSeconds)
if err != nil { if err != nil {
if rpc.IsConnError(err) { if rpc.IsConnError(err) {
remotelogs.Warn("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error()) remotelogs.Warn("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error())
@@ -140,7 +152,7 @@ func (this *NodeStatusExecutor) update() {
// 更新CPU // 更新CPU
func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) { func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) {
duration := time.Duration(0) var duration = time.Duration(0)
if this.isFirstTime { if this.isFirstTime {
duration = 100 * time.Millisecond duration = 100 * time.Millisecond
} }

66
internal/rpc/call_stat.go Normal file
View File

@@ -0,0 +1,66 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package rpc
import (
"sync"
)
type callStatItem struct {
ok bool
costSeconds float64
}
type CallStat struct {
size int
items []*callStatItem
locker sync.Mutex
}
func NewCallStat(size int) *CallStat {
return &CallStat{
size: size,
}
}
func (this *CallStat) Add(ok bool, costSeconds float64) {
var size = this.size
if size <= 0 {
size = 10
}
this.locker.Lock()
this.items = append(this.items, &callStatItem{
ok: ok,
costSeconds: costSeconds,
})
if len(this.items) > size {
this.items = this.items[1:]
}
this.locker.Unlock()
}
func (this *CallStat) Sum() (successPercent float64, avgCostSeconds float64) {
this.locker.Lock()
defer this.locker.Unlock()
var totalItems = len(this.items)
if totalItems == 0 {
successPercent = 100
return
}
var totalOkItems = 0
var totalCostSeconds float64
for _, item := range this.items {
if item.ok {
totalOkItems++
}
totalCostSeconds += item.costSeconds
}
successPercent = float64(totalOkItems) * 100 / float64(totalItems)
avgCostSeconds = totalCostSeconds / float64(totalItems)
return
}

View File

@@ -0,0 +1,19 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package rpc_test
import (
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"testing"
)
func TestNewCallStat(t *testing.T) {
var stat = rpc.NewCallStat(10)
stat.Add(true, 1)
stat.Add(true, 2)
stat.Add(true, 3)
stat.Add(false, 4)
stat.Add(true, 0)
stat.Add(true, 1)
t.Log(stat.Sum())
}

View File

@@ -287,14 +287,15 @@ func (this *RPCClient) pickConn() *grpc.ClientConn {
func (this *RPCClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error { func (this *RPCClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
var conn = this.pickConn() var conn = this.pickConn()
if conn == nil { if conn == nil {
return errors.New("can not get available grpc connection") return errors.New("could not get available grpc connection")
} }
return conn.Invoke(ctx, method, args, reply, opts...) return conn.Invoke(ctx, method, args, reply, opts...)
} }
func (this *RPCClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { func (this *RPCClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var conn = this.pickConn() var conn = this.pickConn()
if conn == nil { if conn == nil {
return nil, errors.New("can not get available grpc connection") return nil, errors.New("could not get available grpc connection")
} }
return conn.NewStream(ctx, desc, method, opts...) return conn.NewStream(ctx, desc, method, opts...)
} }