优化代码/商业版支持L2节点

This commit is contained in:
GoEdgeLab
2022-04-04 12:06:53 +08:00
parent 144bef4fe3
commit 4aeb3cc7b0
10 changed files with 242 additions and 149 deletions

View File

@@ -29,7 +29,7 @@ type HTTPClientPool struct {
// NewHTTPClientPool 获取新对象
func NewHTTPClientPool() *HTTPClientPool {
pool := &HTTPClientPool{
var pool = &HTTPClientPool{
clientExpiredDuration: 3600 * time.Second,
clientsMap: map[string]*HTTPClient{},
}
@@ -42,12 +42,16 @@ func NewHTTPClientPool() *HTTPClientPool {
}
// Client 根据地址获取客户端
func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.OriginConfig, originAddr string, proxyProtocol *serverconfigs.ProxyProtocolConfig, followRedirects bool) (rawClient *http.Client, err error) {
func (this *HTTPClientPool) Client(req *HTTPRequest,
origin *serverconfigs.OriginConfig,
originAddr string,
proxyProtocol *serverconfigs.ProxyProtocolConfig,
followRedirects bool) (rawClient *http.Client, err error) {
if origin.Addr == nil {
return nil, errors.New("origin addr should not be empty (originId:" + strconv.FormatInt(origin.Id, 10) + ")")
}
key := origin.UniqueKey() + "@" + originAddr
var key = origin.UniqueKey() + "@" + originAddr
this.locker.Lock()
defer this.locker.Unlock()
@@ -58,11 +62,11 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.Origi
return client.RawClient(), nil
}
maxConnections := origin.MaxConns
connectionTimeout := origin.ConnTimeoutDuration()
readTimeout := origin.ReadTimeoutDuration()
idleTimeout := origin.IdleTimeoutDuration()
idleConns := origin.MaxIdleConns
var maxConnections = origin.MaxConns
var connectionTimeout = origin.ConnTimeoutDuration()
var readTimeout = origin.ReadTimeoutDuration()
var idleTimeout = origin.IdleTimeoutDuration()
var idleConns = origin.MaxIdleConns
// 超时时间
if connectionTimeout <= 0 {
@@ -73,7 +77,7 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.Origi
idleTimeout = 2 * time.Minute
}
numberCPU := runtime.NumCPU()
var numberCPU = runtime.NumCPU()
if numberCPU < 8 {
numberCPU = 8
}
@@ -163,7 +167,7 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, origin *serverconfigs.Origi
// 清理不使用的Client
func (this *HTTPClientPool) cleanClients() {
ticker := time.NewTicker(this.clientExpiredDuration)
var ticker = time.NewTicker(this.clientExpiredDuration)
for range ticker.C {
currentAt := time.Now().Unix()
@@ -181,11 +185,11 @@ func (this *HTTPClientPool) cleanClients() {
// 支持TOA
func (this *HTTPClientPool) handleTOA(req *HTTPRequest, ctx context.Context, network string, originAddr string, connectionTimeout time.Duration) (net.Conn, error) {
// TODO 每个服务读取自身所属集群的TOA设置
toaConfig := sharedTOAManager.Config()
var toaConfig = sharedTOAManager.Config()
if toaConfig != nil && toaConfig.IsOn {
retries := 3
var retries = 3
for i := 1; i <= retries; i++ {
port := int(toaConfig.RandLocalPort())
var port = int(toaConfig.RandLocalPort())
// TODO 思考是否支持X-Real-IP/X-Forwarded-IP
err := sharedTOAManager.SendMsg("add:" + strconv.Itoa(port) + ":" + req.requestRemoteAddr(true))
if err != nil {
@@ -223,7 +227,7 @@ func (this *HTTPClientPool) handlePROXYProtocol(conn net.Conn, req *HTTPRequest,
if reqConn != nil {
destAddr = reqConn.(net.Conn).LocalAddr()
}
header := proxyproto.Header{
var header = proxyproto.Header{
Version: byte(proxyProtocol.Version),
Command: proxyproto.PROXY,
TransportProtocol: transportProtocol,

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
@@ -47,6 +48,13 @@ type HTTPRequest struct {
IsHTTP bool
IsHTTPS bool
// 共享参数
nodeConfig *nodeconfigs.NodeConfig
// ln request
isLnRequest bool
lnRemoteAddr string
// 内部参数
isSubRequest bool
writer *HTTPWriter
@@ -145,6 +153,9 @@ func (this *HTTPRequest) Do() {
return
}
// 是否为低级别节点
this.isLnRequest = this.checkLnRequest()
// 回调事件
this.onInit()
if this.writer.isFinished {
@@ -152,6 +163,7 @@ func (this *HTTPRequest) Do() {
return
}
if !this.isLnRequest {
// 特殊URL处理
if len(this.rawURI) > 1 && this.rawURI[1] == '.' {
// ACME
@@ -205,6 +217,7 @@ func (this *HTTPRequest) Do() {
if this.web.Compression != nil && this.web.Compression.IsOn && this.web.Compression.Level > 0 {
this.writer.SetCompression(this.web.Compression)
}
}
// 开始调用
this.doBegin()
@@ -218,6 +231,7 @@ func (this *HTTPRequest) Do() {
// 开始调用
func (this *HTTPRequest) doBegin() {
if !this.isLnRequest {
// 处理request limit
if this.web.RequestLimit != nil &&
this.web.RequestLimit.IsOn {
@@ -269,6 +283,7 @@ func (this *HTTPRequest) doBegin() {
this.doShutdown()
return
}
}
// 缓存
if this.web.Cache != nil && this.web.Cache.IsOn {
@@ -277,6 +292,7 @@ func (this *HTTPRequest) doBegin() {
}
}
if !this.isLnRequest {
// 重写规则
if this.rewriteRule != nil {
if this.doRewrite() {
@@ -303,6 +319,7 @@ func (this *HTTPRequest) doBegin() {
return
}
}
}
// Reverse Proxy
if this.reverseProxyRef != nil && this.reverseProxyRef.IsOn && this.reverseProxy != nil && this.reverseProxy.IsOn {
@@ -809,9 +826,9 @@ func (this *HTTPRequest) Format(source string) string {
if prefix == "node" {
switch suffix {
case "id":
return strconv.FormatInt(sharedNodeConfig.Id, 10)
return strconv.FormatInt(this.nodeConfig.Id, 10)
case "name":
return sharedNodeConfig.Name
return this.nodeConfig.Name
case "role":
return teaconst.Role
}
@@ -970,13 +987,13 @@ func (this *HTTPRequest) Format(source string) string {
if prefix == "product" {
switch suffix {
case "name":
if sharedNodeConfig.ProductConfig != nil && len(sharedNodeConfig.ProductConfig.Name) > 0 {
return sharedNodeConfig.ProductConfig.Name
if this.nodeConfig.ProductConfig != nil && len(this.nodeConfig.ProductConfig.Name) > 0 {
return this.nodeConfig.ProductConfig.Name
}
return teaconst.GlobalProductName
case "version":
if sharedNodeConfig.ProductConfig != nil && len(sharedNodeConfig.ProductConfig.Version) > 0 {
return sharedNodeConfig.ProductConfig.Version
if this.nodeConfig.ProductConfig != nil && len(this.nodeConfig.ProductConfig.Version) > 0 {
return this.nodeConfig.ProductConfig.Version
}
return teaconst.Version
}
@@ -995,6 +1012,10 @@ func (this *HTTPRequest) addVarMapping(varMapping map[string]string) {
// 获取请求的客户端地址
func (this *HTTPRequest) requestRemoteAddr(supportVar bool) string {
if len(this.lnRemoteAddr) > 0 {
return this.lnRemoteAddr
}
if supportVar && len(this.remoteAddr) > 0 {
return this.remoteAddr
}

View File

@@ -0,0 +1,17 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
//go:build !plus
// +build !plus
package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
)
func (this *HTTPRequest) checkLnRequest() bool {
return false
}
func (this *HTTPRequest) getLnOrigin() *serverconfigs.OriginConfig {
return nil
}

View File

@@ -92,7 +92,7 @@ func (this *HTTPRequest) log() {
accessLog := &pb.HTTPAccessLog{
RequestId: this.requestId,
NodeId: sharedNodeConfig.Id,
NodeId: this.nodeConfig.Id,
ServerId: this.ReqServer.Id,
RemoteAddr: this.requestRemoteAddr(true),
RawRemoteAddr: addr,

View File

@@ -36,7 +36,21 @@ func (this *HTTPRequest) doReverseProxy() {
requestCall.Request = this.RawReq
requestCall.Formatter = this.Format
requestCall.Domain = this.ReqHost
var origin = this.reverseProxy.NextOrigin(requestCall)
var origin *serverconfigs.OriginConfig
// 二级节点
if this.cacheRef != nil {
origin = this.getLnOrigin()
if origin != nil {
// 强制变更原来访问的域名
requestHost = this.ReqHost
}
}
// 自定义源站
if origin == nil {
origin = this.reverseProxy.NextOrigin(requestCall)
requestCall.CallResponseCallbacks(this.writer)
if origin == nil {
err := errors.New(this.URL() + ": no available origin sites for reverse proxy")
@@ -44,7 +58,7 @@ func (this *HTTPRequest) doReverseProxy() {
this.write50x(err, http.StatusBadGateway, true)
return
}
this.origin = origin // 设置全局变量是为了日志等处理
if len(origin.StripPrefix) > 0 {
stripPrefix = origin.StripPrefix
}
@@ -52,6 +66,10 @@ func (this *HTTPRequest) doReverseProxy() {
requestURI = origin.RequestURI
requestURIHasVariables = origin.RequestURIHasVariables()
}
}
this.origin = origin // 设置全局变量是为了日志等处理
if len(origin.RequestHost) > 0 {
requestHost = origin.RequestHost
requestHostHasVariables = origin.RequestHostHasVariables()

View File

@@ -24,7 +24,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
var remoteAddr = this.requestRemoteAddr(true)
// 检查是否为白名单直连
if !Tea.IsTesting() && sharedNodeConfig.IPIsAutoAllowed(remoteAddr) {
if !Tea.IsTesting() && this.nodeConfig.IPIsAutoAllowed(remoteAddr) {
return
}

View File

@@ -445,7 +445,7 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) {
}
// 集群配置
var policy = sharedNodeConfig.FindWebPImagePolicyWithClusterId(this.req.ReqServer.ClusterId)
var policy = this.req.nodeConfig.FindWebPImagePolicyWithClusterId(this.req.ReqServer.ClusterId)
if policy == nil {
policy = nodeconfigs.DefaultWebPImagePolicy
}

View File

@@ -134,7 +134,7 @@ func (this *HTTPListener) Reload(group *serverconfigs.ServerAddressGroup) {
// ServerHTTP 处理HTTP请求
func (this *HTTPListener) ServeHTTP(rawWriter http.ResponseWriter, rawReq *http.Request) {
// 域名
reqHost := rawReq.Host
var reqHost = rawReq.Host
// TLS域名
if this.isIP(reqHost) {
@@ -214,6 +214,8 @@ func (this *HTTPListener) ServeHTTP(rawWriter http.ResponseWriter, rawReq *http.
ServerAddr: this.addr,
IsHTTP: this.isHTTP,
IsHTTPS: this.isHTTPS,
nodeConfig: sharedNodeConfig,
}
req.Do()
}

View File

@@ -339,6 +339,32 @@ func (this *Node) loop() error {
return errors.New("reload common scripts failed: " + err.Error())
}
// 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
IsOk: true,
Error: "",
})
if err != nil {
return err
}
case "nodeLevelChanged":
levelInfoResp, err := rpcClient.NodeRPC().FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{})
if err != nil {
return err
}
sharedNodeConfig.Level = levelInfoResp.Level
var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{}
if len(levelInfoResp.ParentNodesMapJSON) > 0 {
err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes)
if err != nil {
return errors.New("decode level info failed: " + err.Error())
}
}
sharedNodeConfig.ParentNodes = parentNodes
// 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,

View File

@@ -124,9 +124,10 @@ func (this *OriginStateManager) Loop() error {
// Fail 添加失败的源站
func (this *OriginStateManager) Fail(origin *serverconfigs.OriginConfig, reverseProxy *serverconfigs.ReverseProxyConfig, callback func()) {
if origin == nil {
if origin == nil || origin.Id <= 0 {
return
}
this.locker.Lock()
state, ok := this.stateMap[origin.Id]
var timestamp = time.Now().Unix()
@@ -164,7 +165,7 @@ func (this *OriginStateManager) Fail(origin *serverconfigs.OriginConfig, reverse
// Success 添加成功的源站
func (this *OriginStateManager) Success(origin *serverconfigs.OriginConfig, callback func()) {
if origin == nil {
if origin == nil || origin.Id <= 0 {
return
}
@@ -182,6 +183,10 @@ func (this *OriginStateManager) Success(origin *serverconfigs.OriginConfig, call
// IsAvailable 检查是否正常
func (this *OriginStateManager) IsAvailable(originId int64) bool {
if originId <= 0 {
return true
}
this.locker.RLock()
_, ok := this.stateMap[originId]
this.locker.RUnlock()