mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-07 18:50:27 +08:00
增加服务带宽统计
This commit is contained in:
@@ -13,7 +13,7 @@ import (
|
|||||||
func AllowIP(ip string, serverId int64) (canGoNext bool, inAllowList bool) {
|
func AllowIP(ip string, serverId int64) (canGoNext bool, inAllowList bool) {
|
||||||
if !Tea.IsTesting() { // 如果在测试环境,我们不加入一些白名单,以便于可以在本地和局域网正常测试
|
if !Tea.IsTesting() { // 如果在测试环境,我们不加入一些白名单,以便于可以在本地和局域网正常测试
|
||||||
// 放行lo
|
// 放行lo
|
||||||
if ip == "127.0.0.1" {
|
if ip == "127.0.0.1" || ip == "::1" {
|
||||||
return true, true
|
return true, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -210,7 +210,7 @@ func (this *Task) Start() error {
|
|||||||
var tr = trackers.Begin("[METRIC]UPLOAD_STATS")
|
var tr = trackers.Begin("[METRIC]UPLOAD_STATS")
|
||||||
err := this.Upload(1 * time.Second)
|
err := this.Upload(1 * time.Second)
|
||||||
tr.End()
|
tr.End()
|
||||||
if err != nil {
|
if err != nil && !rpc.IsConnError(err) {
|
||||||
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
|
remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,12 +7,14 @@ import (
|
|||||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
|
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
|
||||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
|
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/stats"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
|
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/waf"
|
"github.com/TeaOSLab/EdgeNode/internal/waf"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@@ -26,6 +28,8 @@ type ClientConn struct {
|
|||||||
hasDeadline bool
|
hasDeadline bool
|
||||||
hasRead bool
|
hasRead bool
|
||||||
|
|
||||||
|
isLO bool // 是否为环路
|
||||||
|
|
||||||
hasResetSYNFlood bool
|
hasResetSYNFlood bool
|
||||||
|
|
||||||
BaseClientConn
|
BaseClientConn
|
||||||
@@ -41,10 +45,28 @@ func NewClientConn(conn net.Conn, isTLS bool, quickClose bool) net.Conn {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ClientConn{BaseClientConn: BaseClientConn{rawConn: conn}, isTLS: isTLS}
|
// 是否为环路
|
||||||
|
var remoteAddr = conn.RemoteAddr().String()
|
||||||
|
var isLO = strings.HasPrefix(remoteAddr, "127.0.0.1:") || strings.HasPrefix(remoteAddr, "[::1]:")
|
||||||
|
|
||||||
|
return &ClientConn{
|
||||||
|
BaseClientConn: BaseClientConn{rawConn: conn},
|
||||||
|
isTLS: isTLS,
|
||||||
|
isLO: isLO,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *ClientConn) Read(b []byte) (n int, err error) {
|
func (this *ClientConn) Read(b []byte) (n int, err error) {
|
||||||
|
// 环路直接读取
|
||||||
|
if this.isLO {
|
||||||
|
n, err = this.rawConn.Read(b)
|
||||||
|
if n > 0 {
|
||||||
|
atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TLS
|
||||||
if this.isTLS {
|
if this.isTLS {
|
||||||
if !this.hasDeadline {
|
if !this.hasDeadline {
|
||||||
_ = this.rawConn.SetReadDeadline(time.Now().Add(time.Duration(nodeconfigs.DefaultTLSHandshakeTimeout) * time.Second)) // TODO 握手超时时间可以设置
|
_ = this.rawConn.SetReadDeadline(time.Now().Add(time.Duration(nodeconfigs.DefaultTLSHandshakeTimeout) * time.Second)) // TODO 握手超时时间可以设置
|
||||||
@@ -55,6 +77,7 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 开始读取
|
||||||
n, err = this.rawConn.Read(b)
|
n, err = this.rawConn.Read(b)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n))
|
atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n))
|
||||||
@@ -85,7 +108,15 @@ func (this *ClientConn) Write(b []byte) (n int, err error) {
|
|||||||
n, err = this.rawConn.Write(b)
|
n, err = this.rawConn.Write(b)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
|
atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
|
||||||
|
|
||||||
|
// 统计当前服务带宽
|
||||||
|
if this.serverId > 0 {
|
||||||
|
if !this.isLO { // 环路不统计带宽,避免缓存预热等行为产生带宽
|
||||||
|
stats.SharedBandwidthStatManager.Add(this.serverId, int64(n))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -36,6 +36,16 @@ func (this *BaseClientConn) Bind(serverId int64, remoteAddr string, maxConnsPerS
|
|||||||
return sharedClientConnLimiter.Add(this.rawConn.RemoteAddr().String(), serverId, remoteAddr, maxConnsPerServer, maxConnsPerIP)
|
return sharedClientConnLimiter.Add(this.rawConn.RemoteAddr().String(), serverId, remoteAddr, maxConnsPerServer, maxConnsPerIP)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetServerId 设置服务ID
|
||||||
|
func (this *BaseClientConn) SetServerId(serverId int64) {
|
||||||
|
this.serverId = serverId
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerId 读取当前连接绑定的服务ID
|
||||||
|
func (this *BaseClientConn) ServerId() int64 {
|
||||||
|
return this.serverId
|
||||||
|
}
|
||||||
|
|
||||||
// RawIP 原本IP
|
// RawIP 原本IP
|
||||||
func (this *BaseClientConn) RawIP() string {
|
func (this *BaseClientConn) RawIP() string {
|
||||||
ip, _, _ := net.SplitHostPort(this.rawConn.RemoteAddr().String())
|
ip, _, _ := net.SplitHostPort(this.rawConn.RemoteAddr().String())
|
||||||
|
|||||||
@@ -11,4 +11,10 @@ type ClientConnInterface interface {
|
|||||||
|
|
||||||
// Bind 绑定服务
|
// Bind 绑定服务
|
||||||
Bind(serverId int64, remoteAddr string, maxConnsPerServer int, maxConnsPerIP int) bool
|
Bind(serverId int64, remoteAddr string, maxConnsPerServer int, maxConnsPerIP int) bool
|
||||||
|
|
||||||
|
// ServerId 获取服务ID
|
||||||
|
ServerId() int64
|
||||||
|
|
||||||
|
// SetServerId 设置服务ID
|
||||||
|
SetServerId(serverId int64)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -356,7 +356,7 @@ func (this *HTTPRequest) doEnd() {
|
|||||||
|
|
||||||
// 流量统计
|
// 流量统计
|
||||||
// TODO 增加是否开启开关
|
// TODO 增加是否开启开关
|
||||||
if this.ReqServer != nil {
|
if this.ReqServer != nil && this.ReqServer.Id > 0 {
|
||||||
var countCached int64 = 0
|
var countCached int64 = 0
|
||||||
var cachedBytes int64 = 0
|
var cachedBytes int64 = 0
|
||||||
|
|
||||||
@@ -373,17 +373,17 @@ func (this *HTTPRequest) doEnd() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes()+this.writer.SentHeaderBytes(), cachedBytes, 1, countCached, countAttacks, attackBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes()+this.writer.SentHeaderBytes(), cachedBytes, 1, countCached, countAttacks, attackBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
|
||||||
}
|
|
||||||
|
|
||||||
// 指标
|
// 指标
|
||||||
if metrics.SharedManager.HasHTTPMetrics() {
|
if metrics.SharedManager.HasHTTPMetrics() {
|
||||||
this.doMetricsResponse()
|
this.doMetricsResponse()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 统计
|
// 统计
|
||||||
if this.web.StatRef != nil && this.web.StatRef.IsOn {
|
if this.web.StatRef != nil && this.web.StatRef.IsOn {
|
||||||
// 放到最后执行
|
// 放到最后执行
|
||||||
this.doStat()
|
this.doStat()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ func (this *HTTPRequest) doRequestLimit() (shouldStop bool) {
|
|||||||
|
|
||||||
// 设置连接相关参数
|
// 设置连接相关参数
|
||||||
if this.web.RequestLimit.MaxConns > 0 || this.web.RequestLimit.MaxConnsPerIP > 0 {
|
if this.web.RequestLimit.MaxConns > 0 || this.web.RequestLimit.MaxConnsPerIP > 0 {
|
||||||
requestConn := this.RawReq.Context().Value(HTTPConnContextKey)
|
var requestConn = this.RawReq.Context().Value(HTTPConnContextKey)
|
||||||
if requestConn != nil {
|
if requestConn != nil {
|
||||||
clientConn, ok := requestConn.(ClientConnInterface)
|
clientConn, ok := requestConn.(ClientConnInterface)
|
||||||
if ok && !clientConn.IsBound() {
|
if ok && !clientConn.IsBound() {
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ func (this *Listener) Listen() error {
|
|||||||
if this.group == nil {
|
if this.group == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
protocol := this.group.Protocol()
|
var protocol = this.group.Protocol()
|
||||||
if protocol.IsUDPFamily() {
|
if protocol.IsUDPFamily() {
|
||||||
return this.listenUDP()
|
return this.listenUDP()
|
||||||
}
|
}
|
||||||
@@ -54,7 +54,7 @@ func (this *Listener) listenTCP() error {
|
|||||||
if this.group == nil {
|
if this.group == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
protocol := this.group.Protocol()
|
var protocol = this.group.Protocol()
|
||||||
|
|
||||||
tcpListener, err := this.createTCPListener()
|
tcpListener, err := this.createTCPListener()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -178,6 +178,17 @@ func (this *HTTPListener) ServeHTTP(rawWriter http.ResponseWriter, rawReq *http.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 绑定连接
|
||||||
|
if server != nil && server.Id > 0 {
|
||||||
|
var requestConn = rawReq.Context().Value(HTTPConnContextKey)
|
||||||
|
if requestConn != nil {
|
||||||
|
clientConn, ok := requestConn.(ClientConnInterface)
|
||||||
|
if ok {
|
||||||
|
clientConn.SetServerId(server.Id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 包装新请求对象
|
// 包装新请求对象
|
||||||
var req = &HTTPRequest{
|
var req = &HTTPRequest{
|
||||||
RawReq: rawReq,
|
RawReq: rawReq,
|
||||||
|
|||||||
@@ -63,7 +63,6 @@ func (this *TCPListener) Reload(group *serverconfigs.ServerAddressGroup) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *TCPListener) handleConn(conn net.Conn) error {
|
func (this *TCPListener) handleConn(conn net.Conn) error {
|
||||||
|
|
||||||
var server = this.Group.FirstServer()
|
var server = this.Group.FirstServer()
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return errors.New("no server available")
|
return errors.New("no server available")
|
||||||
@@ -72,6 +71,23 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
|
|||||||
return errors.New("no ReverseProxy configured for the server")
|
return errors.New("no ReverseProxy configured for the server")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 绑定连接和服务
|
||||||
|
clientConn, ok := conn.(ClientConnInterface)
|
||||||
|
if ok {
|
||||||
|
clientConn.SetServerId(server.Id)
|
||||||
|
} else {
|
||||||
|
tlsConn, ok := conn.(*tls.Conn)
|
||||||
|
if ok {
|
||||||
|
var internalConn = tlsConn.NetConn()
|
||||||
|
if internalConn != nil {
|
||||||
|
clientConn, ok = internalConn.(ClientConnInterface)
|
||||||
|
if ok {
|
||||||
|
clientConn.SetServerId(server.Id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 是否已达到流量限制
|
// 是否已达到流量限制
|
||||||
if this.reachedTrafficLimit() {
|
if this.reachedTrafficLimit() {
|
||||||
// 关闭连接
|
// 关闭连接
|
||||||
|
|||||||
@@ -123,6 +123,10 @@ func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient {
|
|||||||
return pb.NewServerDailyStatServiceClient(this.pickConn())
|
return pb.NewServerDailyStatServiceClient(this.pickConn())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *RPCClient) ServerBandwidthStatRPC() pb.ServerBandwidthStatServiceClient {
|
||||||
|
return pb.NewServerBandwidthStatServiceClient(this.pickConn())
|
||||||
|
}
|
||||||
|
|
||||||
func (this *RPCClient) MetricStatRPC() pb.MetricStatServiceClient {
|
func (this *RPCClient) MetricStatRPC() pb.MetricStatServiceClient {
|
||||||
return pb.NewMetricStatServiceClient(this.pickConn())
|
return pb.NewMetricStatServiceClient(this.pickConn())
|
||||||
}
|
}
|
||||||
|
|||||||
147
internal/stats/bandwidth_stat_manager.go
Normal file
147
internal/stats/bandwidth_stat_manager.go
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package stats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||||
|
"github.com/iwind/TeaGo/logs"
|
||||||
|
"github.com/iwind/TeaGo/types"
|
||||||
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var SharedBandwidthStatManager = NewBandwidthStatManager()
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
events.On(events.EventLoaded, func() {
|
||||||
|
goman.New(func() {
|
||||||
|
SharedBandwidthStatManager.Start()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type BandwidthStat struct {
|
||||||
|
Day string
|
||||||
|
TimeAt string
|
||||||
|
ServerId int64
|
||||||
|
|
||||||
|
CurrentBytes int64
|
||||||
|
CurrentTimestamp int64
|
||||||
|
MaxBytes int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// BandwidthStatManager 服务带宽统计
|
||||||
|
type BandwidthStatManager struct {
|
||||||
|
m map[string]*BandwidthStat // key => *BandwidthStat
|
||||||
|
|
||||||
|
lastTime string // 上一次执行的时间
|
||||||
|
|
||||||
|
ticker *time.Ticker
|
||||||
|
locker sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBandwidthStatManager() *BandwidthStatManager {
|
||||||
|
return &BandwidthStatManager{
|
||||||
|
m: map[string]*BandwidthStat{},
|
||||||
|
ticker: time.NewTicker(1 * time.Minute), // 时间小于1分钟是为了更快速地上传结果
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *BandwidthStatManager) Start() {
|
||||||
|
for range this.ticker.C {
|
||||||
|
err := this.Loop()
|
||||||
|
if err != nil && !rpc.IsConnError(err) {
|
||||||
|
remotelogs.Error("BANDWIDTH_STAT_MANAGER", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *BandwidthStatManager) Loop() error {
|
||||||
|
var now = time.Now()
|
||||||
|
var day = timeutil.Format("Ymd", now)
|
||||||
|
var currentTime = timeutil.FormatTime("Hi", now.Unix()/300*300)
|
||||||
|
|
||||||
|
if this.lastTime == currentTime {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
this.lastTime = currentTime
|
||||||
|
|
||||||
|
var pbStats = []*pb.ServerBandwidthStat{}
|
||||||
|
|
||||||
|
this.locker.Lock()
|
||||||
|
for key, stat := range this.m {
|
||||||
|
if stat.Day < day || stat.TimeAt < currentTime {
|
||||||
|
pbStats = append(pbStats, &pb.ServerBandwidthStat{
|
||||||
|
Id: 0,
|
||||||
|
ServerId: stat.ServerId,
|
||||||
|
Day: stat.Day,
|
||||||
|
TimeAt: stat.TimeAt,
|
||||||
|
Bytes: stat.MaxBytes,
|
||||||
|
})
|
||||||
|
delete(this.m, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.locker.Unlock()
|
||||||
|
|
||||||
|
if len(pbStats) > 0 {
|
||||||
|
// 上传
|
||||||
|
rpcClient, err := rpc.SharedRPC()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = rpcClient.ServerBandwidthStatRPC().UploadServerBandwidthStats(rpcClient.Context(), &pb.UploadServerBandwidthStatsRequest{ServerBandwidthStats: pbStats})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add 添加带宽数据
|
||||||
|
func (this *BandwidthStatManager) Add(serverId int64, bytes int64) {
|
||||||
|
if serverId <= 0 || bytes == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var now = time.Now()
|
||||||
|
var timestamp = now.Unix()
|
||||||
|
var day = timeutil.Format("Ymd", now)
|
||||||
|
var timeAt = timeutil.FormatTime("Hi", now.Unix()/300*300)
|
||||||
|
var key = types.String(serverId) + "@" + day + "@" + timeAt
|
||||||
|
|
||||||
|
this.locker.Lock()
|
||||||
|
stat, ok := this.m[key]
|
||||||
|
if ok {
|
||||||
|
if stat.CurrentTimestamp == timestamp {
|
||||||
|
stat.CurrentBytes += bytes
|
||||||
|
} else {
|
||||||
|
stat.CurrentBytes = bytes
|
||||||
|
stat.CurrentTimestamp = timestamp
|
||||||
|
}
|
||||||
|
if stat.CurrentBytes > stat.MaxBytes {
|
||||||
|
stat.MaxBytes = stat.CurrentBytes
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.m[key] = &BandwidthStat{
|
||||||
|
Day: day,
|
||||||
|
TimeAt: timeAt,
|
||||||
|
ServerId: serverId,
|
||||||
|
CurrentBytes: bytes,
|
||||||
|
MaxBytes: bytes,
|
||||||
|
CurrentTimestamp: timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.locker.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *BandwidthStatManager) Inspect() {
|
||||||
|
this.locker.Lock()
|
||||||
|
logs.PrintAsJSON(this.m)
|
||||||
|
this.locker.Unlock()
|
||||||
|
}
|
||||||
33
internal/stats/bandwidth_stat_manager_test.go
Normal file
33
internal/stats/bandwidth_stat_manager_test.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package stats_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/stats"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBandwidthStatManager_Add(t *testing.T) {
|
||||||
|
var manager = stats.NewBandwidthStatManager()
|
||||||
|
manager.Add(1, 10)
|
||||||
|
manager.Add(1, 10)
|
||||||
|
manager.Add(1, 10)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
manager.Add(1, 15)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
manager.Add(1, 25)
|
||||||
|
manager.Add(1, 75)
|
||||||
|
manager.Inspect()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBandwidthStatManager_Loop(t *testing.T) {
|
||||||
|
var manager = stats.NewBandwidthStatManager()
|
||||||
|
manager.Add(1, 10)
|
||||||
|
manager.Add(1, 10)
|
||||||
|
manager.Add(1, 10)
|
||||||
|
err := manager.Loop()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -123,7 +123,7 @@ func (this *HTTPRequestStatManager) AddRemoteAddr(serverId int64, remoteAddr str
|
|||||||
if remoteAddr[0] == '[' { // 排除IPv6
|
if remoteAddr[0] == '[' { // 排除IPv6
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
index := strings.Index(remoteAddr, ":")
|
var index = strings.Index(remoteAddr, ":")
|
||||||
var ip string
|
var ip string
|
||||||
if index < 0 {
|
if index < 0 {
|
||||||
ip = remoteAddr
|
ip = remoteAddr
|
||||||
@@ -177,18 +177,18 @@ func (this *HTTPRequestStatManager) AddFirewallRuleGroupId(serverId int64, firew
|
|||||||
|
|
||||||
// Loop 单个循环
|
// Loop 单个循环
|
||||||
func (this *HTTPRequestStatManager) Loop() error {
|
func (this *HTTPRequestStatManager) Loop() error {
|
||||||
timeout := time.NewTimer(10 * time.Minute) // 执行的最大时间
|
var timeout = time.NewTimer(10 * time.Minute) // 执行的最大时间
|
||||||
Loop:
|
Loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ipString := <-this.ipChan:
|
case ipString := <-this.ipChan:
|
||||||
// serverId@ip@bytes@isAttack
|
// serverId@ip@bytes@isAttack
|
||||||
pieces := strings.Split(ipString, "@")
|
var pieces = strings.Split(ipString, "@")
|
||||||
if len(pieces) < 4 {
|
if len(pieces) < 4 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
serverId := pieces[0]
|
var serverId = pieces[0]
|
||||||
ip := pieces[1]
|
var ip = pieces[1]
|
||||||
|
|
||||||
if iplibrary.SharedLibrary != nil {
|
if iplibrary.SharedLibrary != nil {
|
||||||
result, err := iplibrary.SharedLibrary.Lookup(ip)
|
result, err := iplibrary.SharedLibrary.Lookup(ip)
|
||||||
@@ -216,12 +216,12 @@ Loop:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case userAgentString := <-this.userAgentChan:
|
case userAgentString := <-this.userAgentChan:
|
||||||
atIndex := strings.Index(userAgentString, "@")
|
var atIndex = strings.Index(userAgentString, "@")
|
||||||
if atIndex < 0 {
|
if atIndex < 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
serverId := userAgentString[:atIndex]
|
var serverId = userAgentString[:atIndex]
|
||||||
userAgent := userAgentString[atIndex+1:]
|
var userAgent = userAgentString[atIndex+1:]
|
||||||
|
|
||||||
var result = SharedUserAgentParser.Parse(userAgent)
|
var result = SharedUserAgentParser.Parse(userAgent)
|
||||||
var osInfo = result.OS
|
var osInfo = result.OS
|
||||||
@@ -264,12 +264,12 @@ func (this *HTTPRequestStatManager) Upload() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 月份相关
|
// 月份相关
|
||||||
pbCities := []*pb.UploadServerHTTPRequestStatRequest_RegionCity{}
|
var pbCities = []*pb.UploadServerHTTPRequestStatRequest_RegionCity{}
|
||||||
pbProviders := []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{}
|
var pbProviders = []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{}
|
||||||
pbSystems := []*pb.UploadServerHTTPRequestStatRequest_System{}
|
var pbSystems = []*pb.UploadServerHTTPRequestStatRequest_System{}
|
||||||
pbBrowsers := []*pb.UploadServerHTTPRequestStatRequest_Browser{}
|
var pbBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{}
|
||||||
for k, stat := range this.cityMap {
|
for k, stat := range this.cityMap {
|
||||||
pieces := strings.SplitN(k, "@", 4)
|
var pieces = strings.SplitN(k, "@", 4)
|
||||||
pbCities = append(pbCities, &pb.UploadServerHTTPRequestStatRequest_RegionCity{
|
pbCities = append(pbCities, &pb.UploadServerHTTPRequestStatRequest_RegionCity{
|
||||||
ServerId: types.Int64(pieces[0]),
|
ServerId: types.Int64(pieces[0]),
|
||||||
CountryName: pieces[1],
|
CountryName: pieces[1],
|
||||||
@@ -282,7 +282,7 @@ func (this *HTTPRequestStatManager) Upload() error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
for k, count := range this.providerMap {
|
for k, count := range this.providerMap {
|
||||||
pieces := strings.SplitN(k, "@", 2)
|
var pieces = strings.SplitN(k, "@", 2)
|
||||||
pbProviders = append(pbProviders, &pb.UploadServerHTTPRequestStatRequest_RegionProvider{
|
pbProviders = append(pbProviders, &pb.UploadServerHTTPRequestStatRequest_RegionProvider{
|
||||||
ServerId: types.Int64(pieces[0]),
|
ServerId: types.Int64(pieces[0]),
|
||||||
Name: pieces[1],
|
Name: pieces[1],
|
||||||
@@ -290,7 +290,7 @@ func (this *HTTPRequestStatManager) Upload() error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
for k, count := range this.systemMap {
|
for k, count := range this.systemMap {
|
||||||
pieces := strings.SplitN(k, "@", 3)
|
var pieces = strings.SplitN(k, "@", 3)
|
||||||
pbSystems = append(pbSystems, &pb.UploadServerHTTPRequestStatRequest_System{
|
pbSystems = append(pbSystems, &pb.UploadServerHTTPRequestStatRequest_System{
|
||||||
ServerId: types.Int64(pieces[0]),
|
ServerId: types.Int64(pieces[0]),
|
||||||
Name: pieces[1],
|
Name: pieces[1],
|
||||||
@@ -299,7 +299,7 @@ func (this *HTTPRequestStatManager) Upload() error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
for k, count := range this.browserMap {
|
for k, count := range this.browserMap {
|
||||||
pieces := strings.SplitN(k, "@", 3)
|
var pieces = strings.SplitN(k, "@", 3)
|
||||||
pbBrowsers = append(pbBrowsers, &pb.UploadServerHTTPRequestStatRequest_Browser{
|
pbBrowsers = append(pbBrowsers, &pb.UploadServerHTTPRequestStatRequest_Browser{
|
||||||
ServerId: types.Int64(pieces[0]),
|
ServerId: types.Int64(pieces[0]),
|
||||||
Name: pieces[1],
|
Name: pieces[1],
|
||||||
@@ -309,9 +309,9 @@ func (this *HTTPRequestStatManager) Upload() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 防火墙相关
|
// 防火墙相关
|
||||||
pbFirewallRuleGroups := []*pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{}
|
var pbFirewallRuleGroups = []*pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{}
|
||||||
for k, count := range this.dailyFirewallRuleGroupMap {
|
for k, count := range this.dailyFirewallRuleGroupMap {
|
||||||
pieces := strings.SplitN(k, "@", 3)
|
var pieces = strings.SplitN(k, "@", 3)
|
||||||
pbFirewallRuleGroups = append(pbFirewallRuleGroups, &pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{
|
pbFirewallRuleGroups = append(pbFirewallRuleGroups, &pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{
|
||||||
ServerId: types.Int64(pieces[0]),
|
ServerId: types.Int64(pieces[0]),
|
||||||
HttpFirewallRuleGroupId: types.Int64(pieces[1]),
|
HttpFirewallRuleGroupId: types.Int64(pieces[1]),
|
||||||
|
|||||||
@@ -95,6 +95,10 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
|
|||||||
|
|
||||||
// Add 添加流量
|
// Add 添加流量
|
||||||
func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, checkingTrafficLimit bool, planId int64) {
|
func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, checkingTrafficLimit bool, planId int64) {
|
||||||
|
if serverId == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if bytes == 0 && countRequests == 0 {
|
if bytes == 0 && countRequests == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -139,7 +143,7 @@ func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64,
|
|||||||
|
|
||||||
// Upload 上传流量
|
// Upload 上传流量
|
||||||
func (this *TrafficStatManager) Upload() error {
|
func (this *TrafficStatManager) Upload() error {
|
||||||
config := this.configFunc()
|
var config = this.configFunc()
|
||||||
if config == nil {
|
if config == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -150,8 +154,8 @@ func (this *TrafficStatManager) Upload() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.locker.Lock()
|
this.locker.Lock()
|
||||||
itemMap := this.itemMap
|
var itemMap = this.itemMap
|
||||||
domainMap := this.domainsMap
|
var domainMap = this.domainsMap
|
||||||
this.itemMap = map[string]*TrafficItem{}
|
this.itemMap = map[string]*TrafficItem{}
|
||||||
this.domainsMap = map[string]*TrafficItem{}
|
this.domainsMap = map[string]*TrafficItem{}
|
||||||
this.locker.Unlock()
|
this.locker.Unlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user