From fe069762bbbddf8816275d5578b3e6f0a3d27c61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Fri, 12 Apr 2024 20:12:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BD=91=E7=AB=99=E6=AF=8F?= =?UTF-8?q?=E6=97=A5=E7=8B=AC=E7=AB=8BIP=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/http_request.go | 3 + internal/stats/bandwidth_stat_manager.go | 26 ++- internal/stats/dau_manager.go | 266 +++++++++++++++++++++++ internal/stats/dau_manager_test.go | 135 ++++++++++++ internal/utils/kvstore/db.go | 24 ++ 5 files changed, 442 insertions(+), 12 deletions(-) create mode 100644 internal/stats/dau_manager.go create mode 100644 internal/stats/dau_manager_test.go diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index cd4369d..23c8130 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -453,6 +453,9 @@ func (this *HTTPRequest) doEnd() { stats.SharedTrafficStatManager.Add(this.ReqServer.UserId, this.ReqServer.Id, this.ReqHost, totalBytes, cachedBytes, 1, countCached, countAttacks, attackBytes, countWebsocketConnections, this.ReqServer.ShouldCheckTrafficLimit(), this.ReqServer.PlanId()) + // unique IP + stats.SharedDAUManager.AddIP(this.ReqServer.Id, this.requestRemoteAddr(true)) + // 指标 if metrics.SharedManager.HasHTTPMetrics() { this.doMetricsResponse() diff --git a/internal/stats/bandwidth_stat_manager.go b/internal/stats/bandwidth_stat_manager.go index f9e30a4..4e370f2 100644 --- a/internal/stats/bandwidth_stat_manager.go +++ b/internal/stats/bandwidth_stat_manager.go @@ -76,12 +76,15 @@ type BandwidthStatManager struct { ticker *time.Ticker locker sync.Mutex + + cacheFile string } func NewBandwidthStatManager() *BandwidthStatManager { return &BandwidthStatManager{ - m: map[string]*BandwidthStat{}, - ticker: time.NewTicker(1 * time.Minute), // 时间小于1分钟是为了更快速地上传结果 + m: map[string]*BandwidthStat{}, + ticker: time.NewTicker(1 * time.Minute), // 时间小于1分钟是为了更快速地上传结果 + cacheFile: Tea.Root + "/data/stat_bandwidth.cache", } } @@ -130,6 +133,8 @@ func (this *BandwidthStatManager) Loop() error { this.pbStats = nil } + var ipStatMap = SharedDAUManager.ReadStatMap() + this.locker.Lock() for key, stat := range this.m { if stat.Day < day || stat.TimeAt < currentTime { @@ -142,6 +147,8 @@ func (this *BandwidthStatManager) Loop() error { stat.AttackBytes = stat.TotalBytes } + var ipKey = "server_" + stat.Day + "_" + types.String(stat.ServerId) + pbStats = append(pbStats, &pb.ServerBandwidthStat{ Id: 0, UserId: stat.UserId, @@ -156,6 +163,7 @@ func (this *BandwidthStatManager) Loop() error { CountCachedRequests: stat.CountCachedRequests, CountAttackRequests: stat.CountAttackRequests, CountWebsocketConnections: stat.CountWebsocketConnections, + CountIPs: ipStatMap[ipKey], UserPlanId: stat.UserPlanId, NodeRegionId: regionId, }) @@ -284,8 +292,8 @@ func (this *BandwidthStatManager) Save() error { return err } - _ = os.Remove(this.cacheFile()) - return os.WriteFile(this.cacheFile(), data, 0666) + _ = os.Remove(this.cacheFile) + return os.WriteFile(this.cacheFile, data, 0666) } // Cancel 取消上传 @@ -295,7 +303,7 @@ func (this *BandwidthStatManager) Cancel() { // 从本地缓存文件中恢复数据 func (this *BandwidthStatManager) recover() { - cacheData, err := os.ReadFile(this.cacheFile()) + cacheData, err := os.ReadFile(this.cacheFile) if err == nil { var m = map[string]*BandwidthStat{} err = json.Unmarshal(cacheData, &m) @@ -317,12 +325,6 @@ func (this *BandwidthStatManager) recover() { } } } - _ = os.Remove(this.cacheFile()) + _ = os.Remove(this.cacheFile) } } - -// 获取缓存文件 -// 不能在init()中初始化,避免无法获得正确的路径 -func (this *BandwidthStatManager) cacheFile() string { - return Tea.Root + "/data/bandwidth.dat" -} diff --git a/internal/stats/dau_manager.go b/internal/stats/dau_manager.go new file mode 100644 index 0000000..bf30857 --- /dev/null +++ b/internal/stats/dau_manager.go @@ -0,0 +1,266 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package stats + +import ( + "encoding/json" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" + "os" + "runtime" + "strings" + "sync" + "testing" + "time" +) + +var SharedDAUManager = NewDAUManager() + +func init() { + if teaconst.IsMain { + err := SharedDAUManager.Init() + if err != nil { + remotelogs.Error("DAU_MANAGER", "initialize DAU manager failed: "+err.Error()) + } + } +} + +type IPInfo struct { + IP string + ServerId int64 +} + +type DAUManager struct { + cacheFile string + + ipChan chan IPInfo + ipTable *kvstore.Table[[]byte] // server_DATE_serverId_ip => nil + + statMap map[string]int64 // server_DATE_serverId => count + statLocker sync.RWMutex + + cleanTicker *time.Ticker +} + +// NewDAUManager DAU计算器 +func NewDAUManager() *DAUManager { + return &DAUManager{ + cacheFile: Tea.Root + "/data/stat_dau.cache", + statMap: map[string]int64{}, + cleanTicker: time.NewTicker(24 * time.Hour), + ipChan: make(chan IPInfo, 8192), + } +} + +func (this *DAUManager) Init() error { + // recover from cache + _ = this.recover() + + // create table + store, storeErr := kvstore.DefaultStore() + if storeErr != nil { + return storeErr + } + + db, dbErr := store.NewDB("dau") + if dbErr != nil { + return dbErr + } + + { + table, err := kvstore.NewTable[[]byte]("ip", kvstore.NewNilValueEncoder()) + if err != nil { + return err + } + db.AddTable(table) + this.ipTable = table + } + + { + table, err := kvstore.NewTable[uint64]("stats", kvstore.NewIntValueEncoder[uint64]()) + if err != nil { + return err + } + db.AddTable(table) + } + + // clean expires items + goman.New(func() { + for range this.cleanTicker.C { + err := this.CleanStats() + if err != nil { + remotelogs.Error("DAU_MANAGER", "clean stats failed: "+err.Error()) + } + } + }) + + // dump ip to kvstore + goman.New(func() { + // cache latest IPs to reduce kv queries + var cachedIPs []IPInfo + var maxIPs = runtime.NumCPU() * 8 + if maxIPs <= 0 { + maxIPs = 8 + } else if maxIPs > 64 { + maxIPs = 64 + } + + var day = fasttime.Now().Ymd() + + Loop: + for ipInfo := range this.ipChan { + // check day + if fasttime.Now().Ymd() != day { + day = fasttime.Now().Ymd() + cachedIPs = []IPInfo{} + } + + // lookup cache + for _, cachedIP := range cachedIPs { + if cachedIP.IP == ipInfo.IP && cachedIP.ServerId == ipInfo.ServerId { + continue Loop + } + } + + // add to cache + cachedIPs = append(cachedIPs, ipInfo) + if len(cachedIPs) > maxIPs { + cachedIPs = cachedIPs[1:] + } + + _ = this.processIP(ipInfo.ServerId, ipInfo.IP) + } + }) + + // dump to cache when close + events.OnClose(func() { + _ = this.Close() + }) + + return nil +} + +func (this *DAUManager) AddIP(serverId int64, ip string) { + select { + case this.ipChan <- IPInfo{ + IP: ip, + ServerId: serverId, + }: + default: + } +} + +func (this *DAUManager) processIP(serverId int64, ip string) error { + // day + var date = fasttime.Now().Ymd() + + { + var key = "server_" + date + "_" + types.String(serverId) + "_" + ip + found, err := this.ipTable.Exist(key) + if err != nil || found { + return err + } + + err = this.ipTable.Set(key, nil) + if err != nil { + return err + } + } + + { + var key = "server_" + date + "_" + types.String(serverId) + this.statLocker.Lock() + this.statMap[key] = this.statMap[key] + 1 + this.statLocker.Unlock() + } + + return nil +} + +func (this *DAUManager) ReadStatMap() map[string]int64 { + this.statLocker.Lock() + var statMap = this.statMap + this.statMap = map[string]int64{} + this.statLocker.Unlock() + return statMap +} + +func (this *DAUManager) Flush() error { + return this.ipTable.DB().Store().Flush() +} + +func (this *DAUManager) TestInspect(t *testing.T) { + err := this.ipTable.DB().Inspect(func(key []byte, value []byte) { + t.Log(string(key), "=>", string(value)) + }) + if err != nil { + t.Fatal(err) + } +} + +func (this *DAUManager) Close() error { + this.statLocker.Lock() + var statMap = this.statMap + this.statMap = map[string]int64{} + this.statLocker.Unlock() + + if len(statMap) == 0 { + return nil + } + + statJSON, err := json.Marshal(statMap) + if err != nil { + return err + } + + return os.WriteFile(this.cacheFile, statJSON, 0666) +} + +func (this *DAUManager) CleanStats() error { + // day + { + var date = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -2)) + err := this.ipTable.DeleteRange("server_", "server_"+date) + if err != nil { + return err + } + } + + return nil +} + +func (this *DAUManager) Truncate() error { + return this.ipTable.Truncate() +} + +func (this *DAUManager) recover() error { + data, err := os.ReadFile(this.cacheFile) + if err != nil || len(data) == 0 { + return err + } + + _ = os.Remove(this.cacheFile) + + var statMap = map[string]int64{} + err = json.Unmarshal(data, &statMap) + if err != nil { + return err + } + + var today = timeutil.Format("Ymd") + for key := range statMap { + var pieces = strings.Split(key, "_") + if pieces[1] != today { + delete(statMap, key) + } + } + this.statMap = statMap + return nil +} diff --git a/internal/stats/dau_manager_test.go b/internal/stats/dau_manager_test.go new file mode 100644 index 0000000..94fe1c5 --- /dev/null +++ b/internal/stats/dau_manager_test.go @@ -0,0 +1,135 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package stats_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/stats" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/iwind/TeaGo/rands" + "runtime" + "testing" + "time" +) + +func TestDAUManager_AddIP(t *testing.T) { + var manager = stats.NewDAUManager() + err := manager.Init() + if err != nil { + t.Fatal(err) + } + + manager.AddIP(1, "127.0.0.1") + manager.AddIP(1, "127.0.0.2") + manager.AddIP(1, "127.0.0.3") + manager.AddIP(1, "127.0.0.4") + manager.AddIP(1, "127.0.0.2") + manager.AddIP(1, "127.0.0.3") + + time.Sleep(1 * time.Second) + + err = manager.Close() + if err != nil { + t.Fatal(err) + } + + t.Log("======") + manager.TestInspect(t) +} + +func TestDAUManager_AddIP_Many(t *testing.T) { + var manager = stats.NewDAUManager() + err := manager.Init() + if err != nil { + t.Fatal(err) + } + + var before = time.Now() + defer func() { + t.Log("cost:", time.Since(before).Seconds()*1000, "ms") + }() + + var count = 1 + + if testutils.IsSingleTesting() { + count = 10_000 + } + + for i := 0; i < count; i++ { + manager.AddIP(int64(rands.Int(1, 10)), testutils.RandIP()) + } +} + +func TestDAUManager_CleanStats(t *testing.T) { + var manager = stats.NewDAUManager() + err := manager.Init() + if err != nil { + t.Fatal(err) + } + + var before = time.Now() + defer func() { + t.Log("cost:", time.Since(before).Seconds()*1000, "ms") + }() + + defer func() { + _ = manager.Flush() + }() + + err = manager.CleanStats() + if err != nil { + t.Fatal(err) + } +} + +func TestDAUManager_TestInspect(t *testing.T) { + var manager = stats.NewDAUManager() + err := manager.Init() + if err != nil { + t.Fatal(err) + } + + manager.TestInspect(t) +} + +func TestDAUManager_Truncate(t *testing.T) { + var manager = stats.NewDAUManager() + err := manager.Init() + if err != nil { + t.Fatal(err) + } + + err = manager.Truncate() + if err != nil { + t.Fatal(err) + } + + err = manager.Flush() + if err != nil { + t.Fatal(err) + } +} + +func BenchmarkDAUManager_AddIP_Cache(b *testing.B) { + runtime.GOMAXPROCS(1) + + var cachedIPs []stats.IPInfo + var maxIPs = 128 + b.Log("maxIPs:", maxIPs) + for i := 0; i < maxIPs; i++ { + cachedIPs = append(cachedIPs, stats.IPInfo{ + IP: testutils.RandIP(), + ServerId: 1, + }) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var ip = "1.2.3.4" + for _, cacheIP := range cachedIPs { + if cacheIP.IP == ip && cacheIP.ServerId == 1 { + break + } + } + } +} diff --git a/internal/utils/kvstore/db.go b/internal/utils/kvstore/db.go index 0de9bb0..444f38b 100644 --- a/internal/utils/kvstore/db.go +++ b/internal/utils/kvstore/db.go @@ -4,6 +4,7 @@ package kvstore import ( "errors" + "github.com/cockroachdb/pebble" "sync" ) @@ -52,6 +53,29 @@ func (this *DB) Store() *Store { return this.store } +func (this *DB) Inspect(fn func(key []byte, value []byte)) error { + it, err := this.store.rawDB.NewIter(&pebble.IterOptions{ + LowerBound: []byte(this.namespace), + UpperBound: append([]byte(this.namespace), 0xFF, 0xFF), + }) + if err != nil { + return err + } + defer func() { + _ = it.Close() + }() + + for it.First(); it.Valid(); it.Next() { + value, valueErr := it.ValueAndErr() + if valueErr != nil { + return valueErr + } + fn(it.Key(), value) + } + + return nil +} + // Truncate the database func (this *DB) Truncate() error { this.mu.Lock()