访问日志可以使用分表查询

This commit is contained in:
GoEdgeLab
2022-04-17 16:18:53 +08:00
parent d418baa893
commit 9f302c86f6
6 changed files with 232 additions and 17 deletions

View File

@@ -56,6 +56,17 @@ func init() {
})
}
func AllAccessLogDBs() []*dbs.DB {
accessLogLocker.Lock()
defer accessLogLocker.Unlock()
var result = []*dbs.DB{}
for _, db := range accessLogDBMapping {
result = append(result, db)
}
return result
}
// 获取获取DAO
func randomHTTPAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) {
accessLogLocker.RLock()

View File

@@ -250,7 +250,9 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLog
}
// ListAccessLogs 读取往前的 单页访问日志
func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string,
func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx,
partition int32,
lastRequestId string,
size int64,
day string,
hourFrom string,
@@ -277,18 +279,19 @@ func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string,
size = 1000
}
result, nextLastRequestId, err = this.listAccessLogs(tx, lastRequestId, size, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
result, nextLastRequestId, err = this.listAccessLogs(tx, partition, lastRequestId, size, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
if err != nil || int64(len(result)) < size {
return
}
moreResult, _, _ := this.listAccessLogs(tx, nextLastRequestId, 1, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
moreResult, _, _ := this.listAccessLogs(tx, partition, nextLastRequestId, 1, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
hasMore = len(moreResult) > 0
return
}
// 读取往前的单页访问日志
func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
partition int32,
lastRequestId string,
size int64,
day string,
@@ -341,7 +344,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
if clusterId > 0 {
nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId)
if err != nil {
remotelogs.Error("DBNODE", err.Error())
remotelogs.Error("DB_NODE", err.Error())
return
}
sort.Slice(nodeIds, func(i, j int) bool {
@@ -353,18 +356,25 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
var tableQueries = []*accessLogTableQuery{}
for _, daoWrapper := range daoList {
var instance = daoWrapper.DAO.Instance
tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day)
def, err := SharedHTTPAccessLogManager.FindPartitionTable(instance, day, partition)
if err != nil {
return nil, "", err
}
for _, def := range tableDefs {
if !def.Exists {
continue
}
tableQueries = append(tableQueries, &accessLogTableQuery{
daoWrapper: daoWrapper,
name: def.Name,
hasRemoteAddrField: def.HasRemoteAddr,
hasDomainField: def.HasDomain,
})
logs.Println("query:", def.Name) // TODO
}
if len(tableQueries) == 0 {
return nil, "", nil
}
var locker = sync.Mutex{}

View File

@@ -54,7 +54,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) {
t.Fatal(err)
}
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, "", 10, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
if err != nil {
t.Fatal(err)
}
@@ -81,7 +81,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) {
times := 0 // 防止循环次数太多
for {
before := time.Now()
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, lastRequestId, 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
cost := time.Since(before).Seconds()
if err != nil {
t.Fatal(err)
@@ -112,7 +112,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) {
}
before := time.Now()
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, true, false, 0, 0, 0, false, 0, "", "", "")
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, true, false, 0, 0, 0, false, 0, "", "", "")
cost := time.Since(before).Seconds()
if err != nil {
t.Fatal(err)
@@ -137,7 +137,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) {
times := 0 // 防止循环次数太多
for {
before := time.Now()
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
cost := time.Since(before).Seconds()
if err != nil {
t.Fatal(err)

View File

@@ -15,6 +15,7 @@ import (
)
// 访问日志的两个表格形式
// 括号位置需要固定,会用来读取日期和分区
var accessLogTableMainReg = regexp.MustCompile(`_(\d{8})$`)
var accessLogTablePartialReg = regexp.MustCompile(`_(\d{8})_(\d{4})$`)
@@ -149,6 +150,50 @@ func (this *HTTPAccessLogManager) FindTables(db *dbs.DB, day string) ([]*httpAcc
return results, nil
}
func (this *HTTPAccessLogManager) FindPartitionTable(db *dbs.DB, day string, partition int32) (*httpAccessLogDefinition, error) {
var tableNames []string
if partition < 0 {
tableList, err := this.FindTables(db, day)
if err != nil {
return nil, err
}
if len(tableList) > 0 {
return tableList[len(tableList)-1], nil
}
return &httpAccessLogDefinition{
Name: "",
HasRemoteAddr: false,
HasDomain: false,
Exists: false,
}, nil
} else if partition == 0 {
tableNames = []string{"edgeHTTPAccessLogs_" + day, "edgehttpaccesslogs_" + day}
} else {
tableNames = []string{"edgeHTTPAccessLogs_" + day + "_" + fmt.Sprintf("%04d", partition), "edgehttpaccesslogs_" + day + "_" + fmt.Sprintf("%04d", partition)}
}
for _, tableName := range tableNames {
hasRemoteField, hasDomainField, err := this.checkTableFields(db, tableName)
if err != nil {
continue
}
return &httpAccessLogDefinition{
Name: tableName,
HasRemoteAddr: hasRemoteField,
HasDomain: hasDomainField,
Exists: true,
}, nil
}
return &httpAccessLogDefinition{
Name: "",
HasRemoteAddr: false,
HasDomain: false,
Exists: false,
}, nil
}
// FindLastTable 根据日期获取上一个可以使用的表名
// 表名组成
// - PREFIX_DAY
@@ -218,6 +263,60 @@ func (this *HTTPAccessLogManager) ResetTable(db *dbs.DB, day string) {
delete(this.currentTableMapping, this.composeTableCacheKey(config.Dsn, day))
}
// TablePartition 从表名中获取分区
func (this *HTTPAccessLogManager) TablePartition(tableName string) (partition int32) {
if accessLogTablePartialReg.MatchString(tableName) {
return types.Int32(accessLogTablePartialReg.FindStringSubmatch(tableName)[2])
}
return 0
}
// FindLatestPartition 读取最后一个分区
func (this *HTTPAccessLogManager) FindLatestPartition(day string) (int32, error) {
var dbList = AllAccessLogDBs()
if len(dbList) == 0 {
return 0, errors.New("no valid database")
}
var partitions = []int32{}
var locker sync.Mutex
var wg = sync.WaitGroup{}
wg.Add(len(dbList))
var lastErr error
for _, db := range dbList {
go func(db *dbs.DB) {
defer wg.Done()
names, err := this.FindTableNames(db, day)
if err != nil {
lastErr = err
}
for _, name := range names {
var partition = this.TablePartition(name)
locker.Lock()
if !lists.Contains(partitions, partition) {
partitions = append(partitions, partition)
}
locker.Unlock()
}
}(db)
}
wg.Wait()
if lastErr != nil {
return 0, lastErr
}
if len(partitions) == 0 {
return 0, nil
}
return partitions[len(partitions)-1], nil
}
// 查找某个表格
func (this *HTTPAccessLogManager) findTableWithoutCache(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) {
tableNames, err := this.FindTableNames(db, day)

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/iwind/TeaGo/dbs"
timeutil "github.com/iwind/TeaGo/utils/time"
"testing"
"time"
)
@@ -157,3 +158,32 @@ func TestHTTPAccessLogManager_FindLastTable(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
}
}
func TestHTTPAccessLogManager_FindPartitionTable(t *testing.T) {
var config = &dbs.DBConfig{
Driver: "mysql",
Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_log?charset=utf8mb4&timeout=30s",
Prefix: "edge",
Connections: struct {
Pool int `yaml:"pool"`
Max int `yaml:"max"`
Life string `yaml:"life"`
LifeDuration time.Duration `yaml:",omitempty"`
}{},
Models: struct {
Package string `yaml:"package"`
}{},
}
db, err := dbs.NewInstanceFromConfig(config)
if err != nil {
t.Fatal(err)
}
defer func() {
_ = db.Close()
}()
t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), -1))
t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), 0))
t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), 1))
}

View File

@@ -4,8 +4,13 @@ import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"regexp"
"sync"
)
// HTTPAccessLogService 访问日志相关服务
@@ -72,7 +77,7 @@ func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *p
}
}
accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain)
accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.Partition, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain)
if err != nil {
return nil, err
}
@@ -165,3 +170,63 @@ func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb
}
return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil
}
// FindHTTPAccessLogPartitions 查找日志分区
func (this *HTTPAccessLogService) FindHTTPAccessLogPartitions(ctx context.Context, req *pb.FindHTTPAccessLogPartitionsRequest) (*pb.FindHTTPAccessLogPartitionsResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
if !regexp.MustCompile(`^\d{8}$`).MatchString(req.Day) {
return nil, errors.New("invalid 'day': " + req.Day)
}
var dbList = models.AllAccessLogDBs()
if len(dbList) == 0 {
return &pb.FindHTTPAccessLogPartitionsResponse{
Partitions: nil,
}, nil
}
var partitions = []int32{}
var locker sync.Mutex
var wg = sync.WaitGroup{}
wg.Add(len(dbList))
var lastErr error
for _, db := range dbList {
go func(db *dbs.DB) {
defer wg.Done()
names, err := models.SharedHTTPAccessLogManager.FindTableNames(db, req.Day)
if err != nil {
lastErr = err
}
for _, name := range names {
var partition = models.SharedHTTPAccessLogManager.TablePartition(name)
locker.Lock()
if !lists.Contains(partitions, partition) {
partitions = append(partitions, partition)
}
locker.Unlock()
}
}(db)
}
wg.Wait()
if lastErr != nil {
return nil, lastErr
}
var reversePartitions = []int32{}
for i := len(partitions) - 1; i >= 0; i-- {
reversePartitions = append(reversePartitions, partitions[i])
}
return &pb.FindHTTPAccessLogPartitionsResponse{
Partitions: partitions,
ReversePartitions: partitions,
}, nil
}