2020-10-04 14:30:42 +08:00
package caches
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2021-03-02 19:43:05 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
2020-12-17 17:36:10 +08:00
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2021-11-14 10:55:09 +08:00
"github.com/TeaOSLab/EdgeNode/internal/trackers"
2020-10-04 14:30:42 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
2021-11-13 21:30:24 +08:00
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
2020-10-04 14:30:42 +08:00
stringutil "github.com/iwind/TeaGo/utils/string"
2021-05-24 09:23:51 +08:00
"golang.org/x/text/language"
"golang.org/x/text/message"
2020-10-04 14:30:42 +08:00
"io"
2021-11-13 21:30:24 +08:00
"math"
2020-10-04 14:30:42 +08:00
"os"
"path/filepath"
"regexp"
2021-11-14 16:15:07 +08:00
"sort"
2020-10-04 14:30:42 +08:00
"strconv"
"strings"
"sync"
2020-10-05 20:23:18 +08:00
"sync/atomic"
2021-01-11 23:06:50 +08:00
"syscall"
2020-10-04 14:30:42 +08:00
"time"
)
// Byte widths of the fixed-size fields at the beginning of every cache file,
// listed in the order OpenWriter writes them.
const (
	SizeExpiresAt    = 4 // expiry time, uint32 big endian
	SizeStatus       = 3 // HTTP status code written as three ASCII digits
	SizeURLLength    = 4 // key/URL length, uint32 big endian
	SizeHeaderLength = 4 // header data length, uint32 big endian
	SizeBodyLength   = 8 // body data length, uint64 big endian

	// SizeMeta is the length of the whole fixed-size prefix.
	SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
)
// HotItemSize caps how many distinct keys may be collected in the hot-item map
// between two rounds of the hot-data task.
const HotItemSize = 1024
2021-05-12 21:38:44 +08:00
// FileStorage 文件缓存
2021-01-13 12:02:50 +08:00
// 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
2020-10-04 14:30:42 +08:00
type FileStorage struct {
2021-03-02 19:43:05 +08:00
policy * serverconfigs . HTTPCachePolicy
cacheConfig * serverconfigs . HTTPFileCacheStorage // 二级缓存
memoryStorage * MemoryStorage // 一级缓存
totalSize int64
2020-10-04 14:30:42 +08:00
2021-06-06 23:42:11 +08:00
list ListInterface
writingKeyMap map [ string ] bool // key => bool
locker sync . RWMutex
2021-11-14 16:15:07 +08:00
purgeTicker * utils . Ticker
hotMap map [ string ] * HotItem // key => count
hotMapLocker sync . Mutex
lastHotSize int
hotTicker * utils . Ticker
2020-10-04 14:30:42 +08:00
}
func NewFileStorage ( policy * serverconfigs . HTTPCachePolicy ) * FileStorage {
return & FileStorage {
2021-06-06 23:42:11 +08:00
policy : policy ,
writingKeyMap : map [ string ] bool { } ,
2021-11-14 16:15:07 +08:00
hotMap : map [ string ] * HotItem { } ,
lastHotSize : - 1 ,
2020-10-04 14:30:42 +08:00
}
}
2021-05-12 21:38:44 +08:00
// Policy 获取当前的Policy
2020-10-04 14:30:42 +08:00
func ( this * FileStorage ) Policy ( ) * serverconfigs . HTTPCachePolicy {
return this . policy
}
2021-05-12 21:38:44 +08:00
// Init 初始化
2020-10-04 14:30:42 +08:00
func ( this * FileStorage ) Init ( ) error {
this . locker . Lock ( )
defer this . locker . Unlock ( )
before := time . Now ( )
// 配置
2020-10-05 16:55:14 +08:00
cacheConfig := & serverconfigs . HTTPFileCacheStorage { }
2020-10-04 14:30:42 +08:00
optionsJSON , err := json . Marshal ( this . policy . Options )
if err != nil {
return err
}
err = json . Unmarshal ( optionsJSON , cacheConfig )
if err != nil {
return err
}
this . cacheConfig = cacheConfig
2021-05-19 12:07:35 +08:00
cacheDir := cacheConfig . Dir
2020-10-04 14:30:42 +08:00
if ! filepath . IsAbs ( this . cacheConfig . Dir ) {
this . cacheConfig . Dir = Tea . Root + Tea . DS + this . cacheConfig . Dir
}
dir := this . cacheConfig . Dir
if len ( dir ) == 0 {
return errors . New ( "[CACHE]cache storage dir can not be empty" )
}
2021-05-19 12:07:35 +08:00
list := NewFileList ( dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) )
err = list . Init ( )
if err != nil {
return err
}
this . list = list
stat , err := list . Stat ( func ( hash string ) bool {
return true
} )
if err != nil {
return err
}
this . totalSize = stat . Size
this . list . OnAdd ( func ( item * Item ) {
atomic . AddInt64 ( & this . totalSize , item . TotalSize ( ) )
} )
this . list . OnRemove ( func ( item * Item ) {
atomic . AddInt64 ( & this . totalSize , - item . TotalSize ( ) )
} )
2020-10-04 14:30:42 +08:00
// 检查目录是否存在
_ , err = os . Stat ( dir )
if err != nil {
if ! os . IsNotExist ( err ) {
return err
} else {
err = os . MkdirAll ( dir , 0777 )
if err != nil {
return errors . New ( "[CACHE]can not create dir:" + err . Error ( ) )
}
}
}
2021-05-19 12:07:35 +08:00
defer func ( ) {
// 统计
2021-05-24 09:23:51 +08:00
count := stat . Count
size := stat . Size
2021-05-19 12:07:35 +08:00
cost := time . Since ( before ) . Seconds ( ) * 1000
sizeMB := strconv . FormatInt ( size , 10 ) + " Bytes"
if size > 1024 * 1024 * 1024 {
sizeMB = fmt . Sprintf ( "%.3f G" , float64 ( size ) / 1024 / 1024 / 1024 )
} else if size > 1024 * 1024 {
sizeMB = fmt . Sprintf ( "%.3f M" , float64 ( size ) / 1024 / 1024 )
} else if size > 1024 {
sizeMB = fmt . Sprintf ( "%.3f K" , float64 ( size ) / 1024 )
}
2021-05-24 09:23:51 +08:00
remotelogs . Println ( "CACHE" , "init policy " + strconv . FormatInt ( this . policy . Id , 10 ) + " from '" + cacheDir + "', cost: " + fmt . Sprintf ( "%.2f" , cost ) + " ms, count: " + message . NewPrinter ( language . English ) . Sprintf ( "%d" , count ) + ", size: " + sizeMB )
2021-05-19 12:07:35 +08:00
} ( )
2020-10-04 14:30:42 +08:00
// 初始化list
err = this . initList ( )
if err != nil {
return err
}
2021-03-02 19:43:05 +08:00
// 加载内存缓存
if this . cacheConfig . MemoryPolicy != nil {
2021-05-12 21:38:44 +08:00
if this . cacheConfig . MemoryPolicy . Capacity != nil && this . cacheConfig . MemoryPolicy . Capacity . Count > 0 {
2021-03-02 19:43:05 +08:00
memoryPolicy := & serverconfigs . HTTPCachePolicy {
Id : this . policy . Id ,
IsOn : this . policy . IsOn ,
Name : this . policy . Name ,
Description : this . policy . Description ,
2021-05-12 21:38:44 +08:00
Capacity : this . cacheConfig . MemoryPolicy . Capacity ,
2021-03-02 19:43:05 +08:00
MaxKeys : this . policy . MaxKeys ,
MaxSize : & shared . SizeCapacity { Count : 128 , Unit : shared . SizeCapacityUnitMB } , // TODO 将来可以修改
Type : serverconfigs . CachePolicyStorageMemory ,
Options : this . policy . Options ,
Life : this . policy . Life ,
MinLife : this . policy . MinLife ,
MaxLife : this . policy . MaxLife ,
2021-11-13 21:30:24 +08:00
MemoryAutoPurgeCount : this . policy . MemoryAutoPurgeCount ,
MemoryAutoPurgeInterval : this . policy . MemoryAutoPurgeInterval ,
MemoryLFUFreePercent : this . policy . MemoryLFUFreePercent ,
2021-03-02 19:43:05 +08:00
}
err = memoryPolicy . Init ( )
if err != nil {
return err
}
2021-11-13 21:30:24 +08:00
memoryStorage := NewMemoryStorage ( memoryPolicy , this )
2021-03-02 19:43:05 +08:00
err = memoryStorage . Init ( )
if err != nil {
return err
}
this . memoryStorage = memoryStorage
}
}
2020-10-04 14:30:42 +08:00
return nil
}
2021-01-13 12:02:50 +08:00
func ( this * FileStorage ) OpenReader ( key string ) ( Reader , error ) {
2021-11-14 16:15:07 +08:00
return this . openReader ( key , true )
}
func ( this * FileStorage ) openReader ( key string , allowMemory bool ) ( Reader , error ) {
2021-03-02 19:43:05 +08:00
// 先尝试内存缓存
2021-11-14 16:15:07 +08:00
if allowMemory && this . memoryStorage != nil {
2021-03-02 19:43:05 +08:00
reader , err := this . memoryStorage . OpenReader ( key )
if err == nil {
return reader , err
}
}
2021-06-13 17:37:57 +08:00
hash , path := this . keyPath ( key )
2020-10-04 14:30:42 +08:00
// TODO 尝试使用mmap加快读取速度
2021-06-13 17:37:57 +08:00
var isOk = false
2020-10-04 14:30:42 +08:00
fp , err := os . OpenFile ( path , os . O_RDONLY , 0444 )
if err != nil {
if ! os . IsNotExist ( err ) {
2021-01-13 12:02:50 +08:00
return nil , err
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
return nil , ErrNotFound
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
defer func ( ) {
if ! isOk {
_ = fp . Close ( )
_ = os . Remove ( path )
}
} ( )
// 检查文件记录是否已过期
exists , err := this . list . Exist ( hash )
if err != nil {
return nil , err
}
if ! exists {
return nil , ErrNotFound
}
2020-10-04 14:30:42 +08:00
2021-01-13 12:02:50 +08:00
reader := NewFileReader ( fp )
2020-10-04 14:30:42 +08:00
if err != nil {
2021-01-13 12:02:50 +08:00
return nil , err
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
err = reader . Init ( )
2020-10-04 14:30:42 +08:00
if err != nil {
2021-01-13 12:02:50 +08:00
return nil , err
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
2021-11-13 21:30:24 +08:00
// 增加点击量
// 1/1000采样
2021-11-14 16:15:07 +08:00
if allowMemory {
var rate = this . policy . PersistenceHitSampleRate
if rate <= 0 {
rate = 1000
}
if this . lastHotSize == 0 {
// 自动降低采样率来增加热点数据的缓存几率
rate = rate / 10
}
if rands . Int ( 0 , rate ) == 0 {
var hitErr = this . list . IncreaseHit ( hash )
if hitErr != nil {
// 此错误可以忽略
remotelogs . Error ( "CACHE" , "increase hit failed: " + hitErr . Error ( ) )
}
// 增加到热点
// 这里不收录缓存尺寸过大的文件
if this . memoryStorage != nil && reader . BodySize ( ) > 0 && reader . BodySize ( ) < 128 * 1024 * 1024 {
this . hotMapLocker . Lock ( )
hotItem , ok := this . hotMap [ key ]
if ok {
hotItem . Hits ++
hotItem . ExpiresAt = reader . expiresAt
} else if len ( this . hotMap ) < HotItemSize { // 控制数量
this . hotMap [ key ] = & HotItem {
Key : key ,
ExpiresAt : reader . ExpiresAt ( ) ,
Status : reader . Status ( ) ,
Hits : 1 ,
}
}
this . hotMapLocker . Unlock ( )
}
2021-11-13 21:30:24 +08:00
}
}
2021-06-13 17:37:57 +08:00
isOk = true
2021-01-13 12:02:50 +08:00
return reader , nil
2020-10-04 14:30:42 +08:00
}
2021-05-12 21:38:44 +08:00
// OpenWriter 打开缓存文件等待写入
2021-01-13 12:02:50 +08:00
func ( this * FileStorage ) OpenWriter ( key string , expiredAt int64 , status int ) ( Writer , error ) {
2021-03-02 19:43:05 +08:00
// 先尝试内存缓存
if this . memoryStorage != nil {
writer , err := this . memoryStorage . OpenWriter ( key , expiredAt , status )
if err == nil {
return writer , nil
}
}
2021-06-06 23:42:11 +08:00
// 是否正在写入
var isWriting = false
this . locker . Lock ( )
_ , ok := this . writingKeyMap [ key ]
this . locker . Unlock ( )
if ok {
return nil , ErrFileIsWriting
}
this . locker . Lock ( )
this . writingKeyMap [ key ] = true
this . locker . Unlock ( )
defer func ( ) {
if ! isWriting {
this . locker . Lock ( )
delete ( this . writingKeyMap , key )
this . locker . Unlock ( )
}
} ( )
2020-10-05 19:15:35 +08:00
// 检查是否超出最大值
2021-05-19 12:07:35 +08:00
count , err := this . list . Count ( )
if err != nil {
return nil , err
}
if this . policy . MaxKeys > 0 && count > this . policy . MaxKeys {
2021-06-08 11:24:41 +08:00
return nil , NewCapacityError ( "write file cache failed: too many keys in cache storage" )
2020-10-05 20:23:18 +08:00
}
2021-05-12 21:38:44 +08:00
capacityBytes := this . diskCapacityBytes ( )
if capacityBytes > 0 && capacityBytes <= this . totalSize {
2021-06-08 11:24:41 +08:00
return nil , NewCapacityError ( "write file cache failed: over disk size, current total size: " + strconv . FormatInt ( this . totalSize , 10 ) + " bytes, capacity: " + strconv . FormatInt ( capacityBytes , 10 ) )
2020-10-05 19:15:35 +08:00
}
2020-10-04 14:30:42 +08:00
hash := stringutil . Md5 ( key )
dir := this . cacheConfig . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/" + hash [ : 2 ] + "/" + hash [ 2 : 4 ]
2021-05-19 12:07:35 +08:00
_ , err = os . Stat ( dir )
2020-10-04 14:30:42 +08:00
if err != nil {
if ! os . IsNotExist ( err ) {
return nil , err
}
err = os . MkdirAll ( dir , 0777 )
if err != nil {
return nil , err
}
}
2020-10-05 20:23:18 +08:00
// 先删除
2021-05-19 12:07:35 +08:00
err = this . list . Remove ( hash )
if err != nil {
return nil , err
}
2020-10-05 20:23:18 +08:00
2021-01-13 12:02:50 +08:00
path := dir + "/" + hash + ".cache.tmp"
2021-01-11 23:06:50 +08:00
writer , err := os . OpenFile ( path , os . O_CREATE | os . O_SYNC | os . O_WRONLY , 0666 )
2020-10-04 14:30:42 +08:00
if err != nil {
return nil , err
}
2021-06-06 23:42:11 +08:00
isWriting = true
2020-10-04 14:30:42 +08:00
isOk := false
2021-01-11 23:06:50 +08:00
removeOnFailure := true
2020-10-04 14:30:42 +08:00
defer func ( ) {
if err != nil {
isOk = false
}
// 如果出错了,就删除文件,避免写一半
if ! isOk {
_ = writer . Close ( )
2021-01-11 23:06:50 +08:00
if removeOnFailure {
_ = os . Remove ( path )
}
2020-10-04 14:30:42 +08:00
}
} ( )
2021-01-11 23:06:50 +08:00
// 尝试锁定,如果锁定失败,则直接返回
err = syscall . Flock ( int ( writer . Fd ( ) ) , syscall . LOCK_EX | syscall . LOCK_NB )
if err != nil {
removeOnFailure = false
return nil , ErrFileIsWriting
}
err = writer . Truncate ( 0 )
if err != nil {
return nil , err
}
2020-10-04 14:30:42 +08:00
// 写入过期时间
2021-01-13 12:02:50 +08:00
bytes4 := make ( [ ] byte , 4 )
{
binary . BigEndian . PutUint32 ( bytes4 , uint32 ( expiredAt ) )
_ , err = writer . Write ( bytes4 )
if err != nil {
return nil , err
}
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
// 写入状态码
if status > 999 || status < 100 {
status = 200
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
_ , err = writer . WriteString ( strconv . Itoa ( status ) )
2020-10-04 14:30:42 +08:00
if err != nil {
return nil , err
}
2021-01-13 12:02:50 +08:00
// 写入URL长度
{
binary . BigEndian . PutUint32 ( bytes4 , uint32 ( len ( key ) ) )
_ , err = writer . Write ( bytes4 )
2020-10-04 14:30:42 +08:00
if err != nil {
2021-01-13 12:02:50 +08:00
return nil , err
2020-10-04 14:30:42 +08:00
}
}
2021-01-13 12:02:50 +08:00
// 写入Header Length
{
binary . BigEndian . PutUint32 ( bytes4 , uint32 ( 0 ) )
_ , err = writer . Write ( bytes4 )
2020-10-04 14:30:42 +08:00
if err != nil {
2021-01-13 12:02:50 +08:00
return nil , err
2020-10-04 14:30:42 +08:00
}
}
2021-01-13 12:02:50 +08:00
// 写入Body Length
{
b := make ( [ ] byte , SizeBodyLength )
binary . BigEndian . PutUint64 ( b , uint64 ( 0 ) )
_ , err = writer . Write ( b )
if err != nil {
return nil , err
}
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
// 写入URL
_ , err = writer . WriteString ( key )
2020-10-04 14:30:42 +08:00
if err != nil {
2021-01-13 12:02:50 +08:00
return nil , err
2020-10-04 14:30:42 +08:00
}
isOk = true
2021-06-06 23:42:11 +08:00
return NewFileWriter ( writer , key , expiredAt , func ( ) {
this . locker . Lock ( )
delete ( this . writingKeyMap , key )
this . locker . Unlock ( )
} ) , nil
2020-10-04 14:30:42 +08:00
}
2021-05-12 21:38:44 +08:00
// AddToList 添加到List
2020-10-04 14:30:42 +08:00
func ( this * FileStorage ) AddToList ( item * Item ) {
2021-03-02 19:43:05 +08:00
if this . memoryStorage != nil {
if item . Type == ItemTypeMemory {
this . memoryStorage . AddToList ( item )
return
}
}
2021-11-13 21:30:24 +08:00
item . MetaSize = SizeMeta + 128
2020-10-04 14:30:42 +08:00
hash := stringutil . Md5 ( item . Key )
2021-05-19 12:07:35 +08:00
err := this . list . Add ( hash , item )
if err != nil && ! strings . Contains ( err . Error ( ) , "UNIQUE constraint failed" ) {
remotelogs . Error ( "CACHE" , "add to list failed: " + err . Error ( ) )
}
2020-10-04 14:30:42 +08:00
}
2021-05-12 21:38:44 +08:00
// Delete 删除某个键值对应的缓存
2020-10-04 14:30:42 +08:00
func ( this * FileStorage ) Delete ( key string ) error {
this . locker . Lock ( )
defer this . locker . Unlock ( )
2021-03-02 19:43:05 +08:00
// 先尝试内存缓存
if this . memoryStorage != nil {
_ = this . memoryStorage . Delete ( key )
}
2020-10-04 14:30:42 +08:00
hash , path := this . keyPath ( key )
2021-05-19 12:07:35 +08:00
err := this . list . Remove ( hash )
if err != nil {
return err
}
err = os . Remove ( path )
2020-10-04 14:30:42 +08:00
if err == nil || os . IsNotExist ( err ) {
return nil
}
return err
}
2021-05-12 21:38:44 +08:00
// Stat 统计
2020-10-04 14:30:42 +08:00
func ( this * FileStorage ) Stat ( ) ( * Stat , error ) {
this . locker . RLock ( )
defer this . locker . RUnlock ( )
return this . list . Stat ( func ( hash string ) bool {
return true
2021-05-19 12:07:35 +08:00
} )
2020-10-04 14:30:42 +08:00
}
2021-05-12 21:38:44 +08:00
// CleanAll 清除所有的缓存
2020-10-04 14:30:42 +08:00
func ( this * FileStorage ) CleanAll ( ) error {
this . locker . Lock ( )
defer this . locker . Unlock ( )
2021-03-02 19:43:05 +08:00
// 先尝试内存缓存
if this . memoryStorage != nil {
_ = this . memoryStorage . CleanAll ( )
}
2021-05-19 12:07:35 +08:00
err := this . list . CleanAll ( )
if err != nil {
return err
}
2020-10-04 14:30:42 +08:00
// 删除缓存和目录
// 不能直接删除子目录,比较危险
dir := this . dir ( )
fp , err := os . Open ( dir )
if err != nil {
return err
}
defer func ( ) {
_ = fp . Close ( )
} ( )
stat , err := fp . Stat ( )
if err != nil {
return err
}
if ! stat . IsDir ( ) {
return nil
}
2021-05-25 18:28:24 +08:00
// 改成待删除
2020-10-04 14:30:42 +08:00
subDirs , err := fp . Readdir ( - 1 )
if err != nil {
return err
}
for _ , info := range subDirs {
subDir := info . Name ( )
// 检查目录名
ok , err := regexp . MatchString ( ` ^[0-9a-f] { 2}$ ` , subDir )
if err != nil {
return err
}
if ! ok {
continue
}
2021-05-25 18:28:24 +08:00
// 修改目录名
tmpDir := dir + "/" + subDir + "-deleted"
err = os . Rename ( dir + "/" + subDir , tmpDir )
if err != nil {
return err
}
}
// 重新遍历待删除
2021-06-17 21:13:21 +08:00
go func ( ) {
err = this . cleanDeletedDirs ( dir )
2020-10-04 14:30:42 +08:00
if err != nil {
2021-06-17 21:13:21 +08:00
remotelogs . Warn ( "CACHE" , "delete '*-deleted' dirs failed: " + err . Error ( ) )
2020-10-04 14:30:42 +08:00
}
2021-06-17 21:13:21 +08:00
} ( )
2020-10-04 14:30:42 +08:00
return nil
}
2021-05-12 21:38:44 +08:00
// Purge 清理过期的缓存
2020-12-23 21:28:50 +08:00
func ( this * FileStorage ) Purge ( keys [ ] string , urlType string ) error {
2020-10-04 14:30:42 +08:00
this . locker . Lock ( )
defer this . locker . Unlock ( )
2021-03-02 19:43:05 +08:00
// 先尝试内存缓存
if this . memoryStorage != nil {
_ = this . memoryStorage . Purge ( keys , urlType )
}
2020-12-23 21:28:50 +08:00
// 目录
if urlType == "dir" {
for _ , key := range keys {
2021-06-13 17:37:57 +08:00
err := this . list . CleanPrefix ( key )
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
2020-12-23 21:28:50 +08:00
}
}
// 文件
2020-10-04 14:30:42 +08:00
for _ , key := range keys {
hash , path := this . keyPath ( key )
2021-06-13 17:37:57 +08:00
err := os . Remove ( path )
2020-10-04 14:30:42 +08:00
if err != nil && ! os . IsNotExist ( err ) {
return err
}
2021-05-19 12:07:35 +08:00
err = this . list . Remove ( hash )
if err != nil {
return err
}
2020-10-04 14:30:42 +08:00
}
return nil
}
2021-05-12 21:38:44 +08:00
// Stop 停止
2020-10-04 14:30:42 +08:00
func ( this * FileStorage ) Stop ( ) {
this . locker . Lock ( )
defer this . locker . Unlock ( )
2021-03-02 19:43:05 +08:00
// 先尝试内存缓存
if this . memoryStorage != nil {
this . memoryStorage . Stop ( )
}
2021-05-24 09:23:51 +08:00
_ = this . list . Reset ( )
2021-11-14 16:15:07 +08:00
if this . purgeTicker != nil {
this . purgeTicker . Stop ( )
}
if this . hotTicker != nil {
this . hotTicker . Stop ( )
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
_ = this . list . Close ( )
2020-10-04 14:30:42 +08:00
}
2021-05-13 11:50:36 +08:00
// TotalDiskSize 消耗的磁盘尺寸
func ( this * FileStorage ) TotalDiskSize ( ) int64 {
return atomic . LoadInt64 ( & this . totalSize )
}
// TotalMemorySize returns the memory used by the first-level memory storage,
// or 0 when no memory storage is configured.
func (this *FileStorage) TotalMemorySize() int64 {
	if memory := this.memoryStorage; memory != nil {
		return memory.TotalMemorySize()
	}
	return 0
}
2020-10-04 14:30:42 +08:00
// 绝对路径
func ( this * FileStorage ) dir ( ) string {
return this . cacheConfig . Dir + "/p" + strconv . FormatInt ( this . policy . Id , 10 ) + "/"
}
// keyPath returns the MD5 hash of key together with the path of its cache
// file, "<cache dir>/p<policy id>/<hash[0:2]>/<hash[2:4]>/<hash>.cache".
func (this *FileStorage) keyPath(key string) (hash string, path string) {
	hash = stringutil.Md5(key)
	var shardDir = strings.Join([]string{
		this.cacheConfig.Dir,
		"p" + strconv.FormatInt(this.policy.Id, 10),
		hash[:2],
		hash[2:4],
	}, "/")
	path = shardDir + "/" + hash + ".cache"
	return hash, path
}
// hashPath returns the cache file path for a 32-character hash, using the
// same layout as keyPath, or "" when hash has any other length.
func (this *FileStorage) hashPath(hash string) (path string) {
	if len(hash) == 32 {
		path = this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) +
			"/" + hash[:2] + "/" + hash[2:4] + "/" + hash + ".cache"
	}
	return path
}
// 初始化List
func ( this * FileStorage ) initList ( ) error {
2021-05-19 12:07:35 +08:00
err := this . list . Reset ( )
if err != nil {
return err
}
2020-10-04 14:30:42 +08:00
2021-05-23 22:59:00 +08:00
// 使用异步防止阻塞主线程
2021-11-21 16:10:07 +08:00
/ * * go func ( ) {
2021-05-23 22:59:00 +08:00
dir := this . dir ( )
2021-01-13 12:02:50 +08:00
2021-05-23 22:59:00 +08:00
// 清除tmp
2021-11-21 16:10:07 +08:00
// TODO 需要一个更加高效的实现
} ( ) * * /
2021-01-13 12:02:50 +08:00
2020-10-04 14:30:42 +08:00
// 启动定时清理任务
2021-11-13 21:30:24 +08:00
var autoPurgeInterval = this . policy . PersistenceAutoPurgeInterval
if autoPurgeInterval <= 0 {
autoPurgeInterval = 30
if Tea . IsTesting ( ) {
autoPurgeInterval = 10
}
}
2021-11-14 16:15:07 +08:00
this . purgeTicker = utils . NewTicker ( time . Duration ( autoPurgeInterval ) * time . Second )
2020-10-28 11:19:06 +08:00
events . On ( events . EventQuit , func ( ) {
2020-12-17 17:36:10 +08:00
remotelogs . Println ( "CACHE" , "quit clean timer" )
2021-11-14 16:15:07 +08:00
var ticker = this . purgeTicker
2020-10-28 11:19:06 +08:00
if ticker != nil {
ticker . Stop ( )
}
} )
2020-10-04 14:30:42 +08:00
go func ( ) {
2021-11-14 16:15:07 +08:00
for this . purgeTicker . Next ( ) {
trackers . Run ( "FILE_CACHE_STORAGE_PURGE_LOOP" , func ( ) {
this . purgeLoop ( )
} )
}
} ( )
// 热点处理任务
this . hotTicker = utils . NewTicker ( 1 * time . Minute )
if Tea . IsTesting ( ) {
this . hotTicker = utils . NewTicker ( 10 * time . Second )
}
go func ( ) {
for this . hotTicker . Next ( ) {
trackers . Run ( "FILE_CACHE_STORAGE_HOT_LOOP" , func ( ) {
this . hotLoop ( )
} )
2020-10-04 14:30:42 +08:00
}
} ( )
return nil
}
// 解析文件信息
func ( this * FileStorage ) decodeFile ( path string ) ( * Item , error ) {
fp , err := os . OpenFile ( path , os . O_RDONLY , 0444 )
if err != nil {
return nil , err
}
2021-01-13 12:02:50 +08:00
isAllOk := false
2020-10-04 14:30:42 +08:00
defer func ( ) {
_ = fp . Close ( )
2021-01-13 12:02:50 +08:00
if ! isAllOk {
_ = os . Remove ( path )
}
2020-10-04 14:30:42 +08:00
} ( )
2021-01-13 12:02:50 +08:00
item := & Item {
2021-03-02 19:43:05 +08:00
Type : ItemTypeFile ,
2021-01-13 12:02:50 +08:00
MetaSize : SizeMeta ,
}
bytes4 := make ( [ ] byte , 4 )
// 过期时间
ok , err := this . readToBuff ( fp , bytes4 )
2020-10-04 14:30:42 +08:00
if err != nil {
return nil , err
}
2021-01-13 12:02:50 +08:00
if ! ok {
2020-10-04 14:30:42 +08:00
return nil , ErrNotFound
}
2021-01-13 12:02:50 +08:00
item . ExpiredAt = int64 ( binary . BigEndian . Uint32 ( bytes4 ) )
// 是否已过期
if item . ExpiredAt < time . Now ( ) . Unix ( ) {
2020-10-04 14:30:42 +08:00
return nil , ErrNotFound
}
2021-01-13 12:02:50 +08:00
// URL Size
_ , err = fp . Seek ( int64 ( SizeExpiresAt + SizeStatus ) , io . SeekStart )
2020-10-04 14:30:42 +08:00
if err != nil {
return nil , err
}
2021-01-13 12:02:50 +08:00
ok , err = this . readToBuff ( fp , bytes4 )
if err != nil {
return nil , err
}
if ! ok {
return nil , ErrNotFound
}
urlSize := binary . BigEndian . Uint32 ( bytes4 )
2020-10-04 14:30:42 +08:00
2021-01-13 12:02:50 +08:00
// Header Size
ok , err = this . readToBuff ( fp , bytes4 )
2020-10-04 14:30:42 +08:00
if err != nil {
return nil , err
}
2021-01-13 12:02:50 +08:00
if ! ok {
2020-10-04 14:30:42 +08:00
return nil , ErrNotFound
}
2021-01-13 12:02:50 +08:00
item . HeaderSize = int64 ( binary . BigEndian . Uint32 ( bytes4 ) )
2020-10-04 14:30:42 +08:00
2021-01-13 12:02:50 +08:00
// Body Size
bytes8 := make ( [ ] byte , 8 )
ok , err = this . readToBuff ( fp , bytes8 )
2020-10-04 14:30:42 +08:00
if err != nil {
return nil , err
}
2021-01-13 12:02:50 +08:00
if ! ok {
return nil , ErrNotFound
}
item . BodySize = int64 ( binary . BigEndian . Uint64 ( bytes8 ) )
// URL
if urlSize > 0 {
data := utils . BytePool1024 . Get ( )
result , ok , err := this . readN ( fp , data , int ( urlSize ) )
utils . BytePool1024 . Put ( data )
if err != nil {
return nil , err
}
if ! ok {
return nil , ErrNotFound
}
item . Key = string ( result )
}
isAllOk = true
2020-10-04 14:30:42 +08:00
return item , nil
}
// 清理任务
func ( this * FileStorage ) purgeLoop ( ) {
2021-11-13 21:30:24 +08:00
// 计算是否应该开启LFU清理
var capacityBytes = this . policy . CapacityBytes ( )
var startLFU = false
var usedPercent = float32 ( this . TotalDiskSize ( ) * 100 ) / float32 ( capacityBytes )
var lfuFreePercent = this . policy . PersistenceLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
}
if capacityBytes > 0 {
if lfuFreePercent < 100 {
if usedPercent >= 100 - lfuFreePercent {
startLFU = true
}
}
}
// 清理过期
{
var times = 1
// 空闲时间多清理
if utils . SharedFreeHoursManager . IsFreeHour ( ) {
times = 5
}
// 处于LFU阈值时, 多清理
if startLFU {
times = 5
}
var purgeCount = this . policy . PersistenceAutoPurgeCount
if purgeCount <= 0 {
purgeCount = 1000
}
for i := 0 ; i < times ; i ++ {
countFound , err := this . list . Purge ( purgeCount , func ( hash string ) error {
path := this . hashPath ( hash )
err := os . Remove ( path )
if err != nil && ! os . IsNotExist ( err ) {
remotelogs . Error ( "CACHE" , "purge '" + path + "' error: " + err . Error ( ) )
}
return nil
} )
if err != nil {
remotelogs . Warn ( "CACHE" , "purge file storage failed: " + err . Error ( ) )
continue
}
if countFound < purgeCount {
break
}
time . Sleep ( 1 * time . Second )
}
}
// 磁盘空间不足时,清除老旧的缓存
if startLFU {
var total , _ = this . list . Count ( )
if total > 0 {
var count = types . Int ( math . Ceil ( float64 ( total ) * float64 ( lfuFreePercent * 2 ) / 100 ) )
if count > 0 {
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
}
remotelogs . Println ( "CACHE" , "LFU purge policy '" + this . policy . Name + "' id: " + types . String ( this . policy . Id ) + ", count: " + types . String ( count ) )
err := this . list . PurgeLFU ( count , func ( hash string ) error {
path := this . hashPath ( hash )
err := os . Remove ( path )
if err != nil && ! os . IsNotExist ( err ) {
remotelogs . Error ( "CACHE" , "purge '" + path + "' error: " + err . Error ( ) )
}
return nil
} )
if err != nil {
remotelogs . Warn ( "CACHE" , "purge file storage in LFU failed: " + err . Error ( ) )
}
}
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
}
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
2021-11-14 16:15:07 +08:00
// 热点数据任务
func ( this * FileStorage ) hotLoop ( ) {
var memoryStorage = this . memoryStorage
if memoryStorage == nil {
return
}
this . hotMapLocker . Lock ( )
if len ( this . hotMap ) == 0 {
this . hotMapLocker . Unlock ( )
this . lastHotSize = 0
return
}
this . lastHotSize = len ( this . hotMap )
var result = [ ] * HotItem { } // [ {key: ..., hits: ...}, ... ]
for _ , v := range this . hotMap {
result = append ( result , v )
}
this . hotMap = map [ string ] * HotItem { }
this . hotMapLocker . Unlock ( )
// 取Top10
if len ( result ) > 0 {
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] . Hits > result [ j ] . Hits
} )
var size = 1
if len ( result ) < 10 {
size = 1
} else {
size = len ( result ) / 10
}
var buf = make ( [ ] byte , 32 * 1024 )
for _ , item := range result [ : size ] {
reader , err := this . openReader ( item . Key , false )
if err != nil {
continue
}
if reader == nil {
continue
}
if reader . ExpiresAt ( ) <= time . Now ( ) . Unix ( ) {
continue
}
writer , err := this . memoryStorage . openWriter ( item . Key , item . ExpiresAt , item . Status , false )
if err != nil {
if ! CanIgnoreErr ( err ) {
remotelogs . Error ( "CACHE" , "transfer hot item failed: " + err . Error ( ) )
}
_ = reader . Close ( )
continue
}
if writer == nil {
_ = reader . Close ( )
continue
}
err = reader . ReadHeader ( buf , func ( n int ) ( goNext bool , err error ) {
_ , err = writer . WriteHeader ( buf [ : n ] )
return
} )
if err != nil {
_ = reader . Close ( )
_ = writer . Discard ( )
continue
}
err = reader . ReadBody ( buf , func ( n int ) ( goNext bool , err error ) {
_ , err = writer . Write ( buf [ : n ] )
return
} )
if err != nil {
_ = reader . Close ( )
_ = writer . Discard ( )
continue
}
this . memoryStorage . AddToList ( & Item {
Type : writer . ItemType ( ) ,
Key : item . Key ,
ExpiredAt : item . ExpiresAt ,
HeaderSize : writer . HeaderSize ( ) ,
BodySize : writer . BodySize ( ) ,
} )
_ = reader . Close ( )
_ = writer . Close ( )
}
}
}
2021-01-13 12:02:50 +08:00
func ( this * FileStorage ) readToBuff ( fp * os . File , buf [ ] byte ) ( ok bool , err error ) {
n , err := fp . Read ( buf )
if err != nil {
return false , err
}
ok = n == len ( buf )
return
}
// readN reads total bytes from fp, using buf as scratch space, and returns
// them copied into result (result does not alias buf).
// Any read error, including io.EOF before total bytes arrived, is returned as err.
// NOTE: the final Read may consume bytes beyond total from fp.
func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byte, ok bool, err error) {
	var remaining = total
	for {
		n, readErr := fp.Read(buf)
		if readErr != nil {
			return nil, false, readErr
		}
		if n <= 0 {
			continue
		}
		if n < remaining {
			result = append(result, buf[:n]...)
			remaining -= n
			continue
		}
		result = append(result, buf[:remaining]...)
		return result, true, nil
	}
}
2021-05-12 21:38:44 +08:00
func ( this * FileStorage ) diskCapacityBytes ( ) int64 {
c1 := this . policy . CapacityBytes ( )
if SharedManager . MaxDiskCapacity != nil {
c2 := SharedManager . MaxDiskCapacity . Bytes ( )
if c2 > 0 {
return c2
}
}
return c1
}
2021-06-17 21:13:21 +08:00
// 清理 *-deleted 目录
// 由于在很多硬盘上耗时非常久,所以应该放在后台运行
func ( this * FileStorage ) cleanDeletedDirs ( dir string ) error {
fp , err := os . Open ( dir )
if err != nil {
return err
}
defer func ( ) {
_ = fp . Close ( )
} ( )
subDirs , err := fp . Readdir ( - 1 )
if err != nil {
return err
}
for _ , info := range subDirs {
subDir := info . Name ( )
if ! strings . HasSuffix ( subDir , "-deleted" ) {
continue
}
// 删除
err = os . RemoveAll ( dir + "/" + subDir )
if err != nil {
if ! os . IsNotExist ( err ) {
return err
}
}
}
return nil
}