From a5b97d370bace1be351a9dcfdcd3fa3379dbdc9e Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Thu, 29 Apr 2021 16:48:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=9F=BA=E6=9C=AC=E7=9A=84?= =?UTF-8?q?=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build/build-all.sh | 2 +- internal/monitor/value.go | 10 +++ internal/monitor/value_queue.go | 79 ++++++++++++++++++ internal/nodes/listener.go | 1 + internal/nodes/node_status_executor.go | 19 +++++ internal/nodes/node_status_executor_unix.go | 16 ++++ internal/nodes/traffic_conn.go | 92 +++++++++++++++++++++ internal/nodes/traffic_listener.go | 30 +++++++ internal/rpc/rpc_client.go | 12 ++- internal/rpc/rpc_client_test.go | 55 ------------ internal/stats/traffic_stat_manager.go | 10 +-- 11 files changed, 261 insertions(+), 65 deletions(-) create mode 100644 internal/monitor/value.go create mode 100644 internal/monitor/value_queue.go create mode 100644 internal/nodes/traffic_conn.go create mode 100644 internal/nodes/traffic_listener.go delete mode 100644 internal/rpc/rpc_client_test.go diff --git a/build/build-all.sh b/build/build-all.sh index 20d0454..94381b6 100755 --- a/build/build-all.sh +++ b/build/build-all.sh @@ -5,4 +5,4 @@ ./build.sh linux arm64 ./build.sh linux mips64 ./build.sh linux mips64le -./build.sh darwin amd64 +./build.sh darwin amd64 \ No newline at end of file diff --git a/internal/monitor/value.go b/internal/monitor/value.go new file mode 100644 index 0000000..365a285 --- /dev/null +++ b/internal/monitor/value.go @@ -0,0 +1,10 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package monitor + +// ItemValue 数据值定义 +type ItemValue struct { + Item string + ValueJSON []byte + CreatedAt int64 +} diff --git a/internal/monitor/value_queue.go b/internal/monitor/value_queue.go new file mode 100644 index 0000000..83395d1 --- /dev/null +++ b/internal/monitor/value_queue.go @@ -0,0 +1,79 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package monitor + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/iwind/TeaGo/maps" + "time" +) + +var SharedValueQueue = NewValueQueue() + +func init() { + events.On(events.EventStart, func() { + go SharedValueQueue.Start() + }) +} + +// ValueQueue 数据记录队列 +type ValueQueue struct { + valuesChan chan *ItemValue +} + +func NewValueQueue() *ValueQueue { + return &ValueQueue{ + valuesChan: make(chan *ItemValue, 1024), + } +} + +// Start 启动队列 +func (this *ValueQueue) Start() { + // 这里单次循环就行,因为Loop里已经使用了Range通道 + err := this.Loop() + if err != nil { + remotelogs.Error("MONITOR_QUEUE", err.Error()) + } +} + +// Add 添加数据 +func (this *ValueQueue) Add(item string, value maps.Map) { + valueJSON, err := json.Marshal(value) + if err != nil { + remotelogs.Error("MONITOR_QUEUE", "marshal value error: "+err.Error()) + return + } + select { + case this.valuesChan <- &ItemValue{ + Item: item, + ValueJSON: valueJSON, + CreatedAt: time.Now().Unix(), + }: + default: + + } +} + +// Loop 单次循环 +func (this *ValueQueue) Loop() error { + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + + for value := range this.valuesChan { + _, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{ + Item: value.Item, + ValueJSON: value.ValueJSON, + CreatedAt: value.CreatedAt, + }) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index c646886..6f465d8 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -48,6 +48,7 @@ func (this *Listener) Listen() error { if err != nil { return err } + netListener = NewTrafficListener(netListener) events.On(events.EventQuit, func() { remotelogs.Println("LISTENER", "quit "+this.group.FullAddr()) _ = netListener.Close() diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index f048dcf..b5cd59c 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -6,10 +6,12 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/maps" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "os" @@ -63,6 +65,11 @@ func (this *NodeStatusExecutor) update() { status.IsActive = true status.ConnectionCount = sharedListenerManager.TotalActiveConnections() + // 记录监控数据 + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemConnections, maps.Map{ + "total": status.ConnectionCount, + }) + hostname, _ := os.Hostname() status.Hostname = hostname @@ -108,6 +115,11 @@ func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) { } status.CPUUsage = percents[0] / 100 + // 记录监控数据 + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemCPU, maps.Map{ + "usage": status.CPUUsage, + }) + if this.cpuLogicalCount == 0 && this.cpuPhysicalCount == 0 { this.cpuUpdatedTime = time.Now() @@ -188,4 +200,11 @@ func (this *NodeStatusExecutor) updateDisk(status *nodeconfigs.NodeStatus) { status.DiskTotal = total status.DiskUsage = float64(totalUsage) / float64(total) status.DiskMaxUsage = maxUsage / 100 + + // 记录监控数据 + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemDisk, maps.Map{ + "total": status.DiskTotal, + "usage": status.DiskUsage, + "maxUsage": status.DiskMaxUsage, + }) } diff --git a/internal/nodes/node_status_executor_unix.go b/internal/nodes/node_status_executor_unix.go index 599c6b4..6a56ba8 100644 --- a/internal/nodes/node_status_executor_unix.go +++ b/internal/nodes/node_status_executor_unix.go @@ -4,6 +4,8 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeNode/internal/monitor" + "github.com/iwind/TeaGo/maps" "github.com/shirou/gopsutil/load" "github.com/shirou/gopsutil/mem" ) @@ -22,6 +24,13 @@ func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) { } status.MemoryTotal = stat.Total + + // 记录监控数据 + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemMemory, maps.Map{ + "usage": status.MemoryUsage, + "total": status.MemoryTotal, + "used": stat.Used, + }) } // 更新负载 @@ -38,4 +47,11 @@ func (this *NodeStatusExecutor) updateLoad(status *nodeconfigs.NodeStatus) { status.Load1m = stat.Load1 status.Load5m = stat.Load5 status.Load15m = stat.Load15 + + // 记录监控数据 + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemLoad, maps.Map{ + "load1m": status.Load1m, + "load5m": status.Load5m, + "load15m": status.Load15m, + }) } diff --git a/internal/nodes/traffic_conn.go b/internal/nodes/traffic_conn.go new file mode 100644 index 0000000..9ced9dd --- /dev/null +++ b/internal/nodes/traffic_conn.go @@ -0,0 +1,92 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/monitor" + "github.com/iwind/TeaGo/maps" + "net" + "sync/atomic" + "time" +) + +// 流量统计 +var inTrafficBytes = uint64(0) +var outTrafficBytes = uint64(0) + +// 发送监控流量 +func init() { + events.On(events.EventStart, func() { + ticker := time.NewTicker(1 * time.Minute) + go func() { + for range ticker.C { + // 加入到数据队列中 + if inTrafficBytes > 0 { + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{ + "total": inTrafficBytes, + }) + } + if outTrafficBytes > 0 { + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{ + "total": outTrafficBytes, + }) + } + + // 重置数据 + atomic.StoreUint64(&inTrafficBytes, 0) + atomic.StoreUint64(&outTrafficBytes, 0) + } + }() + }) +} + +// TrafficConn 用于统计流量的连接 +type TrafficConn struct { + rawConn net.Conn +} + +func NewTrafficConn(conn net.Conn) net.Conn { + return &TrafficConn{rawConn: conn} +} + +func (this *TrafficConn) Read(b []byte) (n int, err error) { + n, err = this.rawConn.Read(b) + if n > 0 { + atomic.AddUint64(&inTrafficBytes, uint64(n)) + } + return +} + +func (this *TrafficConn) Write(b []byte) (n int, err error) { + n, err = this.rawConn.Write(b) + if n > 0 { + atomic.AddUint64(&outTrafficBytes, uint64(n)) + } + return +} + +func (this *TrafficConn) Close() error { + return this.rawConn.Close() +} + +func (this *TrafficConn) LocalAddr() net.Addr { + return this.rawConn.LocalAddr() +} + +func (this *TrafficConn) RemoteAddr() net.Addr { + return this.rawConn.RemoteAddr() +} + +func (this *TrafficConn) SetDeadline(t time.Time) error { + return this.rawConn.SetDeadline(t) +} + +func (this *TrafficConn) SetReadDeadline(t time.Time) error { + return this.rawConn.SetReadDeadline(t) +} + +func (this *TrafficConn) SetWriteDeadline(t time.Time) error { + return this.rawConn.SetWriteDeadline(t) +} diff --git a/internal/nodes/traffic_listener.go b/internal/nodes/traffic_listener.go new file mode 100644 index 0000000..e934fbb --- /dev/null +++ b/internal/nodes/traffic_listener.go @@ -0,0 +1,30 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import "net" + +// TrafficListener 用于统计流量的网络监听 +type TrafficListener struct { + rawListener net.Listener +} + +func NewTrafficListener(listener net.Listener) net.Listener { + return &TrafficListener{rawListener: listener} +} + +func (this *TrafficListener) Accept() (net.Conn, error) { + conn, err := this.rawListener.Accept() + if err != nil { + return nil, err + } + return NewTrafficConn(conn), nil +} + +func (this *TrafficListener) Close() error { + return this.rawListener.Close() +} + +func (this *TrafficListener) Addr() net.Addr { + return this.rawListener.Addr() +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 47462de..28434a6 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -57,6 +57,10 @@ func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient { return pb.NewNodeTaskServiceClient(this.pickConn()) } +func (this *RPCClient) NodeValueRPC() pb.NodeValueServiceClient { + return pb.NewNodeValueServiceClient(this.pickConn()) +} + func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient { return pb.NewHTTPAccessLogServiceClient(this.pickConn()) } @@ -105,7 +109,7 @@ func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient { return pb.NewServerDailyStatServiceClient(this.pickConn()) } -// 节点上下文信息 +// Context 节点上下文信息 func (this *RPCClient) Context() context.Context { ctx := context.Background() m := maps.Map{ @@ -128,7 +132,7 @@ func (this *RPCClient) Context() context.Context { return ctx } -// 集群上下文 +// ClusterContext 集群上下文 func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) context.Context { ctx := context.Background() m := maps.Map{ @@ -151,14 +155,14 @@ func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) co return ctx } -// 关闭连接 +// Close 关闭连接 func (this *RPCClient) Close() { for _, conn := range this.conns { _ = conn.Close() } } -// 修改配置 +// UpdateConfig 修改配置 func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error { this.apiConfig = config return this.init() diff --git a/internal/rpc/rpc_client_test.go b/internal/rpc/rpc_client_test.go deleted file mode 100644 index 3ef2ecc..0000000 --- a/internal/rpc/rpc_client_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package rpc - -import ( - "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" - "github.com/TeaOSLab/EdgeNode/internal/configs" - _ "github.com/iwind/TeaGo/bootstrap" - "testing" - "time" -) - -func TestRPCClient_NodeRPC(t *testing.T) { - before := time.Now() - defer func() { - t.Log(time.Since(before).Seconds()*1000, "ms") - }() - config, err := configs.LoadAPIConfig() - if err != nil { - t.Fatal(err) - } - rpc, err := NewRPCClient(config) - if err != nil { - t.Fatal(err) - } - resp, err := rpc.NodeRPC().ComposeNodeConfig(rpc.Context(), &pb.ComposeNodeConfigRequest{}) - if err != nil { - t.Fatal(err) - } - t.Log(resp) -} - -func TestSharedRPC_Stream(t *testing.T) { - config, err := configs.LoadAPIConfig() - if err != nil { - t.Fatal(err) - } - rpc, err := NewRPCClient(config) - if err != nil { - t.Fatal(err) - } - client, err := rpc.NodeRPC().NodeStream(rpc.Context()) - if err != nil { - t.Fatal(err) - } - err = client.Send(&pb.NodeStreamRequest{}) - if err != nil { - t.Fatal(err) - } - for { - resp, err := client.Recv() - if err != nil { - t.Fatal(err) - } - t.Log("recv:", resp) - } -} diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index f6e082b..19bf3ee 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -15,14 +15,14 @@ import ( var SharedTrafficStatManager = NewTrafficStatManager() -// 区域流量统计 +// TrafficStatManager 区域流量统计 type TrafficStatManager struct { m map[string]int64 // [timestamp serverId] => bytes locker sync.Mutex configFunc func() *nodeconfigs.NodeConfig } -// 获取新对象 +// NewTrafficStatManager 获取新对象 func NewTrafficStatManager() *TrafficStatManager { manager := &TrafficStatManager{ m: map[string]int64{}, @@ -31,7 +31,7 @@ func NewTrafficStatManager() *TrafficStatManager { return manager } -// 启动自动任务 +// Start 启动自动任务 func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) { this.configFunc = configFunc @@ -54,7 +54,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) } } -// 添加流量 +// Add 添加流量 func (this *TrafficStatManager) Add(serverId int64, bytes int64) { if bytes == 0 { return @@ -68,7 +68,7 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) { this.locker.Unlock() } -// 上传流量 +// Upload 上传流量 func (this *TrafficStatManager) Upload() error { config := this.configFunc() if config == nil {