mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 16:00:24 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			223 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			223 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package services
 | 
						||
 | 
						||
import (
 | 
						||
	"context"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/errors"
 | 
						||
	rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
 | 
						||
	"github.com/TeaOSLab/EdgeAPI/internal/utils/regexputils"
 | 
						||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
						||
	"github.com/iwind/TeaGo/dbs"
 | 
						||
	"github.com/iwind/TeaGo/lists"
 | 
						||
	"sync"
 | 
						||
)
 | 
						||
 | 
						||
// HTTPAccessLogService 访问日志相关服务
 | 
						||
type HTTPAccessLogService struct {
 | 
						||
	BaseService
 | 
						||
}
 | 
						||
 | 
						||
// CreateHTTPAccessLogs 创建访问日志
 | 
						||
func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) {
 | 
						||
	// 校验请求
 | 
						||
	_, _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	if len(req.HttpAccessLogs) == 0 {
 | 
						||
		return &pb.CreateHTTPAccessLogsResponse{}, nil
 | 
						||
	}
 | 
						||
 | 
						||
	var tx = this.NullTx()
 | 
						||
 | 
						||
	err = models.SharedHTTPAccessLogDAO.CreateHTTPAccessLogs(tx, req.HttpAccessLogs)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	err = this.writeAccessLogsToPolicy(req.HttpAccessLogs)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	return &pb.CreateHTTPAccessLogsResponse{}, nil
 | 
						||
}
 | 
						||
 | 
						||
// ListHTTPAccessLogs 列出单页访问日志
 | 
						||
func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *pb.ListHTTPAccessLogsRequest) (*pb.ListHTTPAccessLogsResponse, error) {
 | 
						||
	// 校验请求
 | 
						||
	_, userId, err := this.ValidateAdminAndUser(ctx, true)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	var tx = this.NullTx()
 | 
						||
 | 
						||
	// 检查服务ID
 | 
						||
	if userId > 0 {
 | 
						||
		req.UserId = userId
 | 
						||
 | 
						||
		// 这里不用担心serverId <= 0 的情况,因为如果userId>0,则只会查询当前用户下的服务,不会产生安全问题
 | 
						||
		if req.ServerId > 0 {
 | 
						||
			err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
 | 
						||
			if err != nil {
 | 
						||
				return nil, err
 | 
						||
			}
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.Partition, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	var result = []*pb.HTTPAccessLog{}
 | 
						||
	var pbNodeMap = map[int64]*pb.Node{}
 | 
						||
	var pbClusterMap = map[int64]*pb.NodeCluster{}
 | 
						||
	for _, accessLog := range accessLogs {
 | 
						||
		a, err := accessLog.ToPB()
 | 
						||
		if err != nil {
 | 
						||
			return nil, err
 | 
						||
		}
 | 
						||
 | 
						||
		// 节点 & 集群
 | 
						||
		pbNode, ok := pbNodeMap[a.NodeId]
 | 
						||
		if ok {
 | 
						||
			a.Node = pbNode
 | 
						||
		} else {
 | 
						||
			node, err := models.SharedNodeDAO.FindEnabledNode(tx, a.NodeId)
 | 
						||
			if err != nil {
 | 
						||
				return nil, err
 | 
						||
			}
 | 
						||
			if node != nil {
 | 
						||
				pbNode = &pb.Node{Id: int64(node.Id), Name: node.Name}
 | 
						||
 | 
						||
				var clusterId = int64(node.ClusterId)
 | 
						||
				pbCluster, ok := pbClusterMap[clusterId]
 | 
						||
				if ok {
 | 
						||
					pbNode.NodeCluster = pbCluster
 | 
						||
				} else {
 | 
						||
					cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, clusterId)
 | 
						||
					if err != nil {
 | 
						||
						return nil, err
 | 
						||
					}
 | 
						||
					if cluster != nil {
 | 
						||
						pbCluster = &pb.NodeCluster{
 | 
						||
							Id:   int64(cluster.Id),
 | 
						||
							Name: cluster.Name,
 | 
						||
						}
 | 
						||
						pbNode.NodeCluster = pbCluster
 | 
						||
						pbClusterMap[clusterId] = pbCluster
 | 
						||
					}
 | 
						||
				}
 | 
						||
 | 
						||
				pbNodeMap[a.NodeId] = pbNode
 | 
						||
				a.Node = pbNode
 | 
						||
			}
 | 
						||
		}
 | 
						||
 | 
						||
		result = append(result, a)
 | 
						||
	}
 | 
						||
 | 
						||
	return &pb.ListHTTPAccessLogsResponse{
 | 
						||
		HttpAccessLogs: result,
 | 
						||
		AccessLogs:     result, // TODO 仅仅为了兼容,当用户节点版本大于0.0.8时可以删除
 | 
						||
		HasMore:        hasMore,
 | 
						||
		RequestId:      requestId,
 | 
						||
	}, nil
 | 
						||
}
 | 
						||
 | 
						||
// FindHTTPAccessLog 查找单个日志
 | 
						||
func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb.FindHTTPAccessLogRequest) (*pb.FindHTTPAccessLogResponse, error) {
 | 
						||
	// 校验请求
 | 
						||
	_, userId, err := this.ValidateAdminAndUser(ctx, true)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	var tx = this.NullTx()
 | 
						||
 | 
						||
	accessLog, err := models.SharedHTTPAccessLogDAO.FindAccessLogWithRequestId(tx, req.RequestId)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	if accessLog == nil {
 | 
						||
		return &pb.FindHTTPAccessLogResponse{HttpAccessLog: nil}, nil
 | 
						||
	}
 | 
						||
 | 
						||
	// 检查权限
 | 
						||
	if userId > 0 {
 | 
						||
		err = models.SharedServerDAO.CheckUserServer(tx, userId, int64(accessLog.ServerId))
 | 
						||
		if err != nil {
 | 
						||
			return nil, err
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	a, err := accessLog.ToPB()
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
	return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil
 | 
						||
}
 | 
						||
 | 
						||
// FindHTTPAccessLogPartitions 查找日志分区
 | 
						||
func (this *HTTPAccessLogService) FindHTTPAccessLogPartitions(ctx context.Context, req *pb.FindHTTPAccessLogPartitionsRequest) (*pb.FindHTTPAccessLogPartitionsResponse, error) {
 | 
						||
	_, err := this.ValidateAdmin(ctx)
 | 
						||
	if err != nil {
 | 
						||
		return nil, err
 | 
						||
	}
 | 
						||
 | 
						||
	if !regexputils.YYYYMMDD.MatchString(req.Day) {
 | 
						||
		return nil, errors.New("invalid 'day': " + req.Day)
 | 
						||
	}
 | 
						||
 | 
						||
	var dbList = models.AllAccessLogDBs()
 | 
						||
	if len(dbList) == 0 {
 | 
						||
		return &pb.FindHTTPAccessLogPartitionsResponse{
 | 
						||
			Partitions: nil,
 | 
						||
		}, nil
 | 
						||
	}
 | 
						||
 | 
						||
	var partitions = []int32{}
 | 
						||
	var locker sync.Mutex
 | 
						||
 | 
						||
	var wg = sync.WaitGroup{}
 | 
						||
	wg.Add(len(dbList))
 | 
						||
 | 
						||
	var lastErr error
 | 
						||
	for _, db := range dbList {
 | 
						||
		go func(db *dbs.DB) {
 | 
						||
			defer wg.Done()
 | 
						||
 | 
						||
			names, err := models.SharedHTTPAccessLogManager.FindTableNames(db, req.Day)
 | 
						||
			if err != nil {
 | 
						||
				lastErr = err
 | 
						||
			}
 | 
						||
			for _, name := range names {
 | 
						||
				var partition = models.SharedHTTPAccessLogManager.TablePartition(name)
 | 
						||
				locker.Lock()
 | 
						||
				if !lists.Contains(partitions, partition) {
 | 
						||
					partitions = append(partitions, partition)
 | 
						||
				}
 | 
						||
				locker.Unlock()
 | 
						||
			}
 | 
						||
		}(db)
 | 
						||
	}
 | 
						||
	wg.Wait()
 | 
						||
 | 
						||
	if lastErr != nil {
 | 
						||
		return nil, lastErr
 | 
						||
	}
 | 
						||
 | 
						||
	var reversePartitions = []int32{}
 | 
						||
	for i := len(partitions) - 1; i >= 0; i-- {
 | 
						||
		reversePartitions = append(reversePartitions, partitions[i])
 | 
						||
	}
 | 
						||
 | 
						||
	return &pb.FindHTTPAccessLogPartitionsResponse{
 | 
						||
		Partitions:        partitions,
 | 
						||
		ReversePartitions: reversePartitions,
 | 
						||
	}, nil
 | 
						||
}
 |