Files
EdgeNode/internal/nodes/http_access_log_queue.go

195 lines
4.9 KiB
Go
Raw Normal View History

2020-10-10 11:47:53 +08:00
package nodes
import (
2021-12-15 15:09:58 +08:00
"bytes"
2020-10-10 11:47:53 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2020-10-10 11:47:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
2024-03-28 17:17:34 +08:00
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"strings"
2020-10-10 11:47:53 +08:00
"time"
"unicode/utf8"
2020-10-10 11:47:53 +08:00
)
var sharedHTTPAccessLogQueue = NewHTTPAccessLogQueue()
2021-08-25 16:46:05 +08:00
// HTTPAccessLogQueue HTTP访问日志队列
2020-10-10 11:47:53 +08:00
type HTTPAccessLogQueue struct {
queue chan *pb.HTTPAccessLog
2021-11-21 10:56:54 +08:00
rpcClient *rpc.RPCClient
2020-10-10 11:47:53 +08:00
}
2021-08-25 16:46:05 +08:00
// NewHTTPAccessLogQueue 获取新对象
2020-10-10 11:47:53 +08:00
func NewHTTPAccessLogQueue() *HTTPAccessLogQueue {
2021-08-25 16:46:05 +08:00
// 队列中最大的值,超出此数量的访问日志会被丢弃
2024-03-28 17:17:34 +08:00
var maxSize = 2_000 * (1 + memutils.SystemMemoryGB()/2)
if maxSize > 20_000 {
maxSize = 20_000
}
var queue = &HTTPAccessLogQueue{
2020-10-10 11:47:53 +08:00
queue: make(chan *pb.HTTPAccessLog, maxSize),
}
goman.New(func() {
queue.Start()
})
2020-10-10 11:47:53 +08:00
return queue
}
2021-08-25 16:46:05 +08:00
// Start 开始处理访问日志
2020-10-10 11:47:53 +08:00
func (this *HTTPAccessLogQueue) Start() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
err := this.loop()
if err != nil {
if rpc.IsConnError(err) {
remotelogs.Debug("ACCESS_LOG_QUEUE", err.Error())
} else {
remotelogs.Error("ACCESS_LOG_QUEUE", err.Error())
}
2020-10-10 11:47:53 +08:00
}
}
}
2021-08-25 16:46:05 +08:00
// Push 加入新访问日志
2020-10-10 11:47:53 +08:00
func (this *HTTPAccessLogQueue) Push(accessLog *pb.HTTPAccessLog) {
select {
case this.queue <- accessLog:
default:
}
}
// 上传访问日志
func (this *HTTPAccessLogQueue) loop() error {
2024-01-22 10:46:48 +08:00
const maxLen = 2000
var accessLogs = make([]*pb.HTTPAccessLog, 0, maxLen)
var count = 0
2020-10-10 11:47:53 +08:00
Loop:
for {
select {
case accessLog := <-this.queue:
accessLogs = append(accessLogs, accessLog)
count++
// 每次只提交 N 条访问日志,防止网络拥堵
2024-01-22 10:46:48 +08:00
if count >= maxLen {
2020-10-10 11:47:53 +08:00
break Loop
}
default:
break Loop
}
}
if len(accessLogs) == 0 {
return nil
}
// 发送到本地
if sharedHTTPAccessLogViewer.HasConns() {
for _, accessLog := range accessLogs {
sharedHTTPAccessLogViewer.Send(accessLog)
}
}
2020-10-10 11:47:53 +08:00
// 发送到API
2021-11-21 10:56:54 +08:00
if this.rpcClient == nil {
client, err := rpc.SharedRPC()
if err != nil {
return err
}
this.rpcClient = client
2020-10-10 11:47:53 +08:00
}
2022-08-24 20:04:46 +08:00
_, err := this.rpcClient.HTTPAccessLogRPC.CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
2020-10-10 11:47:53 +08:00
if err != nil {
// 是否包含了invalid UTF-8
if strings.Contains(err.Error(), "string field contains invalid UTF-8") {
for _, accessLog := range accessLogs {
2022-03-20 10:53:35 +08:00
this.ToValidUTF8(accessLog)
}
// 重新提交
2022-08-24 20:04:46 +08:00
_, err = this.rpcClient.HTTPAccessLogRPC.CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
return err
}
// 是否请求内容过大
statusCode, ok := status.FromError(err)
if ok && statusCode.Code() == codes.ResourceExhausted {
// 去除Body
for _, accessLog := range accessLogs {
accessLog.RequestBody = nil
}
// 重新提交
_, err = this.rpcClient.HTTPAccessLogRPC.CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
return err
}
2020-10-10 11:47:53 +08:00
return err
}
return nil
}
// ToValidUTF8 处理访问日志中的非UTF-8字节
2022-03-20 10:53:35 +08:00
func (this *HTTPAccessLogQueue) ToValidUTF8(accessLog *pb.HTTPAccessLog) {
accessLog.RemoteAddr = utils.ToValidUTF8string(accessLog.RemoteAddr)
accessLog.RemoteUser = utils.ToValidUTF8string(accessLog.RemoteUser)
accessLog.RequestURI = utils.ToValidUTF8string(accessLog.RequestURI)
accessLog.RequestPath = utils.ToValidUTF8string(accessLog.RequestPath)
accessLog.RequestFilename = utils.ToValidUTF8string(accessLog.RequestFilename)
2021-12-15 15:09:58 +08:00
accessLog.RequestBody = bytes.ToValidUTF8(accessLog.RequestBody, []byte{})
accessLog.Host = utils.ToValidUTF8string(accessLog.Host)
accessLog.Hostname = utils.ToValidUTF8string(accessLog.Hostname)
2021-12-15 15:09:58 +08:00
for k, v := range accessLog.SentHeader {
if !utf8.ValidString(k) {
delete(accessLog.SentHeader, k)
continue
}
2021-12-15 15:09:58 +08:00
for index, s := range v.Values {
v.Values[index] = utils.ToValidUTF8string(s)
}
}
2021-12-15 15:09:58 +08:00
accessLog.Referer = utils.ToValidUTF8string(accessLog.Referer)
accessLog.UserAgent = utils.ToValidUTF8string(accessLog.UserAgent)
accessLog.Request = utils.ToValidUTF8string(accessLog.Request)
accessLog.ContentType = utils.ToValidUTF8string(accessLog.ContentType)
2021-12-15 15:09:58 +08:00
for k, c := range accessLog.Cookie {
if !utf8.ValidString(k) {
delete(accessLog.Cookie, k)
continue
}
accessLog.Cookie[k] = utils.ToValidUTF8string(c)
2021-12-15 15:09:58 +08:00
}
accessLog.Args = utils.ToValidUTF8string(accessLog.Args)
accessLog.QueryString = utils.ToValidUTF8string(accessLog.QueryString)
2021-12-15 15:09:58 +08:00
for k, v := range accessLog.Header {
if !utf8.ValidString(k) {
delete(accessLog.Header, k)
continue
}
2021-12-15 15:09:58 +08:00
for index, s := range v.Values {
v.Values[index] = utils.ToValidUTF8string(s)
2021-12-15 15:09:58 +08:00
}
}
for k, v := range accessLog.Errors {
accessLog.Errors[k] = utils.ToValidUTF8string(v)
}
2021-12-15 15:09:58 +08:00
}