mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-27 18:33:39 +08:00
实现访问日志队列
This commit is contained in:
@@ -1,16 +1,23 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/zero"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
"net"
|
||||
@@ -26,10 +33,53 @@ type HTTPAccessLogDAO dbs.DAO
|
||||
|
||||
var SharedHTTPAccessLogDAO *HTTPAccessLogDAO
|
||||
|
||||
// 队列
|
||||
var oldAccessLogQueue = make(chan *pb.HTTPAccessLog)
|
||||
var accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000)
|
||||
var accessLogQueueMaxLength = 100_000
|
||||
var accessLogQueuePercent = 100 // 0-100
|
||||
var accessLogCountPerSecond = 10_000 // 0 表示不限制
|
||||
var accessLogConfigJSON = []byte{}
|
||||
var accessLogQueueChanged = make(chan zero.Zero, 1)
|
||||
|
||||
func init() {
|
||||
dbs.OnReady(func() {
|
||||
SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO()
|
||||
})
|
||||
|
||||
// 队列相关
|
||||
dbs.OnReadyDone(func() {
|
||||
// 检查队列变化
|
||||
goman.New(func() {
|
||||
var ticker = time.NewTicker(60 * time.Second)
|
||||
|
||||
// 先执行一次初始化
|
||||
SharedHTTPAccessLogDAO.SetupQueue()
|
||||
|
||||
// 循环执行
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
SharedHTTPAccessLogDAO.SetupQueue()
|
||||
case <-accessLogQueueChanged:
|
||||
SharedHTTPAccessLogDAO.SetupQueue()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// 导出队列内容
|
||||
goman.New(func() {
|
||||
var ticker = time.NewTicker(1 * time.Second)
|
||||
for range ticker.C {
|
||||
var tx *dbs.Tx
|
||||
err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(tx, accessLogCountPerSecond)
|
||||
if err != nil {
|
||||
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "dump access logs failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
|
||||
@@ -45,6 +95,31 @@ func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
|
||||
|
||||
// CreateHTTPAccessLogs 创建访问日志
|
||||
func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb.HTTPAccessLog) error {
|
||||
// 写入队列
|
||||
var queue = accessLogQueue // 这样写非常重要,防止在写入过程中队列有切换
|
||||
for _, accessLog := range accessLogs {
|
||||
if accessLog.FirewallPolicyId == 0 { // 如果是WAF记录,则采取采样率
|
||||
// 采样率
|
||||
if accessLogQueuePercent <= 0 {
|
||||
return nil
|
||||
}
|
||||
if accessLogQueuePercent < 100 && rands.Int(1, 100) > accessLogQueuePercent {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case queue <- accessLog:
|
||||
default:
|
||||
// 超出的丢弃
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DumpAccessLogsFromQueue 从队列导入访问日志
|
||||
func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) error {
|
||||
dao := randomHTTPAccessLogDAO()
|
||||
if dao == nil {
|
||||
dao = &HTTPAccessLogDAOWrapper{
|
||||
@@ -52,79 +127,102 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb.
|
||||
NodeId: 0,
|
||||
}
|
||||
}
|
||||
return this.CreateHTTPAccessLogsWithDAO(tx, dao, accessLogs)
|
||||
|
||||
if size <= 0 {
|
||||
size = 1_000_000
|
||||
}
|
||||
|
||||
// 复制变量,防止中途改变
|
||||
var oldQueue = oldAccessLogQueue
|
||||
var newQueue = accessLogQueue
|
||||
|
||||
Loop:
|
||||
for i := 0; i < size; i++ {
|
||||
// old
|
||||
select {
|
||||
case accessLog := <-oldQueue:
|
||||
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
continue Loop
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
// new
|
||||
select {
|
||||
case accessLog := <-newQueue:
|
||||
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
continue Loop
|
||||
default:
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateHTTPAccessLogsWithDAO 使用特定的DAO创建访问日志
|
||||
func (this *HTTPAccessLogDAO) CreateHTTPAccessLogsWithDAO(tx *dbs.Tx, daoWrapper *HTTPAccessLogDAOWrapper, accessLogs []*pb.HTTPAccessLog) error {
|
||||
if daoWrapper == nil {
|
||||
return errors.New("dao should not be nil")
|
||||
}
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
// CreateHTTPAccessLog 写入单条访问日志
|
||||
func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLogDAO, accessLog *pb.HTTPAccessLog) error {
|
||||
day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0))
|
||||
tableDef, err := findHTTPAccessLogTable(dao.Instance, day, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dao := daoWrapper.DAO
|
||||
fields := map[string]interface{}{}
|
||||
fields["serverId"] = accessLog.ServerId
|
||||
fields["nodeId"] = accessLog.NodeId
|
||||
fields["status"] = accessLog.Status
|
||||
fields["createdAt"] = accessLog.Timestamp
|
||||
fields["requestId"] = accessLog.RequestId
|
||||
fields["firewallPolicyId"] = accessLog.FirewallPolicyId
|
||||
fields["firewallRuleGroupId"] = accessLog.FirewallRuleGroupId
|
||||
fields["firewallRuleSetId"] = accessLog.FirewallRuleSetId
|
||||
fields["firewallRuleId"] = accessLog.FirewallRuleId
|
||||
|
||||
// TODO 改成事务批量提交,以加快速度
|
||||
if len(accessLog.RequestBody) > 0 {
|
||||
fields["requestBody"] = accessLog.RequestBody
|
||||
accessLog.RequestBody = nil
|
||||
}
|
||||
|
||||
for _, accessLog := range accessLogs {
|
||||
day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0))
|
||||
tableDef, err := findHTTPAccessLogTable(dao.Instance, day, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if tableDef.HasRemoteAddr {
|
||||
fields["remoteAddr"] = accessLog.RemoteAddr
|
||||
}
|
||||
if tableDef.HasDomain {
|
||||
fields["domain"] = accessLog.Host
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{}
|
||||
fields["serverId"] = accessLog.ServerId
|
||||
fields["nodeId"] = accessLog.NodeId
|
||||
fields["status"] = accessLog.Status
|
||||
fields["createdAt"] = accessLog.Timestamp
|
||||
fields["requestId"] = accessLog.RequestId
|
||||
fields["firewallPolicyId"] = accessLog.FirewallPolicyId
|
||||
fields["firewallRuleGroupId"] = accessLog.FirewallRuleGroupId
|
||||
fields["firewallRuleSetId"] = accessLog.FirewallRuleSetId
|
||||
fields["firewallRuleId"] = accessLog.FirewallRuleId
|
||||
content, err := json.Marshal(accessLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fields["content"] = content
|
||||
|
||||
if len(accessLog.RequestBody) > 0 {
|
||||
fields["requestBody"] = accessLog.RequestBody
|
||||
accessLog.RequestBody = nil
|
||||
}
|
||||
|
||||
if tableDef.HasRemoteAddr {
|
||||
fields["remoteAddr"] = accessLog.RemoteAddr
|
||||
}
|
||||
if tableDef.HasDomain {
|
||||
fields["domain"] = accessLog.Host
|
||||
}
|
||||
|
||||
content, err := json.Marshal(accessLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fields["content"] = content
|
||||
|
||||
_, err = dao.Query(tx).
|
||||
Table(tableDef.Name).
|
||||
Sets(fields).
|
||||
Insert()
|
||||
if err != nil {
|
||||
// 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试
|
||||
if strings.Contains(err.Error(), "1146") {
|
||||
tableDef, err = findHTTPAccessLogTable(dao.Instance, day, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = dao.Query(tx).
|
||||
Table(tableDef.Name).
|
||||
Sets(fields).
|
||||
Insert()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
logs.Println("HTTP_ACCESS_LOG", err.Error())
|
||||
_, err = dao.Query(tx).
|
||||
Table(tableDef.Name).
|
||||
Sets(fields).
|
||||
Insert()
|
||||
if err != nil {
|
||||
// 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试
|
||||
if strings.Contains(err.Error(), "1146") {
|
||||
tableDef, err = findHTTPAccessLogTable(dao.Instance, day, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = dao.Query(tx).
|
||||
Table(tableDef.Name).
|
||||
Sets(fields).
|
||||
Insert()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
remotelogs.Error("HTTP_ACCESS_LOG", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -468,3 +566,42 @@ func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId s
|
||||
wg.Wait()
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// SetupQueue 建立队列
|
||||
func (this *HTTPAccessLogDAO) SetupQueue() {
|
||||
configJSON, err := SharedSysSettingDAO.ReadSetting(nil, systemconfigs.SettingCodeAccessLogQueue)
|
||||
if err != nil {
|
||||
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "read settings failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if len(configJSON) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if bytes.Compare(accessLogConfigJSON, configJSON) == 0 {
|
||||
return
|
||||
}
|
||||
accessLogConfigJSON = configJSON
|
||||
|
||||
var config = &serverconfigs.AccessLogQueueConfig{}
|
||||
err = json.Unmarshal(configJSON, config)
|
||||
if err != nil {
|
||||
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "decode settings failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
accessLogQueuePercent = config.Percent
|
||||
accessLogCountPerSecond = config.CountPerSecond
|
||||
if config.MaxLength <= 0 {
|
||||
config.MaxLength = 100_000
|
||||
}
|
||||
|
||||
if accessLogQueueMaxLength != config.MaxLength {
|
||||
accessLogQueueMaxLength = config.MaxLength
|
||||
oldAccessLogQueue = accessLogQueue
|
||||
accessLogQueue = make(chan *pb.HTTPAccessLog, config.MaxLength)
|
||||
}
|
||||
|
||||
remotelogs.Println("HTTP_ACCESS_LOG_QUEUE", "change queue max length: "+types.String(config.MaxLength)+", percent: "+types.String(config.Percent)+", countPerSecond: "+types.String(config.CountPerSecond))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user