diff --git a/internal/db/models/db_node_initializer.go b/internal/db/models/db_node_initializer.go index e116a0b5..0eb44066 100644 --- a/internal/db/models/db_node_initializer.go +++ b/internal/db/models/db_node_initializer.go @@ -29,8 +29,7 @@ type httpAccessLogDefinition struct { } // HTTP服务访问 -var httpAccessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO -var httpAccessLogTableMapping = map[string]*httpAccessLogDefinition{} // tableName_crc(dsn) => true +var httpAccessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO // DNS服务访问 var nsAccessLogDAOMapping = map[int64]*NSAccessLogDAOWrapper{} // dbNodeId => DAO @@ -86,36 +85,6 @@ func randomNSAccessLogDAO() (dao *NSAccessLogDAOWrapper) { return } -// 检查表格是否存在 -func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, hasRemoteAddr bool, hasDomain bool, ok bool, err error) { - if !regexp.MustCompile(`^\d{8}$`).MatchString(day) { - err = errors.New("invalid day '" + day + "', should be YYYYMMDD") - return - } - - config, err := db.Config() - if err != nil { - return "", false, false, false, err - } - - tableName = "edgeHTTPAccessLogs_" + day - cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn))) - - accessLogLocker.RLock() - def, ok := httpAccessLogTableMapping[cacheKey] - accessLogLocker.RUnlock() - if ok { - return tableName, def.HasRemoteAddr, def.HasDomain, true, nil - } - - def, err = findHTTPAccessLogTable(db, day, false) - if err != nil { - return tableName, false, false, false, err - } - - return tableName, def.HasRemoteAddr, def.HasDomain, def.Exists, nil -} - func findNSAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) { if !regexp.MustCompile(`^\d{8}$`).MatchString(day) { err = errors.New("invalid day '" + day + "', should be YYYYMMDD") @@ -145,75 +114,6 @@ func findNSAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool return tableName, utils.ContainsStringInsensitive(tableNames, tableName), nil } -// 根据日期获取表名 -func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) { - config, err := db.Config() - if err != nil { - return nil, err - } - - tableName := "edgeHTTPAccessLogs_" + day - cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn))) - - if !force { - accessLogLocker.RLock() - definition, ok := httpAccessLogTableMapping[cacheKey] - accessLogLocker.RUnlock() - if ok { - return definition, nil - } - } - - tableNames, err := db.TableNames() - if err != nil { - return nil, err - } - - if utils.ContainsStringInsensitive(tableNames, tableName) { - table, err := db.FindTable(tableName) - if err != nil { - return nil, err - } - - accessLogLocker.Lock() - var definition = &httpAccessLogDefinition{ - Name: tableName, - HasRemoteAddr: table.FindFieldWithName("remoteAddr") != nil, - HasDomain: table.FindFieldWithName("domain") != nil, - Exists: true, - } - httpAccessLogTableMapping[cacheKey] = definition - accessLogLocker.Unlock() - return definition, nil - } - - if !force { - return &httpAccessLogDefinition{ - Name: tableName, - HasRemoteAddr: true, - HasDomain: true, - Exists: false, - }, nil - } - - // 创建表格 - _, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n `firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',\n `firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',\n `firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',\n `firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',\n `remoteAddr` varchar(64) DEFAULT NULL COMMENT 'IP地址',\n `domain` varchar(128) DEFAULT NULL COMMENT '域名',\n `requestBody` mediumblob COMMENT '请求内容',\n `responseBody` mediumblob COMMENT '响应内容',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`),\n KEY `firewallPolicyId` (`firewallPolicyId`),\n KEY `firewallRuleGroupId` (`firewallRuleGroupId`),\n KEY `firewallRuleSetId` (`firewallRuleSetId`),\n KEY `firewallRuleId` (`firewallRuleId`),\n KEY `remoteAddr` (`remoteAddr`),\n KEY `domain` (`domain`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='访问日志';") - if err != nil { - return nil, err - } - - accessLogLocker.Lock() - var definition = &httpAccessLogDefinition{ - Name: tableName, - HasRemoteAddr: true, - Exists: true, - } - httpAccessLogTableMapping[cacheKey] = definition - accessLogLocker.Unlock() - - return definition, nil -} - func findNSAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { config, err := db.Config() if err != nil { @@ -351,22 +251,17 @@ func (this *DBNodeInitializer) loop() error { // 检查表是否存在 // httpAccessLog { - tableDef, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), true) + tableDef, err := SharedHTTPAccessLogManager.FindTable(db, timeutil.Format("Ymd"), true) if err != nil { - if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误 - remotelogs.Error("DB_NODE", "create first table in database node failed: "+err.Error()) + remotelogs.Error("DB_NODE", "create first table in database node failed: "+err.Error()) - // 创建节点日志 - createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix(), "", nil) - if createLogErr != nil { - remotelogs.Error("NODE_LOG", createLogErr.Error()) - } - - continue - } else { - err = nil - continue + // 创建节点日志 + createLogErr := SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleDatabase, nodeId, 0, 0, "error", "ACCESS_LOG", "can not create access log table: "+err.Error(), time.Now().Unix(), "", nil) + if createLogErr != nil { + remotelogs.Error("NODE_LOG", createLogErr.Error()) } + + continue } daoObject := dbs.DAOObject{ diff --git a/internal/db/models/db_node_initializer_test.go b/internal/db/models/db_node_initializer_test.go index 2e35a323..5ef86db9 100644 --- a/internal/db/models/db_node_initializer_test.go +++ b/internal/db/models/db_node_initializer_test.go @@ -1,9 +1,7 @@ package models import ( - "runtime" "testing" - "time" ) func TestDBNodeInitializer_loop(t *testing.T) { @@ -14,32 +12,3 @@ func TestDBNodeInitializer_loop(t *testing.T) { } t.Log(len(accessLogDBMapping), len(httpAccessLogDAOMapping)) } - -func TestFindAccessLogTable(t *testing.T) { - before := time.Now() - db := SharedHTTPAccessLogDAO.Instance - tableName, err := findHTTPAccessLogTable(db, "20201010", false) - if err != nil { - t.Fatal(err) - } - t.Log(tableName) - t.Log(time.Since(before).Seconds()*1000, "ms") - - before = time.Now() - tableName, err = findHTTPAccessLogTable(db, "20201010", false) - - if err != nil { - t.Fatal(err) - } - t.Log(tableName) - t.Log(time.Since(before).Seconds()*1000, "ms") -} - -func BenchmarkFindAccessLogTable(b *testing.B) { - db := SharedHTTPAccessLogDAO.Instance - - runtime.GOMAXPROCS(1) - for i := 0; i < b.N; i++ { - _, _ = findHTTPAccessLogTable(db, "20201010", false) - } -} diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go index d462fe24..b03767c3 100644 --- a/internal/db/models/http_access_log_dao.go +++ b/internal/db/models/http_access_log_dao.go @@ -34,13 +34,25 @@ type HTTPAccessLogDAO dbs.DAO var SharedHTTPAccessLogDAO *HTTPAccessLogDAO // 队列 -var oldAccessLogQueue = make(chan *pb.HTTPAccessLog) -var accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000) -var accessLogQueueMaxLength = 100_000 -var accessLogQueuePercent = 100 // 0-100 -var accessLogCountPerSecond = 10_000 // 0 表示不限制 -var accessLogConfigJSON = []byte{} -var accessLogQueueChanged = make(chan zero.Zero, 1) +var ( + oldAccessLogQueue = make(chan *pb.HTTPAccessLog) + accessLogQueue = make(chan *pb.HTTPAccessLog, 10_000) + accessLogQueueMaxLength = 100_000 + accessLogQueuePercent = 100 // 0-100 + accessLogCountPerSecond = 10_000 // 0 表示不限制 + accessLogConfigJSON = []byte{} + accessLogQueueChanged = make(chan zero.Zero, 1) + + accessLogEnableAutoPartial = true // 是否启用自动分表 + accessLogPartialRows int64 = 500_000 // 自动分表的单表最大值 +) + +type accessLogTableQuery struct { + daoWrapper *HTTPAccessLogDAOWrapper + name string + hasRemoteAddrField bool + hasDomainField bool +} func init() { dbs.OnReady(func() { @@ -120,7 +132,7 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogs(tx *dbs.Tx, accessLogs []*pb. // DumpAccessLogsFromQueue 从队列导入访问日志 func (this *HTTPAccessLogDAO) DumpAccessLogsFromQueue(tx *dbs.Tx, size int) error { - dao := randomHTTPAccessLogDAO() + var dao = randomHTTPAccessLogDAO() if dao == nil { dao = &HTTPAccessLogDAOWrapper{ DAO: SharedHTTPAccessLogDAO, @@ -168,8 +180,8 @@ Loop: // CreateHTTPAccessLog 写入单条访问日志 func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLogDAO, accessLog *pb.HTTPAccessLog) error { - day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0)) - tableDef, err := findHTTPAccessLogTable(dao.Instance, day, false) + var day = timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0)) + tableDef, err := SharedHTTPAccessLogManager.FindTable(dao.Instance, day, true) if err != nil { return err } @@ -203,27 +215,17 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLog(tx *dbs.Tx, dao *HTTPAccessLog } fields["content"] = content - _, err = dao.Query(tx). + var lastId int64 + lastId, err = dao.Query(tx). Table(tableDef.Name). Sets(fields). Insert() if err != nil { - // 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试 - if strings.Contains(err.Error(), "1146") { - tableDef, err = findHTTPAccessLogTable(dao.Instance, day, true) - if err != nil { - return err - } - _, err = dao.Query(tx). - Table(tableDef.Name). - Sets(fields). - Insert() - if err != nil { - return err - } - } else { - remotelogs.Error("HTTP_ACCESS_LOG", err.Error()) - } + return err + } + + if lastId%accessLogPartialRows == 0 { + SharedHTTPAccessLogManager.ResetTable(dao.Instance, day) } return nil @@ -296,42 +298,56 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s }} } - locker := sync.Mutex{} + // 查询某个集群下的节点 + var nodeIds = []int64{} + if clusterId > 0 { + nodeIds, err = SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId) + if err != nil { + remotelogs.Error("DBNODE", err.Error()) + return + } + sort.Slice(nodeIds, func(i, j int) bool { + return nodeIds[i] < nodeIds[j] + }) + } - count := len(daoList) - wg := &sync.WaitGroup{} - wg.Add(count) + // 准备查询 + var tableQueries = []*accessLogTableQuery{} for _, daoWrapper := range daoList { - go func(daoWrapper *HTTPAccessLogDAOWrapper) { + var instance = daoWrapper.DAO.Instance + tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day) + if err != nil { + return nil, "", err + } + for _, def := range tableDefs { + tableQueries = append(tableQueries, &accessLogTableQuery{ + daoWrapper: daoWrapper, + name: def.Name, + hasRemoteAddrField: def.HasRemoteAddr, + hasDomainField: def.HasDomain, + }) + } + } + + var locker = sync.Mutex{} + + var statusPrefixReg = regexp.MustCompile(`status:\s*(\d{3})`) + + var count = len(tableQueries) + var wg = &sync.WaitGroup{} + wg.Add(count) + for _, tableQuery := range tableQueries { + go func(tableQuery *accessLogTableQuery) { defer wg.Done() - dao := daoWrapper.DAO - - tableName, hasRemoteAddrField, hasDomainField, exists, err := findHTTPAccessLogTableName(dao.Instance, day) - if err != nil { - logs.Println("[DB_NODE]" + err.Error()) - return - } - if !exists { - // 表格不存在则跳过 - return - } - - query := dao.Query(tx) + var dao = tableQuery.daoWrapper.DAO + var query = dao.Query(tx) // 条件 if nodeId > 0 { query.Attr("nodeId", nodeId) } else if clusterId > 0 { - nodeIds, err := SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, clusterId) - if err != nil { - remotelogs.Error("DBNODE", err.Error()) - return - } if len(nodeIds) > 0 { - sort.Slice(nodeIds, func(i, j int) bool { - return nodeIds[i] < nodeIds[j] - }) var nodeIdStrings = []string{} for _, subNodeId := range nodeIds { nodeIdStrings = append(nodeIdStrings, types.String(subNodeId)) @@ -370,7 +386,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s // keyword if len(ip) > 0 { // TODO 支持IP范围 - if hasRemoteAddrField { + if tableQuery.hasRemoteAddrField { // IP格式 if strings.Contains(ip, ",") || strings.Contains(ip, "-") { rangeConfig, err := shared.ParseIPRange(ip) @@ -389,7 +405,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s } } if len(domain) > 0 { - if hasDomainField { + if tableQuery.hasDomainField { if strings.Contains(domain, "*") { domain = strings.ReplaceAll(domain, "*", "%") domain = regexp.MustCompile(`[^a-zA-Z0-9-.%]`).ReplaceAllString(domain, "") @@ -404,11 +420,12 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s Param("host1", domain) } } + if len(keyword) > 0 { // remoteAddr - if hasRemoteAddrField && net.ParseIP(keyword) != nil { + if tableQuery.hasRemoteAddrField && net.ParseIP(keyword) != nil { query.Attr("remoteAddr", keyword) - } else if hasRemoteAddrField && regexp.MustCompile(`^ip:.+`).MatchString(keyword) { + } else if tableQuery.hasRemoteAddrField && regexp.MustCompile(`^ip:.+`).MatchString(keyword) { keyword = keyword[3:] pieces := strings.SplitN(keyword, ",", 2) if len(pieces) == 1 || len(pieces[1]) == 0 { @@ -416,12 +433,15 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s } else { query.Between("INET_ATON(remoteAddr)", utils.IP2Long(pieces[0]), utils.IP2Long(pieces[1])) } + } else if statusPrefixReg.MatchString(keyword) { + var matches = statusPrefixReg.FindStringSubmatch(keyword) + query.Attr("status", matches[1]) } else { if regexp.MustCompile(`^ip:.+`).MatchString(keyword) { keyword = keyword[3:] } - useOriginKeyword := false + var useOriginKeyword = false where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword OR JSON_EXTRACT(content, '$.userAgent') LIKE :keyword" @@ -447,13 +467,13 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s // 响应状态码 if regexp.MustCompile(`^\d{3}$`).MatchString(keyword) { - where += " OR JSON_EXTRACT(content, '$.status')=:intKeyword" + where += " OR status=:intKeyword" query.Param("intKeyword", types.Int(keyword)) } if regexp.MustCompile(`^\d{3}-\d{3}$`).MatchString(keyword) { pieces := strings.Split(keyword, "-") - where += " OR JSON_EXTRACT(content, '$.status') BETWEEN :intKeyword1 AND :intKeyword2" + where += " OR status BETWEEN :intKeyword1 AND :intKeyword2" query.Param("intKeyword1", types.Int(pieces[0])) query.Param("intKeyword2", types.Int(pieces[1])) } @@ -490,20 +510,21 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s // 开始查询 ones, err := query. - Table(tableName). + Table(tableQuery.name). Limit(size). FindAll() if err != nil { logs.Println("[DB_NODE]" + err.Error()) return } + locker.Lock() for _, one := range ones { accessLog := one.(*HTTPAccessLog) result = append(result, accessLog) } locker.Unlock() - }(daoWrapper) + }(tableQuery) } wg.Wait() @@ -524,7 +545,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s result = result[:size] } - requestId := result[len(result)-1].RequestId + var requestId = result[len(result)-1].RequestId if reverse { lists.Reverse(result) } @@ -556,28 +577,36 @@ func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId s }} } - count := len(daoList) - wg := &sync.WaitGroup{} + // 准备查询 + var day = timeutil.FormatTime("Ymd", types.Int64(requestId[:10])) + var tableQueries = []*accessLogTableQuery{} + for _, daoWrapper := range daoList { + var instance = daoWrapper.DAO.Instance + tableDefs, err := SharedHTTPAccessLogManager.FindTables(instance, day) + if err != nil { + return nil, err + } + for _, def := range tableDefs { + tableQueries = append(tableQueries, &accessLogTableQuery{ + daoWrapper: daoWrapper, + name: def.Name, + hasRemoteAddrField: def.HasRemoteAddr, + hasDomainField: def.HasDomain, + }) + } + } + + var count = len(tableQueries) + var wg = &sync.WaitGroup{} wg.Add(count) var result *HTTPAccessLog = nil - day := timeutil.FormatTime("Ymd", types.Int64(requestId[:10])) - for _, daoWrapper := range daoList { - go func(daoWrapper *HTTPAccessLogDAOWrapper) { + for _, tableQuery := range tableQueries { + go func(tableQuery *accessLogTableQuery) { defer wg.Done() - dao := daoWrapper.DAO - - tableName, _, _, exists, err := findHTTPAccessLogTableName(dao.Instance, day) - if err != nil { - logs.Println("[DB_NODE]" + err.Error()) - return - } - if !exists { - return - } - + var dao = tableQuery.daoWrapper.DAO one, err := dao.Query(tx). - Table(tableName). + Table(tableQuery.name). Attr("requestId", requestId). Find() if err != nil { @@ -587,7 +616,7 @@ func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId s if one != nil { result = one.(*HTTPAccessLog) } - }(daoWrapper) + }(tableQuery) } wg.Wait() return result, nil diff --git a/internal/db/models/http_access_log_manager.go b/internal/db/models/http_access_log_manager.go new file mode 100644 index 00000000..2fdcbb31 --- /dev/null +++ b/internal/db/models/http_access_log_manager.go @@ -0,0 +1,297 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package models + +import ( + "fmt" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/types" + "regexp" + "sort" + "strings" + "sync" +) + +// 访问日志的两个表格形式 +var accessLogTableMainReg = regexp.MustCompile(`_(\d{8})$`) +var accessLogTablePartialReg = regexp.MustCompile(`_(\d{8})_(\d{4})$`) + +var SharedHTTPAccessLogManager = NewHTTPAccessLogManager() + +type HTTPAccessLogManager struct { + currentTableMapping map[string]*httpAccessLogDefinition // dsn => def + + locker sync.Mutex +} + +func NewHTTPAccessLogManager() *HTTPAccessLogManager { + return &HTTPAccessLogManager{ + currentTableMapping: map[string]*httpAccessLogDefinition{}, + } +} + +// FindTableNames 读取数据库中某日所有日志表名称 +func (this *HTTPAccessLogManager) FindTableNames(db *dbs.DB, day string) ([]string, error) { + var results = []string{} + + // 需要防止用户设置了表名自动小写 + for _, prefix := range []string{"edgeHTTPAccessLogs_" + day + "%", "edgehttpaccesslogs_" + day + "%"} { + ones, columnNames, err := db.FindOnes(`SHOW TABLES LIKE '` + prefix + `'`) + if err != nil { + return nil, errors.New("query table names error: " + err.Error()) + } + + var columnName = columnNames[0] + + for _, one := range ones { + var tableName = one[columnName].(string) + + if lists.ContainsString(results, tableName) { + continue + } + + if accessLogTableMainReg.MatchString(tableName) || accessLogTablePartialReg.MatchString(tableName) { + results = append(results, tableName) + } + } + } + + // 排序 + sort.Strings(results) + + return results, nil +} + +// FindTables 读取数据库中某日所有日志表 +func (this *HTTPAccessLogManager) FindTables(db *dbs.DB, day string) ([]*httpAccessLogDefinition, error) { + var results = []*httpAccessLogDefinition{} + var tableNames = []string{} + + // 需要防止用户设置了表名自动小写 + for _, prefix := range []string{"edgeHTTPAccessLogs_" + day + "%", "edgehttpaccesslogs_" + day + "%"} { + ones, columnNames, err := db.FindOnes(`SHOW TABLES LIKE '` + prefix + `'`) + if err != nil { + return nil, errors.New("query table names error: " + err.Error()) + } + + var columnName = columnNames[0] + + for _, one := range ones { + var tableName = one[columnName].(string) + + if lists.ContainsString(tableNames, tableName) { + continue + } + + if accessLogTableMainReg.MatchString(tableName) { + tableNames = append(tableNames, tableName) + + hasRemoteAddrField, hasDomainField, err := this.checkTableFields(db, tableName) + if err != nil { + return nil, err + } + + results = append(results, &httpAccessLogDefinition{ + Name: tableName, + HasRemoteAddr: hasRemoteAddrField, + HasDomain: hasDomainField, + Exists: true, + }) + } else if accessLogTablePartialReg.MatchString(tableName) { + tableNames = append(tableNames, tableName) + + results = append(results, &httpAccessLogDefinition{ + Name: tableName, + HasRemoteAddr: true, + HasDomain: true, + Exists: true, + }) + } + } + } + + // 排序 + sort.Slice(results, func(i, j int) bool { + return results[i].Name < results[j].Name + }) + + return results, nil +} + +// FindTable 根据日期获取表名 +// 表名组成 +// - PREFIX_DAY +// - PREFIX_DAY_0001 +func (this *HTTPAccessLogManager) FindTable(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) { + this.locker.Lock() + defer this.locker.Unlock() + + config, err := db.Config() + if err != nil { + return nil, err + } + var cacheKey = config.Dsn + def, ok := this.currentTableMapping[cacheKey] + if ok { + return def, nil + } + + def, err = this.findTableWithoutCache(db, day, force) + if err != nil { + return nil, err + } + + this.currentTableMapping[cacheKey] = def + return def, nil +} + +// CreateTable 创建访问日志表格 +func (this *HTTPAccessLogManager) CreateTable(db *dbs.DB, tableName string) error { + _, err := db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n `firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',\n `firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',\n `firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',\n `firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',\n `remoteAddr` varchar(64) DEFAULT NULL COMMENT 'IP地址',\n `domain` varchar(128) DEFAULT NULL COMMENT '域名',\n `requestBody` mediumblob COMMENT '请求内容',\n `responseBody` mediumblob COMMENT '响应内容',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`),\n KEY `firewallPolicyId` (`firewallPolicyId`),\n KEY `firewallRuleGroupId` (`firewallRuleGroupId`),\n KEY `firewallRuleSetId` (`firewallRuleSetId`),\n KEY `firewallRuleId` (`firewallRuleId`),\n KEY `remoteAddr` (`remoteAddr`),\n KEY `domain` (`domain`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='访问日志';") + if err != nil { + // 快速判断错误方法 + mysqlErr, ok := err.(*mysql.MySQLError) + if ok && mysqlErr.Number == 1050 { // Error 1050: Table 'xxx' already exists + return nil + } + + // 防止二次包装过程中错误丢失的保底错误判断方法 + if strings.Contains(err.Error(), "Error 1050") { + return nil + } + + return err + } + + return nil +} + +// ResetTable 清除某个数据库表名缓存 +func (this *HTTPAccessLogManager) ResetTable(db *dbs.DB, day string) { + this.locker.Lock() + defer this.locker.Unlock() + + config, err := db.Config() + if err != nil { + return + } + delete(this.currentTableMapping, config.Dsn) +} + +// 查找某个表格 +func (this *HTTPAccessLogManager) findTableWithoutCache(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) { + tableNames, err := this.FindTableNames(db, day) + if err != nil { + return nil, err + } + + var prefix = "edgeHTTPAccessLogs_" + day + + if len(tableNames) == 0 { + if force { + err := this.CreateTable(db, prefix) + if err != nil { + return nil, err + } + + return &httpAccessLogDefinition{ + Name: prefix, + HasRemoteAddr: true, + HasDomain: true, + Exists: true, + }, nil + } + + return &httpAccessLogDefinition{ + Name: prefix, + HasRemoteAddr: true, + HasDomain: true, + Exists: false, + }, nil + } + + var lastTableName = tableNames[len(tableNames)-1] + if !force || !accessLogEnableAutoPartial { + hasRemoteAddrField, hasDomainField, err := this.checkTableFields(db, lastTableName) + if err != nil { + return nil, err + } + return &httpAccessLogDefinition{ + Name: lastTableName, + HasRemoteAddr: hasRemoteAddrField, + HasDomain: hasDomainField, + Exists: true, + }, nil + } + + // 检查是否生成下个分表 + lastId, err := db.FindCol(0, "SELECT id FROM "+lastTableName+" ORDER BY id DESC LIMIT 1") + if err != nil { + return nil, err + } + + if lastId != nil { + var lastInt64Id = types.Int64(lastId) + if lastInt64Id >= accessLogPartialRows { + // create next partial table + var nextTableName = "" + if accessLogTableMainReg.MatchString(lastTableName) { + nextTableName = prefix + "_0001" + } else if accessLogTablePartialReg.MatchString(lastTableName) { + var matches = accessLogTablePartialReg.FindStringSubmatch(lastTableName) + if len(matches) < 3 { + return nil, errors.New("fatal error: invalid 'accessLogTablePartialReg'") + } + var lastPartial = matches[2] + nextTableName = prefix + "_" + fmt.Sprintf("%04d", types.Int(lastPartial)+1) + } else { + nextTableName = prefix + "_0001" + } + + err = this.CreateTable(db, nextTableName) + if err != nil { + return nil, err + } + + return &httpAccessLogDefinition{ + Name: nextTableName, + HasRemoteAddr: true, + HasDomain: true, + Exists: true, + }, nil + } + } + + // 检查字段 + hasRemoteAddrField, hasDomainField, err := this.checkTableFields(db, lastTableName) + if err != nil { + return nil, err + } + return &httpAccessLogDefinition{ + Name: lastTableName, + HasRemoteAddr: hasRemoteAddrField, + HasDomain: hasDomainField, + Exists: true, + }, nil +} + +// TODO 考虑缓存检查结果 +func (this *HTTPAccessLogManager) checkTableFields(db *dbs.DB, tableName string) (hasRemoteAddrField bool, hasDomainField bool, err error) { + fields, _, err := db.FindOnes("SHOW FIELDS FROM " + tableName) + if err != nil { + return false, false, err + } + for _, field := range fields { + var fieldName = field.GetString("Field") + if strings.ToLower(fieldName) == strings.ToLower("remoteAddr") { + hasRemoteAddrField = true + } + if strings.ToLower(fieldName) == "domain" { + hasDomainField = true + } + } + return +} diff --git a/internal/db/models/http_access_log_manager_test.go b/internal/db/models/http_access_log_manager_test.go new file mode 100644 index 00000000..08a81d85 --- /dev/null +++ b/internal/db/models/http_access_log_manager_test.go @@ -0,0 +1,148 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package models_test + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/iwind/TeaGo/dbs" + "testing" + "time" +) + +func TestNewHTTPAccessLogManager(t *testing.T) { + var config = &dbs.DBConfig{ + Driver: "mysql", + Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_log?charset=utf8mb4&timeout=30s", + Prefix: "edge", + Connections: struct { + Pool int `yaml:"pool"` + Max int `yaml:"max"` + Life string `yaml:"life"` + LifeDuration time.Duration `yaml:",omitempty"` + }{}, + Models: struct { + Package string `yaml:"package"` + }{}, + } + + db, err := dbs.NewInstanceFromConfig(config) + if err != nil { + t.Fatal(err) + } + + var manager = models.SharedHTTPAccessLogManager + err = manager.CreateTable(db, "accessLog_1") + if err != nil { + t.Fatal(err) + } +} + +func TestHTTPAccessLogManager_FindTableNames(t *testing.T) { + var config = &dbs.DBConfig{ + Driver: "mysql", + Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_log?charset=utf8mb4&timeout=30s", + Prefix: "edge", + Connections: struct { + Pool int `yaml:"pool"` + Max int `yaml:"max"` + Life string `yaml:"life"` + LifeDuration time.Duration `yaml:",omitempty"` + }{}, + Models: struct { + Package string `yaml:"package"` + }{}, + } + + db, err := dbs.NewInstanceFromConfig(config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + var before = time.Now() + tables, err := models.SharedHTTPAccessLogManager.FindTables(db, "20220306") + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(tables) + if err != nil { + t.Fatal(err) + } + t.Log(string(data)) + t.Log(time.Since(before).Seconds()*1000, "ms") + } +} + + +func TestHTTPAccessLogManager_FindTables(t *testing.T) { + var config = &dbs.DBConfig{ + Driver: "mysql", + Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_log?charset=utf8mb4&timeout=30s", + Prefix: "edge", + Connections: struct { + Pool int `yaml:"pool"` + Max int `yaml:"max"` + Life string `yaml:"life"` + LifeDuration time.Duration `yaml:",omitempty"` + }{}, + Models: struct { + Package string `yaml:"package"` + }{}, + } + + db, err := dbs.NewInstanceFromConfig(config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + var before = time.Now() + tables, err := models.SharedHTTPAccessLogManager.FindTables(db, "20220306") + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(tables) + if err != nil { + t.Fatal(err) + } + t.Log(string(data)) + t.Log(time.Since(before).Seconds()*1000, "ms") + } +} + +func TestHTTPAccessLogManager_FindTable(t *testing.T) { + var config = &dbs.DBConfig{ + Driver: "mysql", + Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_log?charset=utf8mb4&timeout=30s", + Prefix: "edge", + Connections: struct { + Pool int `yaml:"pool"` + Max int `yaml:"max"` + Life string `yaml:"life"` + LifeDuration time.Duration `yaml:",omitempty"` + }{}, + Models: struct { + Package string `yaml:"package"` + }{}, + } + + db, err := dbs.NewInstanceFromConfig(config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + var before = time.Now() + tableDef, err := models.SharedHTTPAccessLogManager.FindTable(db, "20220306", false) + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(tableDef) + if err != nil { + t.Fatal(err) + } + t.Log(string(data)) + t.Log(time.Since(before).Seconds()*1000, "ms") + } +} diff --git a/internal/rpc/services/service_db.go b/internal/rpc/services/service_db.go index 1e913406..5ac043b3 100644 --- a/internal/rpc/services/service_db.go +++ b/internal/rpc/services/service_db.go @@ -9,12 +9,12 @@ import ( "strings" ) -// 数据库相关服务 +// DBService 数据库相关服务 type DBService struct { BaseService } -// 获取所有表信息 +// FindAllDBTables 获取所有表信息 func (this *DBService) FindAllDBTables(ctx context.Context, req *pb.FindAllDBTablesRequest) (*pb.FindAllDBTablesResponse, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -60,7 +60,7 @@ func (this *DBService) FindAllDBTables(ctx context.Context, req *pb.FindAllDBTab return &pb.FindAllDBTablesResponse{DbTables: pbTables}, nil } -// 删除表 +// DeleteDBTable 删除表 func (this *DBService) DeleteDBTable(ctx context.Context, req *pb.DeleteDBTableRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { @@ -84,7 +84,7 @@ func (this *DBService) DeleteDBTable(ctx context.Context, req *pb.DeleteDBTableR return this.Success() } -// 清空表 +// TruncateDBTable 清空表 func (this *DBService) TruncateDBTable(ctx context.Context, req *pb.TruncateDBTableRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx, 0) if err != nil { diff --git a/internal/setup/sql_dump.go b/internal/setup/sql_dump.go index 2b9062f0..eaf5c8e8 100644 --- a/internal/setup/sql_dump.go +++ b/internal/setup/sql_dump.go @@ -45,7 +45,10 @@ func (this *SQLDump) Dump(db *dbs.DB) (result *SQLDumpResult, err error) { } for _, tableName := range tableNames { // 忽略一些分表 - if strings.HasPrefix(tableName, "edgeHTTPAccessLogs_") { + if strings.HasPrefix(strings.ToLower(tableName), strings.ToLower("edgeHTTPAccessLogs_")) { + continue + } + if strings.HasPrefix(strings.ToLower(tableName), strings.ToLower("edgeNSAccessLogs_")) { continue } diff --git a/internal/tasks/log_task_test.go b/internal/tasks/log_task_test.go index ba03f433..933064bf 100644 --- a/internal/tasks/log_task_test.go +++ b/internal/tasks/log_task_test.go @@ -9,7 +9,7 @@ func TestLogTask_loopClean(t *testing.T) { dbs.NotifyReady() task := NewLogTask() - err := task.loopClean(5) + err := task.loopClean() if err != nil { t.Fatal(err) } diff --git a/internal/tasks/server_access_log_cleaner.go b/internal/tasks/server_access_log_cleaner.go index 66b1c222..6f7347ef 100644 --- a/internal/tasks/server_access_log_cleaner.go +++ b/internal/tasks/server_access_log_cleaner.go @@ -103,23 +103,18 @@ func (this *ServerAccessLogCleaner) cleanDB(db *dbs.DB, endDay string) error { return errors.New("invalid column names: " + strings.Join(columnNames, ", ")) } columnName := columnNames[0] + var reg = regexp.MustCompile(`^(?i)(edgeHTTPAccessLogs|edgeNSAccessLogs)_(\d{8})(_\d{4})?$`) for _, one := range ones { - tableName := one.GetString(columnName) + var tableName = one.GetString(columnName) if len(tableName) == 0 { continue } - ok, err := regexp.MatchString(`^(?i)(edgeHTTPAccessLogs|edgeNSAccessLogs)_(\d{8})$`, tableName) - if err != nil { - return err - } - if !ok { + if !reg.MatchString(tableName) { continue } - index := strings.LastIndex(tableName, "_") - if index < 0 { - continue - } - day := tableName[index+1:] + var matches = reg.FindStringSubmatch(tableName) + var day = matches[2] + if day < endDay { _, err = db.Exec("DROP TABLE " + tableName) if err != nil { diff --git a/internal/tasks/server_access_log_cleaner_test.go b/internal/tasks/server_access_log_cleaner_test.go index 8c2e6d67..637991f2 100644 --- a/internal/tasks/server_access_log_cleaner_test.go +++ b/internal/tasks/server_access_log_cleaner_test.go @@ -1,6 +1,7 @@ -package tasks +package tasks_test import ( + "github.com/TeaOSLab/EdgeAPI/internal/tasks" "github.com/iwind/TeaGo/dbs" "testing" ) @@ -8,7 +9,7 @@ import ( func TestServerAccessLogCleaner_Loop(t *testing.T) { dbs.NotifyReady() - task := NewServerAccessLogCleaner() + task := tasks.NewServerAccessLogCleaner() err := task.Loop() if err != nil { t.Fatal(err)