mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	增加内存缓存队列长度,确保不会产生不在队列里的缓存对象
This commit is contained in:
		@@ -436,7 +436,7 @@ func (this *FileStorage) openWriter(key string, expiredAt int64, status int, hea
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 如果队列满了,则等待
 | 
			
		||||
		if err == ErrWritingQueueFull {
 | 
			
		||||
		if errors.Is(err, ErrWritingQueueFull) {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -9,7 +9,6 @@ import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
 | 
			
		||||
	fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
 | 
			
		||||
	setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/zero"
 | 
			
		||||
	"github.com/cespare/xxhash"
 | 
			
		||||
	"github.com/iwind/TeaGo/types"
 | 
			
		||||
@@ -67,7 +66,7 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage Stora
 | 
			
		||||
 | 
			
		||||
	if parentStorage != nil {
 | 
			
		||||
		if queueSize <= 0 {
 | 
			
		||||
			queueSize = 2048 + int(policy.CapacityBytes()/sizes.G)*2048
 | 
			
		||||
			queueSize = utils.SystemMemoryGB() * 100_000
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		dirtyChan = make(chan string, queueSize)
 | 
			
		||||
@@ -166,7 +165,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
 | 
			
		||||
	if isDirty &&
 | 
			
		||||
		this.parentStorage != nil &&
 | 
			
		||||
		this.dirtyQueueSize > 0 &&
 | 
			
		||||
		len(this.dirtyChan) == this.dirtyQueueSize { // 缓存时间过长
 | 
			
		||||
		len(this.dirtyChan) >= this.dirtyQueueSize-int(fsutils.DiskMaxWrites) /** delta **/ { // 缓存时间过长
 | 
			
		||||
		return nil, ErrWritingQueueFull
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -202,7 +201,7 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 检查是否超出最大值
 | 
			
		||||
	// 检查是否超出容量最大值
 | 
			
		||||
	var capacityBytes = this.memoryCapacityBytes()
 | 
			
		||||
	if bodySize < 0 {
 | 
			
		||||
		bodySize = 0
 | 
			
		||||
@@ -226,8 +225,8 @@ func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, h
 | 
			
		||||
		if valueItem != nil {
 | 
			
		||||
			valueItem.TotalSize = int64(len(valueItem.HeaderValue) + len(valueItem.BodyValue) + len(key) + 256 /** meta size **/)
 | 
			
		||||
 | 
			
		||||
			atomic.AddInt64(&this.usedSize, valueItem.TotalSize)
 | 
			
		||||
			runtime.SetFinalizer(valueItem, this.valueItemFinalizer)
 | 
			
		||||
			atomic.AddInt64(&this.usedSize, valueItem.TotalSize)
 | 
			
		||||
		}
 | 
			
		||||
	}), nil
 | 
			
		||||
}
 | 
			
		||||
@@ -516,6 +515,11 @@ func (this *MemoryStorage) flushItem(key string) {
 | 
			
		||||
	item, ok := this.valuesMap[hash]
 | 
			
		||||
	this.locker.RUnlock()
 | 
			
		||||
 | 
			
		||||
	// 从内存中移除,并确保无论如何都会执行
 | 
			
		||||
	defer func() {
 | 
			
		||||
		_ = this.Delete(key)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
@@ -564,9 +568,6 @@ func (this *MemoryStorage) flushItem(key string) {
 | 
			
		||||
		HeaderSize: writer.HeaderSize(),
 | 
			
		||||
		BodySize:   writer.BodySize(),
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	// 从内存中移除
 | 
			
		||||
	_ = this.Delete(key)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *MemoryStorage) memoryCapacityBytes() int64 {
 | 
			
		||||
 
 | 
			
		||||
@@ -2,9 +2,9 @@ package caches
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
 | 
			
		||||
	"github.com/cespare/xxhash"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type MemoryWriter struct {
 | 
			
		||||
@@ -29,7 +29,7 @@ type MemoryWriter struct {
 | 
			
		||||
func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, expectedBodySize int64, maxSize int64, endFunc func(valueItem *MemoryItem)) *MemoryWriter {
 | 
			
		||||
	var valueItem = &MemoryItem{
 | 
			
		||||
		ExpiresAt:  expiredAt,
 | 
			
		||||
		ModifiedAt: time.Now().Unix(),
 | 
			
		||||
		ModifiedAt: fasttime.Now().Unix(),
 | 
			
		||||
		Status:     status,
 | 
			
		||||
	}
 | 
			
		||||
	if expectedBodySize > 0 {
 | 
			
		||||
@@ -120,18 +120,26 @@ func (this *MemoryWriter) Close() error {
 | 
			
		||||
 | 
			
		||||
	this.storage.locker.Lock()
 | 
			
		||||
	this.item.IsDone = true
 | 
			
		||||
	this.storage.valuesMap[this.hash] = this.item
 | 
			
		||||
	var err error
 | 
			
		||||
	if this.isDirty {
 | 
			
		||||
		if this.storage.parentStorage != nil {
 | 
			
		||||
			select {
 | 
			
		||||
			case this.storage.dirtyChan <- this.key:
 | 
			
		||||
				this.storage.valuesMap[this.hash] = this.item
 | 
			
		||||
			default:
 | 
			
		||||
				// do not add value map
 | 
			
		||||
				err = ErrWritingQueueFull
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			err = ErrWritingQueueFull
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		this.storage.valuesMap[this.hash] = this.item
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	this.storage.locker.Unlock()
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Discard 丢弃
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user