访问日志增加RemoteAddr字段以加速查询

This commit is contained in:
刘祥超
2021-07-14 22:43:31 +08:00
parent 8d71afc176
commit c370dada11
3 changed files with 92 additions and 59 deletions

View File

@@ -19,9 +19,14 @@ import (
var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB
var accessLogLocker = &sync.RWMutex{}
type httpAccessLogDefinition struct {
Name string
HasRemoteAddr bool
}
// HTTP服务访问
var httpAccessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO
var httpAccessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true
var httpAccessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO
var httpAccessLogTableMapping = map[string]*httpAccessLogDefinition{} // tableName_crc(dsn) => true
// DNS服务访问
var nsAccessLogDAOMapping = map[int64]*NSAccessLogDAOWrapper{} // dbNodeId => DAO
@@ -76,7 +81,7 @@ func randomNSAccessLogDAO() (dao *NSAccessLogDAOWrapper) {
}
// 检查表格是否存在
func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) {
func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, hasRemoteAddr bool, ok bool, err error) {
if !regexp.MustCompile(`^\d{8}$`).MatchString(day) {
err = errors.New("invalid day '" + day + "', should be YYYYMMDD")
return
@@ -84,25 +89,25 @@ func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bo
config, err := db.Config()
if err != nil {
return "", false, err
return "", false, false, err
}
tableName = "edgeHTTPAccessLogs_" + day
cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn)))
accessLogLocker.RLock()
_, ok = httpAccessLogTableMapping[cacheKey]
def, ok := httpAccessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, true, nil
return tableName, def.HasRemoteAddr, true, nil
}
tableNames, err := db.TableNames()
if err != nil {
return tableName, false, err
return tableName, false, false, err
}
return tableName, lists.ContainsString(tableNames, tableName), nil
return tableName, false, lists.ContainsString(tableNames, tableName), nil
}
func findNSAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) {
@@ -135,10 +140,10 @@ func findNSAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool
}
// 根据日期获取表名
func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) {
config, err := db.Config()
if err != nil {
return "", err
return nil, err
}
tableName := "edgeHTTPAccessLogs_" + day
@@ -146,36 +151,49 @@ func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (string, error)
if !force {
accessLogLocker.RLock()
_, ok := httpAccessLogTableMapping[cacheKey]
definition, ok := httpAccessLogTableMapping[cacheKey]
accessLogLocker.RUnlock()
if ok {
return tableName, nil
return definition, nil
}
}
tableNames, err := db.TableNames()
if err != nil {
return tableName, err
return nil, err
}
if lists.ContainsString(tableNames, tableName) {
table, err := db.FindTable(tableName)
if err != nil {
return nil, err
}
accessLogLocker.Lock()
httpAccessLogTableMapping[cacheKey] = true
var definition = &httpAccessLogDefinition{
Name: tableName,
HasRemoteAddr: table.FindFieldWithName("remoteAddr") != nil,
}
httpAccessLogTableMapping[cacheKey] = definition
accessLogLocker.Unlock()
return tableName, nil
return definition, nil
}
// 创建表格
_, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n `firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',\n `firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',\n `firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',\n `firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`),\n KEY `firewallPolicyId` (`firewallPolicyId`),\n KEY `firewallRuleGroupId` (`firewallRuleGroupId`),\n KEY `firewallRuleSetId` (`firewallRuleSetId`),\n KEY `firewallRuleId` (`firewallRuleId`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='访问日志';")
_, err = db.Exec("CREATE TABLE `" + tableName + "` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',`serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',`nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',`status` int(3) unsigned DEFAULT '0' COMMENT '状态码',`createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',`content` json DEFAULT NULL COMMENT '日志内容',`requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',`firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',`firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',`firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',`firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',`remoteAddr` varchar(64) DEFAULT NULL COMMENT 'IP地址',PRIMARY KEY (`id`),KEY `serverId` (`serverId`),KEY `nodeId` (`nodeId`),KEY `serverId_status` (`serverId`,`status`),KEY `requestId` (`requestId`),KEY `firewallPolicyId` (`firewallPolicyId`),KEY `firewallRuleGroupId` (`firewallRuleGroupId`),KEY `firewallRuleSetId` (`firewallRuleSetId`), KEY `firewallRuleId` (`firewallRuleId`), KEY `remoteAddr` (`remoteAddr`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='访问日志';")
if err != nil {
return tableName, err
return nil, err
}
accessLogLocker.Lock()
httpAccessLogTableMapping[cacheKey] = true
var definition = &httpAccessLogDefinition{
Name: tableName,
HasRemoteAddr: true,
}
httpAccessLogTableMapping[cacheKey] = definition
accessLogLocker.Unlock()
return tableName, nil
return definition, nil
}
func findNSAccessLogTable(db *dbs.DB, day string, force bool) (string, error) {
@@ -315,7 +333,7 @@ func (this *DBNodeInitializer) loop() error {
// 检查表是否存在
// httpAccessLog
{
tableName, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), false)
tableDef, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), false)
if err != nil {
if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误
logs.Println("[DB_NODE]create first table in database node failed: " + err.Error())
@@ -335,7 +353,7 @@ func (this *DBNodeInitializer) loop() error {
daoObject := dbs.DAOObject{
Instance: db,
DB: node.Name + "(id:" + strconv.Itoa(int(node.Id)) + ")",
Table: tableName,
Table: tableDef.Name,
PkName: "id",
Model: new(HTTPAccessLog),
}
@@ -357,7 +375,6 @@ func (this *DBNodeInitializer) loop() error {
accessLogLocker.Unlock()
}
// nsAccessLog
{
tableName, err := findNSAccessLogTable(db, timeutil.Format("Ymd"), false)