diff --git a/internal/caches/partial_ranges.go b/internal/caches/partial_ranges.go index 52b5a1c..5333a7a 100644 --- a/internal/caches/partial_ranges.go +++ b/internal/caches/partial_ranges.go @@ -10,17 +10,21 @@ import ( // PartialRanges 内容分区范围定义 type PartialRanges struct { - Ranges [][2]int64 `json:"ranges"` + ExpiresAt int64 `json:"expiresAt"` // 过期时间 + Ranges [][2]int64 `json:"ranges"` } // NewPartialRanges 获取新对象 -func NewPartialRanges() *PartialRanges { - return &PartialRanges{Ranges: [][2]int64{}} +func NewPartialRanges(expiresAt int64) *PartialRanges { + return &PartialRanges{ + Ranges: [][2]int64{}, + ExpiresAt: expiresAt, + } } // NewPartialRangesFromJSON 从JSON中解析范围 func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) { - var rs = NewPartialRanges() + var rs = NewPartialRanges(0) err := json.Unmarshal(data, &rs) if err != nil { return nil, err @@ -135,6 +139,10 @@ func (this *PartialRanges) Max() int64 { return 0 } +func (this *PartialRanges) Reset() { + this.Ranges = [][2]int64{} +} + func (this *PartialRanges) merge(index int) { // forward var lastIndex = index diff --git a/internal/caches/partial_ranges_test.go b/internal/caches/partial_ranges_test.go index 956ea98..0cf5a38 100644 --- a/internal/caches/partial_ranges_test.go +++ b/internal/caches/partial_ranges_test.go @@ -10,7 +10,7 @@ import ( ) func TestNewPartialRanges(t *testing.T) { - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(1, 100) r.Add(50, 300) @@ -28,7 +28,7 @@ func TestNewPartialRanges(t *testing.T) { func TestNewPartialRanges1(t *testing.T) { var a = assert.NewAssertion(t) - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(1, 100) r.Add(1, 101) r.Add(1, 102) @@ -47,7 +47,7 @@ func TestNewPartialRanges1(t *testing.T) { func TestNewPartialRanges2(t *testing.T) { // low -> high - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(1, 100) r.Add(1, 101) r.Add(1, 102) @@ -63,7 +63,7 @@ func TestNewPartialRanges2(t *testing.T) { func TestNewPartialRanges3(t *testing.T) { // high -> low - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(301, 302) r.Add(303, 304) r.Add(200, 300) @@ -75,7 +75,7 @@ func TestNewPartialRanges3(t *testing.T) { func TestNewPartialRanges4(t *testing.T) { // nearby - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(301, 302) r.Add(303, 304) r.Add(305, 306) @@ -90,7 +90,7 @@ func TestNewPartialRanges4(t *testing.T) { } func TestNewPartialRanges5(t *testing.T) { - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) for j := 0; j < 1000; j++ { r.Add(int64(j), int64(j+100)) } @@ -100,7 +100,7 @@ func TestNewPartialRanges5(t *testing.T) { func TestNewPartialRanges_Nearest(t *testing.T) { { // nearby - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(301, 400) r.Add(401, 500) r.Add(501, 600) @@ -112,7 +112,7 @@ func TestNewPartialRanges_Nearest(t *testing.T) { { // nearby - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(301, 400) r.Add(450, 500) r.Add(550, 600) @@ -131,7 +131,7 @@ func TestNewPartialRanges_Large_Range(t *testing.T) { var largeSize int64 = 10000000000000 t.Log(largeSize/1024/1024/1024, "G") - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) r.Add(1, largeSize) jsonData, err := r.AsJSON() if err != nil { @@ -148,7 +148,7 @@ func TestNewPartialRanges_Large_Range(t *testing.T) { } func TestNewPartialRanges_AsJSON(t *testing.T) { - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) for j := 0; j < 1000; j++ { r.Add(int64(j), int64(j+100)) } @@ -167,7 +167,7 @@ func TestNewPartialRanges_AsJSON(t *testing.T) { func BenchmarkNewPartialRanges(b *testing.B) { for i := 0; i < b.N; i++ { - var r = caches.NewPartialRanges() + var r = caches.NewPartialRanges(0) for j := 0; j < 1000; j++ { r.Add(int64(j), int64(j+100)) } diff --git a/internal/caches/reader_partial_file.go b/internal/caches/reader_partial_file.go index 8ed81d0..96dcc81 100644 --- a/internal/caches/reader_partial_file.go +++ b/internal/caches/reader_partial_file.go @@ -117,13 +117,10 @@ func (this *PartialFileReader) ContainsRange(r rangeutils.Range) (r2 rangeutils. r2, ok = this.ranges.Nearest(r.Start(), r.End()) if ok && this.bodySize > 0 { // 考虑可配置 - var span int64 = 512 * 1024 - if this.bodySize > 1<<30 { - span = 1 << 20 - } + const minSpan = 128 << 10 // 这里限制返回的最小缓存,防止因为返回的内容过小而导致请求过多 - if r2.Length() < r.Length() && r2.Length() < span { + if r2.Length() < r.Length() && r2.Length() < minSpan { ok = false } } @@ -138,6 +135,10 @@ func (this *PartialFileReader) MaxLength() int64 { return this.ranges.Max() + 1 } +func (this *PartialFileReader) Ranges() *PartialRanges { + return this.ranges +} + func (this *PartialFileReader) discard() error { _ = os.Remove(this.rangePath) return this.FileReader.discard() diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 7dccc00..1ad9de9 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -544,19 +544,28 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea // 从已经存储的内容中读取信息 var isNewCreated = true var partialBodyOffset int64 + var partialRanges *PartialRanges if isPartial { - readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444) - if err == nil { - var partialReader = NewPartialFileReader(readerFp) - err = partialReader.Init() - _ = partialReader.Close() - if err == nil && partialReader.bodyOffset > 0 { - isNewCreated = false - partialBodyOffset = partialReader.bodyOffset - } else { - _ = this.removeCacheFile(tmpPath) + // 数据库中是否存在 + existsCacheItem, _ := this.list.Exist(hash) + if existsCacheItem { + readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444) + if err == nil { + var partialReader = NewPartialFileReader(readerFp) + err = partialReader.Init() + _ = partialReader.Close() + if err == nil && partialReader.bodyOffset > 0 { + partialRanges = partialReader.Ranges() + isNewCreated = false + partialBodyOffset = partialReader.bodyOffset + } else { + _ = this.removeCacheFile(tmpPath) + } } } + if partialRanges == nil { + partialRanges = NewPartialRanges(expiredAt) + } } var flags = os.O_CREATE | os.O_WRONLY @@ -641,12 +650,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea isOk = true if isPartial { - ranges, err := NewPartialRangesFromFile(cachePathName + "@ranges.cache") - if err != nil { - ranges = NewPartialRanges() - } - - return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, ranges, func() { + return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, partialRanges, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) if len(sharedWritingFileKeyMap) == 0 { diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index 0429212..1b48c2d 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -152,6 +152,8 @@ func (this *PartialFileWriter) Close() error { err := this.ranges.WriteToFile(this.rangePath) if err != nil { + _ = this.rawWriter.Close() + this.remove() return err } diff --git a/internal/caches/writer_partial_file_test.go b/internal/caches/writer_partial_file_test.go index e1cfb48..91eba70 100644 --- a/internal/caches/writer_partial_file_test.go +++ b/internal/caches/writer_partial_file_test.go @@ -26,7 +26,7 @@ func TestPartialFileWriter_Write(t *testing.T) { if err != nil { t.Fatal(err) } - var ranges = caches.NewPartialRanges() + var ranges = caches.NewPartialRanges(0) var writer = caches.NewPartialFileWriter(fp, "test", time.Now().Unix()+86500, true, true, 0, ranges, func() { t.Log("end") }) diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 3afc627..a416733 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -628,7 +628,14 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s }() // 检查范围 + const maxFirstSpan = 16 << 20 // TODO 可以在缓存策略中设置此值 for index, r := range ranges { + // 没有指定结束字节时,自动指定一个 + if r.Start() >= 0 && r.End() == -1 { + if partialReader.MaxLength() > r.Start()+maxFirstSpan { + r[1] = r.Start() + maxFirstSpan + } + } r1, ok := r.Convert(partialReader.MaxLength()) if !ok { return nil, nil diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index a86e1f9..487ec3e 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -326,7 +326,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { // 写入Header var headerBuf = utils.SharedBufferPool.Get() for k, v := range this.Header() { - if k == "Set-Cookie" { + if k == "Set-Cookie" || (this.isPartial && k == "Content-Range") { continue } for _, v1 := range v { @@ -649,7 +649,7 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) { // 写入Header var headerBuffer = utils.SharedBufferPool.Get() for k, v := range this.Header() { - if k == "Set-Cookie" { + if k == "Set-Cookie" || (this.isPartial && k == "Content-Range") { continue } for _, v1 := range v { @@ -1193,7 +1193,7 @@ func (this *HTTPWriter) finishRequest() { // 计算Header长度 func (this *HTTPWriter) calculateHeaderLength() (result int) { for k, v := range this.Header() { - if k == "Set-Cookie" { + if k == "Set-Cookie" || (this.isPartial && k == "Content-Range") { continue } for _, v1 := range v { diff --git a/internal/utils/ranges/range_test.go b/internal/utils/ranges/range_test.go index 29df200..73ec1b3 100644 --- a/internal/utils/ranges/range_test.go +++ b/internal/utils/ranges/range_test.go @@ -67,3 +67,10 @@ func TestRange_ComposeContentRangeHeader(t *testing.T) { var r = rangeutils.NewRange(1, 100) t.Log(r.ComposeContentRangeHeader("1000")) } + +func TestRange_SetLength(t *testing.T) { + var r = rangeutils.NewRange(1, 100) + t.Log(r) + + t.Log(r.SetLength(1024)) +}