mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-02 22:10:25 +08:00
优化并发读写相关代码
This commit is contained in:
@@ -11,7 +11,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type FileReader struct {
|
type FileReader struct {
|
||||||
fp *os.File
|
fp *fsutils.File
|
||||||
|
|
||||||
openFile *OpenFile
|
openFile *OpenFile
|
||||||
openFileCache *OpenFileCache
|
openFileCache *OpenFileCache
|
||||||
@@ -29,7 +29,7 @@ type FileReader struct {
|
|||||||
isClosed bool
|
isClosed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileReader(fp *os.File) *FileReader {
|
func NewFileReader(fp *fsutils.File) *FileReader {
|
||||||
return &FileReader{fp: fp}
|
return &FileReader{fp: fp}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,9 +175,7 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error {
|
|||||||
var headerSize = this.headerSize
|
var headerSize = this.headerSize
|
||||||
|
|
||||||
for {
|
for {
|
||||||
fsutils.ReaderLimiter.Ack()
|
|
||||||
n, err := this.fp.Read(buf)
|
n, err := this.fp.Read(buf)
|
||||||
fsutils.ReaderLimiter.Release()
|
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
if n < headerSize {
|
if n < headerSize {
|
||||||
goNext, e := callback(n)
|
goNext, e := callback(n)
|
||||||
@@ -239,9 +237,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
fsutils.ReaderLimiter.Ack()
|
|
||||||
n, err := this.fp.Read(buf)
|
n, err := this.fp.Read(buf)
|
||||||
fsutils.ReaderLimiter.Release()
|
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
goNext, e := callback(n)
|
goNext, e := callback(n)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
@@ -272,9 +268,7 @@ func (this *FileReader) Read(buf []byte) (n int, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fsutils.ReaderLimiter.Ack()
|
|
||||||
n, err = this.fp.Read(buf)
|
n, err = this.fp.Read(buf)
|
||||||
fsutils.ReaderLimiter.Release()
|
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
_ = this.discard()
|
_ = this.discard()
|
||||||
}
|
}
|
||||||
@@ -306,18 +300,14 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
|
|||||||
isOk = true
|
isOk = true
|
||||||
return ErrInvalidRange
|
return ErrInvalidRange
|
||||||
}
|
}
|
||||||
fsutils.ReaderLimiter.Ack()
|
|
||||||
_, err := this.fp.Seek(offset, io.SeekStart)
|
_, err := this.fp.Seek(offset, io.SeekStart)
|
||||||
fsutils.ReaderLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var n int
|
var n int
|
||||||
fsutils.ReaderLimiter.Ack()
|
|
||||||
n, err = this.fp.Read(buf)
|
n, err = this.fp.Read(buf)
|
||||||
fsutils.ReaderLimiter.Release()
|
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
var n2 = int(end-offset) + 1
|
var n2 = int(end-offset) + 1
|
||||||
if n2 <= n {
|
if n2 <= n {
|
||||||
@@ -363,7 +353,7 @@ func (this *FileReader) ContainsRange(r rangeutils.Range) (r2 rangeutils.Range,
|
|||||||
|
|
||||||
// FP 原始的文件句柄
|
// FP 原始的文件句柄
|
||||||
func (this *FileReader) FP() *os.File {
|
func (this *FileReader) FP() *os.File {
|
||||||
return this.fp
|
return this.fp.Raw()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileReader) Close() error {
|
func (this *FileReader) Close() error {
|
||||||
@@ -378,7 +368,7 @@ func (this *FileReader) Close() error {
|
|||||||
} else {
|
} else {
|
||||||
var cacheMeta = make([]byte, len(this.meta))
|
var cacheMeta = make([]byte, len(this.meta))
|
||||||
copy(cacheMeta, this.meta)
|
copy(cacheMeta, this.meta)
|
||||||
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified(), this.bodySize))
|
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp.Raw(), cacheMeta, this.header, this.LastModified(), this.bodySize))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -386,10 +376,8 @@ func (this *FileReader) Close() error {
|
|||||||
return this.fp.Close()
|
return this.fp.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
|
func (this *FileReader) readToBuff(fp *fsutils.File, buf []byte) (ok bool, err error) {
|
||||||
fsutils.ReaderLimiter.Ack()
|
|
||||||
n, err := fp.Read(buf)
|
n, err := fp.Read(buf)
|
||||||
fsutils.ReaderLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import (
|
|||||||
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
|
rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PartialFileReader struct {
|
type PartialFileReader struct {
|
||||||
@@ -18,7 +17,7 @@ type PartialFileReader struct {
|
|||||||
rangePath string
|
rangePath string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPartialFileReader(fp *os.File) *PartialFileReader {
|
func NewPartialFileReader(fp *fsutils.File) *PartialFileReader {
|
||||||
return &PartialFileReader{
|
return &PartialFileReader{
|
||||||
FileReader: NewFileReader(fp),
|
FileReader: NewFileReader(fp),
|
||||||
rangePath: PartialRangesFilePath(fp.Name()),
|
rangePath: PartialRangesFilePath(fp.Name()),
|
||||||
|
|||||||
@@ -439,12 +439,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
|
|
||||||
var reader Reader
|
var reader Reader
|
||||||
if isPartial {
|
if isPartial {
|
||||||
var partialFileReader = NewPartialFileReader(fp)
|
var partialFileReader = NewPartialFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
|
||||||
partialFileReader.openFile = openFile
|
partialFileReader.openFile = openFile
|
||||||
partialFileReader.openFileCache = openFileCache
|
partialFileReader.openFileCache = openFileCache
|
||||||
reader = partialFileReader
|
reader = partialFileReader
|
||||||
} else {
|
} else {
|
||||||
var fileReader = NewFileReader(fp)
|
var fileReader = NewFileReader(fsutils.NewFile(fp, fsutils.FlagRead))
|
||||||
fileReader.openFile = openFile
|
fileReader.openFile = openFile
|
||||||
fileReader.openFileCache = openFileCache
|
fileReader.openFileCache = openFileCache
|
||||||
reader = fileReader
|
reader = fileReader
|
||||||
@@ -593,7 +593,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
|||||||
if existsCacheItem {
|
if existsCacheItem {
|
||||||
readerFp, err := fsutils.OpenFile(tmpPath, os.O_RDONLY, 0444)
|
readerFp, err := fsutils.OpenFile(tmpPath, os.O_RDONLY, 0444)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
var partialReader = NewPartialFileReader(readerFp)
|
var partialReader = NewPartialFileReader(fsutils.NewFile(readerFp, fsutils.FlagRead))
|
||||||
err = partialReader.Init()
|
err = partialReader.Init()
|
||||||
_ = partialReader.Close()
|
_ = partialReader.Close()
|
||||||
if err == nil && partialReader.bodyOffset > 0 {
|
if err == nil && partialReader.bodyOffset > 0 {
|
||||||
@@ -629,7 +629,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
|||||||
return nil, ErrServerIsBusy
|
return nil, ErrServerIsBusy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
writer, err := os.OpenFile(tmpPath, flags, 0666)
|
fp, err := os.OpenFile(tmpPath, flags, 0666)
|
||||||
if !isFlushing {
|
if !isFlushing {
|
||||||
fsutils.WriterLimiter.Release()
|
fsutils.WriterLimiter.Release()
|
||||||
}
|
}
|
||||||
@@ -639,7 +639,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
|||||||
|
|
||||||
// open file again
|
// open file again
|
||||||
fsutils.WriterLimiter.Ack()
|
fsutils.WriterLimiter.Ack()
|
||||||
writer, err = os.OpenFile(tmpPath, flags, 0666)
|
fp, err = os.OpenFile(tmpPath, flags, 0666)
|
||||||
fsutils.WriterLimiter.Release()
|
fsutils.WriterLimiter.Release()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -647,6 +647,8 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var writer = fsutils.NewFile(fp, fsutils.FlagWrite)
|
||||||
|
|
||||||
var removeOnFailure = true
|
var removeOnFailure = true
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -663,7 +665,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// 尝试锁定,如果锁定失败,则直接返回
|
// 尝试锁定,如果锁定失败,则直接返回
|
||||||
|
fsutils.WriterLimiter.Ack()
|
||||||
err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
||||||
|
fsutils.WriterLimiter.Release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
removeOnFailure = false
|
removeOnFailure = false
|
||||||
return nil, fmt.Errorf("%w (003)", ErrFileIsWriting)
|
return nil, fmt.Errorf("%w (003)", ErrFileIsWriting)
|
||||||
@@ -700,9 +704,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
|
|||||||
metaBodySize = bodySize
|
metaBodySize = bodySize
|
||||||
}
|
}
|
||||||
|
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_, err = writer.Write(metaBytes)
|
_, err = writer.Write(metaBytes)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,14 +6,13 @@ import (
|
|||||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileWriter struct {
|
type FileWriter struct {
|
||||||
storage StorageInterface
|
storage StorageInterface
|
||||||
rawWriter *os.File
|
rawWriter *fsutils.File
|
||||||
key string
|
key string
|
||||||
|
|
||||||
metaHeaderSize int
|
metaHeaderSize int
|
||||||
@@ -26,9 +25,11 @@ type FileWriter struct {
|
|||||||
maxSize int64
|
maxSize int64
|
||||||
endFunc func()
|
endFunc func()
|
||||||
once sync.Once
|
once sync.Once
|
||||||
|
|
||||||
|
modifiedBytes int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
|
func NewFileWriter(storage StorageInterface, rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter {
|
||||||
return &FileWriter{
|
return &FileWriter{
|
||||||
storage: storage,
|
storage: storage,
|
||||||
key: key,
|
key: key,
|
||||||
@@ -43,9 +44,7 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp
|
|||||||
|
|
||||||
// WriteHeader 写入数据
|
// WriteHeader 写入数据
|
||||||
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
|
func (this *FileWriter) WriteHeader(data []byte) (n int, err error) {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
n, err = this.rawWriter.Write(data)
|
n, err = this.rawWriter.Write(data)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
this.headerSize += int64(n)
|
this.headerSize += int64(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
@@ -79,7 +78,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) {
|
|||||||
var l = len(data)
|
var l = len(data)
|
||||||
if l > (2 << 20) {
|
if l > (2 << 20) {
|
||||||
var offset = 0
|
var offset = 0
|
||||||
const bufferSize = 256 << 10
|
const bufferSize = 64 << 10
|
||||||
for {
|
for {
|
||||||
var end = offset + bufferSize
|
var end = offset + bufferSize
|
||||||
if end > l {
|
if end > l {
|
||||||
@@ -145,24 +144,18 @@ func (this *FileWriter) Close() error {
|
|||||||
|
|
||||||
err := this.WriteHeaderLength(types.Int(this.headerSize))
|
err := this.WriteHeaderLength(types.Int(this.headerSize))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Close()
|
_ = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
_ = fsutils.Remove(path)
|
_ = fsutils.Remove(path)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = this.WriteBodyLength(this.bodySize)
|
err = this.WriteBodyLength(this.bodySize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Close()
|
_ = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
_ = fsutils.Remove(path)
|
_ = fsutils.Remove(path)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
err = this.rawWriter.Close()
|
err = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = fsutils.Remove(path)
|
_ = fsutils.Remove(path)
|
||||||
} else if strings.HasSuffix(path, FileTmpSuffix) {
|
} else if strings.HasSuffix(path, FileTmpSuffix) {
|
||||||
@@ -181,9 +174,7 @@ func (this *FileWriter) Discard() error {
|
|||||||
this.endFunc()
|
this.endFunc()
|
||||||
})
|
})
|
||||||
|
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Close()
|
_ = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
|
|
||||||
err := fsutils.Remove(this.rawWriter.Name())
|
err := fsutils.Remove(this.rawWriter.Name())
|
||||||
return err
|
return err
|
||||||
@@ -211,9 +202,7 @@ func (this *FileWriter) ItemType() ItemType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileWriter) write(data []byte) (n int, err error) {
|
func (this *FileWriter) write(data []byte) (n int, err error) {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
n, err = this.rawWriter.Write(data)
|
n, err = this.rawWriter.Write(data)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
this.bodySize += int64(n)
|
this.bodySize += int64(n)
|
||||||
|
|
||||||
if this.maxSize > 0 && this.bodySize > this.maxSize {
|
if this.maxSize > 0 && this.bodySize > this.maxSize {
|
||||||
@@ -227,5 +216,6 @@ func (this *FileWriter) write(data []byte) (n int, err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,12 +7,11 @@ import (
|
|||||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PartialFileWriter struct {
|
type PartialFileWriter struct {
|
||||||
rawWriter *os.File
|
rawWriter *fsutils.File
|
||||||
key string
|
key string
|
||||||
|
|
||||||
metaHeaderSize int
|
metaHeaderSize int
|
||||||
@@ -33,7 +32,7 @@ type PartialFileWriter struct {
|
|||||||
rangePath string
|
rangePath string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
|
func NewPartialFileWriter(rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter {
|
||||||
return &PartialFileWriter{
|
return &PartialFileWriter{
|
||||||
key: key,
|
key: key,
|
||||||
rawWriter: rawWriter,
|
rawWriter: rawWriter,
|
||||||
@@ -54,9 +53,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
|
|||||||
if !this.isNew {
|
if !this.isNew {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
n, err = this.rawWriter.Write(data)
|
n, err = this.rawWriter.Write(data)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
this.headerSize += int64(n)
|
this.headerSize += int64(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
@@ -65,9 +62,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *PartialFileWriter) AppendHeader(data []byte) error {
|
func (this *PartialFileWriter) AppendHeader(data []byte) error {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_, err := this.rawWriter.Write(data)
|
_, err := this.rawWriter.Write(data)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
} else {
|
} else {
|
||||||
@@ -94,9 +89,7 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
|
|||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_, err = this.rawWriter.Write(bytes4)
|
_, err = this.rawWriter.Write(bytes4)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
return err
|
return err
|
||||||
@@ -106,9 +99,7 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error {
|
|||||||
|
|
||||||
// Write 写入数据
|
// Write 写入数据
|
||||||
func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
|
func (this *PartialFileWriter) Write(data []byte) (n int, err error) {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
n, err = this.rawWriter.Write(data)
|
n, err = this.rawWriter.Write(data)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
this.bodySize += int64(n)
|
this.bodySize += int64(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
@@ -132,11 +123,14 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
|
|||||||
// prevent extending too much space in a single writing
|
// prevent extending too much space in a single writing
|
||||||
var maxOffset = this.ranges.Max()
|
var maxOffset = this.ranges.Max()
|
||||||
if offset-maxOffset > 16<<20 {
|
if offset-maxOffset > 16<<20 {
|
||||||
|
var extendSizePerStep int64 = 1 << 20
|
||||||
var maxExtendSize int64 = 32 << 20
|
var maxExtendSize int64 = 32 << 20
|
||||||
if fsutils.DiskIsExtremelyFast() {
|
if fsutils.DiskIsExtremelyFast() {
|
||||||
maxExtendSize = 128 << 20
|
maxExtendSize = 128 << 20
|
||||||
|
extendSizePerStep = 4 << 20
|
||||||
} else if fsutils.DiskIsFast() {
|
} else if fsutils.DiskIsFast() {
|
||||||
maxExtendSize = 64 << 20
|
maxExtendSize = 64 << 20
|
||||||
|
extendSizePerStep = 2 << 20
|
||||||
}
|
}
|
||||||
if offset-maxOffset > maxExtendSize {
|
if offset-maxOffset > maxExtendSize {
|
||||||
stat, err := this.rawWriter.Stat()
|
stat, err := this.rawWriter.Stat()
|
||||||
@@ -145,11 +139,8 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// extend min size to prepare for file tail
|
// extend min size to prepare for file tail
|
||||||
const extendSizePerStep = 8 << 20
|
|
||||||
if stat.Size()+extendSizePerStep <= this.bodyOffset+offset+int64(len(data)) {
|
if stat.Size()+extendSizePerStep <= this.bodyOffset+offset+int64(len(data)) {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Truncate(stat.Size() + extendSizePerStep)
|
_ = this.rawWriter.Truncate(stat.Size() + extendSizePerStep)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -163,9 +154,7 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error {
|
|||||||
this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize
|
this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize
|
||||||
}
|
}
|
||||||
|
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
|
_, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -192,9 +181,7 @@ func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error {
|
|||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_, err = this.rawWriter.Write(bytes8)
|
_, err = this.rawWriter.Write(bytes8)
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = this.Discard()
|
_ = this.Discard()
|
||||||
return err
|
return err
|
||||||
@@ -211,9 +198,7 @@ func (this *PartialFileWriter) Close() error {
|
|||||||
this.ranges.BodySize = this.bodySize
|
this.ranges.BodySize = this.bodySize
|
||||||
err := this.ranges.WriteToFile(this.rangePath)
|
err := this.ranges.WriteToFile(this.rangePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Close()
|
_ = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
this.remove()
|
this.remove()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -222,25 +207,19 @@ func (this *PartialFileWriter) Close() error {
|
|||||||
if this.isNew {
|
if this.isNew {
|
||||||
err = this.WriteHeaderLength(types.Int(this.headerSize))
|
err = this.WriteHeaderLength(types.Int(this.headerSize))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Close()
|
_ = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
this.remove()
|
this.remove()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = this.WriteBodyLength(this.bodySize)
|
err = this.WriteBodyLength(this.bodySize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Close()
|
_ = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
this.remove()
|
this.remove()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
err = this.rawWriter.Close()
|
err = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
this.remove()
|
this.remove()
|
||||||
}
|
}
|
||||||
@@ -254,9 +233,7 @@ func (this *PartialFileWriter) Discard() error {
|
|||||||
this.endFunc()
|
this.endFunc()
|
||||||
})
|
})
|
||||||
|
|
||||||
fsutils.WriterLimiter.Ack()
|
|
||||||
_ = this.rawWriter.Close()
|
_ = this.rawWriter.Close()
|
||||||
fsutils.WriterLimiter.Release()
|
|
||||||
|
|
||||||
SharedPartialRangesQueue.Delete(this.rangePath)
|
SharedPartialRangesQueue.Delete(this.rangePath)
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
package teaconst
|
package teaconst
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Version = "1.3.8"
|
Version = "1.3.8.1"
|
||||||
|
|
||||||
ProductName = "Edge Node"
|
ProductName = "Edge Node"
|
||||||
ProcessName = "edge-node"
|
ProcessName = "edge-node"
|
||||||
|
|||||||
94
internal/utils/fs/file.go
Normal file
94
internal/utils/fs/file.go
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package fsutils
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
const FlagRead = 0x1
|
||||||
|
const FlagWrite = 0x2
|
||||||
|
|
||||||
|
type File struct {
|
||||||
|
rawFile *os.File
|
||||||
|
readonly bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFile(rawFile *os.File, flags int) *File {
|
||||||
|
return &File{
|
||||||
|
rawFile: rawFile,
|
||||||
|
readonly: flags&FlagRead == FlagRead,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Name() string {
|
||||||
|
return this.rawFile.Name()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Fd() uintptr {
|
||||||
|
return this.rawFile.Fd()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Raw() *os.File {
|
||||||
|
return this.rawFile
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Stat() (os.FileInfo, error) {
|
||||||
|
return this.rawFile.Stat()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Seek(offset int64, whence int) (ret int64, err error) {
|
||||||
|
ret, err = this.rawFile.Seek(offset, whence)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Read(b []byte) (n int, err error) {
|
||||||
|
ReaderLimiter.Ack()
|
||||||
|
n, err = this.rawFile.Read(b)
|
||||||
|
ReaderLimiter.Release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) ReadAt(b []byte, off int64) (n int, err error) {
|
||||||
|
ReaderLimiter.Ack()
|
||||||
|
n, err = this.rawFile.ReadAt(b, off)
|
||||||
|
ReaderLimiter.Release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Write(b []byte) (n int, err error) {
|
||||||
|
WriterLimiter.Ack()
|
||||||
|
n, err = this.rawFile.Write(b)
|
||||||
|
WriterLimiter.Release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) WriteAt(b []byte, off int64) (n int, err error) {
|
||||||
|
WriterLimiter.Ack()
|
||||||
|
n, err = this.rawFile.WriteAt(b, off)
|
||||||
|
WriterLimiter.Release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Sync() (err error) {
|
||||||
|
WriterLimiter.Ack()
|
||||||
|
err = this.rawFile.Sync()
|
||||||
|
WriterLimiter.Release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Truncate(size int64) (err error) {
|
||||||
|
WriterLimiter.Ack()
|
||||||
|
err = this.rawFile.Truncate(size)
|
||||||
|
WriterLimiter.Release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *File) Close() (err error) {
|
||||||
|
if !this.readonly {
|
||||||
|
WriterLimiter.Ack()
|
||||||
|
}
|
||||||
|
err = this.rawFile.Close()
|
||||||
|
if !this.readonly {
|
||||||
|
WriterLimiter.Release()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
16
internal/utils/fs/file_test.go
Normal file
16
internal/utils/fs/file_test.go
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package fsutils_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||||
|
"github.com/iwind/TeaGo/assert"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFileFlags(t *testing.T) {
|
||||||
|
var a = assert.NewAssertion(t)
|
||||||
|
a.IsTrue(fsutils.FlagRead&fsutils.FlagRead == fsutils.FlagRead)
|
||||||
|
a.IsTrue(fsutils.FlagWrite&fsutils.FlagWrite != fsutils.FlagRead)
|
||||||
|
a.IsTrue((fsutils.FlagWrite|fsutils.FlagRead)&fsutils.FlagRead == fsutils.FlagRead)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user