From 5e9e88246f1801337eb2039665c9bd77b28cfe45 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 12 May 2021 21:38:44 +0800 Subject: [PATCH] =?UTF-8?q?=E8=8A=82=E7=82=B9=E5=8F=AF=E4=BB=A5=E5=8D=95?= =?UTF-8?q?=E7=8B=AC=E8=AE=BE=E7=BD=AE=E7=BC=93=E5=AD=98=E7=9A=84=E7=A3=81?= =?UTF-8?q?=E7=9B=98=E3=80=81=E5=86=85=E5=AD=98=E5=AE=B9=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/manager.go | 15 ++++++++---- internal/caches/storage_file.go | 39 +++++++++++++++++++----------- internal/caches/storage_memory.go | 40 ++++++++++++++++++++++--------- internal/nodes/node.go | 2 ++ 4 files changed, 67 insertions(+), 29 deletions(-) diff --git a/internal/caches/manager.go b/internal/caches/manager.go index 7a93644..b6e39ed 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -2,6 +2,7 @@ package caches import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/iwind/TeaGo/lists" "strconv" @@ -10,12 +11,18 @@ import ( var SharedManager = NewManager() +// Manager 缓存策略管理器 type Manager struct { + // 全局配置 + MaxDiskCapacity *shared.SizeCapacity + MaxMemoryCapacity *shared.SizeCapacity + policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy storageMap map[int64]StorageInterface // policyId => *Storage locker sync.RWMutex } +// NewManager 获取管理器对象 func NewManager() *Manager { return &Manager{ policyMap: map[int64]*serverconfigs.HTTPCachePolicy{}, @@ -23,7 +30,7 @@ func NewManager() *Manager { } } -// 重新设置策略 +// UpdatePolicies 重新设置策略 func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy) { this.locker.Lock() defer this.locker.Unlock() @@ -103,7 +110,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy } } -// 获取Policy信息 +// FindPolicy 获取Policy信息 func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy { this.locker.RLock() defer this.locker.RUnlock() @@ -112,7 +119,7 @@ func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy { return p } -// 根据策略ID查找存储 +// FindStorageWithPolicy 根据策略ID查找存储 func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface { this.locker.RLock() defer this.locker.RUnlock() @@ -121,7 +128,7 @@ func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface { return storage } -// 根据策略获取存储对象 +// NewStorageWithPolicy 根据策略获取存储对象 func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface { switch policy.Type { case serverconfigs.CachePolicyStorageFile: diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 98f9dec..057c485 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -40,7 +40,7 @@ var ( ErrInvalidRange = errors.New("invalid range") ) -// 文件缓存 +// FileStorage 文件缓存 // 文件结构: // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] type FileStorage struct { @@ -61,12 +61,12 @@ func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { } } -// 获取当前的Policy +// Policy 获取当前的Policy func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy { return this.policy } -// 初始化 +// Init 初始化 func (this *FileStorage) Init() error { this.list.OnAdd(func(item *Item) { atomic.AddInt64(&this.totalSize, item.TotalSize()) @@ -148,14 +148,13 @@ func (this *FileStorage) Init() error { // 加载内存缓存 if this.cacheConfig.MemoryPolicy != nil { - memoryCapacity := this.cacheConfig.MemoryPolicy.Capacity - if memoryCapacity != nil && memoryCapacity.Count > 0 { + if this.cacheConfig.MemoryPolicy.Capacity != nil && this.cacheConfig.MemoryPolicy.Capacity.Count > 0 { memoryPolicy := &serverconfigs.HTTPCachePolicy{ Id: this.policy.Id, IsOn: this.policy.IsOn, Name: this.policy.Name, Description: this.policy.Description, - Capacity: memoryCapacity, + Capacity: this.cacheConfig.MemoryPolicy.Capacity, MaxKeys: this.policy.MaxKeys, MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改 Type: serverconfigs.CachePolicyStorageMemory, @@ -214,7 +213,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { return reader, nil } -// 打开缓存文件等待写入 +// OpenWriter 打开缓存文件等待写入 func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { // 先尝试内存缓存 if this.memoryStorage != nil { @@ -228,7 +227,8 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { return nil, errors.New("write file cache failed: too many keys in cache storage") } - if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize { + capacityBytes := this.diskCapacityBytes() + if capacityBytes > 0 && capacityBytes <= this.totalSize { return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") } @@ -340,7 +340,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr return NewFileWriter(writer, key, expiredAt), nil } -// 添加到List +// AddToList 添加到List func (this *FileStorage) AddToList(item *Item) { if this.memoryStorage != nil { if item.Type == ItemTypeMemory { @@ -354,7 +354,7 @@ func (this *FileStorage) AddToList(item *Item) { this.list.Add(hash, item) } -// 删除某个键值对应的缓存 +// Delete 删除某个键值对应的缓存 func (this *FileStorage) Delete(key string) error { this.locker.Lock() defer this.locker.Unlock() @@ -373,7 +373,7 @@ func (this *FileStorage) Delete(key string) error { return err } -// 统计 +// Stat 统计 func (this *FileStorage) Stat() (*Stat, error) { this.locker.RLock() defer this.locker.RUnlock() @@ -383,7 +383,7 @@ func (this *FileStorage) Stat() (*Stat, error) { }), nil } -// 清除所有的缓存 +// CleanAll 清除所有的缓存 func (this *FileStorage) CleanAll() error { this.locker.Lock() defer this.locker.Unlock() @@ -441,7 +441,7 @@ func (this *FileStorage) CleanAll() error { return nil } -// 清理过期的缓存 +// Purge 清理过期的缓存 func (this *FileStorage) Purge(keys []string, urlType string) error { this.locker.Lock() defer this.locker.Unlock() @@ -480,7 +480,7 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { return nil } -// 停止 +// Stop 停止 func (this *FileStorage) Stop() { this.locker.Lock() defer this.locker.Unlock() @@ -709,3 +709,14 @@ func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byt } } } + +func (this *FileStorage) diskCapacityBytes() int64 { + c1 := this.policy.CapacityBytes() + if SharedManager.MaxDiskCapacity != nil { + c2 := SharedManager.MaxDiskCapacity.Bytes() + if c2 > 0 { + return c2 + } + } + return c1 +} diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 5c24e53..c40e68a 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -39,7 +39,7 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { } } -// 初始化 +// Init 初始化 func (this *MemoryStorage) Init() error { this.list.OnAdd(func(item *Item) { atomic.AddInt64(&this.totalSize, item.Size()) @@ -63,7 +63,7 @@ func (this *MemoryStorage) Init() error { return nil } -// 读取缓存 +// OpenReader 读取缓存 func (this *MemoryStorage) OpenReader(key string) (Reader, error) { hash := this.hash(key) @@ -89,13 +89,14 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) { return nil, ErrNotFound } -// 打开缓存写入器等待写入 +// OpenWriter 打开缓存写入器等待写入 func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { // 检查是否超出最大值 if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { return nil, errors.New("write memory cache failed: too many keys in cache storage") } - if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize { + capacityBytes := this.memoryCapacityBytes() + if capacityBytes > 0 && capacityBytes <= this.totalSize { return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") } @@ -108,7 +109,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) ( return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil } -// 删除某个键值对应的缓存 +// Delete 删除某个键值对应的缓存 func (this *MemoryStorage) Delete(key string) error { hash := this.hash(key) this.locker.Lock() @@ -118,7 +119,7 @@ func (this *MemoryStorage) Delete(key string) error { return nil } -// 统计缓存 +// Stat 统计缓存 func (this *MemoryStorage) Stat() (*Stat, error) { this.locker.RLock() defer this.locker.RUnlock() @@ -128,7 +129,7 @@ func (this *MemoryStorage) Stat() (*Stat, error) { }), nil } -// 清除所有缓存 +// CleanAll 清除所有缓存 func (this *MemoryStorage) CleanAll() error { this.locker.Lock() this.valuesMap = map[uint64]*MemoryItem{} @@ -138,7 +139,7 @@ func (this *MemoryStorage) CleanAll() error { return nil } -// 批量删除缓存 +// Purge 批量删除缓存 func (this *MemoryStorage) Purge(keys []string, urlType string) error { // 目录 if urlType == "dir" { @@ -158,7 +159,7 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error { return nil } -// 停止缓存策略 +// Stop 停止缓存策略 func (this *MemoryStorage) Stop() { this.locker.Lock() defer this.locker.Unlock() @@ -170,12 +171,12 @@ func (this *MemoryStorage) Stop() { } } -// 获取当前存储的Policy +// Policy 获取当前存储的Policy func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy { return this.policy } -// 将缓存添加到列表 +// AddToList 将缓存添加到列表 func (this *MemoryStorage) AddToList(item *Item) { item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/ hash := fmt.Sprintf("%d", this.hash(item.Key)) @@ -198,3 +199,20 @@ func (this *MemoryStorage) purgeLoop() { } }) } + +func (this *MemoryStorage) memoryCapacityBytes() int64 { + if this.policy == nil { + return 0 + } + c1 := int64(0) + if this.policy.Capacity != nil { + c1 = this.policy.Capacity.Bytes() + } + if SharedManager.MaxMemoryCapacity != nil { + c2 := SharedManager.MaxMemoryCapacity.Bytes() + if c2 > 0 { + return c2 + } + } + return c1 +} diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 05cb107..166a1d7 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -362,6 +362,8 @@ func (this *Node) syncConfig() error { } nodeconfigs.ResetNodeConfig(nodeConfig) + caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity + caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity if nodeConfig.HTTPCachePolicy != nil { caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{nodeConfig.HTTPCachePolicy}) } else {