mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 16:00:25 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			120 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package nodes
 | 
						|
 | 
						|
import (
 | 
						|
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
						|
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
						|
	"reflect"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
// sharedHTTPAccessLogQueue is the package-level access log queue. It is
// created during package initialization via NewHTTPAccessLogQueue, which also
// launches the goroutine that runs the queue's Start method.
var sharedHTTPAccessLogQueue = NewHTTPAccessLogQueue()
 | 
						|
 | 
						|
// HTTPAccessLogQueue HTTP访问日志队列
 | 
						|
type HTTPAccessLogQueue struct {
 | 
						|
	queue chan *pb.HTTPAccessLog
 | 
						|
 | 
						|
	rpcClient *rpc.RPCClient
 | 
						|
}
 | 
						|
 | 
						|
// NewHTTPAccessLogQueue 获取新对象
 | 
						|
func NewHTTPAccessLogQueue() *HTTPAccessLogQueue {
 | 
						|
	// 队列中最大的值,超出此数量的访问日志会被丢弃
 | 
						|
	// TODO 需要可以在界面中设置
 | 
						|
	maxSize := 20000
 | 
						|
	queue := &HTTPAccessLogQueue{
 | 
						|
		queue: make(chan *pb.HTTPAccessLog, maxSize),
 | 
						|
	}
 | 
						|
	goman.New(func() {
 | 
						|
		queue.Start()
 | 
						|
	})
 | 
						|
 | 
						|
	return queue
 | 
						|
}
 | 
						|
 | 
						|
// Start 开始处理访问日志
 | 
						|
func (this *HTTPAccessLogQueue) Start() {
 | 
						|
	ticker := time.NewTicker(1 * time.Second)
 | 
						|
	for range ticker.C {
 | 
						|
		err := this.loop()
 | 
						|
		if err != nil {
 | 
						|
			remotelogs.Error("ACCESS_LOG_QUEUE", err.Error())
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Push 加入新访问日志
 | 
						|
func (this *HTTPAccessLogQueue) Push(accessLog *pb.HTTPAccessLog) {
 | 
						|
	select {
 | 
						|
	case this.queue <- accessLog:
 | 
						|
	default:
 | 
						|
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// 上传访问日志
 | 
						|
func (this *HTTPAccessLogQueue) loop() error {
 | 
						|
	var accessLogs = []*pb.HTTPAccessLog{}
 | 
						|
	var count = 0
 | 
						|
 | 
						|
Loop:
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case accessLog := <-this.queue:
 | 
						|
			accessLogs = append(accessLogs, accessLog)
 | 
						|
			count++
 | 
						|
 | 
						|
			// 每次只提交 N 条访问日志,防止网络拥堵
 | 
						|
			if count > 2000 {
 | 
						|
				break Loop
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			break Loop
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if len(accessLogs) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// 发送到API
 | 
						|
	if this.rpcClient == nil {
 | 
						|
		client, err := rpc.SharedRPC()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		this.rpcClient = client
 | 
						|
	}
 | 
						|
 | 
						|
	_, err := this.rpcClient.HTTPAccessLogRPC().CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
 | 
						|
	if err != nil {
 | 
						|
		// 是否包含了invalid UTF-8
 | 
						|
		if strings.Contains(err.Error(), "string field contains invalid UTF-8") {
 | 
						|
			for _, accessLog := range accessLogs {
 | 
						|
				this.toValidUTF8(accessLog)
 | 
						|
			}
 | 
						|
 | 
						|
			// 重新提交
 | 
						|
			_, err = this.rpcClient.HTTPAccessLogRPC().CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (this *HTTPAccessLogQueue) toValidUTF8(accessLog *pb.HTTPAccessLog) {
 | 
						|
	var v = reflect.Indirect(reflect.ValueOf(accessLog))
 | 
						|
	var countFields = v.NumField()
 | 
						|
	for i := 0; i < countFields; i++ {
 | 
						|
		var field = v.Field(i)
 | 
						|
		if field.Kind() == reflect.String {
 | 
						|
			field.SetString(strings.ToValidUTF8(field.String(), ""))
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |