2020-10-10 11:49:21 +08:00
package models
import (
2021-12-14 12:44:57 +08:00
"bytes"
2020-10-10 11:49:21 +08:00
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
2021-08-07 22:04:22 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/zero"
2020-10-10 11:49:21 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2021-08-07 22:04:22 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
2020-10-10 11:49:21 +08:00
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2020-10-10 19:21:32 +08:00
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
2021-12-14 12:44:57 +08:00
"github.com/iwind/TeaGo/rands"
2020-11-02 21:15:31 +08:00
"github.com/iwind/TeaGo/types"
2020-10-10 11:49:21 +08:00
timeutil "github.com/iwind/TeaGo/utils/time"
2021-07-14 22:43:31 +08:00
"net"
2021-06-04 10:15:41 +08:00
"net/http"
2020-11-02 21:15:31 +08:00
"regexp"
2020-10-10 19:21:32 +08:00
"sort"
2020-10-10 11:49:21 +08:00
"strings"
2020-10-10 19:21:32 +08:00
"sync"
2020-10-10 11:49:21 +08:00
"time"
)
type HTTPAccessLogDAO dbs . DAO
2020-10-13 20:05:13 +08:00
var SharedHTTPAccessLogDAO * HTTPAccessLogDAO
2021-12-14 12:44:57 +08:00
// 队列
2022-03-08 19:55:39 +08:00
var (
oldAccessLogQueue = make ( chan * pb . HTTPAccessLog )
accessLogQueue = make ( chan * pb . HTTPAccessLog , 10_000 )
accessLogQueueMaxLength = 100_000
accessLogQueuePercent = 100 // 0-100
accessLogCountPerSecond = 10_000 // 0 表示不限制
accessLogConfigJSON = [ ] byte { }
accessLogQueueChanged = make ( chan zero . Zero , 1 )
accessLogEnableAutoPartial = true // 是否启用自动分表
2022-03-09 10:01:24 +08:00
accessLogRowsPerTable int64 = 500_000 // 自动分表的单表最大值
2022-03-08 19:55:39 +08:00
)
// accessLogTableQuery describes one physical access-log table to be queried.
type accessLogTableQuery struct {
	// daoWrapper is the DAO wrapper whose DAO is used to run the query.
	daoWrapper *HTTPAccessLogDAOWrapper

	// name is the table name passed to Query.Table().
	name string

	// hasRemoteAddrField is copied from the table definition's HasRemoteAddr.
	hasRemoteAddrField bool

	// hasDomainField is copied from the table definition's HasDomain.
	hasDomainField bool
}
2021-12-14 12:44:57 +08:00
2020-10-13 20:05:13 +08:00
func init ( ) {
dbs . OnReady ( func ( ) {
SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO ( )
} )
2021-12-14 12:44:57 +08:00
// 队列相关
dbs . OnReadyDone ( func ( ) {
// 检查队列变化
goman . New ( func ( ) {
var ticker = time . NewTicker ( 60 * time . Second )
// 先执行一次初始化
SharedHTTPAccessLogDAO . SetupQueue ( )
// 循环执行
for {
select {
case <- ticker . C :
SharedHTTPAccessLogDAO . SetupQueue ( )
case <- accessLogQueueChanged :
SharedHTTPAccessLogDAO . SetupQueue ( )
}
}
} )
// 导出队列内容
goman . New ( func ( ) {
var ticker = time . NewTicker ( 1 * time . Second )
for range ticker . C {
var tx * dbs . Tx
err := SharedHTTPAccessLogDAO . DumpAccessLogsFromQueue ( tx , accessLogCountPerSecond )
if err != nil {
remotelogs . Error ( "HTTP_ACCESS_LOG_QUEUE" , "dump access logs failed: " + err . Error ( ) )
}
}
} )
} )
2020-10-13 20:05:13 +08:00
}
2020-10-10 11:49:21 +08:00
// NewHTTPAccessLogDAO builds a DAO bound to the "edgeHTTPAccessLogs" table
// with "id" as its primary key and HTTPAccessLog as its model.
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
	var object = dbs.DAOObject{
		DB:     Tea.Env,
		Table:  "edgeHTTPAccessLogs",
		Model:  new(HTTPAccessLog),
		PkName: "id",
	}
	var dao = dbs.NewDAO(&HTTPAccessLogDAO{DAOObject: object})
	return dao.(*HTTPAccessLogDAO)
}
2021-06-02 11:53:24 +08:00
// CreateHTTPAccessLogs 创建访问日志
2021-01-01 23:31:30 +08:00
func ( this * HTTPAccessLogDAO ) CreateHTTPAccessLogs ( tx * dbs . Tx , accessLogs [ ] * pb . HTTPAccessLog ) error {
2021-12-14 12:44:57 +08:00
// 写入队列
var queue = accessLogQueue // 这样写非常重要,防止在写入过程中队列有切换
for _ , accessLog := range accessLogs {
if accessLog . FirewallPolicyId == 0 { // 如果是WAF记录, 则采取采样率
// 采样率
if accessLogQueuePercent <= 0 {
return nil
}
if accessLogQueuePercent < 100 && rands . Int ( 1 , 100 ) > accessLogQueuePercent {
return nil
}
}
select {
case queue <- accessLog :
default :
// 超出的丢弃
}
}
return nil
}
// DumpAccessLogsFromQueue 从队列导入访问日志
func ( this * HTTPAccessLogDAO ) DumpAccessLogsFromQueue ( tx * dbs . Tx , size int ) error {
2022-03-08 19:55:39 +08:00
var dao = randomHTTPAccessLogDAO ( )
2020-10-10 11:49:21 +08:00
if dao == nil {
2020-10-10 19:21:32 +08:00
dao = & HTTPAccessLogDAOWrapper {
DAO : SharedHTTPAccessLogDAO ,
NodeId : 0 ,
}
2020-10-10 11:49:21 +08:00
}
2021-12-14 12:44:57 +08:00
if size <= 0 {
size = 1_000_000
2020-10-10 11:49:21 +08:00
}
2021-12-14 12:44:57 +08:00
// 复制变量,防止中途改变
var oldQueue = oldAccessLogQueue
var newQueue = accessLogQueue
2020-10-10 19:21:32 +08:00
2021-12-14 12:44:57 +08:00
Loop :
for i := 0 ; i < size ; i ++ {
// old
select {
case accessLog := <- oldQueue :
err := this . CreateHTTPAccessLog ( tx , dao . DAO , accessLog )
if err != nil {
return err
}
continue Loop
default :
2020-10-10 11:49:21 +08:00
}
2021-12-14 12:44:57 +08:00
// new
select {
case accessLog := <- newQueue :
err := this . CreateHTTPAccessLog ( tx , dao . DAO , accessLog )
if err != nil {
return err
}
continue Loop
default :
break Loop
2021-12-07 14:57:55 +08:00
}
2021-12-14 12:44:57 +08:00
}
2021-12-07 14:57:55 +08:00
2021-12-14 12:44:57 +08:00
return nil
}
2021-07-14 22:43:31 +08:00
2021-12-14 12:44:57 +08:00
// CreateHTTPAccessLog 写入单条访问日志
func ( this * HTTPAccessLogDAO ) CreateHTTPAccessLog ( tx * dbs . Tx , dao * HTTPAccessLogDAO , accessLog * pb . HTTPAccessLog ) error {
2022-03-08 19:55:39 +08:00
var day = timeutil . Format ( "Ymd" , time . Unix ( accessLog . Timestamp , 0 ) )
tableDef , err := SharedHTTPAccessLogManager . FindTable ( dao . Instance , day , true )
2021-12-14 12:44:57 +08:00
if err != nil {
return err
}
2020-10-10 11:49:21 +08:00
2021-12-14 12:44:57 +08:00
fields := map [ string ] interface { } { }
fields [ "serverId" ] = accessLog . ServerId
fields [ "nodeId" ] = accessLog . NodeId
fields [ "status" ] = accessLog . Status
fields [ "createdAt" ] = accessLog . Timestamp
fields [ "requestId" ] = accessLog . RequestId
fields [ "firewallPolicyId" ] = accessLog . FirewallPolicyId
fields [ "firewallRuleGroupId" ] = accessLog . FirewallRuleGroupId
fields [ "firewallRuleSetId" ] = accessLog . FirewallRuleSetId
fields [ "firewallRuleId" ] = accessLog . FirewallRuleId
if len ( accessLog . RequestBody ) > 0 {
fields [ "requestBody" ] = accessLog . RequestBody
accessLog . RequestBody = nil
}
if tableDef . HasRemoteAddr {
fields [ "remoteAddr" ] = accessLog . RemoteAddr
}
if tableDef . HasDomain {
fields [ "domain" ] = accessLog . Host
}
content , err := json . Marshal ( accessLog )
if err != nil {
return err
}
fields [ "content" ] = content
2022-03-08 19:55:39 +08:00
var lastId int64
lastId , err = dao . Query ( tx ) .
2021-12-14 12:44:57 +08:00
Table ( tableDef . Name ) .
Sets ( fields ) .
Insert ( )
if err != nil {
2022-03-08 19:55:39 +08:00
return err
}
2022-03-09 10:01:24 +08:00
if accessLogEnableAutoPartial && accessLogRowsPerTable > 0 && lastId % accessLogRowsPerTable == 0 {
2022-03-08 19:55:39 +08:00
SharedHTTPAccessLogManager . ResetTable ( dao . Instance , day )
2020-10-10 11:49:21 +08:00
}
return nil
}
2020-10-10 19:21:32 +08:00
2021-06-02 11:53:24 +08:00
// ListAccessLogs 读取往前的 单页访问日志
2021-08-07 22:04:22 +08:00
func ( this * HTTPAccessLogDAO ) ListAccessLogs ( tx * dbs . Tx , lastRequestId string ,
size int64 ,
day string ,
2022-03-09 11:00:54 +08:00
hourFrom string ,
hourTo string ,
2022-01-11 12:03:20 +08:00
clusterId int64 ,
nodeId int64 ,
2021-08-07 22:04:22 +08:00
serverId int64 ,
reverse bool ,
hasError bool ,
firewallPolicyId int64 ,
firewallRuleGroupId int64 ,
firewallRuleSetId int64 ,
hasFirewallPolicy bool ,
userId int64 ,
keyword string ,
ip string ,
domain string ) ( result [ ] * HTTPAccessLog , nextLastRequestId string , hasMore bool , err error ) {
2020-10-10 19:21:32 +08:00
if len ( day ) != 8 {
return
}
// 限制能查询的最大条数,防止占用内存过多
if size > 1000 {
size = 1000
}
2022-03-09 11:00:54 +08:00
result , nextLastRequestId , err = this . listAccessLogs ( tx , lastRequestId , size , day , hourFrom , hourTo , clusterId , nodeId , serverId , reverse , hasError , firewallPolicyId , firewallRuleGroupId , firewallRuleSetId , hasFirewallPolicy , userId , keyword , ip , domain )
2020-10-10 19:21:32 +08:00
if err != nil || int64 ( len ( result ) ) < size {
return
}
2022-03-09 11:00:54 +08:00
moreResult , _ , _ := this . listAccessLogs ( tx , nextLastRequestId , 1 , day , hourFrom , hourTo , clusterId , nodeId , serverId , reverse , hasError , firewallPolicyId , firewallRuleGroupId , firewallRuleSetId , hasFirewallPolicy , userId , keyword , ip , domain )
2020-10-10 19:21:32 +08:00
hasMore = len ( moreResult ) > 0
return
}
// 读取往前的单页访问日志
2022-03-09 11:00:54 +08:00
func ( this * HTTPAccessLogDAO ) listAccessLogs ( tx * dbs . Tx ,
lastRequestId string ,
size int64 ,
day string ,
hourFrom string ,
hourTo string ,
clusterId int64 ,
nodeId int64 ,
serverId int64 ,
reverse bool ,
hasError bool ,
firewallPolicyId int64 ,
firewallRuleGroupId int64 ,
firewallRuleSetId int64 ,
hasFirewallPolicy bool ,
userId int64 ,
keyword string ,
ip string ,
domain string ) ( result [ ] * HTTPAccessLog , nextLastRequestId string , err error ) {
2020-10-10 19:21:32 +08:00
if size <= 0 {
return nil , lastRequestId , nil
}
2021-01-20 16:46:19 +08:00
serverIds := [ ] int64 { }
if userId > 0 {
serverIds , err = SharedServerDAO . FindAllEnabledServerIdsWithUserId ( tx , userId )
if err != nil {
return
}
if len ( serverIds ) == 0 {
return
}
}
2020-10-10 19:21:32 +08:00
accessLogLocker . RLock ( )
daoList := [ ] * HTTPAccessLogDAOWrapper { }
2021-06-02 11:53:24 +08:00
for _ , daoWrapper := range httpAccessLogDAOMapping {
2020-10-10 19:21:32 +08:00
daoList = append ( daoList , daoWrapper )
}
accessLogLocker . RUnlock ( )
if len ( daoList ) == 0 {
daoList = [ ] * HTTPAccessLogDAOWrapper { {
DAO : SharedHTTPAccessLogDAO ,
NodeId : 0 ,
} }
}
2022-03-08 19:55:39 +08:00
// 查询某个集群下的节点
var nodeIds = [ ] int64 { }
if clusterId > 0 {
nodeIds , err = SharedNodeDAO . FindAllEnabledNodeIdsWithClusterId ( tx , clusterId )
if err != nil {
remotelogs . Error ( "DBNODE" , err . Error ( ) )
return
}
sort . Slice ( nodeIds , func ( i , j int ) bool {
return nodeIds [ i ] < nodeIds [ j ]
} )
}
2020-10-10 19:21:32 +08:00
2022-03-08 19:55:39 +08:00
// 准备查询
var tableQueries = [ ] * accessLogTableQuery { }
2020-10-10 19:21:32 +08:00
for _ , daoWrapper := range daoList {
2022-03-08 19:55:39 +08:00
var instance = daoWrapper . DAO . Instance
tableDefs , err := SharedHTTPAccessLogManager . FindTables ( instance , day )
if err != nil {
return nil , "" , err
}
for _ , def := range tableDefs {
tableQueries = append ( tableQueries , & accessLogTableQuery {
daoWrapper : daoWrapper ,
name : def . Name ,
hasRemoteAddrField : def . HasRemoteAddr ,
hasDomainField : def . HasDomain ,
} )
}
}
2020-10-10 19:21:32 +08:00
2022-03-08 19:55:39 +08:00
var locker = sync . Mutex { }
2020-10-10 19:21:32 +08:00
2022-03-09 11:00:54 +08:00
var statusPrefixReg = regexp . MustCompile ( ` status:\s*(\d { 3})\b ` )
var statusRangeReg = regexp . MustCompile ( ` status:\s*(\d { 3})-(\d { 3})\b ` )
2022-03-08 19:55:39 +08:00
var count = len ( tableQueries )
var wg = & sync . WaitGroup { }
wg . Add ( count )
for _ , tableQuery := range tableQueries {
go func ( tableQuery * accessLogTableQuery ) {
defer wg . Done ( )
2020-10-10 19:21:32 +08:00
2022-03-08 19:55:39 +08:00
var dao = tableQuery . daoWrapper . DAO
var query = dao . Query ( tx )
2020-10-10 19:21:32 +08:00
// 条件
2022-01-11 12:03:20 +08:00
if nodeId > 0 {
query . Attr ( "nodeId" , nodeId )
} else if clusterId > 0 {
if len ( nodeIds ) > 0 {
var nodeIdStrings = [ ] string { }
for _ , subNodeId := range nodeIds {
nodeIdStrings = append ( nodeIdStrings , types . String ( subNodeId ) )
}
query . Where ( "nodeId IN (" + strings . Join ( nodeIdStrings , "," ) + ")" )
query . Reuse ( false )
} else {
// 如果没有节点,则直接返回空
return
}
}
2020-10-10 19:21:32 +08:00
if serverId > 0 {
query . Attr ( "serverId" , serverId )
2021-01-20 16:46:19 +08:00
} else if userId > 0 && len ( serverIds ) > 0 {
query . Attr ( "serverId" , serverIds ) .
Reuse ( false )
2020-10-10 19:21:32 +08:00
}
2020-10-31 17:44:53 +08:00
if hasError {
2020-11-02 14:37:05 +08:00
query . Where ( "status>=400" )
}
if firewallPolicyId > 0 {
query . Attr ( "firewallPolicyId" , firewallPolicyId )
}
if firewallRuleGroupId > 0 {
query . Attr ( "firewallRuleGroupId" , firewallRuleGroupId )
}
if firewallRuleSetId > 0 {
query . Attr ( "firewallRuleSetId" , firewallRuleSetId )
2020-10-31 17:44:53 +08:00
}
2021-01-20 16:46:19 +08:00
if hasFirewallPolicy {
query . Where ( "firewallPolicyId>0" )
2021-08-22 16:00:32 +08:00
query . UseIndex ( "firewallPolicyId" )
2021-01-20 16:46:19 +08:00
}
2020-10-10 19:21:32 +08:00
2021-06-04 10:15:41 +08:00
// keyword
2021-08-07 22:04:22 +08:00
if len ( ip ) > 0 {
// TODO 支持IP范围
2022-03-08 19:55:39 +08:00
if tableQuery . hasRemoteAddrField {
2021-08-07 22:04:22 +08:00
// IP格式
if strings . Contains ( ip , "," ) || strings . Contains ( ip , "-" ) {
rangeConfig , err := shared . ParseIPRange ( ip )
if err == nil {
if len ( rangeConfig . IPFrom ) > 0 && len ( rangeConfig . IPTo ) > 0 {
query . Between ( "INET_ATON(remoteAddr)" , utils . IP2Long ( rangeConfig . IPFrom ) , utils . IP2Long ( rangeConfig . IPTo ) )
}
}
} else {
query . Attr ( "remoteAddr" , ip )
2021-08-22 16:20:40 +08:00
query . UseIndex ( "remoteAddr" )
2021-08-07 22:04:22 +08:00
}
} else {
query . Where ( "JSON_EXTRACT(content, '$.remoteAddr')=:ip1" ) .
Param ( "ip1" , ip )
}
}
if len ( domain ) > 0 {
2022-03-08 19:55:39 +08:00
if tableQuery . hasDomainField {
2021-08-07 22:04:22 +08:00
if strings . Contains ( domain , "*" ) {
domain = strings . ReplaceAll ( domain , "*" , "%" )
domain = regexp . MustCompile ( ` [^a-zA-Z0-9-.%] ` ) . ReplaceAllString ( domain , "" )
query . Where ( "domain LIKE :host2" ) .
Param ( "host2" , domain )
} else {
query . Attr ( "domain" , domain )
2021-08-22 16:20:40 +08:00
query . UseIndex ( "domain" )
2021-08-07 22:04:22 +08:00
}
} else {
query . Where ( "JSON_EXTRACT(content, '$.host')=:host1" ) .
Param ( "host1" , domain )
}
}
2022-03-08 19:55:39 +08:00
2021-06-04 10:15:41 +08:00
if len ( keyword ) > 0 {
2021-07-14 22:43:31 +08:00
// remoteAddr
2022-03-08 19:55:39 +08:00
if tableQuery . hasRemoteAddrField && net . ParseIP ( keyword ) != nil {
2021-07-14 22:43:31 +08:00
query . Attr ( "remoteAddr" , keyword )
2022-03-08 19:55:39 +08:00
} else if tableQuery . hasRemoteAddrField && regexp . MustCompile ( ` ^ip:.+ ` ) . MatchString ( keyword ) {
2021-07-18 17:09:06 +08:00
keyword = keyword [ 3 : ]
pieces := strings . SplitN ( keyword , "," , 2 )
if len ( pieces ) == 1 || len ( pieces [ 1 ] ) == 0 {
query . Attr ( "remoteAddr" , pieces [ 0 ] )
} else {
2021-08-07 22:04:22 +08:00
query . Between ( "INET_ATON(remoteAddr)" , utils . IP2Long ( pieces [ 0 ] ) , utils . IP2Long ( pieces [ 1 ] ) )
2021-07-18 17:09:06 +08:00
}
2022-03-09 11:00:54 +08:00
} else if statusRangeReg . MatchString ( keyword ) {
var matches = statusRangeReg . FindStringSubmatch ( keyword )
query . Between ( "status" , types . Int ( matches [ 1 ] ) , types . Int ( matches [ 2 ] ) )
// TODO 处理剩余的关键词
2022-03-08 19:55:39 +08:00
} else if statusPrefixReg . MatchString ( keyword ) {
var matches = statusPrefixReg . FindStringSubmatch ( keyword )
query . Attr ( "status" , matches [ 1 ] )
2022-03-09 11:00:54 +08:00
// TODO 处理剩余的关键词
2021-07-14 22:43:31 +08:00
} else {
2021-07-21 22:12:35 +08:00
if regexp . MustCompile ( ` ^ip:.+ ` ) . MatchString ( keyword ) {
keyword = keyword [ 3 : ]
}
2022-03-08 19:55:39 +08:00
var useOriginKeyword = false
2021-07-14 22:43:31 +08:00
2021-08-07 22:04:22 +08:00
where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword OR JSON_EXTRACT(content, '$.userAgent') LIKE :keyword"
2021-07-14 22:43:31 +08:00
2021-07-18 15:52:34 +08:00
jsonKeyword , err := json . Marshal ( keyword )
if err == nil {
where += " OR JSON_CONTAINS(content, :jsonKeyword, '$.tags')"
query . Param ( "jsonKeyword" , jsonKeyword )
}
2021-07-14 22:43:31 +08:00
// 请求方法
if keyword == http . MethodGet ||
keyword == http . MethodPost ||
keyword == http . MethodHead ||
keyword == http . MethodConnect ||
keyword == http . MethodPut ||
keyword == http . MethodTrace ||
keyword == http . MethodOptions ||
keyword == http . MethodDelete ||
keyword == http . MethodPatch {
where += " OR JSON_EXTRACT(content, '$.requestMethod')=:originKeyword"
useOriginKeyword = true
}
// 响应状态码
if regexp . MustCompile ( ` ^\d { 3}$ ` ) . MatchString ( keyword ) {
2022-03-08 19:55:39 +08:00
where += " OR status=:intKeyword"
2021-07-14 22:43:31 +08:00
query . Param ( "intKeyword" , types . Int ( keyword ) )
}
if regexp . MustCompile ( ` ^\d { 3}-\d { 3}$ ` ) . MatchString ( keyword ) {
pieces := strings . Split ( keyword , "-" )
2022-03-08 19:55:39 +08:00
where += " OR status BETWEEN :intKeyword1 AND :intKeyword2"
2021-07-14 22:43:31 +08:00
query . Param ( "intKeyword1" , types . Int ( pieces [ 0 ] ) )
query . Param ( "intKeyword2" , types . Int ( pieces [ 1 ] ) )
}
2021-12-02 14:44:20 +08:00
if regexp . MustCompile ( ` ^\d { 20,}\s*\.?$ ` ) . MatchString ( keyword ) {
2021-12-02 11:50:16 +08:00
where += " OR requestId=:requestId"
2021-12-02 14:44:20 +08:00
query . Param ( "requestId" , strings . TrimRight ( keyword , ". " ) )
2021-12-02 11:50:16 +08:00
}
2021-07-14 22:43:31 +08:00
query . Where ( "(" + where + ")" ) .
Param ( "keyword" , "%" + keyword + "%" )
if useOriginKeyword {
query . Param ( "originKeyword" , keyword )
}
2021-06-04 10:15:41 +08:00
}
}
2022-03-09 11:00:54 +08:00
// hourFrom - hourTo
if len ( hourFrom ) > 0 && len ( hourTo ) > 0 {
var hourFromInt = types . Int ( hourFrom )
var hourToInt = types . Int ( hourTo )
if hourFromInt >= 0 && hourFromInt <= 23 && hourToInt >= hourFromInt && hourToInt <= 23 {
var y = types . Int ( day [ : 4 ] )
var m = types . Int ( day [ 4 : 6 ] )
var d = types . Int ( day [ 6 : ] )
var timeFrom = time . Date ( y , time . Month ( m ) , d , hourFromInt , 0 , 0 , 0 , time . Local )
var timeTo = time . Date ( y , time . Month ( m ) , d , hourToInt , 59 , 59 , 0 , time . Local )
query . Between ( "createdAt" , timeFrom . Unix ( ) , timeTo . Unix ( ) )
}
}
2020-10-10 19:21:32 +08:00
// offset
if len ( lastRequestId ) > 0 {
if ! reverse {
query . Where ( "requestId<:requestId" ) .
Param ( "requestId" , lastRequestId )
} else {
query . Where ( "requestId>:requestId" ) .
Param ( "requestId" , lastRequestId )
}
}
if ! reverse {
query . Desc ( "requestId" )
} else {
query . Asc ( "requestId" )
}
// 开始查询
ones , err := query .
2022-03-08 19:55:39 +08:00
Table ( tableQuery . name ) .
2020-10-10 19:21:32 +08:00
Limit ( size ) .
FindAll ( )
if err != nil {
logs . Println ( "[DB_NODE]" + err . Error ( ) )
return
}
2022-03-08 19:55:39 +08:00
2020-10-10 19:21:32 +08:00
locker . Lock ( )
for _ , one := range ones {
accessLog := one . ( * HTTPAccessLog )
result = append ( result , accessLog )
}
locker . Unlock ( )
2022-03-08 19:55:39 +08:00
} ( tableQuery )
2020-10-10 19:21:32 +08:00
}
wg . Wait ( )
if len ( result ) == 0 {
return nil , lastRequestId , nil
}
// 按照requestId排序
sort . Slice ( result , func ( i , j int ) bool {
if ! reverse {
return result [ i ] . RequestId > result [ j ] . RequestId
} else {
return result [ i ] . RequestId < result [ j ] . RequestId
}
} )
if int64 ( len ( result ) ) > size {
result = result [ : size ]
}
2022-03-08 19:55:39 +08:00
var requestId = result [ len ( result ) - 1 ] . RequestId
2020-10-10 19:21:32 +08:00
if reverse {
lists . Reverse ( result )
}
if ! reverse {
return result , requestId , nil
} else {
return result , requestId , nil
}
}
2020-11-02 21:15:31 +08:00
2021-06-02 11:53:24 +08:00
// FindAccessLogWithRequestId 根据请求ID获取访问日志
2021-01-01 23:31:30 +08:00
func ( this * HTTPAccessLogDAO ) FindAccessLogWithRequestId ( tx * dbs . Tx , requestId string ) ( * HTTPAccessLog , error ) {
2021-11-21 15:56:13 +08:00
if ! regexp . MustCompile ( ` ^\d { 11,} ` ) . MatchString ( requestId ) {
2020-11-02 21:15:31 +08:00
return nil , errors . New ( "invalid requestId" )
}
accessLogLocker . RLock ( )
daoList := [ ] * HTTPAccessLogDAOWrapper { }
2021-06-02 11:53:24 +08:00
for _ , daoWrapper := range httpAccessLogDAOMapping {
2020-11-02 21:15:31 +08:00
daoList = append ( daoList , daoWrapper )
}
accessLogLocker . RUnlock ( )
if len ( daoList ) == 0 {
daoList = [ ] * HTTPAccessLogDAOWrapper { {
DAO : SharedHTTPAccessLogDAO ,
NodeId : 0 ,
} }
}
2022-03-08 19:55:39 +08:00
// 准备查询
var day = timeutil . FormatTime ( "Ymd" , types . Int64 ( requestId [ : 10 ] ) )
var tableQueries = [ ] * accessLogTableQuery { }
for _ , daoWrapper := range daoList {
var instance = daoWrapper . DAO . Instance
tableDefs , err := SharedHTTPAccessLogManager . FindTables ( instance , day )
if err != nil {
return nil , err
}
for _ , def := range tableDefs {
tableQueries = append ( tableQueries , & accessLogTableQuery {
daoWrapper : daoWrapper ,
name : def . Name ,
hasRemoteAddrField : def . HasRemoteAddr ,
hasDomainField : def . HasDomain ,
} )
}
}
var count = len ( tableQueries )
var wg = & sync . WaitGroup { }
2020-11-02 21:15:31 +08:00
wg . Add ( count )
var result * HTTPAccessLog = nil
2022-03-08 19:55:39 +08:00
for _ , tableQuery := range tableQueries {
go func ( tableQuery * accessLogTableQuery ) {
2020-11-02 21:15:31 +08:00
defer wg . Done ( )
2022-03-08 19:55:39 +08:00
var dao = tableQuery . daoWrapper . DAO
2021-01-01 23:31:30 +08:00
one , err := dao . Query ( tx ) .
2022-03-08 19:55:39 +08:00
Table ( tableQuery . name ) .
2020-11-02 21:15:31 +08:00
Attr ( "requestId" , requestId ) .
Find ( )
if err != nil {
logs . Println ( "[DB_NODE]" + err . Error ( ) )
return
}
if one != nil {
result = one . ( * HTTPAccessLog )
}
2022-03-08 19:55:39 +08:00
} ( tableQuery )
2020-11-02 21:15:31 +08:00
}
wg . Wait ( )
return result , nil
}
2021-12-14 12:44:57 +08:00
// SetupQueue 建立队列
func ( this * HTTPAccessLogDAO ) SetupQueue ( ) {
configJSON , err := SharedSysSettingDAO . ReadSetting ( nil , systemconfigs . SettingCodeAccessLogQueue )
if err != nil {
remotelogs . Error ( "HTTP_ACCESS_LOG_QUEUE" , "read settings failed: " + err . Error ( ) )
return
}
if len ( configJSON ) == 0 {
return
}
if bytes . Compare ( accessLogConfigJSON , configJSON ) == 0 {
return
}
accessLogConfigJSON = configJSON
var config = & serverconfigs . AccessLogQueueConfig { }
err = json . Unmarshal ( configJSON , config )
if err != nil {
remotelogs . Error ( "HTTP_ACCESS_LOG_QUEUE" , "decode settings failed: " + err . Error ( ) )
return
}
accessLogQueuePercent = config . Percent
accessLogCountPerSecond = config . CountPerSecond
if config . MaxLength <= 0 {
config . MaxLength = 100_000
}
2022-03-09 10:01:24 +08:00
accessLogEnableAutoPartial = config . EnableAutoPartial
if config . RowsPerTable > 0 {
accessLogRowsPerTable = config . RowsPerTable
}
2021-12-14 12:44:57 +08:00
if accessLogQueueMaxLength != config . MaxLength {
accessLogQueueMaxLength = config . MaxLength
oldAccessLogQueue = accessLogQueue
accessLogQueue = make ( chan * pb . HTTPAccessLog , config . MaxLength )
}
2022-03-09 10:01:24 +08:00
if Tea . IsTesting ( ) {
remotelogs . Println ( "HTTP_ACCESS_LOG_QUEUE" , "change queue " + string ( configJSON ) )
}
2021-12-14 12:44:57 +08:00
}