mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-08 11:20:27 +08:00
缓存策略修改时尽可能不重新加载
This commit is contained in:
@@ -99,7 +99,19 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
|
|||||||
} else {
|
} else {
|
||||||
// 检查policy是否有变化
|
// 检查policy是否有变化
|
||||||
if !storage.Policy().IsSame(policy) {
|
if !storage.Policy().IsSame(policy) {
|
||||||
remotelogs.Println("CACHE", "policy "+strconv.FormatInt(policy.Id, 10)+" changed")
|
// 检查是否可以直接修改
|
||||||
|
if storage.CanUpdatePolicy(policy) {
|
||||||
|
err := policy.Init()
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("CACHE", "reload policy '"+types.String(policy.Id)+"' failed: init policy failed: "+err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
remotelogs.Println("CACHE", "reload policy '"+types.String(policy.Id)+"'")
|
||||||
|
storage.UpdatePolicy(policy)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
remotelogs.Println("CACHE", "restart policy '"+types.String(policy.Id)+"'")
|
||||||
|
|
||||||
// 停止老的
|
// 停止老的
|
||||||
storage.Stop()
|
storage.Stop()
|
||||||
|
|||||||
@@ -2,19 +2,20 @@ package caches
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||||
|
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
"github.com/iwind/TeaGo/Tea"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestManager_UpdatePolicies(t *testing.T) {
|
func TestManager_UpdatePolicies(t *testing.T) {
|
||||||
{
|
{
|
||||||
policies := []*serverconfigs.HTTPCachePolicy{}
|
var policies = []*serverconfigs.HTTPCachePolicy{}
|
||||||
SharedManager.UpdatePolicies(policies)
|
SharedManager.UpdatePolicies(policies)
|
||||||
printManager(t)
|
printManager(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
policies := []*serverconfigs.HTTPCachePolicy{
|
var policies = []*serverconfigs.HTTPCachePolicy{
|
||||||
{
|
{
|
||||||
Id: 1,
|
Id: 1,
|
||||||
Type: serverconfigs.CachePolicyStorageFile,
|
Type: serverconfigs.CachePolicyStorageFile,
|
||||||
@@ -42,7 +43,7 @@ func TestManager_UpdatePolicies(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
policies := []*serverconfigs.HTTPCachePolicy{
|
var policies = []*serverconfigs.HTTPCachePolicy{
|
||||||
{
|
{
|
||||||
Id: 1,
|
Id: 1,
|
||||||
Type: serverconfigs.CachePolicyStorageFile,
|
Type: serverconfigs.CachePolicyStorageFile,
|
||||||
@@ -71,6 +72,50 @@ func TestManager_UpdatePolicies(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestManager_ChangePolicy_Memory(t *testing.T) {
|
||||||
|
var policies = []*serverconfigs.HTTPCachePolicy{
|
||||||
|
{
|
||||||
|
Id: 1,
|
||||||
|
Type: serverconfigs.CachePolicyStorageMemory,
|
||||||
|
Options: map[string]interface{}{},
|
||||||
|
Capacity: &shared.SizeCapacity{Count: 1, Unit: shared.SizeCapacityUnitGB},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
SharedManager.UpdatePolicies(policies)
|
||||||
|
SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{
|
||||||
|
{
|
||||||
|
Id: 1,
|
||||||
|
Type: serverconfigs.CachePolicyStorageMemory,
|
||||||
|
Options: map[string]interface{}{},
|
||||||
|
Capacity: &shared.SizeCapacity{Count: 2, Unit: shared.SizeCapacityUnitGB},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager_ChangePolicy_File(t *testing.T) {
|
||||||
|
var policies = []*serverconfigs.HTTPCachePolicy{
|
||||||
|
{
|
||||||
|
Id: 1,
|
||||||
|
Type: serverconfigs.CachePolicyStorageFile,
|
||||||
|
Options: map[string]interface{}{
|
||||||
|
"dir": Tea.Root + "/data/cache-index/p1",
|
||||||
|
},
|
||||||
|
Capacity: &shared.SizeCapacity{Count: 1, Unit: shared.SizeCapacityUnitGB},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
SharedManager.UpdatePolicies(policies)
|
||||||
|
SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{
|
||||||
|
{
|
||||||
|
Id: 1,
|
||||||
|
Type: serverconfigs.CachePolicyStorageFile,
|
||||||
|
Options: map[string]interface{}{
|
||||||
|
"dir": Tea.Root + "/data/cache-index/p1",
|
||||||
|
},
|
||||||
|
Capacity: &shared.SizeCapacity{Count: 2, Unit: shared.SizeCapacityUnitGB},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func printManager(t *testing.T) {
|
func printManager(t *testing.T) {
|
||||||
t.Log("===manager==")
|
t.Log("===manager==")
|
||||||
t.Log("storage:")
|
t.Log("storage:")
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package caches
|
package caches
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
@@ -17,7 +18,6 @@ import (
|
|||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||||
"github.com/iwind/TeaGo/Tea"
|
"github.com/iwind/TeaGo/Tea"
|
||||||
"github.com/iwind/TeaGo/logs"
|
|
||||||
"github.com/iwind/TeaGo/rands"
|
"github.com/iwind/TeaGo/rands"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
stringutil "github.com/iwind/TeaGo/utils/string"
|
stringutil "github.com/iwind/TeaGo/utils/string"
|
||||||
@@ -60,7 +60,7 @@ var sharedWritingFileKeyLocker = sync.Mutex{}
|
|||||||
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
|
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
|
||||||
type FileStorage struct {
|
type FileStorage struct {
|
||||||
policy *serverconfigs.HTTPCachePolicy
|
policy *serverconfigs.HTTPCachePolicy
|
||||||
cacheConfig *serverconfigs.HTTPFileCacheStorage // 二级缓存
|
options *serverconfigs.HTTPFileCacheStorage // 二级缓存
|
||||||
memoryStorage *MemoryStorage // 一级缓存
|
memoryStorage *MemoryStorage // 一级缓存
|
||||||
totalSize int64
|
totalSize int64
|
||||||
|
|
||||||
@@ -92,31 +92,114 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
|
|||||||
return this.policy
|
return this.policy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CanUpdatePolicy 检查策略是否可以更新
|
||||||
|
func (this *FileStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool {
|
||||||
|
// 检查路径是否有变化
|
||||||
|
oldOptionsJSON, err := json.Marshal(this.policy.Options)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
var oldOptions = &serverconfigs.HTTPFileCacheStorage{}
|
||||||
|
err = json.Unmarshal(oldOptionsJSON, oldOptions)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
newOptionsJSON, err := json.Marshal(newPolicy.Options)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
var newOptions = &serverconfigs.HTTPFileCacheStorage{}
|
||||||
|
err = json.Unmarshal(newOptionsJSON, newOptions)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldOptions.Dir == newOptions.Dir {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdatePolicy 修改策略
|
||||||
|
func (this *FileStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) {
|
||||||
|
var oldOpenFileCache = this.options.OpenFileCache
|
||||||
|
|
||||||
|
this.policy = newPolicy
|
||||||
|
|
||||||
|
newOptionsJSON, err := json.Marshal(newPolicy.Options)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var newOptions = &serverconfigs.HTTPFileCacheStorage{}
|
||||||
|
err = json.Unmarshal(newOptionsJSON, newOptions)
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: decode options failed: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = newOptions.Init()
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: init options failed: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.options = newOptions
|
||||||
|
|
||||||
|
var memoryStorage = this.memoryStorage
|
||||||
|
if memoryStorage != nil {
|
||||||
|
if newOptions.MemoryPolicy != nil && newOptions.MemoryPolicy.CapacityBytes() > 0 {
|
||||||
|
memoryStorage.UpdatePolicy(newOptions.MemoryPolicy)
|
||||||
|
} else {
|
||||||
|
memoryStorage.Stop()
|
||||||
|
this.memoryStorage = nil
|
||||||
|
}
|
||||||
|
} else if newOptions.MemoryPolicy != nil && this.options.MemoryPolicy.Capacity != nil && this.options.MemoryPolicy.Capacity.Count > 0 {
|
||||||
|
err = this.createMemoryStorage()
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: create memory storage failed: "+err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// open cache
|
||||||
|
oldOpenFileCacheJSON, _ := json.Marshal(oldOpenFileCache)
|
||||||
|
newOpenFileCacheJSON, _ := json.Marshal(this.options.OpenFileCache)
|
||||||
|
if bytes.Compare(oldOpenFileCacheJSON, newOpenFileCacheJSON) != 0 {
|
||||||
|
this.initOpenFileCache()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purge Ticker
|
||||||
|
if newPolicy.PersistenceAutoPurgeInterval != this.policy.PersistenceAutoPurgeInterval {
|
||||||
|
this.initPurgeTicker()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Init 初始化
|
// Init 初始化
|
||||||
func (this *FileStorage) Init() error {
|
func (this *FileStorage) Init() error {
|
||||||
this.locker.Lock()
|
this.locker.Lock()
|
||||||
defer this.locker.Unlock()
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
before := time.Now()
|
var before = time.Now()
|
||||||
|
|
||||||
// 配置
|
// 配置
|
||||||
cacheConfig := &serverconfigs.HTTPFileCacheStorage{}
|
var options = &serverconfigs.HTTPFileCacheStorage{}
|
||||||
optionsJSON, err := json.Marshal(this.policy.Options)
|
optionsJSON, err := json.Marshal(this.policy.Options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = json.Unmarshal(optionsJSON, cacheConfig)
|
err = json.Unmarshal(optionsJSON, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
this.cacheConfig = cacheConfig
|
this.options = options
|
||||||
|
|
||||||
if !filepath.IsAbs(this.cacheConfig.Dir) {
|
if !filepath.IsAbs(this.options.Dir) {
|
||||||
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
|
this.options.Dir = Tea.Root + Tea.DS + this.options.Dir
|
||||||
}
|
}
|
||||||
|
|
||||||
this.cacheConfig.Dir = filepath.Clean(this.cacheConfig.Dir)
|
this.options.Dir = filepath.Clean(this.options.Dir)
|
||||||
var dir = this.cacheConfig.Dir
|
var dir = this.options.Dir
|
||||||
|
|
||||||
if len(dir) == 0 {
|
if len(dir) == 0 {
|
||||||
return errors.New("[CACHE]cache storage dir can not be empty")
|
return errors.New("[CACHE]cache storage dir can not be empty")
|
||||||
@@ -127,7 +210,7 @@ func (this *FileStorage) Init() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
list.(*FileList).SetOldDir(this.cacheConfig.Dir + "/p" + types.String(this.policy.Id))
|
list.(*FileList).SetOldDir(this.options.Dir + "/p" + types.String(this.policy.Id))
|
||||||
this.list = list
|
this.list = list
|
||||||
stat, err := list.Stat(func(hash string) bool {
|
stat, err := list.Stat(func(hash string) bool {
|
||||||
return true
|
return true
|
||||||
@@ -158,8 +241,8 @@ func (this *FileStorage) Init() error {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// 统计
|
// 统计
|
||||||
count := stat.Count
|
var count = stat.Count
|
||||||
size := stat.Size
|
var size = stat.Size
|
||||||
|
|
||||||
cost := time.Since(before).Seconds() * 1000
|
cost := time.Since(before).Seconds() * 1000
|
||||||
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
|
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
|
||||||
@@ -170,7 +253,7 @@ func (this *FileStorage) Init() error {
|
|||||||
} else if size > 1024 {
|
} else if size > 1024 {
|
||||||
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
|
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
|
||||||
}
|
}
|
||||||
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.cacheConfig.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
|
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 初始化list
|
// 初始化list
|
||||||
@@ -180,47 +263,15 @@ func (this *FileStorage) Init() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 加载内存缓存
|
// 加载内存缓存
|
||||||
if this.cacheConfig.MemoryPolicy != nil {
|
if this.options.MemoryPolicy != nil && this.options.MemoryPolicy.Capacity != nil && this.options.MemoryPolicy.Capacity.Count > 0 {
|
||||||
if this.cacheConfig.MemoryPolicy.Capacity != nil && this.cacheConfig.MemoryPolicy.Capacity.Count > 0 {
|
err = this.createMemoryStorage()
|
||||||
memoryPolicy := &serverconfigs.HTTPCachePolicy{
|
if err != nil {
|
||||||
Id: this.policy.Id,
|
return err
|
||||||
IsOn: this.policy.IsOn,
|
|
||||||
Name: this.policy.Name,
|
|
||||||
Description: this.policy.Description,
|
|
||||||
Capacity: this.cacheConfig.MemoryPolicy.Capacity,
|
|
||||||
MaxKeys: this.policy.MaxKeys,
|
|
||||||
MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改
|
|
||||||
Type: serverconfigs.CachePolicyStorageMemory,
|
|
||||||
Options: this.policy.Options,
|
|
||||||
Life: this.policy.Life,
|
|
||||||
MinLife: this.policy.MinLife,
|
|
||||||
MaxLife: this.policy.MaxLife,
|
|
||||||
|
|
||||||
MemoryAutoPurgeCount: this.policy.MemoryAutoPurgeCount,
|
|
||||||
MemoryAutoPurgeInterval: this.policy.MemoryAutoPurgeInterval,
|
|
||||||
MemoryLFUFreePercent: this.policy.MemoryLFUFreePercent,
|
|
||||||
}
|
|
||||||
err = memoryPolicy.Init()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
memoryStorage := NewMemoryStorage(memoryPolicy, this)
|
|
||||||
err = memoryStorage.Init()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
this.memoryStorage = memoryStorage
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// open file cache
|
// open file cache
|
||||||
if this.cacheConfig.OpenFileCache != nil && this.cacheConfig.OpenFileCache.IsOn && this.cacheConfig.OpenFileCache.Max > 0 {
|
this.initOpenFileCache()
|
||||||
this.openFileCache, err = NewOpenFileCache(this.cacheConfig.OpenFileCache.Max)
|
|
||||||
logs.Println("start open file cache")
|
|
||||||
if err != nil {
|
|
||||||
remotelogs.Error("CACHE", "open file cache failed: "+err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -241,8 +292,9 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 先尝试内存缓存
|
// 先尝试内存缓存
|
||||||
if allowMemory && this.memoryStorage != nil {
|
var memoryStorage = this.memoryStorage
|
||||||
reader, err := this.memoryStorage.OpenReader(key, useStale, isPartial)
|
if allowMemory && memoryStorage != nil {
|
||||||
|
reader, err := memoryStorage.OpenReader(key, useStale, isPartial)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return reader, err
|
return reader, err
|
||||||
}
|
}
|
||||||
@@ -264,8 +316,9 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
// TODO 尝试使用mmap加快读取速度
|
// TODO 尝试使用mmap加快读取速度
|
||||||
var isOk = false
|
var isOk = false
|
||||||
var openFile *OpenFile
|
var openFile *OpenFile
|
||||||
if this.openFileCache != nil {
|
var openFileCache = this.openFileCache // 因为中间可能有修改,所以先赋值再获取
|
||||||
openFile = this.openFileCache.Get(path)
|
if openFileCache != nil {
|
||||||
|
openFile = openFileCache.Get(path)
|
||||||
}
|
}
|
||||||
var fp *os.File
|
var fp *os.File
|
||||||
var err error
|
var err error
|
||||||
@@ -291,12 +344,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
|
|||||||
if isPartial {
|
if isPartial {
|
||||||
var partialFileReader = NewPartialFileReader(fp)
|
var partialFileReader = NewPartialFileReader(fp)
|
||||||
partialFileReader.openFile = openFile
|
partialFileReader.openFile = openFile
|
||||||
partialFileReader.openFileCache = this.openFileCache
|
partialFileReader.openFileCache = openFileCache
|
||||||
reader = partialFileReader
|
reader = partialFileReader
|
||||||
} else {
|
} else {
|
||||||
var fileReader = NewFileReader(fp)
|
var fileReader = NewFileReader(fp)
|
||||||
fileReader.openFile = openFile
|
fileReader.openFile = openFile
|
||||||
fileReader.openFileCache = this.openFileCache
|
fileReader.openFileCache = openFileCache
|
||||||
reader = fileReader
|
reader = fileReader
|
||||||
}
|
}
|
||||||
err = reader.Init()
|
err = reader.Init()
|
||||||
@@ -332,8 +385,9 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
|
|||||||
if maxSize > maxMemorySize {
|
if maxSize > maxMemorySize {
|
||||||
maxMemorySize = maxSize
|
maxMemorySize = maxSize
|
||||||
}
|
}
|
||||||
if !isPartial && this.memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) {
|
var memoryStorage = this.memoryStorage
|
||||||
writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false)
|
if !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) {
|
||||||
|
writer, err := memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return writer, nil
|
return writer, nil
|
||||||
}
|
}
|
||||||
@@ -376,7 +430,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
|
|||||||
}
|
}
|
||||||
|
|
||||||
var hash = stringutil.Md5(key)
|
var hash = stringutil.Md5(key)
|
||||||
var dir = this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
var dir = this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
||||||
_, err = os.Stat(dir)
|
_, err = os.Stat(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
@@ -542,9 +596,10 @@ func (this *FileStorage) AddToList(item *Item) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if this.memoryStorage != nil {
|
var memoryStorage = this.memoryStorage
|
||||||
|
if memoryStorage != nil {
|
||||||
if item.Type == ItemTypeMemory {
|
if item.Type == ItemTypeMemory {
|
||||||
this.memoryStorage.AddToList(item)
|
memoryStorage.AddToList(item)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -568,9 +623,9 @@ func (this *FileStorage) Delete(key string) error {
|
|||||||
defer this.locker.Unlock()
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
// 先尝试内存缓存
|
// 先尝试内存缓存
|
||||||
if this.memoryStorage != nil {
|
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
|
||||||
_ = this.memoryStorage.Delete(key)
|
_ = memoryStorage.Delete(key)
|
||||||
}
|
})
|
||||||
|
|
||||||
hash, path := this.keyPath(key)
|
hash, path := this.keyPath(key)
|
||||||
err := this.list.Remove(hash)
|
err := this.list.Remove(hash)
|
||||||
@@ -601,9 +656,9 @@ func (this *FileStorage) CleanAll() error {
|
|||||||
defer this.locker.Unlock()
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
// 先尝试内存缓存
|
// 先尝试内存缓存
|
||||||
if this.memoryStorage != nil {
|
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
|
||||||
_ = this.memoryStorage.CleanAll()
|
_ = memoryStorage.CleanAll()
|
||||||
}
|
})
|
||||||
|
|
||||||
err := this.list.CleanAll()
|
err := this.list.CleanAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -677,9 +732,9 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
|||||||
defer this.locker.Unlock()
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
// 先尝试内存缓存
|
// 先尝试内存缓存
|
||||||
if this.memoryStorage != nil {
|
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
|
||||||
_ = this.memoryStorage.Purge(keys, urlType)
|
_ = memoryStorage.Purge(keys, urlType)
|
||||||
}
|
})
|
||||||
|
|
||||||
// 目录
|
// 目录
|
||||||
if urlType == "dir" {
|
if urlType == "dir" {
|
||||||
@@ -715,9 +770,9 @@ func (this *FileStorage) Stop() {
|
|||||||
defer this.locker.Unlock()
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
// 先尝试内存缓存
|
// 先尝试内存缓存
|
||||||
if this.memoryStorage != nil {
|
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
|
||||||
this.memoryStorage.Stop()
|
memoryStorage.Stop()
|
||||||
}
|
})
|
||||||
|
|
||||||
_ = this.list.Reset()
|
_ = this.list.Reset()
|
||||||
if this.purgeTicker != nil {
|
if this.purgeTicker != nil {
|
||||||
@@ -743,10 +798,11 @@ func (this *FileStorage) TotalDiskSize() int64 {
|
|||||||
|
|
||||||
// TotalMemorySize 内存尺寸
|
// TotalMemorySize 内存尺寸
|
||||||
func (this *FileStorage) TotalMemorySize() int64 {
|
func (this *FileStorage) TotalMemorySize() int64 {
|
||||||
if this.memoryStorage == nil {
|
var memoryStorage = this.memoryStorage
|
||||||
|
if memoryStorage == nil {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
return this.memoryStorage.TotalMemorySize()
|
return memoryStorage.TotalMemorySize()
|
||||||
}
|
}
|
||||||
|
|
||||||
// IgnoreKey 忽略某个Key,即不缓存某个Key
|
// IgnoreKey 忽略某个Key,即不缓存某个Key
|
||||||
@@ -756,13 +812,13 @@ func (this *FileStorage) IgnoreKey(key string) {
|
|||||||
|
|
||||||
// 绝对路径
|
// 绝对路径
|
||||||
func (this *FileStorage) dir() string {
|
func (this *FileStorage) dir() string {
|
||||||
return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/"
|
return this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取Key对应的文件路径
|
// 获取Key对应的文件路径
|
||||||
func (this *FileStorage) keyPath(key string) (hash string, path string) {
|
func (this *FileStorage) keyPath(key string) (hash string, path string) {
|
||||||
hash = stringutil.Md5(key)
|
hash = stringutil.Md5(key)
|
||||||
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
||||||
path = dir + "/" + hash + ".cache"
|
path = dir + "/" + hash + ".cache"
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -772,7 +828,7 @@ func (this *FileStorage) hashPath(hash string) (path string) {
|
|||||||
if len(hash) != 32 {
|
if len(hash) != 32 {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
||||||
path = dir + "/" + hash + ".cache"
|
path = dir + "/" + hash + ".cache"
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -793,14 +849,22 @@ func (this *FileStorage) initList() error {
|
|||||||
})**/
|
})**/
|
||||||
|
|
||||||
// 启动定时清理任务
|
// 启动定时清理任务
|
||||||
var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval
|
this.initPurgeTicker()
|
||||||
if autoPurgeInterval <= 0 {
|
|
||||||
autoPurgeInterval = 30
|
// 热点处理任务
|
||||||
if Tea.IsTesting() {
|
this.hotTicker = utils.NewTicker(1 * time.Minute)
|
||||||
autoPurgeInterval = 10
|
if Tea.IsTesting() {
|
||||||
}
|
this.hotTicker = utils.NewTicker(10 * time.Second)
|
||||||
}
|
}
|
||||||
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
|
goman.New(func() {
|
||||||
|
for this.hotTicker.Next() {
|
||||||
|
trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() {
|
||||||
|
this.hotLoop()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// 退出时停止
|
||||||
events.OnKey(events.EventQuit, this, func() {
|
events.OnKey(events.EventQuit, this, func() {
|
||||||
remotelogs.Println("CACHE", "quit clean timer")
|
remotelogs.Println("CACHE", "quit clean timer")
|
||||||
|
|
||||||
@@ -817,26 +881,6 @@ func (this *FileStorage) initList() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
goman.New(func() {
|
|
||||||
for this.purgeTicker.Next() {
|
|
||||||
trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() {
|
|
||||||
this.purgeLoop()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// 热点处理任务
|
|
||||||
this.hotTicker = utils.NewTicker(1 * time.Minute)
|
|
||||||
if Tea.IsTesting() {
|
|
||||||
this.hotTicker = utils.NewTicker(10 * time.Second)
|
|
||||||
}
|
|
||||||
goman.New(func() {
|
|
||||||
for this.hotTicker.Next() {
|
|
||||||
trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() {
|
|
||||||
this.hotLoop()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -994,7 +1038,7 @@ func (this *FileStorage) hotLoop() {
|
|||||||
expiresAt = bestExpiresAt
|
expiresAt = bestExpiresAt
|
||||||
}
|
}
|
||||||
|
|
||||||
writer, err := this.memoryStorage.openWriter(item.Key, expiresAt, reader.Status(), reader.BodySize(), -1, false)
|
writer, err := memoryStorage.openWriter(item.Key, expiresAt, reader.Status(), reader.BodySize(), -1, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !CanIgnoreErr(err) {
|
if !CanIgnoreErr(err) {
|
||||||
remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())
|
remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())
|
||||||
@@ -1030,7 +1074,7 @@ func (this *FileStorage) hotLoop() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
this.memoryStorage.AddToList(&Item{
|
memoryStorage.AddToList(&Item{
|
||||||
Type: writer.ItemType(),
|
Type: writer.ItemType(),
|
||||||
Key: item.Key,
|
Key: item.Key,
|
||||||
ExpiredAt: expiresAt,
|
ExpiredAt: expiresAt,
|
||||||
@@ -1045,9 +1089,9 @@ func (this *FileStorage) hotLoop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *FileStorage) diskCapacityBytes() int64 {
|
func (this *FileStorage) diskCapacityBytes() int64 {
|
||||||
c1 := this.policy.CapacityBytes()
|
var c1 = this.policy.CapacityBytes()
|
||||||
if SharedManager.MaxDiskCapacity != nil {
|
if SharedManager.MaxDiskCapacity != nil {
|
||||||
c2 := SharedManager.MaxDiskCapacity.Bytes()
|
var c2 = SharedManager.MaxDiskCapacity.Bytes()
|
||||||
if c2 > 0 {
|
if c2 > 0 {
|
||||||
return c2
|
return c2
|
||||||
}
|
}
|
||||||
@@ -1097,6 +1141,8 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
|
|||||||
rate = rate / 10
|
rate = rate / 10
|
||||||
}
|
}
|
||||||
if rands.Int(0, rate) == 0 {
|
if rands.Int(0, rate) == 0 {
|
||||||
|
var memoryStorage = this.memoryStorage
|
||||||
|
|
||||||
var hitErr = this.list.IncreaseHit(hash)
|
var hitErr = this.list.IncreaseHit(hash)
|
||||||
if hitErr != nil {
|
if hitErr != nil {
|
||||||
// 此错误可以忽略
|
// 此错误可以忽略
|
||||||
@@ -1105,7 +1151,7 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
|
|||||||
|
|
||||||
// 增加到热点
|
// 增加到热点
|
||||||
// 这里不收录缓存尺寸过大的文件
|
// 这里不收录缓存尺寸过大的文件
|
||||||
if this.memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 {
|
if memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 {
|
||||||
this.hotMapLocker.Lock()
|
this.hotMapLocker.Lock()
|
||||||
hotItem, ok := this.hotMap[key]
|
hotItem, ok := this.hotMap[key]
|
||||||
|
|
||||||
@@ -1140,3 +1186,84 @@ func (this *FileStorage) removeCacheFile(path string) error {
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 创建当前策略包含的内存缓存
|
||||||
|
func (this *FileStorage) createMemoryStorage() error {
|
||||||
|
var memoryPolicy = &serverconfigs.HTTPCachePolicy{
|
||||||
|
Id: this.policy.Id,
|
||||||
|
IsOn: this.policy.IsOn,
|
||||||
|
Name: this.policy.Name,
|
||||||
|
Description: this.policy.Description,
|
||||||
|
Capacity: this.options.MemoryPolicy.Capacity,
|
||||||
|
MaxKeys: this.policy.MaxKeys,
|
||||||
|
MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改
|
||||||
|
Type: serverconfigs.CachePolicyStorageMemory,
|
||||||
|
Options: this.policy.Options,
|
||||||
|
Life: this.policy.Life,
|
||||||
|
MinLife: this.policy.MinLife,
|
||||||
|
MaxLife: this.policy.MaxLife,
|
||||||
|
|
||||||
|
MemoryAutoPurgeCount: this.policy.MemoryAutoPurgeCount,
|
||||||
|
MemoryAutoPurgeInterval: this.policy.MemoryAutoPurgeInterval,
|
||||||
|
MemoryLFUFreePercent: this.policy.MemoryLFUFreePercent,
|
||||||
|
}
|
||||||
|
err := memoryPolicy.Init()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var memoryStorage = NewMemoryStorage(memoryPolicy, this)
|
||||||
|
err = memoryStorage.Init()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
this.memoryStorage = memoryStorage
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *FileStorage) initPurgeTicker() {
|
||||||
|
var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval
|
||||||
|
if autoPurgeInterval <= 0 {
|
||||||
|
autoPurgeInterval = 30
|
||||||
|
if Tea.IsTesting() {
|
||||||
|
autoPurgeInterval = 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if this.purgeTicker != nil {
|
||||||
|
this.purgeTicker.Stop()
|
||||||
|
}
|
||||||
|
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
|
||||||
|
goman.New(func() {
|
||||||
|
for this.purgeTicker.Next() {
|
||||||
|
trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() {
|
||||||
|
this.purgeLoop()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *FileStorage) initOpenFileCache() {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
var oldOpenFileCache = this.openFileCache
|
||||||
|
|
||||||
|
// 启用新的
|
||||||
|
if this.options.OpenFileCache != nil && this.options.OpenFileCache.IsOn && this.options.OpenFileCache.Max > 0 {
|
||||||
|
this.openFileCache, err = NewOpenFileCache(this.options.OpenFileCache.Max)
|
||||||
|
if err != nil {
|
||||||
|
remotelogs.Error("CACHE", "open file cache failed: "+err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 关闭老的
|
||||||
|
if oldOpenFileCache != nil {
|
||||||
|
oldOpenFileCache.CloseAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *FileStorage) runMemoryStorageSafety(f func(memoryStorage *MemoryStorage)) {
|
||||||
|
var memoryStorage = this.memoryStorage
|
||||||
|
if memoryStorage != nil {
|
||||||
|
f(memoryStorage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -564,8 +564,8 @@ func BenchmarkFileStorage_KeyPath(b *testing.B) {
|
|||||||
runtime.GOMAXPROCS(1)
|
runtime.GOMAXPROCS(1)
|
||||||
|
|
||||||
var storage = &FileStorage{
|
var storage = &FileStorage{
|
||||||
cacheConfig: &serverconfigs.HTTPFileCacheStorage{},
|
options: &serverconfigs.HTTPFileCacheStorage{},
|
||||||
policy: &serverconfigs.HTTPCachePolicy{Id: 1},
|
policy: &serverconfigs.HTTPCachePolicy{Id: 1},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
|||||||
@@ -40,6 +40,12 @@ type StorageInterface interface {
|
|||||||
// Policy 获取当前存储的Policy
|
// Policy 获取当前存储的Policy
|
||||||
Policy() *serverconfigs.HTTPCachePolicy
|
Policy() *serverconfigs.HTTPCachePolicy
|
||||||
|
|
||||||
|
// UpdatePolicy 修改策略
|
||||||
|
UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy)
|
||||||
|
|
||||||
|
// CanUpdatePolicy 检查策略是否可以更新
|
||||||
|
CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool
|
||||||
|
|
||||||
// AddToList 将缓存添加到列表
|
// AddToList 将缓存添加到列表
|
||||||
AddToList(item *Item)
|
AddToList(item *Item)
|
||||||
|
|
||||||
|
|||||||
@@ -89,20 +89,7 @@ func (this *MemoryStorage) Init() error {
|
|||||||
atomic.AddInt64(&this.totalSize, -item.TotalSize())
|
atomic.AddInt64(&this.totalSize, -item.TotalSize())
|
||||||
})
|
})
|
||||||
|
|
||||||
var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval
|
this.initPurgeTicker()
|
||||||
if autoPurgeInterval <= 0 {
|
|
||||||
autoPurgeInterval = 5
|
|
||||||
}
|
|
||||||
|
|
||||||
// 启动定时清理任务
|
|
||||||
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
|
|
||||||
goman.New(func() {
|
|
||||||
for this.purgeTicker.Next() {
|
|
||||||
var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP")
|
|
||||||
this.purgeLoop()
|
|
||||||
tr.End()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// 启动定时Flush memory to disk任务
|
// 启动定时Flush memory to disk任务
|
||||||
if this.parentStorage != nil {
|
if this.parentStorage != nil {
|
||||||
@@ -323,6 +310,26 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
|
|||||||
return this.policy
|
return this.policy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdatePolicy 修改策略
|
||||||
|
func (this *MemoryStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) {
|
||||||
|
var oldPolicy = this.policy
|
||||||
|
this.policy = newPolicy
|
||||||
|
|
||||||
|
if oldPolicy.MemoryAutoPurgeInterval != newPolicy.MemoryAutoPurgeInterval {
|
||||||
|
this.initPurgeTicker()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果是空的,则清空
|
||||||
|
if newPolicy.CapacityBytes() == 0 {
|
||||||
|
_ = this.CleanAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanUpdatePolicy 检查策略是否可以更新
|
||||||
|
func (this *MemoryStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// AddToList 将缓存添加到列表
|
// AddToList 将缓存添加到列表
|
||||||
func (this *MemoryStorage) AddToList(item *Item) {
|
func (this *MemoryStorage) AddToList(item *Item) {
|
||||||
item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/
|
item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/
|
||||||
@@ -497,3 +504,25 @@ func (this *MemoryStorage) deleteWithoutLocker(key string) error {
|
|||||||
_ = this.list.Remove(fmt.Sprintf("%d", hash))
|
_ = this.list.Remove(fmt.Sprintf("%d", hash))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *MemoryStorage) initPurgeTicker() {
|
||||||
|
var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval
|
||||||
|
if autoPurgeInterval <= 0 {
|
||||||
|
autoPurgeInterval = 5
|
||||||
|
}
|
||||||
|
|
||||||
|
// 启动定时清理任务
|
||||||
|
|
||||||
|
if this.purgeTicker != nil {
|
||||||
|
this.purgeTicker.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
|
||||||
|
goman.New(func() {
|
||||||
|
for this.purgeTicker.Next() {
|
||||||
|
var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP")
|
||||||
|
this.purgeLoop()
|
||||||
|
tr.End()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user