优化HTTP缓存,主要是并发冲突、缓存写入不全等问题

This commit is contained in:
刘祥超
2021-06-06 23:42:11 +08:00
parent 0df5dfad23
commit a49b724745
18 changed files with 299 additions and 103 deletions

View File

@@ -7,8 +7,11 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/nodes" "github.com/TeaOSLab/EdgeNode/internal/nodes"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap" _ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"io/ioutil" "io/ioutil"
"net/http"
_ "net/http/pprof"
"os" "os"
"syscall" "syscall"
) )
@@ -17,7 +20,7 @@ func main() {
app := apps.NewAppCmd(). app := apps.NewAppCmd().
Version(teaconst.Version). Version(teaconst.Version).
Product(teaconst.ProductName). Product(teaconst.ProductName).
Usage(teaconst.ProcessName + " [-v|start|stop|restart|quit|test|service|daemon]") Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|service|daemon|pprof]")
app.On("test", func() { app.On("test", func() {
err := nodes.NewNode().Test() err := nodes.NewNode().Test()
@@ -57,6 +60,21 @@ func main() {
_ = process.Signal(syscall.SIGQUIT) _ = process.Signal(syscall.SIGQUIT)
} }
}) })
app.On("pprof", func() {
// TODO 自己指定端口
addr := "127.0.0.1:6060"
logs.Println("starting with pprof '" + addr + "'...")
go func() {
err := http.ListenAndServe(addr, nil)
if err != nil {
logs.Println("[error]" + err.Error())
}
}()
node := nodes.NewNode()
node.Start()
})
app.Run(func() { app.Run(func() {
node := nodes.NewNode() node := nodes.NewNode()
node.Start() node.Start()

View File

@@ -33,6 +33,16 @@ func (this *MemoryList) Reset() error {
func (this *MemoryList) Add(hash string, item *Item) error { func (this *MemoryList) Add(hash string, item *Item) error {
this.locker.Lock() this.locker.Lock()
// 先删除,为了可以正确触发统计
oldItem, ok := this.m[hash]
if ok {
if this.onRemove != nil {
this.onRemove(oldItem)
}
}
// 添加
if this.onAdd != nil { if this.onAdd != nil {
this.onAdd(item) this.onAdd(item)
} }

View File

@@ -9,8 +9,8 @@ import (
"time" "time"
) )
func TestList_Add(t *testing.T) { func TestMemoryList_Add(t *testing.T) {
list := &MemoryList{} list := NewMemoryList().(*MemoryList)
_ = list.Add("a", &Item{ _ = list.Add("a", &Item{
Key: "a1", Key: "a1",
ExpiredAt: time.Now().Unix() + 3600, ExpiredAt: time.Now().Unix() + 3600,
@@ -24,8 +24,8 @@ func TestList_Add(t *testing.T) {
t.Log(list.m) t.Log(list.m)
} }
func TestList_Remove(t *testing.T) { func TestMemoryList_Remove(t *testing.T) {
list := &MemoryList{} list := NewMemoryList().(*MemoryList)
_ = list.Add("a", &Item{ _ = list.Add("a", &Item{
Key: "a1", Key: "a1",
ExpiredAt: time.Now().Unix() + 3600, ExpiredAt: time.Now().Unix() + 3600,
@@ -40,8 +40,8 @@ func TestList_Remove(t *testing.T) {
t.Log(list.m) t.Log(list.m)
} }
func TestList_Purge(t *testing.T) { func TestMemoryList_Purge(t *testing.T) {
list := &MemoryList{} list := NewMemoryList().(*MemoryList)
_ = list.Add("a", &Item{ _ = list.Add("a", &Item{
Key: "a1", Key: "a1",
ExpiredAt: time.Now().Unix() + 3600, ExpiredAt: time.Now().Unix() + 3600,
@@ -69,8 +69,8 @@ func TestList_Purge(t *testing.T) {
t.Log(list.m) t.Log(list.m)
} }
func TestList_Stat(t *testing.T) { func TestMemoryList_Stat(t *testing.T) {
list := &MemoryList{} list := NewMemoryList()
_ = list.Add("a", &Item{ _ = list.Add("a", &Item{
Key: "a1", Key: "a1",
ExpiredAt: time.Now().Unix() + 3600, ExpiredAt: time.Now().Unix() + 3600,
@@ -99,8 +99,8 @@ func TestList_Stat(t *testing.T) {
t.Log(result) t.Log(result)
} }
func TestList_FindKeysWithPrefix(t *testing.T) { func TestMemoryList_FindKeysWithPrefix(t *testing.T) {
list := &MemoryList{} list := NewMemoryList()
before := time.Now() before := time.Now()
for i := 0; i < 1_000_000; i++ { for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
@@ -121,3 +121,28 @@ func TestList_FindKeysWithPrefix(t *testing.T) {
t.Log(len(keys)) t.Log(len(keys))
t.Log(time.Since(before).Seconds()*1000, "ms") t.Log(time.Since(before).Seconds()*1000, "ms")
} }
func TestMemoryList_GC(t *testing.T) {
list := NewMemoryList().(*MemoryList)
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: 0,
BodySize: 0,
HeaderSize: 0,
})
}
time.Sleep(10 * time.Second)
t.Log("clean...", len(list.m))
_ = list.CleanAll()
before := time.Now()
//runtime.GC()
t.Log("gc cost:", time.Since(before).Seconds()*1000, "ms")
timeout := time.NewTimer(2 * time.Minute)
<-timeout.C
t.Log("2 minutes passed")
time.Sleep(30 * time.Minute)
}

View File

@@ -51,14 +51,16 @@ type FileStorage struct {
memoryStorage *MemoryStorage // 一级缓存 memoryStorage *MemoryStorage // 一级缓存
totalSize int64 totalSize int64
list ListInterface list ListInterface
locker sync.RWMutex writingKeyMap map[string]bool // key => bool
ticker *utils.Ticker locker sync.RWMutex
ticker *utils.Ticker
} }
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
return &FileStorage{ return &FileStorage{
policy: policy, policy: policy,
writingKeyMap: map[string]bool{},
} }
} }
@@ -227,6 +229,25 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
} }
} }
// 是否正在写入
var isWriting = false
this.locker.Lock()
_, ok := this.writingKeyMap[key]
this.locker.Unlock()
if ok {
return nil, ErrFileIsWriting
}
this.locker.Lock()
this.writingKeyMap[key] = true
this.locker.Unlock()
defer func() {
if !isWriting {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}
}()
// 检查是否超出最大值 // 检查是否超出最大值
count, err := this.list.Count() count, err := this.list.Count()
if err != nil { if err != nil {
@@ -264,6 +285,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
if err != nil { if err != nil {
return nil, err return nil, err
} }
isWriting = true
isOk := false isOk := false
removeOnFailure := true removeOnFailure := true
@@ -348,7 +370,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
isOk = true isOk = true
return NewFileWriter(writer, key, expiredAt), nil return NewFileWriter(writer, key, expiredAt, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}), nil
} }
// AddToList 添加到List // AddToList 添加到List

View File

@@ -20,6 +20,10 @@ type MemoryItem struct {
IsDone bool IsDone bool
} }
func (this *MemoryItem) IsExpired() bool {
return this.ExpiredAt < utils.UnixTime()
}
type MemoryStorage struct { type MemoryStorage struct {
policy *serverconfigs.HTTPCachePolicy policy *serverconfigs.HTTPCachePolicy
list ListInterface list ListInterface
@@ -28,14 +32,16 @@ type MemoryStorage struct {
ticker *utils.Ticker ticker *utils.Ticker
purgeDuration time.Duration purgeDuration time.Duration
totalSize int64 totalSize int64
writingKeyMap map[string]bool // key => bool
} }
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
return &MemoryStorage{ return &MemoryStorage{
policy: policy, policy: policy,
list: NewMemoryList(), list: NewMemoryList(),
locker: &sync.RWMutex{}, locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{}, valuesMap: map[uint64]*MemoryItem{},
writingKeyMap: map[string]bool{},
} }
} }
@@ -91,6 +97,29 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
// OpenWriter 打开缓存写入器等待写入 // OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
this.locker.Lock()
defer this.locker.Unlock()
// 是否正在写入
var isWriting = false
_, ok := this.writingKeyMap[key]
if ok {
return nil, ErrFileIsWriting
}
this.writingKeyMap[key] = true
defer func() {
if !isWriting {
delete(this.writingKeyMap, key)
}
}()
// 检查是否过期
hash := this.hash(key)
item, ok := this.valuesMap[hash]
if ok && !item.IsExpired() {
return nil, ErrFileIsWriting
}
// 检查是否超出最大值 // 检查是否超出最大值
totalKeys, err := this.list.Count() totalKeys, err := this.list.Count()
if err != nil { if err != nil {
@@ -101,16 +130,21 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
} }
capacityBytes := this.memoryCapacityBytes() capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize { if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") return nil, errors.New("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
} }
// 先删除 // 先删除
err = this.Delete(key) err = this.deleteWithoutKey(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil isWriting = true
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker, func() {
this.locker.Lock()
delete(this.writingKeyMap, key)
this.locker.Unlock()
}), nil
} }
// Delete 删除某个键值对应的缓存 // Delete 删除某个键值对应的缓存
@@ -235,3 +269,10 @@ func (this *MemoryStorage) memoryCapacityBytes() int64 {
} }
return c1 return c1
} }
func (this *MemoryStorage) deleteWithoutKey(key string) error {
hash := this.hash(key)
delete(this.valuesMap, hash)
_ = this.list.Remove(fmt.Sprintf("%d", hash))
return nil
}

View File

@@ -14,17 +14,19 @@ type FileWriter struct {
headerSize int64 headerSize int64
bodySize int64 bodySize int64
expiredAt int64 expiredAt int64
endFunc func()
} }
func NewFileWriter(rawWriter *os.File, key string, expiredAt int64) *FileWriter { func NewFileWriter(rawWriter *os.File, key string, expiredAt int64, endFunc func()) *FileWriter {
return &FileWriter{ return &FileWriter{
key: key, key: key,
rawWriter: rawWriter, rawWriter: rawWriter,
expiredAt: expiredAt, expiredAt: expiredAt,
endFunc: endFunc,
} }
} }
// 写入数据 // WriteHeader 写入数据
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data) n, err = this.rawWriter.Write(data)
this.headerSize += int64(n) this.headerSize += int64(n)
@@ -34,7 +36,7 @@ func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
return return
} }
// 写入Header长度数据 // WriteHeaderLength 写入Header长度数据
func (this *FileWriter) WriteHeaderLength(headerLength int) error { func (this *FileWriter) WriteHeaderLength(headerLength int) error {
bytes4 := make([]byte, 4) bytes4 := make([]byte, 4)
binary.BigEndian.PutUint32(bytes4, uint32(headerLength)) binary.BigEndian.PutUint32(bytes4, uint32(headerLength))
@@ -51,7 +53,7 @@ func (this *FileWriter) WriteHeaderLength(headerLength int) error {
return nil return nil
} }
// 写入数据 // Write 写入数据
func (this *FileWriter) Write(data []byte) (n int, err error) { func (this *FileWriter) Write(data []byte) (n int, err error) {
n, err = this.rawWriter.Write(data) n, err = this.rawWriter.Write(data)
this.bodySize += int64(n) this.bodySize += int64(n)
@@ -61,7 +63,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
return return
} }
// 写入Body长度数据 // WriteBodyLength 写入Body长度数据
func (this *FileWriter) WriteBodyLength(bodyLength int64) error { func (this *FileWriter) WriteBodyLength(bodyLength int64) error {
bytes8 := make([]byte, 8) bytes8 := make([]byte, 8)
binary.BigEndian.PutUint64(bytes8, uint64(bodyLength)) binary.BigEndian.PutUint64(bytes8, uint64(bodyLength))
@@ -78,8 +80,10 @@ func (this *FileWriter) WriteBodyLength(bodyLength int64) error {
return nil return nil
} }
// 关闭 // Close 关闭
func (this *FileWriter) Close() error { func (this *FileWriter) Close() error {
defer this.endFunc()
err := this.WriteHeaderLength(types.Int(this.headerSize)) err := this.WriteHeaderLength(types.Int(this.headerSize))
if err != nil { if err != nil {
return err return err
@@ -103,8 +107,10 @@ func (this *FileWriter) Close() error {
return err return err
} }
// 丢弃 // Discard 丢弃
func (this *FileWriter) Discard() error { func (this *FileWriter) Discard() error {
defer this.endFunc()
_ = this.rawWriter.Close() _ = this.rawWriter.Close()
err := os.Remove(this.rawWriter.Name()) err := os.Remove(this.rawWriter.Name())
@@ -127,7 +133,7 @@ func (this *FileWriter) Key() string {
return this.key return this.key
} }
// 内容类型 // ItemType 获取内容类型
func (this *FileWriter) ItemType() ItemType { func (this *FileWriter) ItemType() ItemType {
return ItemTypeFile return ItemTypeFile
} }

View File

@@ -14,11 +14,12 @@ type MemoryWriter struct {
bodySize int64 bodySize int64
status int status int
hash uint64 hash uint64
item *MemoryItem item *MemoryItem
endFunc func()
} }
func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter { func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex, endFunc func()) *MemoryWriter {
w := &MemoryWriter{ w := &MemoryWriter{
m: m, m: m,
key: key, key: key,
@@ -28,38 +29,43 @@ func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, stat
ExpiredAt: expiredAt, ExpiredAt: expiredAt,
Status: status, Status: status,
}, },
status: status, status: status,
endFunc: endFunc,
} }
w.hash = w.calculateHash(key) w.hash = w.calculateHash(key)
return w return w
} }
// 写入数据 // WriteHeader 写入数据
func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) {
this.headerSize += int64(len(data)) this.headerSize += int64(len(data))
this.item.HeaderValue = append(this.item.HeaderValue, data...) this.item.HeaderValue = append(this.item.HeaderValue, data...)
return len(data), nil return len(data), nil
} }
// 写入数据 // Write 写入数据
func (this *MemoryWriter) Write(data []byte) (n int, err error) { func (this *MemoryWriter) Write(data []byte) (n int, err error) {
this.bodySize += int64(len(data)) this.bodySize += int64(len(data))
this.item.BodyValue = append(this.item.BodyValue, data...) this.item.BodyValue = append(this.item.BodyValue, data...)
return len(data), nil return len(data), nil
} }
// 数据尺寸 // HeaderSize 数据尺寸
func (this *MemoryWriter) HeaderSize() int64 { func (this *MemoryWriter) HeaderSize() int64 {
return this.headerSize return this.headerSize
} }
// BodySize 主体内容尺寸
func (this *MemoryWriter) BodySize() int64 { func (this *MemoryWriter) BodySize() int64 {
return this.bodySize return this.bodySize
} }
// 关闭 // Close 关闭
func (this *MemoryWriter) Close() error { func (this *MemoryWriter) Close() error {
// 需要在Locker之外
defer this.endFunc()
if this.item == nil { if this.item == nil {
return nil return nil
} }
@@ -72,25 +78,28 @@ func (this *MemoryWriter) Close() error {
return nil return nil
} }
// 丢弃 // Discard 丢弃
func (this *MemoryWriter) Discard() error { func (this *MemoryWriter) Discard() error {
// 需要在Locker之外
defer this.endFunc()
this.locker.Lock() this.locker.Lock()
delete(this.m, this.hash) delete(this.m, this.hash)
this.locker.Unlock() this.locker.Unlock()
return nil return nil
} }
// Key // Key 获取Key
func (this *MemoryWriter) Key() string { func (this *MemoryWriter) Key() string {
return this.key return this.key
} }
// 过期时间 // ExpiredAt 过期时间
func (this *MemoryWriter) ExpiredAt() int64 { func (this *MemoryWriter) ExpiredAt() int64 {
return this.expiredAt return this.expiredAt
} }
// 内容类型 // ItemType 内容类型
func (this *MemoryWriter) ItemType() ItemType { func (this *MemoryWriter) ItemType() ItemType {
return ItemTypeMemory return ItemTypeMemory
} }

View File

@@ -1142,6 +1142,12 @@ func (this *HTTPRequest) canIgnore(err error) bool {
return true return true
} }
// 网络错误
_, ok := err.(*net.OpError)
if ok {
return true
}
// 客户端主动取消 // 客户端主动取消
if err == context.Canceled { if err == context.Canceled {
return true return true

View File

@@ -108,7 +108,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
} }
if !this.canIgnore(err) { if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
} }
return return
} }
@@ -142,7 +142,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}) })
if err != nil { if err != nil {
if !this.canIgnore(err) { if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
} }
return return
} }
@@ -234,7 +234,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true return true
} }
if !this.canIgnore(err) { if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
} }
return return
} }
@@ -277,7 +277,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}) })
if err != nil { if err != nil {
if !this.canIgnore(err) { if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
} }
return true return true
} }
@@ -300,7 +300,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}) })
if err != nil { if err != nil {
if !this.canIgnore(err) { if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_CACHE", "read from cache failed: "+err.Error()) remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
} }
return return
} }

View File

@@ -200,14 +200,20 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
_, err = io.CopyBuffer(this.writer, resp.Body, buf) _, err = io.CopyBuffer(this.writer, resp.Body, buf)
pool.Put(buf) pool.Put(buf)
err1 := resp.Body.Close() closeErr := resp.Body.Close()
if err1 != nil { if closeErr != nil {
remotelogs.Warn("REQUEST_FASTCGI", err1.Error()) remotelogs.Warn("HTTP_REQUEST_FASTCGI", closeErr.Error())
} }
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
remotelogs.Warn("REQUEST_FASTCGI", err.Error()) remotelogs.Warn("HTTP_REQUEST_FASTCGI", err.Error())
this.addError(err) this.addError(err)
} }
// 是否成功结束
if err == nil && closeErr == nil {
this.writer.SetOk()
}
return return
} }

View File

@@ -1,6 +1,7 @@
package nodes package nodes
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"io" "io"
@@ -50,7 +51,11 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
_, err = io.CopyBuffer(this.writer, fp, buf) _, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf) bytePool1k.Put(buf)
if err != nil { if err != nil {
logs.Error(err) if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_PAGE", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
} }
err = fp.Close() err = fp.Close()
if err != nil { if err != nil {

View File

@@ -35,7 +35,7 @@ func (this *HTTPRequest) doReverseProxy() {
origin := this.reverseProxy.NextOrigin(requestCall) origin := this.reverseProxy.NextOrigin(requestCall)
if origin == nil { if origin == nil {
err := errors.New(this.requestPath() + ": no available backends for reverse proxy") err := errors.New(this.requestPath() + ": no available backends for reverse proxy")
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error()) remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err) this.write502(err)
return return
} }
@@ -55,7 +55,7 @@ func (this *HTTPRequest) doReverseProxy() {
// 处理Scheme // 处理Scheme
if origin.Addr == nil { if origin.Addr == nil {
err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address") err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address")
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error()) remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err) this.write502(err)
return return
} }
@@ -142,7 +142,7 @@ func (this *HTTPRequest) doReverseProxy() {
// 获取请求客户端 // 获取请求客户端
client, err := SharedHTTPClientPool.Client(this.RawReq, origin, originAddr) client, err := SharedHTTPClientPool.Client(this.RawReq, origin, originAddr)
if err != nil { if err != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error()) remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.write502(err) this.write502(err)
return return
} }
@@ -162,7 +162,7 @@ func (this *HTTPRequest) doReverseProxy() {
// TODO 如果超过最大失败次数,则下线 // TODO 如果超过最大失败次数,则下线
this.write502(err) this.write502(err)
remotelogs.Println("REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error()) remotelogs.Println("HTTP_REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error())
} else { } else {
// 是否为客户端方面的错误 // 是否为客户端方面的错误
isClientError := false isClientError := false
@@ -189,7 +189,7 @@ func (this *HTTPRequest) doReverseProxy() {
if this.doWAFResponse(resp) { if this.doWAFResponse(resp) {
err = resp.Body.Close() err = resp.Body.Close()
if err != nil { if err != nil {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error()) remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
} }
return return
} }
@@ -201,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() {
if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) { if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
err = resp.Body.Close() err = resp.Body.Close()
if err != nil { if err != nil {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error()) remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
} }
return return
} }
@@ -254,17 +254,22 @@ func (this *HTTPRequest) doReverseProxy() {
} }
pool.Put(buf) pool.Put(buf)
err1 := resp.Body.Close() closeErr := resp.Body.Close()
if err1 != nil { if closeErr != nil {
if !this.canIgnore(err) { if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err1.Error()) remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", closeErr.Error())
} }
} }
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
if !this.canIgnore(err) { if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error()) remotelogs.Warn("HTTP_REQUEST_REVERSE_PROXY", err.Error())
this.addError(err) this.addError(err)
} }
} }
// 是否成功结束
if err == nil && closeErr == nil {
this.writer.SetOk()
}
} }

View File

@@ -382,6 +382,9 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
} }
} }
// 设置成功
this.writer.SetOk()
return true return true
} }

View File

@@ -1,6 +1,7 @@
package nodes package nodes
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"io" "io"
@@ -65,8 +66,16 @@ func (this *HTTPRequest) doShutdown() {
buf := bytePool1k.Get() buf := bytePool1k.Get()
_, err = io.CopyBuffer(this.writer, fp, buf) _, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf) bytePool1k.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
err = fp.Close() err = fp.Close()
if err != nil { if err != nil {
logs.Error(err) remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "close file failed: "+err.Error())
} }
} }

View File

@@ -2,6 +2,7 @@ package nodes
import ( import (
"errors" "errors"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"io" "io"
@@ -68,4 +69,12 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
buf := pool.Get() buf := pool.Get()
_, err = io.CopyBuffer(this.writer, resp.Body, buf) _, err = io.CopyBuffer(this.writer, resp.Body, buf)
pool.Put(buf) pool.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_URL", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
} }

View File

@@ -70,7 +70,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
for _, action := range actions { for _, action := range actions {
goNext, err := action.DoHTTP(this.RawReq, this.RawWriter) goNext, err := action.DoHTTP(this.RawReq, this.RawWriter)
if err != nil { if err != nil {
remotelogs.Error("REQUEST", "do action '"+err.Error()+"' failed: "+err.Error()) remotelogs.Error("HTTP_REQUEST_WAF", "do action '"+err.Error()+"' failed: "+err.Error())
return true, false return true, false
} }
if !goNext { if !goNext {
@@ -101,7 +101,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
for _, remoteAddr := range remoteAddrs { for _, remoteAddr := range remoteAddrs {
result, err := iplibrary.SharedLibrary.Lookup(remoteAddr) result, err := iplibrary.SharedLibrary.Lookup(remoteAddr)
if err != nil { if err != nil {
remotelogs.Error("REQUEST", "iplibrary lookup failed: "+err.Error()) remotelogs.Error("HTTP_REQUEST_WAF", "iplibrary lookup failed: "+err.Error())
} else if result != nil { } else if result != nil {
// 检查国家级别封禁 // 检查国家级别封禁
if len(regionConfig.DenyCountryIds) > 0 && len(result.Country) > 0 { if len(regionConfig.DenyCountryIds) > 0 && len(result.Country) > 0 {
@@ -147,7 +147,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
} }
goNext, ruleGroup, ruleSet, err := w.MatchRequest(this.RawReq, this.writer) goNext, ruleGroup, ruleSet, err := w.MatchRequest(this.RawReq, this.writer)
if err != nil { if err != nil {
remotelogs.Error("REQUEST", this.rawURI+": "+err.Error()) remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error())
return return
} }
@@ -181,7 +181,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
goNext, ruleGroup, ruleSet, err := w.MatchResponse(this.RawReq, resp, this.writer) goNext, ruleGroup, ruleSet, err := w.MatchResponse(this.RawReq, resp, this.writer)
if err != nil { if err != nil {
remotelogs.Error("REQUEST", this.rawURI+": "+err.Error()) remotelogs.Error("HTTP_REQUEST_WAF", this.rawURI+": "+err.Error())
return return
} }

View File

@@ -14,7 +14,7 @@ import (
"strings" "strings"
) )
// 响应Writer // HTTPWriter 响应Writer
type HTTPWriter struct { type HTTPWriter struct {
req *HTTPRequest req *HTTPRequest
writer http.ResponseWriter writer http.ResponseWriter
@@ -32,9 +32,11 @@ type HTTPWriter struct {
cacheWriter caches.Writer // 缓存写入 cacheWriter caches.Writer // 缓存写入
cacheStorage caches.StorageInterface cacheStorage caches.StorageInterface
isOk bool // 是否完全成功
} }
// 包装对象 // NewHTTPWriter 包装对象
func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HTTPWriter { func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HTTPWriter {
return &HTTPWriter{ return &HTTPWriter{
req: req, req: req,
@@ -42,7 +44,7 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT
} }
} }
// 重置 // Reset 重置
func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) { func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) {
this.writer = httpResponseWriter this.writer = httpResponseWriter
@@ -58,12 +60,12 @@ func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) {
this.gzipBodyWriter = nil this.gzipBodyWriter = nil
} }
// 设置Gzip // Gzip 设置Gzip
func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) { func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) {
this.gzipConfig = config this.gzipConfig = config
} }
// 准备输出 // Prepare 准备输出
func (this *HTTPWriter) Prepare(size int64, status int) { func (this *HTTPWriter) Prepare(size int64, status int) {
this.statusCode = status this.statusCode = status
@@ -71,12 +73,12 @@ func (this *HTTPWriter) Prepare(size int64, status int) {
this.prepareCache(size) this.prepareCache(size)
} }
// 包装前的原始的Writer // Raw 包装前的原始的Writer
func (this *HTTPWriter) Raw() http.ResponseWriter { func (this *HTTPWriter) Raw() http.ResponseWriter {
return this.writer return this.writer
} }
// 获取Header // Header 获取Header
func (this *HTTPWriter) Header() http.Header { func (this *HTTPWriter) Header() http.Header {
if this.writer == nil { if this.writer == nil {
return http.Header{} return http.Header{}
@@ -84,7 +86,7 @@ func (this *HTTPWriter) Header() http.Header {
return this.writer.Header() return this.writer.Header()
} }
// 添加一组Header // AddHeaders 添加一组Header
func (this *HTTPWriter) AddHeaders(header http.Header) { func (this *HTTPWriter) AddHeaders(header http.Header) {
if this.writer == nil { if this.writer == nil {
return return
@@ -99,7 +101,7 @@ func (this *HTTPWriter) AddHeaders(header http.Header) {
} }
} }
// 写入数据 // Write 写入数据
func (this *HTTPWriter) Write(data []byte) (n int, err error) { func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.writer != nil { if this.writer != nil {
if this.gzipWriter != nil { if this.gzipWriter != nil {
@@ -115,8 +117,9 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.cacheWriter != nil { if this.cacheWriter != nil {
_, err = this.cacheWriter.Write(data) _, err = this.cacheWriter.Write(data)
if err != nil { if err != nil {
_ = this.cacheWriter.Discard()
this.cacheWriter = nil this.cacheWriter = nil
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
} }
} }
} else { } else {
@@ -128,7 +131,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.gzipBodyWriter != nil { if this.gzipBodyWriter != nil {
_, err := this.gzipBodyWriter.Write(data) _, err := this.gzipBodyWriter.Write(data)
if err != nil { if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error()) remotelogs.Error("HTTP_WRITER", err.Error())
} }
} else { } else {
this.body = append(this.body, data...) this.body = append(this.body, data...)
@@ -137,17 +140,17 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) {
return return
} }
// 写入字符串 // WriteString 写入字符串
func (this *HTTPWriter) WriteString(s string) (n int, err error) { func (this *HTTPWriter) WriteString(s string) (n int, err error) {
return this.Write([]byte(s)) return this.Write([]byte(s))
} }
// 读取发送的字节数 // SentBodyBytes 读取发送的字节数
func (this *HTTPWriter) SentBodyBytes() int64 { func (this *HTTPWriter) SentBodyBytes() int64 {
return this.sentBodyBytes return this.sentBodyBytes
} }
// 写入状态码 // WriteHeader 写入状态码
func (this *HTTPWriter) WriteHeader(statusCode int) { func (this *HTTPWriter) WriteHeader(statusCode int) {
if this.writer != nil { if this.writer != nil {
this.writer.WriteHeader(statusCode) this.writer.WriteHeader(statusCode)
@@ -155,7 +158,7 @@ func (this *HTTPWriter) WriteHeader(statusCode int) {
this.statusCode = statusCode this.statusCode = statusCode
} }
// 读取状态码 // StatusCode 读取状态码
func (this *HTTPWriter) StatusCode() int { func (this *HTTPWriter) StatusCode() int {
if this.statusCode == 0 { if this.statusCode == 0 {
return http.StatusOK return http.StatusOK
@@ -163,22 +166,22 @@ func (this *HTTPWriter) StatusCode() int {
return this.statusCode return this.statusCode
} }
// 设置拷贝Body数据 // SetBodyCopying 设置拷贝Body数据
func (this *HTTPWriter) SetBodyCopying(b bool) { func (this *HTTPWriter) SetBodyCopying(b bool) {
this.bodyCopying = b this.bodyCopying = b
} }
// 判断是否在拷贝Body数据 // BodyIsCopying 判断是否在拷贝Body数据
func (this *HTTPWriter) BodyIsCopying() bool { func (this *HTTPWriter) BodyIsCopying() bool {
return this.bodyCopying return this.bodyCopying
} }
// 读取拷贝的Body数据 // Body 读取拷贝的Body数据
func (this *HTTPWriter) Body() []byte { func (this *HTTPWriter) Body() []byte {
return this.body return this.body
} }
// 读取Header二进制数据 // HeaderData 读取Header二进制数据
func (this *HTTPWriter) HeaderData() []byte { func (this *HTTPWriter) HeaderData() []byte {
if this.writer == nil { if this.writer == nil {
return nil return nil
@@ -200,7 +203,12 @@ func (this *HTTPWriter) HeaderData() []byte {
return writer.Bytes() return writer.Bytes()
} }
// 关闭 // SetOk 设置成功
func (this *HTTPWriter) SetOk() {
this.isOk = true
}
// Close 关闭
func (this *HTTPWriter) Close() { func (this *HTTPWriter) Close() {
// gzip writer // gzip writer
if this.gzipWriter != nil { if this.gzipWriter != nil {
@@ -214,20 +222,24 @@ func (this *HTTPWriter) Close() {
// cache writer // cache writer
if this.cacheWriter != nil { if this.cacheWriter != nil {
err := this.cacheWriter.Close() if this.isOk {
if err == nil { err := this.cacheWriter.Close()
this.cacheStorage.AddToList(&caches.Item{ if err == nil {
Type: this.cacheWriter.ItemType(), this.cacheStorage.AddToList(&caches.Item{
Key: this.cacheWriter.Key(), Type: this.cacheWriter.ItemType(),
ExpiredAt: this.cacheWriter.ExpiredAt(), Key: this.cacheWriter.Key(),
HeaderSize: this.cacheWriter.HeaderSize(), ExpiredAt: this.cacheWriter.ExpiredAt(),
BodySize: this.cacheWriter.BodySize(), HeaderSize: this.cacheWriter.HeaderSize(),
}) BodySize: this.cacheWriter.BodySize(),
})
}
} else {
_ = this.cacheWriter.Discard()
} }
} }
} }
// Hijack // Hijack Hijack
func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) { func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) {
hijack, ok := this.writer.(http.Hijacker) hijack, ok := this.writer.(http.Hijacker)
if ok { if ok {
@@ -236,7 +248,7 @@ func (this *HTTPWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err erro
return return
} }
// Flush // Flush Flush
func (this *HTTPWriter) Flush() { func (this *HTTPWriter) Flush() {
flusher, ok := this.writer.(http.Flusher) flusher, ok := this.writer.(http.Flusher)
if ok { if ok {
@@ -284,7 +296,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
var err error = nil var err error = nil
this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level)) this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
if err != nil { if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error()) remotelogs.Error("HTTP_WRITER", err.Error())
return return
} }
@@ -293,7 +305,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
this.gzipBodyBuffer = bytes.NewBuffer([]byte{}) this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level)) this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
if err != nil { if err != nil {
remotelogs.Error("REQUEST_WRITER", err.Error()) remotelogs.Error("HTTP_WRITER", err.Error())
} }
} }
@@ -376,7 +388,7 @@ func (this *HTTPWriter) prepareCache(size int64) {
cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode()) cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode())
if err != nil { if err != nil {
if err != caches.ErrFileIsWriting { if err != caches.ErrFileIsWriting {
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
} }
return return
} }
@@ -390,7 +402,8 @@ func (this *HTTPWriter) prepareCache(size int64) {
for _, v1 := range v { for _, v1 := range v {
_, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n")) _, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n"))
if err != nil { if err != nil {
remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
_ = this.cacheWriter.Discard()
this.cacheWriter = nil this.cacheWriter = nil
return return
} }

View File

@@ -4,6 +4,8 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"io"
"log"
"net" "net"
"net/http" "net/http"
"strings" "strings"
@@ -11,6 +13,8 @@ import (
"time" "time"
) )
var httpErrorLogger = log.New(io.Discard, "", 0)
type HTTPListener struct { type HTTPListener struct {
BaseListener BaseListener
@@ -37,6 +41,7 @@ func (this *HTTPListener) Serve() error {
Handler: handler, Handler: handler,
ReadHeaderTimeout: 3 * time.Second, // TODO 改成可以配置 ReadHeaderTimeout: 3 * time.Second, // TODO 改成可以配置
IdleTimeout: 2 * time.Minute, // TODO 改成可以配置 IdleTimeout: 2 * time.Minute, // TODO 改成可以配置
ErrorLog: httpErrorLogger,
ConnState: func(conn net.Conn, state http.ConnState) { ConnState: func(conn net.Conn, state http.ConnState) {
switch state { switch state {
case http.StateNew: case http.StateNew: