diff --git a/internal/utils/readers/.gitignore b/internal/utils/readers/.gitignore new file mode 100644 index 0000000..933e364 --- /dev/null +++ b/internal/utils/readers/.gitignore @@ -0,0 +1 @@ +readers_concurrent_file* \ No newline at end of file diff --git a/internal/utils/readers/reader_concurrent.go b/internal/utils/readers/reader_concurrent.go deleted file mode 100644 index 2dd9674..0000000 --- a/internal/utils/readers/reader_concurrent.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . - -package readers - -import ( - "errors" - "github.com/iwind/TeaGo/types" - "io" - "sync" -) - -type concurrentSubReader struct { - main *ConcurrentReaderList - index int -} - -func (this *concurrentSubReader) Read(p []byte) (n int, err error) { - n, err = this.main.readIndex(p, this.index) - this.index++ - return -} - -func (this *concurrentSubReader) Close() error { - this.main.removeSubReader(this) - - err := this.main.Close() - if err != nil { - return err - } - return nil -} - -// ConcurrentReaderList -// TODO 动态调整 pieces = pieces[minPieceIndex:] 以节约内存 -type ConcurrentReaderList struct { - locker sync.RWMutex - readLocker sync.Mutex - - mainReader io.ReadCloser - subReaderMap map[*concurrentSubReader]bool - pieces [][]byte - lastErr error -} - -func NewConcurrentReaderList(mainReader io.ReadCloser) *ConcurrentReaderList { - return &ConcurrentReaderList{ - mainReader: mainReader, - subReaderMap: map[*concurrentSubReader]bool{}, - } -} - -func (this *ConcurrentReaderList) NewReader() io.ReadCloser { - var subReader = &concurrentSubReader{ - main: this, - } - this.locker.Lock() - this.subReaderMap[subReader] = true - this.locker.Unlock() - return subReader -} - -func (this *ConcurrentReaderList) read(p []byte) (n int, err error) { - n, err = this.mainReader.Read(p) - this.lastErr = err - - if n > 0 { - var piece = make([]byte, n) - copy(piece, p[:n]) - this.locker.Lock() - this.pieces = append(this.pieces, piece) - this.locker.Unlock() - } - - return -} - -func (this *ConcurrentReaderList) readIndex(p []byte, index int) (n int, err error) { - // 如果已经有数据 - this.locker.RLock() - var countPieces = len(this.pieces) - if index < countPieces { - var piece = this.pieces[index] - this.locker.RUnlock() - var pn = len(piece) - if len(p) < pn { - err = errors.New("invalid buffer length '" + types.String(len(p)) + "' vs '" + types.String(len(piece)) + "'") - return - } - n = pn - copy(p, piece) - return - } - this.locker.RUnlock() - - if this.lastErr != nil { - return 0, this.lastErr - } - - // 如果没有数据,则读取之 - this.readLocker.Lock() - - // 再次检查数据是否已更新 - this.locker.RLock() - if len(this.pieces) > countPieces || this.lastErr != nil { - this.locker.RUnlock() - this.readLocker.Unlock() - return this.readIndex(p, index) - } - this.locker.RUnlock() - - // 从原始Reader中读取 - n, err = this.read(p) - this.readLocker.Unlock() - if n > 0 { - // 重新尝试 - return this.readIndex(p, index) - } - return -} - -func (this *ConcurrentReaderList) removeSubReader(subReader *concurrentSubReader) { - this.locker.Lock() - delete(this.subReaderMap, subReader) - this.locker.Unlock() -} - -func (this *ConcurrentReaderList) Close() error { - this.locker.Lock() - if len(this.subReaderMap) == 0 { - this.locker.Unlock() - return this.mainReader.Close() - } - this.locker.Unlock() - return nil -} diff --git a/internal/utils/readers/reader_concurrent_test.go b/internal/utils/readers/reader_concurrent_test.go deleted file mode 100644 index 2ac2097..0000000 --- a/internal/utils/readers/reader_concurrent_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . - -package readers_test - -import ( - "bytes" - "github.com/TeaOSLab/EdgeNode/internal/utils/readers" - "io" - "sync" - "testing" - "time" -) - -type testReader struct { - t *testing.T - - rawReader io.Reader -} - -func (this *testReader) Read(p []byte) (n int, err error) { - time.Sleep(1 * time.Second) // 延迟 - return this.rawReader.Read(p) -} - -func (this *testReader) Close() error { - this.t.Log("close") - return nil -} - -func TestNewConcurrentReader(t *testing.T) { - var originBuffer = &bytes.Buffer{} - originBuffer.Write([]byte("0123456789_hello_world")) - var originLength = originBuffer.Len() - var concurrentReader = readers.NewConcurrentReaderList(&testReader{ - t: t, - rawReader: originBuffer, - }) - - var threads = 32 - var wg = &sync.WaitGroup{} - wg.Add(threads) - - var locker = &sync.Mutex{} - var m = map[int][]byte{} // i => []byte - - for i := 0; i < threads; i++ { - go func(i int) { - defer wg.Done() - - var reader = concurrentReader.NewReader() - - var buf = make([]byte, 4) - for { - n, err := reader.Read(buf) - if n > 0 { - locker.Lock() - m[i] = append(m[i], buf[:n]...) - locker.Unlock() - //t.Log(i, string(buf[:n])) - } - if err != nil { - if err == io.EOF { - break - } - t.Log("ERROR:", err) - } - } - - _ = reader.Close() - }(i) - } - - wg.Wait() - - for i, b := range m { - if len(b) != originLength { - t.Fatal("ERROR:", i, string(b)) - } - t.Log(i, string(b)) - } -}