缓存支持Range分片查询

This commit is contained in:
GoEdgeLab
2021-01-13 12:52:38 +08:00
parent ba6fa92dc9
commit 15e4045ec9
3 changed files with 181 additions and 27 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/logs"
"net/http" "net/http"
"strconv" "strconv"
) )
@@ -115,20 +116,153 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
} }
this.processResponseHeaders(reader.Status()) this.processResponseHeaders(reader.Status())
this.writer.WriteHeader(reader.Status())
// 输出Body // 输出Body
if this.RawReq.Method != http.MethodHead { if this.RawReq.Method == http.MethodHead {
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { this.writer.WriteHeader(reader.Status())
_, err = this.writer.Write(buf[:n]) } else {
if err != nil { ifRangeHeaders, ok := this.RawReq.Header["If-Range"]
return false, err supportRange := true
if ok {
supportRange = false
for _, v := range ifRangeHeaders {
if v == this.writer.Header().Get("ETag") || v == this.writer.Header().Get("Last-Modified") {
supportRange = true
}
}
}
// 支持Range
rangeSet := [][]int64{}
if supportRange {
fileSize := reader.BodySize()
contentRange := this.RawReq.Header.Get("Range")
if len(contentRange) > 0 {
if fileSize == 0 {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
set, ok := httpRequestParseContentRange(contentRange)
if !ok {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
if len(set) > 0 {
rangeSet = set
for _, arr := range rangeSet {
if arr[0] == -1 {
arr[0] = fileSize + arr[1]
arr[1] = fileSize - 1
if arr[0] < 0 {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
}
if arr[1] < 0 {
arr[1] = fileSize - 1
}
if arr[1] >= fileSize {
arr[1] = fileSize - 1
}
if arr[0] > arr[1] {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
}
}
}
}
respHeader := this.writer.Header()
if len(rangeSet) == 1 {
respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(reader.BodySize(), 10))
respHeader.Set("Content-Length", strconv.FormatInt(rangeSet[0][1]-rangeSet[0][0]+1, 10))
this.writer.WriteHeader(http.StatusPartialContent)
err = reader.ReadBodyRange(buf, rangeSet[0][0], rangeSet[0][1], func(n int) (goNext bool, err error) {
_, err = this.writer.Write(buf[:n])
if err != nil {
return false, err
}
return true, nil
})
if err != nil {
if err == caches.ErrInvalidRange {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
return
}
} else if len(rangeSet) > 1 {
boundary := httpRequestGenBoundary()
respHeader.Set("Content-Type", "multipart/byteranges; boundary="+boundary)
respHeader.Del("Content-Length")
contentType := respHeader.Get("Content-Type")
this.writer.WriteHeader(http.StatusPartialContent)
for index, set := range rangeSet {
if index == 0 {
_, err = this.writer.WriteString("--" + boundary + "\r\n")
} else {
_, err = this.writer.WriteString("\r\n--" + boundary + "\r\n")
}
if err != nil {
logs.Error(err)
return true
}
_, err = this.writer.WriteString("Content-Range: " + "bytes " + strconv.FormatInt(set[0], 10) + "-" + strconv.FormatInt(set[1], 10) + "/" + strconv.FormatInt(reader.BodySize(), 10) + "\r\n")
if err != nil {
logs.Error(err)
return true
}
if len(contentType) > 0 {
_, err = this.writer.WriteString("Content-Type: " + contentType + "\r\n\r\n")
if err != nil {
logs.Error(err)
return true
}
}
err := reader.ReadBodyRange(buf, set[0], set[1], func(n int) (goNext bool, err error) {
_, err = this.writer.Write(buf[:n])
return true, err
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
return true
}
}
_, err = this.writer.WriteString("\r\n--" + boundary + "--\r\n")
if err != nil {
logs.Error(err)
return true
}
} else {
this.writer.WriteHeader(reader.Status())
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
_, err = this.writer.Write(buf[:n])
if err != nil {
return false, err
}
return true, nil
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
return
} }
return true, nil
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
return
} }
} }

View File

@@ -224,13 +224,22 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
supportRange = true supportRange = true
} }
} }
if !supportRange {
respHeader.Del("Accept-Ranges")
}
} }
// 支持Range // 支持Range
rangeSet := [][]int{} rangeSet := [][]int64{}
if supportRange { if supportRange {
contentRange := this.RawReq.Header.Get("Range") contentRange := this.RawReq.Header.Get("Range")
if len(contentRange) > 0 { if len(contentRange) > 0 {
if fileSize == 0 {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
set, ok := httpRequestParseContentRange(contentRange) set, ok := httpRequestParseContentRange(contentRange)
if !ok { if !ok {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable) this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
@@ -241,8 +250,8 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
rangeSet = set rangeSet = set
for _, arr := range rangeSet { for _, arr := range rangeSet {
if arr[0] == -1 { if arr[0] == -1 {
arr[0] = int(fileSize) + arr[1] arr[0] = fileSize + arr[1]
arr[1] = int(fileSize) - 1 arr[1] = fileSize - 1
if arr[0] < 0 { if arr[0] < 0 {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable) this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
@@ -250,6 +259,17 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
return true return true
} }
} }
if arr[1] > 0 {
arr[1] = fileSize - 1
}
if arr[1] < 0 {
arr[1] = fileSize - 1
}
if arr[0] > arr[1] {
this.processResponseHeaders(http.StatusRequestedRangeNotSatisfiable)
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
} }
} }
} else { } else {
@@ -284,7 +304,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
}() }()
if len(rangeSet) == 1 { if len(rangeSet) == 1 {
respHeader.Set("Content-Range", "bytes "+strconv.Itoa(rangeSet[0][0])+"-"+strconv.Itoa(rangeSet[0][1])+"/"+strconv.FormatInt(fileSize, 10)) respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(fileSize, 10))
this.writer.WriteHeader(http.StatusPartialContent) this.writer.WriteHeader(http.StatusPartialContent)
ok, err := httpRequestReadRange(reader, buf, rangeSet[0][0], rangeSet[0][1], func(buf []byte, n int) error { ok, err := httpRequestReadRange(reader, buf, rangeSet[0][0], rangeSet[0][1], func(buf []byte, n int) error {
@@ -317,7 +337,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
return true return true
} }
_, err = this.writer.WriteString("Content-Range: " + "bytes " + strconv.Itoa(set[0]) + "-" + strconv.Itoa(set[1]) + "/" + strconv.FormatInt(fileSize, 10) + "\r\n") _, err = this.writer.WriteString("Content-Range: " + "bytes " + strconv.FormatInt(set[0], 10) + "-" + strconv.FormatInt(set[1], 10) + "/" + strconv.FormatInt(fileSize, 10) + "\r\n")
if err != nil { if err != nil {
logs.Error(err) logs.Error(err)
return true return true

View File

@@ -9,7 +9,7 @@ import (
) )
// 分解Range // 分解Range
func httpRequestParseContentRange(rangeValue string) (result [][]int, ok bool) { func httpRequestParseContentRange(rangeValue string) (result [][]int64, ok bool) {
// 参考RFChttps://tools.ietf.org/html/rfc7233 // 参考RFChttps://tools.ietf.org/html/rfc7233
index := strings.Index(rangeValue, "=") index := strings.Index(rangeValue, "=")
if index == -1 { if index == -1 {
@@ -33,20 +33,20 @@ func httpRequestParseContentRange(rangeValue string) (result [][]int, ok bool) {
return return
} }
first := piece[:index] first := piece[:index]
firstInt := -1 firstInt := int64(-1)
var err error var err error
last := piece[index+1:] last := piece[index+1:]
var lastInt = -1 var lastInt = int64(-1)
if len(first) > 0 { if len(first) > 0 {
firstInt, err = strconv.Atoi(first) firstInt, err = strconv.ParseInt(first, 10, 64)
if err != nil { if err != nil {
return return
} }
if len(last) > 0 { if len(last) > 0 {
lastInt, err = strconv.Atoi(last) lastInt, err = strconv.ParseInt(last, 10, 64)
if err != nil { if err != nil {
return return
} }
@@ -59,14 +59,14 @@ func httpRequestParseContentRange(rangeValue string) (result [][]int, ok bool) {
return return
} }
lastInt, err = strconv.Atoi(last) lastInt, err = strconv.ParseInt(last, 10, 64)
if err != nil { if err != nil {
return return
} }
lastInt = -lastInt lastInt = -lastInt
} }
result = append(result, []int{firstInt, lastInt}) result = append(result, []int64{firstInt, lastInt})
} }
ok = true ok = true
@@ -74,7 +74,7 @@ func httpRequestParseContentRange(rangeValue string) (result [][]int, ok bool) {
} }
// 读取内容Range // 读取内容Range
func httpRequestReadRange(reader io.Reader, buf []byte, start int, end int, callback func(buf []byte, n int) error) (ok bool, err error) { func httpRequestReadRange(reader io.Reader, buf []byte, start int64, end int64, callback func(buf []byte, n int) error) (ok bool, err error) {
if start < 0 || end < 0 { if start < 0 || end < 0 {
return return
} }
@@ -82,7 +82,7 @@ func httpRequestReadRange(reader io.Reader, buf []byte, start int, end int, call
if !ok { if !ok {
return return
} }
_, err = seeker.Seek(int64(start), io.SeekStart) _, err = seeker.Seek(start, io.SeekStart)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@@ -91,9 +91,9 @@ func httpRequestReadRange(reader io.Reader, buf []byte, start int, end int, call
for { for {
n, err := reader.Read(buf) n, err := reader.Read(buf)
if n > 0 { if n > 0 {
offset += n offset += int64(n)
if end < offset { if end < offset {
err = callback(buf, n-(offset-end-1)) err = callback(buf, n-int(offset-end-1))
if err != nil { if err != nil {
return false, err return false, err
} }