2020-10-10 11:49:21 +08:00
package models
import (
"encoding/json"
2020-10-10 19:21:32 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/configs"
2020-10-10 11:49:21 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2020-10-10 19:21:32 +08:00
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
2020-11-02 21:15:31 +08:00
"github.com/iwind/TeaGo/types"
2020-10-10 11:49:21 +08:00
timeutil "github.com/iwind/TeaGo/utils/time"
2021-07-14 22:43:31 +08:00
"net"
2021-06-04 10:15:41 +08:00
"net/http"
2020-11-02 21:15:31 +08:00
"regexp"
2020-10-10 19:21:32 +08:00
"sort"
"strconv"
2020-10-10 11:49:21 +08:00
"strings"
2020-10-10 19:21:32 +08:00
"sync"
2020-10-10 11:49:21 +08:00
"time"
)
// HTTPAccessLogDAO is the data access object type for HTTP access logs.
// Its underlying type is dbs.DAO.
type HTTPAccessLogDAO dbs . DAO
2020-10-13 20:05:13 +08:00
// SharedHTTPAccessLogDAO is the package-wide HTTPAccessLogDAO instance.
// It is nil until the callback registered with dbs.OnReady in init assigns it.
var SharedHTTPAccessLogDAO * HTTPAccessLogDAO
func init ( ) {
dbs . OnReady ( func ( ) {
SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO ( )
} )
}
2020-10-10 11:49:21 +08:00
// NewHTTPAccessLogDAO builds an HTTPAccessLogDAO bound to the
// "edgeHTTPAccessLogs" table, with HTTPAccessLog as its model and "id" as
// the primary key, and passes it through dbs.NewDAO.
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
	object := dbs.DAOObject{
		DB:     Tea.Env,
		Table:  "edgeHTTPAccessLogs",
		Model:  new(HTTPAccessLog),
		PkName: "id",
	}
	created := dbs.NewDAO(&HTTPAccessLogDAO{
		DAOObject: object,
	})
	return created.(*HTTPAccessLogDAO)
}
2021-06-02 11:53:24 +08:00
// CreateHTTPAccessLogs 创建访问日志
2021-01-01 23:31:30 +08:00
func ( this * HTTPAccessLogDAO ) CreateHTTPAccessLogs ( tx * dbs . Tx , accessLogs [ ] * pb . HTTPAccessLog ) error {
2021-06-02 11:53:24 +08:00
dao := randomHTTPAccessLogDAO ( )
2020-10-10 11:49:21 +08:00
if dao == nil {
2020-10-10 19:21:32 +08:00
dao = & HTTPAccessLogDAOWrapper {
DAO : SharedHTTPAccessLogDAO ,
NodeId : 0 ,
}
2020-10-10 11:49:21 +08:00
}
2021-01-01 23:31:30 +08:00
return this . CreateHTTPAccessLogsWithDAO ( tx , dao , accessLogs )
2020-10-10 11:49:21 +08:00
}
2021-06-02 11:53:24 +08:00
// CreateHTTPAccessLogsWithDAO 使用特定的DAO创建访问日志
2021-01-01 23:31:30 +08:00
func ( this * HTTPAccessLogDAO ) CreateHTTPAccessLogsWithDAO ( tx * dbs . Tx , daoWrapper * HTTPAccessLogDAOWrapper , accessLogs [ ] * pb . HTTPAccessLog ) error {
2020-10-10 19:21:32 +08:00
if daoWrapper == nil {
2020-10-10 11:49:21 +08:00
return errors . New ( "dao should not be nil" )
}
if len ( accessLogs ) == 0 {
return nil
}
2020-10-10 19:21:32 +08:00
dao := daoWrapper . DAO
2020-10-10 11:49:21 +08:00
// TODO 改成事务批量提交,以加快速度
for _ , accessLog := range accessLogs {
day := timeutil . Format ( "Ymd" , time . Unix ( accessLog . Timestamp , 0 ) )
2021-07-14 22:43:31 +08:00
tableDef , err := findHTTPAccessLogTable ( dao . Instance , day , false )
2020-10-10 11:49:21 +08:00
if err != nil {
return err
}
fields := map [ string ] interface { } { }
fields [ "serverId" ] = accessLog . ServerId
fields [ "nodeId" ] = accessLog . NodeId
fields [ "status" ] = accessLog . Status
fields [ "createdAt" ] = accessLog . Timestamp
2020-10-10 19:21:32 +08:00
fields [ "requestId" ] = accessLog . RequestId + strconv . FormatInt ( time . Now ( ) . UnixNano ( ) , 10 ) + configs . PaddingId
2020-11-02 14:37:05 +08:00
fields [ "firewallPolicyId" ] = accessLog . FirewallPolicyId
fields [ "firewallRuleGroupId" ] = accessLog . FirewallRuleGroupId
fields [ "firewallRuleSetId" ] = accessLog . FirewallRuleSetId
fields [ "firewallRuleId" ] = accessLog . FirewallRuleId
2020-10-10 11:49:21 +08:00
2021-07-14 22:43:31 +08:00
// TODO 根据集群、服务设置获取IP
if tableDef . HasRemoteAddr {
fields [ "remoteAddr" ] = accessLog . RawRemoteAddr
}
2020-10-10 11:49:21 +08:00
content , err := json . Marshal ( accessLog )
if err != nil {
return err
}
fields [ "content" ] = content
2021-01-01 23:31:30 +08:00
_ , err = dao . Query ( tx ) .
2021-07-14 22:43:31 +08:00
Table ( tableDef . Name ) .
2020-10-10 11:49:21 +08:00
Sets ( fields ) .
Insert ( )
if err != nil {
// 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试
if strings . Contains ( err . Error ( ) , "1146" ) {
2021-07-14 22:43:31 +08:00
tableDef , err = findHTTPAccessLogTable ( dao . Instance , day , true )
2020-10-10 11:49:21 +08:00
if err != nil {
return err
}
2021-01-01 23:31:30 +08:00
_ , err = dao . Query ( tx ) .
2021-07-14 22:43:31 +08:00
Table ( tableDef . Name ) .
2020-10-10 11:49:21 +08:00
Sets ( fields ) .
Insert ( )
if err != nil {
return err
}
2021-07-14 22:43:31 +08:00
} else {
logs . Println ( "HTTP_ACCESS_LOG" , err . Error ( ) )
2020-10-10 11:49:21 +08:00
}
}
}
return nil
}
2020-10-10 19:21:32 +08:00
2021-06-02 11:53:24 +08:00
// ListAccessLogs 读取往前的 单页访问日志
2021-06-04 10:15:41 +08:00
func ( this * HTTPAccessLogDAO ) ListAccessLogs ( tx * dbs . Tx , lastRequestId string , size int64 , day string , serverId int64 , reverse bool , hasError bool , firewallPolicyId int64 , firewallRuleGroupId int64 , firewallRuleSetId int64 , hasFirewallPolicy bool , userId int64 , keyword string ) ( result [ ] * HTTPAccessLog , nextLastRequestId string , hasMore bool , err error ) {
2020-10-10 19:21:32 +08:00
if len ( day ) != 8 {
return
}
// 限制能查询的最大条数,防止占用内存过多
if size > 1000 {
size = 1000
}
2021-06-04 10:15:41 +08:00
result , nextLastRequestId , err = this . listAccessLogs ( tx , lastRequestId , size , day , serverId , reverse , hasError , firewallPolicyId , firewallRuleGroupId , firewallRuleSetId , hasFirewallPolicy , userId , keyword )
2020-10-10 19:21:32 +08:00
if err != nil || int64 ( len ( result ) ) < size {
return
}
2021-06-04 10:15:41 +08:00
moreResult , _ , _ := this . listAccessLogs ( tx , nextLastRequestId , 1 , day , serverId , reverse , hasError , firewallPolicyId , firewallRuleGroupId , firewallRuleSetId , hasFirewallPolicy , userId , keyword )
2020-10-10 19:21:32 +08:00
hasMore = len ( moreResult ) > 0
return
}
// 读取往前的单页访问日志
2021-06-04 10:15:41 +08:00
func ( this * HTTPAccessLogDAO ) listAccessLogs ( tx * dbs . Tx , lastRequestId string , size int64 , day string , serverId int64 , reverse bool , hasError bool , firewallPolicyId int64 , firewallRuleGroupId int64 , firewallRuleSetId int64 , hasFirewallPolicy bool , userId int64 , keyword string ) ( result [ ] * HTTPAccessLog , nextLastRequestId string , err error ) {
2020-10-10 19:21:32 +08:00
if size <= 0 {
return nil , lastRequestId , nil
}
2021-01-20 16:46:19 +08:00
serverIds := [ ] int64 { }
if userId > 0 {
serverIds , err = SharedServerDAO . FindAllEnabledServerIdsWithUserId ( tx , userId )
if err != nil {
return
}
if len ( serverIds ) == 0 {
return
}
}
2020-10-10 19:21:32 +08:00
accessLogLocker . RLock ( )
daoList := [ ] * HTTPAccessLogDAOWrapper { }
2021-06-02 11:53:24 +08:00
for _ , daoWrapper := range httpAccessLogDAOMapping {
2020-10-10 19:21:32 +08:00
daoList = append ( daoList , daoWrapper )
}
accessLogLocker . RUnlock ( )
if len ( daoList ) == 0 {
daoList = [ ] * HTTPAccessLogDAOWrapper { {
DAO : SharedHTTPAccessLogDAO ,
NodeId : 0 ,
} }
}
locker := sync . Mutex { }
count := len ( daoList )
wg := & sync . WaitGroup { }
wg . Add ( count )
for _ , daoWrapper := range daoList {
go func ( daoWrapper * HTTPAccessLogDAOWrapper ) {
defer wg . Done ( )
dao := daoWrapper . DAO
2021-07-14 22:43:31 +08:00
tableName , hasRemoteAddr , exists , err := findHTTPAccessLogTableName ( dao . Instance , day )
2020-10-10 19:21:32 +08:00
if ! exists {
// 表格不存在则跳过
return
}
if err != nil {
logs . Println ( "[DB_NODE]" + err . Error ( ) )
return
}
2021-01-01 23:31:30 +08:00
query := dao . Query ( tx )
2020-10-10 19:21:32 +08:00
// 条件
if serverId > 0 {
query . Attr ( "serverId" , serverId )
2021-01-20 16:46:19 +08:00
} else if userId > 0 && len ( serverIds ) > 0 {
query . Attr ( "serverId" , serverIds ) .
Reuse ( false )
2020-10-10 19:21:32 +08:00
}
2020-10-31 17:44:53 +08:00
if hasError {
2020-11-02 14:37:05 +08:00
query . Where ( "status>=400" )
}
if firewallPolicyId > 0 {
query . Attr ( "firewallPolicyId" , firewallPolicyId )
}
if firewallRuleGroupId > 0 {
query . Attr ( "firewallRuleGroupId" , firewallRuleGroupId )
}
if firewallRuleSetId > 0 {
query . Attr ( "firewallRuleSetId" , firewallRuleSetId )
2020-10-31 17:44:53 +08:00
}
2021-01-20 16:46:19 +08:00
if hasFirewallPolicy {
query . Where ( "firewallPolicyId>0" )
}
2020-10-10 19:21:32 +08:00
2021-06-04 10:15:41 +08:00
// keyword
if len ( keyword ) > 0 {
2021-07-14 22:43:31 +08:00
// remoteAddr
if hasRemoteAddr && net . ParseIP ( keyword ) != nil {
query . Attr ( "remoteAddr" , keyword )
} else {
2021-06-04 10:15:41 +08:00
2021-07-14 22:43:31 +08:00
useOriginKeyword := false
where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword"
// 请求方法
if keyword == http . MethodGet ||
keyword == http . MethodPost ||
keyword == http . MethodHead ||
keyword == http . MethodConnect ||
keyword == http . MethodPut ||
keyword == http . MethodTrace ||
keyword == http . MethodOptions ||
keyword == http . MethodDelete ||
keyword == http . MethodPatch {
where += " OR JSON_EXTRACT(content, '$.requestMethod')=:originKeyword"
useOriginKeyword = true
}
// 响应状态码
if regexp . MustCompile ( ` ^\d { 3}$ ` ) . MatchString ( keyword ) {
where += " OR JSON_EXTRACT(content, '$.status')=:intKeyword"
query . Param ( "intKeyword" , types . Int ( keyword ) )
}
if regexp . MustCompile ( ` ^\d { 3}-\d { 3}$ ` ) . MatchString ( keyword ) {
pieces := strings . Split ( keyword , "-" )
where += " OR JSON_EXTRACT(content, '$.status') BETWEEN :intKeyword1 AND :intKeyword2"
query . Param ( "intKeyword1" , types . Int ( pieces [ 0 ] ) )
query . Param ( "intKeyword2" , types . Int ( pieces [ 1 ] ) )
}
query . Where ( "(" + where + ")" ) .
Param ( "keyword" , "%" + keyword + "%" )
if useOriginKeyword {
query . Param ( "originKeyword" , keyword )
}
2021-06-04 10:15:41 +08:00
}
}
2020-10-10 19:21:32 +08:00
// offset
if len ( lastRequestId ) > 0 {
if ! reverse {
query . Where ( "requestId<:requestId" ) .
Param ( "requestId" , lastRequestId )
} else {
query . Where ( "requestId>:requestId" ) .
Param ( "requestId" , lastRequestId )
}
}
if ! reverse {
query . Desc ( "requestId" )
} else {
query . Asc ( "requestId" )
}
// 开始查询
ones , err := query .
Table ( tableName ) .
Limit ( size ) .
FindAll ( )
if err != nil {
logs . Println ( "[DB_NODE]" + err . Error ( ) )
return
}
locker . Lock ( )
for _ , one := range ones {
accessLog := one . ( * HTTPAccessLog )
result = append ( result , accessLog )
}
locker . Unlock ( )
} ( daoWrapper )
}
wg . Wait ( )
if len ( result ) == 0 {
return nil , lastRequestId , nil
}
// 按照requestId排序
sort . Slice ( result , func ( i , j int ) bool {
if ! reverse {
return result [ i ] . RequestId > result [ j ] . RequestId
} else {
return result [ i ] . RequestId < result [ j ] . RequestId
}
} )
if int64 ( len ( result ) ) > size {
result = result [ : size ]
}
requestId := result [ len ( result ) - 1 ] . RequestId
if reverse {
lists . Reverse ( result )
}
if ! reverse {
return result , requestId , nil
} else {
return result , requestId , nil
}
}
2020-11-02 21:15:31 +08:00
2021-06-02 11:53:24 +08:00
// FindAccessLogWithRequestId 根据请求ID获取访问日志
2021-01-01 23:31:30 +08:00
func ( this * HTTPAccessLogDAO ) FindAccessLogWithRequestId ( tx * dbs . Tx , requestId string ) ( * HTTPAccessLog , error ) {
2020-11-02 21:15:31 +08:00
if ! regexp . MustCompile ( ` ^\d { 30,} ` ) . MatchString ( requestId ) {
return nil , errors . New ( "invalid requestId" )
}
accessLogLocker . RLock ( )
daoList := [ ] * HTTPAccessLogDAOWrapper { }
2021-06-02 11:53:24 +08:00
for _ , daoWrapper := range httpAccessLogDAOMapping {
2020-11-02 21:15:31 +08:00
daoList = append ( daoList , daoWrapper )
}
accessLogLocker . RUnlock ( )
if len ( daoList ) == 0 {
daoList = [ ] * HTTPAccessLogDAOWrapper { {
DAO : SharedHTTPAccessLogDAO ,
NodeId : 0 ,
} }
}
count := len ( daoList )
wg := & sync . WaitGroup { }
wg . Add ( count )
var result * HTTPAccessLog = nil
day := timeutil . FormatTime ( "Ymd" , types . Int64 ( requestId [ : 10 ] ) )
for _ , daoWrapper := range daoList {
go func ( daoWrapper * HTTPAccessLogDAOWrapper ) {
defer wg . Done ( )
dao := daoWrapper . DAO
2021-07-14 22:43:31 +08:00
tableName , _ , exists , err := findHTTPAccessLogTableName ( dao . Instance , day )
2020-11-02 21:15:31 +08:00
if err != nil {
logs . Println ( "[DB_NODE]" + err . Error ( ) )
return
}
if ! exists {
return
}
2021-01-01 23:31:30 +08:00
one , err := dao . Query ( tx ) .
2020-11-02 21:15:31 +08:00
Table ( tableName ) .
Attr ( "requestId" , requestId ) .
Find ( )
if err != nil {
logs . Println ( "[DB_NODE]" + err . Error ( ) )
return
}
if one != nil {
result = one . ( * HTTPAccessLog )
}
} ( daoWrapper )
}
wg . Wait ( )
return result , nil
}