diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 250c08d..a03f225 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -14,6 +14,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" @@ -1285,9 +1286,9 @@ func (this *FileStorage) hotLoop() { size = len(result) / 10 } - var buf = utils.BytePool16k.Get() + var buf = bytepool.Pool16k.Get() - defer utils.BytePool16k.Put(buf) + defer bytepool.Pool16k.Put(buf) for _, item := range result[:size] { reader, err := this.openReader(item.Key, false, false, false) if err != nil { diff --git a/internal/nodes/http_cache_task_manager.go b/internal/nodes/http_cache_task_manager.go index b6389ee..c87fede 100644 --- a/internal/nodes/http_cache_task_manager.go +++ b/internal/nodes/http_cache_task_manager.go @@ -16,7 +16,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" connutils "github.com/TeaOSLab/EdgeNode/internal/utils/conns" "github.com/iwind/TeaGo/Tea" "io" @@ -246,9 +246,9 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error { } // 读取内容,以便于生成缓存 - var buf = utils.BytePool16k.Get() + var buf = bytepool.Pool16k.Get() _, err = io.CopyBuffer(io.Discard, resp.Body, buf.Bytes) - utils.BytePool16k.Put(buf) + bytepool.Pool16k.Put(buf) if err != nil { if err != io.EOF { err = this.simplifyErr(err) diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index de7ba15..3a146a1 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -13,6 +13,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/metrics" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/types" @@ -1985,20 +1986,17 @@ func (this *HTTPRequest) addError(err error) { } // 计算合适的buffer size -func (this *HTTPRequest) bytePool(contentLength int64) *utils.BytePool { +func (this *HTTPRequest) bytePool(contentLength int64) *bytepool.Pool { if contentLength < 0 { - return utils.BytePool16k + return bytepool.Pool16k } - if contentLength < 8192 { // 8K - return utils.BytePool1k + if contentLength < 8192 { // < 8K + return bytepool.Pool1k } - if contentLength < 32768 { // 32K - return utils.BytePool16k + if contentLength < 32768 { // < 32K + return bytepool.Pool16k } - if contentLength < 131072 { // 128K - return utils.BytePool32k - } - return utils.BytePool32k + return bytepool.Pool32k } // 检查是否可以忽略错误 @@ -2008,20 +2006,21 @@ func (this *HTTPRequest) canIgnore(err error) bool { } // 已读到头 - if err == io.EOF || err == io.ErrUnexpectedEOF { + if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) { return true } // 网络错误 - _, ok := err.(*net.OpError) + var opErr *net.OpError + ok := errors.As(err, &opErr) if ok { return true } // 客户端主动取消 - if err == errWritingToClient || - err == context.Canceled || - err == io.ErrShortWrite || + if errors.Is(err, errWritingToClient) || + errors.Is(err, context.Canceled) || + errors.Is(err, io.ErrShortWrite) || strings.Contains(err.Error(), "write: connection") || strings.Contains(err.Error(), "write: broken pipe") || strings.Contains(err.Error(), "write tcp") { diff --git a/internal/nodes/http_request_page.go b/internal/nodes/http_request_page.go index 8f19ccd..b3a4a6f 100644 --- a/internal/nodes/http_request_page.go +++ b/internal/nodes/http_request_page.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/iwind/TeaGo/Tea" "net/http" "os" @@ -103,11 +104,11 @@ func (this *HTTPRequest) doPageLookup(pages []*serverconfigs.HTTPPageConfig, sta this.writer.Prepare(nil, stat.Size(), status, true) this.writer.WriteHeader(status) } - var buf = utils.BytePool1k.Get() + var buf = bytepool.Pool1k.Get() _, err = utils.CopyWithFilter(this.writer, fp, buf.Bytes, func(p []byte) []byte { return []byte(this.Format(string(p))) }) - utils.BytePool1k.Put(buf) + bytepool.Pool1k.Put(buf) if err != nil { if !this.canIgnore(err) { remotelogs.Warn("HTTP_REQUEST_PAGE", "write to client failed: "+err.Error()) diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 79c1153..7e5650d 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/compressions" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" "github.com/TeaOSLab/EdgeNode/internal/utils/minifiers" "github.com/iwind/TeaGo/lists" @@ -581,9 +582,9 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId // 是否有内容 if resp.ContentLength == 0 && len(resp.TransferEncoding) == 0 { // 即使内容为0,也需要读取一次,以便于触发相关事件 - var buf = utils.BytePool4k.Get() + var buf = bytepool.Pool4k.Get() _, _ = io.CopyBuffer(this.writer, resp.Body, buf.Bytes) - utils.BytePool4k.Put(buf) + bytepool.Pool4k.Put(buf) _ = resp.Body.Close() respBodyIsClosed = true diff --git a/internal/nodes/http_request_shutdown.go b/internal/nodes/http_request_shutdown.go index 62131cf..444dc16 100644 --- a/internal/nodes/http_request_shutdown.go +++ b/internal/nodes/http_request_shutdown.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/iwind/TeaGo/Tea" "net/http" "os" @@ -68,11 +69,11 @@ func (this *HTTPRequest) doShutdown() { this.ProcessResponseHeaders(this.writer.Header(), http.StatusOK) this.writer.WriteHeader(http.StatusOK) } - var buf = utils.BytePool1k.Get() + var buf = bytepool.Pool1k.Get() _, err = utils.CopyWithFilter(this.writer, fp, buf.Bytes, func(p []byte) []byte { return []byte(this.Format(string(p))) }) - utils.BytePool1k.Put(buf) + bytepool.Pool1k.Put(buf) if err != nil { if !this.canIgnore(err) { remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error()) diff --git a/internal/nodes/http_request_websocket.go b/internal/nodes/http_request_websocket.go index 0a70485..6c89d77 100644 --- a/internal/nodes/http_request_websocket.go +++ b/internal/nodes/http_request_websocket.go @@ -4,7 +4,7 @@ import ( "bufio" "bytes" "errors" - "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "io" "net/http" "net/url" @@ -178,8 +178,8 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou } // 复制剩余的数据 - var buf = utils.BytePool4k.Get() - defer utils.BytePool4k.Put(buf) + var buf = bytepool.Pool4k.Get() + defer bytepool.Pool4k.Put(buf) for { n, readErr := originConn.Read(buf.Bytes) if n > 0 { @@ -197,9 +197,9 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou _ = originConn.Close() }() - var buf = utils.BytePool4k.Get() + var buf = bytepool.Pool4k.Get() _, _ = io.CopyBuffer(originConn, clientConn, buf.Bytes) - utils.BytePool4k.Put(buf) + bytepool.Pool4k.Put(buf) return } diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 2e6dc2b..9e360cf 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -8,7 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/stats" - "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/iwind/TeaGo/types" "github.com/pires/go-proxyproto" "net" @@ -188,9 +188,9 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net // 从源站读取 goman.New(func() { - var originBuf = utils.BytePool16k.Get() + var originBuf = bytepool.Pool16k.Get() defer func() { - utils.BytePool16k.Put(originBuf) + bytepool.Pool16k.Put(originBuf) }() for { n, err := originConn.Read(originBuf.Bytes) @@ -214,9 +214,9 @@ func (this *TCPListener) handleConn(server *serverconfigs.ServerConfig, conn net }) // 从客户端读取 - var clientBuf = utils.BytePool16k.Get() + var clientBuf = bytepool.Pool16k.Get() defer func() { - utils.BytePool16k.Put(clientBuf) + bytepool.Pool16k.Put(clientBuf) }() for { // 是否已达到流量限制 diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index 927da5f..99c96b8 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -9,6 +9,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/iwind/TeaGo/types" "github.com/pires/go-proxyproto" "golang.org/x/net/ipv4" @@ -400,9 +401,9 @@ func NewUDPConn(server *serverconfigs.ServerConfig, clientAddr net.Addr, proxyLi } goman.New(func() { - var buf = utils.BytePool4k.Get() + var buf = bytepool.Pool4k.Get() defer func() { - utils.BytePool4k.Put(buf) + bytepool.Pool4k.Put(buf) }() for { diff --git a/internal/utils/byte_pool.go b/internal/utils/byte_pool.go deleted file mode 100644 index 65085c0..0000000 --- a/internal/utils/byte_pool.go +++ /dev/null @@ -1,52 +0,0 @@ -package utils - -import ( - "sync" -) - -var BytePool1k = NewBytePool(1 << 10) -var BytePool4k = NewBytePool(4 << 10) -var BytePool16k = NewBytePool(16 << 10) -var BytePool32k = NewBytePool(32 << 10) - -type BytesBuf struct { - Bytes []byte -} - -// BytePool pool for get byte slice -type BytePool struct { - length int - rawPool *sync.Pool -} - -// NewBytePool 创建新对象 -func NewBytePool(length int) *BytePool { - if length < 0 { - length = 1024 - } - return &BytePool{ - length: length, - rawPool: &sync.Pool{ - New: func() any { - return &BytesBuf{ - Bytes: make([]byte, length), - } - }, - }, - } -} - -// Get 获取一个新的byte slice -func (this *BytePool) Get() *BytesBuf { - return this.rawPool.Get().(*BytesBuf) -} - -// Put 放回一个使用过的byte slice -func (this *BytePool) Put(ptr *BytesBuf) { - this.rawPool.Put(ptr) -} - -// Length 单个字节slice长度 -func (this *BytePool) Length() int { - return this.length -} diff --git a/internal/utils/bytepool/byte_pool.go b/internal/utils/bytepool/byte_pool.go new file mode 100644 index 0000000..a2ddce9 --- /dev/null +++ b/internal/utils/bytepool/byte_pool.go @@ -0,0 +1,52 @@ +package bytepool + +import ( + "sync" +) + +var Pool1k = NewPool(1 << 10) +var Pool4k = NewPool(4 << 10) +var Pool16k = NewPool(16 << 10) +var Pool32k = NewPool(32 << 10) + +type Buf struct { + Bytes []byte +} + +// Pool for get byte slice +type Pool struct { + length int + rawPool *sync.Pool +} + +// NewPool 创建新对象 +func NewPool(length int) *Pool { + if length < 0 { + length = 1024 + } + return &Pool{ + length: length, + rawPool: &sync.Pool{ + New: func() any { + return &Buf{ + Bytes: make([]byte, length), + } + }, + }, + } +} + +// Get 获取一个新的byte slice +func (this *Pool) Get() *Buf { + return this.rawPool.Get().(*Buf) +} + +// Put 放回一个使用过的byte slice +func (this *Pool) Put(ptr *Buf) { + this.rawPool.Put(ptr) +} + +// Length 单个字节slice长度 +func (this *Pool) Length() int { + return this.length +} diff --git a/internal/utils/byte_pool_test.go b/internal/utils/bytepool/byte_pool_test.go similarity index 87% rename from internal/utils/byte_pool_test.go rename to internal/utils/bytepool/byte_pool_test.go index 4179411..560c1df 100644 --- a/internal/utils/byte_pool_test.go +++ b/internal/utils/bytepool/byte_pool_test.go @@ -1,8 +1,8 @@ -package utils_test +package bytepool_test import ( "bytes" - "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "runtime" "sync" "testing" @@ -12,9 +12,9 @@ func TestBytePool_Memory(t *testing.T) { var stat1 = &runtime.MemStats{} runtime.ReadMemStats(stat1) - var pool = utils.NewBytePool(32 * 1024) + var pool = bytepool.NewPool(32 * 1024) for i := 0; i < 20480; i++ { - pool.Put(&utils.BytesBuf{ + pool.Put(&bytepool.Buf{ Bytes: make([]byte, 32*1024), }) } @@ -33,7 +33,7 @@ func TestBytePool_Memory(t *testing.T) { func BenchmarkBytePool_Get(b *testing.B) { runtime.GOMAXPROCS(1) - var pool = utils.NewBytePool(1) + var pool = bytepool.NewPool(1) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -46,7 +46,7 @@ func BenchmarkBytePool_Get(b *testing.B) { func BenchmarkBytePool_Get_Parallel(b *testing.B) { runtime.GOMAXPROCS(1) - var pool = utils.NewBytePool(1024) + var pool = bytepool.NewPool(1024) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -82,7 +82,7 @@ func BenchmarkBytePool_Get_Sync2(b *testing.B) { var pool = &sync.Pool{ New: func() any { - return &utils.BytesBuf{ + return &bytepool.Buf{ Bytes: make([]byte, 1024), } }, @@ -127,7 +127,7 @@ func BenchmarkBytePool_Copy_Wrapper_4(b *testing.B) { var pool = &sync.Pool{ New: func() any { - return &utils.BytesBuf{ + return &bytepool.Buf{ Bytes: make([]byte, size), } }, @@ -136,7 +136,7 @@ func BenchmarkBytePool_Copy_Wrapper_4(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - var buf = pool.Get().(*utils.BytesBuf) + var buf = pool.Get().(*bytepool.Buf) copy(buf.Bytes, data) pool.Put(buf) } @@ -171,7 +171,7 @@ func BenchmarkBytePool_Copy_Wrapper_16(b *testing.B) { var pool = &sync.Pool{ New: func() any { - return &utils.BytesBuf{ + return &bytepool.Buf{ Bytes: make([]byte, size), } }, @@ -180,7 +180,7 @@ func BenchmarkBytePool_Copy_Wrapper_16(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - var buf = pool.Get().(*utils.BytesBuf) + var buf = pool.Get().(*bytepool.Buf) copy(buf.Bytes, data) pool.Put(buf) } @@ -194,7 +194,7 @@ func BenchmarkBytePool_Copy_Wrapper_Buf_16(b *testing.B) { var pool = &sync.Pool{ New: func() any { - return &utils.BytesBuf{ + return &bytepool.Buf{ Bytes: make([]byte, size), } }, @@ -203,7 +203,7 @@ func BenchmarkBytePool_Copy_Wrapper_Buf_16(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - var bytesPtr = pool.Get().(*utils.BytesBuf) + var bytesPtr = pool.Get().(*bytepool.Buf) var buf = bytesPtr.Bytes copy(buf, data) pool.Put(bytesPtr) @@ -220,9 +220,9 @@ func BenchmarkBytePool_Copy_Wrapper_BytePool_16(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - var bytesPtr = utils.BytePool16k.Get() + var bytesPtr = bytepool.Pool16k.Get() copy(bytesPtr.Bytes, data) - utils.BytePool16k.Put(bytesPtr) + bytepool.Pool16k.Put(bytesPtr) } }) } @@ -255,7 +255,7 @@ func BenchmarkBytePool_Copy_Wrapper_32(b *testing.B) { var pool = &sync.Pool{ New: func() any { - return &utils.BytesBuf{ + return &bytepool.Buf{ Bytes: make([]byte, size), } }, @@ -264,7 +264,7 @@ func BenchmarkBytePool_Copy_Wrapper_32(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - var buf = pool.Get().(*utils.BytesBuf) + var buf = pool.Get().(*bytepool.Buf) copy(buf.Bytes, data) pool.Put(buf) } diff --git a/internal/waf/action_block.go b/internal/waf/action_block.go index 797f4f4..0e0ef6b 100644 --- a/internal/waf/action_block.go +++ b/internal/waf/action_block.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/TeaOSLab/EdgeNode/internal/waf/requests" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/logs" @@ -118,9 +119,9 @@ func (this *BlockAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, reque } } - var buf = utils.BytePool1k.Get() + var buf = bytepool.Pool1k.Get() _, _ = io.CopyBuffer(writer, resp.Body, buf.Bytes) - utils.BytePool1k.Put(buf) + bytepool.Pool1k.Put(buf) } else { var path = this.URL if !filepath.IsAbs(this.URL) { diff --git a/internal/waf/action_post_307.go b/internal/waf/action_post_307.go index 00f3b26..718f5e5 100644 --- a/internal/waf/action_post_307.go +++ b/internal/waf/action_post_307.go @@ -3,6 +3,7 @@ package waf import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/bytepool" "github.com/TeaOSLab/EdgeNode/internal/waf/requests" "github.com/iwind/TeaGo/types" "io" @@ -119,9 +120,9 @@ func (this *Post307Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, req // 清空请求内容 var req = request.WAFRaw() if req.ContentLength > 0 && req.Body != nil { - var buf = utils.BytePool16k.Get() + var buf = bytepool.Pool16k.Get() _, _ = io.CopyBuffer(io.Discard, req.Body, buf.Bytes) - utils.BytePool16k.Put(buf) + bytepool.Pool16k.Put(buf) _ = req.Body.Close() }