初步实现对象存储源站

This commit is contained in:
GoEdgeLab
2023-06-07 17:27:55 +08:00
parent 7c442fc43d
commit a4e616a446
2 changed files with 113 additions and 76 deletions

View File

@@ -0,0 +1,15 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
//go:build !plus
package nodes
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"net/http"
)
func (this *HTTPRequest) doOSSOrigin(origin *serverconfigs.OriginConfig) (*http.Response, error) {
// stub
return nil, errors.New("not implemented")
}

View File

@@ -117,14 +117,20 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
requestHostHasVariables = origin.RequestHostHasVariables() requestHostHasVariables = origin.RequestHostHasVariables()
} }
// 处理OSS
var isHTTPOrigin = origin.OSS == nil
// 处理Scheme // 处理Scheme
if origin.Addr == nil { if isHTTPOrigin && origin.Addr == nil {
err := errors.New(this.URL() + ": Origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address") err := errors.New(this.URL() + ": Origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address")
remotelogs.ErrorServer("HTTP_REQUEST_REVERSE_PROXY", err.Error()) remotelogs.ErrorServer("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write50x(err, http.StatusBadGateway, "Origin site did not has a valid address", "源站尚未配置地址", true) this.write50x(err, http.StatusBadGateway, "Origin site did not has a valid address", "源站尚未配置地址", true)
return return
} }
if isHTTPOrigin {
this.RawReq.URL.Scheme = origin.Addr.Protocol.Primary().Scheme() this.RawReq.URL.Scheme = origin.Addr.Protocol.Primary().Scheme()
}
// StripPrefix // StripPrefix
if len(stripPrefix) > 0 { if len(stripPrefix) > 0 {
@@ -161,8 +167,10 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
this.uri = utils.CleanPath(this.uri) this.uri = utils.CleanPath(this.uri)
} }
var originAddr = ""
if isHTTPOrigin {
// 获取源站地址 // 获取源站地址
var originAddr = origin.Addr.PickAddress() originAddr = origin.Addr.PickAddress()
if origin.Addr.HostHasVariables() { if origin.Addr.HostHasVariables() {
originAddr = this.Format(originAddr) originAddr = this.Format(originAddr)
} }
@@ -220,6 +228,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
this.RawReq.URL.Host = utils.ParseAddrHost(this.RawReq.URL.Host) this.RawReq.URL.Host = utils.ParseAddrHost(this.RawReq.URL.Host)
} }
} }
}
// 重组请求URL // 重组请求URL
var questionMark = strings.Index(this.uri, "?") var questionMark = strings.Index(this.uri, "?")
@@ -243,11 +252,14 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
} }
// 判断是否为Websocket请求 // 判断是否为Websocket请求
if this.RawReq.Header.Get("Upgrade") == "websocket" { if isHTTPOrigin && this.RawReq.Header.Get("Upgrade") == "websocket" {
shouldRetry = this.doWebsocket(requestHost, isLastRetry) shouldRetry = this.doWebsocket(requestHost, isLastRetry)
return return
} }
var resp *http.Response
var requestErr error
if isHTTPOrigin { // 普通HTTP(S)源站
// 获取请求客户端 // 获取请求客户端
client, err := SharedHTTPClientPool.Client(this, origin, originAddr, this.reverseProxy.ProxyProtocol, this.reverseProxy.FollowRedirects) client, err := SharedHTTPClientPool.Client(this, origin, originAddr, this.reverseProxy.ProxyProtocol, this.reverseProxy.FollowRedirects)
if err != nil { if err != nil {
@@ -257,16 +269,25 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
} }
// 开始请求 // 开始请求
resp, err := client.Do(this.RawReq) resp, requestErr = client.Do(this.RawReq)
if err != nil { } else if origin.OSS != nil { // OSS源站
resp, requestErr = this.doOSSOrigin(origin)
if requestErr == nil && resp == nil {
return
}
} else {
this.writeCode(http.StatusBadGateway, "The type of origin site has not been supported.", "设置的源站类型尚未支持。")
return
}
if requestErr != nil {
// 客户端取消请求,则不提示 // 客户端取消请求,则不提示
httpErr, ok := err.(*url.Error) httpErr, ok := requestErr.(*url.Error)
if !ok { if !ok {
SharedOriginStateManager.Fail(origin, requestHost, this.reverseProxy, func() { SharedOriginStateManager.Fail(origin, requestHost, this.reverseProxy, func() {
this.reverseProxy.ResetScheduling() this.reverseProxy.ResetScheduling()
}) })
this.write50x(err, http.StatusBadGateway, "Failed to read origin site", "源站读取失败", true) this.write50x(requestErr, http.StatusBadGateway, "Failed to read origin site", "源站读取失败", true)
remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+": Request origin server failed: "+err.Error()) remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+": Request origin server failed: "+requestErr.Error())
} else if httpErr.Err != context.Canceled { } else if httpErr.Err != context.Canceled {
SharedOriginStateManager.Fail(origin, requestHost, this.reverseProxy, func() { SharedOriginStateManager.Fail(origin, requestHost, this.reverseProxy, func() {
this.reverseProxy.ResetScheduling() this.reverseProxy.ResetScheduling()
@@ -282,21 +303,21 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
} }
if httpErr.Err != io.EOF { if httpErr.Err != io.EOF {
remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Request origin server failed: "+err.Error()) remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Request origin server failed: "+requestErr.Error())
} }
return return
} }
if httpErr.Timeout() { if httpErr.Timeout() {
this.write50x(err, http.StatusGatewayTimeout, "Read origin site timeout", "源站读取超时", true) this.write50x(requestErr, http.StatusGatewayTimeout, "Read origin site timeout", "源站读取超时", true)
} else if httpErr.Temporary() { } else if httpErr.Temporary() {
this.write50x(err, http.StatusServiceUnavailable, "Origin site unavailable now", "源站当前不可用", true) this.write50x(requestErr, http.StatusServiceUnavailable, "Origin site unavailable now", "源站当前不可用", true)
} else { } else {
this.write50x(err, http.StatusBadGateway, "Failed to read origin site", "源站读取失败", true) this.write50x(requestErr, http.StatusBadGateway, "Failed to read origin site", "源站读取失败", true)
} }
if httpErr.Err != io.EOF { if httpErr.Err != io.EOF {
remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Request origin server failed: "+err.Error()) remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Request origin server failed: "+requestErr.Error())
} }
} else { } else {
// 是否为客户端方面的错误 // 是否为客户端方面的错误
@@ -316,7 +337,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
} }
if !isClientError { if !isClientError {
this.write50x(err, http.StatusBadGateway, "Failed to read origin site", "源站读取失败", true) this.write50x(requestErr, http.StatusBadGateway, "Failed to read origin site", "源站读取失败", true)
} }
} }
if resp != nil && resp.Body != nil { if resp != nil && resp.Body != nil {
@@ -348,7 +369,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
// WAF对出站进行检查 // WAF对出站进行检查
if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn { if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn {
if this.doWAFResponse(resp) { if this.doWAFResponse(resp) {
err = resp.Body.Close() err := resp.Body.Close()
if err != nil { if err != nil {
remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Closing Error (WAF): "+err.Error()) remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Closing Error (WAF): "+err.Error())
} }
@@ -358,7 +379,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
// 特殊页面 // 特殊页面
if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) { if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
err = resp.Body.Close() err := resp.Body.Close()
if err != nil { if err != nil {
remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Closing error (Page): "+err.Error()) remotelogs.WarnServer("HTTP_REQUEST_REVERSE_PROXY", this.URL()+": Closing error (Page): "+err.Error())
} }
@@ -439,6 +460,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
// 输出到客户端 // 输出到客户端
var pool = this.bytePool(resp.ContentLength) var pool = this.bytePool(resp.ContentLength)
var buf = pool.Get() var buf = pool.Get()
var err error
if shouldAutoFlush { if shouldAutoFlush {
for { for {
n, readErr := resp.Body.Read(buf) n, readErr := resp.Body.Read(buf)