自动对访问日志进行分表

This commit is contained in:
刘祥超
2022-03-08 19:55:39 +08:00
parent 088636553c
commit 85a46a9827
10 changed files with 582 additions and 245 deletions

View File

@@ -34,13 +34,25 @@ type HTTPAccessLogDAO dbs.DAO
var SharedHTTPAccessLogDAO *HTTPAccessLogDAO
// 队列
var oldAccessLogQueue = make(chan *pb.HTTPAccessLog)
var accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000)
var accessLogQueueMaxLength = 100_000
var accessLogQueuePercent = 100 // 0-100
var accessLogCountPerSecond = 10_000 // 0 表示不限制
var accessLogConfigJSON = []byte{}
var accessLogQueueChanged = make(chan zero.Zero, 1)
var (
oldAccessLogQueue = make(chan *pb.HTTPAccessLog)
accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000)
accessLogQueueMaxLength = 100_000
accessLogQueuePercent = 100 // 0-100
accessLogCountPerSecond = 10_000 // 0 表示不限制
accessLogConfigJSON = []byte{}
accessLogQueueChanged = make(chan zero.Zero, 1)
accessLogEnableAutoPartial = true // 是否启用自动分表
accessLogPartialRows int64 = 500_000 // 自动分表的单表最大值
)
type accessLogTableQuery struct {
daoWrapper *HTTPAccessLogDAOWrapper
name string
hasRemoteAddrField bool
hasDomainField bool
}
func init() {
dbs.OnReady(func() {
@@ -120,7 +132,7 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb.
// DumpAccessLogsFromQueue 从队列导入访问日志
func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) error {
dao := randomHTTPAccessLogDAO()
var dao = randomHTTPAccessLogDAO()
if dao == nil {
dao = &HTTPAccessLogDAOWrapper{
DAO: SharedHTTPAccessLogDAO,
@@ -168,8 +180,8 @@ Loop:
// CreateHTTPAccessLog 写入单条访问日志
func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLogDAO, accessLog *pb.HTTPAccessLog) error {
day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0))
tableDef, err := findHTTPAccessLogTable(dao.Instance, day, false)
var day = timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0))
tableDef, err := SharedHTTPAccessLogManager.FindTable(dao.Instance, day, true)
if err != nil {
return err
}
@@ -203,27 +215,17 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLog
}
fields["content"] = content
_, err = dao.Query(tx).
var lastId int64
lastId, err = dao.Query(tx).
Table(tableDef.Name).
Sets(fields).
Insert()
if err != nil {
// 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试
if strings.Contains(err.Error(), "1146") {
tableDef, err = findHTTPAccessLogTable(dao.Instance, day, true)
if err != nil {
return err
}
_, err = dao.Query(tx).
Table(tableDef.Name).
Sets(fields).
Insert()
if err != nil {
return err
}
} else {
remotelogs.Error("HTTP_ACCESS_LOG", err.Error())
}
return err
}
if lastId%accessLogPartialRows == 0 {
SharedHTTPAccessLogManager.ResetTable(dao.Instance, day)
}
return nil
@@ -296,42 +298,56 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
}}
}
locker := sync.Mutex{}
// 查询某个集群下的节点
var nodeIds = []int64{}
if clusterId > 0 {
nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId)
if err != nil {
remotelogs.Error("DBNODE", err.Error())
return
}
sort.Slice(nodeIds, func(i, j int) bool {
return nodeIds[i] < nodeIds[j]
})
}
count := len(daoList)
wg := &sync.WaitGroup{}
wg.Add(count)
// 准备查询
var tableQueries = []*accessLogTableQuery{}
for _, daoWrapper := range daoList {
go func(daoWrapper *HTTPAccessLogDAOWrapper) {
var instance = daoWrapper.DAO.Instance
tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day)
if err != nil {
return nil, "", err
}
for _, def := range tableDefs {
tableQueries = append(tableQueries, &accessLogTableQuery{
daoWrapper: daoWrapper,
name: def.Name,
hasRemoteAddrField: def.HasRemoteAddr,
hasDomainField: def.HasDomain,
})
}
}
var locker = sync.Mutex{}
var statusPrefixReg = regexp.MustCompile(`status:\s*(\d{3})`)
var count = len(tableQueries)
var wg = &sync.WaitGroup{}
wg.Add(count)
for _, tableQuery := range tableQueries {
go func(tableQuery *accessLogTableQuery) {
defer wg.Done()
dao := daoWrapper.DAO
tableName, hasRemoteAddrField, hasDomainField, exists, err := findHTTPAccessLogTableName(dao.Instance, day)
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
return
}
if !exists {
// 表格不存在则跳过
return
}
query := dao.Query(tx)
var dao = tableQuery.daoWrapper.DAO
var query = dao.Query(tx)
// 条件
if nodeId > 0 {
query.Attr("nodeId", nodeId)
} else if clusterId > 0 {
nodeIds, err := SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId)
if err != nil {
remotelogs.Error("DBNODE", err.Error())
return
}
if len(nodeIds) > 0 {
sort.Slice(nodeIds, func(i, j int) bool {
return nodeIds[i] < nodeIds[j]
})
var nodeIdStrings = []string{}
for _, subNodeId := range nodeIds {
nodeIdStrings = append(nodeIdStrings, types.String(subNodeId))
@@ -370,7 +386,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
// keyword
if len(ip) > 0 {
// TODO 支持IP范围
if hasRemoteAddrField {
if tableQuery.hasRemoteAddrField {
// IP格式
if strings.Contains(ip, ",") || strings.Contains(ip, "-") {
rangeConfig, err := shared.ParseIPRange(ip)
@@ -389,7 +405,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
}
}
if len(domain) > 0 {
if hasDomainField {
if tableQuery.hasDomainField {
if strings.Contains(domain, "*") {
domain = strings.ReplaceAll(domain, "*", "%")
domain = regexp.MustCompile(`[^a-zA-Z0-9-.%]`).ReplaceAllString(domain, "")
@@ -404,11 +420,12 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
Param("host1", domain)
}
}
if len(keyword) > 0 {
// remoteAddr
if hasRemoteAddrField && net.ParseIP(keyword) != nil {
if tableQuery.hasRemoteAddrField && net.ParseIP(keyword) != nil {
query.Attr("remoteAddr", keyword)
} else if hasRemoteAddrField && regexp.MustCompile(`^ip:.+`).MatchString(keyword) {
} else if tableQuery.hasRemoteAddrField && regexp.MustCompile(`^ip:.+`).MatchString(keyword) {
keyword = keyword[3:]
pieces := strings.SplitN(keyword, ",", 2)
if len(pieces) == 1 || len(pieces[1]) == 0 {
@@ -416,12 +433,15 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
} else {
query.Between("INET_ATON(remoteAddr)", utils.IP2Long(pieces[0]), utils.IP2Long(pieces[1]))
}
} else if statusPrefixReg.MatchString(keyword) {
var matches = statusPrefixReg.FindStringSubmatch(keyword)
query.Attr("status", matches[1])
} else {
if regexp.MustCompile(`^ip:.+`).MatchString(keyword) {
keyword = keyword[3:]
}
useOriginKeyword := false
var useOriginKeyword = false
where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword OR JSON_EXTRACT(content, '$.userAgent') LIKE :keyword"
@@ -447,13 +467,13 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
// 响应状态码
if regexp.MustCompile(`^\d{3}$`).MatchString(keyword) {
where += " OR JSON_EXTRACT(content, '$.status')=:intKeyword"
where += " OR status=:intKeyword"
query.Param("intKeyword", types.Int(keyword))
}
if regexp.MustCompile(`^\d{3}-\d{3}$`).MatchString(keyword) {
pieces := strings.Split(keyword, "-")
where += " OR JSON_EXTRACT(content, '$.status') BETWEEN :intKeyword1 AND :intKeyword2"
where += " OR status BETWEEN :intKeyword1 AND :intKeyword2"
query.Param("intKeyword1", types.Int(pieces[0]))
query.Param("intKeyword2", types.Int(pieces[1]))
}
@@ -490,20 +510,21 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
// 开始查询
ones, err := query.
Table(tableName).
Table(tableQuery.name).
Limit(size).
FindAll()
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
return
}
locker.Lock()
for _, one := range ones {
accessLog := one.(*HTTPAccessLog)
result = append(result, accessLog)
}
locker.Unlock()
}(daoWrapper)
}(tableQuery)
}
wg.Wait()
@@ -524,7 +545,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s
result = result[:size]
}
requestId := result[len(result)-1].RequestId
var requestId = result[len(result)-1].RequestId
if reverse {
lists.Reverse(result)
}
@@ -556,28 +577,36 @@ func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId s
}}
}
count := len(daoList)
wg := &sync.WaitGroup{}
// 准备查询
var day = timeutil.FormatTime("Ymd", types.Int64(requestId[:10]))
var tableQueries = []*accessLogTableQuery{}
for _, daoWrapper := range daoList {
var instance = daoWrapper.DAO.Instance
tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day)
if err != nil {
return nil, err
}
for _, def := range tableDefs {
tableQueries = append(tableQueries, &accessLogTableQuery{
daoWrapper: daoWrapper,
name: def.Name,
hasRemoteAddrField: def.HasRemoteAddr,
hasDomainField: def.HasDomain,
})
}
}
var count = len(tableQueries)
var wg = &sync.WaitGroup{}
wg.Add(count)
var result *HTTPAccessLog = nil
day := timeutil.FormatTime("Ymd", types.Int64(requestId[:10]))
for _, daoWrapper := range daoList {
go func(daoWrapper *HTTPAccessLogDAOWrapper) {
for _, tableQuery := range tableQueries {
go func(tableQuery *accessLogTableQuery) {
defer wg.Done()
dao := daoWrapper.DAO
tableName, _, _, exists, err := findHTTPAccessLogTableName(dao.Instance, day)
if err != nil {
logs.Println("[DB_NODE]" + err.Error())
return
}
if !exists {
return
}
var dao = tableQuery.daoWrapper.DAO
one, err := dao.Query(tx).
Table(tableName).
Table(tableQuery.name).
Attr("requestId", requestId).
Find()
if err != nil {
@@ -587,7 +616,7 @@ func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId s
if one != nil {
result = one.(*HTTPAccessLog)
}
}(daoWrapper)
}(tableQuery)
}
wg.Wait()
return result, nil