diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 70590e9..77448f4 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -302,21 +302,21 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { }() // 读取Header - var headerBuf = []byte{} + var headerData = []byte{} this.writer.SetSentHeaderBytes(reader.HeaderSize()) err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { - headerBuf = append(headerBuf, buf[:n]...) + headerData = append(headerData, buf[:n]...) for { - nIndex := bytes.Index(headerBuf, []byte{'\n'}) + nIndex := bytes.Index(headerData, []byte{'\n'}) if nIndex >= 0 { - row := headerBuf[:nIndex] + row := headerData[:nIndex] spaceIndex := bytes.Index(row, []byte{':'}) if spaceIndex <= 0 { return false, errors.New("invalid header '" + string(row) + "'") } this.writer.Header().Set(string(row[:spaceIndex]), string(row[spaceIndex+1:])) - headerBuf = headerBuf[nIndex+1:] + headerData = headerData[nIndex+1:] } else { break } diff --git a/internal/utils/readers/reader_concurrent.go b/internal/utils/readers/reader_concurrent.go new file mode 100644 index 0000000..2dd9674 --- /dev/null +++ b/internal/utils/readers/reader_concurrent.go @@ -0,0 +1,135 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package readers + +import ( + "errors" + "github.com/iwind/TeaGo/types" + "io" + "sync" +) + +type concurrentSubReader struct { + main *ConcurrentReaderList + index int +} + +func (this *concurrentSubReader) Read(p []byte) (n int, err error) { + n, err = this.main.readIndex(p, this.index) + this.index++ + return +} + +func (this *concurrentSubReader) Close() error { + this.main.removeSubReader(this) + + err := this.main.Close() + if err != nil { + return err + } + return nil +} + +// ConcurrentReaderList +// TODO 动态调整 pieces = pieces[minPieceIndex:] 以节约内存 +type ConcurrentReaderList struct { + locker sync.RWMutex + readLocker sync.Mutex + + mainReader io.ReadCloser + subReaderMap map[*concurrentSubReader]bool + pieces [][]byte + lastErr error +} + +func NewConcurrentReaderList(mainReader io.ReadCloser) *ConcurrentReaderList { + return &ConcurrentReaderList{ + mainReader: mainReader, + subReaderMap: map[*concurrentSubReader]bool{}, + } +} + +func (this *ConcurrentReaderList) NewReader() io.ReadCloser { + var subReader = &concurrentSubReader{ + main: this, + } + this.locker.Lock() + this.subReaderMap[subReader] = true + this.locker.Unlock() + return subReader +} + +func (this *ConcurrentReaderList) read(p []byte) (n int, err error) { + n, err = this.mainReader.Read(p) + this.lastErr = err + + if n > 0 { + var piece = make([]byte, n) + copy(piece, p[:n]) + this.locker.Lock() + this.pieces = append(this.pieces, piece) + this.locker.Unlock() + } + + return +} + +func (this *ConcurrentReaderList) readIndex(p []byte, index int) (n int, err error) { + // 如果已经有数据 + this.locker.RLock() + var countPieces = len(this.pieces) + if index < countPieces { + var piece = this.pieces[index] + this.locker.RUnlock() + var pn = len(piece) + if len(p) < pn { + err = errors.New("invalid buffer length '" + types.String(len(p)) + "' vs '" + types.String(len(piece)) + "'") + return + } + n = pn + copy(p, piece) + return + } + this.locker.RUnlock() + + if this.lastErr != nil { + return 0, this.lastErr + } + + // 如果没有数据,则读取之 + this.readLocker.Lock() + + // 再次检查数据是否已更新 + this.locker.RLock() + if len(this.pieces) > countPieces || this.lastErr != nil { + this.locker.RUnlock() + this.readLocker.Unlock() + return this.readIndex(p, index) + } + this.locker.RUnlock() + + // 从原始Reader中读取 + n, err = this.read(p) + this.readLocker.Unlock() + if n > 0 { + // 重新尝试 + return this.readIndex(p, index) + } + return +} + +func (this *ConcurrentReaderList) removeSubReader(subReader *concurrentSubReader) { + this.locker.Lock() + delete(this.subReaderMap, subReader) + this.locker.Unlock() +} + +func (this *ConcurrentReaderList) Close() error { + this.locker.Lock() + if len(this.subReaderMap) == 0 { + this.locker.Unlock() + return this.mainReader.Close() + } + this.locker.Unlock() + return nil +} diff --git a/internal/utils/readers/reader_concurrent_test.go b/internal/utils/readers/reader_concurrent_test.go new file mode 100644 index 0000000..2ac2097 --- /dev/null +++ b/internal/utils/readers/reader_concurrent_test.go @@ -0,0 +1,81 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package readers_test + +import ( + "bytes" + "github.com/TeaOSLab/EdgeNode/internal/utils/readers" + "io" + "sync" + "testing" + "time" +) + +type testReader struct { + t *testing.T + + rawReader io.Reader +} + +func (this *testReader) Read(p []byte) (n int, err error) { + time.Sleep(1 * time.Second) // 延迟 + return this.rawReader.Read(p) +} + +func (this *testReader) Close() error { + this.t.Log("close") + return nil +} + +func TestNewConcurrentReader(t *testing.T) { + var originBuffer = &bytes.Buffer{} + originBuffer.Write([]byte("0123456789_hello_world")) + var originLength = originBuffer.Len() + var concurrentReader = readers.NewConcurrentReaderList(&testReader{ + t: t, + rawReader: originBuffer, + }) + + var threads = 32 + var wg = &sync.WaitGroup{} + wg.Add(threads) + + var locker = &sync.Mutex{} + var m = map[int][]byte{} // i => []byte + + for i := 0; i < threads; i++ { + go func(i int) { + defer wg.Done() + + var reader = concurrentReader.NewReader() + + var buf = make([]byte, 4) + for { + n, err := reader.Read(buf) + if n > 0 { + locker.Lock() + m[i] = append(m[i], buf[:n]...) + locker.Unlock() + //t.Log(i, string(buf[:n])) + } + if err != nil { + if err == io.EOF { + break + } + t.Log("ERROR:", err) + } + } + + _ = reader.Close() + }(i) + } + + wg.Wait() + + for i, b := range m { + if len(b) != originLength { + t.Fatal("ERROR:", i, string(b)) + } + t.Log(i, string(b)) + } +}