mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	修复从缓存文件中读取压缩内容时可能失败的Bug
This commit is contained in:
		@@ -23,9 +23,6 @@ type FileReader struct {
 | 
			
		||||
	headerSize   int
 | 
			
		||||
	bodySize     int64
 | 
			
		||||
	bodyOffset   int64
 | 
			
		||||
 | 
			
		||||
	bodyBufLen int
 | 
			
		||||
	bodyBuf    []byte
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewFileReader(fp *os.File) *FileReader {
 | 
			
		||||
@@ -181,10 +178,6 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
 | 
			
		||||
				}
 | 
			
		||||
				headerSize -= n
 | 
			
		||||
			} else {
 | 
			
		||||
				if n > headerSize {
 | 
			
		||||
					this.bodyBuf = buf[headerSize:]
 | 
			
		||||
					this.bodyBufLen = n - headerSize
 | 
			
		||||
				}
 | 
			
		||||
				_, e := callback(headerSize)
 | 
			
		||||
				if e != nil {
 | 
			
		||||
					isOk = true
 | 
			
		||||
@@ -203,6 +196,12 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
 | 
			
		||||
 | 
			
		||||
	isOk = true
 | 
			
		||||
 | 
			
		||||
	// 移动到Body位置
 | 
			
		||||
	_, err = this.fp.Seek(this.bodyOffset, io.SeekStart)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -215,27 +214,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	offset := this.bodyOffset
 | 
			
		||||
 | 
			
		||||
	// 直接返回从Header中剩余的
 | 
			
		||||
	if this.bodyBufLen > 0 && len(buf) >= this.bodyBufLen {
 | 
			
		||||
		offset += int64(this.bodyBufLen)
 | 
			
		||||
 | 
			
		||||
		copy(buf, this.bodyBuf)
 | 
			
		||||
		isOk = true
 | 
			
		||||
 | 
			
		||||
		goNext, err := callback(this.bodyBufLen)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		if !goNext {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if this.bodySize <= int64(this.bodyBufLen) {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	var offset = this.bodyOffset
 | 
			
		||||
 | 
			
		||||
	// 开始读Body部分
 | 
			
		||||
	_, err := this.fp.Seek(offset, io.SeekStart)
 | 
			
		||||
@@ -269,42 +248,9 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *FileReader) Read(buf []byte) (n int, err error) {
 | 
			
		||||
	var isOk = false
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if !isOk {
 | 
			
		||||
			_ = this.discard()
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// 直接返回从Header中剩余的
 | 
			
		||||
	if this.bodyBufLen > 0 {
 | 
			
		||||
		var bufLen = len(buf)
 | 
			
		||||
		if bufLen < this.bodyBufLen {
 | 
			
		||||
			this.bodyBufLen -= bufLen
 | 
			
		||||
			copy(buf, this.bodyBuf[:bufLen])
 | 
			
		||||
			this.bodyBuf = this.bodyBuf[bufLen:]
 | 
			
		||||
 | 
			
		||||
			n = bufLen
 | 
			
		||||
		} else {
 | 
			
		||||
			copy(buf, this.bodyBuf)
 | 
			
		||||
			this.bodyBuf = nil
 | 
			
		||||
 | 
			
		||||
			if this.bodySize <= int64(this.bodyBufLen) {
 | 
			
		||||
				err = io.EOF
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			n = this.bodyBufLen
 | 
			
		||||
			this.bodyBufLen = 0
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		isOk = true
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n, err = this.fp.Read(buf)
 | 
			
		||||
	if err == nil || err == io.EOF {
 | 
			
		||||
		isOk = true
 | 
			
		||||
	if err != nil && err != io.EOF {
 | 
			
		||||
		_ = this.discard()
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										68
									
								
								internal/compressions/reader_gzip_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								internal/compressions/reader_gzip_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,68 @@
 | 
			
		||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
			
		||||
 | 
			
		||||
package compressions
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
			
		||||
	"io"
 | 
			
		||||
	"os"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestGzipReader(t *testing.T) {
 | 
			
		||||
	fp, err := os.Open("/Users/WorkSpace/EdgeProject/EdgeCache/p43/36/7e/367e02720713fe05b66573a1d69b4f0a.cache")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		// not fatal
 | 
			
		||||
		t.Log(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		_ = fp.Close()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	var buf = make([]byte, 32*1024)
 | 
			
		||||
	cacheReader := caches.NewFileReader(fp)
 | 
			
		||||
	err = cacheReader.Init()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	var headerBuf = []byte{}
 | 
			
		||||
	err = cacheReader.ReadHeader(buf, func(n int) (goNext bool, err error) {
 | 
			
		||||
		headerBuf = append(headerBuf, buf[:n]...)
 | 
			
		||||
		for {
 | 
			
		||||
			nIndex := bytes.Index(headerBuf, []byte{'\n'})
 | 
			
		||||
			if nIndex >= 0 {
 | 
			
		||||
				row := headerBuf[:nIndex]
 | 
			
		||||
				spaceIndex := bytes.Index(row, []byte{':'})
 | 
			
		||||
				if spaceIndex <= 0 {
 | 
			
		||||
					return false, errors.New("invalid header '" + string(row) + "'")
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				headerBuf = headerBuf[nIndex+1:]
 | 
			
		||||
			} else {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	reader, err := NewGzipReader(cacheReader)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		n, err := reader.Read(buf)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err != io.EOF {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			} else {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		t.Log(string(buf[:n]))
 | 
			
		||||
		_ = n
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -187,7 +187,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if !this.canIgnore(err) {
 | 
			
		||||
				remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
 | 
			
		||||
				remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
 | 
			
		||||
			}
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
@@ -237,7 +237,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if !this.canIgnore(err) {
 | 
			
		||||
			remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
 | 
			
		||||
			remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
@@ -367,7 +367,6 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		respHeader := this.writer.Header()
 | 
			
		||||
		if len(rangeSet) == 1 {
 | 
			
		||||
			respHeader.Set("Content-Range", "bytes "+strconv.FormatInt(rangeSet[0][0], 10)+"-"+strconv.FormatInt(rangeSet[0][1], 10)+"/"+strconv.FormatInt(reader.BodySize(), 10))
 | 
			
		||||
			respHeader.Set("Content-Length", strconv.FormatInt(rangeSet[0][1]-rangeSet[0][0]+1, 10))
 | 
			
		||||
@@ -389,7 +388,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
					return true
 | 
			
		||||
				}
 | 
			
		||||
				if !this.canIgnore(err) {
 | 
			
		||||
					remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
 | 
			
		||||
					remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
 | 
			
		||||
				}
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
@@ -435,7 +434,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
				})
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					if !this.canIgnore(err) {
 | 
			
		||||
						remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
 | 
			
		||||
						remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
 | 
			
		||||
					}
 | 
			
		||||
					return true
 | 
			
		||||
				}
 | 
			
		||||
@@ -461,7 +460,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
				this.varMapping["cache.status"] = "MISS"
 | 
			
		||||
 | 
			
		||||
				if !this.canIgnore(err) {
 | 
			
		||||
					remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
 | 
			
		||||
					remotelogs.Warn("HTTP_REQUEST_CACHE", this.URL()+": read from cache failed: "+err.Error())
 | 
			
		||||
				}
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
@@ -368,6 +368,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		this.Header().Del("Content-Encoding")
 | 
			
		||||
		resp.Body = reader
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user