Files
EdgeAPI/internal/rpc/services/service_http_access_log.go
GoEdgeLab 5a17ae9d79 v1.4.1
2024-07-27 14:15:25 +08:00

226 lines
5.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"sync"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/regexputils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
)
// HTTPAccessLogService 访问日志相关服务
type HTTPAccessLogService struct {
BaseService
}
// CreateHTTPAccessLogs 创建访问日志
func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) {
// 校验请求
_, _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode)
if err != nil {
return nil, err
}
if len(req.HttpAccessLogs) == 0 {
return &pb.CreateHTTPAccessLogsResponse{}, nil
}
var tx = this.NullTx()
if this.canWriteAccessLogsToDB() {
err = models.SharedHTTPAccessLogDAO.CreateHTTPAccessLogs(tx, req.HttpAccessLogs)
if err != nil {
return nil, err
}
}
err = this.writeAccessLogsToPolicy(req.HttpAccessLogs)
if err != nil {
return nil, err
}
return &pb.CreateHTTPAccessLogsResponse{}, nil
}
// ListHTTPAccessLogs 列出单页访问日志
func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *pb.ListHTTPAccessLogsRequest) (*pb.ListHTTPAccessLogsResponse, error) {
// 校验请求
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var tx = this.NullTx()
// 检查服务ID
if userId > 0 {
req.UserId = userId
// 这里不用担心serverId <= 0 的情况因为如果userId>0则只会查询当前用户下的服务不会产生安全问题
if req.ServerId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
}
}
accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.Partition, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain)
if err != nil {
return nil, err
}
var result = []*pb.HTTPAccessLog{}
var pbNodeMap = map[int64]*pb.Node{}
var pbClusterMap = map[int64]*pb.NodeCluster{}
for _, accessLog := range accessLogs {
a, err := accessLog.ToPB()
if err != nil {
return nil, err
}
// 节点 & 集群
pbNode, ok := pbNodeMap[a.NodeId]
if ok {
a.Node = pbNode
} else {
node, err := models.SharedNodeDAO.FindEnabledNode(tx, a.NodeId)
if err != nil {
return nil, err
}
if node != nil {
pbNode = &pb.Node{Id: int64(node.Id), Name: node.Name}
var clusterId = int64(node.ClusterId)
pbCluster, ok := pbClusterMap[clusterId]
if ok {
pbNode.NodeCluster = pbCluster
} else {
cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, clusterId)
if err != nil {
return nil, err
}
if cluster != nil {
pbCluster = &pb.NodeCluster{
Id: int64(cluster.Id),
Name: cluster.Name,
}
pbNode.NodeCluster = pbCluster
pbClusterMap[clusterId] = pbCluster
}
}
pbNodeMap[a.NodeId] = pbNode
a.Node = pbNode
}
}
result = append(result, a)
}
return &pb.ListHTTPAccessLogsResponse{
HttpAccessLogs: result,
AccessLogs: result, // TODO 仅仅为了兼容当用户节点版本大于0.0.8时可以删除
HasMore: hasMore,
RequestId: requestId,
}, nil
}
// FindHTTPAccessLog 查找单个日志
func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb.FindHTTPAccessLogRequest) (*pb.FindHTTPAccessLogResponse, error) {
// 校验请求
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var tx = this.NullTx()
accessLog, err := models.SharedHTTPAccessLogDAO.FindAccessLogWithRequestId(tx, req.RequestId)
if err != nil {
return nil, err
}
if accessLog == nil {
return &pb.FindHTTPAccessLogResponse{HttpAccessLog: nil}, nil
}
// 检查权限
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, userId, int64(accessLog.ServerId))
if err != nil {
return nil, err
}
}
a, err := accessLog.ToPB()
if err != nil {
return nil, err
}
return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil
}
// FindHTTPAccessLogPartitions 查找日志分区
func (this *HTTPAccessLogService) FindHTTPAccessLogPartitions(ctx context.Context, req *pb.FindHTTPAccessLogPartitionsRequest) (*pb.FindHTTPAccessLogPartitionsResponse, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
if !regexputils.YYYYMMDD.MatchString(req.Day) {
return nil, errors.New("invalid 'day': " + req.Day)
}
var dbList = models.AllAccessLogDBs()
if len(dbList) == 0 {
return &pb.FindHTTPAccessLogPartitionsResponse{
Partitions: nil,
}, nil
}
var partitions = []int32{}
var locker sync.Mutex
var wg = sync.WaitGroup{}
wg.Add(len(dbList))
var lastErr error
for _, db := range dbList {
go func(db *dbs.DB) {
defer wg.Done()
names, err := models.SharedHTTPAccessLogManager.FindTableNames(db, req.Day)
if err != nil {
lastErr = err
}
for _, name := range names {
var partition = models.SharedHTTPAccessLogManager.TablePartition(name)
locker.Lock()
if !lists.Contains(partitions, partition) {
partitions = append(partitions, partition)
}
locker.Unlock()
}
}(db)
}
wg.Wait()
if lastErr != nil {
return nil, lastErr
}
var reversePartitions = []int32{}
for i := len(partitions) - 1; i >= 0; i-- {
reversePartitions = append(reversePartitions, partitions[i])
}
return &pb.FindHTTPAccessLogPartitionsResponse{
Partitions: partitions,
ReversePartitions: reversePartitions,
}, nil
}