mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-09 03:50:27 +08:00
优化缓存相关代码
This commit is contained in:
@@ -11,7 +11,7 @@ func TestOpenFilePool_Get(t *testing.T) {
|
|||||||
var pool = caches.NewOpenFilePool("a")
|
var pool = caches.NewOpenFilePool("a")
|
||||||
t.Log(pool.Filename())
|
t.Log(pool.Filename())
|
||||||
t.Log(pool.Get())
|
t.Log(pool.Get())
|
||||||
t.Log(pool.Put(caches.NewOpenFile(nil, nil, []byte{})))
|
t.Log(pool.Put(caches.NewOpenFile(nil, nil, nil, 0)))
|
||||||
t.Log(pool.Get())
|
t.Log(pool.Get())
|
||||||
t.Log(pool.Get())
|
t.Log(pool.Get())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -67,17 +67,17 @@ func (this *FileReader) InitAutoDiscard(autoDiscard bool) error {
|
|||||||
|
|
||||||
this.expiresAt = int64(binary.BigEndian.Uint32(buf[:SizeExpiresAt]))
|
this.expiresAt = int64(binary.BigEndian.Uint32(buf[:SizeExpiresAt]))
|
||||||
|
|
||||||
status := types.Int(string(buf[SizeExpiresAt : SizeExpiresAt+SizeStatus]))
|
status := types.Int(string(buf[OffsetStatus : OffsetStatus+SizeStatus]))
|
||||||
if status < 100 || status > 999 {
|
if status < 100 || status > 999 {
|
||||||
return errors.New("invalid status")
|
return errors.New("invalid status")
|
||||||
}
|
}
|
||||||
this.status = status
|
this.status = status
|
||||||
|
|
||||||
// URL
|
// URL
|
||||||
urlLength := binary.BigEndian.Uint32(buf[SizeExpiresAt+SizeStatus : SizeExpiresAt+SizeStatus+SizeURLLength])
|
urlLength := binary.BigEndian.Uint32(buf[OffsetURLLength : OffsetURLLength+SizeURLLength])
|
||||||
|
|
||||||
// header
|
// header
|
||||||
headerSize := int(binary.BigEndian.Uint32(buf[SizeExpiresAt+SizeStatus+SizeURLLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength]))
|
headerSize := int(binary.BigEndian.Uint32(buf[OffsetHeaderLength : OffsetHeaderLength+SizeHeaderLength]))
|
||||||
if headerSize == 0 {
|
if headerSize == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -86,7 +86,7 @@ func (this *FileReader) InitAutoDiscard(autoDiscard bool) error {
|
|||||||
|
|
||||||
// body
|
// body
|
||||||
this.bodyOffset = this.headerOffset + int64(headerSize)
|
this.bodyOffset = this.headerOffset + int64(headerSize)
|
||||||
bodySize := int(binary.BigEndian.Uint64(buf[SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength+SizeBodyLength]))
|
bodySize := int(binary.BigEndian.Uint64(buf[OffsetBodyLength : OffsetBodyLength+SizeBodyLength]))
|
||||||
if bodySize == 0 {
|
if bodySize == 0 {
|
||||||
isOk = true
|
isOk = true
|
||||||
return nil
|
return nil
|
||||||
@@ -353,7 +353,9 @@ func (this *FileReader) Close() error {
|
|||||||
if this.openFile != nil {
|
if this.openFile != nil {
|
||||||
this.openFileCache.Put(this.fp.Name(), this.openFile)
|
this.openFileCache.Put(this.fp.Name(), this.openFile)
|
||||||
} else {
|
} else {
|
||||||
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, this.meta, this.header, this.LastModified()))
|
var cacheMeta = make([]byte, len(this.meta))
|
||||||
|
copy(cacheMeta, this.meta)
|
||||||
|
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, cacheMeta, this.header, this.LastModified()))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ func (this *MemoryReader) TypeName() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *MemoryReader) ExpiresAt() int64 {
|
func (this *MemoryReader) ExpiresAt() int64 {
|
||||||
return this.item.ExpiredAt
|
return this.item.ExpiresAt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MemoryReader) Status() int {
|
func (this *MemoryReader) Status() int {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import "testing"
|
|||||||
|
|
||||||
func TestMemoryReader_Header(t *testing.T) {
|
func TestMemoryReader_Header(t *testing.T) {
|
||||||
item := &MemoryItem{
|
item := &MemoryItem{
|
||||||
ExpiredAt: 0,
|
ExpiresAt: 0,
|
||||||
HeaderValue: []byte("0123456789"),
|
HeaderValue: []byte("0123456789"),
|
||||||
BodyValue: nil,
|
BodyValue: nil,
|
||||||
Status: 2000,
|
Status: 2000,
|
||||||
@@ -22,7 +22,7 @@ func TestMemoryReader_Header(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryReader_Body(t *testing.T) {
|
func TestMemoryReader_Body(t *testing.T) {
|
||||||
item := &MemoryItem{
|
item := &MemoryItem{
|
||||||
ExpiredAt: 0,
|
ExpiresAt: 0,
|
||||||
HeaderValue: nil,
|
HeaderValue: nil,
|
||||||
BodyValue: []byte("0123456789"),
|
BodyValue: []byte("0123456789"),
|
||||||
Status: 2000,
|
Status: 2000,
|
||||||
@@ -40,7 +40,7 @@ func TestMemoryReader_Body(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryReader_Body_Range(t *testing.T) {
|
func TestMemoryReader_Body_Range(t *testing.T) {
|
||||||
item := &MemoryItem{
|
item := &MemoryItem{
|
||||||
ExpiredAt: 0,
|
ExpiresAt: 0,
|
||||||
HeaderValue: nil,
|
HeaderValue: nil,
|
||||||
BodyValue: []byte("0123456789"),
|
BodyValue: []byte("0123456789"),
|
||||||
Status: 2000,
|
Status: 2000,
|
||||||
|
|||||||
@@ -38,12 +38,18 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
SizeExpiresAt = 4
|
SizeExpiresAt = 4
|
||||||
|
OffsetExpiresAt = 0
|
||||||
SizeStatus = 3
|
SizeStatus = 3
|
||||||
|
OffsetStatus = SizeExpiresAt
|
||||||
SizeURLLength = 4
|
SizeURLLength = 4
|
||||||
|
OffsetURLLength = OffsetStatus + SizeStatus
|
||||||
SizeHeaderLength = 4
|
SizeHeaderLength = 4
|
||||||
|
OffsetHeaderLength = OffsetURLLength + SizeURLLength
|
||||||
SizeBodyLength = 8
|
SizeBodyLength = 8
|
||||||
|
OffsetBodyLength = OffsetHeaderLength + SizeHeaderLength
|
||||||
|
|
||||||
SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
|
SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
|
||||||
|
OffsetKey = SizeMeta
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -56,6 +62,8 @@ const (
|
|||||||
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
|
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
|
||||||
var sharedWritingFileKeyLocker = sync.Mutex{}
|
var sharedWritingFileKeyLocker = sync.Mutex{}
|
||||||
|
|
||||||
|
var maxOpenFiles = 2
|
||||||
|
|
||||||
// FileStorage 文件缓存
|
// FileStorage 文件缓存
|
||||||
// 文件结构:
|
// 文件结构:
|
||||||
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
|
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
|
||||||
@@ -369,7 +377,16 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// OpenWriter 打开缓存文件等待写入
|
// OpenWriter 打开缓存文件等待写入
|
||||||
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) {
|
func (this *FileStorage) OpenWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) {
|
||||||
|
return this.openWriter(key, expiresAt, status, size, maxSize, isPartial, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenFlushWriter 打开从其他媒介直接刷入的写入器
|
||||||
|
func (this *FileStorage) OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) {
|
||||||
|
return this.openWriter(key, expiresAt, status, -1, -1, false, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *FileStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool, isFlushing bool) (Writer, error) {
|
||||||
// 是否正在退出
|
// 是否正在退出
|
||||||
if teaconst.IsQuiting {
|
if teaconst.IsQuiting {
|
||||||
return nil, ErrWritingUnavailable
|
return nil, ErrWritingUnavailable
|
||||||
@@ -387,7 +404,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
|
|||||||
maxMemorySize = maxSize
|
maxMemorySize = maxSize
|
||||||
}
|
}
|
||||||
var memoryStorage = this.memoryStorage
|
var memoryStorage = this.memoryStorage
|
||||||
if !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) {
|
if !isFlushing && !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) {
|
||||||
writer, err := memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false)
|
writer, err := memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return writer, nil
|
return writer, nil
|
||||||
@@ -407,6 +424,12 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
|
|||||||
sharedWritingFileKeyLocker.Unlock()
|
sharedWritingFileKeyLocker.Unlock()
|
||||||
return nil, ErrFileIsWriting
|
return nil, ErrFileIsWriting
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(sharedWritingFileKeyMap) > maxOpenFiles {
|
||||||
|
sharedWritingFileKeyLocker.Unlock()
|
||||||
|
return nil, ErrTooManyOpenFiles
|
||||||
|
}
|
||||||
|
|
||||||
sharedWritingFileKeyMap[key] = zero.New()
|
sharedWritingFileKeyMap[key] = zero.New()
|
||||||
sharedWritingFileKeyLocker.Unlock()
|
sharedWritingFileKeyLocker.Unlock()
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -523,54 +546,28 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 写入过期时间
|
// 写入过期时间
|
||||||
bytes4 := make([]byte, 4)
|
var metaBytes = make([]byte, SizeMeta+len(key))
|
||||||
{
|
binary.BigEndian.PutUint32(metaBytes[OffsetExpiresAt:], uint32(expiredAt))
|
||||||
binary.BigEndian.PutUint32(bytes4, uint32(expiredAt))
|
|
||||||
_, err = writer.Write(bytes4)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 写入状态码
|
// 写入状态码
|
||||||
if status > 999 || status < 100 {
|
if status > 999 || status < 100 {
|
||||||
status = 200
|
status = 200
|
||||||
}
|
}
|
||||||
_, err = writer.WriteString(strconv.Itoa(status))
|
copy(metaBytes[OffsetStatus:], strconv.Itoa(status))
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 写入URL长度
|
// 写入URL长度
|
||||||
{
|
binary.BigEndian.PutUint32(metaBytes[OffsetURLLength:], uint32(len(key)))
|
||||||
binary.BigEndian.PutUint32(bytes4, uint32(len(key)))
|
|
||||||
_, err = writer.Write(bytes4)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 写入Header Length
|
// 写入Header Length
|
||||||
{
|
binary.BigEndian.PutUint32(metaBytes[OffsetHeaderLength:], uint32(0))
|
||||||
binary.BigEndian.PutUint32(bytes4, uint32(0))
|
|
||||||
_, err = writer.Write(bytes4)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 写入Body Length
|
// 写入Body Length
|
||||||
{
|
binary.BigEndian.PutUint64(metaBytes[OffsetBodyLength:], uint64(0))
|
||||||
b := make([]byte, SizeBodyLength)
|
|
||||||
binary.BigEndian.PutUint64(b, uint64(0))
|
|
||||||
_, err = writer.Write(b)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 写入URL
|
// 写入URL
|
||||||
_, err = writer.WriteString(key)
|
copy(metaBytes[OffsetKey:], key)
|
||||||
|
|
||||||
|
_, err = writer.Write(metaBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,10 @@ type StorageInterface interface {
|
|||||||
|
|
||||||
// OpenWriter 打开缓存写入器等待写入
|
// OpenWriter 打开缓存写入器等待写入
|
||||||
// size 和 maxSize 可能为-1
|
// size 和 maxSize 可能为-1
|
||||||
OpenWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error)
|
OpenWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error)
|
||||||
|
|
||||||
|
// OpenFlushWriter 打开从其他媒介直接刷入的写入器
|
||||||
|
OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error)
|
||||||
|
|
||||||
// Delete 删除某个键值对应的缓存
|
// Delete 删除某个键值对应的缓存
|
||||||
Delete(key string) error
|
Delete(key string) error
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MemoryItem struct {
|
type MemoryItem struct {
|
||||||
ExpiredAt int64
|
ExpiresAt int64
|
||||||
HeaderValue []byte
|
HeaderValue []byte
|
||||||
BodyValue []byte
|
BodyValue []byte
|
||||||
Status int
|
Status int
|
||||||
@@ -32,7 +32,7 @@ type MemoryItem struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *MemoryItem) IsExpired() bool {
|
func (this *MemoryItem) IsExpired() bool {
|
||||||
return this.ExpiredAt < utils.UnixTime()
|
return this.ExpiresAt < utils.UnixTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
type MemoryStorage struct {
|
type MemoryStorage struct {
|
||||||
@@ -118,7 +118,7 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool)
|
|||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if useStale || (item.ExpiredAt > utils.UnixTime()) {
|
if useStale || (item.ExpiresAt > utils.UnixTime()) {
|
||||||
reader := NewMemoryReader(item)
|
reader := NewMemoryReader(item)
|
||||||
err := reader.Init()
|
err := reader.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -160,7 +160,12 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, s
|
|||||||
return this.openWriter(key, expiredAt, status, size, maxSize, true)
|
return this.openWriter(key, expiredAt, status, size, maxSize, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) {
|
// OpenFlushWriter 打开从其他媒介直接刷入的写入器
|
||||||
|
func (this *MemoryStorage) OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) {
|
||||||
|
return this.openWriter(key, expiresAt, status, -1, -1, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isDirty bool) (Writer, error) {
|
||||||
// 待写入队列是否已满
|
// 待写入队列是否已满
|
||||||
if isDirty &&
|
if isDirty &&
|
||||||
this.parentStorage != nil &&
|
this.parentStorage != nil &&
|
||||||
@@ -215,7 +220,7 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
isWriting = true
|
isWriting = true
|
||||||
return NewMemoryWriter(this, key, expiredAt, status, isDirty, maxSize, func() {
|
return NewMemoryWriter(this, key, expiresAt, status, isDirty, maxSize, func() {
|
||||||
this.locker.Lock()
|
this.locker.Lock()
|
||||||
delete(this.writingKeyMap, key)
|
delete(this.writingKeyMap, key)
|
||||||
this.locker.Unlock()
|
this.locker.Unlock()
|
||||||
@@ -471,7 +476,7 @@ func (this *MemoryStorage) flushItem(key string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1, -1, false)
|
writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !CanIgnoreErr(err) {
|
if !CanIgnoreErr(err) {
|
||||||
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())
|
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())
|
||||||
@@ -503,7 +508,7 @@ func (this *MemoryStorage) flushItem(key string) {
|
|||||||
this.parentStorage.AddToList(&Item{
|
this.parentStorage.AddToList(&Item{
|
||||||
Type: writer.ItemType(),
|
Type: writer.ItemType(),
|
||||||
Key: key,
|
Key: key,
|
||||||
ExpiredAt: item.ExpiredAt,
|
ExpiredAt: item.ExpiresAt,
|
||||||
HeaderSize: writer.HeaderSize(),
|
HeaderSize: writer.HeaderSize(),
|
||||||
BodySize: writer.BodySize(),
|
BodySize: writer.BodySize(),
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64,
|
|||||||
key: key,
|
key: key,
|
||||||
expiredAt: expiredAt,
|
expiredAt: expiredAt,
|
||||||
item: &MemoryItem{
|
item: &MemoryItem{
|
||||||
ExpiredAt: expiredAt,
|
ExpiresAt: expiredAt,
|
||||||
ModifiedAt: time.Now().Unix(),
|
ModifiedAt: time.Now().Unix(),
|
||||||
Status: status,
|
Status: status,
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -313,6 +313,8 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
|
|||||||
if !caches.CanIgnoreErr(err) {
|
if !caches.CanIgnoreErr(err) {
|
||||||
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
|
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.Header().Set("X-Cache", "BYPASS, open failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
this.cacheWriter = cacheWriter
|
this.cacheWriter = cacheWriter
|
||||||
|
|||||||
Reference in New Issue
Block a user