diff --git a/internal/db/models/db_node_initializer.go b/internal/db/models/db_node_initializer.go index 39caea39..35ee6b9a 100644 --- a/internal/db/models/db_node_initializer.go +++ b/internal/db/models/db_node_initializer.go @@ -56,6 +56,17 @@ func init() { }) } +func AllAccessLogDBs() []*dbs.DB { + accessLogLocker.Lock() + defer accessLogLocker.Unlock() + + var result = []*dbs.DB{} + for _, db := range accessLogDBMapping { + result = append(result, db) + } + return result +} + // 获取获取DAO func randomHTTPAccessLogDAO() (dao *HTTPAccessLogDAOWrapper) { accessLogLocker.RLock() diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go index 34e12e42..56aef4de 100644 --- a/internal/db/models/http_access_log_dao.go +++ b/internal/db/models/http_access_log_dao.go @@ -250,7 +250,9 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLog } // ListAccessLogs 读取往前的 单页访问日志 -func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string, +func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, + partition int32, + lastRequestId string, size int64, day string, hourFrom string, @@ -277,18 +279,19 @@ func (this *HTTPAccessLogDAO) ListAccessLogs(tx *dbs.Tx, lastRequestId string, size = 1000 } - result, nextLastRequestId, err = this.listAccessLogs(tx, lastRequestId, size, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain) + result, nextLastRequestId, err = this.listAccessLogs(tx, partition, lastRequestId, size, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain) if err != nil || int64(len(result)) < size { return } - moreResult, _, _ := this.listAccessLogs(tx, nextLastRequestId, 1, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain) + moreResult, _, _ := this.listAccessLogs(tx, partition, nextLastRequestId, 1, day, hourFrom, hourTo, clusterId, nodeId, serverId, reverse, hasError, firewallPolicyId, firewallRuleGroupId, firewallRuleSetId, hasFirewallPolicy, userId, keyword, ip, domain) hasMore = len(moreResult) > 0 return } // 读取往前的单页访问日志 func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, + partition int32, lastRequestId string, size int64, day string, @@ -341,7 +344,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, if clusterId > 0 { nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId) if err != nil { - remotelogs.Error("DBNODE", err.Error()) + remotelogs.Error("DB_NODE", err.Error()) return } sort.Slice(nodeIds, func(i, j int) bool { @@ -353,18 +356,25 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, var tableQueries = []*accessLogTableQuery{} for _, daoWrapper := range daoList { var instance = daoWrapper.DAO.Instance - tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day) + def, err := SharedHTTPAccessLogManager.FindPartitionTable(instance, day, partition) if err != nil { return nil, "", err } - for _, def := range tableDefs { - tableQueries = append(tableQueries, &accessLogTableQuery{ - daoWrapper: daoWrapper, - name: def.Name, - hasRemoteAddrField: def.HasRemoteAddr, - hasDomainField: def.HasDomain, - }) + if !def.Exists { + continue } + + tableQueries = append(tableQueries, &accessLogTableQuery{ + daoWrapper: daoWrapper, + name: def.Name, + hasRemoteAddrField: def.HasRemoteAddr, + hasDomainField: def.HasDomain, + }) + logs.Println("query:", def.Name) // TODO + } + + if len(tableQueries) == 0 { + return nil, "", nil } var locker = sync.Mutex{} diff --git a/internal/db/models/http_access_log_dao_test.go b/internal/db/models/http_access_log_dao_test.go index 34729917..d84ffc3a 100644 --- a/internal/db/models/http_access_log_dao_test.go +++ b/internal/db/models/http_access_log_dao_test.go @@ -54,7 +54,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) { t.Fatal(err) } - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "") + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, "", 10, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "") if err != nil { t.Fatal(err) } @@ -81,7 +81,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) { times := 0 // 防止循环次数太多 for { before := time.Now() - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "") + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, lastRequestId, 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "") cost := time.Since(before).Seconds() if err != nil { t.Fatal(err) @@ -112,7 +112,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) { } before := time.Now() - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, true, false, 0, 0, 0, false, 0, "", "", "") + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), "", "", 0, 0, 0, true, false, 0, 0, 0, false, 0, "", "", "") cost := time.Since(before).Seconds() if err != nil { t.Fatal(err) @@ -137,7 +137,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) { times := 0 // 防止循环次数太多 for { before := time.Now() - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "") + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, -1, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), "", "", 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "") cost := time.Since(before).Seconds() if err != nil { t.Fatal(err) diff --git a/internal/db/models/http_access_log_manager.go b/internal/db/models/http_access_log_manager.go index 694a9859..5fafedaa 100644 --- a/internal/db/models/http_access_log_manager.go +++ b/internal/db/models/http_access_log_manager.go @@ -15,6 +15,7 @@ import ( ) // 访问日志的两个表格形式 +// 括号位置需要固定,会用来读取日期和分区 var accessLogTableMainReg = regexp.MustCompile(`_(\d{8})$`) var accessLogTablePartialReg = regexp.MustCompile(`_(\d{8})_(\d{4})$`) @@ -149,6 +150,50 @@ func (this *HTTPAccessLogManager) FindTables(db *dbs.DB, day string) ([]*httpAcc return results, nil } +func (this *HTTPAccessLogManager) FindPartitionTable(db *dbs.DB, day string, partition int32) (*httpAccessLogDefinition, error) { + var tableNames []string + if partition < 0 { + tableList, err := this.FindTables(db, day) + if err != nil { + return nil, err + } + + if len(tableList) > 0 { + return tableList[len(tableList)-1], nil + } + + return &httpAccessLogDefinition{ + Name: "", + HasRemoteAddr: false, + HasDomain: false, + Exists: false, + }, nil + } else if partition == 0 { + tableNames = []string{"edgeHTTPAccessLogs_" + day, "edgehttpaccesslogs_" + day} + } else { + tableNames = []string{"edgeHTTPAccessLogs_" + day + "_" + fmt.Sprintf("%04d", partition), "edgehttpaccesslogs_" + day + "_" + fmt.Sprintf("%04d", partition)} + } + for _, tableName := range tableNames { + hasRemoteField, hasDomainField, err := this.checkTableFields(db, tableName) + if err != nil { + continue + } + return &httpAccessLogDefinition{ + Name: tableName, + HasRemoteAddr: hasRemoteField, + HasDomain: hasDomainField, + Exists: true, + }, nil + } + + return &httpAccessLogDefinition{ + Name: "", + HasRemoteAddr: false, + HasDomain: false, + Exists: false, + }, nil +} + // FindLastTable 根据日期获取上一个可以使用的表名 // 表名组成 // - PREFIX_DAY @@ -218,6 +263,60 @@ func (this *HTTPAccessLogManager) ResetTable(db *dbs.DB, day string) { delete(this.currentTableMapping, this.composeTableCacheKey(config.Dsn, day)) } +// TablePartition 从表名中获取分区 +func (this *HTTPAccessLogManager) TablePartition(tableName string) (partition int32) { + if accessLogTablePartialReg.MatchString(tableName) { + return types.Int32(accessLogTablePartialReg.FindStringSubmatch(tableName)[2]) + } + + return 0 +} + +// FindLatestPartition 读取最后一个分区 +func (this *HTTPAccessLogManager) FindLatestPartition(day string) (int32, error) { + var dbList = AllAccessLogDBs() + if len(dbList) == 0 { + return 0, errors.New("no valid database") + } + + var partitions = []int32{} + var locker sync.Mutex + + var wg = sync.WaitGroup{} + wg.Add(len(dbList)) + + var lastErr error + for _, db := range dbList { + go func(db *dbs.DB) { + defer wg.Done() + + names, err := this.FindTableNames(db, day) + if err != nil { + lastErr = err + } + for _, name := range names { + var partition = this.TablePartition(name) + locker.Lock() + if !lists.Contains(partitions, partition) { + partitions = append(partitions, partition) + } + locker.Unlock() + } + }(db) + } + wg.Wait() + + if lastErr != nil { + return 0, lastErr + } + + if len(partitions) == 0 { + return 0, nil + } + + return partitions[len(partitions)-1], nil +} + // 查找某个表格 func (this *HTTPAccessLogManager) findTableWithoutCache(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) { tableNames, err := this.FindTableNames(db, day) diff --git a/internal/db/models/http_access_log_manager_test.go b/internal/db/models/http_access_log_manager_test.go index 8f1492e0..fcda0ca3 100644 --- a/internal/db/models/http_access_log_manager_test.go +++ b/internal/db/models/http_access_log_manager_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/iwind/TeaGo/dbs" + timeutil "github.com/iwind/TeaGo/utils/time" "testing" "time" ) @@ -157,3 +158,32 @@ func TestHTTPAccessLogManager_FindLastTable(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") } } + +func TestHTTPAccessLogManager_FindPartitionTable(t *testing.T) { + var config = &dbs.DBConfig{ + Driver: "mysql", + Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_log?charset=utf8mb4&timeout=30s", + Prefix: "edge", + Connections: struct { + Pool int `yaml:"pool"` + Max int `yaml:"max"` + Life string `yaml:"life"` + LifeDuration time.Duration `yaml:",omitempty"` + }{}, + Models: struct { + Package string `yaml:"package"` + }{}, + } + + db, err := dbs.NewInstanceFromConfig(config) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = db.Close() + }() + + t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), -1)) + t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), 0)) + t.Log(models.SharedHTTPAccessLogManager.FindPartitionTable(db, timeutil.Format("Ymd", time.Now().AddDate(0, 0, -1)), 1)) +} diff --git a/internal/rpc/services/service_http_access_log.go b/internal/rpc/services/service_http_access_log.go index 97fcb107..a58e77ab 100644 --- a/internal/rpc/services/service_http_access_log.go +++ b/internal/rpc/services/service_http_access_log.go @@ -4,8 +4,13 @@ import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "regexp" + "sync" ) // HTTPAccessLogService 访问日志相关服务 @@ -72,7 +77,7 @@ func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *p } } - accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain) + accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.Partition, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain) if err != nil { return nil, err } @@ -165,3 +170,63 @@ func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb } return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil } + +// FindHTTPAccessLogPartitions 查找日志分区 +func (this *HTTPAccessLogService) FindHTTPAccessLogPartitions(ctx context.Context, req *pb.FindHTTPAccessLogPartitionsRequest) (*pb.FindHTTPAccessLogPartitionsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + if !regexp.MustCompile(`^\d{8}$`).MatchString(req.Day) { + return nil, errors.New("invalid 'day': " + req.Day) + } + + var dbList = models.AllAccessLogDBs() + if len(dbList) == 0 { + return &pb.FindHTTPAccessLogPartitionsResponse{ + Partitions: nil, + }, nil + } + + var partitions = []int32{} + var locker sync.Mutex + + var wg = sync.WaitGroup{} + wg.Add(len(dbList)) + + var lastErr error + for _, db := range dbList { + go func(db *dbs.DB) { + defer wg.Done() + + names, err := models.SharedHTTPAccessLogManager.FindTableNames(db, req.Day) + if err != nil { + lastErr = err + } + for _, name := range names { + var partition = models.SharedHTTPAccessLogManager.TablePartition(name) + locker.Lock() + if !lists.Contains(partitions, partition) { + partitions = append(partitions, partition) + } + locker.Unlock() + } + }(db) + } + wg.Wait() + + if lastErr != nil { + return nil, lastErr + } + + var reversePartitions = []int32{} + for i := len(partitions) - 1; i >= 0; i-- { + reversePartitions = append(reversePartitions, partitions[i]) + } + + return &pb.FindHTTPAccessLogPartitionsResponse{ + Partitions: partitions, + ReversePartitions: partitions, + }, nil +}