diff --git a/internal/iplibrary/list_utils.go b/internal/iplibrary/list_utils.go index 1f96692..af1b589 100644 --- a/internal/iplibrary/list_utils.go +++ b/internal/iplibrary/list_utils.go @@ -13,7 +13,7 @@ import ( func AllowIP(ip string, serverId int64) (canGoNext bool, inAllowList bool) { if !Tea.IsTesting() { // 如果在测试环境,我们不加入一些白名单,以便于可以在本地和局域网正常测试 // 放行lo - if ip == "127.0.0.1" { + if ip == "127.0.0.1" || ip == "::1" { return true, true } diff --git a/internal/metrics/task.go b/internal/metrics/task.go index 1bbd5ee..207c0e7 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -210,7 +210,7 @@ func (this *Task) Start() error { var tr = trackers.Begin("[METRIC]UPLOAD_STATS") err := this.Upload(1 * time.Second) tr.End() - if err != nil { + if err != nil && !rpc.IsConnError(err) { remotelogs.Error("METRIC", "upload stats failed: "+err.Error()) } } diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index 224f0e5..dd0d900 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -7,12 +7,14 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" + "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/waf" "github.com/iwind/TeaGo/types" "net" "os" + "strings" "sync" "sync/atomic" "time" @@ -26,6 +28,8 @@ type ClientConn struct { hasDeadline bool hasRead bool + isLO bool // 是否为环路 + hasResetSYNFlood bool BaseClientConn @@ -41,10 +45,28 @@ func NewClientConn(conn net.Conn, isTLS bool, quickClose bool) net.Conn { } } - return &ClientConn{BaseClientConn: BaseClientConn{rawConn: conn}, isTLS: isTLS} + // 是否为环路 + var remoteAddr = conn.RemoteAddr().String() + var isLO = strings.HasPrefix(remoteAddr, "127.0.0.1:") || strings.HasPrefix(remoteAddr, "[::1]:") + + return &ClientConn{ + BaseClientConn: BaseClientConn{rawConn: conn}, + isTLS: isTLS, + isLO: isLO, + } } func (this *ClientConn) Read(b []byte) (n int, err error) { + // 环路直接读取 + if this.isLO { + n, err = this.rawConn.Read(b) + if n > 0 { + atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n)) + } + return + } + + // TLS if this.isTLS { if !this.hasDeadline { _ = this.rawConn.SetReadDeadline(time.Now().Add(time.Duration(nodeconfigs.DefaultTLSHandshakeTimeout) * time.Second)) // TODO 握手超时时间可以设置 @@ -55,6 +77,7 @@ func (this *ClientConn) Read(b []byte) (n int, err error) { } } + // 开始读取 n, err = this.rawConn.Read(b) if n > 0 { atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n)) @@ -85,7 +108,15 @@ func (this *ClientConn) Write(b []byte) (n int, err error) { n, err = this.rawConn.Write(b) if n > 0 { atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n)) + + // 统计当前服务带宽 + if this.serverId > 0 { + if !this.isLO { // 环路不统计带宽,避免缓存预热等行为产生带宽 + stats.SharedBandwidthStatManager.Add(this.serverId, int64(n)) + } + } } + return } diff --git a/internal/nodes/client_conn_base.go b/internal/nodes/client_conn_base.go index 33bce03..45b43ff 100644 --- a/internal/nodes/client_conn_base.go +++ b/internal/nodes/client_conn_base.go @@ -36,6 +36,16 @@ func (this *BaseClientConn) Bind(serverId int64, remoteAddr string, maxConnsPerS return sharedClientConnLimiter.Add(this.rawConn.RemoteAddr().String(), serverId, remoteAddr, maxConnsPerServer, maxConnsPerIP) } +// SetServerId 设置服务ID +func (this *BaseClientConn) SetServerId(serverId int64) { + this.serverId = serverId +} + +// ServerId 读取当前连接绑定的服务ID +func (this *BaseClientConn) ServerId() int64 { + return this.serverId +} + // RawIP 原本IP func (this *BaseClientConn) RawIP() string { ip, _, _ := net.SplitHostPort(this.rawConn.RemoteAddr().String()) diff --git a/internal/nodes/client_conn_interface.go b/internal/nodes/client_conn_interface.go index 34f0b20..2549bc2 100644 --- a/internal/nodes/client_conn_interface.go +++ b/internal/nodes/client_conn_interface.go @@ -11,4 +11,10 @@ type ClientConnInterface interface { // Bind 绑定服务 Bind(serverId int64, remoteAddr string, maxConnsPerServer int, maxConnsPerIP int) bool + + // ServerId 获取服务ID + ServerId() int64 + + // SetServerId 设置服务ID + SetServerId(serverId int64) } diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 02b6bc2..9d43dc4 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -356,7 +356,7 @@ func (this *HTTPRequest) doEnd() { // 流量统计 // TODO 增加是否开启开关 - if this.ReqServer != nil { + if this.ReqServer != nil && this.ReqServer.Id > 0 { var countCached int64 = 0 var cachedBytes int64 = 0 @@ -373,17 +373,17 @@ func (this *HTTPRequest) doEnd() { } stats.SharedTrafficStatManager.Add(this.ReqServer.Id, this.ReqHost, this.writer.SentBodyBytes()+this.writer.SentHeaderBytes(), cachedBytes, 1, countCached, countAttacks, attackBytes, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId()) - } - // 指标 - if metrics.SharedManager.HasHTTPMetrics() { - this.doMetricsResponse() - } + // 指标 + if metrics.SharedManager.HasHTTPMetrics() { + this.doMetricsResponse() + } - // 统计 - if this.web.StatRef != nil && this.web.StatRef.IsOn { - // 放到最后执行 - this.doStat() + // 统计 + if this.web.StatRef != nil && this.web.StatRef.IsOn { + // 放到最后执行 + this.doStat() + } } } diff --git a/internal/nodes/http_request_limit.go b/internal/nodes/http_request_limit.go index 0cdc1d7..a4776ed 100644 --- a/internal/nodes/http_request_limit.go +++ b/internal/nodes/http_request_limit.go @@ -24,7 +24,7 @@ func (this *HTTPRequest) doRequestLimit() (shouldStop bool) { // 设置连接相关参数 if this.web.RequestLimit.MaxConns > 0 || this.web.RequestLimit.MaxConnsPerIP > 0 { - requestConn := this.RawReq.Context().Value(HTTPConnContextKey) + var requestConn = this.RawReq.Context().Value(HTTPConnContextKey) if requestConn != nil { clientConn, ok := requestConn.(ClientConnInterface) if ok && !clientConn.IsBound() { diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index 7d27616..22bc49c 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -43,7 +43,7 @@ func (this *Listener) Listen() error { if this.group == nil { return nil } - protocol := this.group.Protocol() + var protocol = this.group.Protocol() if protocol.IsUDPFamily() { return this.listenUDP() } @@ -54,7 +54,7 @@ func (this *Listener) listenTCP() error { if this.group == nil { return nil } - protocol := this.group.Protocol() + var protocol = this.group.Protocol() tcpListener, err := this.createTCPListener() if err != nil { diff --git a/internal/nodes/listener_http.go b/internal/nodes/listener_http.go index 868dee5..fba97aa 100644 --- a/internal/nodes/listener_http.go +++ b/internal/nodes/listener_http.go @@ -178,6 +178,17 @@ func (this *HTTPListener) ServeHTTP(rawWriter http.ResponseWriter, rawReq *http. } } + // 绑定连接 + if server != nil && server.Id > 0 { + var requestConn = rawReq.Context().Value(HTTPConnContextKey) + if requestConn != nil { + clientConn, ok := requestConn.(ClientConnInterface) + if ok { + clientConn.SetServerId(server.Id) + } + } + } + // 包装新请求对象 var req = &HTTPRequest{ RawReq: rawReq, diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 2210bca..4a1841d 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -63,7 +63,6 @@ func (this *TCPListener) Reload(group *serverconfigs.ServerAddressGroup) { } func (this *TCPListener) handleConn(conn net.Conn) error { - var server = this.Group.FirstServer() if server == nil { return errors.New("no server available") @@ -72,6 +71,23 @@ func (this *TCPListener) handleConn(conn net.Conn) error { return errors.New("no ReverseProxy configured for the server") } + // 绑定连接和服务 + clientConn, ok := conn.(ClientConnInterface) + if ok { + clientConn.SetServerId(server.Id) + } else { + tlsConn, ok := conn.(*tls.Conn) + if ok { + var internalConn = tlsConn.NetConn() + if internalConn != nil { + clientConn, ok = internalConn.(ClientConnInterface) + if ok { + clientConn.SetServerId(server.Id) + } + } + } + } + // 是否已达到流量限制 if this.reachedTrafficLimit() { // 关闭连接 diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 32222d2..d1c2df2 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -123,6 +123,10 @@ func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient { return pb.NewServerDailyStatServiceClient(this.pickConn()) } +func (this *RPCClient) ServerBandwidthStatRPC() pb.ServerBandwidthStatServiceClient { + return pb.NewServerBandwidthStatServiceClient(this.pickConn()) +} + func (this *RPCClient) MetricStatRPC() pb.MetricStatServiceClient { return pb.NewMetricStatServiceClient(this.pickConn()) } diff --git a/internal/stats/bandwidth_stat_manager.go b/internal/stats/bandwidth_stat_manager.go new file mode 100644 index 0000000..e7681bf --- /dev/null +++ b/internal/stats/bandwidth_stat_manager.go @@ -0,0 +1,147 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package stats + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" + "sync" + "time" +) + +var SharedBandwidthStatManager = NewBandwidthStatManager() + +func init() { + events.On(events.EventLoaded, func() { + goman.New(func() { + SharedBandwidthStatManager.Start() + }) + }) +} + +type BandwidthStat struct { + Day string + TimeAt string + ServerId int64 + + CurrentBytes int64 + CurrentTimestamp int64 + MaxBytes int64 +} + +// BandwidthStatManager 服务带宽统计 +type BandwidthStatManager struct { + m map[string]*BandwidthStat // key => *BandwidthStat + + lastTime string // 上一次执行的时间 + + ticker *time.Ticker + locker sync.Mutex +} + +func NewBandwidthStatManager() *BandwidthStatManager { + return &BandwidthStatManager{ + m: map[string]*BandwidthStat{}, + ticker: time.NewTicker(1 * time.Minute), // 时间小于1分钟是为了更快速地上传结果 + } +} + +func (this *BandwidthStatManager) Start() { + for range this.ticker.C { + err := this.Loop() + if err != nil && !rpc.IsConnError(err) { + remotelogs.Error("BANDWIDTH_STAT_MANAGER", err.Error()) + } + } +} + +func (this *BandwidthStatManager) Loop() error { + var now = time.Now() + var day = timeutil.Format("Ymd", now) + var currentTime = timeutil.FormatTime("Hi", now.Unix()/300*300) + + if this.lastTime == currentTime { + return nil + } + this.lastTime = currentTime + + var pbStats = []*pb.ServerBandwidthStat{} + + this.locker.Lock() + for key, stat := range this.m { + if stat.Day < day || stat.TimeAt < currentTime { + pbStats = append(pbStats, &pb.ServerBandwidthStat{ + Id: 0, + ServerId: stat.ServerId, + Day: stat.Day, + TimeAt: stat.TimeAt, + Bytes: stat.MaxBytes, + }) + delete(this.m, key) + } + } + this.locker.Unlock() + + if len(pbStats) > 0 { + // 上传 + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + _, err = rpcClient.ServerBandwidthStatRPC().UploadServerBandwidthStats(rpcClient.Context(), &pb.UploadServerBandwidthStatsRequest{ServerBandwidthStats: pbStats}) + if err != nil { + return err + } + } + + return nil +} + +// Add 添加带宽数据 +func (this *BandwidthStatManager) Add(serverId int64, bytes int64) { + if serverId <= 0 || bytes == 0 { + return + } + + var now = time.Now() + var timestamp = now.Unix() + var day = timeutil.Format("Ymd", now) + var timeAt = timeutil.FormatTime("Hi", now.Unix()/300*300) + var key = types.String(serverId) + "@" + day + "@" + timeAt + + this.locker.Lock() + stat, ok := this.m[key] + if ok { + if stat.CurrentTimestamp == timestamp { + stat.CurrentBytes += bytes + } else { + stat.CurrentBytes = bytes + stat.CurrentTimestamp = timestamp + } + if stat.CurrentBytes > stat.MaxBytes { + stat.MaxBytes = stat.CurrentBytes + } + } else { + this.m[key] = &BandwidthStat{ + Day: day, + TimeAt: timeAt, + ServerId: serverId, + CurrentBytes: bytes, + MaxBytes: bytes, + CurrentTimestamp: timestamp, + } + } + this.locker.Unlock() +} + +func (this *BandwidthStatManager) Inspect() { + this.locker.Lock() + logs.PrintAsJSON(this.m) + this.locker.Unlock() +} diff --git a/internal/stats/bandwidth_stat_manager_test.go b/internal/stats/bandwidth_stat_manager_test.go new file mode 100644 index 0000000..b6dd442 --- /dev/null +++ b/internal/stats/bandwidth_stat_manager_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package stats_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/stats" + "testing" + "time" +) + +func TestBandwidthStatManager_Add(t *testing.T) { + var manager = stats.NewBandwidthStatManager() + manager.Add(1, 10) + manager.Add(1, 10) + manager.Add(1, 10) + time.Sleep(1 * time.Second) + manager.Add(1, 15) + time.Sleep(1 * time.Second) + manager.Add(1, 25) + manager.Add(1, 75) + manager.Inspect() +} + +func TestBandwidthStatManager_Loop(t *testing.T) { + var manager = stats.NewBandwidthStatManager() + manager.Add(1, 10) + manager.Add(1, 10) + manager.Add(1, 10) + err := manager.Loop() + if err != nil { + t.Fatal(err) + } +} \ No newline at end of file diff --git a/internal/stats/http_request_stat_manager.go b/internal/stats/http_request_stat_manager.go index 3549ab5..ff544e9 100644 --- a/internal/stats/http_request_stat_manager.go +++ b/internal/stats/http_request_stat_manager.go @@ -123,7 +123,7 @@ func (this *HTTPRequestStatManager) AddRemoteAddr(serverId int64, remoteAddr str if remoteAddr[0] == '[' { // 排除IPv6 return } - index := strings.Index(remoteAddr, ":") + var index = strings.Index(remoteAddr, ":") var ip string if index < 0 { ip = remoteAddr @@ -177,18 +177,18 @@ func (this *HTTPRequestStatManager) AddFirewallRuleGroupId(serverId int64, firew // Loop 单个循环 func (this *HTTPRequestStatManager) Loop() error { - timeout := time.NewTimer(10 * time.Minute) // 执行的最大时间 + var timeout = time.NewTimer(10 * time.Minute) // 执行的最大时间 Loop: for { select { case ipString := <-this.ipChan: // serverId@ip@bytes@isAttack - pieces := strings.Split(ipString, "@") + var pieces = strings.Split(ipString, "@") if len(pieces) < 4 { continue } - serverId := pieces[0] - ip := pieces[1] + var serverId = pieces[0] + var ip = pieces[1] if iplibrary.SharedLibrary != nil { result, err := iplibrary.SharedLibrary.Lookup(ip) @@ -216,12 +216,12 @@ Loop: } } case userAgentString := <-this.userAgentChan: - atIndex := strings.Index(userAgentString, "@") + var atIndex = strings.Index(userAgentString, "@") if atIndex < 0 { continue } - serverId := userAgentString[:atIndex] - userAgent := userAgentString[atIndex+1:] + var serverId = userAgentString[:atIndex] + var userAgent = userAgentString[atIndex+1:] var result = SharedUserAgentParser.Parse(userAgent) var osInfo = result.OS @@ -264,12 +264,12 @@ func (this *HTTPRequestStatManager) Upload() error { } // 月份相关 - pbCities := []*pb.UploadServerHTTPRequestStatRequest_RegionCity{} - pbProviders := []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{} - pbSystems := []*pb.UploadServerHTTPRequestStatRequest_System{} - pbBrowsers := []*pb.UploadServerHTTPRequestStatRequest_Browser{} + var pbCities = []*pb.UploadServerHTTPRequestStatRequest_RegionCity{} + var pbProviders = []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{} + var pbSystems = []*pb.UploadServerHTTPRequestStatRequest_System{} + var pbBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{} for k, stat := range this.cityMap { - pieces := strings.SplitN(k, "@", 4) + var pieces = strings.SplitN(k, "@", 4) pbCities = append(pbCities, &pb.UploadServerHTTPRequestStatRequest_RegionCity{ ServerId: types.Int64(pieces[0]), CountryName: pieces[1], @@ -282,7 +282,7 @@ func (this *HTTPRequestStatManager) Upload() error { }) } for k, count := range this.providerMap { - pieces := strings.SplitN(k, "@", 2) + var pieces = strings.SplitN(k, "@", 2) pbProviders = append(pbProviders, &pb.UploadServerHTTPRequestStatRequest_RegionProvider{ ServerId: types.Int64(pieces[0]), Name: pieces[1], @@ -290,7 +290,7 @@ func (this *HTTPRequestStatManager) Upload() error { }) } for k, count := range this.systemMap { - pieces := strings.SplitN(k, "@", 3) + var pieces = strings.SplitN(k, "@", 3) pbSystems = append(pbSystems, &pb.UploadServerHTTPRequestStatRequest_System{ ServerId: types.Int64(pieces[0]), Name: pieces[1], @@ -299,7 +299,7 @@ func (this *HTTPRequestStatManager) Upload() error { }) } for k, count := range this.browserMap { - pieces := strings.SplitN(k, "@", 3) + var pieces = strings.SplitN(k, "@", 3) pbBrowsers = append(pbBrowsers, &pb.UploadServerHTTPRequestStatRequest_Browser{ ServerId: types.Int64(pieces[0]), Name: pieces[1], @@ -309,9 +309,9 @@ func (this *HTTPRequestStatManager) Upload() error { } // 防火墙相关 - pbFirewallRuleGroups := []*pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{} + var pbFirewallRuleGroups = []*pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{} for k, count := range this.dailyFirewallRuleGroupMap { - pieces := strings.SplitN(k, "@", 3) + var pieces = strings.SplitN(k, "@", 3) pbFirewallRuleGroups = append(pbFirewallRuleGroups, &pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{ ServerId: types.Int64(pieces[0]), HttpFirewallRuleGroupId: types.Int64(pieces[1]), diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index 7ff175a..4e6c1d1 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -95,6 +95,10 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) // Add 添加流量 func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, checkingTrafficLimit bool, planId int64) { + if serverId == 0 { + return + } + if bytes == 0 && countRequests == 0 { return } @@ -139,7 +143,7 @@ func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, // Upload 上传流量 func (this *TrafficStatManager) Upload() error { - config := this.configFunc() + var config = this.configFunc() if config == nil { return nil } @@ -150,8 +154,8 @@ func (this *TrafficStatManager) Upload() error { } this.locker.Lock() - itemMap := this.itemMap - domainMap := this.domainsMap + var itemMap = this.itemMap + var domainMap = this.domainsMap this.itemMap = map[string]*TrafficItem{} this.domainsMap = map[string]*TrafficItem{} this.locker.Unlock()