diff --git a/internal/nodes/http_access_log_queue.go b/internal/nodes/http_access_log_queue.go new file mode 100644 index 0000000..8e1c91c --- /dev/null +++ b/internal/nodes/http_access_log_queue.go @@ -0,0 +1,86 @@ +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/logs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "time" +) + +var sharedHTTPAccessLogQueue = NewHTTPAccessLogQueue() + +// HTTP访问日志队列 +type HTTPAccessLogQueue struct { + queue chan *pb.HTTPAccessLog +} + +// 获取新对象 +func NewHTTPAccessLogQueue() *HTTPAccessLogQueue { + // 队列中最大的值,超出此数量的访问日志会被抛弃 + // TODO 需要可以在界面中设置 + maxSize := 10000 + queue := &HTTPAccessLogQueue{ + queue: make(chan *pb.HTTPAccessLog, maxSize), + } + go queue.Start() + + return queue +} + +// 开始处理访问日志 +func (this *HTTPAccessLogQueue) Start() { + ticker := time.NewTicker(1 * time.Second) + for range ticker.C { + err := this.loop() + if err != nil { + logs.Error("ACCESS_LOG_QUEUE", err.Error()) + } + } +} + +// 加入新访问日志 +func (this *HTTPAccessLogQueue) Push(accessLog *pb.HTTPAccessLog) { + select { + case this.queue <- accessLog: + default: + + } +} + +// 上传访问日志 +func (this *HTTPAccessLogQueue) loop() error { + accessLogs := []*pb.HTTPAccessLog{} + count := 0 +Loop: + for { + select { + case accessLog := <-this.queue: + accessLogs = append(accessLogs, accessLog) + count++ + + // 每次只提交 N 条访问日志,防止网络拥堵 + if count > 1000 { + break Loop + } + default: + break Loop + } + } + + if len(accessLogs) == 0 { + return nil + } + + // 发送到API + client, err := rpc.SharedRPC() + if err != nil { + return err + } + + _, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{AccessLogs: accessLogs}) + if err != nil { + return err + } + + return nil +} diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 3cd685d..bbab46d 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -58,6 +58,12 @@ type HTTPRequest struct { rewriteIsExternalURL bool // 重写目标是否为外部URL cacheRef *serverconfigs.HTTPCacheRef // 缓存设置 cacheKey string // 缓存使用的Key + + // WAF相关 + firewallPolicyId int64 + firewallRuleGroupId int64 + firewallRuleSetId int64 + firewallRuleId int64 } // 初始化 @@ -98,6 +104,7 @@ func (this *HTTPRequest) Do() { // WAF if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn && this.web.FirewallPolicy != nil && this.web.FirewallPolicy.IsOn { if this.doWAFRequest() { + this.doEnd() return } } @@ -122,6 +129,9 @@ func (this *HTTPRequest) Do() { // 关闭写入 this.writer.Close() + + // 结束调用 + this.doEnd() } // 开始调用 @@ -263,6 +273,11 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo this.web.FirewallPolicy = web.FirewallPolicy } + // access log + if web.AccessLogRef != nil && (web.AccessLogRef.IsPrior || isTop) { + this.web.AccessLogRef = web.AccessLogRef + } + // 重写规则 if len(web.RewriteRefs) > 0 { for index, ref := range web.RewriteRefs { @@ -990,12 +1005,6 @@ func (this *HTTPRequest) addError(err error) { this.errors = append(this.errors, err.Error()) } -// 日志 -func (this *HTTPRequest) log() { - // 计算请求时间 - this.requestCost = time.Since(this.requestFromTime).Seconds() -} - // 计算合适的buffer size func (this *HTTPRequest) bytePool(contentLength int64) *utils.BytePool { if contentLength <= 0 { diff --git a/internal/nodes/http_request_log.go b/internal/nodes/http_request_log.go new file mode 100644 index 0000000..cd7430f --- /dev/null +++ b/internal/nodes/http_request_log.go @@ -0,0 +1,128 @@ +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "strings" + "time" +) + +// 日志 +func (this *HTTPRequest) log() { + // 计算请求时间 + this.requestCost = time.Since(this.requestFromTime).Seconds() + + ref := this.web.AccessLogRef + if ref == nil { + ref = serverconfigs.DefaultHTTPAccessLogRef + } + if !ref.IsOn { + return + } + + if !ref.Match(this.writer.StatusCode()) { + return + } + + addr := this.RawReq.RemoteAddr + index := strings.LastIndex(addr, ":") + if index > 0 { + addr = addr[:index] + } + + // 请求Cookie + cookies := map[string]string{} + if ref.ContainsField(serverconfigs.HTTPAccessLogFieldCookie) { + for _, cookie := range this.RawReq.Cookies() { + cookies[cookie.Name] = cookie.Value + } + } + + // 请求Header + pbReqHeader := map[string]*pb.Strings{} + if ref.ContainsField(serverconfigs.HTTPAccessLogFieldHeader) { + for k, v := range this.RawReq.Header { + pbReqHeader[k] = &pb.Strings{Values: v} + } + } + + // 响应Header + pbResHeader := map[string]*pb.Strings{} + if ref.ContainsField(serverconfigs.HTTPAccessLogFieldSentHeader) { + for k, v := range this.writer.Header() { + pbResHeader[k] = &pb.Strings{Values: v} + } + } + + // 参数列表 + queryString := "" + if ref.ContainsField(serverconfigs.HTTPAccessLogFieldArg) { + queryString = this.requestQueryString() + } + + // 浏览器 + userAgent := "" + if ref.ContainsField(serverconfigs.HTTPAccessLogFieldUserAgent) || ref.ContainsField(serverconfigs.HTTPAccessLogFieldExtend) { + userAgent = this.RawReq.UserAgent() + } + + // 请求来源 + referer := "" + if ref.ContainsField(serverconfigs.HTTPAccessLogFieldReferer) { + referer = this.RawReq.Referer() + } + + accessLog := &pb.HTTPAccessLog{ + NodeId: sharedNodeConfig.Id, + ServerId: this.Server.Id, + RemoteAddr: this.requestRemoteAddr(), + RawRemoteAddr: addr, + RemotePort: int32(this.requestRemotePort()), + RemoteUser: this.requestRemoteUser(), + RequestURI: this.rawURI, + RequestPath: this.requestPath(), + RequestLength: this.requestLength(), + RequestTime: this.requestCost, + RequestMethod: this.RawReq.Method, + RequestFilename: this.requestFilename(), + Scheme: this.requestScheme(), + Proto: this.RawReq.Proto, + BytesSent: this.writer.SentBodyBytes(), // TODO 加上Header Size + BodyBytesSent: this.writer.SentBodyBytes(), + Status: int32(this.writer.StatusCode()), + StatusMessage: "", + TimeISO8601: this.requestFromTime.Format("2006-01-02T15:04:05.000Z07:00"), + TimeLocal: this.requestFromTime.Format("2/Jan/2006:15:04:05 -0700"), + Msec: float64(this.requestFromTime.Unix()) + float64(this.requestFromTime.Nanosecond())/1000000000, + Timestamp: this.requestFromTime.Unix(), + Host: this.Host, + Referer: referer, + UserAgent: userAgent, + Request: this.requestString(), + ContentType: this.requestContentType(), + Cookie: cookies, + Args: queryString, + QueryString: queryString, + Header: pbReqHeader, + ServerName: this.ServerName, + ServerPort: int32(this.requestServerPort()), + ServerProtocol: this.RawReq.Proto, + SentHeader: pbResHeader, + Errors: this.errors, + Hostname: HOSTNAME, + + FirewallPolicyId: this.firewallPolicyId, + FirewallRuleGroupId: this.firewallRuleGroupId, + FirewallRuleSetId: this.firewallRuleSetId, + FirewallRuleId: this.firewallRuleId, + } + + if this.origin != nil { + accessLog.OriginId = this.origin.Id + accessLog.OriginAddress = this.originAddr + } + + // TODO 记录匹配的 locationId和rewriteId + + sharedHTTPAccessLogQueue.Push(accessLog) +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index e639152..ecfb795 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -52,6 +52,10 @@ func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient { return pb.NewNodeLogServiceClient(this.pickConn()) } +func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient { + return pb.NewHTTPAccessLogServiceClient(this.pickConn()) +} + func (this *RPCClient) Context() context.Context { ctx := context.Background() m := maps.Map{