mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-03 23:20:26 +08:00
871 lines
24 KiB
Go
871 lines
24 KiB
Go
package models
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/json"
|
||
dbutils "github.com/TeaOSLab/EdgeAPI/internal/db/utils"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||
"github.com/TeaOSLab/EdgeAPI/internal/zero"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
|
||
_ "github.com/go-sql-driver/mysql"
|
||
"github.com/iwind/TeaGo/Tea"
|
||
"github.com/iwind/TeaGo/dbs"
|
||
"github.com/iwind/TeaGo/lists"
|
||
"github.com/iwind/TeaGo/logs"
|
||
"github.com/iwind/TeaGo/rands"
|
||
"github.com/iwind/TeaGo/types"
|
||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||
"golang.org/x/net/idna"
|
||
"net"
|
||
"net/http"
|
||
"net/url"
|
||
"regexp"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
type HTTPAccessLogDAO dbs.DAO
|
||
|
||
var SharedHTTPAccessLogDAO *HTTPAccessLogDAO
|
||
|
||
// 队列
|
||
var (
|
||
oldAccessLogQueue = make(chan *pb.HTTPAccessLog)
|
||
accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000)
|
||
accessLogQueueMaxLength = 100_000 // 队列最大长度
|
||
accessLogQueuePercent = 100 // 0-100
|
||
accessLogCountPerSecond = 10_000 // 每秒钟写入条数,0 表示不限制
|
||
accessLogPerTx = 100 // 单事务写入条数
|
||
accessLogConfigJSON = []byte{}
|
||
accessLogQueueChanged = make(chan zero.Zero, 1)
|
||
|
||
accessLogEnableAutoPartial = true // 是否启用自动分表
|
||
accessLogRowsPerTable int64 = 500_000 // 自动分表的单表最大值
|
||
)
|
||
|
||
func AccessLogQueuePercent() int {
|
||
return accessLogQueuePercent
|
||
}
|
||
|
||
type accessLogTableQuery struct {
|
||
daoWrapper *HTTPAccessLogDAOWrapper
|
||
name string
|
||
hasRemoteAddrField bool
|
||
hasDomainField bool
|
||
}
|
||
|
||
func init() {
|
||
dbs.OnReady(func() {
|
||
SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO()
|
||
})
|
||
|
||
// 队列相关
|
||
dbs.OnReadyDone(func() {
|
||
// 检查队列变化
|
||
goman.New(func() {
|
||
var ticker = time.NewTicker(60 * time.Second)
|
||
|
||
// 先执行一次初始化
|
||
SharedHTTPAccessLogDAO.SetupQueue()
|
||
|
||
// 循环执行
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
SharedHTTPAccessLogDAO.SetupQueue()
|
||
case <-accessLogQueueChanged:
|
||
SharedHTTPAccessLogDAO.SetupQueue()
|
||
}
|
||
}
|
||
})
|
||
|
||
// 导出队列内容
|
||
goman.New(func() {
|
||
var ticker = time.NewTicker(1 * time.Second)
|
||
var accessLogPerLoop = accessLogPerTx
|
||
|
||
for range ticker.C {
|
||
var countTxs = accessLogCountPerSecond / accessLogPerLoop
|
||
if countTxs <= 0 {
|
||
countTxs = 1
|
||
}
|
||
for i := 0; i < countTxs; i++ {
|
||
var before = time.Now()
|
||
hasMore, err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(accessLogPerLoop)
|
||
|
||
// 如果用时过长,则调整每次写入次数
|
||
var costMs = time.Since(before).Milliseconds()
|
||
if costMs > 150 {
|
||
accessLogPerLoop = accessLogPerTx / 4
|
||
} else if costMs > 100 {
|
||
accessLogPerLoop = accessLogPerTx / 2
|
||
} // 这里不需要恢复成默认值,因为可能是写入数量比较小
|
||
if err != nil {
|
||
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "dump access logs failed: "+err.Error())
|
||
} else if !hasMore {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
})
|
||
})
|
||
}
|
||
|
||
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
|
||
return dbs.NewDAO(&HTTPAccessLogDAO{
|
||
DAOObject: dbs.DAOObject{
|
||
DB: Tea.Env,
|
||
Table: "edgeHTTPAccessLogs",
|
||
Model: new(HTTPAccessLog),
|
||
PkName: "id",
|
||
},
|
||
}).(*HTTPAccessLogDAO)
|
||
}
|
||
|
||
// CreateHTTPAccessLogs 创建访问日志
|
||
func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb.HTTPAccessLog) error {
|
||
// 写入队列
|
||
var queue = accessLogQueue // 这样写非常重要,防止在写入过程中队列有切换
|
||
for _, accessLog := range accessLogs {
|
||
if accessLog.FirewallPolicyId == 0 { // 如果是非WAF记录,则采取采样率
|
||
// 采样率
|
||
if accessLogQueuePercent <= 0 {
|
||
return nil
|
||
}
|
||
if accessLogQueuePercent < 100 && rands.Int(1, 100) > accessLogQueuePercent {
|
||
continue
|
||
}
|
||
}
|
||
|
||
select {
|
||
case queue <- accessLog:
|
||
default:
|
||
// 超出的丢弃
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// DumpAccessLogsFromQueue 从队列导入访问日志
|
||
func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(size int) (hasMore bool, err error) {
|
||
if size <= 0 {
|
||
size = 100
|
||
}
|
||
|
||
if len(oldAccessLogQueue) == 0 && len(accessLogQueue) == 0 {
|
||
return false, nil
|
||
}
|
||
|
||
var dao = randomHTTPAccessLogDAO()
|
||
if dao == nil {
|
||
dao = &HTTPAccessLogDAOWrapper{
|
||
DAO: SharedHTTPAccessLogDAO,
|
||
NodeId: 0,
|
||
}
|
||
|
||
// 检查本地数据库空间
|
||
if dbutils.IsLocalDatabase && !dbutils.HasFreeSpace {
|
||
return false, errors.New("dump accesslog failed: there is no enough space left for database (" + dbutils.LocalDatabaseDataDir + ")")
|
||
}
|
||
} else if dao.IsLocal {
|
||
// 检查本地数据库空间
|
||
// 我们假定本地只能安装一个数据库,访问日志中的数据库和当前API连接的数据库一致
|
||
if !dbutils.HasFreeSpace {
|
||
return true, errors.New("dump accesslog failed: there is no enough space left for database (" + dbutils.LocalDatabaseDataDir + ")")
|
||
}
|
||
}
|
||
|
||
// 开始事务
|
||
tx, err := dao.DAO.Instance.Begin()
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
defer func() {
|
||
_ = tx.Commit()
|
||
}()
|
||
|
||
// 复制变量,防止中途改变
|
||
var oldQueue = oldAccessLogQueue
|
||
var newQueue = accessLogQueue
|
||
|
||
hasMore = true
|
||
|
||
Loop:
|
||
for i := 0; i < size; i++ {
|
||
// old
|
||
select {
|
||
case accessLog := <-oldQueue:
|
||
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
continue Loop
|
||
default:
|
||
|
||
}
|
||
|
||
// new
|
||
select {
|
||
case accessLog := <-newQueue:
|
||
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
continue Loop
|
||
default:
|
||
hasMore = false
|
||
break Loop
|
||
}
|
||
}
|
||
|
||
return hasMore, nil
|
||
}
|
||
|
||
// CreateHTTPAccessLog 写入单条访问日志
|
||
func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLogDAO, accessLog *pb.HTTPAccessLog) error {
|
||
var day string
|
||
// 注意:如果你修改了 TimeISO8601 的逻辑,这里也需要同步修改
|
||
if len(accessLog.TimeISO8601) > 10 {
|
||
day = strings.ReplaceAll(accessLog.TimeISO8601[:10], "-", "")
|
||
} else {
|
||
day = timeutil.FormatTime("Ymd", accessLog.Timestamp)
|
||
}
|
||
|
||
tableDef, err := SharedHTTPAccessLogManager.FindLastTable(dao.Instance, day, true)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var fields = map[string]any{}
|
||
fields["serverId"] = accessLog.ServerId
|
||
fields["nodeId"] = accessLog.NodeId
|
||
fields["status"] = accessLog.Status
|
||
fields["createdAt"] = accessLog.Timestamp
|
||
fields["requestId"] = accessLog.RequestId
|
||
fields["firewallPolicyId"] = accessLog.FirewallPolicyId
|
||
fields["firewallRuleGroupId"] = accessLog.FirewallRuleGroupId
|
||
fields["firewallRuleSetId"] = accessLog.FirewallRuleSetId
|
||
fields["firewallRuleId"] = accessLog.FirewallRuleId
|
||
|
||
if len(accessLog.RequestBody) > 0 {
|
||
fields["requestBody"] = accessLog.RequestBody
|
||
accessLog.RequestBody = nil
|
||
}
|
||
|
||
if tableDef.HasRemoteAddr {
|
||
fields["remoteAddr"] = accessLog.RemoteAddr
|
||
}
|
||
if tableDef.HasDomain {
|
||
if len(accessLog.Host) > 128 {
|
||
fields["domain"] = accessLog.Host[:128]
|
||
} else {
|
||
fields["domain"] = accessLog.Host
|
||
}
|
||
}
|
||
|
||
content, err := json.Marshal(accessLog)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
fields["content"] = content
|
||
|
||
var lastId int64
|
||
lastId, err = dao.Query(tx).
|
||
Table(tableDef.Name).
|
||
Sets(fields).
|
||
Insert()
|
||
if err != nil {
|
||
// 错误重试
|
||
if CheckSQLErrCode(err, 1146) { // Error 1146: Table 'xxx' doesn't exist
|
||
err = SharedHTTPAccessLogManager.CreateTable(dao.Instance, tableDef.Name)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 重新尝试
|
||
lastId, err = dao.Query(tx).
|
||
Table(tableDef.Name).
|
||
Sets(fields).
|
||
Insert()
|
||
}
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
if accessLogEnableAutoPartial && accessLogRowsPerTable > 0 && lastId >= accessLogRowsPerTable {
|
||
SharedHTTPAccessLogManager.ResetTable(dao.Instance, day)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ListAccessLogs 读取往前的 单页访问日志
|
||
func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx,
|
||
partition int32,
|
||
lastRequestId string,
|
||
size int64,
|
||
day string,
|
||
hourFrom string,
|
||
hourTo string,
|
||
clusterId int64,
|
||
nodeId int64,
|
||
serverId int64,
|
||
reverse bool,
|
||
hasError bool,
|
||
firewallPolicyId int64,
|
||
firewallRuleGroupId int64,
|
||
firewallRuleSetId int64,
|
||
hasFirewallPolicy bool,
|
||
userId int64,
|
||
keyword string,
|
||
ip string,
|
||
domain string) (result []*HTTPAccessLog, nextLastRequestId string, hasMore bool, err error) {
|
||
if len(day) != 8 {
|
||
return
|
||
}
|
||
|
||
// 限制能查询的最大条数,防止占用内存过多
|
||
if size > 1000 {
|
||
size = 1000
|
||
}
|
||
|
||
result, nextLastRequestId, err = this.listAccessLogs(tx, partition, lastRequestId, size, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
|
||
if err != nil || int64(len(result)) < size {
|
||
return
|
||
}
|
||
|
||
moreResult, _, _ := this.listAccessLogs(tx, partition, nextLastRequestId, 1, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
|
||
hasMore = len(moreResult) > 0
|
||
return
|
||
}
|
||
|
||
// 读取往前的单页访问日志
|
||
func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
|
||
partition int32,
|
||
lastRequestId string,
|
||
size int64,
|
||
day string,
|
||
hourFrom string,
|
||
hourTo string,
|
||
clusterId int64,
|
||
nodeId int64,
|
||
serverId int64,
|
||
reverse bool,
|
||
hasError bool,
|
||
firewallPolicyId int64,
|
||
firewallRuleGroupId int64,
|
||
firewallRuleSetId int64,
|
||
hasFirewallPolicy bool,
|
||
userId int64,
|
||
keyword string,
|
||
ip string,
|
||
domain string) (result []*HTTPAccessLog, nextLastRequestId string, err error) {
|
||
if size <= 0 {
|
||
return nil, lastRequestId, nil
|
||
}
|
||
|
||
var serverIds = []int64{}
|
||
if userId > 0 {
|
||
serverIds, err = SharedServerDAO.FindAllEnabledServerIdsWithUserId(tx, userId)
|
||
if err != nil {
|
||
return
|
||
}
|
||
if len(serverIds) == 0 {
|
||
return
|
||
}
|
||
}
|
||
|
||
accessLogLocker.RLock()
|
||
var daoList = []*HTTPAccessLogDAOWrapper{}
|
||
for _, daoWrapper := range httpAccessLogDAOMapping {
|
||
daoList = append(daoList, daoWrapper)
|
||
}
|
||
accessLogLocker.RUnlock()
|
||
|
||
if len(daoList) == 0 {
|
||
daoList = []*HTTPAccessLogDAOWrapper{{
|
||
DAO: SharedHTTPAccessLogDAO,
|
||
NodeId: 0,
|
||
}}
|
||
}
|
||
|
||
// 查询某个集群下的节点
|
||
var nodeIds = []int64{}
|
||
if clusterId > 0 {
|
||
nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId)
|
||
if err != nil {
|
||
remotelogs.Error("DB_NODE", err.Error())
|
||
return
|
||
}
|
||
sort.Slice(nodeIds, func(i, j int) bool {
|
||
return nodeIds[i] < nodeIds[j]
|
||
})
|
||
}
|
||
|
||
// 准备查询
|
||
var tableQueries = []*accessLogTableQuery{}
|
||
var maxTableName = ""
|
||
for _, daoWrapper := range daoList {
|
||
var instance = daoWrapper.DAO.Instance
|
||
def, err := SharedHTTPAccessLogManager.FindPartitionTable(instance, day, partition)
|
||
if err != nil {
|
||
return nil, "", err
|
||
}
|
||
if !def.Exists {
|
||
continue
|
||
}
|
||
|
||
if len(maxTableName) == 0 || def.Name > maxTableName {
|
||
maxTableName = def.Name
|
||
}
|
||
|
||
tableQueries = append(tableQueries, &accessLogTableQuery{
|
||
daoWrapper: daoWrapper,
|
||
name: def.Name,
|
||
hasRemoteAddrField: def.HasRemoteAddr,
|
||
hasDomainField: def.HasDomain,
|
||
})
|
||
}
|
||
|
||
// 检查各个分表是否一致
|
||
if partition < 0 {
|
||
var newTableQueries = []*accessLogTableQuery{}
|
||
for _, tableQuery := range tableQueries {
|
||
if tableQuery.name != maxTableName {
|
||
continue
|
||
}
|
||
newTableQueries = append(newTableQueries, tableQuery)
|
||
}
|
||
tableQueries = newTableQueries
|
||
}
|
||
|
||
if len(tableQueries) == 0 {
|
||
return nil, "", nil
|
||
}
|
||
|
||
var locker = sync.Mutex{}
|
||
|
||
// 这里正则表达式中的括号不能轻易变更,因为后面有引用
|
||
// TODO 支持多个查询条件的组合,比如 status:200 proto:HTTP/1.1
|
||
var statusPrefixReg = regexp.MustCompile(`status:\s*(\d{3})\b`)
|
||
var statusRangeReg = regexp.MustCompile(`status:\s*(\d{3})-(\d{3})\b`)
|
||
var urlReg = regexp.MustCompile(`^(http|https)://`)
|
||
var requestPathReg = regexp.MustCompile(`requestPath:(\S+)`)
|
||
var protoReg = regexp.MustCompile(`proto:(\S+)`)
|
||
var schemeReg = regexp.MustCompile(`scheme:(\S+)`)
|
||
var methodReg = regexp.MustCompile(`(?:method|requestMethod):(\S+)`)
|
||
var refererReg = regexp.MustCompile(`referer:(\S+)`)
|
||
|
||
var count = len(tableQueries)
|
||
var wg = &sync.WaitGroup{}
|
||
wg.Add(count)
|
||
for _, tableQuery := range tableQueries {
|
||
go func(tableQuery *accessLogTableQuery, keyword string) {
|
||
defer wg.Done()
|
||
|
||
var dao = tableQuery.daoWrapper.DAO
|
||
var query = dao.Query(tx)
|
||
query.Result("id", "serverId", "nodeId", "status", "createdAt", "content", "requestId", "firewallPolicyId", "firewallRuleGroupId", "firewallRuleSetId", "firewallRuleId", "remoteAddr", "domain")
|
||
|
||
// 条件
|
||
if nodeId > 0 {
|
||
query.Attr("nodeId", nodeId)
|
||
} else if clusterId > 0 {
|
||
if len(nodeIds) > 0 {
|
||
var nodeIdStrings = []string{}
|
||
for _, subNodeId := range nodeIds {
|
||
nodeIdStrings = append(nodeIdStrings, types.String(subNodeId))
|
||
}
|
||
|
||
query.Where("nodeId IN (" + strings.Join(nodeIdStrings, ",") + ")")
|
||
query.Reuse(false)
|
||
} else {
|
||
// 如果没有节点,则直接返回空
|
||
return
|
||
}
|
||
}
|
||
if serverId > 0 {
|
||
query.Attr("serverId", serverId)
|
||
} else if userId > 0 && len(serverIds) > 0 {
|
||
query.Attr("serverId", serverIds).
|
||
Reuse(false)
|
||
}
|
||
if hasError {
|
||
query.Where("status>=400")
|
||
}
|
||
if firewallPolicyId > 0 {
|
||
query.Attr("firewallPolicyId", firewallPolicyId)
|
||
}
|
||
if firewallRuleGroupId > 0 {
|
||
query.Attr("firewallRuleGroupId", firewallRuleGroupId)
|
||
}
|
||
if firewallRuleSetId > 0 {
|
||
query.Attr("firewallRuleSetId", firewallRuleSetId)
|
||
}
|
||
if hasFirewallPolicy {
|
||
query.Where("firewallPolicyId>0")
|
||
query.UseIndex("firewallPolicyId")
|
||
}
|
||
|
||
// keyword
|
||
if len(ip) > 0 {
|
||
// TODO 支持IP范围
|
||
if tableQuery.hasRemoteAddrField {
|
||
// IP格式
|
||
if strings.Contains(ip, ",") || strings.Contains(ip, "-") {
|
||
rangeConfig, err := shared.ParseIPRange(ip)
|
||
if err == nil {
|
||
if len(rangeConfig.IPFrom) > 0 && len(rangeConfig.IPTo) > 0 {
|
||
query.Between("INET_ATON(remoteAddr)", utils.IP2Long(rangeConfig.IPFrom), utils.IP2Long(rangeConfig.IPTo))
|
||
}
|
||
}
|
||
} else {
|
||
// 去掉IPv6的[]
|
||
ip = strings.Trim(ip, "[]")
|
||
|
||
query.Attr("remoteAddr", ip)
|
||
query.UseIndex("remoteAddr")
|
||
}
|
||
} else {
|
||
query.Where("JSON_EXTRACT(content, '$.remoteAddr')=:ip1").
|
||
Param("ip1", ip)
|
||
}
|
||
}
|
||
if len(domain) > 0 {
|
||
if tableQuery.hasDomainField {
|
||
if strings.Contains(domain, "*") {
|
||
domain = strings.ReplaceAll(domain, "*", "%")
|
||
domain = regexp.MustCompile(`[^a-zA-Z0-9-.%]`).ReplaceAllString(domain, "")
|
||
query.Where("domain LIKE :host2").
|
||
Param("host2", domain)
|
||
} else {
|
||
// 中文域名
|
||
if !regexp.MustCompile(`^[a-zA-Z0-9-.]+$`).MatchString(domain) {
|
||
unicodeDomain, err := idna.ToASCII(domain)
|
||
if err == nil && len(unicodeDomain) > 0 {
|
||
domain = unicodeDomain
|
||
}
|
||
}
|
||
|
||
query.Attr("domain", domain)
|
||
query.UseIndex("domain")
|
||
}
|
||
} else {
|
||
query.Where("JSON_EXTRACT(content, '$.host')=:host1").
|
||
Param("host1", domain)
|
||
}
|
||
}
|
||
|
||
if len(keyword) > 0 {
|
||
var isSpecialKeyword = false
|
||
|
||
if tableQuery.hasRemoteAddrField && net.ParseIP(keyword) != nil { // ip
|
||
isSpecialKeyword = true
|
||
query.Attr("remoteAddr", keyword)
|
||
} else if tableQuery.hasRemoteAddrField && regexp.MustCompile(`^ip:.+`).MatchString(keyword) { // ip:x.x.x.x
|
||
isSpecialKeyword = true
|
||
keyword = keyword[3:]
|
||
pieces := strings.SplitN(keyword, ",", 2)
|
||
if len(pieces) == 1 || len(pieces[1]) == 0 || pieces[0] == pieces[1] {
|
||
query.Attr("remoteAddr", pieces[0])
|
||
} else {
|
||
query.Between("INET_ATON(remoteAddr)", utils.IP2Long(pieces[0]), utils.IP2Long(pieces[1]))
|
||
}
|
||
} else if statusRangeReg.MatchString(keyword) { // status:200-400
|
||
isSpecialKeyword = true
|
||
var matches = statusRangeReg.FindStringSubmatch(keyword)
|
||
query.Between("status", types.Int(matches[1]), types.Int(matches[2]))
|
||
} else if statusPrefixReg.MatchString(keyword) { // status:200
|
||
isSpecialKeyword = true
|
||
var matches = statusPrefixReg.FindStringSubmatch(keyword)
|
||
query.Attr("status", matches[1])
|
||
} else if requestPathReg.MatchString(keyword) {
|
||
isSpecialKeyword = true
|
||
var matches = requestPathReg.FindStringSubmatch(keyword)
|
||
query.Where("JSON_EXTRACT(content, '$.requestPath')=:keyword").
|
||
Param("keyword", matches[1])
|
||
} else if protoReg.MatchString(keyword) {
|
||
isSpecialKeyword = true
|
||
var matches = protoReg.FindStringSubmatch(keyword)
|
||
query.Where("JSON_EXTRACT(content, '$.proto')=:keyword").
|
||
Param("keyword", strings.ToUpper(matches[1]))
|
||
} else if schemeReg.MatchString(keyword) {
|
||
isSpecialKeyword = true
|
||
var matches = schemeReg.FindStringSubmatch(keyword)
|
||
query.Where("JSON_EXTRACT(content, '$.scheme')=:keyword").
|
||
Param("keyword", strings.ToLower(matches[1]))
|
||
} else if urlReg.MatchString(keyword) { // https://xxx/yyy
|
||
u, err := url.Parse(keyword)
|
||
if err == nil {
|
||
isSpecialKeyword = true
|
||
query.Attr("domain", u.Host)
|
||
query.Where("JSON_EXTRACT(content, '$.requestURI') LIKE :keyword").
|
||
Param("keyword", dbutils.QuoteLikePrefix("\""+u.RequestURI()))
|
||
}
|
||
} else if methodReg.MatchString(keyword) { // method|requestMethod:xxx
|
||
isSpecialKeyword = true
|
||
var matches = methodReg.FindStringSubmatch(keyword)
|
||
query.Where("JSON_EXTRACT(content, '$.requestMethod')=:keyword").
|
||
Param("keyword", strings.ToUpper(matches[1]))
|
||
} else if refererReg.MatchString(keyword) {
|
||
isSpecialKeyword = true
|
||
var matches = refererReg.FindStringSubmatch(keyword)
|
||
query.Where("JSON_EXTRACT(content, '$.referer') LIKE :keyword").
|
||
Param("keyword", dbutils.QuoteLike(matches[1]))
|
||
}
|
||
if !isSpecialKeyword {
|
||
if regexp.MustCompile(`^ip:.+`).MatchString(keyword) {
|
||
keyword = keyword[3:]
|
||
}
|
||
|
||
var useOriginKeyword = false
|
||
|
||
where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword OR JSON_EXTRACT(content, '$.userAgent') LIKE :keyword"
|
||
|
||
jsonKeyword, err := json.Marshal(keyword)
|
||
if err == nil {
|
||
where += " OR JSON_CONTAINS(content, :jsonKeyword, '$.tags')"
|
||
query.Param("jsonKeyword", jsonKeyword)
|
||
}
|
||
|
||
// 请求方法
|
||
if keyword == http.MethodGet ||
|
||
keyword == http.MethodPost ||
|
||
keyword == http.MethodHead ||
|
||
keyword == http.MethodConnect ||
|
||
keyword == http.MethodPut ||
|
||
keyword == http.MethodTrace ||
|
||
keyword == http.MethodOptions ||
|
||
keyword == http.MethodDelete ||
|
||
keyword == http.MethodPatch {
|
||
where += " OR JSON_EXTRACT(content, '$.requestMethod')=:originKeyword"
|
||
useOriginKeyword = true
|
||
}
|
||
|
||
// 响应状态码
|
||
if regexp.MustCompile(`^\d{3}$`).MatchString(keyword) {
|
||
where += " OR status=:intKeyword"
|
||
query.Param("intKeyword", types.Int(keyword))
|
||
}
|
||
|
||
if regexp.MustCompile(`^\d{3}-\d{3}$`).MatchString(keyword) {
|
||
pieces := strings.Split(keyword, "-")
|
||
where += " OR status BETWEEN :intKeyword1 AND :intKeyword2"
|
||
query.Param("intKeyword1", types.Int(pieces[0]))
|
||
query.Param("intKeyword2", types.Int(pieces[1]))
|
||
}
|
||
|
||
if regexp.MustCompile(`^\d{20,}\s*\.?$`).MatchString(keyword) {
|
||
where += " OR requestId=:requestId"
|
||
query.Param("requestId", strings.TrimRight(keyword, ". "))
|
||
}
|
||
|
||
query.Where("("+where+")").
|
||
Param("keyword", dbutils.QuoteLike(keyword))
|
||
if useOriginKeyword {
|
||
query.Param("originKeyword", keyword)
|
||
}
|
||
}
|
||
}
|
||
|
||
// hourFrom - hourTo
|
||
if len(hourFrom) > 0 && len(hourTo) > 0 {
|
||
var hourFromInt = types.Int(hourFrom)
|
||
var hourToInt = types.Int(hourTo)
|
||
if hourFromInt >= 0 && hourFromInt <= 23 && hourToInt >= hourFromInt && hourToInt <= 23 {
|
||
var y = types.Int(day[:4])
|
||
var m = types.Int(day[4:6])
|
||
var d = types.Int(day[6:])
|
||
var timeFrom = time.Date(y, time.Month(m), d, hourFromInt, 0, 0, 0, time.Local)
|
||
var timeTo = time.Date(y, time.Month(m), d, hourToInt, 59, 59, 0, time.Local)
|
||
query.Between("createdAt", timeFrom.Unix(), timeTo.Unix())
|
||
}
|
||
}
|
||
|
||
// offset
|
||
if len(lastRequestId) > 0 {
|
||
if !reverse {
|
||
query.Where("requestId<:requestId").
|
||
Param("requestId", lastRequestId)
|
||
} else {
|
||
query.Where("requestId>:requestId").
|
||
Param("requestId", lastRequestId)
|
||
}
|
||
}
|
||
|
||
if !reverse {
|
||
query.Desc("requestId")
|
||
} else {
|
||
query.Asc("requestId")
|
||
}
|
||
|
||
// 开始查询
|
||
ones, err := query.
|
||
Table(tableQuery.name).
|
||
Limit(size).
|
||
FindAll()
|
||
if err != nil {
|
||
remotelogs.Println("DB_NODE", err.Error())
|
||
return
|
||
}
|
||
|
||
locker.Lock()
|
||
for _, one := range ones {
|
||
var accessLog = one.(*HTTPAccessLog)
|
||
result = append(result, accessLog)
|
||
}
|
||
locker.Unlock()
|
||
}(tableQuery, keyword)
|
||
}
|
||
wg.Wait()
|
||
|
||
if len(result) == 0 {
|
||
return nil, lastRequestId, nil
|
||
}
|
||
|
||
// 按照requestId排序
|
||
sort.Slice(result, func(i, j int) bool {
|
||
if !reverse {
|
||
return result[i].RequestId > result[j].RequestId
|
||
} else {
|
||
return result[i].RequestId < result[j].RequestId
|
||
}
|
||
})
|
||
|
||
if int64(len(result)) > size {
|
||
result = result[:size]
|
||
}
|
||
|
||
var requestId = result[len(result)-1].RequestId
|
||
if reverse {
|
||
lists.Reverse(result)
|
||
}
|
||
|
||
if !reverse {
|
||
return result, requestId, nil
|
||
} else {
|
||
return result, requestId, nil
|
||
}
|
||
}
|
||
|
||
// FindAccessLogWithRequestId 根据请求ID获取访问日志
|
||
func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId string) (*HTTPAccessLog, error) {
|
||
if !regexp.MustCompile(`^\d{11,}`).MatchString(requestId) {
|
||
return nil, errors.New("invalid requestId")
|
||
}
|
||
|
||
accessLogLocker.RLock()
|
||
daoList := []*HTTPAccessLogDAOWrapper{}
|
||
for _, daoWrapper := range httpAccessLogDAOMapping {
|
||
daoList = append(daoList, daoWrapper)
|
||
}
|
||
accessLogLocker.RUnlock()
|
||
|
||
if len(daoList) == 0 {
|
||
daoList = []*HTTPAccessLogDAOWrapper{{
|
||
DAO: SharedHTTPAccessLogDAO,
|
||
NodeId: 0,
|
||
}}
|
||
}
|
||
|
||
// 准备查询
|
||
var day = timeutil.FormatTime("Ymd", types.Int64(requestId[:10]))
|
||
var tableQueries = []*accessLogTableQuery{}
|
||
for _, daoWrapper := range daoList {
|
||
var instance = daoWrapper.DAO.Instance
|
||
tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, def := range tableDefs {
|
||
tableQueries = append(tableQueries, &accessLogTableQuery{
|
||
daoWrapper: daoWrapper,
|
||
name: def.Name,
|
||
hasRemoteAddrField: def.HasRemoteAddr,
|
||
hasDomainField: def.HasDomain,
|
||
})
|
||
}
|
||
}
|
||
|
||
var count = len(tableQueries)
|
||
var wg = &sync.WaitGroup{}
|
||
wg.Add(count)
|
||
var result *HTTPAccessLog = nil
|
||
for _, tableQuery := range tableQueries {
|
||
go func(tableQuery *accessLogTableQuery) {
|
||
defer wg.Done()
|
||
|
||
var dao = tableQuery.daoWrapper.DAO
|
||
one, err := dao.Query(tx).
|
||
Table(tableQuery.name).
|
||
Attr("requestId", requestId).
|
||
Find()
|
||
if err != nil {
|
||
logs.Println("[DB_NODE]" + err.Error())
|
||
return
|
||
}
|
||
if one != nil {
|
||
result = one.(*HTTPAccessLog)
|
||
}
|
||
}(tableQuery)
|
||
}
|
||
wg.Wait()
|
||
return result, nil
|
||
}
|
||
|
||
// SetupQueue 建立队列
|
||
func (this *HTTPAccessLogDAO) SetupQueue() {
|
||
configJSON, err := SharedSysSettingDAO.ReadSetting(nil, systemconfigs.SettingCodeAccessLogQueue)
|
||
if err != nil {
|
||
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "read settings failed: "+err.Error())
|
||
return
|
||
}
|
||
|
||
if len(configJSON) == 0 {
|
||
return
|
||
}
|
||
|
||
if bytes.Equal(accessLogConfigJSON, configJSON) {
|
||
return
|
||
}
|
||
accessLogConfigJSON = configJSON
|
||
|
||
var config = &serverconfigs.AccessLogQueueConfig{}
|
||
err = json.Unmarshal(configJSON, config)
|
||
if err != nil {
|
||
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "decode settings failed: "+err.Error())
|
||
return
|
||
}
|
||
|
||
accessLogQueuePercent = config.Percent
|
||
accessLogCountPerSecond = config.CountPerSecond
|
||
if accessLogCountPerSecond <= 0 {
|
||
accessLogCountPerSecond = 10_000
|
||
}
|
||
if config.MaxLength <= 0 {
|
||
config.MaxLength = 100_000
|
||
}
|
||
|
||
accessLogEnableAutoPartial = config.EnableAutoPartial
|
||
if config.RowsPerTable > 0 {
|
||
accessLogRowsPerTable = config.RowsPerTable
|
||
}
|
||
|
||
if accessLogQueueMaxLength != config.MaxLength {
|
||
accessLogQueueMaxLength = config.MaxLength
|
||
oldAccessLogQueue = accessLogQueue
|
||
accessLogQueue = make(chan *pb.HTTPAccessLog, config.MaxLength)
|
||
}
|
||
}
|