diff --git a/internal/const/const.go b/internal/const/const.go index 70bd636..c934293 100644 --- a/internal/const/const.go +++ b/internal/const/const.go @@ -1,7 +1,7 @@ package teaconst const ( - Version = "0.0.5" + Version = "0.0.6" ProductName = "Edge Node" ProcessName = "edge-node" diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 93d6a0f..1fb7ddd 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -199,7 +199,14 @@ func (this *HTTPRequest) doBegin() { // 结束调用 func (this *HTTPRequest) doEnd() { + // 记录日志 this.log() + + // 流量统计 + // TODO 增加是否开启开关 + if this.Server != nil { + SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes) + } } // 原始的请求URI diff --git a/internal/nodes/http_request_websocket.go b/internal/nodes/http_request_websocket.go index 0bc316b..772257b 100644 --- a/internal/nodes/http_request_websocket.go +++ b/internal/nodes/http_request_websocket.go @@ -69,7 +69,20 @@ func (this *HTTPRequest) doWebsocket() { }() go func() { - _, _ = io.Copy(clientConn, originConn) + buf := make([]byte, 4*1024) // TODO 使用内存池 + for { + n, err := originConn.Read(buf) + if n > 0 { + this.writer.sentBodyBytes += int64(n) + _, err = clientConn.Write(buf[:n]) + if err != nil { + break + } + } + if err != nil { + break + } + } _ = clientConn.Close() _ = originConn.Close() }() diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index ac814bf..4181cb3 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -65,7 +65,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { } go func() { - originBuffer := make([]byte, 32*1024) // TODO 需要可以设置,并可以使用Pool + originBuffer := make([]byte, 4*1024) // TODO 需要可以设置,并可以使用Pool for { n, err := originConn.Read(originBuffer) if n > 0 { @@ -74,6 +74,9 @@ func (this *TCPListener) handleConn(conn net.Conn) error { closer() break } + + // 记录流量 + SharedTrafficStatManager.Add(firstServer.Id, int64(n)) } if err != nil { closer() @@ -82,7 +85,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { } }() - clientBuffer := make([]byte, 32*1024) // TODO 需要可以设置,并可以使用Pool + clientBuffer := make([]byte, 4*1024) // TODO 需要可以设置,并可以使用Pool for { n, err := conn.Read(clientBuffer) if n > 0 { diff --git a/internal/nodes/traffic_stat_manager.go b/internal/nodes/traffic_stat_manager.go new file mode 100644 index 0000000..8b95828 --- /dev/null +++ b/internal/nodes/traffic_stat_manager.go @@ -0,0 +1,102 @@ +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/logs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/Tea" + "strconv" + "sync" + "time" +) + +var SharedTrafficStatManager = NewTrafficStatManager() + +// 流量统计 +type TrafficStatManager struct { + m map[string]int64 // [timestamp serverId] => bytes + locker sync.Mutex +} + +// 获取新对象 +func NewTrafficStatManager() *TrafficStatManager { + manager := &TrafficStatManager{ + m: map[string]int64{}, + } + + go manager.Start() + + return manager +} + +// 启动自动任务 +func (this *TrafficStatManager) Start() { + duration := 5 * time.Minute + if Tea.IsTesting() { + // 测试环境缩短上传时间,方便我们调试 + duration = 30 * time.Second + } + ticker := time.NewTicker(duration) + for range ticker.C { + err := this.Upload() + if err != nil { + logs.Error("TRAFFIC_STAT_MANAGER", "upload stats failed: "+err.Error()) + } + } +} + +// 添加流量 +func (this *TrafficStatManager) Add(serverId int64, bytes int64) { + if bytes == 0 { + return + } + + timestamp := utils.UnixTime() / 300 * 300 + + key := strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10) + this.locker.Lock() + this.m[key] += bytes + this.locker.Unlock() +} + +// 上传流量 +func (this *TrafficStatManager) Upload() error { + if sharedNodeConfig == nil { + return nil + } + + client, err := rpc.SharedRPC() + if err != nil { + return err + } + + this.locker.Lock() + m := this.m + this.m = map[string]int64{} + this.locker.Unlock() + + pbStats := []*pb.ServerDailyStat{} + for key, bytes := range m { + timestamp, err := strconv.ParseInt(key[:10], 10, 64) + if err != nil { + return err + } + serverId, err := strconv.ParseInt(key[10:], 10, 64) + if err != nil { + return err + } + + pbStats = append(pbStats, &pb.ServerDailyStat{ + ServerId: serverId, + RegionId: sharedNodeConfig.RegionId, + Bytes: bytes, + CreatedAt: timestamp, + }) + } + if len(pbStats) == 0 { + return nil + } + _, err = client.ServerDailyStatRPC().UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{Stats: pbStats}) + return err +} diff --git a/internal/nodes/traffic_stat_manager_test.go b/internal/nodes/traffic_stat_manager_test.go new file mode 100644 index 0000000..23afa39 --- /dev/null +++ b/internal/nodes/traffic_stat_manager_test.go @@ -0,0 +1,35 @@ +package nodes + +import ( + "runtime" + "testing" +) + +func TestTrafficStatManager_Add(t *testing.T) { + manager := NewTrafficStatManager() + for i := 0; i < 100; i++ { + manager.Add(1, 10) + } + t.Log(manager.m) +} + +func TestTrafficStatManager_Upload(t *testing.T) { + manager := NewTrafficStatManager() + for i := 0; i < 100; i++ { + manager.Add(1, 10) + } + err := manager.Upload() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func BenchmarkTrafficStatManager_Add(b *testing.B) { + runtime.GOMAXPROCS(1) + + manager := NewTrafficStatManager() + for i := 0; i < b.N; i++ { + manager.Add(1, 1024) + } +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 7624491..a751ec4 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -93,6 +93,10 @@ func (this *RPCClient) ACMEAuthenticationRPC() pb.ACMEAuthenticationServiceClien return pb.NewACMEAuthenticationServiceClient(this.pickConn()) } +func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient { + return pb.NewServerDailyStatServiceClient(this.pickConn()) +} + // 节点上下文信息 func (this *RPCClient) Context() context.Context { ctx := context.Background()