mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 16:00:25 +08:00 
			
		
		
		
	实现基本的监控
This commit is contained in:
		
							
								
								
									
										10
									
								
								internal/monitor/value.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								internal/monitor/value.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,10 @@
 | 
			
		||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
			
		||||
 | 
			
		||||
package monitor
 | 
			
		||||
 | 
			
		||||
// ItemValue 数据值定义
 | 
			
		||||
type ItemValue struct {
 | 
			
		||||
	Item      string
 | 
			
		||||
	ValueJSON []byte
 | 
			
		||||
	CreatedAt int64
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										79
									
								
								internal/monitor/value_queue.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										79
									
								
								internal/monitor/value_queue.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,79 @@
 | 
			
		||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
			
		||||
 | 
			
		||||
package monitor
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
			
		||||
	"github.com/iwind/TeaGo/maps"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var SharedValueQueue = NewValueQueue()
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	events.On(events.EventStart, func() {
 | 
			
		||||
		go SharedValueQueue.Start()
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ValueQueue 数据记录队列
 | 
			
		||||
type ValueQueue struct {
 | 
			
		||||
	valuesChan chan *ItemValue
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewValueQueue() *ValueQueue {
 | 
			
		||||
	return &ValueQueue{
 | 
			
		||||
		valuesChan: make(chan *ItemValue, 1024),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Start 启动队列
 | 
			
		||||
func (this *ValueQueue) Start() {
 | 
			
		||||
	// 这里单次循环就行,因为Loop里已经使用了Range通道
 | 
			
		||||
	err := this.Loop()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		remotelogs.Error("MONITOR_QUEUE", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add 添加数据
 | 
			
		||||
func (this *ValueQueue) Add(item string, value maps.Map) {
 | 
			
		||||
	valueJSON, err := json.Marshal(value)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		remotelogs.Error("MONITOR_QUEUE", "marshal value error: "+err.Error())
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	select {
 | 
			
		||||
	case this.valuesChan <- &ItemValue{
 | 
			
		||||
		Item:      item,
 | 
			
		||||
		ValueJSON: valueJSON,
 | 
			
		||||
		CreatedAt: time.Now().Unix(),
 | 
			
		||||
	}:
 | 
			
		||||
	default:
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Loop 单次循环
 | 
			
		||||
func (this *ValueQueue) Loop() error {
 | 
			
		||||
	rpcClient, err := rpc.SharedRPC()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for value := range this.valuesChan {
 | 
			
		||||
		_, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{
 | 
			
		||||
			Item:      value.Item,
 | 
			
		||||
			ValueJSON: value.ValueJSON,
 | 
			
		||||
			CreatedAt: value.CreatedAt,
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -48,6 +48,7 @@ func (this *Listener) Listen() error {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	netListener = NewTrafficListener(netListener)
 | 
			
		||||
	events.On(events.EventQuit, func() {
 | 
			
		||||
		remotelogs.Println("LISTENER", "quit "+this.group.FullAddr())
 | 
			
		||||
		_ = netListener.Close()
 | 
			
		||||
 
 | 
			
		||||
@@ -6,10 +6,12 @@ import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/monitor"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils"
 | 
			
		||||
	"github.com/iwind/TeaGo/lists"
 | 
			
		||||
	"github.com/iwind/TeaGo/maps"
 | 
			
		||||
	"github.com/shirou/gopsutil/cpu"
 | 
			
		||||
	"github.com/shirou/gopsutil/disk"
 | 
			
		||||
	"os"
 | 
			
		||||
@@ -63,6 +65,11 @@ func (this *NodeStatusExecutor) update() {
 | 
			
		||||
	status.IsActive = true
 | 
			
		||||
	status.ConnectionCount = sharedListenerManager.TotalActiveConnections()
 | 
			
		||||
 | 
			
		||||
	// 记录监控数据
 | 
			
		||||
	monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemConnections, maps.Map{
 | 
			
		||||
		"total": status.ConnectionCount,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	hostname, _ := os.Hostname()
 | 
			
		||||
	status.Hostname = hostname
 | 
			
		||||
 | 
			
		||||
@@ -108,6 +115,11 @@ func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) {
 | 
			
		||||
	}
 | 
			
		||||
	status.CPUUsage = percents[0] / 100
 | 
			
		||||
 | 
			
		||||
	// 记录监控数据
 | 
			
		||||
	monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemCPU, maps.Map{
 | 
			
		||||
		"usage": status.CPUUsage,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if this.cpuLogicalCount == 0 && this.cpuPhysicalCount == 0 {
 | 
			
		||||
		this.cpuUpdatedTime = time.Now()
 | 
			
		||||
 | 
			
		||||
@@ -188,4 +200,11 @@ func (this *NodeStatusExecutor) updateDisk(status *nodeconfigs.NodeStatus) {
 | 
			
		||||
	status.DiskTotal = total
 | 
			
		||||
	status.DiskUsage = float64(totalUsage) / float64(total)
 | 
			
		||||
	status.DiskMaxUsage = maxUsage / 100
 | 
			
		||||
 | 
			
		||||
	// 记录监控数据
 | 
			
		||||
	monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemDisk, maps.Map{
 | 
			
		||||
		"total":    status.DiskTotal,
 | 
			
		||||
		"usage":    status.DiskUsage,
 | 
			
		||||
		"maxUsage": status.DiskMaxUsage,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,8 @@ package nodes
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/monitor"
 | 
			
		||||
	"github.com/iwind/TeaGo/maps"
 | 
			
		||||
	"github.com/shirou/gopsutil/load"
 | 
			
		||||
	"github.com/shirou/gopsutil/mem"
 | 
			
		||||
)
 | 
			
		||||
@@ -22,6 +24,13 @@ func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	status.MemoryTotal = stat.Total
 | 
			
		||||
 | 
			
		||||
	// 记录监控数据
 | 
			
		||||
	monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemMemory, maps.Map{
 | 
			
		||||
		"usage": status.MemoryUsage,
 | 
			
		||||
		"total": status.MemoryTotal,
 | 
			
		||||
		"used":  stat.Used,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 更新负载
 | 
			
		||||
@@ -38,4 +47,11 @@ func (this *NodeStatusExecutor) updateLoad(status *nodeconfigs.NodeStatus) {
 | 
			
		||||
	status.Load1m = stat.Load1
 | 
			
		||||
	status.Load5m = stat.Load5
 | 
			
		||||
	status.Load15m = stat.Load15
 | 
			
		||||
 | 
			
		||||
	// 记录监控数据
 | 
			
		||||
	monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemLoad, maps.Map{
 | 
			
		||||
		"load1m":  status.Load1m,
 | 
			
		||||
		"load5m":  status.Load5m,
 | 
			
		||||
		"load15m": status.Load15m,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										92
									
								
								internal/nodes/traffic_conn.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								internal/nodes/traffic_conn.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,92 @@
 | 
			
		||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
			
		||||
 | 
			
		||||
package nodes
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/monitor"
 | 
			
		||||
	"github.com/iwind/TeaGo/maps"
 | 
			
		||||
	"net"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// 流量统计
 | 
			
		||||
var inTrafficBytes = uint64(0)
 | 
			
		||||
var outTrafficBytes = uint64(0)
 | 
			
		||||
 | 
			
		||||
// 发送监控流量
 | 
			
		||||
func init() {
 | 
			
		||||
	events.On(events.EventStart, func() {
 | 
			
		||||
		ticker := time.NewTicker(1 * time.Minute)
 | 
			
		||||
		go func() {
 | 
			
		||||
			for range ticker.C {
 | 
			
		||||
				// 加入到数据队列中
 | 
			
		||||
				if inTrafficBytes > 0 {
 | 
			
		||||
					monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{
 | 
			
		||||
						"total": inTrafficBytes,
 | 
			
		||||
					})
 | 
			
		||||
				}
 | 
			
		||||
				if outTrafficBytes > 0 {
 | 
			
		||||
					monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{
 | 
			
		||||
						"total": outTrafficBytes,
 | 
			
		||||
					})
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				// 重置数据
 | 
			
		||||
				atomic.StoreUint64(&inTrafficBytes, 0)
 | 
			
		||||
				atomic.StoreUint64(&outTrafficBytes, 0)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TrafficConn 用于统计流量的连接
 | 
			
		||||
type TrafficConn struct {
 | 
			
		||||
	rawConn net.Conn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewTrafficConn(conn net.Conn) net.Conn {
 | 
			
		||||
	return &TrafficConn{rawConn: conn}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) Read(b []byte) (n int, err error) {
 | 
			
		||||
	n, err = this.rawConn.Read(b)
 | 
			
		||||
	if n > 0 {
 | 
			
		||||
		atomic.AddUint64(&inTrafficBytes, uint64(n))
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) Write(b []byte) (n int, err error) {
 | 
			
		||||
	n, err = this.rawConn.Write(b)
 | 
			
		||||
	if n > 0 {
 | 
			
		||||
		atomic.AddUint64(&outTrafficBytes, uint64(n))
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) Close() error {
 | 
			
		||||
	return this.rawConn.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) LocalAddr() net.Addr {
 | 
			
		||||
	return this.rawConn.LocalAddr()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) RemoteAddr() net.Addr {
 | 
			
		||||
	return this.rawConn.RemoteAddr()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) SetDeadline(t time.Time) error {
 | 
			
		||||
	return this.rawConn.SetDeadline(t)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) SetReadDeadline(t time.Time) error {
 | 
			
		||||
	return this.rawConn.SetReadDeadline(t)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficConn) SetWriteDeadline(t time.Time) error {
 | 
			
		||||
	return this.rawConn.SetWriteDeadline(t)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										30
									
								
								internal/nodes/traffic_listener.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								internal/nodes/traffic_listener.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,30 @@
 | 
			
		||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
			
		||||
 | 
			
		||||
package nodes
 | 
			
		||||
 | 
			
		||||
import "net"
 | 
			
		||||
 | 
			
		||||
// TrafficListener 用于统计流量的网络监听
 | 
			
		||||
type TrafficListener struct {
 | 
			
		||||
	rawListener net.Listener
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewTrafficListener(listener net.Listener) net.Listener {
 | 
			
		||||
	return &TrafficListener{rawListener: listener}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficListener) Accept() (net.Conn, error) {
 | 
			
		||||
	conn, err := this.rawListener.Accept()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return NewTrafficConn(conn), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficListener) Close() error {
 | 
			
		||||
	return this.rawListener.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *TrafficListener) Addr() net.Addr {
 | 
			
		||||
	return this.rawListener.Addr()
 | 
			
		||||
}
 | 
			
		||||
@@ -57,6 +57,10 @@ func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient {
 | 
			
		||||
	return pb.NewNodeTaskServiceClient(this.pickConn())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *RPCClient) NodeValueRPC() pb.NodeValueServiceClient {
 | 
			
		||||
	return pb.NewNodeValueServiceClient(this.pickConn())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
 | 
			
		||||
	return pb.NewHTTPAccessLogServiceClient(this.pickConn())
 | 
			
		||||
}
 | 
			
		||||
@@ -105,7 +109,7 @@ func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient {
 | 
			
		||||
	return pb.NewServerDailyStatServiceClient(this.pickConn())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 节点上下文信息
 | 
			
		||||
// Context 节点上下文信息
 | 
			
		||||
func (this *RPCClient) Context() context.Context {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	m := maps.Map{
 | 
			
		||||
@@ -128,7 +132,7 @@ func (this *RPCClient) Context() context.Context {
 | 
			
		||||
	return ctx
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 集群上下文
 | 
			
		||||
// ClusterContext 集群上下文
 | 
			
		||||
func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) context.Context {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	m := maps.Map{
 | 
			
		||||
@@ -151,14 +155,14 @@ func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) co
 | 
			
		||||
	return ctx
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 关闭连接
 | 
			
		||||
// Close 关闭连接
 | 
			
		||||
func (this *RPCClient) Close() {
 | 
			
		||||
	for _, conn := range this.conns {
 | 
			
		||||
		_ = conn.Close()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 修改配置
 | 
			
		||||
// UpdateConfig 修改配置
 | 
			
		||||
func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
 | 
			
		||||
	this.apiConfig = config
 | 
			
		||||
	return this.init()
 | 
			
		||||
 
 | 
			
		||||
@@ -1,55 +0,0 @@
 | 
			
		||||
package rpc
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/configs"
 | 
			
		||||
	_ "github.com/iwind/TeaGo/bootstrap"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestRPCClient_NodeRPC(t *testing.T) {
 | 
			
		||||
	before := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		t.Log(time.Since(before).Seconds()*1000, "ms")
 | 
			
		||||
	}()
 | 
			
		||||
	config, err := configs.LoadAPIConfig()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	rpc, err := NewRPCClient(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	resp, err := rpc.NodeRPC().ComposeNodeConfig(rpc.Context(), &pb.ComposeNodeConfigRequest{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	t.Log(resp)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSharedRPC_Stream(t *testing.T) {
 | 
			
		||||
	config, err := configs.LoadAPIConfig()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	rpc, err := NewRPCClient(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	client, err := rpc.NodeRPC().NodeStream(rpc.Context())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	err = client.Send(&pb.NodeStreamRequest{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	for {
 | 
			
		||||
		resp, err := client.Recv()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		t.Log("recv:", resp)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -15,14 +15,14 @@ import (
 | 
			
		||||
 | 
			
		||||
var SharedTrafficStatManager = NewTrafficStatManager()
 | 
			
		||||
 | 
			
		||||
// 区域流量统计
 | 
			
		||||
// TrafficStatManager 区域流量统计
 | 
			
		||||
type TrafficStatManager struct {
 | 
			
		||||
	m          map[string]int64 // [timestamp serverId] => bytes
 | 
			
		||||
	locker     sync.Mutex
 | 
			
		||||
	configFunc func() *nodeconfigs.NodeConfig
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 获取新对象
 | 
			
		||||
// NewTrafficStatManager 获取新对象
 | 
			
		||||
func NewTrafficStatManager() *TrafficStatManager {
 | 
			
		||||
	manager := &TrafficStatManager{
 | 
			
		||||
		m: map[string]int64{},
 | 
			
		||||
@@ -31,7 +31,7 @@ func NewTrafficStatManager() *TrafficStatManager {
 | 
			
		||||
	return manager
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 启动自动任务
 | 
			
		||||
// Start 启动自动任务
 | 
			
		||||
func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) {
 | 
			
		||||
	this.configFunc = configFunc
 | 
			
		||||
 | 
			
		||||
@@ -54,7 +54,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 添加流量
 | 
			
		||||
// Add 添加流量
 | 
			
		||||
func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
 | 
			
		||||
	if bytes == 0 {
 | 
			
		||||
		return
 | 
			
		||||
@@ -68,7 +68,7 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
 | 
			
		||||
	this.locker.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 上传流量
 | 
			
		||||
// Upload 上传流量
 | 
			
		||||
func (this *TrafficStatManager) Upload() error {
 | 
			
		||||
	config := this.configFunc()
 | 
			
		||||
	if config == nil {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user