diff --git a/internal/db/models/db_node_initializer.go b/internal/db/models/db_node_initializer.go index 0cdf2a7d..5e4abf1b 100644 --- a/internal/db/models/db_node_initializer.go +++ b/internal/db/models/db_node_initializer.go @@ -19,9 +19,14 @@ import ( var accessLogDBMapping = map[int64]*dbs.DB{} // dbNodeId => DB var accessLogLocker = &sync.RWMutex{} +type httpAccessLogDefinition struct { + Name string + HasRemoteAddr bool +} + // HTTP服务访问 -var httpAccessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO -var httpAccessLogTableMapping = map[string]bool{} // tableName_crc(dsn) => true +var httpAccessLogDAOMapping = map[int64]*HTTPAccessLogDAOWrapper{} // dbNodeId => DAO +var httpAccessLogTableMapping = map[string]*httpAccessLogDefinition{} // tableName_crc(dsn) => true // DNS服务访问 var nsAccessLogDAOMapping = map[int64]*NSAccessLogDAOWrapper{} // dbNodeId => DAO @@ -76,7 +81,7 @@ func randomNSAccessLogDAO() (dao *NSAccessLogDAOWrapper) { } // 检查表格是否存在 -func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) { +func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, hasRemoteAddr bool, ok bool, err error) { if !regexp.MustCompile(`^\d{8}$`).MatchString(day) { err = errors.New("invalid day '" + day + "', should be YYYYMMDD") return @@ -84,25 +89,25 @@ func findHTTPAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bo config, err := db.Config() if err != nil { - return "", false, err + return "", false, false, err } tableName = "edgeHTTPAccessLogs_" + day cacheKey := tableName + "_" + fmt.Sprintf("%d", crc32.ChecksumIEEE([]byte(config.Dsn))) accessLogLocker.RLock() - _, ok = httpAccessLogTableMapping[cacheKey] + def, ok := httpAccessLogTableMapping[cacheKey] accessLogLocker.RUnlock() if ok { - return tableName, true, nil + return tableName, def.HasRemoteAddr, true, nil } tableNames, err := db.TableNames() if err != nil { - return tableName, false, err + return tableName, false, false, err } - return tableName, lists.ContainsString(tableNames, tableName), nil + return tableName, false, lists.ContainsString(tableNames, tableName), nil } func findNSAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool, err error) { @@ -135,10 +140,10 @@ func findNSAccessLogTableName(db *dbs.DB, day string) (tableName string, ok bool } // 根据日期获取表名 -func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { +func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (*httpAccessLogDefinition, error) { config, err := db.Config() if err != nil { - return "", err + return nil, err } tableName := "edgeHTTPAccessLogs_" + day @@ -146,36 +151,49 @@ func findHTTPAccessLogTable(db *dbs.DB, day string, force bool) (string, error) if !force { accessLogLocker.RLock() - _, ok := httpAccessLogTableMapping[cacheKey] + definition, ok := httpAccessLogTableMapping[cacheKey] accessLogLocker.RUnlock() if ok { - return tableName, nil + return definition, nil } } tableNames, err := db.TableNames() if err != nil { - return tableName, err + return nil, err } if lists.ContainsString(tableNames, tableName) { + table, err := db.FindTable(tableName) + if err != nil { + return nil, err + } + accessLogLocker.Lock() - httpAccessLogTableMapping[cacheKey] = true + var definition = &httpAccessLogDefinition{ + Name: tableName, + HasRemoteAddr: table.FindFieldWithName("remoteAddr") != nil, + } + httpAccessLogTableMapping[cacheKey] = definition accessLogLocker.Unlock() - return tableName, nil + return definition, nil } // 创建表格 - _, err = db.Exec("CREATE TABLE `" + tableName + "` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',\n `nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',\n `status` int(3) unsigned DEFAULT '0' COMMENT '状态码',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `content` json DEFAULT NULL COMMENT '日志内容',\n `requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',\n `firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',\n `firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',\n `firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',\n `firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',\n PRIMARY KEY (`id`),\n KEY `serverId` (`serverId`),\n KEY `nodeId` (`nodeId`),\n KEY `serverId_status` (`serverId`,`status`),\n KEY `requestId` (`requestId`),\n KEY `firewallPolicyId` (`firewallPolicyId`),\n KEY `firewallRuleGroupId` (`firewallRuleGroupId`),\n KEY `firewallRuleSetId` (`firewallRuleSetId`),\n KEY `firewallRuleId` (`firewallRuleId`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='访问日志';") + _, err = db.Exec("CREATE TABLE `" + tableName + "` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',`serverId` int(11) unsigned DEFAULT '0' COMMENT '服务ID',`nodeId` int(11) unsigned DEFAULT '0' COMMENT '节点ID',`status` int(3) unsigned DEFAULT '0' COMMENT '状态码',`createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',`content` json DEFAULT NULL COMMENT '日志内容',`requestId` varchar(128) DEFAULT NULL COMMENT '请求ID',`firewallPolicyId` int(11) unsigned DEFAULT '0' COMMENT 'WAF策略ID',`firewallRuleGroupId` int(11) unsigned DEFAULT '0' COMMENT 'WAF分组ID',`firewallRuleSetId` int(11) unsigned DEFAULT '0' COMMENT 'WAF集ID',`firewallRuleId` int(11) unsigned DEFAULT '0' COMMENT 'WAF规则ID',`remoteAddr` varchar(64) DEFAULT NULL COMMENT 'IP地址',PRIMARY KEY (`id`),KEY `serverId` (`serverId`),KEY `nodeId` (`nodeId`),KEY `serverId_status` (`serverId`,`status`),KEY `requestId` (`requestId`),KEY `firewallPolicyId` (`firewallPolicyId`),KEY `firewallRuleGroupId` (`firewallRuleGroupId`),KEY `firewallRuleSetId` (`firewallRuleSetId`), KEY `firewallRuleId` (`firewallRuleId`), KEY `remoteAddr` (`remoteAddr`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='访问日志';") if err != nil { - return tableName, err + return nil, err } accessLogLocker.Lock() - httpAccessLogTableMapping[cacheKey] = true + var definition = &httpAccessLogDefinition{ + Name: tableName, + HasRemoteAddr: true, + } + httpAccessLogTableMapping[cacheKey] = definition accessLogLocker.Unlock() - return tableName, nil + return definition, nil } func findNSAccessLogTable(db *dbs.DB, day string, force bool) (string, error) { @@ -315,7 +333,7 @@ func (this *DBNodeInitializer) loop() error { // 检查表是否存在 // httpAccessLog { - tableName, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), false) + tableDef, err := findHTTPAccessLogTable(db, timeutil.Format("Ymd"), false) if err != nil { if !strings.Contains(err.Error(), "1050") { // 非表格已存在错误 logs.Println("[DB_NODE]create first table in database node failed: " + err.Error()) @@ -335,7 +353,7 @@ func (this *DBNodeInitializer) loop() error { daoObject := dbs.DAOObject{ Instance: db, DB: node.Name + "(id:" + strconv.Itoa(int(node.Id)) + ")", - Table: tableName, + Table: tableDef.Name, PkName: "id", Model: new(HTTPAccessLog), } @@ -357,7 +375,6 @@ func (this *DBNodeInitializer) loop() error { accessLogLocker.Unlock() } - // nsAccessLog { tableName, err := findNSAccessLogTable(db, timeutil.Format("Ymd"), false) diff --git a/internal/db/models/http_access_log_dao.go b/internal/db/models/http_access_log_dao.go index 05aedf0f..58d611f6 100644 --- a/internal/db/models/http_access_log_dao.go +++ b/internal/db/models/http_access_log_dao.go @@ -12,6 +12,7 @@ import ( "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" + "net" "net/http" "regexp" "sort" @@ -69,7 +70,7 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogsWithDAO(tx *dbs.Tx, daoWrapper for _, accessLog := range accessLogs { day := timeutil.Format("Ymd", time.Unix(accessLog.Timestamp, 0)) - table, err := findHTTPAccessLogTable(dao.Instance, day, false) + tableDef, err := findHTTPAccessLogTable(dao.Instance, day, false) if err != nil { return err } @@ -85,6 +86,11 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogsWithDAO(tx *dbs.Tx, daoWrapper fields["firewallRuleSetId"] = accessLog.FirewallRuleSetId fields["firewallRuleId"] = accessLog.FirewallRuleId + // TODO 根据集群、服务设置获取IP + if tableDef.HasRemoteAddr { + fields["remoteAddr"] = accessLog.RawRemoteAddr + } + content, err := json.Marshal(accessLog) if err != nil { return err @@ -92,23 +98,25 @@ func (this *HTTPAccessLogDAO) CreateHTTPAccessLogsWithDAO(tx *dbs.Tx, daoWrapper fields["content"] = content _, err = dao.Query(tx). - Table(table). + Table(tableDef.Name). Sets(fields). Insert() if err != nil { // 是否为 Error 1146: Table 'xxx.xxx' doesn't exist 如果是,则创建表之后重试 if strings.Contains(err.Error(), "1146") { - table, err = findHTTPAccessLogTable(dao.Instance, day, true) + tableDef, err = findHTTPAccessLogTable(dao.Instance, day, true) if err != nil { return err } _, err = dao.Query(tx). - Table(table). + Table(tableDef.Name). Sets(fields). Insert() if err != nil { return err } + } else { + logs.Println("HTTP_ACCESS_LOG", err.Error()) } } } @@ -179,7 +187,7 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s dao := daoWrapper.DAO - tableName, exists, err := findHTTPAccessLogTableName(dao.Instance, day) + tableName, hasRemoteAddr, exists, err := findHTTPAccessLogTableName(dao.Instance, day) if !exists { // 表格不存在则跳过 return @@ -216,41 +224,47 @@ func (this *HTTPAccessLogDAO) listAccessLogs(tx *dbs.Tx, lastRequestId string, s // keyword if len(keyword) > 0 { - useOriginKeyword := false + // remoteAddr + if hasRemoteAddr && net.ParseIP(keyword) != nil { + query.Attr("remoteAddr", keyword) + } else { - where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword" + useOriginKeyword := false - // 请求方法 - if keyword == http.MethodGet || - keyword == http.MethodPost || - keyword == http.MethodHead || - keyword == http.MethodConnect || - keyword == http.MethodPut || - keyword == http.MethodTrace || - keyword == http.MethodOptions || - keyword == http.MethodDelete || - keyword == http.MethodPatch { - where += " OR JSON_EXTRACT(content, '$.requestMethod')=:originKeyword" - useOriginKeyword = true - } + where := "JSON_EXTRACT(content, '$.remoteAddr') LIKE :keyword OR JSON_EXTRACT(content, '$.requestURI') LIKE :keyword OR JSON_EXTRACT(content, '$.host') LIKE :keyword" - // 响应状态码 - if regexp.MustCompile(`^\d{3}$`).MatchString(keyword) { - where += " OR JSON_EXTRACT(content, '$.status')=:intKeyword" - query.Param("intKeyword", types.Int(keyword)) - } + // 请求方法 + if keyword == http.MethodGet || + keyword == http.MethodPost || + keyword == http.MethodHead || + keyword == http.MethodConnect || + keyword == http.MethodPut || + keyword == http.MethodTrace || + keyword == http.MethodOptions || + keyword == http.MethodDelete || + keyword == http.MethodPatch { + where += " OR JSON_EXTRACT(content, '$.requestMethod')=:originKeyword" + useOriginKeyword = true + } - if regexp.MustCompile(`^\d{3}-\d{3}$`).MatchString(keyword) { - pieces := strings.Split(keyword, "-") - where += " OR JSON_EXTRACT(content, '$.status') BETWEEN :intKeyword1 AND :intKeyword2" - query.Param("intKeyword1", types.Int(pieces[0])) - query.Param("intKeyword2", types.Int(pieces[1])) - } + // 响应状态码 + if regexp.MustCompile(`^\d{3}$`).MatchString(keyword) { + where += " OR JSON_EXTRACT(content, '$.status')=:intKeyword" + query.Param("intKeyword", types.Int(keyword)) + } - query.Where("("+where+")"). - Param("keyword", "%"+keyword+"%") - if useOriginKeyword { - query.Param("originKeyword", keyword) + if regexp.MustCompile(`^\d{3}-\d{3}$`).MatchString(keyword) { + pieces := strings.Split(keyword, "-") + where += " OR JSON_EXTRACT(content, '$.status') BETWEEN :intKeyword1 AND :intKeyword2" + query.Param("intKeyword1", types.Int(pieces[0])) + query.Param("intKeyword2", types.Int(pieces[1])) + } + + query.Where("("+where+")"). + Param("keyword", "%"+keyword+"%") + if useOriginKeyword { + query.Param("originKeyword", keyword) + } } } @@ -350,7 +364,7 @@ func (this *HTTPAccessLogDAO) FindAccessLogWithRequestId(tx *dbs.Tx, requestId s dao := daoWrapper.DAO - tableName, exists, err := findHTTPAccessLogTableName(dao.Instance, day) + tableName, _, exists, err := findHTTPAccessLogTableName(dao.Instance, day) if err != nil { logs.Println("[DB_NODE]" + err.Error()) return diff --git a/internal/db/models/http_access_log_model.go b/internal/db/models/http_access_log_model.go index f7a41733..1f099da6 100644 --- a/internal/db/models/http_access_log_model.go +++ b/internal/db/models/http_access_log_model.go @@ -1,6 +1,6 @@ package models -// +// HTTPAccessLog 访问日志 type HTTPAccessLog struct { Id uint64 `field:"id"` // ID ServerId uint32 `field:"serverId"` // 服务ID @@ -13,6 +13,7 @@ type HTTPAccessLog struct { FirewallRuleGroupId uint32 `field:"firewallRuleGroupId"` // WAF分组ID FirewallRuleSetId uint32 `field:"firewallRuleSetId"` // WAF集ID FirewallRuleId uint32 `field:"firewallRuleId"` // WAF规则ID + RemoteAddr string `field:"remoteAddr"` // IP地址 } type HTTPAccessLogOperator struct { @@ -27,6 +28,7 @@ type HTTPAccessLogOperator struct { FirewallRuleGroupId interface{} // WAF分组ID FirewallRuleSetId interface{} // WAF集ID FirewallRuleId interface{} // WAF规则ID + RemoteAddr interface{} // IP地址 } func NewHTTPAccessLogOperator() *HTTPAccessLogOperator {