mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-12-15 23:26:36 +08:00
实现基本的反向代理
This commit is contained in:
@@ -1,7 +1,14 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"io"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -11,9 +18,46 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
return
|
||||
}
|
||||
|
||||
// 对URL的处理
|
||||
stripPrefix := this.reverseProxy.StripPrefix
|
||||
requestURI := this.reverseProxy.RequestURI
|
||||
requestURIHasVariables := this.reverseProxy.RequestURIHasVariables()
|
||||
requestHost := this.reverseProxy.RequestHost
|
||||
requestHostHasVariables := this.reverseProxy.RequestHostHasVariables()
|
||||
|
||||
// 源站
|
||||
requestCall := shared.NewRequestCall()
|
||||
origin := this.reverseProxy.NextOrigin(requestCall)
|
||||
if origin == nil {
|
||||
err := errors.New(this.requestPath() + ": no available backends for reverse proxy")
|
||||
logs.Error(err)
|
||||
this.write500(err)
|
||||
return
|
||||
}
|
||||
this.origin = origin // 设置全局变量是为了日志等处理
|
||||
if len(origin.StripPrefix) > 0 {
|
||||
stripPrefix = origin.StripPrefix
|
||||
}
|
||||
if len(origin.RequestURI) > 0 {
|
||||
requestURI = origin.RequestURI
|
||||
requestURIHasVariables = origin.RequestURIHasVariables()
|
||||
}
|
||||
if len(origin.RequestHost) > 0 {
|
||||
requestHost = origin.RequestHost
|
||||
requestHostHasVariables = origin.RequestHostHasVariables()
|
||||
}
|
||||
|
||||
// 处理Scheme
|
||||
if origin.Addr == nil {
|
||||
err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address")
|
||||
logs.Error(err)
|
||||
this.write500(err)
|
||||
return
|
||||
}
|
||||
this.RawReq.URL.Scheme = origin.Addr.Protocol.Primary().Scheme()
|
||||
|
||||
// StripPrefix
|
||||
if len(this.reverseProxy.StripPrefix) > 0 {
|
||||
stripPrefix := this.reverseProxy.StripPrefix
|
||||
if len(stripPrefix) > 0 {
|
||||
if stripPrefix[0] != '/' {
|
||||
stripPrefix = "/" + stripPrefix
|
||||
}
|
||||
@@ -24,11 +68,11 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
}
|
||||
|
||||
// RequestURI
|
||||
if len(this.reverseProxy.RequestURI) > 0 {
|
||||
if this.reverseProxy.RequestURIHasVariables() {
|
||||
this.uri = this.Format(this.reverseProxy.RequestURI)
|
||||
if len(requestURI) > 0 {
|
||||
if requestURIHasVariables {
|
||||
this.uri = this.Format(requestURI)
|
||||
} else {
|
||||
this.uri = this.reverseProxy.RequestURI
|
||||
this.uri = requestURI
|
||||
}
|
||||
if len(this.uri) == 0 || this.uri[0] != '/' {
|
||||
this.uri = "/" + this.uri
|
||||
@@ -47,6 +91,18 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
this.uri = utils.CleanPath(this.uri)
|
||||
}
|
||||
|
||||
// RequestHost
|
||||
if len(requestHost) > 0 {
|
||||
if requestHostHasVariables {
|
||||
this.RawReq.Host = this.Format(requestHost)
|
||||
} else {
|
||||
this.RawReq.Host = this.reverseProxy.RequestHost
|
||||
}
|
||||
this.RawReq.URL.Host = this.RawReq.Host
|
||||
} else {
|
||||
this.RawReq.URL.Host = this.Host
|
||||
}
|
||||
|
||||
// 重组请求URL
|
||||
questionMark := strings.Index(this.uri, "?")
|
||||
if questionMark > -1 {
|
||||
@@ -56,16 +112,11 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
this.RawReq.URL.Path = this.uri
|
||||
this.RawReq.URL.RawQuery = ""
|
||||
}
|
||||
this.RawReq.RequestURI = ""
|
||||
|
||||
// RequestHost
|
||||
if len(this.reverseProxy.RequestHost) > 0 {
|
||||
if this.reverseProxy.RequestHostHasVariables() {
|
||||
this.RawReq.Host = this.Format(this.reverseProxy.RequestHost)
|
||||
} else {
|
||||
this.RawReq.Host = this.reverseProxy.RequestHost
|
||||
}
|
||||
this.RawReq.URL.Host = this.RawReq.Host
|
||||
}
|
||||
// 处理Header
|
||||
this.setForwardHeaders(this.RawReq.Header)
|
||||
this.processRequestHeaders(this.RawReq.Header)
|
||||
|
||||
// 判断是否为Websocket请求
|
||||
if this.RawReq.Header.Get("Upgrade") == "websocket" {
|
||||
@@ -73,6 +124,110 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
return
|
||||
}
|
||||
|
||||
// 普通HTTP请求
|
||||
// 获取请求客户端
|
||||
client, addr, err := SharedHTTPClientPool.Client(this, origin)
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
this.write500(err)
|
||||
return
|
||||
}
|
||||
|
||||
this.originAddr = addr
|
||||
|
||||
// 开始请求
|
||||
resp, err := client.Do(this.RawReq)
|
||||
if err != nil {
|
||||
// 客户端取消请求,则不提示
|
||||
httpErr, ok := err.(*url.Error)
|
||||
if !ok || httpErr.Err != context.Canceled {
|
||||
// TODO 如果超过最大失败次数,则下线
|
||||
|
||||
this.write500(err)
|
||||
logs.Println("[proxy]'" + this.RawReq.URL.String() + "': " + err.Error())
|
||||
} else {
|
||||
// 是否为客户端方面的错误
|
||||
isClientError := false
|
||||
if ok {
|
||||
if httpErr.Err == context.Canceled {
|
||||
isClientError = true
|
||||
this.addError(errors.New(httpErr.Op + " " + httpErr.URL + ": client closed the connection"))
|
||||
this.writer.WriteHeader(499) // 仿照nginx
|
||||
}
|
||||
}
|
||||
|
||||
if !isClientError {
|
||||
this.write500(err)
|
||||
}
|
||||
}
|
||||
if resp != nil && resp.Body != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// WAF对出站进行检查
|
||||
// TODO
|
||||
|
||||
// TODO 清除源站错误次数
|
||||
|
||||
// 特殊页面
|
||||
// TODO
|
||||
|
||||
// 设置Charset
|
||||
// TODO 这里应该可以设置文本类型的列表,以及是否强制覆盖所有文本类型的字符集
|
||||
if this.web.Charset != nil && this.web.Charset.IsOn && len(this.web.Charset.Charset) > 0 {
|
||||
contentTypes, ok := resp.Header["Content-Type"]
|
||||
if ok && len(contentTypes) > 0 {
|
||||
contentType := contentTypes[0]
|
||||
if _, found := textMimeMap[contentType]; found {
|
||||
resp.Header["Content-Type"][0] = contentType + "; charset=" + this.web.Charset.Charset
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 响应Header
|
||||
this.writer.AddHeaders(resp.Header)
|
||||
this.processResponseHeaders(resp.StatusCode)
|
||||
|
||||
// 是否需要刷新
|
||||
shouldFlush := this.RawReq.Header.Get("Accept") == "text/event-stream"
|
||||
|
||||
// 准备
|
||||
this.writer.Prepare(resp.ContentLength)
|
||||
|
||||
// 设置响应代码
|
||||
this.writer.WriteHeader(resp.StatusCode)
|
||||
|
||||
// 输出到客户端
|
||||
pool := this.bytePool(resp.ContentLength)
|
||||
buf := pool.Get()
|
||||
if shouldFlush {
|
||||
for {
|
||||
n, readErr := resp.Body.Read(buf)
|
||||
if n > 0 {
|
||||
_, err = this.writer.Write(buf[:n])
|
||||
this.writer.Flush()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if readErr != nil {
|
||||
err = readErr
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
|
||||
}
|
||||
pool.Put(buf)
|
||||
|
||||
err1 := resp.Body.Close()
|
||||
if err1 != nil {
|
||||
logs.Error(err1)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logs.Error(err)
|
||||
this.addError(err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user