mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-06 10:00:25 +08:00
实现请求日志写入
This commit is contained in:
86
internal/nodes/http_access_log_queue.go
Normal file
86
internal/nodes/http_access_log_queue.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"time"
|
||||
)
|
||||
|
||||
var sharedHTTPAccessLogQueue = NewHTTPAccessLogQueue()
|
||||
|
||||
// HTTP访问日志队列
|
||||
type HTTPAccessLogQueue struct {
|
||||
queue chan *pb.HTTPAccessLog
|
||||
}
|
||||
|
||||
// 获取新对象
|
||||
func NewHTTPAccessLogQueue() *HTTPAccessLogQueue {
|
||||
// 队列中最大的值,超出此数量的访问日志会被抛弃
|
||||
// TODO 需要可以在界面中设置
|
||||
maxSize := 10000
|
||||
queue := &HTTPAccessLogQueue{
|
||||
queue: make(chan *pb.HTTPAccessLog, maxSize),
|
||||
}
|
||||
go queue.Start()
|
||||
|
||||
return queue
|
||||
}
|
||||
|
||||
// 开始处理访问日志
|
||||
func (this *HTTPAccessLogQueue) Start() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
for range ticker.C {
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
logs.Error("ACCESS_LOG_QUEUE", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 加入新访问日志
|
||||
func (this *HTTPAccessLogQueue) Push(accessLog *pb.HTTPAccessLog) {
|
||||
select {
|
||||
case this.queue <- accessLog:
|
||||
default:
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// 上传访问日志
|
||||
func (this *HTTPAccessLogQueue) loop() error {
|
||||
accessLogs := []*pb.HTTPAccessLog{}
|
||||
count := 0
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case accessLog := <-this.queue:
|
||||
accessLogs = append(accessLogs, accessLog)
|
||||
count++
|
||||
|
||||
// 每次只提交 N 条访问日志,防止网络拥堵
|
||||
if count > 1000 {
|
||||
break Loop
|
||||
}
|
||||
default:
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
|
||||
if len(accessLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 发送到API
|
||||
client, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{AccessLogs: accessLogs})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -58,6 +58,12 @@ type HTTPRequest struct {
|
||||
rewriteIsExternalURL bool // 重写目标是否为外部URL
|
||||
cacheRef *serverconfigs.HTTPCacheRef // 缓存设置
|
||||
cacheKey string // 缓存使用的Key
|
||||
|
||||
// WAF相关
|
||||
firewallPolicyId int64
|
||||
firewallRuleGroupId int64
|
||||
firewallRuleSetId int64
|
||||
firewallRuleId int64
|
||||
}
|
||||
|
||||
// 初始化
|
||||
@@ -98,6 +104,7 @@ func (this *HTTPRequest) Do() {
|
||||
// WAF
|
||||
if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn && this.web.FirewallPolicy != nil && this.web.FirewallPolicy.IsOn {
|
||||
if this.doWAFRequest() {
|
||||
this.doEnd()
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -122,6 +129,9 @@ func (this *HTTPRequest) Do() {
|
||||
|
||||
// 关闭写入
|
||||
this.writer.Close()
|
||||
|
||||
// 结束调用
|
||||
this.doEnd()
|
||||
}
|
||||
|
||||
// 开始调用
|
||||
@@ -263,6 +273,11 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
|
||||
this.web.FirewallPolicy = web.FirewallPolicy
|
||||
}
|
||||
|
||||
// access log
|
||||
if web.AccessLogRef != nil && (web.AccessLogRef.IsPrior || isTop) {
|
||||
this.web.AccessLogRef = web.AccessLogRef
|
||||
}
|
||||
|
||||
// 重写规则
|
||||
if len(web.RewriteRefs) > 0 {
|
||||
for index, ref := range web.RewriteRefs {
|
||||
@@ -990,12 +1005,6 @@ func (this *HTTPRequest) addError(err error) {
|
||||
this.errors = append(this.errors, err.Error())
|
||||
}
|
||||
|
||||
// 日志
|
||||
func (this *HTTPRequest) log() {
|
||||
// 计算请求时间
|
||||
this.requestCost = time.Since(this.requestFromTime).Seconds()
|
||||
}
|
||||
|
||||
// 计算合适的buffer size
|
||||
func (this *HTTPRequest) bytePool(contentLength int64) *utils.BytePool {
|
||||
if contentLength <= 0 {
|
||||
|
||||
128
internal/nodes/http_request_log.go
Normal file
128
internal/nodes/http_request_log.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 日志
|
||||
func (this *HTTPRequest) log() {
|
||||
// 计算请求时间
|
||||
this.requestCost = time.Since(this.requestFromTime).Seconds()
|
||||
|
||||
ref := this.web.AccessLogRef
|
||||
if ref == nil {
|
||||
ref = serverconfigs.DefaultHTTPAccessLogRef
|
||||
}
|
||||
if !ref.IsOn {
|
||||
return
|
||||
}
|
||||
|
||||
if !ref.Match(this.writer.StatusCode()) {
|
||||
return
|
||||
}
|
||||
|
||||
addr := this.RawReq.RemoteAddr
|
||||
index := strings.LastIndex(addr, ":")
|
||||
if index > 0 {
|
||||
addr = addr[:index]
|
||||
}
|
||||
|
||||
// 请求Cookie
|
||||
cookies := map[string]string{}
|
||||
if ref.ContainsField(serverconfigs.HTTPAccessLogFieldCookie) {
|
||||
for _, cookie := range this.RawReq.Cookies() {
|
||||
cookies[cookie.Name] = cookie.Value
|
||||
}
|
||||
}
|
||||
|
||||
// 请求Header
|
||||
pbReqHeader := map[string]*pb.Strings{}
|
||||
if ref.ContainsField(serverconfigs.HTTPAccessLogFieldHeader) {
|
||||
for k, v := range this.RawReq.Header {
|
||||
pbReqHeader[k] = &pb.Strings{Values: v}
|
||||
}
|
||||
}
|
||||
|
||||
// 响应Header
|
||||
pbResHeader := map[string]*pb.Strings{}
|
||||
if ref.ContainsField(serverconfigs.HTTPAccessLogFieldSentHeader) {
|
||||
for k, v := range this.writer.Header() {
|
||||
pbResHeader[k] = &pb.Strings{Values: v}
|
||||
}
|
||||
}
|
||||
|
||||
// 参数列表
|
||||
queryString := ""
|
||||
if ref.ContainsField(serverconfigs.HTTPAccessLogFieldArg) {
|
||||
queryString = this.requestQueryString()
|
||||
}
|
||||
|
||||
// 浏览器
|
||||
userAgent := ""
|
||||
if ref.ContainsField(serverconfigs.HTTPAccessLogFieldUserAgent) || ref.ContainsField(serverconfigs.HTTPAccessLogFieldExtend) {
|
||||
userAgent = this.RawReq.UserAgent()
|
||||
}
|
||||
|
||||
// 请求来源
|
||||
referer := ""
|
||||
if ref.ContainsField(serverconfigs.HTTPAccessLogFieldReferer) {
|
||||
referer = this.RawReq.Referer()
|
||||
}
|
||||
|
||||
accessLog := &pb.HTTPAccessLog{
|
||||
NodeId: sharedNodeConfig.Id,
|
||||
ServerId: this.Server.Id,
|
||||
RemoteAddr: this.requestRemoteAddr(),
|
||||
RawRemoteAddr: addr,
|
||||
RemotePort: int32(this.requestRemotePort()),
|
||||
RemoteUser: this.requestRemoteUser(),
|
||||
RequestURI: this.rawURI,
|
||||
RequestPath: this.requestPath(),
|
||||
RequestLength: this.requestLength(),
|
||||
RequestTime: this.requestCost,
|
||||
RequestMethod: this.RawReq.Method,
|
||||
RequestFilename: this.requestFilename(),
|
||||
Scheme: this.requestScheme(),
|
||||
Proto: this.RawReq.Proto,
|
||||
BytesSent: this.writer.SentBodyBytes(), // TODO 加上Header Size
|
||||
BodyBytesSent: this.writer.SentBodyBytes(),
|
||||
Status: int32(this.writer.StatusCode()),
|
||||
StatusMessage: "",
|
||||
TimeISO8601: this.requestFromTime.Format("2006-01-02T15:04:05.000Z07:00"),
|
||||
TimeLocal: this.requestFromTime.Format("2/Jan/2006:15:04:05 -0700"),
|
||||
Msec: float64(this.requestFromTime.Unix()) + float64(this.requestFromTime.Nanosecond())/1000000000,
|
||||
Timestamp: this.requestFromTime.Unix(),
|
||||
Host: this.Host,
|
||||
Referer: referer,
|
||||
UserAgent: userAgent,
|
||||
Request: this.requestString(),
|
||||
ContentType: this.requestContentType(),
|
||||
Cookie: cookies,
|
||||
Args: queryString,
|
||||
QueryString: queryString,
|
||||
Header: pbReqHeader,
|
||||
ServerName: this.ServerName,
|
||||
ServerPort: int32(this.requestServerPort()),
|
||||
ServerProtocol: this.RawReq.Proto,
|
||||
SentHeader: pbResHeader,
|
||||
Errors: this.errors,
|
||||
Hostname: HOSTNAME,
|
||||
|
||||
FirewallPolicyId: this.firewallPolicyId,
|
||||
FirewallRuleGroupId: this.firewallRuleGroupId,
|
||||
FirewallRuleSetId: this.firewallRuleSetId,
|
||||
FirewallRuleId: this.firewallRuleId,
|
||||
}
|
||||
|
||||
if this.origin != nil {
|
||||
accessLog.OriginId = this.origin.Id
|
||||
accessLog.OriginAddress = this.originAddr
|
||||
}
|
||||
|
||||
// TODO 记录匹配的 locationId和rewriteId
|
||||
|
||||
sharedHTTPAccessLogQueue.Push(accessLog)
|
||||
}
|
||||
@@ -52,6 +52,10 @@ func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient {
|
||||
return pb.NewNodeLogServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
|
||||
return pb.NewHTTPAccessLogServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
func (this *RPCClient) Context() context.Context {
|
||||
ctx := context.Background()
|
||||
m := maps.Map{
|
||||
|
||||
Reference in New Issue
Block a user