From 2f458bbc1765e3f98805a6f4287714b787199124 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Mon, 15 Apr 2024 09:26:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=AD=97=E8=8A=82=E7=BC=93?= =?UTF-8?q?=E5=86=B2=E5=8C=BA=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/storage_file.go | 9 +- internal/nodes/http_request_cache.go | 18 +- internal/nodes/http_request_fastcgi.go | 6 +- internal/nodes/http_request_page.go | 2 +- internal/nodes/http_request_reverse_proxy.go | 12 +- internal/nodes/http_request_root.go | 6 +- internal/nodes/http_request_shutdown.go | 2 +- internal/nodes/http_request_url.go | 4 +- internal/nodes/http_request_websocket.go | 4 +- internal/nodes/http_writer.go | 4 +- internal/nodes/listener_tcp.go | 16 +- internal/nodes/listener_udp.go | 8 +- internal/utils/byte_pool.go | 16 +- internal/utils/byte_pool_test.go | 187 ++++++++++++++++++- internal/waf/action_block.go | 6 +- 15 files changed, 243 insertions(+), 57 deletions(-) diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 013c5d6..ab31607 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -1278,6 +1278,7 @@ func (this *FileStorage) hotLoop() { } var buf = utils.BytePool16k.Get() + defer utils.BytePool16k.Put(buf) for _, item := range result[:size] { reader, err := this.openReader(item.Key, false, false, false) @@ -1319,8 +1320,8 @@ func (this *FileStorage) hotLoop() { continue } - err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { - _, err = writer.WriteHeader(buf[:n]) + err = reader.ReadHeader(buf.Bytes, func(n int) (goNext bool, err error) { + _, err = writer.WriteHeader(buf.Bytes[:n]) return }) if err != nil { @@ -1329,10 +1330,10 @@ func (this *FileStorage) hotLoop() { continue } - err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + err = reader.ReadBody(buf.Bytes, func(n int) (goNext bool, err error) { goNext = true if n > 0 { - _, err = writer.Write(buf[:n]) + _, err = writer.Write(buf.Bytes[:n]) if err != nil { goNext = false } diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 555483b..d37e611 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -342,8 +342,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { this.writer.SetSentHeaderBytes(reader.HeaderSize()) var headerPool = this.bytePool(reader.HeaderSize()) var headerBuf = headerPool.Get() - err = reader.ReadHeader(headerBuf, func(n int) (goNext bool, readErr error) { - headerData = append(headerData, headerBuf[:n]...) + err = reader.ReadHeader(headerBuf.Bytes, func(n int) (goNext bool, readErr error) { + headerData = append(headerData, headerBuf.Bytes[:n]...) for { var nIndex = bytes.Index(headerData, []byte{'\n'}) if nIndex >= 0 { @@ -501,8 +501,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { var pool = this.bytePool(fileSize) var bodyBuf = pool.Get() - err = reader.ReadBodyRange(bodyBuf, ranges[0].Start(), ranges[0].End(), func(n int) (goNext bool, readErr error) { - _, readErr = this.writer.Write(bodyBuf[:n]) + err = reader.ReadBodyRange(bodyBuf.Bytes, ranges[0].Start(), ranges[0].End(), func(n int) (goNext bool, readErr error) { + _, readErr = this.writer.Write(bodyBuf.Bytes[:n]) if readErr != nil { return false, errWritingToClient } @@ -557,8 +557,8 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { var pool = this.bytePool(fileSize) var bodyBuf = pool.Get() - err = reader.ReadBodyRange(bodyBuf, r.Start(), r.End(), func(n int) (goNext bool, readErr error) { - _, readErr = this.writer.Write(bodyBuf[:n]) + err = reader.ReadBodyRange(bodyBuf.Bytes, r.Start(), r.End(), func(n int) (goNext bool, readErr error) { + _, readErr = this.writer.Write(bodyBuf.Bytes[:n]) if readErr != nil { return false, errWritingToClient } @@ -592,9 +592,9 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { var pool = this.bytePool(fileSize) var bodyBuf = pool.Get() if fp, canSendFile := this.writer.canSendfile(); canSendFile { - this.writer.sentBodyBytes, err = io.CopyBuffer(this.writer.rawWriter, fp, bodyBuf) + this.writer.sentBodyBytes, err = io.CopyBuffer(this.writer.rawWriter, fp, bodyBuf.Bytes) } else { - _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf) + _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf.Bytes) } pool.Put(bodyBuf) } else { @@ -604,7 +604,7 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { } else { var pool = this.bytePool(fileSize) var bodyBuf = pool.Get() - _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf) + _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf.Bytes) pool.Put(bodyBuf) } } diff --git a/internal/nodes/http_request_fastcgi.go b/internal/nodes/http_request_fastcgi.go index f591060..6945c6e 100644 --- a/internal/nodes/http_request_fastcgi.go +++ b/internal/nodes/http_request_fastcgi.go @@ -206,9 +206,9 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) { this.writer.WriteHeader(resp.StatusCode) // 输出到客户端 - pool := this.bytePool(resp.ContentLength) - buf := pool.Get() - _, err = io.CopyBuffer(this.writer, resp.Body, buf) + var pool = this.bytePool(resp.ContentLength) + var buf = pool.Get() + _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes) pool.Put(buf) closeErr := resp.Body.Close() diff --git a/internal/nodes/http_request_page.go b/internal/nodes/http_request_page.go index 3b61aba..8579fb1 100644 --- a/internal/nodes/http_request_page.go +++ b/internal/nodes/http_request_page.go @@ -104,7 +104,7 @@ func (this *HTTPRequest) doPageLookup(pages []*serverconfigs.HTTPPageConfig, sta this.writer.WriteHeader(status) } var buf = utils.BytePool1k.Get() - _, err = utils.CopyWithFilter(this.writer, fp, buf, func(p []byte) []byte { + _, err = utils.CopyWithFilter(this.writer, fp, buf.Bytes, func(p []byte) []byte { return []byte(this.Format(string(p))) }) utils.BytePool1k.Put(buf) diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 4bc6638..f9dce1e 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -547,7 +547,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId if resp.ContentLength == 0 && len(resp.TransferEncoding) == 0 { // 即使内容为0,也需要读取一次,以便于触发相关事件 var buf = utils.BytePool4k.Get() - _, _ = io.CopyBuffer(this.writer, resp.Body, buf) + _, _ = io.CopyBuffer(this.writer, resp.Body, buf.Bytes) utils.BytePool4k.Put(buf) _ = resp.Body.Close() respBodyIsClosed = true @@ -562,9 +562,9 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId var err error if shouldAutoFlush { for { - n, readErr := resp.Body.Read(buf) + n, readErr := resp.Body.Read(buf.Bytes) if n > 0 { - _, err = this.writer.Write(buf[:n]) + _, err = this.writer.Write(buf.Bytes[:n]) this.writer.Flush() if err != nil { break @@ -582,10 +582,10 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId resp.ContentLength < (128<<20) { // TODO configure max content-length in cache policy OR CacheRef var requestIsCanceled = false for { - n, readErr := resp.Body.Read(buf) + n, readErr := resp.Body.Read(buf.Bytes) if n > 0 && !requestIsCanceled { - _, err = this.writer.Write(buf[:n]) + _, err = this.writer.Write(buf.Bytes[:n]) if err != nil { requestIsCanceled = true } @@ -596,7 +596,7 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId } } } else { - _, err = io.CopyBuffer(this.writer, resp.Body, buf) + _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes) } } pool.Put(buf) diff --git a/internal/nodes/http_request_root.go b/internal/nodes/http_request_root.go index c693541..15ccbf8 100644 --- a/internal/nodes/http_request_root.go +++ b/internal/nodes/http_request_root.go @@ -327,7 +327,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) { respHeader.Set("Content-Range", ranges[0].ComposeContentRangeHeader(types.String(fileSize))) this.writer.WriteHeader(http.StatusPartialContent) - ok, err := httpRequestReadRange(resp.Body, buf, ranges[0].Start(), ranges[0].End(), func(buf []byte, n int) error { + ok, err := httpRequestReadRange(resp.Body, buf.Bytes, ranges[0].Start(), ranges[0].End(), func(buf []byte, n int) error { _, err := this.writer.Write(buf[:n]) return err }) @@ -379,7 +379,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) { } } - ok, err := httpRequestReadRange(resp.Body, buf, r.Start(), r.End(), func(buf []byte, n int) error { + ok, err := httpRequestReadRange(resp.Body, buf.Bytes, r.Start(), r.End(), func(buf []byte, n int) error { _, err := this.writer.Write(buf[:n]) return err }) @@ -404,7 +404,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) { return true } } else { - _, err = io.CopyBuffer(this.writer, resp.Body, buf) + _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes) if err != nil { if !this.canIgnore(err) { logs.Error(err) diff --git a/internal/nodes/http_request_shutdown.go b/internal/nodes/http_request_shutdown.go index 3fc19fc..62131cf 100644 --- a/internal/nodes/http_request_shutdown.go +++ b/internal/nodes/http_request_shutdown.go @@ -69,7 +69,7 @@ func (this *HTTPRequest) doShutdown() { this.writer.WriteHeader(http.StatusOK) } var buf = utils.BytePool1k.Get() - _, err = utils.CopyWithFilter(this.writer, fp, buf, func(p []byte) []byte { + _, err = utils.CopyWithFilter(this.writer, fp, buf.Bytes, func(p []byte) []byte { return []byte(this.Format(string(p))) }) utils.BytePool1k.Put(buf) diff --git a/internal/nodes/http_request_url.go b/internal/nodes/http_request_url.go index 6baea82..a4b9ced 100644 --- a/internal/nodes/http_request_url.go +++ b/internal/nodes/http_request_url.go @@ -70,11 +70,11 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod var pool = this.bytePool(resp.ContentLength) var buf = pool.Get() if supportVariables { - _, err = utils.CopyWithFilter(this.writer, resp.Body, buf, func(p []byte) []byte { + _, err = utils.CopyWithFilter(this.writer, resp.Body, buf.Bytes, func(p []byte) []byte { return []byte(this.Format(string(p))) }) } else { - _, err = io.CopyBuffer(this.writer, resp.Body, buf) + _, err = io.CopyBuffer(this.writer, resp.Body, buf.Bytes) } pool.Put(buf) diff --git a/internal/nodes/http_request_websocket.go b/internal/nodes/http_request_websocket.go index 6d58aa3..0de6d7f 100644 --- a/internal/nodes/http_request_websocket.go +++ b/internal/nodes/http_request_websocket.go @@ -181,10 +181,10 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou var buf = utils.BytePool4k.Get() defer utils.BytePool4k.Put(buf) for { - n, err := originConn.Read(buf) + n, err := originConn.Read(buf.Bytes) if n > 0 { this.writer.sentBodyBytes += int64(n) - _, err = clientConn.Write(buf[:n]) + _, err = clientConn.Write(buf.Bytes[:n]) if err != nil { break } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 700b81a..ac12403 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -882,7 +882,7 @@ func (this *HTTPWriter) SendFile(status int, path string) (int64, error) { var buf = bufPool.Get() defer bufPool.Put(buf) - written, err := io.CopyBuffer(this, fp, buf) + written, err := io.CopyBuffer(this, fp, buf.Bytes) if err != nil { return written, err } @@ -903,7 +903,7 @@ func (this *HTTPWriter) SendResp(resp *http.Response) (int64, error) { var buf = bufPool.Get() defer bufPool.Put(buf) - return io.CopyBuffer(this, resp.Body, buf) + return io.CopyBuffer(this, resp.Body, buf.Bytes) } // Redirect 跳转 diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index cd0661c..2e6dc2b 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -188,14 +188,14 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net // 从源站读取 goman.New(func() { - var originBuffer = utils.BytePool16k.Get() + var originBuf = utils.BytePool16k.Get() defer func() { - utils.BytePool16k.Put(originBuffer) + utils.BytePool16k.Put(originBuf) }() for { - n, err := originConn.Read(originBuffer) + n, err := originConn.Read(originBuf.Bytes) if n > 0 { - _, err = conn.Write(originBuffer[:n]) + _, err = conn.Write(originBuf.Bytes[:n]) if err != nil { closer() break @@ -214,9 +214,9 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net }) // 从客户端读取 - var clientBuffer = utils.BytePool16k.Get() + var clientBuf = utils.BytePool16k.Get() defer func() { - utils.BytePool16k.Put(clientBuffer) + utils.BytePool16k.Put(clientBuf) }() for { // 是否已达到流量限制 @@ -225,9 +225,9 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net return nil } - n, err := conn.Read(clientBuffer) + n, err := conn.Read(clientBuf.Bytes) if n > 0 { - _, err = originConn.Write(clientBuffer[:n]) + _, err = originConn.Write(clientBuf.Bytes[:n]) if err != nil { break } diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index 7c8d7cd..2923dcf 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -388,17 +388,17 @@ func NewUDPConn(server *serverconfigs.ServerConfig, clientAddr net.Addr, proxyLi } goman.New(func() { - var buffer = utils.BytePool4k.Get() + var buf = utils.BytePool4k.Get() defer func() { - utils.BytePool4k.Put(buffer) + utils.BytePool4k.Put(buf) }() for { - n, err := serverConn.Read(buffer) + n, err := serverConn.Read(buf.Bytes) if n > 0 { conn.activatedAt = time.Now().Unix() - _, writingErr := proxyListener.WriteTo(buffer[:n], cm, clientAddr) + _, writingErr := proxyListener.WriteTo(buf.Bytes[:n], cm, clientAddr) if writingErr != nil { conn.isOk = false break diff --git a/internal/utils/byte_pool.go b/internal/utils/byte_pool.go index 285cec0..65085c0 100644 --- a/internal/utils/byte_pool.go +++ b/internal/utils/byte_pool.go @@ -9,6 +9,10 @@ var BytePool4k = NewBytePool(4 << 10) var BytePool16k = NewBytePool(16 << 10) var BytePool32k = NewBytePool(32 << 10) +type BytesBuf struct { + Bytes []byte +} + // BytePool pool for get byte slice type BytePool struct { length int @@ -24,20 +28,22 @@ func NewBytePool(length int) *BytePool { length: length, rawPool: &sync.Pool{ New: func() any { - return make([]byte, length) + return &BytesBuf{ + Bytes: make([]byte, length), + } }, }, } } // Get 获取一个新的byte slice -func (this *BytePool) Get() []byte { - return this.rawPool.Get().([]byte) +func (this *BytePool) Get() *BytesBuf { + return this.rawPool.Get().(*BytesBuf) } // Put 放回一个使用过的byte slice -func (this *BytePool) Put(b []byte) { - this.rawPool.Put(b) +func (this *BytePool) Put(ptr *BytesBuf) { + this.rawPool.Put(ptr) } // Length 单个字节slice长度 diff --git a/internal/utils/byte_pool_test.go b/internal/utils/byte_pool_test.go index ed13e7d..4179411 100644 --- a/internal/utils/byte_pool_test.go +++ b/internal/utils/byte_pool_test.go @@ -14,7 +14,9 @@ func TestBytePool_Memory(t *testing.T) { var pool = utils.NewBytePool(32 * 1024) for i := 0; i < 20480; i++ { - pool.Put(make([]byte, 32*1024)) + pool.Put(&utils.BytesBuf{ + Bytes: make([]byte, 32*1024), + }) } //pool.Purge() @@ -75,14 +77,40 @@ func BenchmarkBytePool_Get_Sync(b *testing.B) { }) } -func BenchmarkBytePool_Copy(b *testing.B) { - var data = bytes.Repeat([]byte{'A'}, 8<<10) +func BenchmarkBytePool_Get_Sync2(b *testing.B) { + runtime.GOMAXPROCS(1) var pool = &sync.Pool{ New: func() any { - return make([]byte, 8<<10) + return &utils.BytesBuf{ + Bytes: make([]byte, 1024), + } }, } + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf = pool.Get() + pool.Put(buf) + } + }) +} + +func BenchmarkBytePool_Copy_Bytes_4(b *testing.B) { + const size = 4 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + var pool = &sync.Pool{ + New: func() any { + return make([]byte, size) + }, + } + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { for pb.Next() { var buf = pool.Get().([]byte) @@ -91,3 +119,154 @@ func BenchmarkBytePool_Copy(b *testing.B) { } }) } + +func BenchmarkBytePool_Copy_Wrapper_4(b *testing.B) { + const size = 4 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + var pool = &sync.Pool{ + New: func() any { + return &utils.BytesBuf{ + Bytes: make([]byte, size), + } + }, + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf = pool.Get().(*utils.BytesBuf) + copy(buf.Bytes, data) + pool.Put(buf) + } + }) +} + +func BenchmarkBytePool_Copy_Bytes_16(b *testing.B) { + const size = 16 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + var pool = &sync.Pool{ + New: func() any { + return make([]byte, size) + }, + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf = pool.Get().([]byte) + copy(buf, data) + pool.Put(buf) + } + }) +} + +func BenchmarkBytePool_Copy_Wrapper_16(b *testing.B) { + const size = 16 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + var pool = &sync.Pool{ + New: func() any { + return &utils.BytesBuf{ + Bytes: make([]byte, size), + } + }, + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf = pool.Get().(*utils.BytesBuf) + copy(buf.Bytes, data) + pool.Put(buf) + } + }) +} + +func BenchmarkBytePool_Copy_Wrapper_Buf_16(b *testing.B) { + const size = 16 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + var pool = &sync.Pool{ + New: func() any { + return &utils.BytesBuf{ + Bytes: make([]byte, size), + } + }, + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var bytesPtr = pool.Get().(*utils.BytesBuf) + var buf = bytesPtr.Bytes + copy(buf, data) + pool.Put(bytesPtr) + } + }) +} + +func BenchmarkBytePool_Copy_Wrapper_BytePool_16(b *testing.B) { + const size = 16 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var bytesPtr = utils.BytePool16k.Get() + copy(bytesPtr.Bytes, data) + utils.BytePool16k.Put(bytesPtr) + } + }) +} + +func BenchmarkBytePool_Copy_Bytes_32(b *testing.B) { + const size = 32 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + var pool = &sync.Pool{ + New: func() any { + return make([]byte, size) + }, + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf = pool.Get().([]byte) + copy(buf, data) + pool.Put(buf) + } + }) +} + +func BenchmarkBytePool_Copy_Wrapper_32(b *testing.B) { + const size = 32 << 10 + + var data = bytes.Repeat([]byte{'A'}, size) + + var pool = &sync.Pool{ + New: func() any { + return &utils.BytesBuf{ + Bytes: make([]byte, size), + } + }, + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf = pool.Get().(*utils.BytesBuf) + copy(buf.Bytes, data) + pool.Put(buf) + } + }) +} diff --git a/internal/waf/action_block.go b/internal/waf/action_block.go index 5390431..797f4f4 100644 --- a/internal/waf/action_block.go +++ b/internal/waf/action_block.go @@ -118,11 +118,11 @@ func (this *BlockAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, reque } } - buf := utils.BytePool1k.Get() - _, _ = io.CopyBuffer(writer, resp.Body, buf) + var buf = utils.BytePool1k.Get() + _, _ = io.CopyBuffer(writer, resp.Body, buf.Bytes) utils.BytePool1k.Put(buf) } else { - path := this.URL + var path = this.URL if !filepath.IsAbs(this.URL) { path = Tea.Root + string(os.PathSeparator) + path }