From ea39310b7ecab8b2c3b611365062e6ee36b2a338 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 1 May 2024 15:53:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=B9=B6=E5=8F=91=E8=AF=BB?= =?UTF-8?q?=E5=86=99=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/reader_file.go | 22 ++---- internal/caches/reader_partial_file.go | 3 +- internal/caches/storage_file.go | 16 +++-- internal/caches/writer_file.go | 22 ++---- internal/caches/writer_partial_file.go | 33 ++------- internal/const/const.go | 2 +- internal/utils/fs/file.go | 94 ++++++++++++++++++++++++++ internal/utils/fs/file_test.go | 16 +++++ 8 files changed, 137 insertions(+), 71 deletions(-) create mode 100644 internal/utils/fs/file.go create mode 100644 internal/utils/fs/file_test.go diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index f62cb23..e55b6a8 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -11,7 +11,7 @@ import ( ) type FileReader struct { - fp *os.File + fp *fsutils.File openFile *OpenFile openFileCache *OpenFileCache @@ -29,7 +29,7 @@ type FileReader struct { isClosed bool } -func NewFileReader(fp *os.File) *FileReader { +func NewFileReader(fp *fsutils.File) *FileReader { return &FileReader{fp: fp} } @@ -175,9 +175,7 @@ func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error { var headerSize = this.headerSize for { - fsutils.ReaderLimiter.Ack() n, err := this.fp.Read(buf) - fsutils.ReaderLimiter.Release() if n > 0 { if n < headerSize { goNext, e := callback(n) @@ -239,9 +237,7 @@ func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error { } for { - fsutils.ReaderLimiter.Ack() n, err := this.fp.Read(buf) - fsutils.ReaderLimiter.Release() if n > 0 { goNext, e := callback(n) if e != nil { @@ -272,9 +268,7 @@ func (this *FileReader) Read(buf []byte) (n int, err error) { return } - fsutils.ReaderLimiter.Ack() n, err = this.fp.Read(buf) - fsutils.ReaderLimiter.Release() if err != nil && err != io.EOF { _ = this.discard() } @@ -306,18 +300,14 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba isOk = true return ErrInvalidRange } - fsutils.ReaderLimiter.Ack() _, err := this.fp.Seek(offset, io.SeekStart) - fsutils.ReaderLimiter.Release() if err != nil { return err } for { var n int - fsutils.ReaderLimiter.Ack() n, err = this.fp.Read(buf) - fsutils.ReaderLimiter.Release() if n > 0 { var n2 = int(end-offset) + 1 if n2 <= n { @@ -363,7 +353,7 @@ func (this *FileReader) ContainsRange(r rangeutils.Range) (r2 rangeutils.Range, // FP 原始的文件句柄 func (this *FileReader) FP() *os.File { - return this.fp + return this.fp.Raw() } func (this *FileReader) Close() error { @@ -378,7 +368,7 @@ func (this *FileReader) Close() error { } else { var cacheMeta = make([]byte, len(this.meta)) copy(cacheMeta, this.meta) - this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified(), this.bodySize)) + this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp.Raw(), cacheMeta, this.header, this.LastModified(), this.bodySize)) } return nil } @@ -386,10 +376,8 @@ func (this *FileReader) Close() error { return this.fp.Close() } -func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { - fsutils.ReaderLimiter.Ack() +func (this *FileReader) readToBuff(fp *fsutils.File, buf []byte) (ok bool, err error) { n, err := fp.Read(buf) - fsutils.ReaderLimiter.Release() if err != nil { return false, err } diff --git a/internal/caches/reader_partial_file.go b/internal/caches/reader_partial_file.go index 9ce627b..64d4119 100644 --- a/internal/caches/reader_partial_file.go +++ b/internal/caches/reader_partial_file.go @@ -8,7 +8,6 @@ import ( rangeutils "github.com/TeaOSLab/EdgeNode/internal/utils/ranges" "github.com/iwind/TeaGo/types" "io" - "os" ) type PartialFileReader struct { @@ -18,7 +17,7 @@ type PartialFileReader struct { rangePath string } -func NewPartialFileReader(fp *os.File) *PartialFileReader { +func NewPartialFileReader(fp *fsutils.File) *PartialFileReader { return &PartialFileReader{ FileReader: NewFileReader(fp), rangePath: PartialRangesFilePath(fp.Name()), diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 73aba4a..250c08d 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -439,12 +439,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, var reader Reader if isPartial { - var partialFileReader = NewPartialFileReader(fp) + var partialFileReader = NewPartialFileReader(fsutils.NewFile(fp, fsutils.FlagRead)) partialFileReader.openFile = openFile partialFileReader.openFileCache = openFileCache reader = partialFileReader } else { - var fileReader = NewFileReader(fp) + var fileReader = NewFileReader(fsutils.NewFile(fp, fsutils.FlagRead)) fileReader.openFile = openFile fileReader.openFileCache = openFileCache reader = fileReader @@ -593,7 +593,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea if existsCacheItem { readerFp, err := fsutils.OpenFile(tmpPath, os.O_RDONLY, 0444) if err == nil { - var partialReader = NewPartialFileReader(readerFp) + var partialReader = NewPartialFileReader(fsutils.NewFile(readerFp, fsutils.FlagRead)) err = partialReader.Init() _ = partialReader.Close() if err == nil && partialReader.bodyOffset > 0 { @@ -629,7 +629,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea return nil, ErrServerIsBusy } } - writer, err := os.OpenFile(tmpPath, flags, 0666) + fp, err := os.OpenFile(tmpPath, flags, 0666) if !isFlushing { fsutils.WriterLimiter.Release() } @@ -639,7 +639,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea // open file again fsutils.WriterLimiter.Ack() - writer, err = os.OpenFile(tmpPath, flags, 0666) + fp, err = os.OpenFile(tmpPath, flags, 0666) fsutils.WriterLimiter.Release() } if err != nil { @@ -647,6 +647,8 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea } } + var writer = fsutils.NewFile(fp, fsutils.FlagWrite) + var removeOnFailure = true defer func() { if err != nil { @@ -663,7 +665,9 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea }() // 尝试锁定,如果锁定失败,则直接返回 + fsutils.WriterLimiter.Ack() err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + fsutils.WriterLimiter.Release() if err != nil { removeOnFailure = false return nil, fmt.Errorf("%w (003)", ErrFileIsWriting) @@ -700,9 +704,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea metaBodySize = bodySize } - fsutils.WriterLimiter.Ack() _, err = writer.Write(metaBytes) - fsutils.WriterLimiter.Release() if err != nil { return nil, err } diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index ca21fe9..42c0584 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -6,14 +6,13 @@ import ( fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/iwind/TeaGo/types" "io" - "os" "strings" "sync" ) type FileWriter struct { storage StorageInterface - rawWriter *os.File + rawWriter *fsutils.File key string metaHeaderSize int @@ -26,9 +25,11 @@ type FileWriter struct { maxSize int64 endFunc func() once sync.Once + + modifiedBytes int } -func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter { +func NewFileWriter(storage StorageInterface, rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, maxSize int64, endFunc func()) *FileWriter { return &FileWriter{ storage: storage, key: key, @@ -43,9 +44,7 @@ func NewFileWriter(storage StorageInterface, rawWriter *os.File, key string, exp // WriteHeader 写入数据 func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { - fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriterLimiter.Release() this.headerSize += int64(n) if err != nil { _ = this.Discard() @@ -79,7 +78,7 @@ func (this *FileWriter) Write(data []byte) (n int, err error) { var l = len(data) if l > (2 << 20) { var offset = 0 - const bufferSize = 256 << 10 + const bufferSize = 64 << 10 for { var end = offset + bufferSize if end > l { @@ -145,24 +144,18 @@ func (this *FileWriter) Close() error { err := this.WriteHeaderLength(types.Int(this.headerSize)) if err != nil { - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriterLimiter.Release() _ = fsutils.Remove(path) return err } err = this.WriteBodyLength(this.bodySize) if err != nil { - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriterLimiter.Release() _ = fsutils.Remove(path) return err } - fsutils.WriterLimiter.Ack() err = this.rawWriter.Close() - fsutils.WriterLimiter.Release() if err != nil { _ = fsutils.Remove(path) } else if strings.HasSuffix(path, FileTmpSuffix) { @@ -181,9 +174,7 @@ func (this *FileWriter) Discard() error { this.endFunc() }) - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriterLimiter.Release() err := fsutils.Remove(this.rawWriter.Name()) return err @@ -211,9 +202,7 @@ func (this *FileWriter) ItemType() ItemType { } func (this *FileWriter) write(data []byte) (n int, err error) { - fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriterLimiter.Release() this.bodySize += int64(n) if this.maxSize > 0 && this.bodySize > this.maxSize { @@ -227,5 +216,6 @@ func (this *FileWriter) write(data []byte) (n int, err error) { if err != nil { _ = this.Discard() } + return } diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index 4afe677..2b46c5a 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -7,12 +7,11 @@ import ( fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" "github.com/iwind/TeaGo/types" "io" - "os" "sync" ) type PartialFileWriter struct { - rawWriter *os.File + rawWriter *fsutils.File key string metaHeaderSize int @@ -33,7 +32,7 @@ type PartialFileWriter struct { rangePath string } -func NewPartialFileWriter(rawWriter *os.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter { +func NewPartialFileWriter(rawWriter *fsutils.File, key string, expiredAt int64, metaHeaderSize int, metaBodySize int64, isNew bool, isPartial bool, bodyOffset int64, ranges *PartialRanges, endFunc func()) *PartialFileWriter { return &PartialFileWriter{ key: key, rawWriter: rawWriter, @@ -54,9 +53,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) { if !this.isNew { return } - fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriterLimiter.Release() this.headerSize += int64(n) if err != nil { _ = this.Discard() @@ -65,9 +62,7 @@ func (this *PartialFileWriter) WriteHeader(data []byte) (n int, err error) { } func (this *PartialFileWriter) AppendHeader(data []byte) error { - fsutils.WriterLimiter.Ack() _, err := this.rawWriter.Write(data) - fsutils.WriterLimiter.Release() if err != nil { _ = this.Discard() } else { @@ -94,9 +89,7 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error { _ = this.Discard() return err } - fsutils.WriterLimiter.Ack() _, err = this.rawWriter.Write(bytes4) - fsutils.WriterLimiter.Release() if err != nil { _ = this.Discard() return err @@ -106,9 +99,7 @@ func (this *PartialFileWriter) WriteHeaderLength(headerLength int) error { // Write 写入数据 func (this *PartialFileWriter) Write(data []byte) (n int, err error) { - fsutils.WriterLimiter.Ack() n, err = this.rawWriter.Write(data) - fsutils.WriterLimiter.Release() this.bodySize += int64(n) if err != nil { _ = this.Discard() @@ -132,11 +123,14 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { // prevent extending too much space in a single writing var maxOffset = this.ranges.Max() if offset-maxOffset > 16<<20 { + var extendSizePerStep int64 = 1 << 20 var maxExtendSize int64 = 32 << 20 if fsutils.DiskIsExtremelyFast() { maxExtendSize = 128 << 20 + extendSizePerStep = 4 << 20 } else if fsutils.DiskIsFast() { maxExtendSize = 64 << 20 + extendSizePerStep = 2 << 20 } if offset-maxOffset > maxExtendSize { stat, err := this.rawWriter.Stat() @@ -145,11 +139,8 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { } // extend min size to prepare for file tail - const extendSizePerStep = 8 << 20 if stat.Size()+extendSizePerStep <= this.bodyOffset+offset+int64(len(data)) { - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Truncate(stat.Size() + extendSizePerStep) - fsutils.WriterLimiter.Release() return nil } } @@ -163,9 +154,7 @@ func (this *PartialFileWriter) WriteAt(offset int64, data []byte) error { this.bodyOffset = SizeMeta + int64(keyLength) + this.headerSize } - fsutils.WriterLimiter.Ack() _, err := this.rawWriter.WriteAt(data, this.bodyOffset+offset) - fsutils.WriterLimiter.Release() if err != nil { return err } @@ -192,9 +181,7 @@ func (this *PartialFileWriter) WriteBodyLength(bodyLength int64) error { _ = this.Discard() return err } - fsutils.WriterLimiter.Ack() _, err = this.rawWriter.Write(bytes8) - fsutils.WriterLimiter.Release() if err != nil { _ = this.Discard() return err @@ -211,9 +198,7 @@ func (this *PartialFileWriter) Close() error { this.ranges.BodySize = this.bodySize err := this.ranges.WriteToFile(this.rangePath) if err != nil { - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriterLimiter.Release() this.remove() return err } @@ -222,25 +207,19 @@ func (this *PartialFileWriter) Close() error { if this.isNew { err = this.WriteHeaderLength(types.Int(this.headerSize)) if err != nil { - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriterLimiter.Release() this.remove() return err } err = this.WriteBodyLength(this.bodySize) if err != nil { - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriterLimiter.Release() this.remove() return err } } - fsutils.WriterLimiter.Ack() err = this.rawWriter.Close() - fsutils.WriterLimiter.Release() if err != nil { this.remove() } @@ -254,9 +233,7 @@ func (this *PartialFileWriter) Discard() error { this.endFunc() }) - fsutils.WriterLimiter.Ack() _ = this.rawWriter.Close() - fsutils.WriterLimiter.Release() SharedPartialRangesQueue.Delete(this.rangePath) diff --git a/internal/const/const.go b/internal/const/const.go index 8c805f9..150a34d 100644 --- a/internal/const/const.go +++ b/internal/const/const.go @@ -1,7 +1,7 @@ package teaconst const ( - Version = "1.3.8" + Version = "1.3.8.1" ProductName = "Edge Node" ProcessName = "edge-node" diff --git a/internal/utils/fs/file.go b/internal/utils/fs/file.go new file mode 100644 index 0000000..c3588cf --- /dev/null +++ b/internal/utils/fs/file.go @@ -0,0 +1,94 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils + +import "os" + +const FlagRead = 0x1 +const FlagWrite = 0x2 + +type File struct { + rawFile *os.File + readonly bool +} + +func NewFile(rawFile *os.File, flags int) *File { + return &File{ + rawFile: rawFile, + readonly: flags&FlagRead == FlagRead, + } +} + +func (this *File) Name() string { + return this.rawFile.Name() +} + +func (this *File) Fd() uintptr { + return this.rawFile.Fd() +} + +func (this *File) Raw() *os.File { + return this.rawFile +} + +func (this *File) Stat() (os.FileInfo, error) { + return this.rawFile.Stat() +} + +func (this *File) Seek(offset int64, whence int) (ret int64, err error) { + ret, err = this.rawFile.Seek(offset, whence) + return +} + +func (this *File) Read(b []byte) (n int, err error) { + ReaderLimiter.Ack() + n, err = this.rawFile.Read(b) + ReaderLimiter.Release() + return +} + +func (this *File) ReadAt(b []byte, off int64) (n int, err error) { + ReaderLimiter.Ack() + n, err = this.rawFile.ReadAt(b, off) + ReaderLimiter.Release() + return +} + +func (this *File) Write(b []byte) (n int, err error) { + WriterLimiter.Ack() + n, err = this.rawFile.Write(b) + WriterLimiter.Release() + return +} + +func (this *File) WriteAt(b []byte, off int64) (n int, err error) { + WriterLimiter.Ack() + n, err = this.rawFile.WriteAt(b, off) + WriterLimiter.Release() + return +} + +func (this *File) Sync() (err error) { + WriterLimiter.Ack() + err = this.rawFile.Sync() + WriterLimiter.Release() + return +} + +func (this *File) Truncate(size int64) (err error) { + WriterLimiter.Ack() + err = this.rawFile.Truncate(size) + WriterLimiter.Release() + return +} + +func (this *File) Close() (err error) { + if !this.readonly { + WriterLimiter.Ack() + } + err = this.rawFile.Close() + if !this.readonly { + WriterLimiter.Release() + } + return +} diff --git a/internal/utils/fs/file_test.go b/internal/utils/fs/file_test.go new file mode 100644 index 0000000..d9bd6d2 --- /dev/null +++ b/internal/utils/fs/file_test.go @@ -0,0 +1,16 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fsutils_test + +import ( + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestFileFlags(t *testing.T) { + var a = assert.NewAssertion(t) + a.IsTrue(fsutils.FlagRead&fsutils.FlagRead == fsutils.FlagRead) + a.IsTrue(fsutils.FlagWrite&fsutils.FlagWrite != fsutils.FlagRead) + a.IsTrue((fsutils.FlagWrite|fsutils.FlagRead)&fsutils.FlagRead == fsutils.FlagRead) +}