2021-07-05 11:37:22 +08:00
package stats
import (
2023-07-01 17:54:40 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
2021-07-05 11:37:22 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-12-14 10:49:29 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
2021-07-05 11:37:22 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
2021-07-19 21:01:26 +08:00
"github.com/iwind/TeaGo/rands"
2021-09-08 17:32:08 +08:00
"github.com/iwind/TeaGo/types"
2021-07-05 11:37:22 +08:00
timeutil "github.com/iwind/TeaGo/utils/time"
2021-09-08 17:32:08 +08:00
"sort"
"strings"
"sync"
2021-07-05 11:37:22 +08:00
"time"
)
// ServerDomainHourlyStatDAO is the data access object for the per-domain
// hourly statistics tables. It is declared with dbs.DAO as its underlying type.
type ServerDomainHourlyStatDAO dbs . DAO
func init ( ) {
dbs . OnReadyDone ( func ( ) {
// 清理数据任务
2021-07-19 21:01:26 +08:00
var ticker = time . NewTicker ( time . Duration ( rands . Int ( 24 , 48 ) ) * time . Hour )
2021-12-14 10:49:29 +08:00
goman . New ( func ( ) {
2021-07-05 11:37:22 +08:00
for range ticker . C {
2023-07-01 17:54:40 +08:00
err := SharedServerDomainHourlyStatDAO . CleanDefaultDays ( nil , 7 ) // 只保留 N 天
2021-07-05 11:37:22 +08:00
if err != nil {
remotelogs . Error ( "ServerDomainHourlyStatDAO" , "clean expired data failed: " + err . Error ( ) )
}
}
2021-12-14 10:49:29 +08:00
} )
2021-07-05 11:37:22 +08:00
} )
}
// NewServerDomainHourlyStatDAO builds a DAO bound to the
// "edgeServerDomainHourlyStats" table, with "id" as the primary key and
// ServerDomainHourlyStat as the model, and passes it through dbs.NewDAO.
func NewServerDomainHourlyStatDAO() *ServerDomainHourlyStatDAO {
	var daoObject = dbs.DAOObject{
		DB:     Tea.Env,
		Table:  "edgeServerDomainHourlyStats",
		Model:  new(ServerDomainHourlyStat),
		PkName: "id",
	}
	var dao = &ServerDomainHourlyStatDAO{DAOObject: daoObject}
	return dbs.NewDAO(dao).(*ServerDomainHourlyStatDAO)
}
// SharedServerDomainHourlyStatDAO is the package-level DAO instance. It is
// assigned inside a dbs.OnReady callback registered from init.
var SharedServerDomainHourlyStatDAO * ServerDomainHourlyStatDAO
// init registers a dbs.OnReady callback that creates the shared DAO instance.
func init() {
	dbs.OnReady(func() {
		SharedServerDomainHourlyStatDAO = NewServerDomainHourlyStatDAO()
	})
}
2021-09-08 17:32:08 +08:00
// PartitionTable returns the name of the partition table for a domain.
//
// The partition is picked from the first byte of domain: an ASCII digit or
// letter maps to "<Table>_<char>", with upper-case letters lowered. An empty
// domain, or any other leading byte, maps to "<Table>_0".
func (this *ServerDomainHourlyStatDAO) PartitionTable(domain string) string {
	var fallback = this.Table + "_0"
	if domain == "" {
		return fallback
	}

	first := domain[0]
	isDigit := first >= '0' && first <= '9'
	isLower := first >= 'a' && first <= 'z'
	isUpper := first >= 'A' && first <= 'Z'
	if !isDigit && !isLower && !isUpper {
		return fallback
	}

	return this.Table + "_" + strings.ToLower(string(first))
}
// FindAllPartitionTables returns the names of all partition tables:
// "<Table>_0" through "<Table>_9" followed by "<Table>_a" through "<Table>_z".
func (this *ServerDomainHourlyStatDAO) FindAllPartitionTables() []string {
	// Suffixes in the order the tables are returned: digits first, then letters.
	const suffixes = "0123456789abcdefghijklmnopqrstuvwxyz"

	var names = make([]string, 0, len(suffixes))
	for _, suffix := range suffixes {
		names = append(names, this.Table+"_"+string(suffix))
	}
	return names
}
2021-07-05 11:37:22 +08:00
// IncreaseHourlyStat 增加统计数据
2021-07-13 11:04:45 +08:00
func ( this * ServerDomainHourlyStatDAO ) IncreaseHourlyStat ( tx * dbs . Tx , clusterId int64 , nodeId int64 , serverId int64 , domain string , hour string , bytes int64 , cachedBytes int64 , countRequests int64 , countCachedRequests int64 , countAttackRequests int64 , attackBytes int64 ) error {
2021-07-05 11:37:22 +08:00
if len ( hour ) != 10 {
return errors . New ( "invalid hour '" + hour + "'" )
}
2023-11-02 17:19:56 +08:00
if len ( domain ) == 0 || len ( domain ) > 64 {
2021-09-08 17:32:08 +08:00
return nil
}
2021-07-05 11:37:22 +08:00
err := this . Query ( tx ) .
2021-09-08 17:32:08 +08:00
Table ( this . PartitionTable ( domain ) ) .
2021-07-05 11:37:22 +08:00
Param ( "bytes" , bytes ) .
Param ( "cachedBytes" , cachedBytes ) .
Param ( "countRequests" , countRequests ) .
Param ( "countCachedRequests" , countCachedRequests ) .
2021-07-13 11:04:45 +08:00
Param ( "countAttackRequests" , countAttackRequests ) .
Param ( "attackBytes" , attackBytes ) .
2021-07-05 11:37:22 +08:00
InsertOrUpdateQuickly ( maps . Map {
"clusterId" : clusterId ,
"nodeId" : nodeId ,
"serverId" : serverId ,
"hour" : hour ,
"domain" : domain ,
"bytes" : bytes ,
"cachedBytes" : cachedBytes ,
"countRequests" : countRequests ,
"countCachedRequests" : countCachedRequests ,
2021-07-13 11:04:45 +08:00
"countAttackRequests" : countAttackRequests ,
"attackBytes" : attackBytes ,
2021-07-05 11:37:22 +08:00
} , maps . Map {
"bytes" : dbs . SQL ( "bytes+:bytes" ) ,
"cachedBytes" : dbs . SQL ( "cachedBytes+:cachedBytes" ) ,
"countRequests" : dbs . SQL ( "countRequests+:countRequests" ) ,
"countCachedRequests" : dbs . SQL ( "countCachedRequests+:countCachedRequests" ) ,
2021-07-13 11:04:45 +08:00
"countAttackRequests" : dbs . SQL ( "countAttackRequests+:countAttackRequests" ) ,
"attackBytes" : dbs . SQL ( "attackBytes+:attackBytes" ) ,
2021-07-05 11:37:22 +08:00
} )
if err != nil {
return err
}
return nil
}
2021-07-13 11:04:45 +08:00
// FindTopDomainStats 取得一定时间内的域名排行数据
2021-09-08 17:32:08 +08:00
func ( this * ServerDomainHourlyStatDAO ) FindTopDomainStats ( tx * dbs . Tx , hourFrom string , hourTo string , size int64 ) ( result [ ] * ServerDomainHourlyStat , resultErr error ) {
var tables = this . FindAllPartitionTables ( )
var wg = sync . WaitGroup { }
wg . Add ( len ( tables ) )
var locker = sync . Mutex { }
for _ , table := range tables {
go func ( table string ) {
defer wg . Done ( )
var topResults = [ ] * ServerDomainHourlyStat { }
// TODO 节点如果已经被删除,则忽略
_ , err := this . Query ( tx ) .
Table ( table ) .
Between ( "hour" , hourFrom , hourTo ) .
Result ( "domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes" ) .
Group ( "domain" ) .
Desc ( "countRequests" ) .
Limit ( size ) .
Slice ( & topResults ) .
FindAll ( )
if err != nil {
resultErr = err
return
}
if len ( topResults ) > 0 {
2021-10-09 16:01:29 +08:00
locker . Lock ( )
result = append ( result , topResults ... )
locker . Unlock ( )
}
} ( table )
}
wg . Wait ( )
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] . CountRequests > result [ j ] . CountRequests
} )
if len ( result ) > types . Int ( size ) {
result = result [ : types . Int ( size ) ]
}
return
}
// FindTopDomainStatsWithAttack 取得一定时间内的域名排行数据
func ( this * ServerDomainHourlyStatDAO ) FindTopDomainStatsWithAttack ( tx * dbs . Tx , hourFrom string , hourTo string , size int64 ) ( result [ ] * ServerDomainHourlyStat , resultErr error ) {
var tables = this . FindAllPartitionTables ( )
var wg = sync . WaitGroup { }
wg . Add ( len ( tables ) )
var locker = sync . Mutex { }
for _ , table := range tables {
go func ( table string ) {
defer wg . Done ( )
var topResults = [ ] * ServerDomainHourlyStat { }
// TODO 节点如果已经被删除,则忽略
_ , err := this . Query ( tx ) .
Table ( table ) .
Gt ( "countAttackRequests" , 0 ) .
Between ( "hour" , hourFrom , hourTo ) .
Result ( "domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes" ) .
Group ( "domain" ) .
Desc ( "countRequests" ) .
Limit ( size ) .
Slice ( & topResults ) .
FindAll ( )
if err != nil {
resultErr = err
return
}
if len ( topResults ) > 0 {
2021-09-08 17:32:08 +08:00
locker . Lock ( )
result = append ( result , topResults ... )
locker . Unlock ( )
}
} ( table )
}
wg . Wait ( )
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] . CountRequests > result [ j ] . CountRequests
} )
if len ( result ) > types . Int ( size ) {
result = result [ : types . Int ( size ) ]
}
2021-07-13 11:04:45 +08:00
return
}
2021-07-06 20:06:34 +08:00
// FindTopDomainStatsWithClusterId 取得集群上的一定时间内的域名排行数据
2021-09-08 17:32:08 +08:00
func ( this * ServerDomainHourlyStatDAO ) FindTopDomainStatsWithClusterId ( tx * dbs . Tx , clusterId int64 , hourFrom string , hourTo string , size int64 ) ( result [ ] * ServerDomainHourlyStat , resultErr error ) {
var tables = this . FindAllPartitionTables ( )
var wg = sync . WaitGroup { }
wg . Add ( len ( tables ) )
var locker = sync . Mutex { }
for _ , table := range tables {
go func ( table string ) {
defer wg . Done ( )
var topResults = [ ] * ServerDomainHourlyStat { }
// TODO 节点如果已经被删除,则忽略
_ , err := this . Query ( tx ) .
Table ( table ) .
Attr ( "clusterId" , clusterId ) .
Between ( "hour" , hourFrom , hourTo ) .
2021-09-16 09:23:36 +08:00
UseIndex ( "hour" ) .
2021-09-08 17:32:08 +08:00
Result ( "domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes" ) .
Group ( "domain" ) .
Desc ( "countRequests" ) .
Limit ( size ) .
Slice ( & topResults ) .
FindAll ( )
if err != nil {
resultErr = err
return
}
if len ( topResults ) > 0 {
locker . Lock ( )
result = append ( result , topResults ... )
locker . Unlock ( )
}
} ( table )
}
wg . Wait ( )
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] . CountRequests > result [ j ] . CountRequests
} )
if len ( result ) > types . Int ( size ) {
result = result [ : types . Int ( size ) ]
}
2021-07-05 11:37:22 +08:00
return
}
2021-07-06 20:06:34 +08:00
// FindTopDomainStatsWithNodeId 取得节点上的一定时间内的域名排行数据
2021-09-08 17:32:08 +08:00
func ( this * ServerDomainHourlyStatDAO ) FindTopDomainStatsWithNodeId ( tx * dbs . Tx , nodeId int64 , hourFrom string , hourTo string , size int64 ) ( result [ ] * ServerDomainHourlyStat , resultErr error ) {
var tables = this . FindAllPartitionTables ( )
var wg = sync . WaitGroup { }
wg . Add ( len ( tables ) )
var locker = sync . Mutex { }
for _ , table := range tables {
go func ( table string ) {
defer wg . Done ( )
var topResults = [ ] * ServerDomainHourlyStat { }
// TODO 节点如果已经被删除,则忽略
_ , err := this . Query ( tx ) .
Table ( table ) .
Attr ( "nodeId" , nodeId ) .
Between ( "hour" , hourFrom , hourTo ) .
2021-09-16 09:23:36 +08:00
UseIndex ( "hour" ) .
2021-09-08 17:32:08 +08:00
Result ( "domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes" ) .
Group ( "domain" ) .
Desc ( "countRequests" ) .
Limit ( size ) .
Slice ( & topResults ) .
FindAll ( )
if err != nil {
resultErr = err
return
}
if len ( topResults ) > 0 {
locker . Lock ( )
result = append ( result , topResults ... )
locker . Unlock ( )
}
} ( table )
}
wg . Wait ( )
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] . CountRequests > result [ j ] . CountRequests
} )
if len ( result ) > types . Int ( size ) {
result = result [ : types . Int ( size ) ]
}
2021-07-06 20:06:34 +08:00
return
}
2021-07-07 19:55:37 +08:00
// FindTopDomainStatsWithServerId 取得某个服务的一定时间内的域名排行数据
2021-09-08 17:32:08 +08:00
func ( this * ServerDomainHourlyStatDAO ) FindTopDomainStatsWithServerId ( tx * dbs . Tx , serverId int64 , hourFrom string , hourTo string , size int64 ) ( result [ ] * ServerDomainHourlyStat , resultErr error ) {
var tables = this . FindAllPartitionTables ( )
var wg = sync . WaitGroup { }
wg . Add ( len ( tables ) )
var locker = sync . Mutex { }
for _ , table := range tables {
go func ( table string ) {
defer wg . Done ( )
var topResults = [ ] * ServerDomainHourlyStat { }
// TODO 节点如果已经被删除,则忽略
_ , err := this . Query ( tx ) .
Table ( table ) .
Attr ( "serverId" , serverId ) .
Between ( "hour" , hourFrom , hourTo ) .
2022-03-27 16:39:54 +08:00
UseIndex ( "serverId" , "hour" ) .
2021-09-08 17:32:08 +08:00
Result ( "domain, MIN(serverId) AS serverId, SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes" ) .
Group ( "domain" ) .
Desc ( "countRequests" ) .
Limit ( size ) .
Slice ( & topResults ) .
FindAll ( )
if err != nil {
resultErr = err
return
}
if len ( topResults ) > 0 {
locker . Lock ( )
result = append ( result , topResults ... )
locker . Unlock ( )
}
} ( table )
}
wg . Wait ( )
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] . CountRequests > result [ j ] . CountRequests
} )
if len ( result ) > types . Int ( size ) {
result = result [ : types . Int ( size ) ]
}
2021-07-07 19:55:37 +08:00
return
}
2023-07-01 17:54:40 +08:00
// CleanDays 清理历史数据
func ( this * ServerDomainHourlyStatDAO ) CleanDays ( tx * dbs . Tx , days int ) error {
2021-07-05 11:37:22 +08:00
var hour = timeutil . Format ( "Ymd00" , time . Now ( ) . AddDate ( 0 , 0 , - days ) )
2021-09-08 17:32:08 +08:00
for _ , table := range this . FindAllPartitionTables ( ) {
_ , err := this . Query ( tx ) .
Table ( table ) .
Lt ( "hour" , hour ) .
Delete ( )
2022-01-20 11:40:28 +08:00
if err != nil {
return err
}
2021-09-08 17:32:08 +08:00
}
return nil
2021-07-05 11:37:22 +08:00
}
2023-07-01 17:54:40 +08:00
// CleanDefaultDays removes historical data using the retention configured in
// the database settings, falling back to defaultDays.
//
// The retention is taken from ServerDomainHourlyStat.Clean.Days of the config
// returned by ReadDatabaseConfig when that config is non-nil and the value is
// positive; otherwise defaultDays is used. A retention that is still not
// positive is replaced by 7. The actual deletion is delegated to CleanDays.
func (this *ServerDomainHourlyStatDAO) CleanDefaultDays(tx *dbs.Tx, defaultDays int) error {
	config, configErr := models.SharedSysSettingDAO.ReadDatabaseConfig(tx)
	if configErr != nil {
		return configErr
	}

	var keepDays = defaultDays
	if config != nil {
		if configured := config.ServerDomainHourlyStat.Clean.Days; configured > 0 {
			keepDays = configured
		}
	}
	if keepDays <= 0 {
		keepDays = 7
	}

	return this.CleanDays(tx, keepDays)
}