进一步提升文件缓存写入速度

This commit is contained in:
GoEdgeLab
2022-11-19 15:55:05 +08:00
parent b61bf0280c
commit 6cc70dc8e5
4 changed files with 94 additions and 43 deletions

View File

@@ -567,6 +567,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
var before = time.Now()
writer, err := os.OpenFile(tmpPath, flags, 0666)
if err != nil {
// TODO 检查在各个系统中的稳定性
if os.IsNotExist(err) {
_ = os.MkdirAll(dir, 0777)
@@ -608,8 +609,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
}
if isNewCreated {
// 写入过期时间
var metaBytes = make([]byte, SizeMeta+len(key))
// 写入meta
// 从v0.5.8开始不再在meta中写入Key
var metaBytes = make([]byte, SizeMeta)
binary.BigEndian.PutUint32(metaBytes[OffsetExpiresAt:], uint32(expiredAt))
// 写入状态码
@@ -618,18 +620,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
}
copy(metaBytes[OffsetStatus:], strconv.Itoa(status))
// 写入URL长度
binary.BigEndian.PutUint32(metaBytes[OffsetURLLength:], uint32(len(key)))
// 写入Header Length
binary.BigEndian.PutUint32(metaBytes[OffsetHeaderLength:], uint32(0))
// 写入Body Length
binary.BigEndian.PutUint64(metaBytes[OffsetBodyLength:], uint64(0))
// 写入URL
copy(metaBytes[OffsetKey:], key)
_, err = writer.Write(metaBytes)
if err != nil {
return nil, err
@@ -679,7 +669,7 @@ func (this *FileStorage) AddToList(item *Item) {
}
item.MetaSize = SizeMeta + 128
hash := stringutil.Md5(item.Key)
var hash = stringutil.Md5(item.Key)
err := this.list.Add(hash, item)
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
remotelogs.Error("CACHE", "add to list failed: "+err.Error())

View File

@@ -324,13 +324,16 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
}
// 写入Header
var headerBuf = utils.SharedBufferPool.Get()
for k, v := range this.Header() {
for _, v1 := range v {
if this.isPartial && k == "Content-Type" && strings.Contains(v1, "multipart/byteranges") {
continue
}
_, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
_, err = headerBuf.Write([]byte(k + ":" + v1 + "\n"))
if err != nil {
utils.SharedBufferPool.Put(headerBuf)
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
@@ -338,6 +341,14 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
}
}
}
_, err = cacheWriter.WriteHeader(headerBuf.Bytes())
utils.SharedBufferPool.Put(headerBuf)
if err != nil {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
return
}
if this.isPartial {
// content-range
@@ -633,10 +644,12 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
}
// 写入Header
var headerBuffer = utils.SharedBufferPool.Get()
for k, v := range this.Header() {
for _, v1 := range v {
_, err = compressionCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
_, err = headerBuffer.Write([]byte(k + ":" + v1 + "\n"))
if err != nil {
utils.SharedBufferPool.Put(headerBuffer)
remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error())
_ = compressionCacheWriter.Discard()
compressionCacheWriter = nil
@@ -645,6 +658,15 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
}
}
_, err = compressionCacheWriter.WriteHeader(headerBuffer.Bytes())
utils.SharedBufferPool.Put(headerBuffer)
if err != nil {
remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error())
_ = compressionCacheWriter.Discard()
compressionCacheWriter = nil
return
}
if compressionCacheWriter != nil {
this.compressionCacheWriter = compressionCacheWriter
var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter)

View File

@@ -2,47 +2,39 @@
package utils
import "bytes"
import (
"bytes"
"sync"
)
var SharedBufferPool = NewBufferPool()
// BufferPool pool for get byte slice
type BufferPool struct {
c chan *bytes.Buffer
rawPool *sync.Pool
}
// NewBufferPool 创建新对象
func NewBufferPool(maxSize int) *BufferPool {
if maxSize <= 0 {
maxSize = 1024
}
pool := &BufferPool{
c: make(chan *bytes.Buffer, maxSize),
func NewBufferPool() *BufferPool {
var pool = &BufferPool{}
pool.rawPool = &sync.Pool{
New: func() any {
return &bytes.Buffer{}
},
}
return pool
}
// Get 获取一个新的Buffer
func (this *BufferPool) Get() (b *bytes.Buffer) {
select {
case b = <-this.c:
b.Reset()
default:
b = &bytes.Buffer{}
var buffer = this.rawPool.Get().(*bytes.Buffer)
if buffer.Len() > 0 {
buffer.Reset()
}
return
return buffer
}
// Put 放回一个使用过的byte slice
func (this *BufferPool) Put(b *bytes.Buffer) {
b.Reset()
select {
case this.c <- b:
default:
// 已达最大容量,则抛弃
}
}
// Size 当前的数量
func (this *BufferPool) Size() int {
return len(this.c)
this.rawPool.Put(b)
}

View File

@@ -0,0 +1,47 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package utils_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"strings"
"testing"
)
func TestNewBufferPool(t *testing.T) {
var pool = utils.NewBufferPool()
var b = pool.Get()
b.WriteString("Hello, World")
t.Log(b.String())
pool.Put(b)
t.Log(b.String())
b = pool.Get()
t.Log(b.String())
}
func BenchmarkNewBufferPool1(b *testing.B) {
var data = []byte(strings.Repeat("Hello", 1024))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buffer = &bytes.Buffer{}
buffer.Write(data)
}
})
}
func BenchmarkNewBufferPool2(b *testing.B) {
var pool = utils.NewBufferPool()
var data = []byte(strings.Repeat("Hello", 1024))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buffer = pool.Get()
buffer.Write(data)
pool.Put(buffer)
}
})
}