节点可以单独设置缓存的磁盘、内存容量

This commit is contained in:
GoEdgeLab
2021-05-12 21:38:44 +08:00
parent 3d52ddb3e8
commit 5e9e88246f
4 changed files with 67 additions and 29 deletions

View File

@@ -2,6 +2,7 @@ package caches
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/lists"
"strconv"
@@ -10,12 +11,18 @@ import (
var SharedManager = NewManager()
// Manager 缓存策略管理器
type Manager struct {
// 全局配置
MaxDiskCapacity *shared.SizeCapacity
MaxMemoryCapacity *shared.SizeCapacity
policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy
storageMap map[int64]StorageInterface // policyId => *Storage
locker sync.RWMutex
}
// NewManager 获取管理器对象
func NewManager() *Manager {
return &Manager{
policyMap: map[int64]*serverconfigs.HTTPCachePolicy{},
@@ -23,7 +30,7 @@ func NewManager() *Manager {
}
}
// 重新设置策略
// UpdatePolicies 重新设置策略
func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy) {
this.locker.Lock()
defer this.locker.Unlock()
@@ -103,7 +110,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
}
}
// 获取Policy信息
// FindPolicy 获取Policy信息
func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -112,7 +119,7 @@ func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy {
return p
}
// 根据策略ID查找存储
// FindStorageWithPolicy 根据策略ID查找存储
func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -121,7 +128,7 @@ func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
return storage
}
// 根据策略获取存储对象
// NewStorageWithPolicy 根据策略获取存储对象
func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface {
switch policy.Type {
case serverconfigs.CachePolicyStorageFile:

View File

@@ -40,7 +40,7 @@ var (
ErrInvalidRange = errors.New("invalid range")
)
// 文件缓存
// FileStorage 文件缓存
// 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
type FileStorage struct {
@@ -61,12 +61,12 @@ func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
}
}
// 获取当前的Policy
// Policy 获取当前的Policy
func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// 初始化
// Init 初始化
func (this *FileStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
@@ -148,14 +148,13 @@ func (this *FileStorage) Init() error {
// 加载内存缓存
if this.cacheConfig.MemoryPolicy != nil {
memoryCapacity := this.cacheConfig.MemoryPolicy.Capacity
if memoryCapacity != nil && memoryCapacity.Count > 0 {
if this.cacheConfig.MemoryPolicy.Capacity != nil && this.cacheConfig.MemoryPolicy.Capacity.Count > 0 {
memoryPolicy := &serverconfigs.HTTPCachePolicy{
Id: this.policy.Id,
IsOn: this.policy.IsOn,
Name: this.policy.Name,
Description: this.policy.Description,
Capacity: memoryCapacity,
Capacity: this.cacheConfig.MemoryPolicy.Capacity,
MaxKeys: this.policy.MaxKeys,
MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改
Type: serverconfigs.CachePolicyStorageMemory,
@@ -214,7 +213,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
return reader, nil
}
// 打开缓存文件等待写入
// OpenWriter 打开缓存文件等待写入
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
// 先尝试内存缓存
if this.memoryStorage != nil {
@@ -228,7 +227,8 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write file cache failed: too many keys in cache storage")
}
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
capacityBytes := this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
@@ -340,7 +340,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
return NewFileWriter(writer, key, expiredAt), nil
}
// 添加到List
// AddToList 添加到List
func (this *FileStorage) AddToList(item *Item) {
if this.memoryStorage != nil {
if item.Type == ItemTypeMemory {
@@ -354,7 +354,7 @@ func (this *FileStorage) AddToList(item *Item) {
this.list.Add(hash, item)
}
// 删除某个键值对应的缓存
// Delete 删除某个键值对应的缓存
func (this *FileStorage) Delete(key string) error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -373,7 +373,7 @@ func (this *FileStorage) Delete(key string) error {
return err
}
// 统计
// Stat 统计
func (this *FileStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -383,7 +383,7 @@ func (this *FileStorage) Stat() (*Stat, error) {
}), nil
}
// 清除所有的缓存
// CleanAll 清除所有的缓存
func (this *FileStorage) CleanAll() error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -441,7 +441,7 @@ func (this *FileStorage) CleanAll() error {
return nil
}
// 清理过期的缓存
// Purge 清理过期的缓存
func (this *FileStorage) Purge(keys []string, urlType string) error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -480,7 +480,7 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
return nil
}
// 停止
// Stop 停止
func (this *FileStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
@@ -709,3 +709,14 @@ func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byt
}
}
}
func (this *FileStorage) diskCapacityBytes() int64 {
c1 := this.policy.CapacityBytes()
if SharedManager.MaxDiskCapacity != nil {
c2 := SharedManager.MaxDiskCapacity.Bytes()
if c2 > 0 {
return c2
}
}
return c1
}

View File

@@ -39,7 +39,7 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
}
}
// 初始化
// Init 初始化
func (this *MemoryStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.Size())
@@ -63,7 +63,7 @@ func (this *MemoryStorage) Init() error {
return nil
}
// 读取缓存
// OpenReader 读取缓存
func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
hash := this.hash(key)
@@ -89,13 +89,14 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
return nil, ErrNotFound
}
// 打开缓存写入器等待写入
// OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write memory cache failed: too many keys in cache storage")
}
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
@@ -108,7 +109,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil
}
// 删除某个键值对应的缓存
// Delete 删除某个键值对应的缓存
func (this *MemoryStorage) Delete(key string) error {
hash := this.hash(key)
this.locker.Lock()
@@ -118,7 +119,7 @@ func (this *MemoryStorage) Delete(key string) error {
return nil
}
// 统计缓存
// Stat 统计缓存
func (this *MemoryStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -128,7 +129,7 @@ func (this *MemoryStorage) Stat() (*Stat, error) {
}), nil
}
// 清除所有缓存
// CleanAll 清除所有缓存
func (this *MemoryStorage) CleanAll() error {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
@@ -138,7 +139,7 @@ func (this *MemoryStorage) CleanAll() error {
return nil
}
// 批量删除缓存
// Purge 批量删除缓存
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
@@ -158,7 +159,7 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
return nil
}
// 停止缓存策略
// Stop 停止缓存策略
func (this *MemoryStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
@@ -170,12 +171,12 @@ func (this *MemoryStorage) Stop() {
}
}
// 获取当前存储的Policy
// Policy 获取当前存储的Policy
func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// 将缓存添加到列表
// AddToList 将缓存添加到列表
func (this *MemoryStorage) AddToList(item *Item) {
item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
hash := fmt.Sprintf("%d", this.hash(item.Key))
@@ -198,3 +199,20 @@ func (this *MemoryStorage) purgeLoop() {
}
})
}
func (this *MemoryStorage) memoryCapacityBytes() int64 {
if this.policy == nil {
return 0
}
c1 := int64(0)
if this.policy.Capacity != nil {
c1 = this.policy.Capacity.Bytes()
}
if SharedManager.MaxMemoryCapacity != nil {
c2 := SharedManager.MaxMemoryCapacity.Bytes()
if c2 > 0 {
return c2
}
}
return c1
}

View File

@@ -362,6 +362,8 @@ func (this *Node) syncConfig() error {
}
nodeconfigs.ResetNodeConfig(nodeConfig)
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
if nodeConfig.HTTPCachePolicy != nil {
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{nodeConfig.HTTPCachePolicy})
} else {