增加Websocket连接数统计

This commit is contained in:
刘祥超
2023-12-20 11:43:00 +08:00
parent 4607a1f4e7
commit 4f24b7f39c
8 changed files with 59 additions and 46 deletions

View File

@@ -85,6 +85,8 @@ type HTTPRequest struct {
isAttack bool // 是否是攻击请求 isAttack bool // 是否是攻击请求
requestBodyData []byte // 读取的Body内容 requestBodyData []byte // 读取的Body内容
isWebsocketResponse bool // 是否为Websocket响应非请求
// WAF相关 // WAF相关
firewallPolicyId int64 firewallPolicyId int64
firewallRuleGroupId int64 firewallRuleGroupId int64
@@ -410,6 +412,8 @@ func (this *HTTPRequest) doEnd() {
var countAttacks int64 = 0 var countAttacks int64 = 0
var attackBytes int64 = 0 var attackBytes int64 = 0
var countWebsocketConnections int64 = 0
if this.isCached { if this.isCached {
countCached = 1 countCached = 1
cachedBytes = totalBytes cachedBytes = totalBytes
@@ -421,8 +425,11 @@ func (this *HTTPRequest) doEnd() {
attackBytes = totalBytes attackBytes = totalBytes
} }
} }
if this.isWebsocketResponse {
countWebsocketConnections = 1
}
stats.SharedTrafficStatManager.Add(this.ReqServer.UserId, this.ReqServer.Id, this.ReqHost, totalBytes, cachedBytes, 1, countCached, countAttacks, attackBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId()) stats.SharedTrafficStatManager.Add(this.ReqServer.UserId, this.ReqServer.Id, this.ReqHost, totalBytes, cachedBytes, 1, countCached, countAttacks, attackBytes, countWebsocketConnections, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
// 指标 // 指标
if metrics.SharedManager.HasHTTPMetrics() { if metrics.SharedManager.HasHTTPMetrics() {

View File

@@ -61,6 +61,9 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou
} }
} }
// 标记
this.isWebsocketResponse = true
// 设置指定的来源域 // 设置指定的来源域
if !this.web.Websocket.RequestSameOrigin && len(this.web.Websocket.RequestOrigin) > 0 { if !this.web.Websocket.RequestSameOrigin && len(this.web.Websocket.RequestOrigin) > 0 {
var newRequestOrigin = this.web.Websocket.RequestOrigin var newRequestOrigin = this.web.Websocket.RequestOrigin
@@ -77,7 +80,6 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou
} }
// 连接源站 // 连接源站
// TODO 增加N次错误重试重试的时候需要尝试不同的源站
originConn, _, err := OriginConnect(this.origin, this.requestServerPort(), this.RawReq.RemoteAddr, requestHost) originConn, _, err := OriginConnect(this.origin, this.requestServerPort(), this.RawReq.RemoteAddr, requestHost)
if err != nil { if err != nil {
if isLastRetry { if isLastRetry {

View File

@@ -135,14 +135,14 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net
serverName = tlsConn.ConnectionState().ServerName serverName = tlsConn.ConnectionState().ServerName
if len(serverName) > 0 { if len(serverName) > 0 {
// 统计 // 统计
stats.SharedTrafficStatManager.Add(server.UserId, server.Id, serverName, 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) stats.SharedTrafficStatManager.Add(server.UserId, server.Id, serverName, 0, 0, 1, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
recordStat = true recordStat = true
} }
} }
// 统计 // 统计
if !recordStat { if !recordStat {
stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", 0, 0, 1, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
} }
originConn, err := this.connectOrigin(server.Id, serverName, server.ReverseProxy, conn.RemoteAddr().String()) originConn, err := this.connectOrigin(server.Id, serverName, server.ReverseProxy, conn.RemoteAddr().String())
@@ -197,7 +197,7 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net
// 记录流量 // 记录流量
if server != nil { if server != nil {
stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", int64(n), 0, 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
} }
} }
if err != nil { if err != nil {

View File

@@ -370,7 +370,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyListener
// 统计 // 统计
if server != nil { if server != nil {
stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", 0, 0, 1, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
} }
// 处理ControlMessage // 处理ControlMessage
@@ -401,7 +401,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyListener
// 记录流量和带宽 // 记录流量和带宽
if server != nil { if server != nil {
// 流量 // 流量
stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) stats.SharedTrafficStatManager.Add(server.UserId, server.Id, "", int64(n), 0, 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
// 带宽 // 带宽
var userPlanId int64 var userPlanId int64

View File

@@ -62,6 +62,7 @@ type BandwidthStat struct {
CountRequests int64 `json:"countRequests"` CountRequests int64 `json:"countRequests"`
CountCachedRequests int64 `json:"countCachedRequests"` CountCachedRequests int64 `json:"countCachedRequests"`
CountAttackRequests int64 `json:"countAttackRequests"` CountAttackRequests int64 `json:"countAttackRequests"`
CountWebsocketConnections int64 `json:"countWebsocketConnections"`
UserPlanId int64 `json:"userPlanId"` UserPlanId int64 `json:"userPlanId"`
} }
@@ -154,6 +155,7 @@ func (this *BandwidthStatManager) Loop() error {
CountRequests: stat.CountRequests, CountRequests: stat.CountRequests,
CountCachedRequests: stat.CountCachedRequests, CountCachedRequests: stat.CountCachedRequests,
CountAttackRequests: stat.CountAttackRequests, CountAttackRequests: stat.CountAttackRequests,
CountWebsocketConnections: stat.CountWebsocketConnections,
UserPlanId: stat.UserPlanId, UserPlanId: stat.UserPlanId,
NodeRegionId: regionId, NodeRegionId: regionId,
}) })
@@ -231,7 +233,7 @@ func (this *BandwidthStatManager) AddBandwidth(userId int64, userPlanId int64, s
} }
// AddTraffic 添加请求数据 // AddTraffic 添加请求数据
func (this *BandwidthStatManager) AddTraffic(serverId int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64) { func (this *BandwidthStatManager) AddTraffic(serverId int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, countWebsocketConnections int64) {
var now = fasttime.Now() var now = fasttime.Now()
var day = now.Ymd() var day = now.Ymd()
var timeAt = now.Round5Hi() var timeAt = now.Round5Hi()
@@ -245,6 +247,7 @@ func (this *BandwidthStatManager) AddTraffic(serverId int64, cachedBytes int64,
stat.CountCachedRequests += countCachedRequests stat.CountCachedRequests += countCachedRequests
stat.CountAttackRequests += countAttacks stat.CountAttackRequests += countAttacks
stat.AttackBytes += attackBytes stat.AttackBytes += attackBytes
stat.CountWebsocketConnections += countWebsocketConnections
} }
this.locker.Unlock() this.locker.Unlock()
} }

View File

@@ -65,6 +65,7 @@ func BenchmarkBandwidthStatManager_Slice(b *testing.B) {
CountRequests: stat.CountRequests, CountRequests: stat.CountRequests,
CountCachedRequests: stat.CountCachedRequests, CountCachedRequests: stat.CountCachedRequests,
CountAttackRequests: stat.CountAttackRequests, CountAttackRequests: stat.CountAttackRequests,
CountWebsocketConnections: stat.CountWebsocketConnections,
NodeRegionId: 1, NodeRegionId: 1,
}) })
} }

View File

@@ -106,13 +106,13 @@ func (this *TrafficStatManager) Start() {
} }
// Add 添加流量 // Add 添加流量
func (this *TrafficStatManager) Add(userId int64, serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, checkingTrafficLimit bool, planId int64) { func (this *TrafficStatManager) Add(userId int64, serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, countWebsocketConnections int64, checkingTrafficLimit bool, planId int64) {
if serverId == 0 { if serverId == 0 {
return return
} }
// 添加到带宽 // 添加到带宽
SharedBandwidthStatManager.AddTraffic(serverId, cachedBytes, countRequests, countCachedRequests, countAttacks, attackBytes) SharedBandwidthStatManager.AddTraffic(serverId, cachedBytes, countRequests, countCachedRequests, countAttacks, attackBytes, countWebsocketConnections)
if bytes == 0 && countRequests == 0 { if bytes == 0 && countRequests == 0 {
return return

View File

@@ -11,7 +11,7 @@ import (
func TestTrafficStatManager_Add(t *testing.T) { func TestTrafficStatManager_Add(t *testing.T) {
manager := NewTrafficStatManager() manager := NewTrafficStatManager()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
manager.Add(1, 1, "goedge.cn", 1, 0, 0, 0, 0, 0, false, 0) manager.Add(1, 1, "goedge.cn", 1, 0, 0, 0, 0, 0, 0, false, 0)
} }
t.Log(manager.itemMap) t.Log(manager.itemMap)
} }
@@ -19,7 +19,7 @@ func TestTrafficStatManager_Add(t *testing.T) {
func TestTrafficStatManager_Upload(t *testing.T) { func TestTrafficStatManager_Upload(t *testing.T) {
manager := NewTrafficStatManager() manager := NewTrafficStatManager()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
manager.Add(1, 1, "goedge.cn"+types.String(rands.Int(0, 10)), 1, 0, 1, 0, 0, 0, false, 0) manager.Add(1, 1, "goedge.cn"+types.String(rands.Int(0, 10)), 1, 0, 1, 0, 0, 0, 0, false, 0)
} }
err := manager.Upload() err := manager.Upload()
if err != nil { if err != nil {
@@ -36,7 +36,7 @@ func BenchmarkTrafficStatManager_Add(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
manager.Add(1, 1, "goedge.cn"+types.String(rand.Int63()%10), 1024, 1, 0, 0, 0, 0, false, 0) manager.Add(1, 1, "goedge.cn"+types.String(rand.Int63()%10), 1024, 1, 0, 0, 0, 0, 0, false, 0)
} }
}) })
} }