diff --git a/internal/stats/http_request_stat_manager.go b/internal/stats/http_request_stat_manager.go index cd95d9f..7b3fc9e 100644 --- a/internal/stats/http_request_stat_manager.go +++ b/internal/stats/http_request_stat_manager.go @@ -20,6 +20,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" ) @@ -46,7 +47,13 @@ type HTTPRequestStatManager struct { dailyFirewallRuleGroupMap map[string]int64 // serverId@firewallRuleGroupId@action => count + serverCityCountMap map[string]int16 // serverIdString => count cities + serverSystemCountMap map[string]int16 // serverIdString => count systems + serverBrowserCountMap map[string]int16 // serverIdString => count browsers + totalAttackRequests int64 + + locker sync.Mutex } // NewHTTPRequestStatManager 获取新对象 @@ -60,6 +67,10 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager { systemMap: map[string]int64{}, browserMap: map[string]int64{}, dailyFirewallRuleGroupMap: map[string]int64{}, + + serverCityCountMap: map[string]int16{}, + serverSystemCountMap: map[string]int16{}, + serverBrowserCountMap: map[string]int16{}, } } @@ -79,7 +90,6 @@ func (this *HTTPRequestStatManager) Start() { } }) - var loopTicker = time.NewTicker(1 * time.Second) var uploadTicker = time.NewTicker(30 * time.Minute) if Tea.IsTesting() { uploadTicker = time.NewTicker(10 * time.Second) // 在测试环境下缩短Ticker时间,以方便我们调试 @@ -87,20 +97,12 @@ func (this *HTTPRequestStatManager) Start() { remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "start ...") events.OnKey(events.EventQuit, this, func() { remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "quit") - loopTicker.Stop() uploadTicker.Stop() }) - for range loopTicker.C { - err := this.Loop() - if err != nil { - if rpc.IsConnError(err) { - remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", err.Error()) - } else { - remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error()) - } - } - select { - case <-uploadTicker.C: + + // 上传Ticker + goman.New(func() { + for range uploadTicker.C { var tr = trackers.Begin("UPLOAD_REQUEST_STATS") err := this.Upload() tr.End() @@ -111,9 +113,20 @@ func (this *HTTPRequestStatManager) Start() { remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error()) } } - default: } + }) + + // 分析Ticker + for { + err := this.Loop() + if err != nil { + if rpc.IsConnError(err) { + remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", err.Error()) + } else { + remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error()) + } + } } } @@ -149,7 +162,7 @@ func (this *HTTPRequestStatManager) AddRemoteAddr(serverId int64, remoteAddr str // AddUserAgent 添加UserAgent func (this *HTTPRequestStatManager) AddUserAgent(serverId int64, userAgent string, ip string) { - if len(userAgent) == 0 { + if len(userAgent) == 0 || strings.ContainsRune(userAgent, '@') /** 非常重要,防止后面组合字符串时出现异常 **/ { return } @@ -184,79 +197,101 @@ func (this *HTTPRequestStatManager) AddFirewallRuleGroupId(serverId int64, firew // Loop 单个循环 func (this *HTTPRequestStatManager) Loop() error { - var timeout = time.NewTimer(10 * time.Minute) // 执行的最大时间 -Loop: - for { - select { - case ipString := <-this.ipChan: - // serverId@ip@bytes@isAttack - var pieces = strings.Split(ipString, "@") - if len(pieces) < 4 { - continue - } - var serverId = pieces[0] - var ip = pieces[1] - - var result = iplib.LookupIP(ip) - if result != nil && result.IsOk() { - var key = serverId + "@" + result.CountryName() + "@" + result.ProvinceName() + "@" + result.CityName() - stat, ok := this.cityMap[key] - if !ok { - stat = &StatItem{} - this.cityMap[key] = stat - } - stat.Bytes += types.Int64(pieces[2]) - stat.CountRequests++ - if types.Int8(pieces[3]) == 1 { - stat.AttackBytes += types.Int64(pieces[2]) - stat.CountAttackRequests++ - } - - if len(result.ProviderName()) > 0 { - this.providerMap[serverId+"@"+result.ProviderName()]++ - } - } - - case userAgentString := <-this.userAgentChan: - var atIndex = strings.Index(userAgentString, "@") - if atIndex < 0 { - continue - } - var serverId = userAgentString[:atIndex] - var userAgent = userAgentString[atIndex+1:] - - var result = SharedUserAgentParser.Parse(userAgent) - var osInfo = result.OS - if len(osInfo.Name) > 0 { - dotIndex := strings.Index(osInfo.Version, ".") - if dotIndex > -1 { - osInfo.Version = osInfo.Version[:dotIndex] - } - if len(this.systemMap) < 100_000 { // 限制最大数据,防止攻击 - this.systemMap[serverId+"@"+osInfo.Name+"@"+osInfo.Version]++ - } - } - - var browser, browserVersion = result.BrowserName, result.BrowserVersion - if len(browser) > 0 { - dotIndex := strings.Index(browserVersion, ".") - if dotIndex > -1 { - browserVersion = browserVersion[:dotIndex] - } - if len(this.browserMap) < 100_000 { // 限制最大数据,防止攻击 - this.browserMap[serverId+"@"+browser+"@"+browserVersion]++ - } - } - case firewallRuleGroupString := <-this.firewallRuleGroupChan: - this.dailyFirewallRuleGroupMap[firewallRuleGroupString]++ - case <-timeout.C: - break Loop - default: - break Loop + select { + case ipString := <-this.ipChan: + // serverId@ip@bytes@isAttack + var pieces = strings.Split(ipString, "@") + if len(pieces) < 4 { + return nil } - } + var serverId = pieces[0] + var ip = pieces[1] - timeout.Stop() + var result = iplib.LookupIP(ip) + if result != nil && result.IsOk() { + var key = serverId + "@" + types.String(result.CountryId()) + "@" + types.String(result.ProvinceId()) + "@" + types.String(result.CityId()) + this.locker.Lock() + stat, ok := this.cityMap[key] + if !ok { + // 检查数量 + if this.serverCityCountMap[key] > 128 { // 限制单个服务的城市数量,防止数量过多 + this.locker.Unlock() + return nil + } + this.serverCityCountMap[key]++ // 需要放在限制之后,因为使用的是int16 + + stat = &StatItem{} + this.cityMap[key] = stat + } + stat.Bytes += types.Int64(pieces[2]) + stat.CountRequests++ + if types.Int8(pieces[3]) == 1 { + stat.AttackBytes += types.Int64(pieces[2]) + stat.CountAttackRequests++ + } + + if result.ProviderId() > 0 { + this.providerMap[serverId+"@"+types.String(result.ProviderId())]++ + } + this.locker.Unlock() + } + case userAgentString := <-this.userAgentChan: + var atIndex = strings.Index(userAgentString, "@") + if atIndex < 0 { + return nil + } + var serverId = userAgentString[:atIndex] + var userAgent = userAgentString[atIndex+1:] + + var result = SharedUserAgentParser.Parse(userAgent) + var osInfo = result.OS + if len(osInfo.Name) > 0 { + dotIndex := strings.Index(osInfo.Version, ".") + if dotIndex > -1 { + osInfo.Version = osInfo.Version[:dotIndex] + } + this.locker.Lock() + + var systemKey = serverId + "@" + osInfo.Name + "@" + osInfo.Version + _, ok := this.systemMap[systemKey] + if !ok { + if this.serverSystemCountMap[serverId] < 128 { // 限制最大数据,防止攻击 + this.serverSystemCountMap[serverId]++ + ok = true + } + } + if ok { + this.systemMap[systemKey]++ + } + this.locker.Unlock() + } + + var browser, browserVersion = result.BrowserName, result.BrowserVersion + if len(browser) > 0 { + dotIndex := strings.Index(browserVersion, ".") + if dotIndex > -1 { + browserVersion = browserVersion[:dotIndex] + } + this.locker.Lock() + + var browserKey = serverId + "@" + browser + "@" + browserVersion + _, ok := this.browserMap[browserKey] + if !ok { + if this.serverBrowserCountMap[serverId] < 256 { // 限制最大数据,防止攻击 + this.serverBrowserCountMap[serverId]++ + ok = true + } + } + if ok { + this.browserMap[browserKey]++ + } + this.locker.Unlock() + } + case firewallRuleGroupString := <-this.firewallRuleGroupChan: + this.locker.Lock() + this.dailyFirewallRuleGroupMap[firewallRuleGroupString]++ + this.locker.Unlock() + } return nil } @@ -269,11 +304,31 @@ func (this *HTTPRequestStatManager) Upload() error { return err } + // 拷贝数据 + this.locker.Lock() + var cityMap = this.cityMap + var providerMap = this.providerMap + var systemMap = this.systemMap + var browserMap = this.browserMap + var dailyFirewallRuleGroupMap = this.dailyFirewallRuleGroupMap + + this.cityMap = map[string]*StatItem{} + this.providerMap = map[string]int64{} + this.systemMap = map[string]int64{} + this.browserMap = map[string]int64{} + this.dailyFirewallRuleGroupMap = map[string]int64{} + + this.serverCityCountMap = map[string]int16{} + this.serverSystemCountMap = map[string]int16{} + this.serverBrowserCountMap = map[string]int16{} + + this.locker.Unlock() + // 上传限制 - var maxCities int16 = 20 - var maxProviders int16 = 20 - var maxSystems int16 = 20 - var maxBrowsers int16 = 20 + var maxCities int16 = 32 + var maxProviders int16 = 32 + var maxSystems int16 = 64 + var maxBrowsers int16 = 64 nodeConfig, _ := nodeconfigs.SharedNodeConfig() if nodeConfig != nil { var serverConfig = nodeConfig.GlobalServerConfig // 复制是为了防止在中途修改 @@ -300,21 +355,21 @@ func (this *HTTPRequestStatManager) Upload() error { var pbBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{} // 城市 - for k, stat := range this.cityMap { + for k, stat := range cityMap { var pieces = strings.SplitN(k, "@", 4) var serverId = types.Int64(pieces[0]) pbCities = append(pbCities, &pb.UploadServerHTTPRequestStatRequest_RegionCity{ ServerId: serverId, - CountryName: pieces[1], - ProvinceName: pieces[2], - CityName: pieces[3], + CountryId: types.Int64(pieces[1]), + ProvinceId: types.Int64(pieces[2]), + CityId: types.Int64(pieces[3]), CountRequests: stat.CountRequests, CountAttackRequests: stat.CountAttackRequests, Bytes: stat.Bytes, AttackBytes: stat.AttackBytes, }) } - if len(this.cityMap) > int(maxCities) { + if len(cityMap) > int(maxCities) { var newPBCities = []*pb.UploadServerHTTPRequestStatRequest_RegionCity{} sort.Slice(pbCities, func(i, j int) bool { return pbCities[i].CountRequests > pbCities[j].CountRequests @@ -333,16 +388,16 @@ func (this *HTTPRequestStatManager) Upload() error { } // 运营商 - for k, count := range this.providerMap { + for k, count := range providerMap { var pieces = strings.SplitN(k, "@", 2) var serverId = types.Int64(pieces[0]) pbProviders = append(pbProviders, &pb.UploadServerHTTPRequestStatRequest_RegionProvider{ - ServerId: serverId, - Name: pieces[1], - Count: count, + ServerId: serverId, + ProviderId: types.Int64(pieces[1]), + Count: count, }) } - if len(this.providerMap) > int(maxProviders) { + if len(providerMap) > int(maxProviders) { var newPBProviders = []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{} sort.Slice(pbProviders, func(i, j int) bool { return pbProviders[i].Count > pbProviders[j].Count @@ -361,7 +416,7 @@ func (this *HTTPRequestStatManager) Upload() error { } // 操作系统 - for k, count := range this.systemMap { + for k, count := range systemMap { var pieces = strings.SplitN(k, "@", 3) var serverId = types.Int64(pieces[0]) pbSystems = append(pbSystems, &pb.UploadServerHTTPRequestStatRequest_System{ @@ -371,7 +426,7 @@ func (this *HTTPRequestStatManager) Upload() error { Count: count, }) } - if len(this.systemMap) > int(maxSystems) { + if len(systemMap) > int(maxSystems) { var newPBSystems = []*pb.UploadServerHTTPRequestStatRequest_System{} sort.Slice(pbSystems, func(i, j int) bool { return pbSystems[i].Count > pbSystems[j].Count @@ -390,7 +445,7 @@ func (this *HTTPRequestStatManager) Upload() error { } // 浏览器 - for k, count := range this.browserMap { + for k, count := range browserMap { var pieces = strings.SplitN(k, "@", 3) var serverId = types.Int64(pieces[0]) pbBrowsers = append(pbBrowsers, &pb.UploadServerHTTPRequestStatRequest_Browser{ @@ -400,7 +455,7 @@ func (this *HTTPRequestStatManager) Upload() error { Count: count, }) } - if len(this.browserMap) > int(maxBrowsers) { + if len(browserMap) > int(maxBrowsers) { var newPBBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{} sort.Slice(pbBrowsers, func(i, j int) bool { return pbBrowsers[i].Count > pbBrowsers[j].Count @@ -420,7 +475,7 @@ func (this *HTTPRequestStatManager) Upload() error { // 防火墙相关 var pbFirewallRuleGroups = []*pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{} - for k, count := range this.dailyFirewallRuleGroupMap { + for k, count := range dailyFirewallRuleGroupMap { var pieces = strings.SplitN(k, "@", 3) pbFirewallRuleGroups = append(pbFirewallRuleGroups, &pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{ ServerId: types.Int64(pieces[0]), @@ -430,14 +485,6 @@ func (this *HTTPRequestStatManager) Upload() error { }) } - // 重置数据 - // 这里需要放到上传数据之前,防止因上传失败而导致统计数据堆积 - this.cityMap = map[string]*StatItem{} - this.providerMap = map[string]int64{} - this.systemMap = map[string]int64{} - this.browserMap = map[string]int64{} - this.dailyFirewallRuleGroupMap = map[string]int64{} - // 检查是否有数据 if len(pbCities) == 0 && len(pbProviders) == 0 &&