优化字节缓冲区相关代码

This commit is contained in:
刘祥超
2024-04-15 09:26:00 +08:00
parent 4bdd248f99
commit 5e7ea9a884
15 changed files with 243 additions and 57 deletions

View File

@@ -1278,6 +1278,7 @@ func (this *FileStorage) hotLoop() {
} }
var buf = utils.BytePool16k.Get() var buf = utils.BytePool16k.Get()
defer utils.BytePool16k.Put(buf) defer utils.BytePool16k.Put(buf)
for _, item := range result[:size] { for _, item := range result[:size] {
reader, err := this.openReader(item.Key, false, false, false) reader, err := this.openReader(item.Key, false, false, false)
@@ -1319,8 +1320,8 @@ func (this *FileStorage) hotLoop() {
continue continue
} }
err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { err = reader.ReadHeader(buf.Bytes, func(n int) (goNext bool, err error) {
_, err = writer.WriteHeader(buf[:n]) _, err = writer.WriteHeader(buf.Bytes[:n])
return return
}) })
if err != nil { if err != nil {
@@ -1329,10 +1330,10 @@ func (this *FileStorage) hotLoop() {
continue continue
} }
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { err = reader.ReadBody(buf.Bytes, func(n int) (goNext bool, err error) {
goNext = true goNext = true
if n > 0 { if n > 0 {
_, err = writer.Write(buf[:n]) _, err = writer.Write(buf.Bytes[:n])
if err != nil { if err != nil {
goNext = false goNext = false
} }

View File

@@ -342,8 +342,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
this.writer.SetSentHeaderBytes(reader.HeaderSize()) this.writer.SetSentHeaderBytes(reader.HeaderSize())
var headerPool = this.bytePool(reader.HeaderSize()) var headerPool = this.bytePool(reader.HeaderSize())
var headerBuf = headerPool.Get() var headerBuf = headerPool.Get()
err = reader.ReadHeader(headerBuf, func(n int) (goNext bool, readErr error) { err = reader.ReadHeader(headerBuf.Bytes, func(n int) (goNext bool, readErr error) {
headerData = append(headerData, headerBuf[:n]...) headerData = append(headerData, headerBuf.Bytes[:n]...)
for { for {
var nIndex = bytes.Index(headerData, []byte{'\n'}) var nIndex = bytes.Index(headerData, []byte{'\n'})
if nIndex >= 0 { if nIndex >= 0 {
@@ -501,8 +501,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
var pool = this.bytePool(fileSize) var pool = this.bytePool(fileSize)
var bodyBuf = pool.Get() var bodyBuf = pool.Get()
err = reader.ReadBodyRange(bodyBuf, ranges[0].Start(), ranges[0].End(), func(n int) (goNext bool, readErr error) { err = reader.ReadBodyRange(bodyBuf.Bytes, ranges[0].Start(), ranges[0].End(), func(n int) (goNext bool, readErr error) {
_, readErr = this.writer.Write(bodyBuf[:n]) _, readErr = this.writer.Write(bodyBuf.Bytes[:n])
if readErr != nil { if readErr != nil {
return false, errWritingToClient return false, errWritingToClient
} }
@@ -557,8 +557,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
var pool = this.bytePool(fileSize) var pool = this.bytePool(fileSize)
var bodyBuf = pool.Get() var bodyBuf = pool.Get()
err = reader.ReadBodyRange(bodyBuf, r.Start(), r.End(), func(n int) (goNext bool, readErr error) { err = reader.ReadBodyRange(bodyBuf.Bytes, r.Start(), r.End(), func(n int) (goNext bool, readErr error) {
_, readErr = this.writer.Write(bodyBuf[:n]) _, readErr = this.writer.Write(bodyBuf.Bytes[:n])
if readErr != nil { if readErr != nil {
return false, errWritingToClient return false, errWritingToClient
} }
@@ -592,9 +592,9 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
var pool = this.bytePool(fileSize) var pool = this.bytePool(fileSize)
var bodyBuf = pool.Get() var bodyBuf = pool.Get()
if fp, canSendFile := this.writer.canSendfile(); canSendFile { if fp, canSendFile := this.writer.canSendfile(); canSendFile {
this.writer.sentBodyBytes, err = io.CopyBuffer(this.writer.rawWriter, fp, bodyBuf) this.writer.sentBodyBytes, err = io.CopyBuffer(this.writer.rawWriter, fp, bodyBuf.Bytes)
} else { } else {
_, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf) _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf.Bytes)
} }
pool.Put(bodyBuf) pool.Put(bodyBuf)
} else { } else {
@@ -604,7 +604,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
} else { } else {
var pool = this.bytePool(fileSize) var pool = this.bytePool(fileSize)
var bodyBuf = pool.Get() var bodyBuf = pool.Get()
_, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf) _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf.Bytes)
pool.Put(bodyBuf) pool.Put(bodyBuf)
} }
} }

View File

@@ -206,9 +206,9 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
this.writer.WriteHeader(resp.StatusCode) this.writer.WriteHeader(resp.StatusCode)
// 输出到客户端 // 输出到客户端
pool := this.bytePool(resp.ContentLength) var pool = this.bytePool(resp.ContentLength)
buf := pool.Get() var buf = pool.Get()
_, err = io.CopyBuffer(this.writer, resp.Body, buf) _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes)
pool.Put(buf) pool.Put(buf)
closeErr := resp.Body.Close() closeErr := resp.Body.Close()

View File

@@ -104,7 +104,7 @@ func (this *HTTPRequest) doPageLookup(pages []*serverconfigs.HTTPPageConfig, sta
this.writer.WriteHeader(status) this.writer.WriteHeader(status)
} }
var buf = utils.BytePool1k.Get() var buf = utils.BytePool1k.Get()
_, err = utils.CopyWithFilter(this.writer, fp, buf, func(p []byte) []byte { _, err = utils.CopyWithFilter(this.writer, fp, buf.Bytes, func(p []byte) []byte {
return []byte(this.Format(string(p))) return []byte(this.Format(string(p)))
}) })
utils.BytePool1k.Put(buf) utils.BytePool1k.Put(buf)

View File

@@ -547,7 +547,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
if resp.ContentLength == 0 && len(resp.TransferEncoding) == 0 { if resp.ContentLength == 0 && len(resp.TransferEncoding) == 0 {
// 即使内容为0也需要读取一次以便于触发相关事件 // 即使内容为0也需要读取一次以便于触发相关事件
var buf = utils.BytePool4k.Get() var buf = utils.BytePool4k.Get()
_, _ = io.CopyBuffer(this.writer, resp.Body, buf) _, _ = io.CopyBuffer(this.writer, resp.Body, buf.Bytes)
utils.BytePool4k.Put(buf) utils.BytePool4k.Put(buf)
_ = resp.Body.Close() _ = resp.Body.Close()
respBodyIsClosed = true respBodyIsClosed = true
@@ -562,9 +562,9 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
var err error var err error
if shouldAutoFlush { if shouldAutoFlush {
for { for {
n, readErr := resp.Body.Read(buf) n, readErr := resp.Body.Read(buf.Bytes)
if n > 0 { if n > 0 {
_, err = this.writer.Write(buf[:n]) _, err = this.writer.Write(buf.Bytes[:n])
this.writer.Flush() this.writer.Flush()
if err != nil { if err != nil {
break break
@@ -582,10 +582,10 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
resp.ContentLength < (128<<20) { // TODO configure max content-length in cache policy OR CacheRef resp.ContentLength < (128<<20) { // TODO configure max content-length in cache policy OR CacheRef
var requestIsCanceled = false var requestIsCanceled = false
for { for {
n, readErr := resp.Body.Read(buf) n, readErr := resp.Body.Read(buf.Bytes)
if n > 0 && !requestIsCanceled { if n > 0 && !requestIsCanceled {
_, err = this.writer.Write(buf[:n]) _, err = this.writer.Write(buf.Bytes[:n])
if err != nil { if err != nil {
requestIsCanceled = true requestIsCanceled = true
} }
@@ -596,7 +596,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId
} }
} }
} else { } else {
_, err = io.CopyBuffer(this.writer, resp.Body, buf) _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes)
} }
} }
pool.Put(buf) pool.Put(buf)

View File

@@ -327,7 +327,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
respHeader.Set("Content-Range", ranges[0].ComposeContentRangeHeader(types.String(fileSize))) respHeader.Set("Content-Range", ranges[0].ComposeContentRangeHeader(types.String(fileSize)))
this.writer.WriteHeader(http.StatusPartialContent) this.writer.WriteHeader(http.StatusPartialContent)
ok, err := httpRequestReadRange(resp.Body, buf, ranges[0].Start(), ranges[0].End(), func(buf []byte, n int) error { ok, err := httpRequestReadRange(resp.Body, buf.Bytes, ranges[0].Start(), ranges[0].End(), func(buf []byte, n int) error {
_, err := this.writer.Write(buf[:n]) _, err := this.writer.Write(buf[:n])
return err return err
}) })
@@ -379,7 +379,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
} }
} }
ok, err := httpRequestReadRange(resp.Body, buf, r.Start(), r.End(), func(buf []byte, n int) error { ok, err := httpRequestReadRange(resp.Body, buf.Bytes, r.Start(), r.End(), func(buf []byte, n int) error {
_, err := this.writer.Write(buf[:n]) _, err := this.writer.Write(buf[:n])
return err return err
}) })
@@ -404,7 +404,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
return true return true
} }
} else { } else {
_, err = io.CopyBuffer(this.writer, resp.Body, buf) _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes)
if err != nil { if err != nil {
if !this.canIgnore(err) { if !this.canIgnore(err) {
logs.Error(err) logs.Error(err)

View File

@@ -69,7 +69,7 @@ func (this *HTTPRequest) doShutdown() {
this.writer.WriteHeader(http.StatusOK) this.writer.WriteHeader(http.StatusOK)
} }
var buf = utils.BytePool1k.Get() var buf = utils.BytePool1k.Get()
_, err = utils.CopyWithFilter(this.writer, fp, buf, func(p []byte) []byte { _, err = utils.CopyWithFilter(this.writer, fp, buf.Bytes, func(p []byte) []byte {
return []byte(this.Format(string(p))) return []byte(this.Format(string(p)))
}) })
utils.BytePool1k.Put(buf) utils.BytePool1k.Put(buf)

View File

@@ -70,11 +70,11 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
var pool = this.bytePool(resp.ContentLength) var pool = this.bytePool(resp.ContentLength)
var buf = pool.Get() var buf = pool.Get()
if supportVariables { if supportVariables {
_, err = utils.CopyWithFilter(this.writer, resp.Body, buf, func(p []byte) []byte { _, err = utils.CopyWithFilter(this.writer, resp.Body, buf.Bytes, func(p []byte) []byte {
return []byte(this.Format(string(p))) return []byte(this.Format(string(p)))
}) })
} else { } else {
_, err = io.CopyBuffer(this.writer, resp.Body, buf) _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes)
} }
pool.Put(buf) pool.Put(buf)

View File

@@ -181,10 +181,10 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou
var buf = utils.BytePool4k.Get() var buf = utils.BytePool4k.Get()
defer utils.BytePool4k.Put(buf) defer utils.BytePool4k.Put(buf)
for { for {
n, err := originConn.Read(buf) n, err := originConn.Read(buf.Bytes)
if n > 0 { if n > 0 {
this.writer.sentBodyBytes += int64(n) this.writer.sentBodyBytes += int64(n)
_, err = clientConn.Write(buf[:n]) _, err = clientConn.Write(buf.Bytes[:n])
if err != nil { if err != nil {
break break
} }

View File

@@ -882,7 +882,7 @@ func (this *HTTPWriter) SendFile(status int, path string) (int64, error) {
var buf = bufPool.Get() var buf = bufPool.Get()
defer bufPool.Put(buf) defer bufPool.Put(buf)
written, err := io.CopyBuffer(this, fp, buf) written, err := io.CopyBuffer(this, fp, buf.Bytes)
if err != nil { if err != nil {
return written, err return written, err
} }
@@ -903,7 +903,7 @@ func (this *HTTPWriter) SendResp(resp *http.Response) (int64, error) {
var buf = bufPool.Get() var buf = bufPool.Get()
defer bufPool.Put(buf) defer bufPool.Put(buf)
return io.CopyBuffer(this, resp.Body, buf) return io.CopyBuffer(this, resp.Body, buf.Bytes)
} }
// Redirect 跳转 // Redirect 跳转

View File

@@ -188,14 +188,14 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net
// 从源站读取 // 从源站读取
goman.New(func() { goman.New(func() {
var originBuffer = utils.BytePool16k.Get() var originBuf = utils.BytePool16k.Get()
defer func() { defer func() {
utils.BytePool16k.Put(originBuffer) utils.BytePool16k.Put(originBuf)
}() }()
for { for {
n, err := originConn.Read(originBuffer) n, err := originConn.Read(originBuf.Bytes)
if n > 0 { if n > 0 {
_, err = conn.Write(originBuffer[:n]) _, err = conn.Write(originBuf.Bytes[:n])
if err != nil { if err != nil {
closer() closer()
break break
@@ -214,9 +214,9 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net
}) })
// 从客户端读取 // 从客户端读取
var clientBuffer = utils.BytePool16k.Get() var clientBuf = utils.BytePool16k.Get()
defer func() { defer func() {
utils.BytePool16k.Put(clientBuffer) utils.BytePool16k.Put(clientBuf)
}() }()
for { for {
// 是否已达到流量限制 // 是否已达到流量限制
@@ -225,9 +225,9 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net
return nil return nil
} }
n, err := conn.Read(clientBuffer) n, err := conn.Read(clientBuf.Bytes)
if n > 0 { if n > 0 {
_, err = originConn.Write(clientBuffer[:n]) _, err = originConn.Write(clientBuf.Bytes[:n])
if err != nil { if err != nil {
break break
} }

View File

@@ -388,17 +388,17 @@ func NewUDPConn(server *serverconfigs.ServerConfig, clientAddr net.Addr, proxyLi
} }
goman.New(func() { goman.New(func() {
var buffer = utils.BytePool4k.Get() var buf = utils.BytePool4k.Get()
defer func() { defer func() {
utils.BytePool4k.Put(buffer) utils.BytePool4k.Put(buf)
}() }()
for { for {
n, err := serverConn.Read(buffer) n, err := serverConn.Read(buf.Bytes)
if n > 0 { if n > 0 {
conn.activatedAt = time.Now().Unix() conn.activatedAt = time.Now().Unix()
_, writingErr := proxyListener.WriteTo(buffer[:n], cm, clientAddr) _, writingErr := proxyListener.WriteTo(buf.Bytes[:n], cm, clientAddr)
if writingErr != nil { if writingErr != nil {
conn.isOk = false conn.isOk = false
break break

View File

@@ -9,6 +9,10 @@ var BytePool4k = NewBytePool(4 << 10)
var BytePool16k = NewBytePool(16 << 10) var BytePool16k = NewBytePool(16 << 10)
var BytePool32k = NewBytePool(32 << 10) var BytePool32k = NewBytePool(32 << 10)
type BytesBuf struct {
Bytes []byte
}
// BytePool pool for get byte slice // BytePool pool for get byte slice
type BytePool struct { type BytePool struct {
length int length int
@@ -24,20 +28,22 @@ func NewBytePool(length int) *BytePool {
length: length, length: length,
rawPool: &sync.Pool{ rawPool: &sync.Pool{
New: func() any { New: func() any {
return make([]byte, length) return &BytesBuf{
Bytes: make([]byte, length),
}
}, },
}, },
} }
} }
// Get 获取一个新的byte slice // Get 获取一个新的byte slice
func (this *BytePool) Get() []byte { func (this *BytePool) Get() *BytesBuf {
return this.rawPool.Get().([]byte) return this.rawPool.Get().(*BytesBuf)
} }
// Put 放回一个使用过的byte slice // Put 放回一个使用过的byte slice
func (this *BytePool) Put(b []byte) { func (this *BytePool) Put(ptr *BytesBuf) {
this.rawPool.Put(b) this.rawPool.Put(ptr)
} }
// Length 单个字节slice长度 // Length 单个字节slice长度

View File

@@ -14,7 +14,9 @@ func TestBytePool_Memory(t *testing.T) {
var pool = utils.NewBytePool(32 * 1024) var pool = utils.NewBytePool(32 * 1024)
for i := 0; i < 20480; i++ { for i := 0; i < 20480; i++ {
pool.Put(make([]byte, 32*1024)) pool.Put(&utils.BytesBuf{
Bytes: make([]byte, 32*1024),
})
} }
//pool.Purge() //pool.Purge()
@@ -75,14 +77,40 @@ func BenchmarkBytePool_Get_Sync(b *testing.B) {
}) })
} }
func BenchmarkBytePool_Copy(b *testing.B) { func BenchmarkBytePool_Get_Sync2(b *testing.B) {
var data = bytes.Repeat([]byte{'A'}, 8<<10) runtime.GOMAXPROCS(1)
var pool = &sync.Pool{ var pool = &sync.Pool{
New: func() any { New: func() any {
return make([]byte, 8<<10) return &utils.BytesBuf{
Bytes: make([]byte, 1024),
}
}, },
} }
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buf = pool.Get()
pool.Put(buf)
}
})
}
func BenchmarkBytePool_Copy_Bytes_4(b *testing.B) {
const size = 4 << 10
var data = bytes.Repeat([]byte{'A'}, size)
var pool = &sync.Pool{
New: func() any {
return make([]byte, size)
},
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
var buf = pool.Get().([]byte) var buf = pool.Get().([]byte)
@@ -91,3 +119,154 @@ func BenchmarkBytePool_Copy(b *testing.B) {
} }
}) })
} }
func BenchmarkBytePool_Copy_Wrapper_4(b *testing.B) {
const size = 4 << 10
var data = bytes.Repeat([]byte{'A'}, size)
var pool = &sync.Pool{
New: func() any {
return &utils.BytesBuf{
Bytes: make([]byte, size),
}
},
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buf = pool.Get().(*utils.BytesBuf)
copy(buf.Bytes, data)
pool.Put(buf)
}
})
}
func BenchmarkBytePool_Copy_Bytes_16(b *testing.B) {
const size = 16 << 10
var data = bytes.Repeat([]byte{'A'}, size)
var pool = &sync.Pool{
New: func() any {
return make([]byte, size)
},
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buf = pool.Get().([]byte)
copy(buf, data)
pool.Put(buf)
}
})
}
func BenchmarkBytePool_Copy_Wrapper_16(b *testing.B) {
const size = 16 << 10
var data = bytes.Repeat([]byte{'A'}, size)
var pool = &sync.Pool{
New: func() any {
return &utils.BytesBuf{
Bytes: make([]byte, size),
}
},
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buf = pool.Get().(*utils.BytesBuf)
copy(buf.Bytes, data)
pool.Put(buf)
}
})
}
func BenchmarkBytePool_Copy_Wrapper_Buf_16(b *testing.B) {
const size = 16 << 10
var data = bytes.Repeat([]byte{'A'}, size)
var pool = &sync.Pool{
New: func() any {
return &utils.BytesBuf{
Bytes: make([]byte, size),
}
},
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var bytesPtr = pool.Get().(*utils.BytesBuf)
var buf = bytesPtr.Bytes
copy(buf, data)
pool.Put(bytesPtr)
}
})
}
func BenchmarkBytePool_Copy_Wrapper_BytePool_16(b *testing.B) {
const size = 16 << 10
var data = bytes.Repeat([]byte{'A'}, size)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var bytesPtr = utils.BytePool16k.Get()
copy(bytesPtr.Bytes, data)
utils.BytePool16k.Put(bytesPtr)
}
})
}
func BenchmarkBytePool_Copy_Bytes_32(b *testing.B) {
const size = 32 << 10
var data = bytes.Repeat([]byte{'A'}, size)
var pool = &sync.Pool{
New: func() any {
return make([]byte, size)
},
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buf = pool.Get().([]byte)
copy(buf, data)
pool.Put(buf)
}
})
}
func BenchmarkBytePool_Copy_Wrapper_32(b *testing.B) {
const size = 32 << 10
var data = bytes.Repeat([]byte{'A'}, size)
var pool = &sync.Pool{
New: func() any {
return &utils.BytesBuf{
Bytes: make([]byte, size),
}
},
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var buf = pool.Get().(*utils.BytesBuf)
copy(buf.Bytes, data)
pool.Put(buf)
}
})
}

View File

@@ -118,11 +118,11 @@ func (this *BlockAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, reque
} }
} }
buf := utils.BytePool1k.Get() var buf = utils.BytePool1k.Get()
_, _ = io.CopyBuffer(writer, resp.Body, buf) _, _ = io.CopyBuffer(writer, resp.Body, buf.Bytes)
utils.BytePool1k.Put(buf) utils.BytePool1k.Put(buf)
} else { } else {
path := this.URL var path = this.URL
if !filepath.IsAbs(this.URL) { if !filepath.IsAbs(this.URL) {
path = Tea.Root + string(os.PathSeparator) + path path = Tea.Root + string(os.PathSeparator) + path
} }