From 500c1bc8c28bca9f8f87901dd4ba04af7cba4022 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Thu, 17 Feb 2022 16:56:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=8E=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=AD=E8=AF=BB=E5=8F=96=E5=8E=8B=E7=BC=A9?= =?UTF-8?q?=E5=86=85=E5=AE=B9=E6=97=B6=E5=8F=AF=E8=83=BD=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E7=9A=84Bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/reader_file.go | 72 +++-------------------- internal/compressions/reader_gzip_test.go | 68 +++++++++++++++++++++ internal/nodes/http_request_cache.go | 11 ++-- internal/nodes/http_writer.go | 1 + 4 files changed, 83 insertions(+), 69 deletions(-) create mode 100644 internal/compressions/reader_gzip_test.go diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index 5317ff8..aabc100 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -23,9 +23,6 @@ type FileReader struct { headerSize int bodySize int64 bodyOffset int64 - - bodyBufLen int - bodyBuf []byte } func NewFileReader(fp *os.File) *FileReader { @@ -181,10 +178,6 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error { } headerSize -= n } else { - if n > headerSize { - this.bodyBuf = buf[headerSize:] - this.bodyBufLen = n - headerSize - } _, e := callback(headerSize) if e != nil { isOk = true @@ -203,6 +196,12 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error { isOk = true + // 移动到Body位置 + _, err = this.fp.Seek(this.bodyOffset, io.SeekStart) + if err != nil { + return err + } + return nil } @@ -215,27 +214,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error { } }() - offset := this.bodyOffset - - // 直接返回从Header中剩余的 - if this.bodyBufLen > 0 && len(buf) >= this.bodyBufLen { - offset += int64(this.bodyBufLen) - - copy(buf, this.bodyBuf) - isOk = true - - goNext, err := callback(this.bodyBufLen) - if err != nil { - return err - } - if !goNext { - return nil - } - - if this.bodySize <= int64(this.bodyBufLen) { - return nil - } - } + var offset = this.bodyOffset // 开始读Body部分 _, err := this.fp.Seek(offset, io.SeekStart) @@ -269,42 +248,9 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error { } func (this *FileReader) Read(buf []byte) (n int, err error) { - var isOk = false - - defer func() { - if !isOk { - _ = this.discard() - } - }() - - // 直接返回从Header中剩余的 - if this.bodyBufLen > 0 { - var bufLen = len(buf) - if bufLen < this.bodyBufLen { - this.bodyBufLen -= bufLen - copy(buf, this.bodyBuf[:bufLen]) - this.bodyBuf = this.bodyBuf[bufLen:] - - n = bufLen - } else { - copy(buf, this.bodyBuf) - this.bodyBuf = nil - - if this.bodySize <= int64(this.bodyBufLen) { - err = io.EOF - } - - n = this.bodyBufLen - this.bodyBufLen = 0 - } - - isOk = true - return - } - n, err = this.fp.Read(buf) - if err == nil || err == io.EOF { - isOk = true + if err != nil && err != io.EOF { + _ = this.discard() } return } diff --git a/internal/compressions/reader_gzip_test.go b/internal/compressions/reader_gzip_test.go new file mode 100644 index 0000000..0d364f4 --- /dev/null +++ b/internal/compressions/reader_gzip_test.go @@ -0,0 +1,68 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package compressions + +import ( + "bytes" + "errors" + "github.com/TeaOSLab/EdgeNode/internal/caches" + "io" + "os" + "testing" +) + +func TestGzipReader(t *testing.T) { + fp, err := os.Open("/Users/WorkSpace/EdgeProject/EdgeCache/p43/36/7e/367e02720713fe05b66573a1d69b4f0a.cache") + if err != nil { + // not fatal + t.Log(err) + return + } + defer func() { + _ = fp.Close() + }() + + var buf = make([]byte, 32*1024) + cacheReader := caches.NewFileReader(fp) + err = cacheReader.Init() + if err != nil { + t.Fatal(err) + } + var headerBuf = []byte{} + err = cacheReader.ReadHeader(buf, func(n int) (goNext bool, err error) { + headerBuf = append(headerBuf, buf[:n]...) + for { + nIndex := bytes.Index(headerBuf, []byte{'\n'}) + if nIndex >= 0 { + row := headerBuf[:nIndex] + spaceIndex := bytes.Index(row, []byte{':'}) + if spaceIndex <= 0 { + return false, errors.New("invalid header '" + string(row) + "'") + } + + headerBuf = headerBuf[nIndex+1:] + } else { + break + } + } + return true, nil + }) + + reader, err := NewGzipReader(cacheReader) + if err != nil { + t.Fatal(err) + } + + for { + n, err := reader.Read(buf) + if err != nil { + if err != io.EOF { + t.Fatal(err) + } else { + break + } + } + t.Log(string(buf[:n])) + _ = n + } +} diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index ff90132..e953655 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -187,7 +187,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } if !this.canIgnore(err) { - remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error()) } return } @@ -237,7 +237,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { }) if err != nil { if !this.canIgnore(err) { - remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error()) } return } @@ -367,7 +367,6 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } } - respHeader := this.writer.Header() if len(rangeSet) == 1 { respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(reader.BodySize(), 10)) respHeader.Set("Content-Length", strconv.FormatInt(rangeSet[0][1]-rangeSet[0][0]+1, 10)) @@ -389,7 +388,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { return true } if !this.canIgnore(err) { - remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error()) } return } @@ -435,7 +434,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { }) if err != nil { if !this.canIgnore(err) { - remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error()) } return true } @@ -461,7 +460,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { this.varMapping["cache.status"] = "MISS" if !this.canIgnore(err) { - remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error()) } return } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 3c31bda..eb9f036 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -368,6 +368,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { if err != nil { return } + this.Header().Del("Content-Encoding") resp.Body = reader }