实现基础的206 partial content缓存

This commit is contained in:
刘祥超
2022-03-03 19:36:28 +08:00
parent 8aeb5eb1b6
commit 581a3d49fc
39 changed files with 1139 additions and 271 deletions

View File

@@ -4,28 +4,37 @@ package caches
import (
"encoding/json"
"errors"
"io/ioutil"
)
// PartialRanges 内容分区范围定义
type PartialRanges struct {
ranges [][2]int64
Ranges [][2]int64 `json:"ranges"`
}
// NewPartialRanges 获取新对象
func NewPartialRanges() *PartialRanges {
return &PartialRanges{ranges: [][2]int64{}}
return &PartialRanges{Ranges: [][2]int64{}}
}
// NewPartialRangesFromJSON 从JSON中解析范围
func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) {
var rs = [][2]int64{}
var rs = NewPartialRanges()
err := json.Unmarshal(data, &rs)
if err != nil {
return nil, err
}
var r = NewPartialRanges()
r.ranges = rs
return r, nil
return rs, nil
}
func NewPartialRangesFromFile(path string) (*PartialRanges, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
return NewPartialRangesFromJSON(data)
}
// Add 添加新范围
@@ -36,46 +45,41 @@ func (this *PartialRanges) Add(begin int64, end int64) {
var nr = [2]int64{begin, end}
var count = len(this.ranges)
var count = len(this.Ranges)
if count == 0 {
this.ranges = [][2]int64{nr}
this.Ranges = [][2]int64{nr}
return
}
// insert
// TODO 将来使用二分法改进
var index = -1
for i, r := range this.ranges {
for i, r := range this.Ranges {
if r[0] > begin || (r[0] == begin && r[1] >= end) {
index = i
this.ranges = append(this.ranges, [2]int64{})
copy(this.ranges[index+1:], this.ranges[index:])
this.ranges[index] = nr
this.Ranges = append(this.Ranges, [2]int64{})
copy(this.Ranges[index+1:], this.Ranges[index:])
this.Ranges[index] = nr
break
}
}
if index == -1 {
index = count
this.ranges = append(this.ranges, nr)
this.Ranges = append(this.Ranges, nr)
}
this.merge(index)
}
// Ranges 获取所有范围
func (this *PartialRanges) Ranges() [][2]int64 {
return this.ranges
}
// Contains 检查是否包含某个范围
func (this *PartialRanges) Contains(begin int64, end int64) bool {
if len(this.ranges) == 0 {
return true
if len(this.Ranges) == 0 {
return false
}
// TODO 使用二分法查找改进性能
for _, r2 := range this.ranges {
for _, r2 := range this.Ranges {
if r2[0] <= begin && r2[1] >= end {
return true
}
@@ -86,21 +90,46 @@ func (this *PartialRanges) Contains(begin int64, end int64) bool {
// AsJSON 转换为JSON
func (this *PartialRanges) AsJSON() ([]byte, error) {
return json.Marshal(this.ranges)
return json.Marshal(this)
}
// WriteToFile 写入到文件中
func (this *PartialRanges) WriteToFile(path string) error {
data, err := this.AsJSON()
if err != nil {
return errors.New("convert to json failed: " + err.Error())
}
return ioutil.WriteFile(path, data, 0666)
}
// ReadFromFile 从文件中读取
func (this *PartialRanges) ReadFromFile(path string) (*PartialRanges, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
return NewPartialRangesFromJSON(data)
}
func (this *PartialRanges) Max() int64 {
if len(this.Ranges) > 0 {
return this.Ranges[len(this.Ranges)-1][1]
}
return 0
}
func (this *PartialRanges) merge(index int) {
// forward
var lastIndex = index
for i := index; i >= 1; i-- {
var curr = this.ranges[i]
var prev = this.ranges[i-1]
var curr = this.Ranges[i]
var prev = this.Ranges[i-1]
var w1 = this.w(curr)
var w2 = this.w(prev)
if w1+w2 >= this.max(curr[1], prev[1])-this.min(curr[0], prev[0])-1 {
prev = [2]int64{this.min(curr[0], prev[0]), this.max(curr[1], prev[1])}
this.ranges[i-1] = prev
this.ranges = append(this.ranges[:i], this.ranges[i+1:]...)
this.Ranges[i-1] = prev
this.Ranges = append(this.Ranges[:i], this.Ranges[i+1:]...)
lastIndex = i - 1
} else {
break
@@ -109,15 +138,15 @@ func (this *PartialRanges) merge(index int) {
// backward
index = lastIndex
for index < len(this.ranges)-1 {
var curr = this.ranges[index]
var next = this.ranges[index+1]
for index < len(this.Ranges)-1 {
var curr = this.Ranges[index]
var next = this.Ranges[index+1]
var w1 = this.w(curr)
var w2 = this.w(next)
if w1+w2 >= this.max(curr[1], next[1])-this.min(curr[0], next[0])-1 {
curr = [2]int64{this.min(curr[0], next[0]), this.max(curr[1], next[1])}
this.ranges = append(this.ranges[:index], this.ranges[index+1:]...)
this.ranges[index] = curr
this.Ranges = append(this.Ranges[:index], this.Ranges[index+1:]...)
this.Ranges[index] = curr
} else {
break
}

View File

@@ -21,7 +21,8 @@ func TestNewPartialRanges(t *testing.T) {
r.Add(200, 1000)
r.Add(200, 10040)
logs.PrintAsJSON(r.Ranges())
logs.PrintAsJSON(r.Ranges, t)
t.Log("max:", r.Max())
}
func TestNewPartialRanges1(t *testing.T) {
@@ -35,7 +36,7 @@ func TestNewPartialRanges1(t *testing.T) {
r.Add(200, 300)
r.Add(1, 1000)
var rs = r.Ranges()
var rs = r.Ranges
logs.PrintAsJSON(rs, t)
a.IsTrue(len(rs) == 1)
if len(rs) == 1 {
@@ -56,7 +57,7 @@ func TestNewPartialRanges2(t *testing.T) {
r.Add(303, 304)
r.Add(250, 400)
var rs = r.Ranges()
var rs = r.Ranges
logs.PrintAsJSON(rs, t)
}
@@ -68,7 +69,7 @@ func TestNewPartialRanges3(t *testing.T) {
r.Add(200, 300)
r.Add(250, 400)
var rs = r.Ranges()
var rs = r.Ranges
logs.PrintAsJSON(rs, t)
}
@@ -83,7 +84,7 @@ func TestNewPartialRanges4(t *testing.T) {
r.Add(410, 415)
r.Add(400, 409)
var rs = r.Ranges()
var rs = r.Ranges
logs.PrintAsJSON(rs, t)
t.Log(r.Contains(400, 416))
}
@@ -93,7 +94,7 @@ func TestNewPartialRanges5(t *testing.T) {
for j := 0; j < 1000; j++ {
r.Add(int64(j), int64(j+100))
}
logs.PrintAsJSON(r.Ranges(), t)
logs.PrintAsJSON(r.Ranges, t)
}
func TestNewPartialRanges_AsJSON(t *testing.T) {
@@ -111,7 +112,7 @@ func TestNewPartialRanges_AsJSON(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log(r2.Ranges())
t.Log(r2.Ranges)
}
func BenchmarkNewPartialRanges(b *testing.B) {

View File

@@ -1,5 +1,7 @@
package caches
import "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
type ReaderFunc func(n int) (goNext bool, err error)
type Reader interface {
@@ -36,6 +38,9 @@ type Reader interface {
// BodySize Body Size
BodySize() int64
// ContainsRange 是否包含某个区间内容
ContainsRange(r rangeutils.Range) bool
// Close 关闭
Close() error
}

View File

@@ -3,6 +3,7 @@ package caches
import (
"encoding/binary"
"errors"
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"github.com/iwind/TeaGo/types"
"io"
"os"
@@ -332,6 +333,11 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
return nil
}
// ContainsRange 是否包含某些区间内容
func (this *FileReader) ContainsRange(r rangeutils.Range) bool {
return true
}
func (this *FileReader) Close() error {
if this.openFileCache != nil {
if this.isClosed {

View File

@@ -2,6 +2,7 @@ package caches
import (
"errors"
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"io"
)
@@ -197,6 +198,11 @@ func (this *MemoryReader) ReadBodyRange(buf []byte, start int64, end int64, call
return nil
}
// ContainsRange 是否包含某些区间内容
func (this *MemoryReader) ContainsRange(r rangeutils.Range) bool {
return true
}
func (this *MemoryReader) Close() error {
return nil
}

View File

@@ -0,0 +1,142 @@
package caches
import (
"encoding/binary"
"errors"
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
"github.com/iwind/TeaGo/types"
"io"
"os"
"strings"
)
type PartialFileReader struct {
*FileReader
ranges *PartialRanges
rangePath string
}
func NewPartialFileReader(fp *os.File) *PartialFileReader {
// range path
var path = fp.Name()
var dotIndex = strings.LastIndex(path, ".")
var rangePath = ""
if dotIndex < 0 {
rangePath = path + "@ranges.cache"
} else {
rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:]
}
return &PartialFileReader{
FileReader: NewFileReader(fp),
rangePath: rangePath,
}
}
func (this *PartialFileReader) Init() error {
return this.InitAutoDiscard(true)
}
func (this *PartialFileReader) InitAutoDiscard(autoDiscard bool) error {
if this.openFile != nil {
this.meta = this.openFile.meta
this.header = this.openFile.header
}
isOk := false
if autoDiscard {
defer func() {
if !isOk {
_ = this.discard()
}
}()
}
// 读取Range
ranges, err := NewPartialRangesFromFile(this.rangePath)
if err != nil {
return errors.New("read ranges failed: " + err.Error())
}
this.ranges = ranges
var buf = this.meta
if len(buf) == 0 {
buf = make([]byte, SizeMeta)
ok, err := this.readToBuff(this.fp, buf)
if err != nil {
return err
}
if !ok {
return ErrNotFound
}
this.meta = buf
}
this.expiresAt = int64(binary.BigEndian.Uint32(buf[:SizeExpiresAt]))
status := types.Int(string(buf[SizeExpiresAt : SizeExpiresAt+SizeStatus]))
if status < 100 || status > 999 {
return errors.New("invalid status")
}
this.status = status
// URL
urlLength := binary.BigEndian.Uint32(buf[SizeExpiresAt+SizeStatus : SizeExpiresAt+SizeStatus+SizeURLLength])
// header
headerSize := int(binary.BigEndian.Uint32(buf[SizeExpiresAt+SizeStatus+SizeURLLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength]))
if headerSize == 0 {
return nil
}
this.headerSize = headerSize
this.headerOffset = int64(SizeMeta) + int64(urlLength)
// body
this.bodyOffset = this.headerOffset + int64(headerSize)
bodySize := int(binary.BigEndian.Uint64(buf[SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength+SizeBodyLength]))
if bodySize == 0 {
isOk = true
return nil
}
this.bodySize = int64(bodySize)
// read header
if this.openFileCache != nil && len(this.header) == 0 {
if headerSize > 0 && headerSize <= 512 {
this.header = make([]byte, headerSize)
_, err := this.fp.Seek(this.headerOffset, io.SeekStart)
if err != nil {
return err
}
_, err = this.readToBuff(this.fp, this.header)
if err != nil {
return err
}
}
}
isOk = true
return nil
}
// ContainsRange 是否包含某些区间内容
// 这里的 r 是已经经过格式化的
func (this *PartialFileReader) ContainsRange(r rangeutils.Range) bool {
return this.ranges.Contains(r.Start(), r.End())
}
// MaxLength 获取区间最大长度
func (this *PartialFileReader) MaxLength() int64 {
if this.bodySize > 0 {
return this.bodySize
}
return this.ranges.Max() + 1
}
func (this *PartialFileReader) discard() error {
_ = os.Remove(this.rangePath)
return this.FileReader.discard()
}

View File

@@ -216,19 +216,24 @@ func (this *FileStorage) Init() error {
return nil
}
func (this *FileStorage) OpenReader(key string, useStale bool) (Reader, error) {
return this.openReader(key, true, useStale)
func (this *FileStorage) OpenReader(key string, useStale bool, isPartial bool) (Reader, error) {
return this.openReader(key, true, useStale, isPartial)
}
func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool) (Reader, error) {
func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, isPartial bool) (Reader, error) {
// 使用陈旧缓存的时候,我们认为是短暂的,只需要从文件里检查即可
if useStale {
allowMemory = false
}
// 区间缓存只存在文件中
if isPartial {
allowMemory = false
}
// 先尝试内存缓存
if allowMemory && this.memoryStorage != nil {
reader, err := this.memoryStorage.OpenReader(key, useStale)
reader, err := this.memoryStorage.OpenReader(key, useStale, isPartial)
if err == nil {
return reader, err
}
@@ -273,9 +278,18 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
}
}()
var reader = NewFileReader(fp)
reader.openFile = openFile
reader.openFileCache = this.openFileCache
var reader Reader
if isPartial {
var partialFileReader = NewPartialFileReader(fp)
partialFileReader.openFile = openFile
partialFileReader.openFileCache = this.openFileCache
reader = partialFileReader
} else {
var fileReader = NewFileReader(fp)
fileReader.openFile = openFile
fileReader.openFileCache = this.openFileCache
reader = fileReader
}
err = reader.Init()
if err != nil {
return nil, err
@@ -284,40 +298,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
// 增加点击量
// 1/1000采样
if allowMemory {
var rate = this.policy.PersistenceHitSampleRate
if rate <= 0 {
rate = 1000
}
if this.lastHotSize == 0 {
// 自动降低采样率来增加热点数据的缓存几率
rate = rate / 10
}
if rands.Int(0, rate) == 0 {
var hitErr = this.list.IncreaseHit(hash)
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
// 增加到热点
// 这里不收录缓存尺寸过大的文件
if this.memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 {
this.hotMapLocker.Lock()
hotItem, ok := this.hotMap[key]
if ok {
hotItem.Hits++
hotItem.ExpiresAt = reader.expiresAt
} else if len(this.hotMap) < HotItemSize { // 控制数量
this.hotMap[key] = &HotItem{
Key: key,
ExpiresAt: reader.ExpiresAt(),
Status: reader.Status(),
Hits: 1,
}
}
this.hotMapLocker.Unlock()
}
}
this.increaseHit(key, hash, reader)
}
isOk = true
@@ -380,7 +361,8 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
}
// 检查缓存是否已经生成
var cachePath = dir + "/" + hash + ".cache"
var cachePathName = dir + "/" + hash
var cachePath = cachePathName + ".cache"
stat, err := os.Stat(cachePath)
if err == nil && time.Now().Sub(stat.ModTime()) <= 1*time.Second {
// 防止并发连续写入
@@ -388,7 +370,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
}
var tmpPath = cachePath + ".tmp"
if isPartial {
tmpPath = cachePath
tmpPath = cachePathName + ".cache"
}
// 先删除
@@ -502,7 +484,12 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
isOk = true
if isPartial {
return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, func() {
ranges, err := NewPartialRangesFromFile(cachePathName + "@ranges.cache")
if err != nil {
ranges = NewPartialRanges()
}
return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, ranges, func() {
sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key)
sharedWritingFileKeyLocker.Unlock()
@@ -923,7 +910,7 @@ func (this *FileStorage) hotLoop() {
var buf = utils.BytePool16k.Get()
defer utils.BytePool16k.Put(buf)
for _, item := range result[:size] {
reader, err := this.openReader(item.Key, false, false)
reader, err := this.openReader(item.Key, false, false, false)
if err != nil {
continue
}
@@ -1025,3 +1012,41 @@ func (this *FileStorage) cleanDeletedDirs(dir string) error {
}
return nil
}
// 增加某个Key的点击量
func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
var rate = this.policy.PersistenceHitSampleRate
if rate <= 0 {
rate = 1000
}
if this.lastHotSize == 0 {
// 自动降低采样率来增加热点数据的缓存几率
rate = rate / 10
}
if rands.Int(0, rate) == 0 {
var hitErr = this.list.IncreaseHit(hash)
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
// 增加到热点
// 这里不收录缓存尺寸过大的文件
if this.memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 {
this.hotMapLocker.Lock()
hotItem, ok := this.hotMap[key]
if ok {
hotItem.Hits++
hotItem.ExpiresAt = reader.ExpiresAt()
} else if len(this.hotMap) < HotItemSize { // 控制数量
this.hotMap[key] = &HotItem{
Key: key,
ExpiresAt: reader.ExpiresAt(),
Status: reader.Status(),
Hits: 1,
}
}
this.hotMapLocker.Unlock()
}
}
}

View File

@@ -110,7 +110,7 @@ func TestFileStorage_OpenWriter_Partial(t *testing.T) {
t.Fatal(err)
}
err = writer.WriteAt([]byte("Hello, World"), 0)
err = writer.WriteAt(0, []byte("Hello, World"))
if err != nil {
t.Fatal(err)
}
@@ -311,7 +311,7 @@ func TestFileStorage_Read(t *testing.T) {
t.Fatal(err)
}
now := time.Now()
reader, err := storage.OpenReader("my-key", false)
reader, err := storage.OpenReader("my-key", false, false)
if err != nil {
t.Fatal(err)
}
@@ -347,7 +347,7 @@ func TestFileStorage_Read_HTTP_Response(t *testing.T) {
t.Fatal(err)
}
now := time.Now()
reader, err := storage.OpenReader("my-http-response", false)
reader, err := storage.OpenReader("my-http-response", false, false)
if err != nil {
t.Fatal(err)
}
@@ -401,7 +401,7 @@ func TestFileStorage_Read_NotFound(t *testing.T) {
}
now := time.Now()
buf := make([]byte, 6)
reader, err := storage.OpenReader("my-key-10000", false)
reader, err := storage.OpenReader("my-key-10000", false, false)
if err != nil {
if err == ErrNotFound {
t.Log("cache not fund")
@@ -543,7 +543,7 @@ func BenchmarkFileStorage_Read(b *testing.B) {
b.Fatal(err)
}
for i := 0; i < b.N; i++ {
reader, err := storage.OpenReader("my-key", false)
reader, err := storage.OpenReader("my-key", false, false)
if err != nil {
b.Fatal(err)
}

View File

@@ -10,7 +10,7 @@ type StorageInterface interface {
Init() error
// OpenReader 读取缓存
OpenReader(key string, useStale bool) (reader Reader, err error)
OpenReader(key string, useStale bool, isPartial bool) (reader Reader, err error)
// OpenWriter 打开缓存写入器等待写入
OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error)

View File

@@ -105,7 +105,7 @@ func (this *MemoryStorage) Init() error {
}
// OpenReader 读取缓存
func (this *MemoryStorage) OpenReader(key string, useStale bool) (Reader, error) {
func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool) (Reader, error) {
hash := this.hash(key)
this.locker.RLock()

View File

@@ -25,7 +25,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
t.Log(storage.valuesMap)
{
reader, err := storage.OpenReader("abc", false)
reader, err := storage.OpenReader("abc", false, false)
if err != nil {
if err == ErrNotFound {
t.Log("not found: abc")
@@ -52,7 +52,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
}
{
_, err := storage.OpenReader("abc 2", false)
_, err := storage.OpenReader("abc 2", false, false)
if err != nil {
if err == ErrNotFound {
t.Log("not found: abc2")
@@ -68,7 +68,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
}
_, _ = writer.Write([]byte("Hello123"))
{
reader, err := storage.OpenReader("abc", false)
reader, err := storage.OpenReader("abc", false, false)
if err != nil {
if err == ErrNotFound {
t.Log("not found: abc")
@@ -97,7 +97,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) {
IsDone: true,
},
}
_, _ = storage.OpenReader("test", false)
_, _ = storage.OpenReader("test", false, false)
}
func TestMemoryStorage_Delete(t *testing.T) {

View File

@@ -9,7 +9,7 @@ type Writer interface {
Write(data []byte) (n int, err error)
// WriteAt 在指定位置写入数据
WriteAt(data []byte, offset int64) error
WriteAt(offset int64, data []byte) error
// HeaderSize 写入的Header数据大小
HeaderSize() int64

View File

@@ -67,7 +67,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
}
// WriteAt 在指定位置写入数据
func (this *FileWriter) WriteAt(data []byte, offset int64) error {
func (this *FileWriter) WriteAt(offset int64, data []byte) error {
_ = data
_ = offset
return errors.New("not supported")

View File

@@ -57,7 +57,7 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) {
}
// WriteAt 在指定位置写入数据
func (this *MemoryWriter) WriteAt(b []byte, offset int64) error {
func (this *MemoryWriter) WriteAt(offset int64, b []byte) error {
_ = b
_ = offset
return errors.New("not supported")

View File

@@ -23,9 +23,22 @@ type PartialFileWriter struct {
isNew bool
isPartial bool
bodyOffset int64
ranges *PartialRanges
rangePath string
}
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, endFunc func()) *PartialFileWriter {
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
var path = rawWriter.Name()
// ranges路径
var dotIndex = strings.LastIndex(path, ".")
var rangePath = ""
if dotIndex < 0 {
rangePath = path + "@ranges.cache"
} else {
rangePath = path[:dotIndex] + "@ranges" + path[dotIndex:]
}
return &PartialFileWriter{
key: key,
rawWriter: rawWriter,
@@ -34,6 +47,8 @@ func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew
isNew: isNew,
isPartial: isPartial,
bodyOffset: bodyOffset,
ranges: ranges,
rangePath: rangePath,
}
}
@@ -50,6 +65,21 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
return
}
func (this *PartialFileWriter) AppendHeader(data []byte) error {
_, err := this.rawWriter.Write(data)
if err != nil {
_ = this.Discard()
} else {
var c = len(data)
this.headerSize += int64(c)
err = this.WriteHeaderLength(int(this.headerSize))
if err != nil {
_ = this.Discard()
}
}
return err
}
// WriteHeaderLength 写入Header长度数据
func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
bytes4 := make([]byte, 4)
@@ -78,12 +108,34 @@ func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
}
// WriteAt 在指定位置写入数据
func (this *PartialFileWriter) WriteAt(data []byte, offset int64) error {
func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
var c = int64(len(data))
if c == 0 {
return nil
}
var end = offset + c - 1
// 是否已包含在内
if this.ranges.Contains(offset, end) {
return nil
}
if this.bodyOffset == 0 {
this.bodyOffset = SizeMeta + int64(len(this.key)) + this.headerSize
}
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
return err
if err != nil {
return err
}
this.ranges.Add(offset, end)
return nil
}
// SetBodyLength 设置内容总长度
func (this *PartialFileWriter) SetBodyLength(bodyLength int64) {
this.bodySize = bodyLength
}
// WriteBodyLength 写入Body长度数据
@@ -109,31 +161,30 @@ func (this *PartialFileWriter) Close() error {
this.endFunc()
})
var path = this.rawWriter.Name()
err := this.ranges.WriteToFile(this.rangePath)
if err != nil {
return err
}
// 关闭当前writer
if this.isNew {
err := this.WriteHeaderLength(types.Int(this.headerSize))
err = this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil {
_ = this.rawWriter.Close()
_ = os.Remove(path)
this.remove()
return err
}
err = this.WriteBodyLength(this.bodySize)
if err != nil {
_ = this.rawWriter.Close()
_ = os.Remove(path)
this.remove()
return err
}
}
err := this.rawWriter.Close()
err = this.rawWriter.Close()
if err != nil {
_ = os.Remove(path)
} else if !this.isPartial {
err = os.Rename(path, strings.Replace(path, ".tmp", "", 1))
if err != nil {
_ = os.Remove(path)
}
this.remove()
}
return err
@@ -147,6 +198,8 @@ func (this *PartialFileWriter) Discard() error {
_ = this.rawWriter.Close()
_ = os.Remove(this.rangePath)
err := os.Remove(this.rawWriter.Name())
return err
}
@@ -171,3 +224,12 @@ func (this *PartialFileWriter) Key() string {
func (this *PartialFileWriter) ItemType() ItemType {
return ItemTypeFile
}
func (this *PartialFileWriter) IsNew() bool {
return this.isNew && len(this.ranges.Ranges) == 0
}
func (this *PartialFileWriter) remove() {
_ = os.Remove(this.rawWriter.Name())
_ = os.Remove(this.rangePath)
}

View File

@@ -11,8 +11,8 @@ import (
"time"
)
func TestPartialFileWriter_SeekOffset(t *testing.T) {
var path = "/tmp/test@partial.cache"
func TestPartialFileWriter_Write(t *testing.T) {
var path = "/tmp/test_partial.cache"
_ = os.Remove(path)
var reader = func() {
@@ -27,7 +27,8 @@ func TestPartialFileWriter_SeekOffset(t *testing.T) {
if err != nil {
t.Fatal(err)
}
var writer = caches.NewPartialFileWriter(fp, "test", time.Now().Unix()+86500, true, true, 0, func() {
var ranges = caches.NewPartialRanges()
var writer = caches.NewPartialFileWriter(fp, "test", time.Now().Unix()+86500, true, true, 0, ranges, func() {
t.Log("end")
})
_, err = writer.WriteHeader([]byte("header"))
@@ -36,7 +37,7 @@ func TestPartialFileWriter_SeekOffset(t *testing.T) {
}
// 移动位置
err = writer.WriteAt([]byte("HELLO"), 100)
err = writer.WriteAt(100, []byte("HELLO"))
if err != nil {
t.Fatal(err)
}