mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 06:40:25 +08:00
305 lines
8.3 KiB
Go
305 lines
8.3 KiB
Go
package stats
|
||
|
||
import (
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||
"github.com/TeaOSLab/EdgeNode/internal/monitor"
|
||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
|
||
"github.com/TeaOSLab/EdgeNode/internal/utils/goman"
|
||
"github.com/iwind/TeaGo/Tea"
|
||
"github.com/iwind/TeaGo/maps"
|
||
"github.com/iwind/TeaGo/types"
|
||
)
|
||
|
||
var SharedTrafficStatManager = NewTrafficStatManager()
|
||
|
||
type TrafficItem struct {
|
||
UserId int64
|
||
Bytes int64
|
||
CachedBytes int64
|
||
CountRequests int64
|
||
CountCachedRequests int64
|
||
CountAttackRequests int64
|
||
AttackBytes int64
|
||
PlanId int64
|
||
CheckingTrafficLimit bool
|
||
}
|
||
|
||
func (this *TrafficItem) Add(anotherItem *TrafficItem) {
|
||
this.Bytes += anotherItem.Bytes
|
||
this.CachedBytes += anotherItem.CachedBytes
|
||
this.CountRequests += anotherItem.CountRequests
|
||
this.CountCachedRequests += anotherItem.CountCachedRequests
|
||
this.CountAttackRequests += anotherItem.CountAttackRequests
|
||
this.AttackBytes += anotherItem.AttackBytes
|
||
}
|
||
|
||
// TrafficStatManager 区域流量统计
|
||
type TrafficStatManager struct {
|
||
itemMap map[string]*TrafficItem // [timestamp serverId] => *TrafficItem
|
||
domainsMap map[int64]map[string]*TrafficItem // serverIde => { timestamp @ domain => *TrafficItem }
|
||
|
||
pbItems []*pb.ServerDailyStat
|
||
pbDomainItems []*pb.UploadServerDailyStatsRequest_DomainStat
|
||
|
||
locker sync.Mutex
|
||
|
||
totalRequests int64
|
||
}
|
||
|
||
// NewTrafficStatManager 获取新对象
|
||
func NewTrafficStatManager() *TrafficStatManager {
|
||
var manager = &TrafficStatManager{
|
||
itemMap: map[string]*TrafficItem{},
|
||
domainsMap: map[int64]map[string]*TrafficItem{},
|
||
}
|
||
|
||
return manager
|
||
}
|
||
|
||
// Start 启动自动任务
|
||
func (this *TrafficStatManager) Start() {
|
||
// 上传请求总数
|
||
var monitorTicker = time.NewTicker(1 * time.Minute)
|
||
events.OnKey(events.EventQuit, this, func() {
|
||
monitorTicker.Stop()
|
||
})
|
||
goman.New(func() {
|
||
for range monitorTicker.C {
|
||
if this.totalRequests > 0 {
|
||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests})
|
||
this.totalRequests = 0
|
||
}
|
||
}
|
||
})
|
||
|
||
// 上传统计数据
|
||
var duration = 5 * time.Minute
|
||
if Tea.IsTesting() {
|
||
// 测试环境缩短上传时间,方便我们调试
|
||
duration = 30 * time.Second
|
||
}
|
||
var ticker = time.NewTicker(duration)
|
||
events.OnKey(events.EventQuit, this, func() {
|
||
remotelogs.Println("TRAFFIC_STAT_MANAGER", "quit")
|
||
ticker.Stop()
|
||
})
|
||
remotelogs.Println("TRAFFIC_STAT_MANAGER", "start ...")
|
||
for range ticker.C {
|
||
err := this.Upload()
|
||
if err != nil {
|
||
if !rpc.IsConnError(err) {
|
||
remotelogs.Error("TRAFFIC_STAT_MANAGER", "upload stats failed: "+err.Error())
|
||
} else {
|
||
remotelogs.Warn("TRAFFIC_STAT_MANAGER", "upload stats failed: "+err.Error())
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Add 添加流量
|
||
func (this *TrafficStatManager) Add(userId int64, serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64, countWebsocketConnections int64, checkingTrafficLimit bool, planId int64) {
|
||
if serverId == 0 {
|
||
return
|
||
}
|
||
|
||
// 添加到带宽
|
||
SharedBandwidthStatManager.AddTraffic(serverId, cachedBytes, countRequests, countCachedRequests, countAttacks, attackBytes, countWebsocketConnections)
|
||
|
||
if bytes == 0 && countRequests == 0 {
|
||
return
|
||
}
|
||
|
||
this.totalRequests++
|
||
|
||
var timestamp = fasttime.Now().UnixFloor(300)
|
||
var key = strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10)
|
||
this.locker.Lock()
|
||
|
||
// 总的流量
|
||
item, ok := this.itemMap[key]
|
||
if !ok {
|
||
item = &TrafficItem{
|
||
UserId: userId,
|
||
}
|
||
this.itemMap[key] = item
|
||
}
|
||
item.Bytes += bytes
|
||
item.CachedBytes += cachedBytes
|
||
item.CountRequests += countRequests
|
||
item.CountCachedRequests += countCachedRequests
|
||
item.CountAttackRequests += countAttacks
|
||
item.AttackBytes += attackBytes
|
||
item.CheckingTrafficLimit = checkingTrafficLimit
|
||
item.PlanId = planId
|
||
|
||
// 单个域名流量
|
||
if len(domain) < 128 {
|
||
var domainKey = types.String(timestamp) + "@" + domain
|
||
serverDomainMap, ok := this.domainsMap[serverId]
|
||
if !ok {
|
||
serverDomainMap = map[string]*TrafficItem{}
|
||
this.domainsMap[serverId] = serverDomainMap
|
||
}
|
||
|
||
domainItem, ok := serverDomainMap[domainKey]
|
||
if !ok {
|
||
domainItem = &TrafficItem{}
|
||
serverDomainMap[domainKey] = domainItem
|
||
}
|
||
domainItem.Bytes += bytes
|
||
domainItem.CachedBytes += cachedBytes
|
||
domainItem.CountRequests += countRequests
|
||
domainItem.CountCachedRequests += countCachedRequests
|
||
domainItem.CountAttackRequests += countAttacks
|
||
domainItem.AttackBytes += attackBytes
|
||
}
|
||
|
||
this.locker.Unlock()
|
||
}
|
||
|
||
// Upload 上传流量
|
||
func (this *TrafficStatManager) Upload() error {
|
||
var regionId int64
|
||
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
|
||
if nodeConfig != nil {
|
||
regionId = nodeConfig.RegionId
|
||
}
|
||
|
||
client, err := rpc.SharedRPC()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
this.locker.Lock()
|
||
|
||
var itemMap = this.itemMap
|
||
var domainMap = this.domainsMap
|
||
|
||
// reset
|
||
this.itemMap = map[string]*TrafficItem{}
|
||
this.domainsMap = map[int64]map[string]*TrafficItem{}
|
||
|
||
this.locker.Unlock()
|
||
|
||
// 服务统计
|
||
var pbServerStats = []*pb.ServerDailyStat{}
|
||
for key, item := range itemMap {
|
||
timestamp, err := strconv.ParseInt(key[:10], 10, 64)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
serverId, err := strconv.ParseInt(key[10:], 10, 64)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
pbServerStats = append(pbServerStats, &pb.ServerDailyStat{
|
||
UserId: item.UserId,
|
||
ServerId: serverId,
|
||
NodeRegionId: regionId,
|
||
Bytes: item.Bytes,
|
||
CachedBytes: item.CachedBytes,
|
||
CountRequests: item.CountRequests,
|
||
CountCachedRequests: item.CountCachedRequests,
|
||
CountAttackRequests: item.CountAttackRequests,
|
||
AttackBytes: item.AttackBytes,
|
||
CheckTrafficLimiting: item.CheckingTrafficLimit,
|
||
PlanId: item.PlanId,
|
||
CreatedAt: timestamp,
|
||
})
|
||
}
|
||
|
||
// 域名统计
|
||
const maxDomainsPerServer = 20
|
||
var pbDomainStats = []*pb.UploadServerDailyStatsRequest_DomainStat{}
|
||
for serverId, serverDomainMap := range domainMap {
|
||
// 如果超过单个服务最大值,则只取前N个
|
||
var shouldTrim = len(serverDomainMap) > maxDomainsPerServer
|
||
var tempItems []*pb.UploadServerDailyStatsRequest_DomainStat
|
||
|
||
for key, item := range serverDomainMap {
|
||
var pieces = strings.SplitN(key, "@", 2)
|
||
if len(pieces) != 2 {
|
||
continue
|
||
}
|
||
|
||
// 修正数据
|
||
if item.CachedBytes > item.Bytes || item.CountCachedRequests == item.CountRequests {
|
||
item.CachedBytes = item.Bytes
|
||
}
|
||
|
||
var pbItem = &pb.UploadServerDailyStatsRequest_DomainStat{
|
||
ServerId: serverId,
|
||
Domain: pieces[1],
|
||
Bytes: item.Bytes,
|
||
CachedBytes: item.CachedBytes,
|
||
CountRequests: item.CountRequests,
|
||
CountCachedRequests: item.CountCachedRequests,
|
||
CountAttackRequests: item.CountAttackRequests,
|
||
AttackBytes: item.AttackBytes,
|
||
CreatedAt: types.Int64(pieces[0]),
|
||
}
|
||
if !shouldTrim {
|
||
pbDomainStats = append(pbDomainStats, pbItem)
|
||
} else {
|
||
tempItems = append(tempItems, pbItem)
|
||
}
|
||
}
|
||
|
||
if shouldTrim {
|
||
sort.Slice(tempItems, func(i, j int) bool {
|
||
return tempItems[i].CountRequests > tempItems[j].CountRequests
|
||
})
|
||
|
||
pbDomainStats = append(pbDomainStats, tempItems[:maxDomainsPerServer]...)
|
||
}
|
||
}
|
||
|
||
// 历史未提交记录
|
||
if len(this.pbItems) > 0 || len(this.pbDomainItems) > 0 {
|
||
var expiredAt = time.Now().Unix() - 1200 // 只保留20分钟
|
||
|
||
for _, item := range this.pbItems {
|
||
if item.CreatedAt > expiredAt {
|
||
pbServerStats = append(pbServerStats, item)
|
||
}
|
||
}
|
||
this.pbItems = nil
|
||
|
||
for _, item := range this.pbDomainItems {
|
||
if item.CreatedAt > expiredAt {
|
||
pbDomainStats = append(pbDomainStats, item)
|
||
}
|
||
}
|
||
this.pbDomainItems = nil
|
||
}
|
||
|
||
if len(pbServerStats) == 0 && len(pbDomainStats) == 0 {
|
||
return nil
|
||
}
|
||
|
||
_, err = client.ServerDailyStatRPC.UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{
|
||
Stats: pbServerStats,
|
||
DomainStats: pbDomainStats,
|
||
})
|
||
if err != nil {
|
||
// 加回历史记录
|
||
this.pbItems = pbServerStats
|
||
this.pbDomainItems = pbDomainStats
|
||
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|