Files
EdgeNode/internal/stats/http_request_stat_manager.go
2021-01-25 16:40:31 +08:00

232 lines
6.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package stats
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"github.com/mssola/user_agent"
"strconv"
"strings"
"time"
)
// SharedHTTPRequestStatManager is the package-level HTTPRequestStatManager
// instance, created by NewHTTPRequestStatManager when the package is initialized.
var SharedHTTPRequestStatManager = NewHTTPRequestStatManager()
// HTTPRequestStatManager collects statistics about HTTP requests.
// These statistics are auxiliary: take care that collecting them never hurts
// the performance of the service itself.
type HTTPRequestStatManager struct {
ipChan chan string // queue of "serverId@ip" items filled by AddRemoteAddr
userAgentChan chan string // queue of "serverId@userAgent" items filled by AddUserAgent
cityMap map[string]int64 // serverId@country@province@city => count; not locked, because (per the original author) items are processed one by one from the channels
providerMap map[string]int64 // serverId@provider => count
systemMap map[string]int64 // serverId@system@version => count
browserMap map[string]int64 // serverId@browser@version => count
}
// NewHTTPRequestStatManager returns a manager with empty counter maps and
// buffered queues for incoming client addresses and user agents.
func NewHTTPRequestStatManager() *HTTPRequestStatManager {
	// TODO: make the queue capacity configurable in the future.
	const queueCapacity = 10_000

	manager := new(HTTPRequestStatManager)
	manager.ipChan = make(chan string, queueCapacity)
	manager.userAgentChan = make(chan string, queueCapacity)
	manager.cityMap = make(map[string]int64)
	manager.providerMap = make(map[string]int64)
	manager.systemMap = make(map[string]int64)
	manager.browserMap = make(map[string]int64)
	return manager
}
// Start runs the statistics loop: once per second it drains the queues via
// Loop, and whenever the upload ticker has fired it sends the aggregated
// counters via Upload. Errors from either step are reported through
// remotelogs and do not stop the loop. The function has no exit condition of
// its own, so it blocks the calling goroutine.
func (this *HTTPRequestStatManager) Start() {
	loopTicker := time.NewTicker(1 * time.Second)
	defer loopTicker.Stop()

	// Decide the interval before creating the ticker, so that only one upload
	// ticker is ever allocated (previously the 30-minute ticker was replaced
	// in testing mode without being stopped).
	uploadInterval := 30 * time.Minute
	if Tea.IsTesting() {
		uploadInterval = 30 * time.Second // shorter interval to ease debugging
	}
	uploadTicker := time.NewTicker(uploadInterval)
	defer uploadTicker.Stop()

	for range loopTicker.C {
		if err := this.Loop(); err != nil {
			remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error())
		}

		// Non-blocking check: upload only if the upload ticker has fired.
		select {
		case <-uploadTicker.C:
			if err := this.Upload(); err != nil {
				remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error())
			}
		default:
		}
	}
}
// AddRemoteAddr queues a client address for the given server so that Loop can
// later resolve it to a region and provider.
//
// remoteAddr may be "ip" or "ip:port"; only the ip part is queued, as
// "serverId@ip". IPv6 addresses are skipped, both in the bracketed
// "[addr]:port" form and as a bare address containing several colons.
// Empty input is ignored. The send never blocks: when the queue is full the
// item is dropped.
func (this *HTTPRequestStatManager) AddRemoteAddr(serverId int64, remoteAddr string) {
	if len(remoteAddr) == 0 {
		return
	}

	// Exclude IPv6. A bare IPv6 address (no brackets) has more than one colon;
	// cutting it at the first colon would queue a meaningless fragment.
	if remoteAddr[0] == '[' || strings.Count(remoteAddr, ":") > 1 {
		return
	}

	ip := remoteAddr
	if index := strings.Index(remoteAddr, ":"); index >= 0 {
		ip = remoteAddr[:index] // strip the port
	}
	if len(ip) == 0 {
		return
	}

	select {
	case this.ipChan <- strconv.FormatInt(serverId, 10) + "@" + ip:
	default:
		// The queue is full: drop the item instead of blocking the caller.
	}
}
// AddUserAgent queues a User-Agent string for the given server as
// "serverId@userAgent" so that Loop can later parse it. Empty input is
// ignored. The send never blocks: when the queue is full the item is dropped.
func (this *HTTPRequestStatManager) AddUserAgent(serverId int64, userAgent string) {
	if userAgent == "" {
		return
	}

	item := strconv.FormatInt(serverId, 10) + "@" + userAgent
	select {
	case this.userAgentChan <- item:
	default:
		// The queue is full: drop the item instead of blocking the caller.
	}
}
// Loop performs a single processing pass: it consumes queued "serverId@ip"
// and "serverId@userAgent" items and updates the city, provider, system and
// browser counters. The pass ends as soon as neither queue has an item ready,
// or when the 10-minute limit for one pass expires. It always returns nil.
func (this *HTTPRequestStatManager) Loop() error {
	// Upper bound on how long one pass may run.
	deadline := time.NewTimer(10 * time.Minute)
	defer deadline.Stop()

	parser := &user_agent.UserAgent{}

	// majorOf keeps only the part of a version string before its first dot.
	majorOf := func(version string) string {
		if pos := strings.Index(version, "."); pos > -1 {
			return version[:pos]
		}
		return version
	}

	for {
		select {
		case item := <-this.ipChan:
			sep := strings.Index(item, "@")
			if sep < 0 {
				continue
			}
			if iplibrary.SharedLibrary == nil {
				continue
			}
			serverId, ip := item[:sep], item[sep+1:]
			region, err := iplibrary.SharedLibrary.Lookup(ip)
			if err != nil {
				// Addresses that cannot be resolved are simply not counted.
				continue
			}
			this.cityMap[serverId+"@"+region.Country+"@"+region.Province+"@"+region.City]++
			if len(region.ISP) > 0 {
				this.providerMap[serverId+"@"+region.ISP]++
			}

		case item := <-this.userAgentChan:
			sep := strings.Index(item, "@")
			if sep < 0 {
				continue
			}
			serverId, userAgent := item[:sep], item[sep+1:]
			parser.Parse(userAgent)

			// Operating system, counted per major version.
			if osInfo := parser.OSInfo(); len(osInfo.Name) > 0 {
				this.systemMap[serverId+"@"+osInfo.Name+"@"+majorOf(osInfo.Version)]++
			}

			// Browser, counted per major version.
			if name, version := parser.Browser(); len(name) > 0 {
				this.browserMap[serverId+"@"+name+"@"+majorOf(version)]++
			}

		case <-deadline.C:
			return nil

		default:
			// Nothing is ready in either queue: this pass is done.
			return nil
		}
	}
}
// Upload sends the counters accumulated by Loop to the API through the
// UploadServerHTTPRequestStat RPC and, on success, resets them.
//
// When all counter maps are empty no RPC is made and nil is returned. If the
// RPC client cannot be obtained or the call fails, the error is returned and
// the counters are kept, so they are included in the next attempt. The month
// sent with the request is the month at the time of the upload.
func (this *HTTPRequestStatManager) Upload() error {
	// Nothing has been counted since the last successful upload: skip the RPC.
	if len(this.cityMap)+len(this.providerMap)+len(this.systemMap)+len(this.browserMap) == 0 {
		return nil
	}

	rpcClient, err := rpc.SharedRPC()
	if err != nil {
		return err
	}

	// Map keys are "@"-joined by Loop; SplitN with the exact field count keeps
	// any further "@" inside the last field.
	pbCities := make([]*pb.UploadServerHTTPRequestStatRequest_RegionCity, 0, len(this.cityMap))
	for k, count := range this.cityMap {
		pieces := strings.SplitN(k, "@", 4)
		pbCities = append(pbCities, &pb.UploadServerHTTPRequestStatRequest_RegionCity{
			ServerId:     types.Int64(pieces[0]),
			CountryName:  pieces[1],
			ProvinceName: pieces[2],
			CityName:     pieces[3],
			Count:        count,
		})
	}

	pbProviders := make([]*pb.UploadServerHTTPRequestStatRequest_RegionProvider, 0, len(this.providerMap))
	for k, count := range this.providerMap {
		pieces := strings.SplitN(k, "@", 2)
		pbProviders = append(pbProviders, &pb.UploadServerHTTPRequestStatRequest_RegionProvider{
			ServerId: types.Int64(pieces[0]),
			Name:     pieces[1],
			Count:    count,
		})
	}

	pbSystems := make([]*pb.UploadServerHTTPRequestStatRequest_System, 0, len(this.systemMap))
	for k, count := range this.systemMap {
		pieces := strings.SplitN(k, "@", 3)
		pbSystems = append(pbSystems, &pb.UploadServerHTTPRequestStatRequest_System{
			ServerId: types.Int64(pieces[0]),
			Name:     pieces[1],
			Version:  pieces[2],
			Count:    count,
		})
	}

	pbBrowsers := make([]*pb.UploadServerHTTPRequestStatRequest_Browser, 0, len(this.browserMap))
	for k, count := range this.browserMap {
		pieces := strings.SplitN(k, "@", 3)
		pbBrowsers = append(pbBrowsers, &pb.UploadServerHTTPRequestStatRequest_Browser{
			ServerId: types.Int64(pieces[0]),
			Name:     pieces[1],
			Version:  pieces[2],
			Count:    count,
		})
	}

	_, err = rpcClient.ServerRPC().UploadServerHTTPRequestStat(rpcClient.Context(), &pb.UploadServerHTTPRequestStatRequest{
		Month:           timeutil.Format("Ym"),
		RegionCities:    pbCities,
		RegionProviders: pbProviders,
		Systems:         pbSystems,
		Browsers:        pbBrowsers,
	})
	if err != nil {
		return err
	}

	// Reset the counters only after a successful upload.
	this.cityMap = map[string]int64{}
	this.providerMap = map[string]int64{}
	this.systemMap = map[string]int64{}
	this.browserMap = map[string]int64{}
	return nil
}