diff --git a/internal/caches/partial_ranges.go b/internal/caches/partial_ranges.go new file mode 100644 index 0000000..d5f94f8 --- /dev/null +++ b/internal/caches/partial_ranges.go @@ -0,0 +1,143 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import ( + "encoding/json" +) + +// PartialRanges 内容分区范围定义 +type PartialRanges struct { + ranges [][2]int64 +} + +// NewPartialRanges 获取新对象 +func NewPartialRanges() *PartialRanges { + return &PartialRanges{ranges: [][2]int64{}} +} + +// NewPartialRangesFromJSON 从JSON中解析范围 +func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) { + var rs = [][2]int64{} + err := json.Unmarshal(data, &rs) + if err != nil { + return nil, err + } + var r = NewPartialRanges() + r.ranges = rs + return r, nil +} + +// Add 添加新范围 +func (this *PartialRanges) Add(begin int64, end int64) { + if begin > end { + begin, end = end, begin + } + + var nr = [2]int64{begin, end} + + var count = len(this.ranges) + if count == 0 { + this.ranges = [][2]int64{nr} + return + } + + // insert + // TODO 将来使用二分法改进 + var index = -1 + for i, r := range this.ranges { + if r[0] > begin || (r[0] == begin && r[1] >= end) { + index = i + this.ranges = append(this.ranges, [2]int64{}) + copy(this.ranges[index+1:], this.ranges[index:]) + this.ranges[index] = nr + break + } + } + + if index == -1 { + index = count + this.ranges = append(this.ranges, nr) + } + + this.merge(index) +} + +// Ranges 获取所有范围 +func (this *PartialRanges) Ranges() [][2]int64 { + return this.ranges +} + +// Contains 检查是否包含某个范围 +func (this *PartialRanges) Contains(begin int64, end int64) bool { + if len(this.ranges) == 0 { + return true + } + + // TODO 使用二分法查找改进性能 + for _, r2 := range this.ranges { + if r2[0] <= begin && r2[1] >= end { + return true + } + } + + return false +} + +// AsJSON 转换为JSON +func (this *PartialRanges) AsJSON() ([]byte, error) { + return json.Marshal(this.ranges) +} + +func (this *PartialRanges) merge(index int) { + // forward + var lastIndex = index + for i := index; i >= 1; i-- { + var curr = this.ranges[i] + var prev = this.ranges[i-1] + var w1 = this.w(curr) + var w2 = this.w(prev) + if w1+w2 >= this.max(curr[1], prev[1])-this.min(curr[0], prev[0])-1 { + prev = [2]int64{this.min(curr[0], prev[0]), this.max(curr[1], prev[1])} + this.ranges[i-1] = prev + this.ranges = append(this.ranges[:i], this.ranges[i+1:]...) + lastIndex = i - 1 + } else { + break + } + } + + // backward + index = lastIndex + for index < len(this.ranges)-1 { + var curr = this.ranges[index] + var next = this.ranges[index+1] + var w1 = this.w(curr) + var w2 = this.w(next) + if w1+w2 >= this.max(curr[1], next[1])-this.min(curr[0], next[0])-1 { + curr = [2]int64{this.min(curr[0], next[0]), this.max(curr[1], next[1])} + this.ranges = append(this.ranges[:index], this.ranges[index+1:]...) + this.ranges[index] = curr + } else { + break + } + } +} + +func (this *PartialRanges) w(r [2]int64) int64 { + return r[1] - r[0] +} + +func (this *PartialRanges) min(n1 int64, n2 int64) int64 { + if n1 <= n2 { + return n1 + } + return n2 +} + +func (this *PartialRanges) max(n1 int64, n2 int64) int64 { + if n1 >= n2 { + return n1 + } + return n2 +} diff --git a/internal/caches/partial_ranges_test.go b/internal/caches/partial_ranges_test.go new file mode 100644 index 0000000..5476970 --- /dev/null +++ b/internal/caches/partial_ranges_test.go @@ -0,0 +1,124 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/iwind/TeaGo/assert" + "github.com/iwind/TeaGo/logs" + "testing" +) + +func TestNewPartialRanges(t *testing.T) { + var r = caches.NewPartialRanges() + r.Add(1, 100) + r.Add(50, 300) + + r.Add(30, 80) + r.Add(30, 100) + r.Add(30, 400) + r.Add(1000, 10000) + r.Add(200, 1000) + r.Add(200, 10040) + + logs.PrintAsJSON(r.Ranges()) +} + +func TestNewPartialRanges1(t *testing.T) { + var a = assert.NewAssertion(t) + + var r = caches.NewPartialRanges() + r.Add(1, 100) + r.Add(1, 101) + r.Add(1, 102) + r.Add(2, 103) + r.Add(200, 300) + r.Add(1, 1000) + + var rs = r.Ranges() + logs.PrintAsJSON(rs, t) + a.IsTrue(len(rs) == 1) + if len(rs) == 1 { + a.IsTrue(rs[0][0] == 1) + a.IsTrue(rs[0][1] == 1000) + } +} + +func TestNewPartialRanges2(t *testing.T) { + // low -> high + var r = caches.NewPartialRanges() + r.Add(1, 100) + r.Add(1, 101) + r.Add(1, 102) + r.Add(2, 103) + r.Add(200, 300) + r.Add(301, 302) + r.Add(303, 304) + r.Add(250, 400) + + var rs = r.Ranges() + logs.PrintAsJSON(rs, t) +} + +func TestNewPartialRanges3(t *testing.T) { + // high -> low + var r = caches.NewPartialRanges() + r.Add(301, 302) + r.Add(303, 304) + r.Add(200, 300) + r.Add(250, 400) + + var rs = r.Ranges() + logs.PrintAsJSON(rs, t) +} + +func TestNewPartialRanges4(t *testing.T) { + // nearby + var r = caches.NewPartialRanges() + r.Add(301, 302) + r.Add(303, 304) + r.Add(305, 306) + + r.Add(417, 417) + r.Add(410, 415) + r.Add(400, 409) + + var rs = r.Ranges() + logs.PrintAsJSON(rs, t) + t.Log(r.Contains(400, 416)) +} + +func TestNewPartialRanges5(t *testing.T) { + var r = caches.NewPartialRanges() + for j := 0; j < 1000; j++ { + r.Add(int64(j), int64(j+100)) + } + logs.PrintAsJSON(r.Ranges(), t) +} + +func TestNewPartialRanges_AsJSON(t *testing.T) { + var r = caches.NewPartialRanges() + for j := 0; j < 1000; j++ { + r.Add(int64(j), int64(j+100)) + } + data, err := r.AsJSON() + if err != nil { + t.Fatal(err) + } + t.Log(string(data)) + + r2, err := caches.NewPartialRangesFromJSON(data) + if err != nil { + t.Fatal(err) + } + t.Log(r2.Ranges()) +} + +func BenchmarkNewPartialRanges(b *testing.B) { + for i := 0; i < b.N; i++ { + var r = caches.NewPartialRanges() + for j := 0; j < 1000; j++ { + r.Add(int64(j), int64(j+100)) + } + } +} diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index 781b64f..6fec531 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -32,6 +32,10 @@ func NewFileReader(fp *os.File) *FileReader { } func (this *FileReader) Init() error { + return this.InitAutoDiscard(true) +} + +func (this *FileReader) InitAutoDiscard(autoDiscard bool) error { if this.openFile != nil { this.meta = this.openFile.meta this.header = this.openFile.header @@ -39,11 +43,13 @@ func (this *FileReader) Init() error { isOk := false - defer func() { - if !isOk { - _ = this.discard() - } - }() + if autoDiscard { + defer func() { + if !isOk { + _ = this.discard() + } + }() + } var buf = this.meta if len(buf) == 0 { @@ -78,13 +84,13 @@ func (this *FileReader) Init() error { this.headerOffset = int64(SizeMeta) + int64(urlLength) // body + this.bodyOffset = this.headerOffset + int64(headerSize) bodySize := int(binary.BigEndian.Uint64(buf[SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength+SizeBodyLength])) if bodySize == 0 { isOk = true return nil } this.bodySize = int64(bodySize) - this.bodyOffset = this.headerOffset + int64(headerSize) // read header if this.openFileCache != nil && len(this.header) == 0 { diff --git a/internal/caches/reader_file_test.go b/internal/caches/reader_file_test.go index f557442..9b1b6ed 100644 --- a/internal/caches/reader_file_test.go +++ b/internal/caches/reader_file_test.go @@ -54,6 +54,30 @@ func TestFileReader(t *testing.T) { } } +func TestFileReader_ReadHeader(t *testing.T) { + var path = "/Users/WorkSpace/EdgeProject/EdgeCache/p43/12/6b/126bbed90fc80f2bdfb19558948b0d49.cache" + fp, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = fp.Close() + }() + var reader = NewFileReader(fp) + err = reader.Init() + if err != nil { + t.Fatal(err) + } + var buf = make([]byte, 16*1024) + err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + t.Log("header:", string(buf[:n])) + return + }) + if err != nil { + t.Fatal(err) + } +} + func TestFileReader_Range(t *testing.T) { storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ Id: 1, diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index d0e721a..b6c5ac5 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -325,11 +325,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool) } // OpenWriter 打开缓存文件等待写入 -func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) { +func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) { // 先尝试内存缓存 // 我们限定仅小文件优先存在内存中 - if this.memoryStorage != nil && size > 0 && size < 32*1024*1024 { - writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size) + if !isPartial && this.memoryStorage != nil && size > 0 && size < 32*1024*1024 { + writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, false) if err == nil { return writer, nil } @@ -361,13 +361,13 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys { return nil, NewCapacityError("write file cache failed: too many keys in cache storage") } - capacityBytes := this.diskCapacityBytes() + var capacityBytes = this.diskCapacityBytes() if capacityBytes > 0 && capacityBytes <= this.totalSize { return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10)) } - hash := stringutil.Md5(key) - dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + var hash = stringutil.Md5(key) + var dir = this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] _, err = os.Stat(dir) if err != nil { if !os.IsNotExist(err) { @@ -387,6 +387,9 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz return nil, ErrFileIsWriting } var tmpPath = cachePath + ".tmp" + if isPartial { + tmpPath = cachePath + } // 先删除 err = this.list.Remove(hash) @@ -421,70 +424,96 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz return nil, ErrFileIsWriting } - err = writer.Truncate(0) - if err != nil { - return nil, err - } - - // 写入过期时间 - bytes4 := make([]byte, 4) - { - binary.BigEndian.PutUint32(bytes4, uint32(expiredAt)) - _, err = writer.Write(bytes4) - if err != nil { - return nil, err + // 是否已经有内容 + var isNewCreated = true + var partialBodyOffset int64 + if isPartial { + partialFP, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444) + if err == nil { + var partialReader = NewFileReader(partialFP) + err = partialReader.InitAutoDiscard(false) + if err == nil && partialReader.bodyOffset > 0 { + isNewCreated = false + partialBodyOffset = partialReader.bodyOffset + } + _ = partialReader.Close() } } - // 写入状态码 - if status > 999 || status < 100 { - status = 200 - } - _, err = writer.WriteString(strconv.Itoa(status)) - if err != nil { - return nil, err - } - - // 写入URL长度 - { - binary.BigEndian.PutUint32(bytes4, uint32(len(key))) - _, err = writer.Write(bytes4) + if isNewCreated { + err = writer.Truncate(0) if err != nil { return nil, err } - } - // 写入Header Length - { - binary.BigEndian.PutUint32(bytes4, uint32(0)) - _, err = writer.Write(bytes4) + // 写入过期时间 + bytes4 := make([]byte, 4) + { + binary.BigEndian.PutUint32(bytes4, uint32(expiredAt)) + _, err = writer.Write(bytes4) + if err != nil { + return nil, err + } + } + + // 写入状态码 + if status > 999 || status < 100 { + status = 200 + } + _, err = writer.WriteString(strconv.Itoa(status)) if err != nil { return nil, err } - } - // 写入Body Length - { - b := make([]byte, SizeBodyLength) - binary.BigEndian.PutUint64(b, uint64(0)) - _, err = writer.Write(b) + // 写入URL长度 + { + binary.BigEndian.PutUint32(bytes4, uint32(len(key))) + _, err = writer.Write(bytes4) + if err != nil { + return nil, err + } + } + + // 写入Header Length + { + binary.BigEndian.PutUint32(bytes4, uint32(0)) + _, err = writer.Write(bytes4) + if err != nil { + return nil, err + } + } + + // 写入Body Length + { + b := make([]byte, SizeBodyLength) + binary.BigEndian.PutUint64(b, uint64(0)) + _, err = writer.Write(b) + if err != nil { + return nil, err + } + } + + // 写入URL + _, err = writer.WriteString(key) if err != nil { return nil, err } } - // 写入URL - _, err = writer.WriteString(key) - if err != nil { - return nil, err - } - isOk = true - return NewFileWriter(writer, key, expiredAt, func() { - sharedWritingFileKeyLocker.Lock() - delete(sharedWritingFileKeyMap, key) - sharedWritingFileKeyLocker.Unlock() - }), nil + if isPartial { + return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, func() { + sharedWritingFileKeyLocker.Lock() + delete(sharedWritingFileKeyMap, key) + sharedWritingFileKeyLocker.Unlock() + }), nil + } else { + return NewFileWriter(writer, key, expiredAt, func() { + sharedWritingFileKeyLocker.Lock() + delete(sharedWritingFileKeyMap, key) + sharedWritingFileKeyLocker.Unlock() + }), nil + } } // AddToList 添加到List @@ -930,6 +959,9 @@ func (this *FileStorage) hotLoop() { err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { _, err = writer.Write(buf[:n]) + if err == nil { + goNext = true + } return }) if err != nil { diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index e647360..38f3fc0 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) { header := []byte("Header") body := []byte("This is Body") - writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1) + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, false) if err != nil { t.Fatal(err) } @@ -87,6 +87,41 @@ func TestFileStorage_OpenWriter(t *testing.T) { t.Log("ok") } +func TestFileStorage_OpenWriter_Partial(t *testing.T) { + var storage = NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 2, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1, true) + if err != nil { + t.Fatal(err) + } + + _, err = writer.WriteHeader([]byte("Content-Type:text/html; charset=utf-8")) + if err != nil { + t.Fatal(err) + } + + err = writer.WriteAt([]byte("Hello, World"), 0) + if err != nil { + t.Fatal(err) + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } + t.Log(writer) +} + func TestFileStorage_OpenWriter_HTTP(t *testing.T) { storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ Id: 1, @@ -104,7 +139,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) { t.Log(time.Since(now).Seconds()*1000, "ms") }() - writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1) + writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1, false) if err != nil { t.Fatal(err) } @@ -177,10 +212,11 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1, false) if err != nil { if err != ErrFileIsWriting { - t.Fatal(err) + t.Error(err) + return } return } @@ -188,7 +224,8 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { _, err = writer.Write([]byte("Hello,World")) if err != nil { - t.Fatal(err) + t.Error(err) + return } // 故意造成慢速写入 @@ -196,7 +233,8 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { err = writer.Close() if err != nil { - t.Fatal(err) + t.Error(err) + return } }(i) } @@ -229,10 +267,11 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1, false) if err != nil { if err != ErrFileIsWriting { - t.Fatal(err) + t.Error(err) + return } return } @@ -241,7 +280,8 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { t.Log("writing") _, err = writer.Write([]byte("Hello,World")) if err != nil { - t.Fatal(err) + t.Error(err) + return } // 故意造成慢速写入 @@ -249,7 +289,8 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { err = writer.Close() if err != nil { - t.Fatal(err) + t.Error(err) + return } }(i) } diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index 81e298e..2ed8647 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -13,7 +13,7 @@ type StorageInterface interface { OpenReader(key string, useStale bool) (reader Reader, err error) // OpenWriter 打开缓存写入器等待写入 - OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) + OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) // Delete 删除某个键值对应的缓存 Delete(key string) error diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index ce7fd9e..9700fed 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -145,7 +145,11 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool) (Reader, error) } // OpenWriter 打开缓存写入器等待写入 -func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) { +func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64, isPartial bool) (Writer, error) { + // TODO 内存缓存暂时不支持分块内容存储 + if isPartial { + return nil, ErrFileIsWriting + } return this.openWriter(key, expiredAt, status, size, true) } @@ -387,7 +391,7 @@ func (this *MemoryStorage) flushItem(key string) { return } - writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1) + writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, false) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error()) diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 2aae411..fe6da81 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -15,7 +15,7 @@ import ( func TestMemoryStorage_OpenWriter(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) if err != nil { t.Fatal(err) } @@ -62,7 +62,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) { } } - writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1) + writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) if err != nil { t.Fatal(err) } @@ -103,7 +103,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) { func TestMemoryStorage_Delete(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) { - writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1, false) if err != nil { t.Fatal(err) } @@ -111,7 +111,7 @@ func TestMemoryStorage_Delete(t *testing.T) { t.Log(len(storage.valuesMap)) } { - writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1) + writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1, false) if err != nil { t.Fatal(err) } @@ -126,7 +126,7 @@ func TestMemoryStorage_Stat(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) if err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestMemoryStorage_Stat(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) if err != nil { t.Fatal(err) } @@ -163,7 +163,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) if err != nil { t.Fatal(err) } @@ -175,7 +175,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) if err != nil { t.Fatal(err) } @@ -198,7 +198,7 @@ func TestMemoryStorage_Purge(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.OpenWriter("abc", expiredAt, 200, -1) + writer, err := storage.OpenWriter("abc", expiredAt, 200, -1, false) if err != nil { t.Fatal(err) } @@ -210,7 +210,7 @@ func TestMemoryStorage_Purge(t *testing.T) { }) } { - writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1) + writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1, false) if err != nil { t.Fatal(err) } @@ -241,7 +241,7 @@ func TestMemoryStorage_Expire(t *testing.T) { for i := 0; i < 1000; i++ { expiredAt := time.Now().Unix() + int64(rands.Int(0, 60)) key := "abc" + strconv.Itoa(i) - writer, err := storage.OpenWriter(key, expiredAt, 200, -1) + writer, err := storage.OpenWriter(key, expiredAt, 200, -1, false) if err != nil { t.Fatal(err) } diff --git a/internal/caches/writer.go b/internal/caches/writer.go index ca180a1..40e66bc 100644 --- a/internal/caches/writer.go +++ b/internal/caches/writer.go @@ -8,6 +8,9 @@ type Writer interface { // Write 写入Body数据 Write(data []byte) (n int, err error) + // WriteAt 在指定位置写入数据 + WriteAt(data []byte, offset int64) error + // HeaderSize 写入的Header数据大小 HeaderSize() int64 diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 06ff7ed..550643a 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -2,6 +2,7 @@ package caches import ( "encoding/binary" + "errors" "github.com/iwind/TeaGo/types" "io" "os" @@ -65,6 +66,13 @@ func (this *FileWriter) Write(data []byte) (n int, err error) { return } +// WriteAt 在指定位置写入数据 +func (this *FileWriter) WriteAt(data []byte, offset int64) error { + _ = data + _ = offset + return errors.New("not supported") +} + // WriteBodyLength 写入Body长度数据 func (this *FileWriter) WriteBodyLength(bodyLength int64) error { bytes8 := make([]byte, 8) diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 5f81cf1..e2e9771 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -1,6 +1,7 @@ package caches import ( + "errors" "github.com/cespare/xxhash" "sync" "time" @@ -55,6 +56,13 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) { return len(data), nil } +// WriteAt 在指定位置写入数据 +func (this *MemoryWriter) WriteAt(b []byte, offset int64) error { + _ = b + _ = offset + return errors.New("not supported") +} + // HeaderSize 数据尺寸 func (this *MemoryWriter) HeaderSize() int64 { return this.headerSize diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go new file mode 100644 index 0000000..24821e9 --- /dev/null +++ b/internal/caches/writer_partial_file.go @@ -0,0 +1,173 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import ( + "encoding/binary" + "github.com/iwind/TeaGo/types" + "io" + "os" + "strings" + "sync" +) + +type PartialFileWriter struct { + rawWriter *os.File + key string + headerSize int64 + bodySize int64 + expiredAt int64 + endFunc func() + once sync.Once + + isNew bool + isPartial bool + bodyOffset int64 +} + +func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, isNew bool, isPartial bool, bodyOffset int64, endFunc func()) *PartialFileWriter { + return &PartialFileWriter{ + key: key, + rawWriter: rawWriter, + expiredAt: expiredAt, + endFunc: endFunc, + isNew: isNew, + isPartial: isPartial, + bodyOffset: bodyOffset, + } +} + +// WriteHeader 写入数据 +func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) { + if !this.isNew { + return + } + n, err = this.rawWriter.Write(data) + this.headerSize += int64(n) + if err != nil { + _ = this.Discard() + } + return +} + +// WriteHeaderLength 写入Header长度数据 +func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error { + bytes4 := make([]byte, 4) + binary.BigEndian.PutUint32(bytes4, uint32(headerLength)) + _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength, io.SeekStart) + if err != nil { + _ = this.Discard() + return err + } + _, err = this.rawWriter.Write(bytes4) + if err != nil { + _ = this.Discard() + return err + } + return nil +} + +// Write 写入数据 +func (this *PartialFileWriter) Write(data []byte) (n int, err error) { + n, err = this.rawWriter.Write(data) + this.bodySize += int64(n) + if err != nil { + _ = this.Discard() + } + return +} + +// WriteAt 在指定位置写入数据 +func (this *PartialFileWriter) WriteAt(data []byte, offset int64) error { + if this.bodyOffset == 0 { + this.bodyOffset = SizeMeta + int64(len(this.key)) + this.headerSize + } + _, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) + return err +} + +// WriteBodyLength 写入Body长度数据 +func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error { + bytes8 := make([]byte, 8) + binary.BigEndian.PutUint64(bytes8, uint64(bodyLength)) + _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength, io.SeekStart) + if err != nil { + _ = this.Discard() + return err + } + _, err = this.rawWriter.Write(bytes8) + if err != nil { + _ = this.Discard() + return err + } + return nil +} + +// Close 关闭 +func (this *PartialFileWriter) Close() error { + defer this.once.Do(func() { + this.endFunc() + }) + + var path = this.rawWriter.Name() + + if this.isNew { + err := this.WriteHeaderLength(types.Int(this.headerSize)) + if err != nil { + _ = this.rawWriter.Close() + _ = os.Remove(path) + return err + } + err = this.WriteBodyLength(this.bodySize) + if err != nil { + _ = this.rawWriter.Close() + _ = os.Remove(path) + return err + } + } + + err := this.rawWriter.Close() + if err != nil { + _ = os.Remove(path) + } else if !this.isPartial { + err = os.Rename(path, strings.Replace(path, ".tmp", "", 1)) + if err != nil { + _ = os.Remove(path) + } + } + + return err +} + +// Discard 丢弃 +func (this *PartialFileWriter) Discard() error { + defer this.once.Do(func() { + this.endFunc() + }) + + _ = this.rawWriter.Close() + + err := os.Remove(this.rawWriter.Name()) + return err +} + +func (this *PartialFileWriter) HeaderSize() int64 { + return this.headerSize +} + +func (this *PartialFileWriter) BodySize() int64 { + return this.bodySize +} + +func (this *PartialFileWriter) ExpiredAt() int64 { + return this.expiredAt +} + +func (this *PartialFileWriter) Key() string { + return this.key +} + +// ItemType 获取内容类型 +func (this *PartialFileWriter) ItemType() ItemType { + return ItemTypeFile +} diff --git a/internal/caches/writer_partial_file_test.go b/internal/caches/writer_partial_file_test.go new file mode 100644 index 0000000..0058d33 --- /dev/null +++ b/internal/caches/writer_partial_file_test.go @@ -0,0 +1,50 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/iwind/TeaGo/types" + "io/ioutil" + "os" + "testing" + "time" +) + +func TestPartialFileWriter_SeekOffset(t *testing.T) { + var path = "/tmp/test@partial.cache" + _ = os.Remove(path) + + var reader = func() { + data, err := ioutil.ReadFile(path) + if err != nil { + t.Fatal(err) + } + t.Log("["+types.String(len(data))+"]", string(data)) + } + + fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + t.Fatal(err) + } + var writer = caches.NewPartialFileWriter(fp, "test", time.Now().Unix()+86500, true, true, 0, func() { + t.Log("end") + }) + _, err = writer.WriteHeader([]byte("header")) + if err != nil { + t.Fatal(err) + } + + // 移动位置 + err = writer.WriteAt([]byte("HELLO"), 100) + if err != nil { + t.Fatal(err) + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } + + reader() +} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index b86d92b..1586796 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -182,7 +182,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + msg.LifeSeconds - writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value))) + writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)), false) if err != nil { this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) return err @@ -462,7 +462,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + 8600 - writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength) // TODO 可以设置缓存过期时间 + writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength, false) // TODO 可以设置缓存过期时间 if err != nil { locker.Lock() errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error()) diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 209139d..9e3e987 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -243,7 +243,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { var expiredAt = utils.UnixTime() + life var cacheKey = this.req.cacheKey - cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size) + cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size, false) if err != nil { if !caches.CanIgnoreErr(err) { remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) @@ -310,6 +310,7 @@ func (this *HTTPWriter) PrepareWebP(resp *http.Response, size int64) { return } this.Header().Del("Content-Encoding") + this.Header().Del("Content-Length") this.rawReader = reader case "": // 空 default: @@ -559,7 +560,7 @@ func (this *HTTPWriter) Close() { if this.cacheWriter != nil { var cacheKey = this.cacheWriter.Key() + webpSuffix - webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode(), -1) + webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode(), -1, false) if webpCacheWriter != nil { // 写入Header for k, v := range this.Header() {