Files
EdgeAPI/internal/db/models/http_access_log_dao.go

670 lines
18 KiB
Go
Raw Normal View History

2020-10-10 11:49:21 +08:00
package models
import (
2021-12-14 12:44:57 +08:00
"bytes"
2020-10-10 11:49:21 +08:00
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/zero"
2020-10-10 11:49:21 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
2021-12-14 12:44:57 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
2020-10-10 11:49:21 +08:00
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
2020-10-10 19:21:32 +08:00
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
2021-12-14 12:44:57 +08:00
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
2020-10-10 11:49:21 +08:00
timeutil "github.com/iwind/TeaGo/utils/time"
"net"
"net/http"
"regexp"
2020-10-10 19:21:32 +08:00
"sort"
2020-10-10 11:49:21 +08:00
"strings"
2020-10-10 19:21:32 +08:00
"sync"
2020-10-10 11:49:21 +08:00
"time"
)
// HTTPAccessLogDAO is the data access object for HTTP access logs.
// It is defined on top of dbs.DAO; see NewHTTPAccessLogDAO for the table and model it is bound to.
type HTTPAccessLogDAO dbs.DAO
2020-10-13 20:05:13 +08:00
// SharedHTTPAccessLogDAO is the package-wide DAO instance; it is assigned in init() through dbs.OnReady.
var SharedHTTPAccessLogDAO *HTTPAccessLogDAO
2021-12-14 12:44:57 +08:00
// 队列
2022-03-08 19:55:39 +08:00
var (
oldAccessLogQueue = make(chan *pb.HTTPAccessLog)
accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000)
accessLogQueueMaxLength = 100_000
accessLogQueuePercent = 100 // 0-100
accessLogCountPerSecond = 10_000 // 0 表示不限制
accessLogConfigJSON = []byte{}
accessLogQueueChanged = make(chan zero.Zero, 1)
accessLogEnableAutoPartial = true // 是否启用自动分表
accessLogRowsPerTable int64 = 500_000 // 自动分表的单表最大值
2022-03-08 19:55:39 +08:00
)
// accessLogTableQuery describes one physical access log table to query,
// together with the DAO wrapper used to reach it.
type accessLogTableQuery struct {
// daoWrapper is the DAO wrapper whose DAO runs the query.
daoWrapper *HTTPAccessLogDAOWrapper
// name is the table name passed to Query.Table().
name string
// hasRemoteAddrField reports whether the table has a dedicated remoteAddr column.
hasRemoteAddrField bool
// hasDomainField reports whether the table has a dedicated domain column.
hasDomainField bool
}
2021-12-14 12:44:57 +08:00
2020-10-13 20:05:13 +08:00
func init() {
dbs.OnReady(func() {
SharedHTTPAccessLogDAO = NewHTTPAccessLogDAO()
})
2021-12-14 12:44:57 +08:00
// 队列相关
dbs.OnReadyDone(func() {
// 检查队列变化
goman.New(func() {
var ticker = time.NewTicker(60 * time.Second)
// 先执行一次初始化
SharedHTTPAccessLogDAO.SetupQueue()
// 循环执行
for {
select {
case <-ticker.C:
SharedHTTPAccessLogDAO.SetupQueue()
case <-accessLogQueueChanged:
SharedHTTPAccessLogDAO.SetupQueue()
}
}
})
// 导出队列内容
goman.New(func() {
var ticker = time.NewTicker(1 * time.Second)
for range ticker.C {
var tx *dbs.Tx
err := SharedHTTPAccessLogDAO.DumpAccessLogsFromQueue(tx, accessLogCountPerSecond)
if err != nil {
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "dump access logs failed: "+err.Error())
}
}
})
})
2020-10-13 20:05:13 +08:00
}
2020-10-10 11:49:21 +08:00
// NewHTTPAccessLogDAO creates a DAO bound to the "edgeHTTPAccessLogs" table
// with HTTPAccessLog as its model and "id" as its primary key.
func NewHTTPAccessLogDAO() *HTTPAccessLogDAO {
	var daoObject = dbs.DAOObject{
		DB:     Tea.Env,
		Table:  "edgeHTTPAccessLogs",
		Model:  new(HTTPAccessLog),
		PkName: "id",
	}
	var dao = &HTTPAccessLogDAO{DAOObject: daoObject}
	return dbs.NewDAO(dao).(*HTTPAccessLogDAO)
}
2021-06-02 11:53:24 +08:00
// CreateHTTPAccessLogs 创建访问日志
func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb.HTTPAccessLog) error {
2021-12-14 12:44:57 +08:00
// 写入队列
var queue = accessLogQueue // 这样写非常重要,防止在写入过程中队列有切换
for _, accessLog := range accessLogs {
if accessLog.FirewallPolicyId == 0 { // 如果是WAF记录则采取采样率
// 采样率
if accessLogQueuePercent <= 0 {
return nil
}
if accessLogQueuePercent < 100 && rands.Int(1, 100) > accessLogQueuePercent {
return nil
}
}
select {
case queue <- accessLog:
default:
// 超出的丢弃
}
}
return nil
}
// DumpAccessLogsFromQueue 从队列导入访问日志
func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) error {
2022-03-08 19:55:39 +08:00
var dao = randomHTTPAccessLogDAO()
2020-10-10 11:49:21 +08:00
if dao == nil {
2020-10-10 19:21:32 +08:00
dao = &HTTPAccessLogDAOWrapper{
DAO: SharedHTTPAccessLogDAO,
NodeId: 0,
}
2020-10-10 11:49:21 +08:00
}
2021-12-14 12:44:57 +08:00
if size <= 0 {
size = 1_000_000
2020-10-10 11:49:21 +08:00
}
2021-12-14 12:44:57 +08:00
// 复制变量,防止中途改变
var oldQueue = oldAccessLogQueue
var newQueue = accessLogQueue
2020-10-10 19:21:32 +08:00
2021-12-14 12:44:57 +08:00
Loop:
for i := 0; i < size; i++ {
// old
select {
case accessLog := <-oldQueue:
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
if err != nil {
return err
}
continue Loop
default:
2020-10-10 11:49:21 +08:00
}
2021-12-14 12:44:57 +08:00
// new
select {
case accessLog := <-newQueue:
err := this.CreateHTTPAccessLog(tx, dao.DAO, accessLog)
if err != nil {
return err
}
continue Loop
default:
break Loop
2021-12-07 14:57:55 +08:00
}
2021-12-14 12:44:57 +08:00
}
2021-12-07 14:57:55 +08:00
2021-12-14 12:44:57 +08:00
return nil
}
2021-12-14 12:44:57 +08:00
// CreateHTTPAccessLog 写入单条访问日志
func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLogDAO, accessLog *pb.HTTPAccessLog) error {
2022-03-08 19:55:39 +08:00
var day = timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0))
tableDef, err := SharedHTTPAccessLogManager.FindTable(dao.Instance, day, true)
2021-12-14 12:44:57 +08:00
if err != nil {
return err
}
2020-10-10 11:49:21 +08:00
2021-12-14 12:44:57 +08:00
fields := map[string]interface{}{}
fields["serverId"] = accessLog.ServerId
fields["nodeId"] = accessLog.NodeId
fields["status"] = accessLog.Status
fields["createdAt"] = accessLog.Timestamp
fields["requestId"] = accessLog.RequestId
fields["firewallPolicyId"] = accessLog.FirewallPolicyId
fields["firewallRuleGroupId"] = accessLog.FirewallRuleGroupId
fields["firewallRuleSetId"] = accessLog.FirewallRuleSetId
fields["firewallRuleId"] = accessLog.FirewallRuleId
if len(accessLog.RequestBody) > 0 {
fields["requestBody"] = accessLog.RequestBody
accessLog.RequestBody = nil
}
if tableDef.HasRemoteAddr {
fields["remoteAddr"] = accessLog.RemoteAddr
}
if tableDef.HasDomain {
fields["domain"] = accessLog.Host
}
content, err := json.Marshal(accessLog)
if err != nil {
return err
}
fields["content"] = content
2022-03-08 19:55:39 +08:00
var lastId int64
lastId, err = dao.Query(tx).
2021-12-14 12:44:57 +08:00
Table(tableDef.Name).
Sets(fields).
Insert()
if err != nil {
2022-03-08 19:55:39 +08:00
return err
}
if accessLogEnableAutoPartial && accessLogRowsPerTable > 0 && lastId%accessLogRowsPerTable == 0 {
2022-03-08 19:55:39 +08:00
SharedHTTPAccessLogManager.ResetTable(dao.Instance, day)
2020-10-10 11:49:21 +08:00
}
return nil
}
2020-10-10 19:21:32 +08:00
2021-06-02 11:53:24 +08:00
// ListAccessLogs 读取往前的 单页访问日志
func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string,
size int64,
day string,
clusterId int64,
nodeId int64,
serverId int64,
reverse bool,
hasError bool,
firewallPolicyId int64,
firewallRuleGroupId int64,
firewallRuleSetId int64,
hasFirewallPolicy bool,
userId int64,
keyword string,
ip string,
domain string) (result []*HTTPAccessLog, nextLastRequestId string, hasMore bool, err error) {
2020-10-10 19:21:32 +08:00
if len(day) != 8 {
return
}
// 限制能查询的最大条数,防止占用内存过多
if size > 1000 {
size = 1000
}
result, nextLastRequestId, err = this.listAccessLogs(tx, lastRequestId, size, day, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
2020-10-10 19:21:32 +08:00
if err != nil || int64(len(result)) < size {
return
}
moreResult, _, _ := this.listAccessLogs(tx, nextLastRequestId, 1, day, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
2020-10-10 19:21:32 +08:00
hasMore = len(moreResult) > 0
return
}
// 读取往前的单页访问日志
func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, size int64, day string, clusterId int64, nodeId int64, serverId int64, reverse bool, hasError bool, firewallPolicyId int64, firewallRuleGroupId int64, firewallRuleSetId int64, hasFirewallPolicy bool, userId int64, keyword string, ip string, domain string) (result []*HTTPAccessLog, nextLastRequestId string, err error) {
2020-10-10 19:21:32 +08:00
if size <= 0 {
return nil, lastRequestId, nil
}
serverIds := []int64{}
if userId > 0 {
serverIds, err = SharedServerDAO.FindAllEnabledServerIdsWithUserId(tx, userId)
if err != nil {
return
}
if len(serverIds) == 0 {
return
}
}
2020-10-10 19:21:32 +08:00
accessLogLocker.RLock()
daoList := []*HTTPAccessLogDAOWrapper{}
2021-06-02 11:53:24 +08:00
for _, daoWrapper := range httpAccessLogDAOMapping {
2020-10-10 19:21:32 +08:00
daoList = append(daoList, daoWrapper)
}
accessLogLocker.RUnlock()
if len(daoList) == 0 {
daoList = []*HTTPAccessLogDAOWrapper{{
DAO: SharedHTTPAccessLogDAO,
NodeId: 0,
}}
}
2022-03-08 19:55:39 +08:00
// 查询某个集群下的节点
var nodeIds = []int64{}
if clusterId > 0 {
nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId)
if err != nil {
remotelogs.Error("DBNODE", err.Error())
return
}
sort.Slice(nodeIds, func(i, j int) bool {
return nodeIds[i] < nodeIds[j]
})
}
2020-10-10 19:21:32 +08:00
2022-03-08 19:55:39 +08:00
// 准备查询
var tableQueries = []*accessLogTableQuery{}
2020-10-10 19:21:32 +08:00
for _, daoWrapper := range daoList {
2022-03-08 19:55:39 +08:00
var instance = daoWrapper.DAO.Instance
tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day)
if err != nil {
return nil, "", err
}
for _, def := range tableDefs {
tableQueries = append(tableQueries, &accessLogTableQuery{
daoWrapper: daoWrapper,
name: def.Name,
hasRemoteAddrField: def.HasRemoteAddr,
hasDomainField: def.HasDomain,
})
}
}
2020-10-10 19:21:32 +08:00
2022-03-08 19:55:39 +08:00
var locker = sync.Mutex{}
2020-10-10 19:21:32 +08:00
2022-03-08 19:55:39 +08:00
var statusPrefixReg = regexp.MustCompile(`status:\s*(\d{3})`)
var count = len(tableQueries)
var wg = &sync.WaitGroup{}
wg.Add(count)
for _, tableQuery := range tableQueries {
go func(tableQuery *accessLogTableQuery) {
defer wg.Done()
2020-10-10 19:21:32 +08:00
2022-03-08 19:55:39 +08:00
var dao = tableQuery.daoWrapper.DAO
var query = dao.Query(tx)
2020-10-10 19:21:32 +08:00
// 条件
if nodeId > 0 {
query.Attr("nodeId", nodeId)
} else if clusterId > 0 {
if len(nodeIds) > 0 {
var nodeIdStrings = []string{}
for _, subNodeId := range nodeIds {
nodeIdStrings = append(nodeIdStrings, types.String(subNodeId))
}
query.Where("nodeId IN (" + strings.Join(nodeIdStrings, ",") + ")")
query.Reuse(false)
} else {
// 如果没有节点,则直接返回空
return
}
}
2020-10-10 19:21:32 +08:00
if serverId > 0 {
query.Attr("serverId", serverId)
} else if userId > 0 && len(serverIds) > 0 {
query.Attr("serverId", serverIds).
Reuse(false)
2020-10-10 19:21:32 +08:00
}
if hasError {
2020-11-02 14:37:05 +08:00
query.Where("status>=400")
}
if firewallPolicyId > 0 {
query.Attr("firewallPolicyId", firewallPolicyId)
}
if firewallRuleGroupId > 0 {
query.Attr("firewallRuleGroupId", firewallRuleGroupId)
}
if firewallRuleSetId > 0 {
query.Attr("firewallRuleSetId", firewallRuleSetId)
}
if hasFirewallPolicy {
query.Where("firewallPolicyId>0")
2021-08-22 16:00:32 +08:00
query.UseIndex("firewallPolicyId")
}
2020-10-10 19:21:32 +08:00
// keyword
if len(ip) > 0 {
// TODO 支持IP范围
2022-03-08 19:55:39 +08:00
if tableQuery.hasRemoteAddrField {
// IP格式
if strings.Contains(ip, ",") || strings.Contains(ip, "-") {
rangeConfig, err := shared.ParseIPRange(ip)
if err == nil {
if len(rangeConfig.IPFrom) > 0 && len(rangeConfig.IPTo) > 0 {
query.Between("INET_ATON(remoteAddr)", utils.IP2Long(rangeConfig.IPFrom), utils.IP2Long(rangeConfig.IPTo))
}
}
} else {
query.Attr("remoteAddr", ip)
2021-08-22 16:20:40 +08:00
query.UseIndex("remoteAddr")
}
} else {
query.Where("JSON_EXTRACT(content, '$.remoteAddr')=:ip1").
Param("ip1", ip)
}
}
if len(domain) > 0 {
2022-03-08 19:55:39 +08:00
if tableQuery.hasDomainField {
if strings.Contains(domain, "*") {
domain = strings.ReplaceAll(domain, "*", "%")
domain = regexp.MustCompile(`[^a-zA-Z0-9-.%]`).ReplaceAllString(domain, "")
query.Where("domain LIKE :host2").
Param("host2", domain)
} else {
query.Attr("domain", domain)
2021-08-22 16:20:40 +08:00
query.UseIndex("domain")
}
} else {
query.Where("JSON_EXTRACT(content, '$.host')=:host1").
Param("host1", domain)
}
}
2022-03-08 19:55:39 +08:00
if len(keyword) > 0 {
// remoteAddr
2022-03-08 19:55:39 +08:00
if tableQuery.hasRemoteAddrField && net.ParseIP(keyword) != nil {
query.Attr("remoteAddr", keyword)
2022-03-08 19:55:39 +08:00
} else if tableQuery.hasRemoteAddrField && regexp.MustCompile(`^ip:.+`).MatchString(keyword) {
2021-07-18 17:09:06 +08:00
keyword = keyword[3:]
pieces := strings.SplitN(keyword, ",", 2)
if len(pieces) == 1 || len(pieces[1]) == 0 {
query.Attr("remoteAddr", pieces[0])
} else {
query.Between("INET_ATON(remoteAddr)", utils.IP2Long(pieces[0]), utils.IP2Long(pieces[1]))
2021-07-18 17:09:06 +08:00
}
2022-03-08 19:55:39 +08:00
} else if statusPrefixReg.MatchString(keyword) {
var matches = statusPrefixReg.FindStringSubmatch(keyword)
query.Attr("status", matches[1])
} else {
if regexp.MustCompile(`^ip:.+`).MatchString(keyword) {
keyword = keyword[3:]
}
2022-03-08 19:55:39 +08:00
var useOriginKeyword = false
where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword OR JSON_EXTRACT(content, '$.userAgent') LIKE :keyword"
2021-07-18 15:52:34 +08:00
jsonKeyword, err := json.Marshal(keyword)
if err == nil {
where += " OR JSON_CONTAINS(content, :jsonKeyword, '$.tags')"
query.Param("jsonKeyword", jsonKeyword)
}
// 请求方法
if keyword == http.MethodGet ||
keyword == http.MethodPost ||
keyword == http.MethodHead ||
keyword == http.MethodConnect ||
keyword == http.MethodPut ||
keyword == http.MethodTrace ||
keyword == http.MethodOptions ||
keyword == http.MethodDelete ||
keyword == http.MethodPatch {
where += " OR JSON_EXTRACT(content, '$.requestMethod')=:originKeyword"
useOriginKeyword = true
}
// 响应状态码
if regexp.MustCompile(`^\d{3}$`).MatchString(keyword) {
2022-03-08 19:55:39 +08:00
where += " OR status=:intKeyword"
query.Param("intKeyword", types.Int(keyword))
}
if regexp.MustCompile(`^\d{3}-\d{3}$`).MatchString(keyword) {
pieces := strings.Split(keyword, "-")
2022-03-08 19:55:39 +08:00
where += " OR status BETWEEN :intKeyword1 AND :intKeyword2"
query.Param("intKeyword1", types.Int(pieces[0]))
query.Param("intKeyword2", types.Int(pieces[1]))
}
if regexp.MustCompile(`^\d{20,}\s*\.?$`).MatchString(keyword) {
2021-12-02 11:50:16 +08:00
where += " OR requestId=:requestId"
query.Param("requestId", strings.TrimRight(keyword, ". "))
2021-12-02 11:50:16 +08:00
}
query.Where("("+where+")").
Param("keyword", "%"+keyword+"%")
if useOriginKeyword {
query.Param("originKeyword", keyword)
}
}
}
2020-10-10 19:21:32 +08:00
// offset
if len(lastRequestId) > 0 {
if !reverse {
query.Where("requestId<:requestId").
Param("requestId", lastRequestId)
} else {
query.Where("requestId>:requestId").
Param("requestId", lastRequestId)
}
}
if !reverse {
query.Desc("requestId")
} else {
query.Asc("requestId")
}
// 开始查询
ones, err := query.
2022-03-08 19:55:39 +08:00
Table(tableQuery.name).
2020-10-10 19:21:32 +08:00
Limit(size).
FindAll()
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
return
}
2022-03-08 19:55:39 +08:00
2020-10-10 19:21:32 +08:00
locker.Lock()
for _, one := range ones {
accessLog := one.(*HTTPAccessLog)
result = append(result, accessLog)
}
locker.Unlock()
2022-03-08 19:55:39 +08:00
}(tableQuery)
2020-10-10 19:21:32 +08:00
}
wg.Wait()
if len(result) == 0 {
return nil, lastRequestId, nil
}
// 按照requestId排序
sort.Slice(result, func(i, j int) bool {
if !reverse {
return result[i].RequestId > result[j].RequestId
} else {
return result[i].RequestId < result[j].RequestId
}
})
if int64(len(result)) > size {
result = result[:size]
}
2022-03-08 19:55:39 +08:00
var requestId = result[len(result)-1].RequestId
2020-10-10 19:21:32 +08:00
if reverse {
lists.Reverse(result)
}
if !reverse {
return result, requestId, nil
} else {
return result, requestId, nil
}
}
2021-06-02 11:53:24 +08:00
// FindAccessLogWithRequestId 根据请求ID获取访问日志
func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId string) (*HTTPAccessLog, error) {
2021-11-21 15:56:13 +08:00
if !regexp.MustCompile(`^\d{11,}`).MatchString(requestId) {
return nil, errors.New("invalid requestId")
}
accessLogLocker.RLock()
daoList := []*HTTPAccessLogDAOWrapper{}
2021-06-02 11:53:24 +08:00
for _, daoWrapper := range httpAccessLogDAOMapping {
daoList = append(daoList, daoWrapper)
}
accessLogLocker.RUnlock()
if len(daoList) == 0 {
daoList = []*HTTPAccessLogDAOWrapper{{
DAO: SharedHTTPAccessLogDAO,
NodeId: 0,
}}
}
2022-03-08 19:55:39 +08:00
// 准备查询
var day = timeutil.FormatTime("Ymd", types.Int64(requestId[:10]))
var tableQueries = []*accessLogTableQuery{}
for _, daoWrapper := range daoList {
var instance = daoWrapper.DAO.Instance
tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day)
if err != nil {
return nil, err
}
for _, def := range tableDefs {
tableQueries = append(tableQueries, &accessLogTableQuery{
daoWrapper: daoWrapper,
name: def.Name,
hasRemoteAddrField: def.HasRemoteAddr,
hasDomainField: def.HasDomain,
})
}
}
var count = len(tableQueries)
var wg = &sync.WaitGroup{}
wg.Add(count)
var result *HTTPAccessLog = nil
2022-03-08 19:55:39 +08:00
for _, tableQuery := range tableQueries {
go func(tableQuery *accessLogTableQuery) {
defer wg.Done()
2022-03-08 19:55:39 +08:00
var dao = tableQuery.daoWrapper.DAO
one, err := dao.Query(tx).
2022-03-08 19:55:39 +08:00
Table(tableQuery.name).
Attr("requestId", requestId).
Find()
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
return
}
if one != nil {
result = one.(*HTTPAccessLog)
}
2022-03-08 19:55:39 +08:00
}(tableQuery)
}
wg.Wait()
return result, nil
}
2021-12-14 12:44:57 +08:00
// SetupQueue 建立队列
func (this *HTTPAccessLogDAO) SetupQueue() {
configJSON, err := SharedSysSettingDAO.ReadSetting(nil, systemconfigs.SettingCodeAccessLogQueue)
if err != nil {
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "read settings failed: "+err.Error())
return
}
if len(configJSON) == 0 {
return
}
if bytes.Compare(accessLogConfigJSON, configJSON) == 0 {
return
}
accessLogConfigJSON = configJSON
var config = &serverconfigs.AccessLogQueueConfig{}
err = json.Unmarshal(configJSON, config)
if err != nil {
remotelogs.Error("HTTP_ACCESS_LOG_QUEUE", "decode settings failed: "+err.Error())
return
}
accessLogQueuePercent = config.Percent
accessLogCountPerSecond = config.CountPerSecond
if config.MaxLength <= 0 {
config.MaxLength = 100_000
}
accessLogEnableAutoPartial = config.EnableAutoPartial
if config.RowsPerTable > 0 {
accessLogRowsPerTable = config.RowsPerTable
}
2021-12-14 12:44:57 +08:00
if accessLogQueueMaxLength != config.MaxLength {
accessLogQueueMaxLength = config.MaxLength
oldAccessLogQueue = accessLogQueue
accessLogQueue = make(chan *pb.HTTPAccessLog, config.MaxLength)
}
if Tea.IsTesting() {
remotelogs.Println("HTTP_ACCESS_LOG_QUEUE", "change queue "+string(configJSON))
}
2021-12-14 12:44:57 +08:00
}