diff --git a/internal/nodes/http_client_pool.go b/internal/nodes/http_client_pool.go index bc626ff..c3cf403 100644 --- a/internal/nodes/http_client_pool.go +++ b/internal/nodes/http_client_pool.go @@ -146,7 +146,7 @@ func (this *HTTPClientPool) Client(req *HTTPRequest, return nil, err } - return conn, nil + return NewOriginConn(conn), nil }, MaxIdleConns: 0, MaxIdleConnsPerHost: idleConns, diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 9cb4512..0256631 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -524,7 +524,28 @@ func (this *HTTPRequest) doOriginRequest(failedOriginIds []int64, failedLnNodeId } } } else { - _, err = io.CopyBuffer(this.writer, resp.Body, buf) + if this.cacheRef != nil && + this.cacheRef.EnableReadingOriginAsync && + resp.ContentLength > 0 && + resp.ContentLength < (128<<20) { // TODO configure max content-length in cache policy OR CacheRef + var requestIsCanceled = false + for { + n, readErr := resp.Body.Read(buf) + + if n > 0 && !requestIsCanceled { + _, err = this.writer.Write(buf[:n]) + if err != nil { + requestIsCanceled = true + } + } + if readErr != nil { + err = readErr + break + } + } + } else { + _, err = io.CopyBuffer(this.writer, resp.Body, buf) + } } pool.Put(buf) diff --git a/internal/nodes/origin_conn.go b/internal/nodes/origin_conn.go new file mode 100644 index 0000000..2bd4d6c --- /dev/null +++ b/internal/nodes/origin_conn.go @@ -0,0 +1,102 @@ +// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes + +import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/zero" + "net" + "sync" + "time" +) + +const originConnCloseDelaySeconds = 3 + +var closingOriginConnMap = map[*OriginConn]zero.Zero{} +var closingOriginConnLocker = &sync.RWMutex{} + +func init() { + if !teaconst.IsMain { + return + } + + goman.New(func() { + var ticker = time.NewTicker(originConnCloseDelaySeconds * time.Second) + for range ticker.C { + CleanOriginConnsTask() + } + }) +} + +func CleanOriginConnsTask() { + var closingConns = []*OriginConn{} + + closingOriginConnLocker.RLock() + for conn := range closingOriginConnMap { + if conn.IsExpired() { + closingConns = append(closingConns, conn) + } + } + closingOriginConnLocker.RUnlock() + + if len(closingConns) > 0 { + for _, conn := range closingConns { + _ = conn.ForceClose() + closingOriginConnLocker.Lock() + delete(closingOriginConnMap, conn) + closingOriginConnLocker.Unlock() + } + } +} + +// OriginConn connection with origin site +type OriginConn struct { + net.Conn + + lastReadOk bool + lastReadAt int64 + isClosed bool +} + +// NewOriginConn create new origin connection +func NewOriginConn(rawConn net.Conn) net.Conn { + return &OriginConn{Conn: rawConn} +} + +// Read implement Read() for net.Conn interface +func (this *OriginConn) Read(b []byte) (n int, err error) { + n, err = this.Conn.Read(b) + this.lastReadOk = err == nil + if this.lastReadOk { + this.lastReadAt = fasttime.Now().Unix() + } + return +} + +// Close implement Close() for net.Conn interface +func (this *OriginConn) Close() error { + if this.lastReadOk && fasttime.Now().Unix()-this.lastReadAt <= originConnCloseDelaySeconds { + closingOriginConnLocker.Lock() + closingOriginConnMap[this] = zero.Zero{} + closingOriginConnLocker.Unlock() + return nil + } + + this.isClosed = true + return this.Conn.Close() +} + +func (this *OriginConn) ForceClose() error { + if this.isClosed { + return nil + } + + this.isClosed = true + return this.Conn.Close() +} + +func (this *OriginConn) IsExpired() bool { + return fasttime.Now().Unix()-this.lastReadAt > originConnCloseDelaySeconds +}