From eb37493af2d59bae16311c3b40d3eae35d851902 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 8 Sep 2021 17:32:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9=E5=9F=9F=E5=90=8D=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E5=88=86=E8=A1=A8=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/metric_stat_dao_test.go | 3 +- .../stats/server_domain_hourly_stat_dao.go | 266 ++++++++++++++---- .../server_domain_hourly_stat_dao_test.go | 68 +++++ .../stats/traffic_hourly_stat_dao_test.go | 2 +- 4 files changed, 290 insertions(+), 49 deletions(-) diff --git a/internal/db/models/metric_stat_dao_test.go b/internal/db/models/metric_stat_dao_test.go index 5cc59f25..2b3d3c83 100644 --- a/internal/db/models/metric_stat_dao_test.go +++ b/internal/db/models/metric_stat_dao_test.go @@ -5,12 +5,13 @@ import ( _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" "testing" ) func TestNewMetricStatDAO_InsertMany(t *testing.T) { for i := 0; i <= 10_000_000; i++ { - err := NewMetricStatDAO().CreateStat(nil, types.String(i)+"_v1", 18, int64(rands.Int(0, 10000)), int64(rands.Int(0, 10000)), int64(rands.Int(0, 100)), []string{"/html" + types.String(i)}, 1, "20210830", 0) + err := NewMetricStatDAO().CreateStat(nil, types.String(i)+"_v1", 18, int64(rands.Int(0, 10000)), int64(rands.Int(0, 10000)), int64(rands.Int(0, 100)), []string{"/html" + types.String(i)}, 1, timeutil.Format("Ymd"), 0) if err != nil { t.Fatal(err) } diff --git a/internal/db/models/stats/server_domain_hourly_stat_dao.go b/internal/db/models/stats/server_domain_hourly_stat_dao.go index 5c6a45b4..0dbd25fc 100644 --- a/internal/db/models/stats/server_domain_hourly_stat_dao.go +++ b/internal/db/models/stats/server_domain_hourly_stat_dao.go @@ -8,7 +8,11 @@ import ( "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" + "sort" + "strings" + "sync" "time" ) @@ -48,12 +52,40 @@ func init() { }) } +// PartitionTable 获取分区表格名称 +func (this *ServerDomainHourlyStatDAO) PartitionTable(domain string) string { + if len(domain) == 0 { + return this.Table + "_0" + } + if (domain[0] >= '0' && domain[0] <= '9') || (domain[0] >= 'a' && domain[0] <= 'z') || (domain[0] >= 'A' && domain[0] <= 'Z') { + return this.Table + "_" + strings.ToLower(string(domain[0])) + } + + return this.Table + "_0" +} + +// FindAllPartitionTables 获取所有表格名称 +func (this *ServerDomainHourlyStatDAO) FindAllPartitionTables() []string { + var tables = []string{} + for i := '0'; i <= '9'; i++ { + tables = append(tables, this.Table+"_"+string(i)) + } + for i := 'a'; i <= 'z'; i++ { + tables = append(tables, this.Table+"_"+string(i)) + } + return tables +} + // IncreaseHourlyStat 增加统计数据 func (this *ServerDomainHourlyStatDAO) IncreaseHourlyStat(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64, domain string, hour string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttackRequests int64, attackBytes int64) error { if len(hour) != 10 { return errors.New("invalid hour '" + hour + "'") } + if len(domain) == 0 { + return nil + } err := this.Query(tx). + Table(this.PartitionTable(domain)). Param("bytes", bytes). Param("cachedBytes", cachedBytes). Param("countRequests", countRequests). @@ -87,69 +119,209 @@ func (this *ServerDomainHourlyStatDAO) IncreaseHourlyStat(tx *dbs.Tx, clusterId } // FindTopDomainStats 取得一定时间内的域名排行数据 -func (this *ServerDomainHourlyStatDAO) FindTopDomainStats(tx *dbs.Tx, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, err error) { - // TODO 节点如果已经被删除,则忽略 - _, err = this.Query(tx). - Between("hour", hourFrom, hourTo). - Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). - Group("domain"). - Desc("countRequests"). - Limit(size). - Slice(&result). - FindAll() +func (this *ServerDomainHourlyStatDAO) FindTopDomainStats(tx *dbs.Tx, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, resultErr error) { + var tables = this.FindAllPartitionTables() + var wg = sync.WaitGroup{} + wg.Add(len(tables)) + var locker = sync.Mutex{} + + for _, table := range tables { + go func(table string) { + defer wg.Done() + + var topResults = []*ServerDomainHourlyStat{} + + // TODO 节点如果已经被删除,则忽略 + _, err := this.Query(tx). + Table(table). + Between("hour", hourFrom, hourTo). + Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). + Group("domain"). + Desc("countRequests"). + Limit(size). + Slice(&topResults). + FindAll() + if err != nil { + resultErr = err + return + } + + if len(topResults) > 0 { + locker.Lock() + result = append(result, topResults...) + locker.Unlock() + } + }(table) + } + wg.Wait() + + sort.Slice(result, func(i, j int) bool { + return result[i].CountRequests > result[j].CountRequests + }) + + if len(result) > types.Int(size) { + result = result[:types.Int(size)] + } + return } // FindTopDomainStatsWithClusterId 取得集群上的一定时间内的域名排行数据 -func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithClusterId(tx *dbs.Tx, clusterId int64, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, err error) { - // TODO 节点如果已经被删除,则忽略 - _, err = this.Query(tx). - Attr("clusterId", clusterId). - Between("hour", hourFrom, hourTo). - Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). - Group("domain"). - Desc("countRequests"). - Limit(size). - Slice(&result). - FindAll() +func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithClusterId(tx *dbs.Tx, clusterId int64, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, resultErr error) { + var tables = this.FindAllPartitionTables() + var wg = sync.WaitGroup{} + wg.Add(len(tables)) + var locker = sync.Mutex{} + + for _, table := range tables { + go func(table string) { + defer wg.Done() + + var topResults = []*ServerDomainHourlyStat{} + + // TODO 节点如果已经被删除,则忽略 + _, err := this.Query(tx). + Table(table). + Attr("clusterId", clusterId). + Between("hour", hourFrom, hourTo). + Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). + Group("domain"). + Desc("countRequests"). + Limit(size). + Slice(&topResults). + FindAll() + if err != nil { + resultErr = err + return + } + + if len(topResults) > 0 { + locker.Lock() + result = append(result, topResults...) + locker.Unlock() + } + }(table) + } + wg.Wait() + + sort.Slice(result, func(i, j int) bool { + return result[i].CountRequests > result[j].CountRequests + }) + + if len(result) > types.Int(size) { + result = result[:types.Int(size)] + } + return } // FindTopDomainStatsWithNodeId 取得节点上的一定时间内的域名排行数据 -func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithNodeId(tx *dbs.Tx, nodeId int64, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, err error) { - // TODO 节点如果已经被删除,则忽略 - _, err = this.Query(tx). - Attr("nodeId", nodeId). - Between("hour", hourFrom, hourTo). - Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). - Group("domain"). - Desc("countRequests"). - Limit(size). - Slice(&result). - FindAll() +func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithNodeId(tx *dbs.Tx, nodeId int64, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, resultErr error) { + var tables = this.FindAllPartitionTables() + var wg = sync.WaitGroup{} + wg.Add(len(tables)) + var locker = sync.Mutex{} + + for _, table := range tables { + go func(table string) { + defer wg.Done() + + var topResults = []*ServerDomainHourlyStat{} + + // TODO 节点如果已经被删除,则忽略 + _, err := this.Query(tx). + Table(table). + Attr("nodeId", nodeId). + Between("hour", hourFrom, hourTo). + Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). + Group("domain"). + Desc("countRequests"). + Limit(size). + Slice(&topResults). + FindAll() + if err != nil { + resultErr = err + return + } + + if len(topResults) > 0 { + locker.Lock() + result = append(result, topResults...) + locker.Unlock() + } + }(table) + } + wg.Wait() + + sort.Slice(result, func(i, j int) bool { + return result[i].CountRequests > result[j].CountRequests + }) + + if len(result) > types.Int(size) { + result = result[:types.Int(size)] + } + return } // FindTopDomainStatsWithServerId 取得某个服务的一定时间内的域名排行数据 -func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithServerId(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, err error) { - // TODO 节点如果已经被删除,则忽略 - _, err = this.Query(tx). - Attr("serverId", serverId). - Between("hour", hourFrom, hourTo). - Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). - Group("domain"). - Desc("countRequests"). - Limit(size). - Slice(&result). - FindAll() +func (this *ServerDomainHourlyStatDAO) FindTopDomainStatsWithServerId(tx *dbs.Tx, serverId int64, hourFrom string, hourTo string, size int64) (result []*ServerDomainHourlyStat, resultErr error) { + var tables = this.FindAllPartitionTables() + var wg = sync.WaitGroup{} + wg.Add(len(tables)) + var locker = sync.Mutex{} + + for _, table := range tables { + go func(table string) { + defer wg.Done() + + var topResults = []*ServerDomainHourlyStat{} + + // TODO 节点如果已经被删除,则忽略 + _, err := this.Query(tx). + Table(table). + Attr("serverId", serverId). + Between("hour", hourFrom, hourTo). + Result("domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). + Group("domain"). + Desc("countRequests"). + Limit(size). + Slice(&topResults). + FindAll() + if err != nil { + resultErr = err + return + } + + if len(topResults) > 0 { + locker.Lock() + result = append(result, topResults...) + locker.Unlock() + } + }(table) + } + wg.Wait() + + sort.Slice(result, func(i, j int) bool { + return result[i].CountRequests > result[j].CountRequests + }) + + if len(result) > types.Int(size) { + result = result[:types.Int(size)] + } + return } // Clean 清理历史数据 func (this *ServerDomainHourlyStatDAO) Clean(tx *dbs.Tx, days int) error { var hour = timeutil.Format("Ymd00", time.Now().AddDate(0, 0, -days)) - _, err := this.Query(tx). - Lt("hour", hour). - Delete() - return err + for _, table := range this.FindAllPartitionTables() { + _, err := this.Query(tx). + Table(table). + Lt("hour", hour). + Delete() + return err + } + return nil } diff --git a/internal/db/models/stats/server_domain_hourly_stat_dao_test.go b/internal/db/models/stats/server_domain_hourly_stat_dao_test.go index 2c1cd671..ccc8b547 100644 --- a/internal/db/models/stats/server_domain_hourly_stat_dao_test.go +++ b/internal/db/models/stats/server_domain_hourly_stat_dao_test.go @@ -1,6 +1,74 @@ package stats import ( + "fmt" _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/assert" _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/rands" + "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" + "testing" + "time" ) + +func TestServerDomainHourlyStatDAO_PartitionTable(t *testing.T) { + var a = assert.NewAssertion(t) + + var dao = NewServerDomainHourlyStatDAO() + a.IsTrue(dao.PartitionTable("") == "edgeServerDomainHourlyStats_0") + a.IsTrue(dao.PartitionTable("a1") == "edgeServerDomainHourlyStats_a") + a.IsTrue(dao.PartitionTable("Y1") == "edgeServerDomainHourlyStats_y") + a.IsTrue(dao.PartitionTable("z1") == "edgeServerDomainHourlyStats_z") + a.IsTrue(dao.PartitionTable("A1") == "edgeServerDomainHourlyStats_a") + a.IsTrue(dao.PartitionTable("Z1") == "edgeServerDomainHourlyStats_z") + a.IsTrue(dao.PartitionTable("中国") == "edgeServerDomainHourlyStats_0") + a.IsTrue(dao.PartitionTable("_") == "edgeServerDomainHourlyStats_0") + a.IsTrue(dao.PartitionTable(" ") == "edgeServerDomainHourlyStats_0") +} + +func TestServerDomainHourlyStatDAO_FindAllPartitionTables(t *testing.T) { + var dao = NewServerDomainHourlyStatDAO() + t.Log(dao.FindAllPartitionTables()) +} + +func TestServerDomainHourlyStatDAO_IncreaseHourlyStat(t *testing.T) { + dbs.NotifyReady() + + for i := 0; i < 1_000_000; i++ { + var f = string([]rune{int32(rands.Int('0', '9'))}) + + err := NewServerDomainHourlyStatDAO().IncreaseHourlyStat(nil, 18, 48, 23, f+"rand"+types.String(i%500_000)+".com", timeutil.Format("Ymd")+fmt.Sprintf("%02d", rands.Int(0, 23)), 1, 1, 1, 1, 1, 1) + if err != nil { + t.Fatal(err) + } + if i%10000 == 0 { + t.Log(i) + } + } +} + +func TestServerDomainHourlyStatDAO_FindTopDomainStats(t *testing.T) { + var dao = NewServerDomainHourlyStatDAO() + var before = time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + stats, err := dao.FindTopDomainStats(nil, timeutil.Format("Ymd00"), timeutil.Format("Ymd23"), 10) + if err != nil { + t.Fatal(err) + } + for _, stat := range stats { + t.Log(stat.Domain, stat.CountRequests) + } +} + +func TestServerDomainHourlyStatDAO_Clean(t *testing.T) { + var dao = NewServerDomainHourlyStatDAO() + err := dao.Clean(nil, 10) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/db/models/stats/traffic_hourly_stat_dao_test.go b/internal/db/models/stats/traffic_hourly_stat_dao_test.go index fc49b4ee..e0053787 100644 --- a/internal/db/models/stats/traffic_hourly_stat_dao_test.go +++ b/internal/db/models/stats/traffic_hourly_stat_dao_test.go @@ -12,7 +12,7 @@ func TestTrafficHourlyStatDAO_IncreaseDayBytes(t *testing.T) { dbs.NotifyReady() now := time.Now() - err := SharedTrafficHourlyStatDAO.IncreaseHourlyBytes(nil, timeutil.Format("YmdH"), 1) + err := SharedTrafficHourlyStatDAO.IncreaseHourlyStat(nil, timeutil.Format("YmdH"), 1, 1, 1, 1, 1, 1) if err != nil { t.Fatal(err) }