增加网站每日独立IP统计

This commit is contained in:
GoEdgeLab
2024-04-12 20:12:09 +08:00
parent f1e368f85d
commit ca43d119e4
5 changed files with 442 additions and 12 deletions

View File

@@ -453,6 +453,9 @@ func (this *HTTPRequest) doEnd() {
stats.SharedTrafficStatManager.Add(this.ReqServer.UserId, this.ReqServer.Id, this.ReqHost, totalBytes, cachedBytes, 1, countCached, countAttacks, attackBytes, countWebsocketConnections, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId())
// unique IP
stats.SharedDAUManager.AddIP(this.ReqServer.Id, this.requestRemoteAddr(true))
// 指标
if metrics.SharedManager.HasHTTPMetrics() {
this.doMetricsResponse()

View File

@@ -76,12 +76,15 @@ type BandwidthStatManager struct {
ticker *time.Ticker
locker sync.Mutex
cacheFile string
}
func NewBandwidthStatManager() *BandwidthStatManager {
return &BandwidthStatManager{
m: map[string]*BandwidthStat{},
ticker: time.NewTicker(1 * time.Minute), // 时间小于1分钟是为了更快速地上传结果
m: map[string]*BandwidthStat{},
ticker: time.NewTicker(1 * time.Minute), // 时间小于1分钟是为了更快速地上传结果
cacheFile: Tea.Root + "/data/stat_bandwidth.cache",
}
}
@@ -130,6 +133,8 @@ func (this *BandwidthStatManager) Loop() error {
this.pbStats = nil
}
var ipStatMap = SharedDAUManager.ReadStatMap()
this.locker.Lock()
for key, stat := range this.m {
if stat.Day < day || stat.TimeAt < currentTime {
@@ -142,6 +147,8 @@ func (this *BandwidthStatManager) Loop() error {
stat.AttackBytes = stat.TotalBytes
}
var ipKey = "server_" + stat.Day + "_" + types.String(stat.ServerId)
pbStats = append(pbStats, &pb.ServerBandwidthStat{
Id: 0,
UserId: stat.UserId,
@@ -156,6 +163,7 @@ func (this *BandwidthStatManager) Loop() error {
CountCachedRequests: stat.CountCachedRequests,
CountAttackRequests: stat.CountAttackRequests,
CountWebsocketConnections: stat.CountWebsocketConnections,
CountIPs: ipStatMap[ipKey],
UserPlanId: stat.UserPlanId,
NodeRegionId: regionId,
})
@@ -284,8 +292,8 @@ func (this *BandwidthStatManager) Save() error {
return err
}
_ = os.Remove(this.cacheFile())
return os.WriteFile(this.cacheFile(), data, 0666)
_ = os.Remove(this.cacheFile)
return os.WriteFile(this.cacheFile, data, 0666)
}
// Cancel 取消上传
@@ -295,7 +303,7 @@ func (this *BandwidthStatManager) Cancel() {
// 从本地缓存文件中恢复数据
func (this *BandwidthStatManager) recover() {
cacheData, err := os.ReadFile(this.cacheFile())
cacheData, err := os.ReadFile(this.cacheFile)
if err == nil {
var m = map[string]*BandwidthStat{}
err = json.Unmarshal(cacheData, &m)
@@ -317,12 +325,6 @@ func (this *BandwidthStatManager) recover() {
}
}
}
_ = os.Remove(this.cacheFile())
_ = os.Remove(this.cacheFile)
}
}
// 获取缓存文件
// 不能在init()中初始化,避免无法获得正确的路径
func (this *BandwidthStatManager) cacheFile() string {
return Tea.Root + "/data/bandwidth.dat"
}

View File

@@ -0,0 +1,266 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package stats
import (
"encoding/json"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"os"
"runtime"
"strings"
"sync"
"testing"
"time"
)
var SharedDAUManager = NewDAUManager()
func init() {
if teaconst.IsMain {
err := SharedDAUManager.Init()
if err != nil {
remotelogs.Error("DAU_MANAGER", "initialize DAU manager failed: "+err.Error())
}
}
}
type IPInfo struct {
IP string
ServerId int64
}
type DAUManager struct {
cacheFile string
ipChan chan IPInfo
ipTable *kvstore.Table[[]byte] // server_DATE_serverId_ip => nil
statMap map[string]int64 // server_DATE_serverId => count
statLocker sync.RWMutex
cleanTicker *time.Ticker
}
// NewDAUManager DAU计算器
func NewDAUManager() *DAUManager {
return &DAUManager{
cacheFile: Tea.Root + "/data/stat_dau.cache",
statMap: map[string]int64{},
cleanTicker: time.NewTicker(24 * time.Hour),
ipChan: make(chan IPInfo, 8192),
}
}
func (this *DAUManager) Init() error {
// recover from cache
_ = this.recover()
// create table
store, storeErr := kvstore.DefaultStore()
if storeErr != nil {
return storeErr
}
db, dbErr := store.NewDB("dau")
if dbErr != nil {
return dbErr
}
{
table, err := kvstore.NewTable[[]byte]("ip", kvstore.NewNilValueEncoder())
if err != nil {
return err
}
db.AddTable(table)
this.ipTable = table
}
{
table, err := kvstore.NewTable[uint64]("stats", kvstore.NewIntValueEncoder[uint64]())
if err != nil {
return err
}
db.AddTable(table)
}
// clean expires items
goman.New(func() {
for range this.cleanTicker.C {
err := this.CleanStats()
if err != nil {
remotelogs.Error("DAU_MANAGER", "clean stats failed: "+err.Error())
}
}
})
// dump ip to kvstore
goman.New(func() {
// cache latest IPs to reduce kv queries
var cachedIPs []IPInfo
var maxIPs = runtime.NumCPU() * 8
if maxIPs <= 0 {
maxIPs = 8
} else if maxIPs > 64 {
maxIPs = 64
}
var day = fasttime.Now().Ymd()
Loop:
for ipInfo := range this.ipChan {
// check day
if fasttime.Now().Ymd() != day {
day = fasttime.Now().Ymd()
cachedIPs = []IPInfo{}
}
// lookup cache
for _, cachedIP := range cachedIPs {
if cachedIP.IP == ipInfo.IP && cachedIP.ServerId == ipInfo.ServerId {
continue Loop
}
}
// add to cache
cachedIPs = append(cachedIPs, ipInfo)
if len(cachedIPs) > maxIPs {
cachedIPs = cachedIPs[1:]
}
_ = this.processIP(ipInfo.ServerId, ipInfo.IP)
}
})
// dump to cache when close
events.OnClose(func() {
_ = this.Close()
})
return nil
}
func (this *DAUManager) AddIP(serverId int64, ip string) {
select {
case this.ipChan <- IPInfo{
IP: ip,
ServerId: serverId,
}:
default:
}
}
func (this *DAUManager) processIP(serverId int64, ip string) error {
// day
var date = fasttime.Now().Ymd()
{
var key = "server_" + date + "_" + types.String(serverId) + "_" + ip
found, err := this.ipTable.Exist(key)
if err != nil || found {
return err
}
err = this.ipTable.Set(key, nil)
if err != nil {
return err
}
}
{
var key = "server_" + date + "_" + types.String(serverId)
this.statLocker.Lock()
this.statMap[key] = this.statMap[key] + 1
this.statLocker.Unlock()
}
return nil
}
func (this *DAUManager) ReadStatMap() map[string]int64 {
this.statLocker.Lock()
var statMap = this.statMap
this.statMap = map[string]int64{}
this.statLocker.Unlock()
return statMap
}
func (this *DAUManager) Flush() error {
return this.ipTable.DB().Store().Flush()
}
func (this *DAUManager) TestInspect(t *testing.T) {
err := this.ipTable.DB().Inspect(func(key []byte, value []byte) {
t.Log(string(key), "=>", string(value))
})
if err != nil {
t.Fatal(err)
}
}
func (this *DAUManager) Close() error {
this.statLocker.Lock()
var statMap = this.statMap
this.statMap = map[string]int64{}
this.statLocker.Unlock()
if len(statMap) == 0 {
return nil
}
statJSON, err := json.Marshal(statMap)
if err != nil {
return err
}
return os.WriteFile(this.cacheFile, statJSON, 0666)
}
func (this *DAUManager) CleanStats() error {
// day
{
var date = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -2))
err := this.ipTable.DeleteRange("server_", "server_"+date)
if err != nil {
return err
}
}
return nil
}
func (this *DAUManager) Truncate() error {
return this.ipTable.Truncate()
}
func (this *DAUManager) recover() error {
data, err := os.ReadFile(this.cacheFile)
if err != nil || len(data) == 0 {
return err
}
_ = os.Remove(this.cacheFile)
var statMap = map[string]int64{}
err = json.Unmarshal(data, &statMap)
if err != nil {
return err
}
var today = timeutil.Format("Ymd")
for key := range statMap {
var pieces = strings.Split(key, "_")
if pieces[1] != today {
delete(statMap, key)
}
}
this.statMap = statMap
return nil
}

View File

@@ -0,0 +1,135 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package stats_test
import (
"github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils/testutils"
"github.com/iwind/TeaGo/rands"
"runtime"
"testing"
"time"
)
func TestDAUManager_AddIP(t *testing.T) {
var manager = stats.NewDAUManager()
err := manager.Init()
if err != nil {
t.Fatal(err)
}
manager.AddIP(1, "127.0.0.1")
manager.AddIP(1, "127.0.0.2")
manager.AddIP(1, "127.0.0.3")
manager.AddIP(1, "127.0.0.4")
manager.AddIP(1, "127.0.0.2")
manager.AddIP(1, "127.0.0.3")
time.Sleep(1 * time.Second)
err = manager.Close()
if err != nil {
t.Fatal(err)
}
t.Log("======")
manager.TestInspect(t)
}
func TestDAUManager_AddIP_Many(t *testing.T) {
var manager = stats.NewDAUManager()
err := manager.Init()
if err != nil {
t.Fatal(err)
}
var before = time.Now()
defer func() {
t.Log("cost:", time.Since(before).Seconds()*1000, "ms")
}()
var count = 1
if testutils.IsSingleTesting() {
count = 10_000
}
for i := 0; i < count; i++ {
manager.AddIP(int64(rands.Int(1, 10)), testutils.RandIP())
}
}
func TestDAUManager_CleanStats(t *testing.T) {
var manager = stats.NewDAUManager()
err := manager.Init()
if err != nil {
t.Fatal(err)
}
var before = time.Now()
defer func() {
t.Log("cost:", time.Since(before).Seconds()*1000, "ms")
}()
defer func() {
_ = manager.Flush()
}()
err = manager.CleanStats()
if err != nil {
t.Fatal(err)
}
}
func TestDAUManager_TestInspect(t *testing.T) {
var manager = stats.NewDAUManager()
err := manager.Init()
if err != nil {
t.Fatal(err)
}
manager.TestInspect(t)
}
func TestDAUManager_Truncate(t *testing.T) {
var manager = stats.NewDAUManager()
err := manager.Init()
if err != nil {
t.Fatal(err)
}
err = manager.Truncate()
if err != nil {
t.Fatal(err)
}
err = manager.Flush()
if err != nil {
t.Fatal(err)
}
}
func BenchmarkDAUManager_AddIP_Cache(b *testing.B) {
runtime.GOMAXPROCS(1)
var cachedIPs []stats.IPInfo
var maxIPs = 128
b.Log("maxIPs:", maxIPs)
for i := 0; i < maxIPs; i++ {
cachedIPs = append(cachedIPs, stats.IPInfo{
IP: testutils.RandIP(),
ServerId: 1,
})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var ip = "1.2.3.4"
for _, cacheIP := range cachedIPs {
if cacheIP.IP == ip && cacheIP.ServerId == 1 {
break
}
}
}
}

View File

@@ -4,6 +4,7 @@ package kvstore
import (
"errors"
"github.com/cockroachdb/pebble"
"sync"
)
@@ -52,6 +53,29 @@ func (this *DB) Store() *Store {
return this.store
}
func (this *DB) Inspect(fn func(key []byte, value []byte)) error {
it, err := this.store.rawDB.NewIter(&pebble.IterOptions{
LowerBound: []byte(this.namespace),
UpperBound: append([]byte(this.namespace), 0xFF, 0xFF),
})
if err != nil {
return err
}
defer func() {
_ = it.Close()
}()
for it.First(); it.Valid(); it.Next() {
value, valueErr := it.ValueAndErr()
if valueErr != nil {
return valueErr
}
fn(it.Key(), value)
}
return nil
}
// Truncate the database
func (this *DB) Truncate() error {
this.mu.Lock()