mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			276 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			276 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
						|
 | 
						|
package stats
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/trackers"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/utils/idles"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
 | 
						|
	"github.com/iwind/TeaGo/Tea"
 | 
						|
	"github.com/iwind/TeaGo/types"
 | 
						|
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
						|
	"os"
 | 
						|
	"runtime"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
var SharedDAUManager = NewDAUManager()
 | 
						|
 | 
						|
type IPInfo struct {
 | 
						|
	IP       string
 | 
						|
	ServerId int64
 | 
						|
}
 | 
						|
 | 
						|
type DAUManager struct {
 | 
						|
	isReady bool
 | 
						|
 | 
						|
	cacheFile string
 | 
						|
 | 
						|
	ipChan  chan IPInfo
 | 
						|
	ipTable *kvstore.Table[[]byte] // server_DATE_serverId_ip => nil
 | 
						|
 | 
						|
	statMap    map[string]int64 // server_DATE_serverId => count
 | 
						|
	statLocker sync.RWMutex
 | 
						|
 | 
						|
	cleanTicker *time.Ticker
 | 
						|
}
 | 
						|
 | 
						|
// NewDAUManager DAU计算器
 | 
						|
func NewDAUManager() *DAUManager {
 | 
						|
	return &DAUManager{
 | 
						|
		cacheFile:   Tea.Root + "/data/stat_dau.cache",
 | 
						|
		statMap:     map[string]int64{},
 | 
						|
		cleanTicker: time.NewTicker(24 * time.Hour),
 | 
						|
		ipChan:      make(chan IPInfo, 8192),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) Init() error {
 | 
						|
	// recover from cache
 | 
						|
	_ = this.recover()
 | 
						|
 | 
						|
	// create table
 | 
						|
	store, storeErr := kvstore.DefaultStore()
 | 
						|
	if storeErr != nil {
 | 
						|
		return storeErr
 | 
						|
	}
 | 
						|
 | 
						|
	db, dbErr := store.NewDB("dau")
 | 
						|
	if dbErr != nil {
 | 
						|
		return dbErr
 | 
						|
	}
 | 
						|
 | 
						|
	{
 | 
						|
		table, err := kvstore.NewTable[[]byte]("ip", kvstore.NewNilValueEncoder())
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		db.AddTable(table)
 | 
						|
		this.ipTable = table
 | 
						|
	}
 | 
						|
 | 
						|
	{
 | 
						|
		table, err := kvstore.NewTable[uint64]("stats", kvstore.NewIntValueEncoder[uint64]())
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		db.AddTable(table)
 | 
						|
	}
 | 
						|
 | 
						|
	// clean expires items
 | 
						|
	goman.New(func() {
 | 
						|
		idles.RunTicker(this.cleanTicker, func() {
 | 
						|
			err := this.CleanStats()
 | 
						|
			if err != nil {
 | 
						|
				remotelogs.Error("DAU_MANAGER", "clean stats failed: "+err.Error())
 | 
						|
			}
 | 
						|
		})
 | 
						|
	})
 | 
						|
 | 
						|
	// dump ip to kvstore
 | 
						|
	goman.New(func() {
 | 
						|
		// cache latest IPs to reduce kv queries
 | 
						|
		var cachedIPs []IPInfo
 | 
						|
		var maxIPs = runtime.NumCPU() * 8
 | 
						|
		if maxIPs <= 0 {
 | 
						|
			maxIPs = 8
 | 
						|
		} else if maxIPs > 64 {
 | 
						|
			maxIPs = 64
 | 
						|
		}
 | 
						|
 | 
						|
		var day = fasttime.Now().Ymd()
 | 
						|
 | 
						|
	Loop:
 | 
						|
		for ipInfo := range this.ipChan {
 | 
						|
			// check day
 | 
						|
			if fasttime.Now().Ymd() != day {
 | 
						|
				day = fasttime.Now().Ymd()
 | 
						|
				cachedIPs = []IPInfo{}
 | 
						|
			}
 | 
						|
 | 
						|
			// lookup cache
 | 
						|
			for _, cachedIP := range cachedIPs {
 | 
						|
				if cachedIP.IP == ipInfo.IP && cachedIP.ServerId == ipInfo.ServerId {
 | 
						|
					continue Loop
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			// add to cache
 | 
						|
			cachedIPs = append(cachedIPs, ipInfo)
 | 
						|
			if len(cachedIPs) > maxIPs {
 | 
						|
				cachedIPs = cachedIPs[1:]
 | 
						|
			}
 | 
						|
 | 
						|
			_ = this.processIP(ipInfo.ServerId, ipInfo.IP)
 | 
						|
		}
 | 
						|
	})
 | 
						|
 | 
						|
	// dump to cache when close
 | 
						|
	events.OnClose(func() {
 | 
						|
		_ = this.Close()
 | 
						|
	})
 | 
						|
 | 
						|
	this.isReady = true
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) AddIP(serverId int64, ip string) {
 | 
						|
	select {
 | 
						|
	case this.ipChan <- IPInfo{
 | 
						|
		IP:       ip,
 | 
						|
		ServerId: serverId,
 | 
						|
	}:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) processIP(serverId int64, ip string) error {
 | 
						|
	if !this.isReady {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// day
 | 
						|
	var date = fasttime.Now().Ymd()
 | 
						|
 | 
						|
	{
 | 
						|
		var key = "server_" + date + "_" + types.String(serverId) + "_" + ip
 | 
						|
		found, err := this.ipTable.Exist(key)
 | 
						|
		if err != nil || found {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		err = this.ipTable.Set(key, nil)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	{
 | 
						|
		var key = "server_" + date + "_" + types.String(serverId)
 | 
						|
		this.statLocker.Lock()
 | 
						|
		this.statMap[key] = this.statMap[key] + 1
 | 
						|
		this.statLocker.Unlock()
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) ReadStatMap() map[string]int64 {
 | 
						|
	this.statLocker.Lock()
 | 
						|
	var statMap = this.statMap
 | 
						|
	this.statMap = map[string]int64{}
 | 
						|
	this.statLocker.Unlock()
 | 
						|
	return statMap
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) Flush() error {
 | 
						|
	return this.ipTable.DB().Store().Flush()
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) TestInspect(t *testing.T) {
 | 
						|
	err := this.ipTable.DB().Inspect(func(key []byte, value []byte) {
 | 
						|
		t.Log(string(key), "=>", string(value))
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) Close() error {
 | 
						|
	this.cleanTicker.Stop()
 | 
						|
 | 
						|
	this.statLocker.Lock()
 | 
						|
	var statMap = this.statMap
 | 
						|
	this.statMap = map[string]int64{}
 | 
						|
	this.statLocker.Unlock()
 | 
						|
 | 
						|
	if len(statMap) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	statJSON, err := json.Marshal(statMap)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return os.WriteFile(this.cacheFile, statJSON, 0666)
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) CleanStats() error {
 | 
						|
	if !this.isReady {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	var tr = trackers.Begin("STAT:DAU_CLEAN_STATS")
 | 
						|
	defer tr.End()
 | 
						|
 | 
						|
	// day
 | 
						|
	{
 | 
						|
		var date = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -2))
 | 
						|
		err := this.ipTable.DeleteRange("server_", "server_"+date)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) Truncate() error {
 | 
						|
	return this.ipTable.Truncate()
 | 
						|
}
 | 
						|
 | 
						|
func (this *DAUManager) recover() error {
 | 
						|
	data, err := os.ReadFile(this.cacheFile)
 | 
						|
	if err != nil || len(data) == 0 {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	_ = os.Remove(this.cacheFile)
 | 
						|
 | 
						|
	var statMap = map[string]int64{}
 | 
						|
	err = json.Unmarshal(data, &statMap)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	var today = timeutil.Format("Ymd")
 | 
						|
	for key := range statMap {
 | 
						|
		var pieces = strings.Split(key, "_")
 | 
						|
		if pieces[1] != today {
 | 
						|
			delete(statMap, key)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	this.statMap = statMap
 | 
						|
	return nil
 | 
						|
}
 |