mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-11 22:00:25 +08:00
修复从缓存文件中读取压缩内容时可能失败的Bug
This commit is contained in:
@@ -23,9 +23,6 @@ type FileReader struct {
|
|||||||
headerSize int
|
headerSize int
|
||||||
bodySize int64
|
bodySize int64
|
||||||
bodyOffset int64
|
bodyOffset int64
|
||||||
|
|
||||||
bodyBufLen int
|
|
||||||
bodyBuf []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileReader(fp *os.File) *FileReader {
|
func NewFileReader(fp *os.File) *FileReader {
|
||||||
@@ -181,10 +178,6 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
|
|||||||
}
|
}
|
||||||
headerSize -= n
|
headerSize -= n
|
||||||
} else {
|
} else {
|
||||||
if n > headerSize {
|
|
||||||
this.bodyBuf = buf[headerSize:]
|
|
||||||
this.bodyBufLen = n - headerSize
|
|
||||||
}
|
|
||||||
_, e := callback(headerSize)
|
_, e := callback(headerSize)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
isOk = true
|
isOk = true
|
||||||
@@ -203,6 +196,12 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
|
|||||||
|
|
||||||
isOk = true
|
isOk = true
|
||||||
|
|
||||||
|
// 移动到Body位置
|
||||||
|
_, err = this.fp.Seek(this.bodyOffset, io.SeekStart)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -215,27 +214,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
offset := this.bodyOffset
|
var offset = this.bodyOffset
|
||||||
|
|
||||||
// 直接返回从Header中剩余的
|
|
||||||
if this.bodyBufLen > 0 && len(buf) >= this.bodyBufLen {
|
|
||||||
offset += int64(this.bodyBufLen)
|
|
||||||
|
|
||||||
copy(buf, this.bodyBuf)
|
|
||||||
isOk = true
|
|
||||||
|
|
||||||
goNext, err := callback(this.bodyBufLen)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !goNext {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if this.bodySize <= int64(this.bodyBufLen) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 开始读Body部分
|
// 开始读Body部分
|
||||||
_, err := this.fp.Seek(offset, io.SeekStart)
|
_, err := this.fp.Seek(offset, io.SeekStart)
|
||||||
@@ -269,42 +248,9 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileReader) Read(buf []byte) (n int, err error) {
|
func (this *FileReader) Read(buf []byte) (n int, err error) {
|
||||||
var isOk = false
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if !isOk {
|
|
||||||
_ = this.discard()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// 直接返回从Header中剩余的
|
|
||||||
if this.bodyBufLen > 0 {
|
|
||||||
var bufLen = len(buf)
|
|
||||||
if bufLen < this.bodyBufLen {
|
|
||||||
this.bodyBufLen -= bufLen
|
|
||||||
copy(buf, this.bodyBuf[:bufLen])
|
|
||||||
this.bodyBuf = this.bodyBuf[bufLen:]
|
|
||||||
|
|
||||||
n = bufLen
|
|
||||||
} else {
|
|
||||||
copy(buf, this.bodyBuf)
|
|
||||||
this.bodyBuf = nil
|
|
||||||
|
|
||||||
if this.bodySize <= int64(this.bodyBufLen) {
|
|
||||||
err = io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
n = this.bodyBufLen
|
|
||||||
this.bodyBufLen = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
isOk = true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err = this.fp.Read(buf)
|
n, err = this.fp.Read(buf)
|
||||||
if err == nil || err == io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
isOk = true
|
_ = this.discard()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
68
internal/compressions/reader_gzip_test.go
Normal file
68
internal/compressions/reader_gzip_test.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||||
|
|
||||||
|
package compressions
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGzipReader(t *testing.T) {
|
||||||
|
fp, err := os.Open("/Users/WorkSpace/EdgeProject/EdgeCache/p43/36/7e/367e02720713fe05b66573a1d69b4f0a.cache")
|
||||||
|
if err != nil {
|
||||||
|
// not fatal
|
||||||
|
t.Log(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = fp.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var buf = make([]byte, 32*1024)
|
||||||
|
cacheReader := caches.NewFileReader(fp)
|
||||||
|
err = cacheReader.Init()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
var headerBuf = []byte{}
|
||||||
|
err = cacheReader.ReadHeader(buf, func(n int) (goNext bool, err error) {
|
||||||
|
headerBuf = append(headerBuf, buf[:n]...)
|
||||||
|
for {
|
||||||
|
nIndex := bytes.Index(headerBuf, []byte{'\n'})
|
||||||
|
if nIndex >= 0 {
|
||||||
|
row := headerBuf[:nIndex]
|
||||||
|
spaceIndex := bytes.Index(row, []byte{':'})
|
||||||
|
if spaceIndex <= 0 {
|
||||||
|
return false, errors.New("invalid header '" + string(row) + "'")
|
||||||
|
}
|
||||||
|
|
||||||
|
headerBuf = headerBuf[nIndex+1:]
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
reader, err := NewGzipReader(cacheReader)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := reader.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Log(string(buf[:n]))
|
||||||
|
_ = n
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -187,7 +187,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !this.canIgnore(err) {
|
if !this.canIgnore(err) {
|
||||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -237,7 +237,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !this.canIgnore(err) {
|
if !this.canIgnore(err) {
|
||||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -367,7 +367,6 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
respHeader := this.writer.Header()
|
|
||||||
if len(rangeSet) == 1 {
|
if len(rangeSet) == 1 {
|
||||||
respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(reader.BodySize(), 10))
|
respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(reader.BodySize(), 10))
|
||||||
respHeader.Set("Content-Length", strconv.FormatInt(rangeSet[0][1]-rangeSet[0][0]+1, 10))
|
respHeader.Set("Content-Length", strconv.FormatInt(rangeSet[0][1]-rangeSet[0][0]+1, 10))
|
||||||
@@ -389,7 +388,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if !this.canIgnore(err) {
|
if !this.canIgnore(err) {
|
||||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -435,7 +434,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !this.canIgnore(err) {
|
if !this.canIgnore(err) {
|
||||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -461,7 +460,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
|
|||||||
this.varMapping["cache.status"] = "MISS"
|
this.varMapping["cache.status"] = "MISS"
|
||||||
|
|
||||||
if !this.canIgnore(err) {
|
if !this.canIgnore(err) {
|
||||||
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
|
remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -368,6 +368,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
this.Header().Del("Content-Encoding")
|
||||||
resp.Body = reader
|
resp.Body = reader
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user