From bcd561f52f5320d150cfb78cd8f1310c019bac66 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 6 Jun 2021 23:42:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96HTTP=E7=BC=93=E5=AD=98?= =?UTF-8?q?=EF=BC=8C=E4=B8=BB=E8=A6=81=E6=98=AF=E5=B9=B6=E5=8F=91=E5=86=B2?= =?UTF-8?q?=E7=AA=81=E3=80=81=E7=BC=93=E5=AD=98=E5=86=99=E5=85=A5=E4=B8=8D?= =?UTF-8?q?=E5=85=A8=E7=AD=89=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/edge-node/main.go | 20 ++++- internal/caches/list_memory.go | 10 +++ internal/caches/list_memory_test.go | 45 ++++++++--- internal/caches/storage_file.go | 36 +++++++-- internal/caches/storage_memory.go | 55 +++++++++++-- internal/caches/writer_file.go | 22 ++++-- internal/caches/writer_memory.go | 33 +++++--- internal/nodes/http_request.go | 6 ++ internal/nodes/http_request_cache.go | 10 +-- internal/nodes/http_request_fastcgi.go | 14 +++- internal/nodes/http_request_page.go | 7 +- internal/nodes/http_request_reverse_proxy.go | 25 +++--- internal/nodes/http_request_root.go | 3 + internal/nodes/http_request_shutdown.go | 11 ++- internal/nodes/http_request_url.go | 9 +++ internal/nodes/http_request_waf.go | 8 +- internal/nodes/http_writer.go | 83 +++++++++++--------- internal/nodes/listener_http.go | 5 ++ 18 files changed, 299 insertions(+), 103 deletions(-) diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index b53498a..db79173 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -7,8 +7,11 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/nodes" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/types" "io/ioutil" + "net/http" + _ "net/http/pprof" "os" "syscall" ) @@ -17,7 +20,7 @@ func main() { app := apps.NewAppCmd(). Version(teaconst.Version). Product(teaconst.ProductName). - Usage(teaconst.ProcessName + " [-v|start|stop|restart|quit|test|service|daemon]") + Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|service|daemon|pprof]") app.On("test", func() { err := nodes.NewNode().Test() @@ -57,6 +60,21 @@ func main() { _ = process.Signal(syscall.SIGQUIT) } }) + app.On("pprof", func() { + // TODO 自己指定端口 + addr := "127.0.0.1:6060" + logs.Println("starting with pprof '" + addr + "'...") + + go func() { + err := http.ListenAndServe(addr, nil) + if err != nil { + logs.Println("[error]" + err.Error()) + } + }() + + node := nodes.NewNode() + node.Start() + }) app.Run(func() { node := nodes.NewNode() node.Start() diff --git a/internal/caches/list_memory.go b/internal/caches/list_memory.go index 5ab2132..407182f 100644 --- a/internal/caches/list_memory.go +++ b/internal/caches/list_memory.go @@ -33,6 +33,16 @@ func (this *MemoryList) Reset() error { func (this *MemoryList) Add(hash string, item *Item) error { this.locker.Lock() + + // 先删除,为了可以正确触发统计 + oldItem, ok := this.m[hash] + if ok { + if this.onRemove != nil { + this.onRemove(oldItem) + } + } + + // 添加 if this.onAdd != nil { this.onAdd(item) } diff --git a/internal/caches/list_memory_test.go b/internal/caches/list_memory_test.go index d0135fb..48a325b 100644 --- a/internal/caches/list_memory_test.go +++ b/internal/caches/list_memory_test.go @@ -9,8 +9,8 @@ import ( "time" ) -func TestList_Add(t *testing.T) { - list := &MemoryList{} +func TestMemoryList_Add(t *testing.T) { + list := NewMemoryList().(*MemoryList) _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -24,8 +24,8 @@ func TestList_Add(t *testing.T) { t.Log(list.m) } -func TestList_Remove(t *testing.T) { - list := &MemoryList{} +func TestMemoryList_Remove(t *testing.T) { + list := NewMemoryList().(*MemoryList) _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -40,8 +40,8 @@ func TestList_Remove(t *testing.T) { t.Log(list.m) } -func TestList_Purge(t *testing.T) { - list := &MemoryList{} +func TestMemoryList_Purge(t *testing.T) { + list := NewMemoryList().(*MemoryList) _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -69,8 +69,8 @@ func TestList_Purge(t *testing.T) { t.Log(list.m) } -func TestList_Stat(t *testing.T) { - list := &MemoryList{} +func TestMemoryList_Stat(t *testing.T) { + list := NewMemoryList() _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -99,8 +99,8 @@ func TestList_Stat(t *testing.T) { t.Log(result) } -func TestList_FindKeysWithPrefix(t *testing.T) { - list := &MemoryList{} +func TestMemoryList_FindKeysWithPrefix(t *testing.T) { + list := NewMemoryList() before := time.Now() for i := 0; i < 1_000_000; i++ { key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" @@ -121,3 +121,28 @@ func TestList_FindKeysWithPrefix(t *testing.T) { t.Log(len(keys)) t.Log(time.Since(before).Seconds()*1000, "ms") } + +func TestMemoryList_GC(t *testing.T) { + list := NewMemoryList().(*MemoryList) + for i := 0; i < 1_000_000; i++ { + key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" + _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ + Key: key, + ExpiredAt: 0, + BodySize: 0, + HeaderSize: 0, + }) + } + time.Sleep(10 * time.Second) + t.Log("clean...", len(list.m)) + _ = list.CleanAll() + before := time.Now() + //runtime.GC() + t.Log("gc cost:", time.Since(before).Seconds()*1000, "ms") + + timeout := time.NewTimer(2 * time.Minute) + <-timeout.C + t.Log("2 minutes passed") + + time.Sleep(30 * time.Minute) +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index fd7837c..cf9b541 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -51,14 +51,16 @@ type FileStorage struct { memoryStorage *MemoryStorage // 一级缓存 totalSize int64 - list ListInterface - locker sync.RWMutex - ticker *utils.Ticker + list ListInterface + writingKeyMap map[string]bool // key => bool + locker sync.RWMutex + ticker *utils.Ticker } func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { return &FileStorage{ - policy: policy, + policy: policy, + writingKeyMap: map[string]bool{}, } } @@ -227,6 +229,25 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr } } + // 是否正在写入 + var isWriting = false + this.locker.Lock() + _, ok := this.writingKeyMap[key] + this.locker.Unlock() + if ok { + return nil, ErrFileIsWriting + } + this.locker.Lock() + this.writingKeyMap[key] = true + this.locker.Unlock() + defer func() { + if !isWriting { + this.locker.Lock() + delete(this.writingKeyMap, key) + this.locker.Unlock() + } + }() + // 检查是否超出最大值 count, err := this.list.Count() if err != nil { @@ -264,6 +285,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr if err != nil { return nil, err } + isWriting = true isOk := false removeOnFailure := true @@ -348,7 +370,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr isOk = true - return NewFileWriter(writer, key, expiredAt), nil + return NewFileWriter(writer, key, expiredAt, func() { + this.locker.Lock() + delete(this.writingKeyMap, key) + this.locker.Unlock() + }), nil } // AddToList 添加到List diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index d9f3f15..67f110b 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -20,6 +20,10 @@ type MemoryItem struct { IsDone bool } +func (this *MemoryItem) IsExpired() bool { + return this.ExpiredAt < utils.UnixTime() +} + type MemoryStorage struct { policy *serverconfigs.HTTPCachePolicy list ListInterface @@ -28,14 +32,16 @@ type MemoryStorage struct { ticker *utils.Ticker purgeDuration time.Duration totalSize int64 + writingKeyMap map[string]bool // key => bool } func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { return &MemoryStorage{ - policy: policy, - list: NewMemoryList(), - locker: &sync.RWMutex{}, - valuesMap: map[uint64]*MemoryItem{}, + policy: policy, + list: NewMemoryList(), + locker: &sync.RWMutex{}, + valuesMap: map[uint64]*MemoryItem{}, + writingKeyMap: map[string]bool{}, } } @@ -91,6 +97,29 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) { // OpenWriter 打开缓存写入器等待写入 func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { + this.locker.Lock() + defer this.locker.Unlock() + + // 是否正在写入 + var isWriting = false + _, ok := this.writingKeyMap[key] + if ok { + return nil, ErrFileIsWriting + } + this.writingKeyMap[key] = true + defer func() { + if !isWriting { + delete(this.writingKeyMap, key) + } + }() + + // 检查是否过期 + hash := this.hash(key) + item, ok := this.valuesMap[hash] + if ok && !item.IsExpired() { + return nil, ErrFileIsWriting + } + // 检查是否超出最大值 totalKeys, err := this.list.Count() if err != nil { @@ -101,16 +130,21 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) ( } capacityBytes := this.memoryCapacityBytes() if capacityBytes > 0 && capacityBytes <= this.totalSize { - return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") + return nil, errors.New("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") } // 先删除 - err = this.Delete(key) + err = this.deleteWithoutKey(key) if err != nil { return nil, err } - return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil + isWriting = true + return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker, func() { + this.locker.Lock() + delete(this.writingKeyMap, key) + this.locker.Unlock() + }), nil } // Delete 删除某个键值对应的缓存 @@ -235,3 +269,10 @@ func (this *MemoryStorage) memoryCapacityBytes() int64 { } return c1 } + +func (this *MemoryStorage) deleteWithoutKey(key string) error { + hash := this.hash(key) + delete(this.valuesMap, hash) + _ = this.list.Remove(fmt.Sprintf("%d", hash)) + return nil +} diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 77bf2ca..115ab35 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -14,17 +14,19 @@ type FileWriter struct { headerSize int64 bodySize int64 expiredAt int64 + endFunc func() } -func NewFileWriter(rawWriter *os.File, key string, expiredAt int64) *FileWriter { +func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, endFunc func()) *FileWriter { return &FileWriter{ key: key, rawWriter: rawWriter, expiredAt: expiredAt, + endFunc: endFunc, } } -// 写入数据 +// WriteHeader 写入数据 func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { n, err = this.rawWriter.Write(data) this.headerSize += int64(n) @@ -34,7 +36,7 @@ func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { return } -// 写入Header长度数据 +// WriteHeaderLength 写入Header长度数据 func (this *FileWriter) WriteHeaderLength(headerLength int) error { bytes4 := make([]byte, 4) binary.BigEndian.PutUint32(bytes4, uint32(headerLength)) @@ -51,7 +53,7 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error { return nil } -// 写入数据 +// Write 写入数据 func (this *FileWriter) Write(data []byte) (n int, err error) { n, err = this.rawWriter.Write(data) this.bodySize += int64(n) @@ -61,7 +63,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) { return } -// 写入Body长度数据 +// WriteBodyLength 写入Body长度数据 func (this *FileWriter) WriteBodyLength(bodyLength int64) error { bytes8 := make([]byte, 8) binary.BigEndian.PutUint64(bytes8, uint64(bodyLength)) @@ -78,8 +80,10 @@ func (this *FileWriter) WriteBodyLength(bodyLength int64) error { return nil } -// 关闭 +// Close 关闭 func (this *FileWriter) Close() error { + defer this.endFunc() + err := this.WriteHeaderLength(types.Int(this.headerSize)) if err != nil { return err @@ -103,8 +107,10 @@ func (this *FileWriter) Close() error { return err } -// 丢弃 +// Discard 丢弃 func (this *FileWriter) Discard() error { + defer this.endFunc() + _ = this.rawWriter.Close() err := os.Remove(this.rawWriter.Name()) @@ -127,7 +133,7 @@ func (this *FileWriter) Key() string { return this.key } -// 内容类型 +// ItemType 获取内容类型 func (this *FileWriter) ItemType() ItemType { return ItemTypeFile } diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 39c1b90..9a10f1a 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -14,11 +14,12 @@ type MemoryWriter struct { bodySize int64 status int - hash uint64 - item *MemoryItem + hash uint64 + item *MemoryItem + endFunc func() } -func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter { +func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex, endFunc func()) *MemoryWriter { w := &MemoryWriter{ m: m, key: key, @@ -28,38 +29,43 @@ func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, stat ExpiredAt: expiredAt, Status: status, }, - status: status, + status: status, + endFunc: endFunc, } w.hash = w.calculateHash(key) return w } -// 写入数据 +// WriteHeader 写入数据 func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { this.headerSize += int64(len(data)) this.item.HeaderValue = append(this.item.HeaderValue, data...) return len(data), nil } -// 写入数据 +// Write 写入数据 func (this *MemoryWriter) Write(data []byte) (n int, err error) { this.bodySize += int64(len(data)) this.item.BodyValue = append(this.item.BodyValue, data...) return len(data), nil } -// 数据尺寸 +// HeaderSize 数据尺寸 func (this *MemoryWriter) HeaderSize() int64 { return this.headerSize } +// BodySize 主体内容尺寸 func (this *MemoryWriter) BodySize() int64 { return this.bodySize } -// 关闭 +// Close 关闭 func (this *MemoryWriter) Close() error { + // 需要在Locker之外 + defer this.endFunc() + if this.item == nil { return nil } @@ -72,25 +78,28 @@ func (this *MemoryWriter) Close() error { return nil } -// 丢弃 +// Discard 丢弃 func (this *MemoryWriter) Discard() error { + // 需要在Locker之外 + defer this.endFunc() + this.locker.Lock() delete(this.m, this.hash) this.locker.Unlock() return nil } -// Key +// Key 获取Key func (this *MemoryWriter) Key() string { return this.key } -// 过期时间 +// ExpiredAt 过期时间 func (this *MemoryWriter) ExpiredAt() int64 { return this.expiredAt } -// 内容类型 +// ItemType 内容类型 func (this *MemoryWriter) ItemType() ItemType { return ItemTypeMemory } diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 3ee094b..1702c84 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -1142,6 +1142,12 @@ func (this *HTTPRequest) canIgnore(err error) bool { return true } + // 网络错误 + _, ok := err.(*net.OpError) + if ok { + return true + } + // 客户端主动取消 if err == context.Canceled { return true diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 48717c7..f82242e 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -108,7 +108,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { } if !this.canIgnore(err) { - remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) } return } @@ -142,7 +142,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { }) if err != nil { if !this.canIgnore(err) { - remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) } return } @@ -234,7 +234,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { return true } if !this.canIgnore(err) { - remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) } return } @@ -277,7 +277,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { }) if err != nil { if !this.canIgnore(err) { - remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) } return true } @@ -300,7 +300,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { }) if err != nil { if !this.canIgnore(err) { - remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) + remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error()) } return } diff --git a/internal/nodes/http_request_fastcgi.go b/internal/nodes/http_request_fastcgi.go index 0cc04f3..a82e8f2 100644 --- a/internal/nodes/http_request_fastcgi.go +++ b/internal/nodes/http_request_fastcgi.go @@ -200,14 +200,20 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) { _, err = io.CopyBuffer(this.writer, resp.Body, buf) pool.Put(buf) - err1 := resp.Body.Close() - if err1 != nil { - remotelogs.Warn("REQUEST_FASTCGI", err1.Error()) + closeErr := resp.Body.Close() + if closeErr != nil { + remotelogs.Warn("HTTP_REQUEST_FASTCGI", closeErr.Error()) } if err != nil && err != io.EOF { - remotelogs.Warn("REQUEST_FASTCGI", err.Error()) + remotelogs.Warn("HTTP_REQUEST_FASTCGI", err.Error()) this.addError(err) } + + // 是否成功结束 + if err == nil && closeErr == nil { + this.writer.SetOk() + } + return } diff --git a/internal/nodes/http_request_page.go b/internal/nodes/http_request_page.go index 73e7485..5ca1638 100644 --- a/internal/nodes/http_request_page.go +++ b/internal/nodes/http_request_page.go @@ -1,6 +1,7 @@ package nodes import ( + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/logs" "io" @@ -50,7 +51,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) { _, err = io.CopyBuffer(this.writer, fp, buf) bytePool1k.Put(buf) if err != nil { - logs.Error(err) + if !this.canIgnore(err) { + remotelogs.Warn("HTTP_REQUEST_PAGE", "write to client failed: "+err.Error()) + } + } else { + this.writer.SetOk() } err = fp.Close() if err != nil { diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 875ca67..9454093 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -35,7 +35,7 @@ func (this *HTTPRequest) doReverseProxy() { origin := this.reverseProxy.NextOrigin(requestCall) if origin == nil { err := errors.New(this.requestPath() + ": no available backends for reverse proxy") - remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error()) + remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error()) this.write502(err) return } @@ -55,7 +55,7 @@ func (this *HTTPRequest) doReverseProxy() { // 处理Scheme if origin.Addr == nil { err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address") - remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error()) + remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error()) this.write502(err) return } @@ -142,7 +142,7 @@ func (this *HTTPRequest) doReverseProxy() { // 获取请求客户端 client, err := SharedHTTPClientPool.Client(this.RawReq, origin, originAddr) if err != nil { - remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error()) + remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error()) this.write502(err) return } @@ -162,7 +162,7 @@ func (this *HTTPRequest) doReverseProxy() { // TODO 如果超过最大失败次数,则下线 this.write502(err) - remotelogs.Println("REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error()) + remotelogs.Println("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error()) } else { // 是否为客户端方面的错误 isClientError := false @@ -189,7 +189,7 @@ func (this *HTTPRequest) doReverseProxy() { if this.doWAFResponse(resp) { err = resp.Body.Close() if err != nil { - remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error()) + remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error()) } return } @@ -201,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() { if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) { err = resp.Body.Close() if err != nil { - remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error()) + remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error()) } return } @@ -254,17 +254,22 @@ func (this *HTTPRequest) doReverseProxy() { } pool.Put(buf) - err1 := resp.Body.Close() - if err1 != nil { + closeErr := resp.Body.Close() + if closeErr != nil { if !this.canIgnore(err) { - remotelogs.Warn("REQUEST_REVERSE_PROXY", err1.Error()) + remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", closeErr.Error()) } } if err != nil && err != io.EOF { if !this.canIgnore(err) { - remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error()) + remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error()) this.addError(err) } } + + // 是否成功结束 + if err == nil && closeErr == nil { + this.writer.SetOk() + } } diff --git a/internal/nodes/http_request_root.go b/internal/nodes/http_request_root.go index 4ba7604..6170c6a 100644 --- a/internal/nodes/http_request_root.go +++ b/internal/nodes/http_request_root.go @@ -382,6 +382,9 @@ func (this *HTTPRequest) doRoot() (isBreak bool) { } } + // 设置成功 + this.writer.SetOk() + return true } diff --git a/internal/nodes/http_request_shutdown.go b/internal/nodes/http_request_shutdown.go index d9d1a06..832c992 100644 --- a/internal/nodes/http_request_shutdown.go +++ b/internal/nodes/http_request_shutdown.go @@ -1,6 +1,7 @@ package nodes import ( + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/logs" "io" @@ -65,8 +66,16 @@ func (this *HTTPRequest) doShutdown() { buf := bytePool1k.Get() _, err = io.CopyBuffer(this.writer, fp, buf) bytePool1k.Put(buf) + if err != nil { + if !this.canIgnore(err) { + remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error()) + } + } else { + this.writer.SetOk() + } + err = fp.Close() if err != nil { - logs.Error(err) + remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "close file failed: "+err.Error()) } } diff --git a/internal/nodes/http_request_url.go b/internal/nodes/http_request_url.go index 78e16a4..f8a2689 100644 --- a/internal/nodes/http_request_url.go +++ b/internal/nodes/http_request_url.go @@ -2,6 +2,7 @@ package nodes import ( "errors" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/logs" "io" @@ -68,4 +69,12 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod buf := pool.Get() _, err = io.CopyBuffer(this.writer, resp.Body, buf) pool.Put(buf) + + if err != nil { + if !this.canIgnore(err) { + remotelogs.Warn("HTTP_REQUEST_URL", "write to client failed: "+err.Error()) + } + } else { + this.writer.SetOk() + } } diff --git a/internal/nodes/http_request_waf.go b/internal/nodes/http_request_waf.go index 57795bb..d8b5324 100644 --- a/internal/nodes/http_request_waf.go +++ b/internal/nodes/http_request_waf.go @@ -70,7 +70,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir for _, action := range actions { goNext, err := action.DoHTTP(this.RawReq, this.RawWriter) if err != nil { - remotelogs.Error("REQUEST", "do action '"+err.Error()+"' failed: "+err.Error()) + remotelogs.Error("HTTP_REQUEST_WAF", "do action '"+err.Error()+"' failed: "+err.Error()) return true, false } if !goNext { @@ -101,7 +101,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir for _, remoteAddr := range remoteAddrs { result, err := iplibrary.SharedLibrary.Lookup(remoteAddr) if err != nil { - remotelogs.Error("REQUEST", "iplibrary lookup failed: "+err.Error()) + remotelogs.Error("HTTP_REQUEST_WAF", "iplibrary lookup failed: "+err.Error()) } else if result != nil { // 检查国家级别封禁 if len(regionConfig.DenyCountryIds) > 0 && len(result.Country) > 0 { @@ -147,7 +147,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir } goNext, ruleGroup, ruleSet, err := w.MatchRequest(this.RawReq, this.writer) if err != nil { - remotelogs.Error("REQUEST", this.rawURI+": "+err.Error()) + remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error()) return } @@ -181,7 +181,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) { goNext, ruleGroup, ruleSet, err := w.MatchResponse(this.RawReq, resp, this.writer) if err != nil { - remotelogs.Error("REQUEST", this.rawURI+": "+err.Error()) + remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error()) return } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 6ffdbd2..36ba65f 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -14,7 +14,7 @@ import ( "strings" ) -// 响应Writer +// HTTPWriter 响应Writer type HTTPWriter struct { req *HTTPRequest writer http.ResponseWriter @@ -32,9 +32,11 @@ type HTTPWriter struct { cacheWriter caches.Writer // 缓存写入 cacheStorage caches.StorageInterface + + isOk bool // 是否完全成功 } -// 包装对象 +// NewHTTPWriter 包装对象 func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HTTPWriter { return &HTTPWriter{ req: req, @@ -42,7 +44,7 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT } } -// 重置 +// Reset 重置 func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) { this.writer = httpResponseWriter @@ -58,12 +60,12 @@ func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) { this.gzipBodyWriter = nil } -// 设置Gzip +// Gzip 设置Gzip func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) { this.gzipConfig = config } -// 准备输出 +// Prepare 准备输出 func (this *HTTPWriter) Prepare(size int64, status int) { this.statusCode = status @@ -71,12 +73,12 @@ func (this *HTTPWriter) Prepare(size int64, status int) { this.prepareCache(size) } -// 包装前的原始的Writer +// Raw 包装前的原始的Writer func (this *HTTPWriter) Raw() http.ResponseWriter { return this.writer } -// 获取Header +// Header 获取Header func (this *HTTPWriter) Header() http.Header { if this.writer == nil { return http.Header{} @@ -84,7 +86,7 @@ func (this *HTTPWriter) Header() http.Header { return this.writer.Header() } -// 添加一组Header +// AddHeaders 添加一组Header func (this *HTTPWriter) AddHeaders(header http.Header) { if this.writer == nil { return @@ -99,7 +101,7 @@ func (this *HTTPWriter) AddHeaders(header http.Header) { } } -// 写入数据 +// Write 写入数据 func (this *HTTPWriter) Write(data []byte) (n int, err error) { if this.writer != nil { if this.gzipWriter != nil { @@ -115,8 +117,9 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { if this.cacheWriter != nil { _, err = this.cacheWriter.Write(data) if err != nil { + _ = this.cacheWriter.Discard() this.cacheWriter = nil - remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) + remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) } } } else { @@ -128,7 +131,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { if this.gzipBodyWriter != nil { _, err := this.gzipBodyWriter.Write(data) if err != nil { - remotelogs.Error("REQUEST_WRITER", err.Error()) + remotelogs.Error("HTTP_WRITER", err.Error()) } } else { this.body = append(this.body, data...) @@ -137,17 +140,17 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { return } -// 写入字符串 +// WriteString 写入字符串 func (this *HTTPWriter) WriteString(s string) (n int, err error) { return this.Write([]byte(s)) } -// 读取发送的字节数 +// SentBodyBytes 读取发送的字节数 func (this *HTTPWriter) SentBodyBytes() int64 { return this.sentBodyBytes } -// 写入状态码 +// WriteHeader 写入状态码 func (this *HTTPWriter) WriteHeader(statusCode int) { if this.writer != nil { this.writer.WriteHeader(statusCode) @@ -155,7 +158,7 @@ func (this *HTTPWriter) WriteHeader(statusCode int) { this.statusCode = statusCode } -// 读取状态码 +// StatusCode 读取状态码 func (this *HTTPWriter) StatusCode() int { if this.statusCode == 0 { return http.StatusOK @@ -163,22 +166,22 @@ func (this *HTTPWriter) StatusCode() int { return this.statusCode } -// 设置拷贝Body数据 +// SetBodyCopying 设置拷贝Body数据 func (this *HTTPWriter) SetBodyCopying(b bool) { this.bodyCopying = b } -// 判断是否在拷贝Body数据 +// BodyIsCopying 判断是否在拷贝Body数据 func (this *HTTPWriter) BodyIsCopying() bool { return this.bodyCopying } -// 读取拷贝的Body数据 +// Body 读取拷贝的Body数据 func (this *HTTPWriter) Body() []byte { return this.body } -// 读取Header二进制数据 +// HeaderData 读取Header二进制数据 func (this *HTTPWriter) HeaderData() []byte { if this.writer == nil { return nil @@ -200,7 +203,12 @@ func (this *HTTPWriter) HeaderData() []byte { return writer.Bytes() } -// 关闭 +// SetOk 设置成功 +func (this *HTTPWriter) SetOk() { + this.isOk = true +} + +// Close 关闭 func (this *HTTPWriter) Close() { // gzip writer if this.gzipWriter != nil { @@ -214,20 +222,24 @@ func (this *HTTPWriter) Close() { // cache writer if this.cacheWriter != nil { - err := this.cacheWriter.Close() - if err == nil { - this.cacheStorage.AddToList(&caches.Item{ - Type: this.cacheWriter.ItemType(), - Key: this.cacheWriter.Key(), - ExpiredAt: this.cacheWriter.ExpiredAt(), - HeaderSize: this.cacheWriter.HeaderSize(), - BodySize: this.cacheWriter.BodySize(), - }) + if this.isOk { + err := this.cacheWriter.Close() + if err == nil { + this.cacheStorage.AddToList(&caches.Item{ + Type: this.cacheWriter.ItemType(), + Key: this.cacheWriter.Key(), + ExpiredAt: this.cacheWriter.ExpiredAt(), + HeaderSize: this.cacheWriter.HeaderSize(), + BodySize: this.cacheWriter.BodySize(), + }) + } + } else { + _ = this.cacheWriter.Discard() } } } -// Hijack +// Hijack Hijack func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) { hijack, ok := this.writer.(http.Hijacker) if ok { @@ -236,7 +248,7 @@ func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err erro return } -// Flush +// Flush Flush func (this *HTTPWriter) Flush() { flusher, ok := this.writer.(http.Flusher) if ok { @@ -284,7 +296,7 @@ func (this *HTTPWriter) prepareGzip(size int64) { var err error = nil this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level)) if err != nil { - remotelogs.Error("REQUEST_WRITER", err.Error()) + remotelogs.Error("HTTP_WRITER", err.Error()) return } @@ -293,7 +305,7 @@ func (this *HTTPWriter) prepareGzip(size int64) { this.gzipBodyBuffer = bytes.NewBuffer([]byte{}) this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level)) if err != nil { - remotelogs.Error("REQUEST_WRITER", err.Error()) + remotelogs.Error("HTTP_WRITER", err.Error()) } } @@ -376,7 +388,7 @@ func (this *HTTPWriter) prepareCache(size int64) { cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode()) if err != nil { if err != caches.ErrFileIsWriting { - remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) + remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) } return } @@ -390,7 +402,8 @@ func (this *HTTPWriter) prepareCache(size int64) { for _, v1 := range v { _, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n")) if err != nil { - remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) + remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) + _ = this.cacheWriter.Discard() this.cacheWriter = nil return } diff --git a/internal/nodes/listener_http.go b/internal/nodes/listener_http.go index c6a09b9..3c68c7f 100644 --- a/internal/nodes/listener_http.go +++ b/internal/nodes/listener_http.go @@ -4,6 +4,8 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "golang.org/x/net/http2" + "io" + "log" "net" "net/http" "strings" @@ -11,6 +13,8 @@ import ( "time" ) +var httpErrorLogger = log.New(io.Discard, "", 0) + type HTTPListener struct { BaseListener @@ -37,6 +41,7 @@ func (this *HTTPListener) Serve() error { Handler: handler, ReadHeaderTimeout: 3 * time.Second, // TODO 改成可以配置 IdleTimeout: 2 * time.Minute, // TODO 改成可以配置 + ErrorLog: httpErrorLogger, ConnState: func(conn net.Conn, state http.ConnState) { switch state { case http.StateNew: