diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index dc7bae3..771625e 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -14,6 +14,7 @@ import ( "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" "os" + "path/filepath" "runtime" "strings" "time" @@ -108,7 +109,7 @@ func (this *FileListDB) Open(dbPath string) error { this.writeBatch = dbs.NewBatch(writeDB, 4) this.writeBatch.OnFail(func(err error) { - remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()) + remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")") }) goman.New(func() { diff --git a/internal/caches/partial_ranges.go b/internal/caches/partial_ranges.go index 86b321c..5fa9a21 100644 --- a/internal/caches/partial_ranges.go +++ b/internal/caches/partial_ranges.go @@ -7,19 +7,21 @@ import ( "encoding/json" "github.com/iwind/TeaGo/types" "os" + "strconv" ) // PartialRanges 内容分区范围定义 type PartialRanges struct { - ExpiresAt int64 `json:"expiresAt"` // 过期时间 - Ranges [][2]int64 `json:"ranges"` + Version int `json:"version"` // 版本号 + Ranges [][2]int64 `json:"ranges"` // 范围 + BodySize int64 `json:"bodySize"` // 总长度 } // NewPartialRanges 获取新对象 func NewPartialRanges(expiresAt int64) *PartialRanges { return &PartialRanges{ - Ranges: [][2]int64{}, - ExpiresAt: expiresAt, + Ranges: [][2]int64{}, + Version: 1, } } @@ -35,9 +37,11 @@ func NewPartialRangesFromData(data []byte) (*PartialRanges, error) { var colonIndex = bytes.IndexRune(line, ':') if colonIndex > 0 { switch string(line[:colonIndex]) { - case "e": - rs.ExpiresAt = types.Int64(line[colonIndex+1:]) - case "r": + case "v": // 版本号 + rs.Version = types.Int(line[colonIndex+1:]) + case "b": // 总长度 + rs.BodySize = types.Int64(line[colonIndex+1:]) + case "r": // 范围信息 var commaIndex = bytes.IndexRune(line, ',') if commaIndex > 0 { rs.Ranges = append(rs.Ranges, [2]int64{types.Int64(line[colonIndex+1 : commaIndex]), types.Int64(line[commaIndex+1:])}) @@ -53,16 +57,18 @@ func NewPartialRangesFromData(data []byte) (*PartialRanges, error) { } // NewPartialRangesFromJSON 从JSON中解析范围 -func newPartialRangesFromJSON(data []byte) (*PartialRanges, error) { +func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) { var rs = NewPartialRanges(0) err := json.Unmarshal(data, &rs) if err != nil { return nil, err } + rs.Version = 0 return rs, nil } +// NewPartialRangesFromFile 从文件中加载范围信息 func NewPartialRangesFromFile(path string) (*PartialRanges, error) { data, err := os.ReadFile(path) if err != nil { @@ -74,7 +80,7 @@ func NewPartialRangesFromFile(path string) (*PartialRanges, error) { // 兼容老的JSON格式 if data[0] == '{' { - return newPartialRangesFromJSON(data) + return NewPartialRangesFromJSON(data) } // 新的格式 @@ -151,13 +157,15 @@ func (this *PartialRanges) Nearest(begin int64, end int64) (r [2]int64, ok bool) // 转换为字符串 func (this *PartialRanges) String() string { - var s = "e:" + types.String(this.ExpiresAt) + "\n" + var s = "v:" + strconv.Itoa(this.Version) + "\n" + // version + "b:" + this.formatInt64(this.BodySize) + "\n" // bodySize for _, r := range this.Ranges { - s += "r:" + types.String(r[0]) + "," + types.String(r[1]) + "\n" + s += "r:" + this.formatInt64(r[0]) + "," + this.formatInt64(r[1]) + "\n" // range } return s } +// Bytes 将内容转换为字节 func (this *PartialRanges) Bytes() []byte { return []byte(this.String()) } @@ -167,6 +175,7 @@ func (this *PartialRanges) WriteToFile(path string) error { return os.WriteFile(path, this.Bytes(), 0666) } +// Max 获取最大位置 func (this *PartialRanges) Max() int64 { if len(this.Ranges) > 0 { return this.Ranges[len(this.Ranges)-1][1] @@ -174,6 +183,7 @@ func (this *PartialRanges) Max() int64 { return 0 } +// Reset 重置范围信息 func (this *PartialRanges) Reset() { this.Ranges = [][2]int64{} } @@ -230,3 +240,7 @@ func (this *PartialRanges) max(n1 int64, n2 int64) int64 { } return n2 } + +func (this *PartialRanges) formatInt64(i int64) string { + return strconv.FormatInt(i, 10) +} diff --git a/internal/caches/partial_ranges_test.go b/internal/caches/partial_ranges_test.go index 891888d..703e1de 100644 --- a/internal/caches/partial_ranges_test.go +++ b/internal/caches/partial_ranges_test.go @@ -149,7 +149,6 @@ func TestNewPartialRanges_Large_Range(t *testing.T) { func TestPartialRanges_Encode_JSON(t *testing.T) { var r = caches.NewPartialRanges(0) - r.ExpiresAt = time.Now().Unix() for i := 0; i < 10; i++ { r.Ranges = append(r.Ranges, [2]int64{int64(i * 100), int64(i*100 + 100)}) } @@ -164,7 +163,7 @@ func TestPartialRanges_Encode_JSON(t *testing.T) { func TestPartialRanges_Encode_String(t *testing.T) { var r = caches.NewPartialRanges(0) - r.ExpiresAt = time.Now().Unix() + r.BodySize = 1024 for i := 0; i < 10; i++ { r.Ranges = append(r.Ranges, [2]int64{int64(i * 100), int64(i*100 + 100)}) } @@ -180,6 +179,36 @@ func TestPartialRanges_Encode_String(t *testing.T) { logs.PrintAsJSON(r2, t) } +func TestPartialRanges_Version(t *testing.T) { + { + ranges, err := caches.NewPartialRangesFromData([]byte(`e:1668928495 +r:0,1048576 +r:1140260864,1140295164`)) + if err != nil { + t.Fatal(err) + } + t.Log("version:", ranges.Version) + } + { + ranges, err := caches.NewPartialRangesFromData([]byte(`e:1668928495 +r:0,1048576 +r:1140260864,1140295164 +v:0 +`)) + if err != nil { + t.Fatal(err) + } + t.Log("version:", ranges.Version) + } + { + ranges, err := caches.NewPartialRangesFromJSON([]byte(`{}`)) + if err != nil { + t.Fatal(err) + } + t.Log("version:", ranges.Version) + } +} + func BenchmarkNewPartialRanges(b *testing.B) { for i := 0; i < b.N; i++ { var r = caches.NewPartialRanges(0) @@ -188,3 +217,16 @@ func BenchmarkNewPartialRanges(b *testing.B) { } } } + +func BenchmarkPartialRanges_String(b *testing.B) { + var r = caches.NewPartialRanges(0) + r.BodySize = 1024 + for i := 0; i < 10; i++ { + r.Ranges = append(r.Ranges, [2]int64{int64(i * 100), int64(i*100 + 100)}) + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _ = r.String() + } +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 1ad9de9..c030c78 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -556,13 +556,23 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea _ = partialReader.Close() if err == nil && partialReader.bodyOffset > 0 { partialRanges = partialReader.Ranges() - isNewCreated = false - partialBodyOffset = partialReader.bodyOffset + if bodySize > 0 && partialRanges != nil && partialRanges.BodySize > 0 && bodySize != partialRanges.BodySize { + _ = this.removeCacheFile(tmpPath) + } else { + isNewCreated = false + partialBodyOffset = partialReader.bodyOffset + } } else { _ = this.removeCacheFile(tmpPath) } } } + if isNewCreated { + err = this.list.Remove(hash) + if err != nil { + return nil, err + } + } if partialRanges == nil { partialRanges = NewPartialRanges(expiredAt) } @@ -617,7 +627,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea } var metaBodySize int64 = -1 - var metaHeaderSize int = -1 + var metaHeaderSize = -1 if isNewCreated { // 写入meta // 从v0.5.8开始不再在meta中写入Key @@ -650,7 +660,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea isOk = true if isPartial { - return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, partialRanges, func() { + return NewPartialFileWriter(writer, key, expiredAt, metaHeaderSize, metaBodySize, isNewCreated, isPartial, partialBodyOffset, partialRanges, func() { sharedWritingFileKeyLocker.Lock() delete(sharedWritingFileKeyMap, key) if len(sharedWritingFileKeyMap) == 0 { diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index 1b48c2d..c718a0f 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -11,13 +11,18 @@ import ( ) type PartialFileWriter struct { - rawWriter *os.File - key string - headerSize int64 - bodySize int64 - expiredAt int64 - endFunc func() - once sync.Once + rawWriter *os.File + key string + + metaHeaderSize int + headerSize int64 + + metaBodySize int64 + bodySize int64 + + expiredAt int64 + endFunc func() + once sync.Once isNew bool isPartial bool @@ -27,17 +32,19 @@ type PartialFileWriter struct { rangePath string } -func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter { +func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter { return &PartialFileWriter{ - key: key, - rawWriter: rawWriter, - expiredAt: expiredAt, - endFunc: endFunc, - isNew: isNew, - isPartial: isPartial, - bodyOffset: bodyOffset, - ranges: ranges, - rangePath: partialRangesFilePath(rawWriter.Name()), + key: key, + rawWriter: rawWriter, + expiredAt: expiredAt, + endFunc: endFunc, + isNew: isNew, + isPartial: isPartial, + bodyOffset: bodyOffset, + ranges: ranges, + rangePath: partialRangesFilePath(rawWriter.Name()), + metaHeaderSize: metaHeaderSize, + metaBodySize: metaBodySize, } } @@ -71,7 +78,11 @@ func (this *PartialFileWriter) AppendHeader(data []byte) error { // WriteHeaderLength 写入Header长度数据 func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error { - bytes4 := make([]byte, 4) + if this.metaHeaderSize > 0 && this.metaHeaderSize == headerLength { + return nil + } + + var bytes4 = make([]byte, 4) binary.BigEndian.PutUint32(bytes4, uint32(headerLength)) _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength, io.SeekStart) if err != nil { @@ -110,8 +121,13 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { } if this.bodyOffset == 0 { - this.bodyOffset = SizeMeta + int64(len(this.key)) + this.headerSize + var keyLength = 0 + if this.ranges.Version == 0 { // 以往的版本包含有Key + keyLength = len(this.key) + } + this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize } + _, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) if err != nil { return err @@ -129,7 +145,10 @@ func (this *PartialFileWriter) SetBodyLength(bodyLength int64) { // WriteBodyLength 写入Body长度数据 func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error { - bytes8 := make([]byte, 8) + if this.metaBodySize > 0 && this.metaBodySize == bodyLength { + return nil + } + var bytes8 = make([]byte, 8) binary.BigEndian.PutUint64(bytes8, uint64(bodyLength)) _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength, io.SeekStart) if err != nil { @@ -150,6 +169,7 @@ func (this *PartialFileWriter) Close() error { this.endFunc() }) + this.ranges.BodySize = this.bodySize err := this.ranges.WriteToFile(this.rangePath) if err != nil { _ = this.rawWriter.Close() diff --git a/internal/caches/writer_partial_file_test.go b/internal/caches/writer_partial_file_test.go index 91eba70..8352678 100644 --- a/internal/caches/writer_partial_file_test.go +++ b/internal/caches/writer_partial_file_test.go @@ -27,7 +27,7 @@ func TestPartialFileWriter_Write(t *testing.T) { t.Fatal(err) } var ranges = caches.NewPartialRanges(0) - var writer = caches.NewPartialFileWriter(fp, "test", time.Now().Unix()+86500, true, true, 0, ranges, func() { + var writer = caches.NewPartialFileWriter(fp, "test", time.Now().Unix()+86500, -1, -1, true, true, 0, ranges, func() { t.Log("end") }) _, err = writer.WriteHeader([]byte("header")) diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index a416733..580261b 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -628,14 +628,14 @@ func (this *HTTPRequest) tryPartialReader(storage caches.StorageInterface, key s }() // 检查范围 - const maxFirstSpan = 16 << 20 // TODO 可以在缓存策略中设置此值 + //const maxFirstSpan = 16 << 20 // TODO 可以在缓存策略中设置此值 for index, r := range ranges { - // 没有指定结束字节时,自动指定一个 - if r.Start() >= 0 && r.End() == -1 { + // 没有指定结束位置时,自动指定一个 + /**if r.Start() >= 0 && r.End() == -1 { if partialReader.MaxLength() > r.Start()+maxFirstSpan { r[1] = r.Start() + maxFirstSpan } - } + }**/ r1, ok := r.Convert(partialReader.MaxLength()) if !ok { return nil, nil diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 487ec3e..be2bc79 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -303,7 +303,19 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { if this.isPartial { cacheKey += caches.SuffixPartial } - cacheWriter, err := storage.OpenWriter(cacheKey, expiresAt, this.StatusCode(), this.calculateHeaderLength(), size, cacheRef.MaxSizeBytes(), this.isPartial) + + // 待写入尺寸 + var totalSize = size + if totalSize < 0 && this.isPartial { + var contentRange = resp.Header.Get("Content-Range") + if len(contentRange) > 0 { + _, partialTotalSize := httpRequestParseContentRangeHeader(contentRange) + if partialTotalSize > 0 { + totalSize = partialTotalSize + } + } + } + cacheWriter, err := storage.OpenWriter(cacheKey, expiresAt, this.StatusCode(), this.calculateHeaderLength(), totalSize, cacheRef.MaxSizeBytes(), this.isPartial) if err != nil { if err == caches.ErrEntityTooLarge && addStatusHeader { this.Header().Set("X-Cache", "BYPASS, entity too large")