@@ -1,6 +1,7 @@
package caches
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
@@ -17,7 +18,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
stringutil "github.com/iwind/TeaGo/utils/string"
@@ -60,7 +60,7 @@ var sharedWritingFileKeyLocker = sync.Mutex{}
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
type FileStorage struct {
policy * serverconfigs . HTTPCachePolicy
cacheConfig * serverconfigs . HTTPFileCacheStorage // 二级缓存
options * serverconfigs . HTTPFileCacheStorage // 二级缓存
memoryStorage * MemoryStorage // 一级缓存
totalSize int64
@@ -92,31 +92,114 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this . policy
}
// CanUpdatePolicy 检查策略是否可以更新
func ( this * FileStorage ) CanUpdatePolicy ( newPolicy * serverconfigs . HTTPCachePolicy ) bool {
// 检查路径是否有变化
oldOptionsJSON , err := json . Marshal ( this . policy . Options )
if err != nil {
return false
}
var oldOptions = & serverconfigs . HTTPFileCacheStorage { }
err = json . Unmarshal ( oldOptionsJSON , oldOptions )
if err != nil {
return false
}
newOptionsJSON , err := json . Marshal ( newPolicy . Options )
if err != nil {
return false
}
var newOptions = & serverconfigs . HTTPFileCacheStorage { }
err = json . Unmarshal ( newOptionsJSON , newOptions )
if err != nil {
return false
}
if oldOptions . Dir == newOptions . Dir {
return true
}
return false
}
// UpdatePolicy 修改策略
func ( this * FileStorage ) UpdatePolicy ( newPolicy * serverconfigs . HTTPCachePolicy ) {
var oldOpenFileCache = this . options . OpenFileCache
this . policy = newPolicy
newOptionsJSON , err := json . Marshal ( newPolicy . Options )
if err != nil {
return
}
var newOptions = & serverconfigs . HTTPFileCacheStorage { }
err = json . Unmarshal ( newOptionsJSON , newOptions )
if err != nil {
remotelogs . Error ( "CACHE" , "update policy '" + types . String ( this . policy . Id ) + "' failed: decode options failed: " + err . Error ( ) )
return
}
err = newOptions . Init ( )
if err != nil {
remotelogs . Error ( "CACHE" , "update policy '" + types . String ( this . policy . Id ) + "' failed: init options failed: " + err . Error ( ) )
return
}
this . options = newOptions
var memoryStorage = this . memoryStorage
if memoryStorage != nil {
if newOptions . MemoryPolicy != nil && newOptions . MemoryPolicy . CapacityBytes ( ) > 0 {
memoryStorage . UpdatePolicy ( newOptions . MemoryPolicy )
} else {
memoryStorage . Stop ( )
this . memoryStorage = nil
}
} else if newOptions . MemoryPolicy != nil && this . options . MemoryPolicy . Capacity != nil && this . options . MemoryPolicy . Capacity . Count > 0 {
err = this . createMemoryStorage ( )
if err != nil {
remotelogs . Error ( "CACHE" , "update policy '" + types . String ( this . policy . Id ) + "' failed: create memory storage failed: " + err . Error ( ) )
}
}
// open cache
oldOpenFileCacheJSON , _ := json . Marshal ( oldOpenFileCache )
newOpenFileCacheJSON , _ := json . Marshal ( this . options . OpenFileCache )
if bytes . Compare ( oldOpenFileCacheJSON , newOpenFileCacheJSON ) != 0 {
this . initOpenFileCache ( )
}
// Purge Ticker
if newPolicy . PersistenceAutoPurgeInterval != this . policy . PersistenceAutoPurgeInterval {
this . initPurgeTicker ( )
}
}
// Init 初始化
func ( this * FileStorage ) Init ( ) error {
this . locker . Lock ( )
defer this . locker . Unlock ( )
before : = time . Now ( )
var before = time . Now ( )
// 配置
cacheConfig : = & serverconfigs . HTTPFileCacheStorage { }
var options = & serverconfigs . HTTPFileCacheStorage { }
optionsJSON , err := json . Marshal ( this . policy . Options )
if err != nil {
return err
}
err = json . Unmarshal ( optionsJSON , cacheConfig )
err = json . Unmarshal ( optionsJSON , options )
if err != nil {
return err
}
this . cacheConfig = cacheConfig
this . options = options
if ! filepath . IsAbs ( this . cacheConfig . Dir ) {
this . cacheConfig . Dir = Tea . Root + Tea . DS + this . cacheConfig . Dir
if ! filepath . IsAbs ( this . options . Dir ) {
this . options . Dir = Tea . Root + Tea . DS + this . options . Dir
}
this . cacheConfig . Dir = filepath . Clean ( this . cacheConfig . Dir )
var dir = this . cacheConfig . Dir
this . options . Dir = filepath . Clean ( this . options . Dir )
var dir = this . options . Dir
if len ( dir ) == 0 {
return errors . New ( "[CACHE]cache storage dir can not be empty" )
@@ -127,7 +210,7 @@ func (this *FileStorage) Init() error {
if err != nil {
return err
}
list . ( * FileList ) . SetOldDir ( this . cacheConfig . Dir + "/p" + types . String ( this . policy . Id ) )
list . ( * FileList ) . SetOldDir ( this . options . Dir + "/p" + types . String ( this . policy . Id ) )
this . list = list
stat , err := list . Stat ( func ( hash string ) bool {
return true
@@ -158,8 +241,8 @@ func (this *FileStorage) Init() error {
defer func ( ) {
// 统计
count : = stat . Count
size : = stat . Size
var count = stat . Count
var size = stat . Size
cost := time . Since ( before ) . Seconds ( ) * 1000
sizeMB := strconv . FormatInt ( size , 10 ) + " Bytes"
@@ -170,7 +253,7 @@ func (this *FileStorage) Init() error {
} else if size > 1024 {
sizeMB = fmt . Sprintf ( "%.3f K" , float64 ( size ) / 1024 )
}
remotelogs . Println ( "CACHE" , "init policy " + strconv . FormatInt ( this . policy . Id , 10 ) + " from '" + this . cacheConfig . Dir + "', cost: " + fmt . Sprintf ( "%.2f" , cost ) + " ms, count: " + message . NewPrinter ( language . English ) . Sprintf ( "%d" , count ) + ", size: " + sizeMB )
remotelogs . Println ( "CACHE" , "init policy " + strconv . FormatInt ( this . policy . Id , 10 ) + " from '" + this . options . Dir + "', cost: " + fmt . Sprintf ( "%.2f" , cost ) + " ms, count: " + message . NewPrinter ( language . English ) . Sprintf ( "%d" , count ) + ", size: " + sizeMB )
} ( )
// 初始化list
@@ -180,47 +263,15 @@ func (this *FileStorage) Init() error {
}
// 加载内存缓存
if this . cacheConfig . MemoryPolicy != nil {
if this . cacheConfig . MemoryPolicy . Capacity != nil && this . cacheConfig . MemoryPolicy . Capacity . Count > 0 {
memoryPolicy : = & serverconfigs . HTTPCachePolicy {
Id : this . policy . Id ,
IsOn : this . policy . IsOn ,
Name : this . policy . Name ,
Description : this . policy . Description ,
Capacity : this . cacheConfig . MemoryPolicy . Capacity ,
MaxKeys : this . policy . MaxKeys ,
MaxSize : & shared . SizeCapacity { Count : 128 , Unit : shared . SizeCapacityUnitMB } , // TODO 将来可以修改
Type : serverconfigs . CachePolicyStorageMemory ,
Options : this . policy . Options ,
Life : this . policy . Life ,
MinLife : this . policy . MinLife ,
MaxLife : this . policy . MaxLife ,
MemoryAutoPurgeCount : this . policy . MemoryAutoPurgeCount ,
MemoryAutoPurgeInterval : this . policy . MemoryAutoPurgeInterval ,
MemoryLFUFreePercent : this . policy . MemoryLFUFreePercent ,
}
err = memoryPolicy . Init ( )
if err != nil {
return err
}
memoryStorage := NewMemoryStorage ( memoryPolicy , this )
err = memoryStorage . Init ( )
if err != nil {
return err
}
this . memoryStorage = memoryStorage
if this . options . MemoryPolicy != nil && this . options . MemoryPolicy . Capacity != nil && this . options . MemoryPolicy . Capacity . Count > 0 {
err = this . createMemoryStorage ( )
if err ! = nil {
return err
}
}
// open file cache
if this . cacheConfig . OpenFileCache != nil && this . cacheConfig . OpenFileCache . IsOn && this . cacheConfig . OpenFileCache . Max > 0 {
this . openFileCache , err = NewOpenFileCache ( this . cacheConfig . OpenFileCache . Max )
logs . Println ( "start open file cache" )
if err != nil {
remotelogs . Error ( "CACHE" , "open file cache failed: " + err . Error ( ) )
}
}
this . initOpenFileCache ( )
return nil
}
@@ -241,8 +292,9 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
}
// 先尝试内存缓存
if allowMemory && this . memoryStorage != nil {
reader , err := this . memoryStorage. OpenReader ( key , useStale , isPartial )
var memoryStorage = this . memoryStorage
if allowMemory && memoryStorage != nil {
reader , err := memoryStorage . OpenReader ( key , useStale , isPartial )
if err == nil {
return reader , err
}
@@ -264,8 +316,9 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
// TODO 尝试使用mmap加快读取速度
var isOk = false
var openFile * OpenFile
if this . openFileCache != nil {
openFile = this . openFileCache . Get ( path )
var openFileCache = this . openFileCache // 因为中间可能有修改,所以先赋值再获取
if openFileCache != nil {
openFile = openFileCache . Get ( path )
}
var fp * os . File
var err error
@@ -291,12 +344,12 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool,
if isPartial {
var partialFileReader = NewPartialFileReader ( fp )
partialFileReader . openFile = openFile
partialFileReader . openFileCache = this . openFileCache
partialFileReader . openFileCache = openFileCache
reader = partialFileReader
} else {
var fileReader = NewFileReader ( fp )
fileReader . openFile = openFile
fileReader . openFileCache = this . openFileCache
fileReader . openFileCache = openFileCache
reader = fileReader
}
err = reader . Init ( )
@@ -332,8 +385,9 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
if maxSize > maxMemorySize {
maxMemorySize = maxSize
}
if ! isPartial && this . memoryStorage ! = nil && ( ( size > 0 && size < maxMemorySize ) || size < 0 ) {
writer , err := this . memoryStorage. OpenWriter ( key , expiredAt , status , size , maxMemorySize , false )
var memoryStorage = this . memoryStorage
if ! isPartial && memoryStorage != nil && ( ( size > 0 && size < maxMemorySize ) || size < 0 ) {
writer , err := memoryStorage . OpenWriter ( key , expiredAt , status , size , maxMemorySize , false )
if err == nil {
return writer , nil
}
@@ -376,7 +430,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, siz
}
var hash = stringutil . Md5 ( key )
var dir = this . cacheConfig . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/" + hash [ : 2 ] + "/" + hash [ 2 : 4 ]
var dir = this . options . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/" + hash [ : 2 ] + "/" + hash [ 2 : 4 ]
_ , err = os . Stat ( dir )
if err != nil {
if ! os . IsNotExist ( err ) {
@@ -542,9 +596,10 @@ func (this *FileStorage) AddToList(item *Item) {
return
}
if this . memoryStorage != nil {
var memoryStorage = this . memoryStorage
if memoryStorage != nil {
if item . Type == ItemTypeMemory {
this . memoryStorage. AddToList ( item )
memoryStorage . AddToList ( item )
return
}
}
@@ -568,9 +623,9 @@ func (this *FileStorage) Delete(key string) error {
defer this . locker . Unlock ( )
// 先尝试内存缓存
if this . memoryStorage != nil {
_ = this . memoryStorage. Delete ( key )
}
this . runMemoryStorageSafety ( func ( memoryStorage * MemoryStorage ) {
_ = memoryStorage . Delete ( key )
} )
hash , path := this . keyPath ( key )
err := this . list . Remove ( hash )
@@ -601,9 +656,9 @@ func (this *FileStorage) CleanAll() error {
defer this . locker . Unlock ( )
// 先尝试内存缓存
if this . memoryStorage != nil {
_ = this . memoryStorage. CleanAll ( )
}
this . runMemoryStorageSafety ( func ( memoryStorage * MemoryStorage ) {
_ = memoryStorage . CleanAll ( )
} )
err := this . list . CleanAll ( )
if err != nil {
@@ -677,9 +732,9 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
defer this . locker . Unlock ( )
// 先尝试内存缓存
if this . memoryStorage != nil {
_ = this . memoryStorage. Purge ( keys , urlType )
}
this . runMemoryStorageSafety ( func ( memoryStorage * MemoryStorage ) {
_ = memoryStorage . Purge ( keys , urlType )
} )
// 目录
if urlType == "dir" {
@@ -715,9 +770,9 @@ func (this *FileStorage) Stop() {
defer this . locker . Unlock ( )
// 先尝试内存缓存
if this . memoryStorage != nil {
this . memoryStorage. Stop ( )
}
this . runMemoryStorageSafety ( func ( memoryStorage * MemoryStorage ) {
memoryStorage . Stop ( )
} )
_ = this . list . Reset ( )
if this . purgeTicker != nil {
@@ -743,10 +798,11 @@ func (this *FileStorage) TotalDiskSize() int64 {
// TotalMemorySize 内存尺寸
func ( this * FileStorage ) TotalMemorySize ( ) int64 {
if this . memoryStorage == nil {
var memoryStorage = this . memoryStorage
if memoryStorage == nil {
return 0
}
return this . memoryStorage. TotalMemorySize ( )
return memoryStorage . TotalMemorySize ( )
}
// IgnoreKey 忽略某个Key, 即不缓存某个Key
@@ -756,13 +812,13 @@ func (this *FileStorage) IgnoreKey(key string) {
// 绝对路径
func ( this * FileStorage ) dir ( ) string {
return this . cacheConfig . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/"
return this . options . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/"
}
// 获取Key对应的文件路径
func ( this * FileStorage ) keyPath ( key string ) ( hash string , path string ) {
hash = stringutil . Md5 ( key )
dir := this . cacheConfig . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/" + hash [ : 2 ] + "/" + hash [ 2 : 4 ]
dir := this . options . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/" + hash [ : 2 ] + "/" + hash [ 2 : 4 ]
path = dir + "/" + hash + ".cache"
return
}
@@ -772,7 +828,7 @@ func (this *FileStorage) hashPath(hash string) (path string) {
if len ( hash ) != 32 {
return ""
}
dir := this . cacheConfig . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/" + hash [ : 2 ] + "/" + hash [ 2 : 4 ]
dir := this . options . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/" + hash [ : 2 ] + "/" + hash [ 2 : 4 ]
path = dir + "/" + hash + ".cache"
return
}
@@ -793,14 +849,22 @@ func (this *FileStorage) initList() error {
})**/
// 启动定时清理任务
var autoPurgeInterval = this . policy . PersistenceAutoPurgeInterval
if autoPurgeInterval <= 0 {
autoPurgeInterval = 30
if Tea . IsTesting ( ) {
autoPurgeInterval = 10
}
this . initPurgeTicker ( )
// 热点处理任务
this . hotTicker = utils . NewTicker ( 1 * time . Minute )
if Tea . IsTesting ( ) {
this . hotTicker = utils . NewTicker ( 10 * time . Second )
}
this . purgeTicker = utils . NewTicker ( time . Duration ( autoPurgeInterval ) * time . Second )
goman . New ( func ( ) {
for this . hotTicker . Next ( ) {
trackers . Run ( "FILE_CACHE_STORAGE_HOT_LOOP" , func ( ) {
this . hotLoop ( )
} )
}
} )
// 退出时停止
events . OnKey ( events . EventQuit , this , func ( ) {
remotelogs . Println ( "CACHE" , "quit clean timer" )
@@ -817,26 +881,6 @@ func (this *FileStorage) initList() error {
}
}
} )
goman . New ( func ( ) {
for this . purgeTicker . Next ( ) {
trackers . Run ( "FILE_CACHE_STORAGE_PURGE_LOOP" , func ( ) {
this . purgeLoop ( )
} )
}
} )
// 热点处理任务
this . hotTicker = utils . NewTicker ( 1 * time . Minute )
if Tea . IsTesting ( ) {
this . hotTicker = utils . NewTicker ( 10 * time . Second )
}
goman . New ( func ( ) {
for this . hotTicker . Next ( ) {
trackers . Run ( "FILE_CACHE_STORAGE_HOT_LOOP" , func ( ) {
this . hotLoop ( )
} )
}
} )
return nil
}
@@ -994,7 +1038,7 @@ func (this *FileStorage) hotLoop() {
expiresAt = bestExpiresAt
}
writer , err := this . memoryStorage. openWriter ( item . Key , expiresAt , reader . Status ( ) , reader . BodySize ( ) , - 1 , false )
writer , err := memoryStorage . openWriter ( item . Key , expiresAt , reader . Status ( ) , reader . BodySize ( ) , - 1 , false )
if err != nil {
if ! CanIgnoreErr ( err ) {
remotelogs . Error ( "CACHE" , "transfer hot item failed: " + err . Error ( ) )
@@ -1030,7 +1074,7 @@ func (this *FileStorage) hotLoop() {
continue
}
this . memoryStorage. AddToList ( & Item {
memoryStorage . AddToList ( & Item {
Type : writer . ItemType ( ) ,
Key : item . Key ,
ExpiredAt : expiresAt ,
@@ -1045,9 +1089,9 @@ func (this *FileStorage) hotLoop() {
}
func ( this * FileStorage ) diskCapacityBytes ( ) int64 {
c1 : = this . policy . CapacityBytes ( )
var c1 = this . policy . CapacityBytes ( )
if SharedManager . MaxDiskCapacity != nil {
c2 : = SharedManager . MaxDiskCapacity . Bytes ( )
var c2 = SharedManager . MaxDiskCapacity . Bytes ( )
if c2 > 0 {
return c2
}
@@ -1097,6 +1141,8 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
rate = rate / 10
}
if rands . Int ( 0 , rate ) == 0 {
var memoryStorage = this . memoryStorage
var hitErr = this . list . IncreaseHit ( hash )
if hitErr != nil {
// 此错误可以忽略
@@ -1105,7 +1151,7 @@ func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
// 增加到热点
// 这里不收录缓存尺寸过大的文件
if this . memoryStorage != nil && reader . BodySize ( ) > 0 && reader . BodySize ( ) < 128 * 1024 * 1024 {
if memoryStorage != nil && reader . BodySize ( ) > 0 && reader . BodySize ( ) < 128 * 1024 * 1024 {
this . hotMapLocker . Lock ( )
hotItem , ok := this . hotMap [ key ]
@@ -1140,3 +1186,84 @@ func (this *FileStorage) removeCacheFile(path string) error {
}
return err
}
// 创建当前策略包含的内存缓存
func ( this * FileStorage ) createMemoryStorage ( ) error {
var memoryPolicy = & serverconfigs . HTTPCachePolicy {
Id : this . policy . Id ,
IsOn : this . policy . IsOn ,
Name : this . policy . Name ,
Description : this . policy . Description ,
Capacity : this . options . MemoryPolicy . Capacity ,
MaxKeys : this . policy . MaxKeys ,
MaxSize : & shared . SizeCapacity { Count : 128 , Unit : shared . SizeCapacityUnitMB } , // TODO 将来可以修改
Type : serverconfigs . CachePolicyStorageMemory ,
Options : this . policy . Options ,
Life : this . policy . Life ,
MinLife : this . policy . MinLife ,
MaxLife : this . policy . MaxLife ,
MemoryAutoPurgeCount : this . policy . MemoryAutoPurgeCount ,
MemoryAutoPurgeInterval : this . policy . MemoryAutoPurgeInterval ,
MemoryLFUFreePercent : this . policy . MemoryLFUFreePercent ,
}
err := memoryPolicy . Init ( )
if err != nil {
return err
}
var memoryStorage = NewMemoryStorage ( memoryPolicy , this )
err = memoryStorage . Init ( )
if err != nil {
return err
}
this . memoryStorage = memoryStorage
return nil
}
func ( this * FileStorage ) initPurgeTicker ( ) {
var autoPurgeInterval = this . policy . PersistenceAutoPurgeInterval
if autoPurgeInterval <= 0 {
autoPurgeInterval = 30
if Tea . IsTesting ( ) {
autoPurgeInterval = 10
}
}
if this . purgeTicker != nil {
this . purgeTicker . Stop ( )
}
this . purgeTicker = utils . NewTicker ( time . Duration ( autoPurgeInterval ) * time . Second )
goman . New ( func ( ) {
for this . purgeTicker . Next ( ) {
trackers . Run ( "FILE_CACHE_STORAGE_PURGE_LOOP" , func ( ) {
this . purgeLoop ( )
} )
}
} )
}
func ( this * FileStorage ) initOpenFileCache ( ) {
var err error
var oldOpenFileCache = this . openFileCache
// 启用新的
if this . options . OpenFileCache != nil && this . options . OpenFileCache . IsOn && this . options . OpenFileCache . Max > 0 {
this . openFileCache , err = NewOpenFileCache ( this . options . OpenFileCache . Max )
if err != nil {
remotelogs . Error ( "CACHE" , "open file cache failed: " + err . Error ( ) )
}
}
// 关闭老的
if oldOpenFileCache != nil {
oldOpenFileCache . CloseAll ( )
}
}
func ( this * FileStorage ) runMemoryStorageSafety ( f func ( memoryStorage * MemoryStorage ) ) {
var memoryStorage = this . memoryStorage
if memoryStorage != nil {
f ( memoryStorage )
}
}