From a7cb922e9ffd95e079ce7c2161509f8cedc43b76 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 15 Mar 2022 21:33:44 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=93=E5=AD=98=E7=AD=96=E7=95=A5=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E6=97=B6=E5=B0=BD=E5=8F=AF=E8=83=BD=E4=B8=8D=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E5=8A=A0=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/manager.go | 14 +- internal/caches/manager_test.go | 51 +++- internal/caches/storage_file.go | 349 ++++++++++++++++++--------- internal/caches/storage_file_test.go | 4 +- internal/caches/storage_interface.go | 6 + internal/caches/storage_memory.go | 57 +++-- 6 files changed, 350 insertions(+), 131 deletions(-) diff --git a/internal/caches/manager.go b/internal/caches/manager.go index 4cda02f..de69ab6 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -99,7 +99,19 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy } else { // 检查policy是否有变化 if !storage.Policy().IsSame(policy) { - remotelogs.Println("CACHE", "policy "+strconv.FormatInt(policy.Id, 10)+" changed") + // 检查是否可以直接修改 + if storage.CanUpdatePolicy(policy) { + err := policy.Init() + if err != nil { + remotelogs.Error("CACHE", "reload policy '"+types.String(policy.Id)+"' failed: init policy failed: "+err.Error()) + continue + } + remotelogs.Println("CACHE", "reload policy '"+types.String(policy.Id)+"'") + storage.UpdatePolicy(policy) + continue + } + + remotelogs.Println("CACHE", "restart policy '"+types.String(policy.Id)+"'") // 停止老的 storage.Stop() diff --git a/internal/caches/manager_test.go b/internal/caches/manager_test.go index c64ca02..6df8d41 100644 --- a/internal/caches/manager_test.go +++ b/internal/caches/manager_test.go @@ -2,19 +2,20 @@ package caches import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/iwind/TeaGo/Tea" "testing" ) func TestManager_UpdatePolicies(t *testing.T) { { - policies := []*serverconfigs.HTTPCachePolicy{} + var policies = []*serverconfigs.HTTPCachePolicy{} SharedManager.UpdatePolicies(policies) printManager(t) } { - policies := []*serverconfigs.HTTPCachePolicy{ + var policies = []*serverconfigs.HTTPCachePolicy{ { Id: 1, Type: serverconfigs.CachePolicyStorageFile, @@ -42,7 +43,7 @@ func TestManager_UpdatePolicies(t *testing.T) { } { - policies := []*serverconfigs.HTTPCachePolicy{ + var policies = []*serverconfigs.HTTPCachePolicy{ { Id: 1, Type: serverconfigs.CachePolicyStorageFile, @@ -71,6 +72,50 @@ func TestManager_UpdatePolicies(t *testing.T) { } } +func TestManager_ChangePolicy_Memory(t *testing.T) { + var policies = []*serverconfigs.HTTPCachePolicy{ + { + Id: 1, + Type: serverconfigs.CachePolicyStorageMemory, + Options: map[string]interface{}{}, + Capacity: &shared.SizeCapacity{Count: 1, Unit: shared.SizeCapacityUnitGB}, + }, + } + SharedManager.UpdatePolicies(policies) + SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{ + { + Id: 1, + Type: serverconfigs.CachePolicyStorageMemory, + Options: map[string]interface{}{}, + Capacity: &shared.SizeCapacity{Count: 2, Unit: shared.SizeCapacityUnitGB}, + }, + }) +} + +func TestManager_ChangePolicy_File(t *testing.T) { + var policies = []*serverconfigs.HTTPCachePolicy{ + { + Id: 1, + Type: serverconfigs.CachePolicyStorageFile, + Options: map[string]interface{}{ + "dir": Tea.Root + "/data/cache-index/p1", + }, + Capacity: &shared.SizeCapacity{Count: 1, Unit: shared.SizeCapacityUnitGB}, + }, + } + SharedManager.UpdatePolicies(policies) + SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{ + { + Id: 1, + Type: serverconfigs.CachePolicyStorageFile, + Options: map[string]interface{}{ + "dir": Tea.Root + "/data/cache-index/p1", + }, + Capacity: &shared.SizeCapacity{Count: 2, Unit: shared.SizeCapacityUnitGB}, + }, + }) +} + func printManager(t *testing.T) { t.Log("===manager==") t.Log("storage:") diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 1abbcff..705b411 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -1,6 +1,7 @@ package caches import ( + "bytes" "encoding/binary" "encoding/json" "errors" @@ -17,7 +18,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/utils/sizes" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" @@ -60,7 +60,7 @@ var sharedWritingFileKeyLocker = sync.Mutex{} // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] type FileStorage struct { policy *serverconfigs.HTTPCachePolicy - cacheConfig *serverconfigs.HTTPFileCacheStorage // 二级缓存 + options *serverconfigs.HTTPFileCacheStorage // 二级缓存 memoryStorage *MemoryStorage // 一级缓存 totalSize int64 @@ -92,31 +92,114 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy { return this.policy } +// CanUpdatePolicy 检查策略是否可以更新 +func (this *FileStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool { + // 检查路径是否有变化 + oldOptionsJSON, err := json.Marshal(this.policy.Options) + if err != nil { + return false + } + var oldOptions = &serverconfigs.HTTPFileCacheStorage{} + err = json.Unmarshal(oldOptionsJSON, oldOptions) + if err != nil { + return false + } + + newOptionsJSON, err := json.Marshal(newPolicy.Options) + if err != nil { + return false + } + var newOptions = &serverconfigs.HTTPFileCacheStorage{} + err = json.Unmarshal(newOptionsJSON, newOptions) + if err != nil { + return false + } + + if oldOptions.Dir == newOptions.Dir { + return true + } + + return false +} + +// UpdatePolicy 修改策略 +func (this *FileStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) { + var oldOpenFileCache = this.options.OpenFileCache + + this.policy = newPolicy + + newOptionsJSON, err := json.Marshal(newPolicy.Options) + if err != nil { + return + } + var newOptions = &serverconfigs.HTTPFileCacheStorage{} + err = json.Unmarshal(newOptionsJSON, newOptions) + if err != nil { + remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: decode options failed: "+err.Error()) + return + } + + err = newOptions.Init() + if err != nil { + remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: init options failed: "+err.Error()) + return + } + + this.options = newOptions + + var memoryStorage = this.memoryStorage + if memoryStorage != nil { + if newOptions.MemoryPolicy != nil && newOptions.MemoryPolicy.CapacityBytes() > 0 { + memoryStorage.UpdatePolicy(newOptions.MemoryPolicy) + } else { + memoryStorage.Stop() + this.memoryStorage = nil + } + } else if newOptions.MemoryPolicy != nil && this.options.MemoryPolicy.Capacity != nil && this.options.MemoryPolicy.Capacity.Count > 0 { + err = this.createMemoryStorage() + if err != nil { + remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: create memory storage failed: "+err.Error()) + } + } + + // open cache + oldOpenFileCacheJSON, _ := json.Marshal(oldOpenFileCache) + newOpenFileCacheJSON, _ := json.Marshal(this.options.OpenFileCache) + if bytes.Compare(oldOpenFileCacheJSON, newOpenFileCacheJSON) != 0 { + this.initOpenFileCache() + } + + // Purge Ticker + if newPolicy.PersistenceAutoPurgeInterval != this.policy.PersistenceAutoPurgeInterval { + this.initPurgeTicker() + } +} + // Init 初始化 func (this *FileStorage) Init() error { this.locker.Lock() defer this.locker.Unlock() - before := time.Now() + var before = time.Now() // 配置 - cacheConfig := &serverconfigs.HTTPFileCacheStorage{} + var options = &serverconfigs.HTTPFileCacheStorage{} optionsJSON, err := json.Marshal(this.policy.Options) if err != nil { return err } - err = json.Unmarshal(optionsJSON, cacheConfig) + err = json.Unmarshal(optionsJSON, options) if err != nil { return err } - this.cacheConfig = cacheConfig + this.options = options - if !filepath.IsAbs(this.cacheConfig.Dir) { - this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir + if !filepath.IsAbs(this.options.Dir) { + this.options.Dir = Tea.Root + Tea.DS + this.options.Dir } - this.cacheConfig.Dir = filepath.Clean(this.cacheConfig.Dir) - var dir = this.cacheConfig.Dir + this.options.Dir = filepath.Clean(this.options.Dir) + var dir = this.options.Dir if len(dir) == 0 { return errors.New("[CACHE]cache storage dir can not be empty") @@ -127,7 +210,7 @@ func (this *FileStorage) Init() error { if err != nil { return err } - list.(*FileList).SetOldDir(this.cacheConfig.Dir + "/p" + types.String(this.policy.Id)) + list.(*FileList).SetOldDir(this.options.Dir + "/p" + types.String(this.policy.Id)) this.list = list stat, err := list.Stat(func(hash string) bool { return true @@ -158,8 +241,8 @@ func (this *FileStorage) Init() error { defer func() { // 统计 - count := stat.Count - size := stat.Size + var count = stat.Count + var size = stat.Size cost := time.Since(before).Seconds() * 1000 sizeMB := strconv.FormatInt(size, 10) + " Bytes" @@ -170,7 +253,7 @@ func (this *FileStorage) Init() error { } else if size > 1024 { sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024) } - remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.cacheConfig.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB) + remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB) }() // 初始化list @@ -180,47 +263,15 @@ func (this *FileStorage) Init() error { } // 加载内存缓存 - if this.cacheConfig.MemoryPolicy != nil { - if this.cacheConfig.MemoryPolicy.Capacity != nil && this.cacheConfig.MemoryPolicy.Capacity.Count > 0 { - memoryPolicy := &serverconfigs.HTTPCachePolicy{ - Id: this.policy.Id, - IsOn: this.policy.IsOn, - Name: this.policy.Name, - Description: this.policy.Description, - Capacity: this.cacheConfig.MemoryPolicy.Capacity, - MaxKeys: this.policy.MaxKeys, - MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改 - Type: serverconfigs.CachePolicyStorageMemory, - Options: this.policy.Options, - Life: this.policy.Life, - MinLife: this.policy.MinLife, - MaxLife: this.policy.MaxLife, - - MemoryAutoPurgeCount: this.policy.MemoryAutoPurgeCount, - MemoryAutoPurgeInterval: this.policy.MemoryAutoPurgeInterval, - MemoryLFUFreePercent: this.policy.MemoryLFUFreePercent, - } - err = memoryPolicy.Init() - if err != nil { - return err - } - memoryStorage := NewMemoryStorage(memoryPolicy, this) - err = memoryStorage.Init() - if err != nil { - return err - } - this.memoryStorage = memoryStorage + if this.options.MemoryPolicy != nil && this.options.MemoryPolicy.Capacity != nil && this.options.MemoryPolicy.Capacity.Count > 0 { + err = this.createMemoryStorage() + if err != nil { + return err } } // open file cache - if this.cacheConfig.OpenFileCache != nil && this.cacheConfig.OpenFileCache.IsOn && this.cacheConfig.OpenFileCache.Max > 0 { - this.openFileCache, err = NewOpenFileCache(this.cacheConfig.OpenFileCache.Max) - logs.Println("start open file cache") - if err != nil { - remotelogs.Error("CACHE", "open file cache failed: "+err.Error()) - } - } + this.initOpenFileCache() return nil } @@ -241,8 +292,9 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, } // 先尝试内存缓存 - if allowMemory && this.memoryStorage != nil { - reader, err := this.memoryStorage.OpenReader(key, useStale, isPartial) + var memoryStorage = this.memoryStorage + if allowMemory && memoryStorage != nil { + reader, err := memoryStorage.OpenReader(key, useStale, isPartial) if err == nil { return reader, err } @@ -264,8 +316,9 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, // TODO 尝试使用mmap加快读取速度 var isOk = false var openFile *OpenFile - if this.openFileCache != nil { - openFile = this.openFileCache.Get(path) + var openFileCache = this.openFileCache // 因为中间可能有修改,所以先赋值再获取 + if openFileCache != nil { + openFile = openFileCache.Get(path) } var fp *os.File var err error @@ -291,12 +344,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, if isPartial { var partialFileReader = NewPartialFileReader(fp) partialFileReader.openFile = openFile - partialFileReader.openFileCache = this.openFileCache + partialFileReader.openFileCache = openFileCache reader = partialFileReader } else { var fileReader = NewFileReader(fp) fileReader.openFile = openFile - fileReader.openFileCache = this.openFileCache + fileReader.openFileCache = openFileCache reader = fileReader } err = reader.Init() @@ -332,8 +385,9 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz if maxSize > maxMemorySize { maxMemorySize = maxSize } - if !isPartial && this.memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) { - writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false) + var memoryStorage = this.memoryStorage + if !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) { + writer, err := memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false) if err == nil { return writer, nil } @@ -376,7 +430,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz } var hash = stringutil.Md5(key) - var dir = this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + var dir = this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] _, err = os.Stat(dir) if err != nil { if !os.IsNotExist(err) { @@ -542,9 +596,10 @@ func (this *FileStorage) AddToList(item *Item) { return } - if this.memoryStorage != nil { + var memoryStorage = this.memoryStorage + if memoryStorage != nil { if item.Type == ItemTypeMemory { - this.memoryStorage.AddToList(item) + memoryStorage.AddToList(item) return } } @@ -568,9 +623,9 @@ func (this *FileStorage) Delete(key string) error { defer this.locker.Unlock() // 先尝试内存缓存 - if this.memoryStorage != nil { - _ = this.memoryStorage.Delete(key) - } + this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) { + _ = memoryStorage.Delete(key) + }) hash, path := this.keyPath(key) err := this.list.Remove(hash) @@ -601,9 +656,9 @@ func (this *FileStorage) CleanAll() error { defer this.locker.Unlock() // 先尝试内存缓存 - if this.memoryStorage != nil { - _ = this.memoryStorage.CleanAll() - } + this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) { + _ = memoryStorage.CleanAll() + }) err := this.list.CleanAll() if err != nil { @@ -677,9 +732,9 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { defer this.locker.Unlock() // 先尝试内存缓存 - if this.memoryStorage != nil { - _ = this.memoryStorage.Purge(keys, urlType) - } + this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) { + _ = memoryStorage.Purge(keys, urlType) + }) // 目录 if urlType == "dir" { @@ -715,9 +770,9 @@ func (this *FileStorage) Stop() { defer this.locker.Unlock() // 先尝试内存缓存 - if this.memoryStorage != nil { - this.memoryStorage.Stop() - } + this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) { + memoryStorage.Stop() + }) _ = this.list.Reset() if this.purgeTicker != nil { @@ -743,10 +798,11 @@ func (this *FileStorage) TotalDiskSize() int64 { // TotalMemorySize 内存尺寸 func (this *FileStorage) TotalMemorySize() int64 { - if this.memoryStorage == nil { + var memoryStorage = this.memoryStorage + if memoryStorage == nil { return 0 } - return this.memoryStorage.TotalMemorySize() + return memoryStorage.TotalMemorySize() } // IgnoreKey 忽略某个Key,即不缓存某个Key @@ -756,13 +812,13 @@ func (this *FileStorage) IgnoreKey(key string) { // 绝对路径 func (this *FileStorage) dir() string { - return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + return this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" } // 获取Key对应的文件路径 func (this *FileStorage) keyPath(key string) (hash string, path string) { hash = stringutil.Md5(key) - dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] path = dir + "/" + hash + ".cache" return } @@ -772,7 +828,7 @@ func (this *FileStorage) hashPath(hash string) (path string) { if len(hash) != 32 { return "" } - dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] + dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] path = dir + "/" + hash + ".cache" return } @@ -793,14 +849,22 @@ func (this *FileStorage) initList() error { })**/ // 启动定时清理任务 - var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval - if autoPurgeInterval <= 0 { - autoPurgeInterval = 30 - if Tea.IsTesting() { - autoPurgeInterval = 10 - } + this.initPurgeTicker() + + // 热点处理任务 + this.hotTicker = utils.NewTicker(1 * time.Minute) + if Tea.IsTesting() { + this.hotTicker = utils.NewTicker(10 * time.Second) } - this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) + goman.New(func() { + for this.hotTicker.Next() { + trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() { + this.hotLoop() + }) + } + }) + + // 退出时停止 events.OnKey(events.EventQuit, this, func() { remotelogs.Println("CACHE", "quit clean timer") @@ -817,26 +881,6 @@ func (this *FileStorage) initList() error { } } }) - goman.New(func() { - for this.purgeTicker.Next() { - trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() { - this.purgeLoop() - }) - } - }) - - // 热点处理任务 - this.hotTicker = utils.NewTicker(1 * time.Minute) - if Tea.IsTesting() { - this.hotTicker = utils.NewTicker(10 * time.Second) - } - goman.New(func() { - for this.hotTicker.Next() { - trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() { - this.hotLoop() - }) - } - }) return nil } @@ -994,7 +1038,7 @@ func (this *FileStorage) hotLoop() { expiresAt = bestExpiresAt } - writer, err := this.memoryStorage.openWriter(item.Key, expiresAt, reader.Status(), reader.BodySize(), -1, false) + writer, err := memoryStorage.openWriter(item.Key, expiresAt, reader.Status(), reader.BodySize(), -1, false) if err != nil { if !CanIgnoreErr(err) { remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error()) @@ -1030,7 +1074,7 @@ func (this *FileStorage) hotLoop() { continue } - this.memoryStorage.AddToList(&Item{ + memoryStorage.AddToList(&Item{ Type: writer.ItemType(), Key: item.Key, ExpiredAt: expiresAt, @@ -1045,9 +1089,9 @@ func (this *FileStorage) hotLoop() { } func (this *FileStorage) diskCapacityBytes() int64 { - c1 := this.policy.CapacityBytes() + var c1 = this.policy.CapacityBytes() if SharedManager.MaxDiskCapacity != nil { - c2 := SharedManager.MaxDiskCapacity.Bytes() + var c2 = SharedManager.MaxDiskCapacity.Bytes() if c2 > 0 { return c2 } @@ -1097,6 +1141,8 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) { rate = rate / 10 } if rands.Int(0, rate) == 0 { + var memoryStorage = this.memoryStorage + var hitErr = this.list.IncreaseHit(hash) if hitErr != nil { // 此错误可以忽略 @@ -1105,7 +1151,7 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) { // 增加到热点 // 这里不收录缓存尺寸过大的文件 - if this.memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 { + if memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 { this.hotMapLocker.Lock() hotItem, ok := this.hotMap[key] @@ -1140,3 +1186,84 @@ func (this *FileStorage) removeCacheFile(path string) error { } return err } + +// 创建当前策略包含的内存缓存 +func (this *FileStorage) createMemoryStorage() error { + var memoryPolicy = &serverconfigs.HTTPCachePolicy{ + Id: this.policy.Id, + IsOn: this.policy.IsOn, + Name: this.policy.Name, + Description: this.policy.Description, + Capacity: this.options.MemoryPolicy.Capacity, + MaxKeys: this.policy.MaxKeys, + MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改 + Type: serverconfigs.CachePolicyStorageMemory, + Options: this.policy.Options, + Life: this.policy.Life, + MinLife: this.policy.MinLife, + MaxLife: this.policy.MaxLife, + + MemoryAutoPurgeCount: this.policy.MemoryAutoPurgeCount, + MemoryAutoPurgeInterval: this.policy.MemoryAutoPurgeInterval, + MemoryLFUFreePercent: this.policy.MemoryLFUFreePercent, + } + err := memoryPolicy.Init() + if err != nil { + return err + } + var memoryStorage = NewMemoryStorage(memoryPolicy, this) + err = memoryStorage.Init() + if err != nil { + return err + } + this.memoryStorage = memoryStorage + + return nil +} + +func (this *FileStorage) initPurgeTicker() { + var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval + if autoPurgeInterval <= 0 { + autoPurgeInterval = 30 + if Tea.IsTesting() { + autoPurgeInterval = 10 + } + } + if this.purgeTicker != nil { + this.purgeTicker.Stop() + } + this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) + goman.New(func() { + for this.purgeTicker.Next() { + trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() { + this.purgeLoop() + }) + } + }) +} + +func (this *FileStorage) initOpenFileCache() { + var err error + + var oldOpenFileCache = this.openFileCache + + // 启用新的 + if this.options.OpenFileCache != nil && this.options.OpenFileCache.IsOn && this.options.OpenFileCache.Max > 0 { + this.openFileCache, err = NewOpenFileCache(this.options.OpenFileCache.Max) + if err != nil { + remotelogs.Error("CACHE", "open file cache failed: "+err.Error()) + } + } + + // 关闭老的 + if oldOpenFileCache != nil { + oldOpenFileCache.CloseAll() + } +} + +func (this *FileStorage) runMemoryStorageSafety(f func(memoryStorage *MemoryStorage)) { + var memoryStorage = this.memoryStorage + if memoryStorage != nil { + f(memoryStorage) + } +} diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index 376d44c..e8ab92b 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -564,8 +564,8 @@ func BenchmarkFileStorage_KeyPath(b *testing.B) { runtime.GOMAXPROCS(1) var storage = &FileStorage{ - cacheConfig: &serverconfigs.HTTPFileCacheStorage{}, - policy: &serverconfigs.HTTPCachePolicy{Id: 1}, + options: &serverconfigs.HTTPFileCacheStorage{}, + policy: &serverconfigs.HTTPCachePolicy{Id: 1}, } for i := 0; i < b.N; i++ { diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index 01a3fb6..6c03d52 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -40,6 +40,12 @@ type StorageInterface interface { // Policy 获取当前存储的Policy Policy() *serverconfigs.HTTPCachePolicy + // UpdatePolicy 修改策略 + UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) + + // CanUpdatePolicy 检查策略是否可以更新 + CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool + // AddToList 将缓存添加到列表 AddToList(item *Item) diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 9c6a6b5..7a78036 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -89,20 +89,7 @@ func (this *MemoryStorage) Init() error { atomic.AddInt64(&this.totalSize, -item.TotalSize()) }) - var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval - if autoPurgeInterval <= 0 { - autoPurgeInterval = 5 - } - - // 启动定时清理任务 - this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) - goman.New(func() { - for this.purgeTicker.Next() { - var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP") - this.purgeLoop() - tr.End() - } - }) + this.initPurgeTicker() // 启动定时Flush memory to disk任务 if this.parentStorage != nil { @@ -323,6 +310,26 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy { return this.policy } +// UpdatePolicy 修改策略 +func (this *MemoryStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) { + var oldPolicy = this.policy + this.policy = newPolicy + + if oldPolicy.MemoryAutoPurgeInterval != newPolicy.MemoryAutoPurgeInterval { + this.initPurgeTicker() + } + + // 如果是空的,则清空 + if newPolicy.CapacityBytes() == 0 { + _ = this.CleanAll() + } +} + +// CanUpdatePolicy 检查策略是否可以更新 +func (this *MemoryStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool { + return true +} + // AddToList 将缓存添加到列表 func (this *MemoryStorage) AddToList(item *Item) { item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/ @@ -497,3 +504,25 @@ func (this *MemoryStorage) deleteWithoutLocker(key string) error { _ = this.list.Remove(fmt.Sprintf("%d", hash)) return nil } + +func (this *MemoryStorage) initPurgeTicker() { + var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval + if autoPurgeInterval <= 0 { + autoPurgeInterval = 5 + } + + // 启动定时清理任务 + + if this.purgeTicker != nil { + this.purgeTicker.Stop() + } + + this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) + goman.New(func() { + for this.purgeTicker.Next() { + var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP") + this.purgeLoop() + tr.End() + } + }) +}