进一步提升文件缓存写入速度

This commit is contained in:
刘祥超
2022-11-19 15:55:05 +08:00
parent a194360a56
commit 8b5d74af9b
4 changed files with 94 additions and 43 deletions

View File

@@ -567,6 +567,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
var before = time.Now() var before = time.Now()
writer, err := os.OpenFile(tmpPath, flags, 0666) writer, err := os.OpenFile(tmpPath, flags, 0666)
if err != nil { if err != nil {
// TODO 检查在各个系统中的稳定性
if os.IsNotExist(err) { if os.IsNotExist(err) {
_ = os.MkdirAll(dir, 0777) _ = os.MkdirAll(dir, 0777)
@@ -608,8 +609,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
} }
if isNewCreated { if isNewCreated {
// 写入过期时间 // 写入meta
var metaBytes = make([]byte, SizeMeta+len(key)) // 从v0.5.8开始不再在meta中写入Key
var metaBytes = make([]byte, SizeMeta)
binary.BigEndian.PutUint32(metaBytes[OffsetExpiresAt:], uint32(expiredAt)) binary.BigEndian.PutUint32(metaBytes[OffsetExpiresAt:], uint32(expiredAt))
// 写入状态码 // 写入状态码
@@ -618,18 +620,6 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, siz
} }
copy(metaBytes[OffsetStatus:], strconv.Itoa(status)) copy(metaBytes[OffsetStatus:], strconv.Itoa(status))
// 写入URL长度
binary.BigEndian.PutUint32(metaBytes[OffsetURLLength:], uint32(len(key)))
// 写入Header Length
binary.BigEndian.PutUint32(metaBytes[OffsetHeaderLength:], uint32(0))
// 写入Body Length
binary.BigEndian.PutUint64(metaBytes[OffsetBodyLength:], uint64(0))
// 写入URL
copy(metaBytes[OffsetKey:], key)
_, err = writer.Write(metaBytes) _, err = writer.Write(metaBytes)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -679,7 +669,7 @@ func (this *FileStorage) AddToList(item *Item) {
} }
item.MetaSize = SizeMeta + 128 item.MetaSize = SizeMeta + 128
hash := stringutil.Md5(item.Key) var hash = stringutil.Md5(item.Key)
err := this.list.Add(hash, item) err := this.list.Add(hash, item)
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") { if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
remotelogs.Error("CACHE", "add to list failed: "+err.Error()) remotelogs.Error("CACHE", "add to list failed: "+err.Error())

View File

@@ -324,13 +324,16 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
} }
// 写入Header // 写入Header
var headerBuf = utils.SharedBufferPool.Get()
for k, v := range this.Header() { for k, v := range this.Header() {
for _, v1 := range v { for _, v1 := range v {
if this.isPartial && k == "Content-Type" && strings.Contains(v1, "multipart/byteranges") { if this.isPartial && k == "Content-Type" && strings.Contains(v1, "multipart/byteranges") {
continue continue
} }
_, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n")) _, err = headerBuf.Write([]byte(k + ":" + v1 + "\n"))
if err != nil { if err != nil {
utils.SharedBufferPool.Put(headerBuf)
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
_ = this.cacheWriter.Discard() _ = this.cacheWriter.Discard()
this.cacheWriter = nil this.cacheWriter = nil
@@ -338,6 +341,14 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
} }
} }
} }
_, err = cacheWriter.WriteHeader(headerBuf.Bytes())
utils.SharedBufferPool.Put(headerBuf)
if err != nil {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
return
}
if this.isPartial { if this.isPartial {
// content-range // content-range
@@ -633,10 +644,12 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
} }
// 写入Header // 写入Header
var headerBuffer = utils.SharedBufferPool.Get()
for k, v := range this.Header() { for k, v := range this.Header() {
for _, v1 := range v { for _, v1 := range v {
_, err = compressionCacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n")) _, err = headerBuffer.Write([]byte(k + ":" + v1 + "\n"))
if err != nil { if err != nil {
utils.SharedBufferPool.Put(headerBuffer)
remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error()) remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error())
_ = compressionCacheWriter.Discard() _ = compressionCacheWriter.Discard()
compressionCacheWriter = nil compressionCacheWriter = nil
@@ -645,6 +658,15 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
} }
} }
_, err = compressionCacheWriter.WriteHeader(headerBuffer.Bytes())
utils.SharedBufferPool.Put(headerBuffer)
if err != nil {
remotelogs.Error("HTTP_WRITER", "write compression cache failed: "+err.Error())
_ = compressionCacheWriter.Discard()
compressionCacheWriter = nil
return
}
if compressionCacheWriter != nil { if compressionCacheWriter != nil {
this.compressionCacheWriter = compressionCacheWriter this.compressionCacheWriter = compressionCacheWriter
var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter) var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter)

View File

@@ -2,47 +2,39 @@
package utils package utils
import "bytes" import (
"bytes"
"sync"
)
var SharedBufferPool = NewBufferPool()
// BufferPool pool for get byte slice // BufferPool pool for get byte slice
type BufferPool struct { type BufferPool struct {
c chan *bytes.Buffer rawPool *sync.Pool
} }
// NewBufferPool 创建新对象 // NewBufferPool 创建新对象
func NewBufferPool(maxSize int) *BufferPool { func NewBufferPool() *BufferPool {
if maxSize <= 0 { var pool = &BufferPool{}
maxSize = 1024 pool.rawPool = &sync.Pool{
} New: func() any {
pool := &BufferPool{ return &bytes.Buffer{}
c: make(chan *bytes.Buffer, maxSize), },
} }
return pool return pool
} }
// Get 获取一个新的Buffer // Get 获取一个新的Buffer
func (this *BufferPool) Get() (b *bytes.Buffer) { func (this *BufferPool) Get() (b *bytes.Buffer) {
select { var buffer = this.rawPool.Get().(*bytes.Buffer)
case b = <-this.c: if buffer.Len() > 0 {
b.Reset() buffer.Reset()
default:
b = &bytes.Buffer{}
} }
return return buffer
} }
// Put 放回一个使用过的byte slice // Put 放回一个使用过的byte slice
func (this *BufferPool) Put(b *bytes.Buffer) { func (this *BufferPool) Put(b *bytes.Buffer) {
b.Reset() this.rawPool.Put(b)
select {
case this.c <- b:
default:
// 已达最大容量,则抛弃
}
}
// Size 当前的数量
func (this *BufferPool) Size() int {
return len(this.c)
} }

View File

@@ -0,0 +1,47 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package utils_test
import (
"bytes"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"strings"
"testing"
)
func TestNewBufferPool(t *testing.T) {
var pool = utils.NewBufferPool()
var b = pool.Get()
b.WriteString("Hello, World")
t.Log(b.String())
pool.Put(b)
t.Log(b.String())
b = pool.Get()
t.Log(b.String())
}
func BenchmarkNewBufferPool1(b *testing.B) {
var data = []byte(strings.Repeat("Hello", 1024))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buffer = &bytes.Buffer{}
buffer.Write(data)
}
})
}
func BenchmarkNewBufferPool2(b *testing.B) {
var pool = utils.NewBufferPool()
var data = []byte(strings.Repeat("Hello", 1024))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buffer = pool.Get()
buffer.Write(data)
pool.Put(buffer)
}
})
}