mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-07 10:40:25 +08:00
访问日志可以使用分表查询
This commit is contained in:
@@ -56,6 +56,17 @@ func init() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AllAccessLogDBs() []*dbs.DB {
|
||||||
|
accessLogLocker.Lock()
|
||||||
|
defer accessLogLocker.Unlock()
|
||||||
|
|
||||||
|
var result = []*dbs.DB{}
|
||||||
|
for _, db := range accessLogDBMapping {
|
||||||
|
result = append(result, db)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
// 获取获取DAO
|
// 获取获取DAO
|
||||||
func randomHTTPAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) {
|
func randomHTTPAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) {
|
||||||
accessLogLocker.RLock()
|
accessLogLocker.RLock()
|
||||||
|
|||||||
@@ -250,7 +250,9 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLog
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListAccessLogs 读取往前的 单页访问日志
|
// ListAccessLogs 读取往前的 单页访问日志
|
||||||
func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string,
|
func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx,
|
||||||
|
partition int32,
|
||||||
|
lastRequestId string,
|
||||||
size int64,
|
size int64,
|
||||||
day string,
|
day string,
|
||||||
hourFrom string,
|
hourFrom string,
|
||||||
@@ -277,18 +279,19 @@ func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string,
|
|||||||
size = 1000
|
size = 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
result, nextLastRequestId, err = this.listAccessLogs(tx, lastRequestId, size, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
|
result, nextLastRequestId, err = this.listAccessLogs(tx, partition, lastRequestId, size, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
|
||||||
if err != nil || int64(len(result)) < size {
|
if err != nil || int64(len(result)) < size {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
moreResult, _, _ := this.listAccessLogs(tx, nextLastRequestId, 1, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
|
moreResult, _, _ := this.listAccessLogs(tx, partition, nextLastRequestId, 1, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain)
|
||||||
hasMore = len(moreResult) > 0
|
hasMore = len(moreResult) > 0
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 读取往前的单页访问日志
|
// 读取往前的单页访问日志
|
||||||
func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
|
func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
|
||||||
|
partition int32,
|
||||||
lastRequestId string,
|
lastRequestId string,
|
||||||
size int64,
|
size int64,
|
||||||
day string,
|
day string,
|
||||||
@@ -341,7 +344,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
|
|||||||
if clusterId > 0 {
|
if clusterId > 0 {
|
||||||
nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId)
|
nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
remotelogs.Error("DBNODE", err.Error())
|
remotelogs.Error("DB_NODE", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sort.Slice(nodeIds, func(i, j int) bool {
|
sort.Slice(nodeIds, func(i, j int) bool {
|
||||||
@@ -353,18 +356,25 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx,
|
|||||||
var tableQueries = []*accessLogTableQuery{}
|
var tableQueries = []*accessLogTableQuery{}
|
||||||
for _, daoWrapper := range daoList {
|
for _, daoWrapper := range daoList {
|
||||||
var instance = daoWrapper.DAO.Instance
|
var instance = daoWrapper.DAO.Instance
|
||||||
tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day)
|
def, err := SharedHTTPAccessLogManager.FindPartitionTable(instance, day, partition)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
for _, def := range tableDefs {
|
if !def.Exists {
|
||||||
tableQueries = append(tableQueries, &accessLogTableQuery{
|
continue
|
||||||
daoWrapper: daoWrapper,
|
|
||||||
name: def.Name,
|
|
||||||
hasRemoteAddrField: def.HasRemoteAddr,
|
|
||||||
hasDomainField: def.HasDomain,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tableQueries = append(tableQueries, &accessLogTableQuery{
|
||||||
|
daoWrapper: daoWrapper,
|
||||||
|
name: def.Name,
|
||||||
|
hasRemoteAddrField: def.HasRemoteAddr,
|
||||||
|
hasDomainField: def.HasDomain,
|
||||||
|
})
|
||||||
|
logs.Println("query:", def.Name) // TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tableQueries) == 0 {
|
||||||
|
return nil, "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var locker = sync.Mutex{}
|
var locker = sync.Mutex{}
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, "", 10, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -81,7 +81,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) {
|
|||||||
times := 0 // 防止循环次数太多
|
times := 0 // 防止循环次数太多
|
||||||
for {
|
for {
|
||||||
before := time.Now()
|
before := time.Now()
|
||||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, lastRequestId, 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||||
cost := time.Since(before).Seconds()
|
cost := time.Since(before).Seconds()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -112,7 +112,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
before := time.Now()
|
before := time.Now()
|
||||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, true, false, 0, 0, 0, false, 0, "", "", "")
|
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, true, false, 0, 0, 0, false, 0, "", "", "")
|
||||||
cost := time.Since(before).Seconds()
|
cost := time.Since(before).Seconds()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -137,7 +137,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) {
|
|||||||
times := 0 // 防止循环次数太多
|
times := 0 // 防止循环次数太多
|
||||||
for {
|
for {
|
||||||
before := time.Now()
|
before := time.Now()
|
||||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||||
cost := time.Since(before).Seconds()
|
cost := time.Since(before).Seconds()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// 访问日志的两个表格形式
|
// 访问日志的两个表格形式
|
||||||
|
// 括号位置需要固定,会用来读取日期和分区
|
||||||
var accessLogTableMainReg = regexp.MustCompile(`_(\d{8})$`)
|
var accessLogTableMainReg = regexp.MustCompile(`_(\d{8})$`)
|
||||||
var accessLogTablePartialReg = regexp.MustCompile(`_(\d{8})_(\d{4})$`)
|
var accessLogTablePartialReg = regexp.MustCompile(`_(\d{8})_(\d{4})$`)
|
||||||
|
|
||||||
@@ -149,6 +150,50 @@ func (this *HTTPAccessLogManager) FindTables(db *dbs.DB, day string) ([]*httpAcc
|
|||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *HTTPAccessLogManager) FindPartitionTable(db *dbs.DB, day string, partition int32) (*httpAccessLogDefinition, error) {
|
||||||
|
var tableNames []string
|
||||||
|
if partition < 0 {
|
||||||
|
tableList, err := this.FindTables(db, day)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tableList) > 0 {
|
||||||
|
return tableList[len(tableList)-1], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &httpAccessLogDefinition{
|
||||||
|
Name: "",
|
||||||
|
HasRemoteAddr: false,
|
||||||
|
HasDomain: false,
|
||||||
|
Exists: false,
|
||||||
|
}, nil
|
||||||
|
} else if partition == 0 {
|
||||||
|
tableNames = []string{"edgeHTTPAccessLogs_" + day, "edgehttpaccesslogs_" + day}
|
||||||
|
} else {
|
||||||
|
tableNames = []string{"edgeHTTPAccessLogs_" + day + "_" + fmt.Sprintf("%04d", partition), "edgehttpaccesslogs_" + day + "_" + fmt.Sprintf("%04d", partition)}
|
||||||
|
}
|
||||||
|
for _, tableName := range tableNames {
|
||||||
|
hasRemoteField, hasDomainField, err := this.checkTableFields(db, tableName)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return &httpAccessLogDefinition{
|
||||||
|
Name: tableName,
|
||||||
|
HasRemoteAddr: hasRemoteField,
|
||||||
|
HasDomain: hasDomainField,
|
||||||
|
Exists: true,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &httpAccessLogDefinition{
|
||||||
|
Name: "",
|
||||||
|
HasRemoteAddr: false,
|
||||||
|
HasDomain: false,
|
||||||
|
Exists: false,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// FindLastTable 根据日期获取上一个可以使用的表名
|
// FindLastTable 根据日期获取上一个可以使用的表名
|
||||||
// 表名组成
|
// 表名组成
|
||||||
// - PREFIX_DAY
|
// - PREFIX_DAY
|
||||||
@@ -218,6 +263,60 @@ func (this *HTTPAccessLogManager) ResetTable(db *dbs.DB, day string) {
|
|||||||
delete(this.currentTableMapping, this.composeTableCacheKey(config.Dsn, day))
|
delete(this.currentTableMapping, this.composeTableCacheKey(config.Dsn, day))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TablePartition 从表名中获取分区
|
||||||
|
func (this *HTTPAccessLogManager) TablePartition(tableName string) (partition int32) {
|
||||||
|
if accessLogTablePartialReg.MatchString(tableName) {
|
||||||
|
return types.Int32(accessLogTablePartialReg.FindStringSubmatch(tableName)[2])
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindLatestPartition 读取最后一个分区
|
||||||
|
func (this *HTTPAccessLogManager) FindLatestPartition(day string) (int32, error) {
|
||||||
|
var dbList = AllAccessLogDBs()
|
||||||
|
if len(dbList) == 0 {
|
||||||
|
return 0, errors.New("no valid database")
|
||||||
|
}
|
||||||
|
|
||||||
|
var partitions = []int32{}
|
||||||
|
var locker sync.Mutex
|
||||||
|
|
||||||
|
var wg = sync.WaitGroup{}
|
||||||
|
wg.Add(len(dbList))
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
|
for _, db := range dbList {
|
||||||
|
go func(db *dbs.DB) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
names, err := this.FindTableNames(db, day)
|
||||||
|
if err != nil {
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
for _, name := range names {
|
||||||
|
var partition = this.TablePartition(name)
|
||||||
|
locker.Lock()
|
||||||
|
if !lists.Contains(partitions, partition) {
|
||||||
|
partitions = append(partitions, partition)
|
||||||
|
}
|
||||||
|
locker.Unlock()
|
||||||
|
}
|
||||||
|
}(db)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if lastErr != nil {
|
||||||
|
return 0, lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(partitions) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return partitions[len(partitions)-1], nil
|
||||||
|
}
|
||||||
|
|
||||||
// 查找某个表格
|
// 查找某个表格
|
||||||
func (this *HTTPAccessLogManager) findTableWithoutCache(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) {
|
func (this *HTTPAccessLogManager) findTableWithoutCache(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) {
|
||||||
tableNames, err := this.FindTableNames(db, day)
|
tableNames, err := this.FindTableNames(db, day)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||||
"github.com/iwind/TeaGo/dbs"
|
"github.com/iwind/TeaGo/dbs"
|
||||||
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -157,3 +158,32 @@ func TestHTTPAccessLogManager_FindLastTable(t *testing.T) {
|
|||||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHTTPAccessLogManager_FindPartitionTable(t *testing.T) {
|
||||||
|
var config = &dbs.DBConfig{
|
||||||
|
Driver: "mysql",
|
||||||
|
Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_log?charset=utf8mb4&timeout=30s",
|
||||||
|
Prefix: "edge",
|
||||||
|
Connections: struct {
|
||||||
|
Pool int `yaml:"pool"`
|
||||||
|
Max int `yaml:"max"`
|
||||||
|
Life string `yaml:"life"`
|
||||||
|
LifeDuration time.Duration `yaml:",omitempty"`
|
||||||
|
}{},
|
||||||
|
Models: struct {
|
||||||
|
Package string `yaml:"package"`
|
||||||
|
}{},
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := dbs.NewInstanceFromConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = db.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), -1))
|
||||||
|
t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), 0))
|
||||||
|
t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), 1))
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,8 +4,13 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
"github.com/TeaOSLab/EdgeAPI/internal/accesslogs"
|
||||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||||
|
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||||
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||||
|
"github.com/iwind/TeaGo/dbs"
|
||||||
|
"github.com/iwind/TeaGo/lists"
|
||||||
|
"regexp"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HTTPAccessLogService 访问日志相关服务
|
// HTTPAccessLogService 访问日志相关服务
|
||||||
@@ -72,7 +77,7 @@ func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *p
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain)
|
accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.Partition, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -165,3 +170,63 @@ func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb
|
|||||||
}
|
}
|
||||||
return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil
|
return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindHTTPAccessLogPartitions 查找日志分区
|
||||||
|
func (this *HTTPAccessLogService) FindHTTPAccessLogPartitions(ctx context.Context, req *pb.FindHTTPAccessLogPartitionsRequest) (*pb.FindHTTPAccessLogPartitionsResponse, error) {
|
||||||
|
_, err := this.ValidateAdmin(ctx, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !regexp.MustCompile(`^\d{8}$`).MatchString(req.Day) {
|
||||||
|
return nil, errors.New("invalid 'day': " + req.Day)
|
||||||
|
}
|
||||||
|
|
||||||
|
var dbList = models.AllAccessLogDBs()
|
||||||
|
if len(dbList) == 0 {
|
||||||
|
return &pb.FindHTTPAccessLogPartitionsResponse{
|
||||||
|
Partitions: nil,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var partitions = []int32{}
|
||||||
|
var locker sync.Mutex
|
||||||
|
|
||||||
|
var wg = sync.WaitGroup{}
|
||||||
|
wg.Add(len(dbList))
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
|
for _, db := range dbList {
|
||||||
|
go func(db *dbs.DB) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
names, err := models.SharedHTTPAccessLogManager.FindTableNames(db, req.Day)
|
||||||
|
if err != nil {
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
for _, name := range names {
|
||||||
|
var partition = models.SharedHTTPAccessLogManager.TablePartition(name)
|
||||||
|
locker.Lock()
|
||||||
|
if !lists.Contains(partitions, partition) {
|
||||||
|
partitions = append(partitions, partition)
|
||||||
|
}
|
||||||
|
locker.Unlock()
|
||||||
|
}
|
||||||
|
}(db)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if lastErr != nil {
|
||||||
|
return nil, lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
var reversePartitions = []int32{}
|
||||||
|
for i := len(partitions) - 1; i >= 0; i-- {
|
||||||
|
reversePartitions = append(reversePartitions, partitions[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pb.FindHTTPAccessLogPartitionsResponse{
|
||||||
|
Partitions: partitions,
|
||||||
|
ReversePartitions: partitions,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user