From ed0c562b2e81c74d43c7e46bf2db14ae7bb58f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Thu, 14 Apr 2022 09:36:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BC=93=E5=AD=98=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/open_file_pool_test.go | 2 +- internal/caches/reader_file.go | 12 ++-- internal/caches/reader_memory.go | 2 +- internal/caches/reader_memory_test.go | 6 +- internal/caches/storage_file.go | 83 +++++++++++++------------- internal/caches/storage_interface.go | 5 +- internal/caches/storage_memory.go | 19 +++--- internal/caches/writer_memory.go | 2 +- internal/nodes/http_writer.go | 2 + 9 files changed, 71 insertions(+), 62 deletions(-) diff --git a/internal/caches/open_file_pool_test.go b/internal/caches/open_file_pool_test.go index a1460d8..b27662f 100644 --- a/internal/caches/open_file_pool_test.go +++ b/internal/caches/open_file_pool_test.go @@ -11,7 +11,7 @@ func TestOpenFilePool_Get(t *testing.T) { var pool = caches.NewOpenFilePool("a") t.Log(pool.Filename()) t.Log(pool.Get()) - t.Log(pool.Put(caches.NewOpenFile(nil, nil, []byte{}))) + t.Log(pool.Put(caches.NewOpenFile(nil, nil, nil, 0))) t.Log(pool.Get()) t.Log(pool.Get()) } diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index 115724c..0b14a74 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -67,17 +67,17 @@ func (this *FileReader) InitAutoDiscard(autoDiscard bool) error { this.expiresAt = int64(binary.BigEndian.Uint32(buf[:SizeExpiresAt])) - status := types.Int(string(buf[SizeExpiresAt : SizeExpiresAt+SizeStatus])) + status := types.Int(string(buf[OffsetStatus : OffsetStatus+SizeStatus])) if status < 100 || status > 999 { return errors.New("invalid status") } this.status = status // URL - urlLength := binary.BigEndian.Uint32(buf[SizeExpiresAt+SizeStatus : SizeExpiresAt+SizeStatus+SizeURLLength]) + urlLength := binary.BigEndian.Uint32(buf[OffsetURLLength : OffsetURLLength+SizeURLLength]) // header - headerSize := int(binary.BigEndian.Uint32(buf[SizeExpiresAt+SizeStatus+SizeURLLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength])) + headerSize := int(binary.BigEndian.Uint32(buf[OffsetHeaderLength : OffsetHeaderLength+SizeHeaderLength])) if headerSize == 0 { return nil } @@ -86,7 +86,7 @@ func (this *FileReader) InitAutoDiscard(autoDiscard bool) error { // body this.bodyOffset = this.headerOffset + int64(headerSize) - bodySize := int(binary.BigEndian.Uint64(buf[SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength+SizeBodyLength])) + bodySize := int(binary.BigEndian.Uint64(buf[OffsetBodyLength : OffsetBodyLength+SizeBodyLength])) if bodySize == 0 { isOk = true return nil @@ -353,7 +353,9 @@ func (this *FileReader) Close() error { if this.openFile != nil { this.openFileCache.Put(this.fp.Name(), this.openFile) } else { - this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, this.meta, this.header, this.LastModified())) + var cacheMeta = make([]byte, len(this.meta)) + copy(cacheMeta, this.meta) + this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified())) } return nil } diff --git a/internal/caches/reader_memory.go b/internal/caches/reader_memory.go index 1301560..f5fea39 100644 --- a/internal/caches/reader_memory.go +++ b/internal/caches/reader_memory.go @@ -25,7 +25,7 @@ func (this *MemoryReader) TypeName() string { } func (this *MemoryReader) ExpiresAt() int64 { - return this.item.ExpiredAt + return this.item.ExpiresAt } func (this *MemoryReader) Status() int { diff --git a/internal/caches/reader_memory_test.go b/internal/caches/reader_memory_test.go index 77f6a32..a388bdf 100644 --- a/internal/caches/reader_memory_test.go +++ b/internal/caches/reader_memory_test.go @@ -4,7 +4,7 @@ import "testing" func TestMemoryReader_Header(t *testing.T) { item := &MemoryItem{ - ExpiredAt: 0, + ExpiresAt: 0, HeaderValue: []byte("0123456789"), BodyValue: nil, Status: 2000, @@ -22,7 +22,7 @@ func TestMemoryReader_Header(t *testing.T) { func TestMemoryReader_Body(t *testing.T) { item := &MemoryItem{ - ExpiredAt: 0, + ExpiresAt: 0, HeaderValue: nil, BodyValue: []byte("0123456789"), Status: 2000, @@ -40,7 +40,7 @@ func TestMemoryReader_Body(t *testing.T) { func TestMemoryReader_Body_Range(t *testing.T) { item := &MemoryItem{ - ExpiredAt: 0, + ExpiresAt: 0, HeaderValue: nil, BodyValue: []byte("0123456789"), Status: 2000, diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 44bed8f..9312370 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -37,13 +37,19 @@ import ( ) const ( - SizeExpiresAt = 4 - SizeStatus = 3 - SizeURLLength = 4 - SizeHeaderLength = 4 - SizeBodyLength = 8 + SizeExpiresAt = 4 + OffsetExpiresAt = 0 + SizeStatus = 3 + OffsetStatus = SizeExpiresAt + SizeURLLength = 4 + OffsetURLLength = OffsetStatus + SizeStatus + SizeHeaderLength = 4 + OffsetHeaderLength = OffsetURLLength + SizeURLLength + SizeBodyLength = 8 + OffsetBodyLength = OffsetHeaderLength + SizeHeaderLength - SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength + SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength + OffsetKey = SizeMeta ) const ( @@ -56,6 +62,8 @@ const ( var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool var sharedWritingFileKeyLocker = sync.Mutex{} +var maxOpenFiles = 2 + // FileStorage 文件缓存 // 文件结构: // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] @@ -369,7 +377,16 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, } // OpenWriter 打开缓存文件等待写入 -func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) { +func (this *FileStorage) OpenWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) { + return this.openWriter(key, expiresAt, status, size, maxSize, isPartial, false) +} + +// OpenFlushWriter 打开从其他媒介直接刷入的写入器 +func (this *FileStorage) OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) { + return this.openWriter(key, expiresAt, status, -1, -1, false, true) +} + +func (this *FileStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool, isFlushing bool) (Writer, error) { // 是否正在退出 if teaconst.IsQuiting { return nil, ErrWritingUnavailable @@ -387,7 +404,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz maxMemorySize = maxSize } var memoryStorage = this.memoryStorage - if !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) { + if !isFlushing && !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) { writer, err := memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false) if err == nil { return writer, nil @@ -407,6 +424,12 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz sharedWritingFileKeyLocker.Unlock() return nil, ErrFileIsWriting } + + if len(sharedWritingFileKeyMap) > maxOpenFiles { + sharedWritingFileKeyLocker.Unlock() + return nil, ErrTooManyOpenFiles + } + sharedWritingFileKeyMap[key] = zero.New() sharedWritingFileKeyLocker.Unlock() defer func() { @@ -523,54 +546,28 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz } // 写入过期时间 - bytes4 := make([]byte, 4) - { - binary.BigEndian.PutUint32(bytes4, uint32(expiredAt)) - _, err = writer.Write(bytes4) - if err != nil { - return nil, err - } - } + var metaBytes = make([]byte, SizeMeta+len(key)) + binary.BigEndian.PutUint32(metaBytes[OffsetExpiresAt:], uint32(expiredAt)) // 写入状态码 if status > 999 || status < 100 { status = 200 } - _, err = writer.WriteString(strconv.Itoa(status)) - if err != nil { - return nil, err - } + copy(metaBytes[OffsetStatus:], strconv.Itoa(status)) // 写入URL长度 - { - binary.BigEndian.PutUint32(bytes4, uint32(len(key))) - _, err = writer.Write(bytes4) - if err != nil { - return nil, err - } - } + binary.BigEndian.PutUint32(metaBytes[OffsetURLLength:], uint32(len(key))) // 写入Header Length - { - binary.BigEndian.PutUint32(bytes4, uint32(0)) - _, err = writer.Write(bytes4) - if err != nil { - return nil, err - } - } + binary.BigEndian.PutUint32(metaBytes[OffsetHeaderLength:], uint32(0)) // 写入Body Length - { - b := make([]byte, SizeBodyLength) - binary.BigEndian.PutUint64(b, uint64(0)) - _, err = writer.Write(b) - if err != nil { - return nil, err - } - } + binary.BigEndian.PutUint64(metaBytes[OffsetBodyLength:], uint64(0)) // 写入URL - _, err = writer.WriteString(key) + copy(metaBytes[OffsetKey:], key) + + _, err = writer.Write(metaBytes) if err != nil { return nil, err } diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index be12bff..2a6b14d 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -14,7 +14,10 @@ type StorageInterface interface { // OpenWriter 打开缓存写入器等待写入 // size 和 maxSize 可能为-1 - OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) + OpenWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) + + // OpenFlushWriter 打开从其他媒介直接刷入的写入器 + OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) // Delete 删除某个键值对应的缓存 Delete(key string) error diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 201ddc0..bc7e04a 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -23,7 +23,7 @@ import ( ) type MemoryItem struct { - ExpiredAt int64 + ExpiresAt int64 HeaderValue []byte BodyValue []byte Status int @@ -32,7 +32,7 @@ type MemoryItem struct { } func (this *MemoryItem) IsExpired() bool { - return this.ExpiredAt < utils.UnixTime() + return this.ExpiresAt < utils.UnixTime() } type MemoryStorage struct { @@ -118,7 +118,7 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool) return nil, ErrNotFound } - if useStale || (item.ExpiredAt > utils.UnixTime()) { + if useStale || (item.ExpiresAt > utils.UnixTime()) { reader := NewMemoryReader(item) err := reader.Init() if err != nil { @@ -160,7 +160,12 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, s return this.openWriter(key, expiredAt, status, size, maxSize, true) } -func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) { +// OpenFlushWriter 打开从其他媒介直接刷入的写入器 +func (this *MemoryStorage) OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) { + return this.openWriter(key, expiresAt, status, -1, -1, true) +} + +func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) { // 待写入队列是否已满 if isDirty && this.parentStorage != nil && @@ -215,7 +220,7 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, s } isWriting = true - return NewMemoryWriter(this, key, expiredAt, status, isDirty, maxSize, func() { + return NewMemoryWriter(this, key, expiresAt, status, isDirty, maxSize, func() { this.locker.Lock() delete(this.writingKeyMap, key) this.locker.Unlock() @@ -471,7 +476,7 @@ func (this *MemoryStorage) flushItem(key string) { return } - writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, -1, false) + writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error()) @@ -503,7 +508,7 @@ func (this *MemoryStorage) flushItem(key string) { this.parentStorage.AddToList(&Item{ Type: writer.ItemType(), Key: key, - ExpiredAt: item.ExpiredAt, + ExpiredAt: item.ExpiresAt, HeaderSize: writer.HeaderSize(), BodySize: writer.BodySize(), }) diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 2a1c630..dc12f39 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -30,7 +30,7 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, key: key, expiredAt: expiredAt, item: &MemoryItem{ - ExpiredAt: expiredAt, + ExpiresAt: expiredAt, ModifiedAt: time.Now().Unix(), Status: status, }, diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 89cf81c..88baa93 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -313,6 +313,8 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) { if !caches.CanIgnoreErr(err) { remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) } + + this.Header().Set("X-Cache", "BYPASS, open failed") return } this.cacheWriter = cacheWriter