Files
EdgeNode/internal/caches/storage_file.go

1361 lines
32 KiB
Go
Raw Normal View History

2020-10-04 14:30:42 +08:00
package caches
import (
"bytes"
2020-10-04 14:30:42 +08:00
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
2020-10-04 14:30:42 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/sizes"
"github.com/TeaOSLab/EdgeNode/internal/zero"
2020-10-04 14:30:42 +08:00
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
2020-10-04 14:30:42 +08:00
stringutil "github.com/iwind/TeaGo/utils/string"
"golang.org/x/sys/unix"
"golang.org/x/text/language"
"golang.org/x/text/message"
"math"
2020-10-04 14:30:42 +08:00
"os"
"path/filepath"
"regexp"
"sort"
2020-10-04 14:30:42 +08:00
"strconv"
"strings"
"sync"
2020-10-05 20:23:18 +08:00
"sync/atomic"
"syscall"
2020-10-04 14:30:42 +08:00
"time"
)
// Layout of the fixed-size metadata header at the start of every cache file.
// Each field is described by its size and its offset (both in bytes); every
// offset is the previous field's offset plus the previous field's size, and
// the cache key (URL) follows immediately after the header.
const (
	// expiration time, written as a big-endian uint32
	OffsetExpiresAt = 0
	SizeExpiresAt   = 4

	// HTTP status code, written as three ASCII digits
	OffsetStatus = OffsetExpiresAt + SizeExpiresAt
	SizeStatus   = 3

	// length of the URL (cache key), big-endian uint32
	OffsetURLLength = OffsetStatus + SizeStatus
	SizeURLLength   = 4

	// length of the header data, big-endian uint32
	OffsetHeaderLength = OffsetURLLength + SizeURLLength
	SizeHeaderLength   = 4

	// length of the body data, big-endian uint64
	OffsetBodyLength = OffsetHeaderLength + SizeHeaderLength
	SizeBodyLength   = 8

	// total header size; the key starts right after it
	SizeMeta  = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
	OffsetKey = SizeMeta
)
const (
2022-03-20 21:15:25 +08:00
FileStorageMaxIgnoreKeys = 32768 // 最大可忽略的键值数(尺寸过大的键值)
HotItemSize = 1024 // 热点数据数量
HotItemLifeSeconds int64 = 3600 // 热点数据生命周期
FileToMemoryMaxSize = 32 * sizes.M // 可以从文件写入到内存的最大文件尺寸
FileTmpSuffix = ".tmp"
MinDiskSpace = 5 << 30 // 当前磁盘最小剩余空间
)
var sharedWritingFileKeyMap = map[string]zero.Zero{} // key => bool
var sharedWritingFileKeyLocker = sync.Mutex{}
2022-07-17 10:24:35 +08:00
var maxOpenFiles = NewMaxOpenFiles()
2022-04-14 10:25:34 +08:00
2022-08-14 16:28:40 +08:00
const maxOpenFilesSlowCost = 1000 * time.Microsecond // us
2022-07-26 08:29:22 +08:00
const protectingLoadWhenDump = false
2022-04-14 09:36:02 +08:00
// FileStorage 文件缓存
2022-08-14 16:28:40 +08:00
//
// 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
2020-10-04 14:30:42 +08:00
type FileStorage struct {
policy *serverconfigs.HTTPCachePolicy
options *serverconfigs.HTTPFileCacheStorage // 二级缓存
memoryStorage *MemoryStorage // 一级缓存
totalSize int64
2020-10-04 14:30:42 +08:00
list ListInterface
locker sync.RWMutex
purgeTicker *utils.Ticker
hotMap map[string]*HotItem // key => count
hotMapLocker sync.Mutex
lastHotSize int
hotTicker *utils.Ticker
2022-01-12 21:09:00 +08:00
ignoreKeys *setutils.FixedSet
2022-01-12 21:09:00 +08:00
openFileCache *OpenFileCache
diskIsFull bool
2020-10-04 14:30:42 +08:00
}
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
return &FileStorage{
policy: policy,
hotMap: map[string]*HotItem{},
lastHotSize: -1,
2022-03-20 21:15:25 +08:00
ignoreKeys: setutils.NewFixedSet(FileStorageMaxIgnoreKeys),
2020-10-04 14:30:42 +08:00
}
}
// Policy 获取当前的Policy
2020-10-04 14:30:42 +08:00
func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// CanUpdatePolicy reports whether newPolicy can be applied to this storage in
// place, i.e. whether its cache directory is unchanged. It returns false when
// either policy's options cannot be re-encoded into file storage options.
func (this *FileStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool {
	// decode converts generic policy options into typed file storage options
	// via a JSON round trip.
	var decode = func(rawOptions interface{}) (*serverconfigs.HTTPFileCacheStorage, bool) {
		data, err := json.Marshal(rawOptions)
		if err != nil {
			return nil, false
		}
		var decoded = &serverconfigs.HTTPFileCacheStorage{}
		if err = json.Unmarshal(data, decoded); err != nil {
			return nil, false
		}
		return decoded, true
	}

	oldOptions, ok := decode(this.policy.Options)
	if !ok {
		return false
	}
	newOptions, ok := decode(newPolicy.Options)
	if !ok {
		return false
	}

	// only a change of the directory prevents an in-place update
	return oldOptions.Dir == newOptions.Dir
}
// UpdatePolicy 修改策略
func (this *FileStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) {
var oldOpenFileCache = this.options.OpenFileCache
this.policy = newPolicy
newOptionsJSON, err := json.Marshal(newPolicy.Options)
if err != nil {
return
}
var newOptions = &serverconfigs.HTTPFileCacheStorage{}
err = json.Unmarshal(newOptionsJSON, newOptions)
if err != nil {
remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: decode options failed: "+err.Error())
return
}
err = newOptions.Init()
if err != nil {
remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: init options failed: "+err.Error())
return
}
this.options = newOptions
var memoryStorage = this.memoryStorage
if memoryStorage != nil {
if newOptions.MemoryPolicy != nil && newOptions.MemoryPolicy.CapacityBytes() > 0 {
memoryStorage.UpdatePolicy(newOptions.MemoryPolicy)
} else {
memoryStorage.Stop()
this.memoryStorage = nil
}
} else if newOptions.MemoryPolicy != nil && this.options.MemoryPolicy.Capacity != nil && this.options.MemoryPolicy.Capacity.Count > 0 {
err = this.createMemoryStorage()
if err != nil {
remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: create memory storage failed: "+err.Error())
}
}
// open cache
oldOpenFileCacheJSON, _ := json.Marshal(oldOpenFileCache)
newOpenFileCacheJSON, _ := json.Marshal(this.options.OpenFileCache)
2022-09-18 16:18:31 +08:00
if !bytes.Equal(oldOpenFileCacheJSON, newOpenFileCacheJSON) {
this.initOpenFileCache()
}
// Purge Ticker
if newPolicy.PersistenceAutoPurgeInterval != this.policy.PersistenceAutoPurgeInterval {
this.initPurgeTicker()
}
}
// Init 初始化
2020-10-04 14:30:42 +08:00
func (this *FileStorage) Init() error {
this.locker.Lock()
defer this.locker.Unlock()
var before = time.Now()
2020-10-04 14:30:42 +08:00
// 配置
var options = &serverconfigs.HTTPFileCacheStorage{}
2020-10-04 14:30:42 +08:00
optionsJSON, err := json.Marshal(this.policy.Options)
if err != nil {
return err
}
err = json.Unmarshal(optionsJSON, options)
2020-10-04 14:30:42 +08:00
if err != nil {
return err
}
this.options = options
2020-10-04 14:30:42 +08:00
if !filepath.IsAbs(this.options.Dir) {
this.options.Dir = Tea.Root + Tea.DS + this.options.Dir
2020-10-04 14:30:42 +08:00
}
this.options.Dir = filepath.Clean(this.options.Dir)
var dir = this.options.Dir
2020-10-04 14:30:42 +08:00
if len(dir) == 0 {
return errors.New("[CACHE]cache storage dir can not be empty")
}
2022-03-16 16:20:53 +08:00
var list = NewFileList(dir + "/p" + types.String(this.policy.Id) + "/.indexes")
2021-05-19 12:07:35 +08:00
err = list.Init()
if err != nil {
return err
}
2022-03-16 16:20:53 +08:00
list.(*FileList).SetOldDir(dir + "/p" + types.String(this.policy.Id))
2021-05-19 12:07:35 +08:00
this.list = list
stat, err := list.Stat(func(hash string) bool {
return true
})
if err != nil {
return err
}
this.totalSize = stat.Size
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
2020-10-04 14:30:42 +08:00
// 检查目录是否存在
_, err = os.Stat(dir)
if err != nil {
if !os.IsNotExist(err) {
return err
} else {
err = os.MkdirAll(dir, 0777)
if err != nil {
return errors.New("[CACHE]can not create dir:" + err.Error())
}
}
}
2021-05-19 12:07:35 +08:00
defer func() {
// 统计
var count = stat.Count
var size = stat.Size
2021-05-19 12:07:35 +08:00
var cost = time.Since(before).Seconds() * 1000
2021-05-19 12:07:35 +08:00
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
2022-03-20 21:15:25 +08:00
if size > 1*sizes.G {
sizeMB = fmt.Sprintf("%.3f G", float64(size)/float64(sizes.G))
} else if size > 1*sizes.M {
sizeMB = fmt.Sprintf("%.3f M", float64(size)/float64(sizes.M))
} else if size > 1*sizes.K {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/float64(sizes.K))
2021-05-19 12:07:35 +08:00
}
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.options.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
2021-05-19 12:07:35 +08:00
}()
2020-10-04 14:30:42 +08:00
// 初始化list
err = this.initList()
if err != nil {
return err
}
// 加载内存缓存
if this.options.MemoryPolicy != nil && this.options.MemoryPolicy.Capacity != nil && this.options.MemoryPolicy.Capacity.Count > 0 {
err = this.createMemoryStorage()
if err != nil {
return err
}
}
2022-01-12 21:09:00 +08:00
// open file cache
this.initOpenFileCache()
2022-01-12 21:09:00 +08:00
// 检查磁盘空间
this.checkDiskSpace()
2020-10-04 14:30:42 +08:00
return nil
}
// OpenReader opens a reader for key. The in-memory storage may be consulted
// first (see openReader for when it is skipped).
func (this *FileStorage) OpenReader(key string, useStale bool, isPartial bool) (Reader, error) {
	const allowMemory = true
	return this.openReader(key, allowMemory, useStale, isPartial)
}
func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, isPartial bool) (Reader, error) {
2021-12-16 17:27:21 +08:00
// 使用陈旧缓存的时候,我们认为是短暂的,只需要从文件里检查即可
if useStale {
allowMemory = false
}
// 区间缓存只存在文件中
if isPartial {
allowMemory = false
}
// 先尝试内存缓存
var memoryStorage = this.memoryStorage
if allowMemory && memoryStorage != nil {
reader, err := memoryStorage.OpenReader(key, useStale, isPartial)
if err == nil {
return reader, err
}
}
2021-06-13 17:37:57 +08:00
hash, path := this.keyPath(key)
2020-10-04 14:30:42 +08:00
2021-12-16 17:27:21 +08:00
// 检查文件记录是否已过期
if !useStale {
exists, err := this.list.Exist(hash)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound
}
}
2020-10-04 14:30:42 +08:00
// TODO 尝试使用mmap加快读取速度
2021-06-13 17:37:57 +08:00
var isOk = false
2022-01-12 21:09:00 +08:00
var openFile *OpenFile
var openFileCache = this.openFileCache // 因为中间可能有修改,所以先赋值再获取
if openFileCache != nil {
openFile = openFileCache.Get(path)
2022-01-12 21:09:00 +08:00
}
var fp *os.File
var err error
if openFile == nil {
2022-01-12 21:09:00 +08:00
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
return nil, ErrNotFound
2020-10-04 14:30:42 +08:00
}
} else {
fp = openFile.fp
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
defer func() {
if !isOk {
_ = fp.Close()
2022-03-05 16:47:17 +08:00
_ = this.removeCacheFile(path)
2021-06-13 17:37:57 +08:00
}
}()
var reader Reader
if isPartial {
var partialFileReader = NewPartialFileReader(fp)
partialFileReader.openFile = openFile
partialFileReader.openFileCache = openFileCache
reader = partialFileReader
} else {
var fileReader = NewFileReader(fp)
fileReader.openFile = openFile
fileReader.openFileCache = openFileCache
reader = fileReader
}
2021-01-13 12:02:50 +08:00
err = reader.Init()
2020-10-04 14:30:42 +08:00
if err != nil {
2021-01-13 12:02:50 +08:00
return nil, err
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
// 增加点击量
// 1/1000采样
if !isPartial && allowMemory && reader.BodySize() < FileToMemoryMaxSize {
this.increaseHit(key, hash, reader)
}
2021-06-13 17:37:57 +08:00
isOk = true
2021-01-13 12:02:50 +08:00
return reader, nil
2020-10-04 14:30:42 +08:00
}
// OpenWriter 打开缓存文件等待写入
2022-04-14 09:36:02 +08:00
func (this *FileStorage) OpenWriter(key string, expiresAt int64, status int, size int64, maxSize int64, isPartial bool) (Writer, error) {
return this.openWriter(key, expiresAt, status, size, maxSize, isPartial, false)
}
// OpenFlushWriter opens a writer for content flushed directly from another
// medium. Size limits are disabled (-1), the memory storage is bypassed, and
// the writer does not count against maxOpenFiles.
func (this *FileStorage) OpenFlushWriter(key string, expiresAt int64, status int) (Writer, error) {
	const unlimited int64 = -1
	return this.openWriter(key, expiresAt, status, unlimited, unlimited, false, true)
}
func (this *FileStorage) openWriter(key string, expiredAt int64, status int, size int64, maxSize int64, isPartial bool, isFlushing bool) (Writer, error) {
// 是否正在退出
if teaconst.IsQuiting {
2022-03-15 18:32:39 +08:00
return nil, ErrWritingUnavailable
}
// 当前磁盘可用容量是否严重不足
if this.diskIsFull {
return nil, NewCapacityError("the disk is full")
}
// 是否已忽略
if this.ignoreKeys.Has(key) {
return nil, ErrEntityTooLarge
}
// 先尝试内存缓存
2022-02-15 16:44:39 +08:00
// 我们限定仅小文件优先存在内存中
var maxMemorySize = FileToMemoryMaxSize
if maxSize > maxMemorySize {
maxMemorySize = maxSize
}
var memoryStorage = this.memoryStorage
2022-04-14 09:36:02 +08:00
if !isFlushing && !isPartial && memoryStorage != nil && ((size > 0 && size < maxMemorySize) || size < 0) {
writer, err := memoryStorage.OpenWriter(key, expiredAt, status, size, maxMemorySize, false)
if err == nil {
return writer, nil
}
2022-03-15 18:32:39 +08:00
// 如果队列满了,则等待
if err == ErrWritingQueueFull {
return nil, err
}
}
// 是否正在写入
var isOk = false
sharedWritingFileKeyLocker.Lock()
_, ok := sharedWritingFileKeyMap[key]
if ok {
sharedWritingFileKeyLocker.Unlock()
return nil, ErrFileIsWriting
}
2022-04-14 09:36:02 +08:00
2022-07-17 10:24:35 +08:00
if !isFlushing && !maxOpenFiles.Next() {
2022-04-14 09:36:02 +08:00
sharedWritingFileKeyLocker.Unlock()
return nil, ErrTooManyOpenFiles
}
sharedWritingFileKeyMap[key] = zero.New()
sharedWritingFileKeyLocker.Unlock()
defer func() {
if !isOk {
sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key)
2022-07-17 10:24:35 +08:00
if len(sharedWritingFileKeyMap) == 0 {
maxOpenFiles.FinishAll()
}
sharedWritingFileKeyLocker.Unlock()
}
}()
2020-10-05 19:15:35 +08:00
// 检查是否超出最大值
2021-05-19 12:07:35 +08:00
count, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
2021-06-08 11:24:41 +08:00
return nil, NewCapacityError("write file cache failed: too many keys in cache storage")
2020-10-05 20:23:18 +08:00
}
var capacityBytes = this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
2021-06-08 11:24:41 +08:00
return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
2020-10-05 19:15:35 +08:00
}
var hash = stringutil.Md5(key)
// TODO 可以只stat一次
var dir = this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
2021-05-19 12:07:35 +08:00
_, err = os.Stat(dir)
2020-10-04 14:30:42 +08:00
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
err = os.MkdirAll(dir, 0777)
if err != nil {
return nil, err
}
}
// 检查缓存是否已经生成
var cachePathName = dir + "/" + hash
var cachePath = cachePathName + ".cache"
// 关闭OpenFileCache
var openFileCache = this.openFileCache
if openFileCache != nil {
openFileCache.Close(cachePath)
}
// 查询当前已有缓存文件
stat, err := os.Stat(cachePath)
// 检查两次写入缓存的时间是否过于相近,分片内容不受此限制
if err == nil && !isPartial && time.Now().Sub(stat.ModTime()) <= 1*time.Second {
// 防止并发连续写入
return nil, ErrFileIsWriting
}
// 构造文件名
var tmpPath = cachePath
var existsFile = false
if stat != nil {
existsFile = true
// 如果已经存在,则增加一个.tmp后缀防止读写冲突
tmpPath += FileTmpSuffix
}
if isPartial {
tmpPath = cachePathName + ".cache"
}
2020-10-05 20:23:18 +08:00
// 先删除
2022-03-05 19:31:50 +08:00
if !isPartial {
err = this.list.Remove(hash)
if err != nil {
return nil, err
}
}
// 从已经存储的内容中读取信息
var isNewCreated = true
var partialBodyOffset int64
if isPartial {
readerFp, err := os.OpenFile(tmpPath, os.O_RDONLY, 0444)
if err == nil {
var partialReader = NewPartialFileReader(readerFp)
err = partialReader.Init()
_ = partialReader.Close()
if err == nil && partialReader.bodyOffset > 0 {
isNewCreated = false
partialBodyOffset = partialReader.bodyOffset
} else {
_ = this.removeCacheFile(tmpPath)
}
}
2021-05-19 12:07:35 +08:00
}
2020-10-05 20:23:18 +08:00
2022-04-15 14:23:06 +08:00
var flags = os.O_CREATE | os.O_WRONLY
if isNewCreated && existsFile {
2022-04-15 14:23:06 +08:00
flags |= os.O_TRUNC
}
var before = time.Now()
2022-04-15 14:23:06 +08:00
writer, err := os.OpenFile(tmpPath, flags, 0666)
2020-10-04 14:30:42 +08:00
if err != nil {
return nil, err
}
if !isFlushing {
if time.Since(before) >= maxOpenFilesSlowCost {
maxOpenFiles.Slow()
} else {
maxOpenFiles.Fast()
}
}
2020-10-04 14:30:42 +08:00
var removeOnFailure = true
2020-10-04 14:30:42 +08:00
defer func() {
if err != nil {
isOk = false
}
// 如果出错了,就删除文件,避免写一半
if !isOk {
_ = writer.Close()
if removeOnFailure {
_ = os.Remove(tmpPath)
}
2020-10-04 14:30:42 +08:00
}
}()
// 尝试锁定,如果锁定失败,则直接返回
err = syscall.Flock(int(writer.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
removeOnFailure = false
return nil, ErrFileIsWriting
}
if isNewCreated {
// 写入过期时间
2022-04-14 09:36:02 +08:00
var metaBytes = make([]byte, SizeMeta+len(key))
binary.BigEndian.PutUint32(metaBytes[OffsetExpiresAt:], uint32(expiredAt))
2020-10-04 14:30:42 +08:00
// 写入状态码
if status > 999 || status < 100 {
status = 200
}
2022-04-14 09:36:02 +08:00
copy(metaBytes[OffsetStatus:], strconv.Itoa(status))
2020-10-04 14:30:42 +08:00
// 写入URL长度
2022-04-14 09:36:02 +08:00
binary.BigEndian.PutUint32(metaBytes[OffsetURLLength:], uint32(len(key)))
2020-10-04 14:30:42 +08:00
// 写入Header Length
2022-04-14 09:36:02 +08:00
binary.BigEndian.PutUint32(metaBytes[OffsetHeaderLength:], uint32(0))
// 写入Body Length
2022-04-14 09:36:02 +08:00
binary.BigEndian.PutUint64(metaBytes[OffsetBodyLength:], uint64(0))
// 写入URL
2022-04-14 09:36:02 +08:00
copy(metaBytes[OffsetKey:], key)
_, err = writer.Write(metaBytes)
2021-01-13 12:02:50 +08:00
if err != nil {
return nil, err
}
2020-10-04 14:30:42 +08:00
}
isOk = true
if isPartial {
ranges, err := NewPartialRangesFromFile(cachePathName + "@ranges.cache")
if err != nil {
ranges = NewPartialRanges()
}
return NewPartialFileWriter(writer, key, expiredAt, isNewCreated, isPartial, partialBodyOffset, ranges, func() {
sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key)
2022-07-17 10:24:35 +08:00
if len(sharedWritingFileKeyMap) == 0 {
maxOpenFiles.FinishAll()
}
sharedWritingFileKeyLocker.Unlock()
}), nil
} else {
return NewFileWriter(this, writer, key, expiredAt, -1, func() {
sharedWritingFileKeyLocker.Lock()
delete(sharedWritingFileKeyMap, key)
2022-07-17 10:24:35 +08:00
if len(sharedWritingFileKeyMap) == 0 {
maxOpenFiles.FinishAll()
}
sharedWritingFileKeyLocker.Unlock()
}), nil
}
2020-10-04 14:30:42 +08:00
}
// AddToList 添加到List
2020-10-04 14:30:42 +08:00
func (this *FileStorage) AddToList(item *Item) {
// 是否正在退出
if teaconst.IsQuiting {
return
}
var memoryStorage = this.memoryStorage
if memoryStorage != nil {
if item.Type == ItemTypeMemory {
memoryStorage.AddToList(item)
return
}
}
item.MetaSize = SizeMeta + 128
2020-10-04 14:30:42 +08:00
hash := stringutil.Md5(item.Key)
2021-05-19 12:07:35 +08:00
err := this.list.Add(hash, item)
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
remotelogs.Error("CACHE", "add to list failed: "+err.Error())
}
2020-10-04 14:30:42 +08:00
}
// Delete 删除某个键值对应的缓存
2020-10-04 14:30:42 +08:00
func (this *FileStorage) Delete(key string) error {
// 是否正在退出
if teaconst.IsQuiting {
return nil
}
2020-10-04 14:30:42 +08:00
this.locker.Lock()
defer this.locker.Unlock()
// 先尝试内存缓存
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
_ = memoryStorage.Delete(key)
})
2020-10-04 14:30:42 +08:00
hash, path := this.keyPath(key)
2021-05-19 12:07:35 +08:00
err := this.list.Remove(hash)
if err != nil {
return err
}
2022-03-05 16:47:17 +08:00
err = this.removeCacheFile(path)
2020-10-04 14:30:42 +08:00
if err == nil || os.IsNotExist(err) {
return nil
}
2020-10-04 14:30:42 +08:00
return err
}
// Stat 统计
2020-10-04 14:30:42 +08:00
func (this *FileStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
return this.list.Stat(func(hash string) bool {
return true
2021-05-19 12:07:35 +08:00
})
2020-10-04 14:30:42 +08:00
}
// CleanAll 清除所有的缓存
2020-10-04 14:30:42 +08:00
func (this *FileStorage) CleanAll() error {
this.locker.Lock()
defer this.locker.Unlock()
// 先尝试内存缓存
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
_ = memoryStorage.CleanAll()
})
2021-05-19 12:07:35 +08:00
err := this.list.CleanAll()
if err != nil {
return err
}
2020-10-04 14:30:42 +08:00
// 删除缓存和目录
// 不能直接删除子目录,比较危险
dir := this.dir()
fp, err := os.Open(dir)
if err != nil {
return err
}
defer func() {
_ = fp.Close()
}()
stat, err := fp.Stat()
if err != nil {
return err
}
if !stat.IsDir() {
return nil
}
2021-05-25 18:28:24 +08:00
// 改成待删除
2020-10-04 14:30:42 +08:00
subDirs, err := fp.Readdir(-1)
if err != nil {
return err
}
for _, info := range subDirs {
subDir := info.Name()
// 检查目录名
ok, err := regexp.MatchString(`^[0-9a-f]{2}$`, subDir)
if err != nil {
return err
}
if !ok {
continue
}
2021-05-25 18:28:24 +08:00
// 修改目录名
tmpDir := dir + "/" + subDir + "-deleted"
err = os.Rename(dir+"/"+subDir, tmpDir)
if err != nil {
return err
}
}
// 重新遍历待删除
goman.New(func() {
2021-06-17 21:13:21 +08:00
err = this.cleanDeletedDirs(dir)
2020-10-04 14:30:42 +08:00
if err != nil {
2021-06-17 21:13:21 +08:00
remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error())
2020-10-04 14:30:42 +08:00
}
})
2020-10-04 14:30:42 +08:00
return nil
}
// Purge 清理过期的缓存
2020-12-23 21:28:50 +08:00
func (this *FileStorage) Purge(keys []string, urlType string) error {
// 是否正在退出
if teaconst.IsQuiting {
return nil
}
2020-10-04 14:30:42 +08:00
this.locker.Lock()
defer this.locker.Unlock()
// 先尝试内存缓存
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
_ = memoryStorage.Purge(keys, urlType)
})
2020-12-23 21:28:50 +08:00
// 目录
if urlType == "dir" {
for _, key := range keys {
2021-06-13 17:37:57 +08:00
err := this.list.CleanPrefix(key)
2021-05-19 12:07:35 +08:00
if err != nil {
return err
}
2020-12-23 21:28:50 +08:00
}
return nil
2020-12-23 21:28:50 +08:00
}
// URL
2020-10-04 14:30:42 +08:00
for _, key := range keys {
hash, path := this.keyPath(key)
2022-03-05 16:47:17 +08:00
err := this.removeCacheFile(path)
2020-10-04 14:30:42 +08:00
if err != nil && !os.IsNotExist(err) {
return err
}
2021-05-19 12:07:35 +08:00
err = this.list.Remove(hash)
if err != nil {
return err
}
2020-10-04 14:30:42 +08:00
}
return nil
}
// Stop 停止
2020-10-04 14:30:42 +08:00
func (this *FileStorage) Stop() {
2022-01-12 21:09:00 +08:00
events.Remove(this)
2020-10-04 14:30:42 +08:00
this.locker.Lock()
defer this.locker.Unlock()
// 先尝试内存缓存
this.runMemoryStorageSafety(func(memoryStorage *MemoryStorage) {
memoryStorage.Stop()
})
_ = this.list.Reset()
if this.purgeTicker != nil {
this.purgeTicker.Stop()
}
if this.hotTicker != nil {
this.hotTicker.Stop()
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
_ = this.list.Close()
2022-01-12 21:09:00 +08:00
var openFileCache = this.openFileCache
if openFileCache != nil {
openFileCache.CloseAll()
2022-01-12 21:09:00 +08:00
}
this.ignoreKeys.Reset()
2020-10-04 14:30:42 +08:00
}
// TotalDiskSize returns the total size, in bytes, of the entries tracked by
// the file index.
func (this *FileStorage) TotalDiskSize() int64 {
	var size = atomic.LoadInt64(&this.totalSize)
	return size
}
// TotalMemorySize returns the size reported by the embedded memory storage,
// or 0 when there is none.
func (this *FileStorage) TotalMemorySize() int64 {
	if memoryStorage := this.memoryStorage; memoryStorage != nil {
		return memoryStorage.TotalMemorySize()
	}
	return 0
}
// IgnoreKey marks key as not cacheable. Later writes for it are refused with
// ErrEntityTooLarge.
func (this *FileStorage) IgnoreKey(key string) {
	var ignored = this.ignoreKeys
	ignored.Push(key)
}
2022-04-04 19:45:57 +08:00
// CanSendfile 是否支持Sendfile
func (this *FileStorage) CanSendfile() bool {
if this.options == nil {
return false
}
return this.options.EnableSendfile
}
2020-10-04 14:30:42 +08:00
// 绝对路径
func (this *FileStorage) dir() string {
return this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/"
2020-10-04 14:30:42 +08:00
}
// 获取Key对应的文件路径
func (this *FileStorage) keyPath(key string) (hash string, path string) {
hash = stringutil.Md5(key)
dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
2020-10-04 14:30:42 +08:00
path = dir + "/" + hash + ".cache"
return
}
// 获取Hash对应的文件路径
func (this *FileStorage) hashPath(hash string) (path string) {
if len(hash) != 32 {
return ""
}
dir := this.options.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
2020-10-04 14:30:42 +08:00
path = dir + "/" + hash + ".cache"
return
}
// 初始化List
func (this *FileStorage) initList() error {
2021-05-19 12:07:35 +08:00
err := this.list.Reset()
if err != nil {
return err
}
2020-10-04 14:30:42 +08:00
2021-05-23 22:59:00 +08:00
// 使用异步防止阻塞主线程
/**goman.New(func() {
2021-05-23 22:59:00 +08:00
dir := this.dir()
2021-01-13 12:02:50 +08:00
2021-05-23 22:59:00 +08:00
// 清除tmp
// TODO 需要一个更加高效的实现
})**/
2021-01-13 12:02:50 +08:00
2020-10-04 14:30:42 +08:00
// 启动定时清理任务
this.initPurgeTicker()
// 热点处理任务
this.hotTicker = utils.NewTicker(1 * time.Minute)
if Tea.IsTesting() {
this.hotTicker = utils.NewTicker(10 * time.Second)
}
goman.New(func() {
for this.hotTicker.Next() {
trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() {
this.hotLoop()
})
}
})
// 退出时停止
2022-01-12 21:09:00 +08:00
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("CACHE", "quit clean timer")
2022-01-12 21:09:00 +08:00
{
var ticker = this.purgeTicker
if ticker != nil {
ticker.Stop()
}
}
{
var ticker = this.hotTicker
if ticker != nil {
ticker.Stop()
}
2020-10-28 11:19:06 +08:00
}
})
2020-10-04 14:30:42 +08:00
return nil
}
// 清理任务
func (this *FileStorage) purgeLoop() {
// 检查磁盘剩余空间
this.checkDiskSpace()
// 计算是否应该开启LFU清理
var capacityBytes = this.diskCapacityBytes()
var startLFU = false
var lfuFreePercent = this.policy.PersistenceLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
}
if this.diskIsFull {
startLFU = true
} else {
var usedPercent = float32(this.TotalDiskSize()*100) / float32(capacityBytes)
if capacityBytes > 0 {
if lfuFreePercent < 100 {
if usedPercent >= 100-lfuFreePercent {
startLFU = true
}
}
}
}
// 清理过期
{
var times = 1
// 空闲时间多清理
if utils.SharedFreeHoursManager.IsFreeHour() {
times = 5
}
// 处于LFU阈值时多清理
if startLFU {
times = 5
}
var purgeCount = this.policy.PersistenceAutoPurgeCount
if purgeCount <= 0 {
purgeCount = 1000
}
for i := 0; i < times; i++ {
countFound, err := this.list.Purge(purgeCount, func(hash string) error {
path := this.hashPath(hash)
2022-03-05 16:47:17 +08:00
err := this.removeCacheFile(path)
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
2022-03-05 16:47:17 +08:00
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage failed: "+err.Error())
continue
}
if countFound < purgeCount {
break
}
time.Sleep(1 * time.Second)
}
}
// 磁盘空间不足时,清除老旧的缓存
if startLFU {
var total, _ = this.list.Count()
if total > 0 {
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count > 0 {
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
}
remotelogs.Println("CACHE", "LFU purge policy '"+this.policy.Name+"' id: "+types.String(this.policy.Id)+", count: "+types.String(count))
err := this.list.PurgeLFU(count, func(hash string) error {
path := this.hashPath(hash)
2022-03-05 16:47:17 +08:00
err := this.removeCacheFile(path)
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
2022-03-05 16:47:17 +08:00
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge file storage in LFU failed: "+err.Error())
}
}
2020-10-04 14:30:42 +08:00
}
2021-06-13 17:37:57 +08:00
}
2020-10-04 14:30:42 +08:00
}
2021-01-13 12:02:50 +08:00
// 热点数据任务
func (this *FileStorage) hotLoop() {
var memoryStorage = this.memoryStorage
if memoryStorage == nil {
return
}
this.hotMapLocker.Lock()
if len(this.hotMap) == 0 {
this.hotMapLocker.Unlock()
this.lastHotSize = 0
return
}
this.lastHotSize = len(this.hotMap)
var result = []*HotItem{} // [ {key: ..., hits: ...}, ... ]
for _, v := range this.hotMap {
result = append(result, v)
}
this.hotMap = map[string]*HotItem{}
this.hotMapLocker.Unlock()
2022-03-20 20:58:34 +08:00
// 取Top10%写入内存
if len(result) > 0 {
sort.Slice(result, func(i, j int) bool {
return result[i].Hits > result[j].Hits
})
var size = 1
if len(result) < 10 {
size = 1
} else {
size = len(result) / 10
}
2021-12-19 11:32:26 +08:00
var buf = utils.BytePool16k.Get()
defer utils.BytePool16k.Put(buf)
for _, item := range result[:size] {
reader, err := this.openReader(item.Key, false, false, false)
if err != nil {
continue
}
if reader == nil {
continue
}
// 如果即将过期,则忽略
var nowUnixTime = time.Now().Unix()
if reader.ExpiresAt() <= nowUnixTime+600 {
continue
}
// 计算合适的过期时间
var bestExpiresAt = nowUnixTime + HotItemLifeSeconds
var hotTimes = int64(item.Hits) / 1000
if hotTimes > 8 {
hotTimes = 8
}
bestExpiresAt += hotTimes * HotItemLifeSeconds
var expiresAt = reader.ExpiresAt()
if expiresAt <= 0 || expiresAt > bestExpiresAt {
expiresAt = bestExpiresAt
}
writer, err := memoryStorage.openWriter(item.Key, expiresAt, reader.Status(), reader.BodySize(), -1, false)
if err != nil {
if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())
}
_ = reader.Close()
continue
}
if writer == nil {
_ = reader.Close()
continue
}
err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) {
_, err = writer.WriteHeader(buf[:n])
return
})
if err != nil {
_ = reader.Close()
_ = writer.Discard()
continue
}
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
_, err = writer.Write(buf[:n])
if err == nil {
goNext = true
}
return
})
if err != nil {
_ = reader.Close()
_ = writer.Discard()
continue
}
memoryStorage.AddToList(&Item{
Type: writer.ItemType(),
Key: item.Key,
ExpiredAt: expiresAt,
HeaderSize: writer.HeaderSize(),
BodySize: writer.BodySize(),
})
_ = reader.Close()
_ = writer.Close()
}
}
}
// diskCapacityBytes returns the disk capacity limit. The global
// SharedManager.MaxDiskCapacity wins when it is set to a positive size;
// otherwise the policy's own capacity is used.
func (this *FileStorage) diskCapacityBytes() int64 {
	var policyBytes = this.policy.CapacityBytes()
	if maxCapacity := SharedManager.MaxDiskCapacity; maxCapacity != nil {
		if globalBytes := maxCapacity.Bytes(); globalBytes > 0 {
			return globalBytes
		}
	}
	return policyBytes
}
2021-06-17 21:13:21 +08:00
// 清理 *-deleted 目录
// 由于在很多硬盘上耗时非常久,所以应该放在后台运行
func (this *FileStorage) cleanDeletedDirs(dir string) error {
fp, err := os.Open(dir)
if err != nil {
return err
}
defer func() {
_ = fp.Close()
}()
subDirs, err := fp.Readdir(-1)
if err != nil {
return err
}
for _, info := range subDirs {
subDir := info.Name()
if !strings.HasSuffix(subDir, "-deleted") {
continue
}
// 删除
err = os.RemoveAll(dir + "/" + subDir)
if err != nil {
if !os.IsNotExist(err) {
return err
}
}
}
return nil
}
// 增加某个Key的点击量
func (this *FileStorage) increaseHit(key string, hash string, reader Reader) {
var rate = this.policy.PersistenceHitSampleRate
if rate <= 0 {
rate = 1000
}
if this.lastHotSize == 0 {
// 自动降低采样率来增加热点数据的缓存几率
rate = rate / 10
}
if rands.Int(0, rate) == 0 {
var memoryStorage = this.memoryStorage
var hitErr = this.list.IncreaseHit(hash)
if hitErr != nil {
// 此错误可以忽略
remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error())
}
// 增加到热点
// 这里不收录缓存尺寸过大的文件
2022-03-20 21:15:25 +08:00
if memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*sizes.M {
this.hotMapLocker.Lock()
hotItem, ok := this.hotMap[key]
// 限制热点数据存活时间
var unixTime = time.Now().Unix()
var expiresAt = reader.ExpiresAt()
if expiresAt <= 0 || expiresAt > unixTime+HotItemLifeSeconds {
expiresAt = unixTime + HotItemLifeSeconds
}
if ok {
hotItem.Hits++
} else if len(this.hotMap) < HotItemSize { // 控制数量
this.hotMap[key] = &HotItem{
Key: key,
Hits: 1,
}
}
this.hotMapLocker.Unlock()
}
}
}
2022-03-05 16:47:17 +08:00
// 删除缓存文件
func (this *FileStorage) removeCacheFile(path string) error {
var openFileCache = this.openFileCache
if openFileCache != nil {
openFileCache.Close(path)
}
2022-03-05 16:47:17 +08:00
var err = os.Remove(path)
if err == nil || os.IsNotExist(err) {
err = nil
// 删除Partial相关
var partialPath = partialRangesFilePath(path)
if openFileCache != nil {
openFileCache.Close(partialPath)
}
_ = os.Remove(partialPath)
2022-03-05 16:47:17 +08:00
}
return err
}
// createMemoryStorage builds the memory storage embedded in this policy.
//
// Its capacity comes from options.MemoryPolicy, the maximum item size is fixed
// at 128MB, and the other settings are copied from the file policy. The
// storage is only installed after its policy and the storage itself
// initialized successfully.
func (this *FileStorage) createMemoryStorage() error {
	var filePolicy = this.policy
	var memoryPolicy = &serverconfigs.HTTPCachePolicy{
		Id:                      filePolicy.Id,
		IsOn:                    filePolicy.IsOn,
		Name:                    filePolicy.Name,
		Description:             filePolicy.Description,
		Capacity:                this.options.MemoryPolicy.Capacity,
		MaxKeys:                 filePolicy.MaxKeys,
		MaxSize:                 &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO make configurable
		Type:                    serverconfigs.CachePolicyStorageMemory,
		Options:                 filePolicy.Options,
		Life:                    filePolicy.Life,
		MinLife:                 filePolicy.MinLife,
		MaxLife:                 filePolicy.MaxLife,
		MemoryAutoPurgeCount:    filePolicy.MemoryAutoPurgeCount,
		MemoryAutoPurgeInterval: filePolicy.MemoryAutoPurgeInterval,
		MemoryLFUFreePercent:    filePolicy.MemoryLFUFreePercent,
	}
	if err := memoryPolicy.Init(); err != nil {
		return err
	}

	var memoryStorage = NewMemoryStorage(memoryPolicy, this)
	if err := memoryStorage.Init(); err != nil {
		return err
	}
	this.memoryStorage = memoryStorage
	return nil
}
// initPurgeTicker (re)starts the periodic purge task. purgeLoop runs every
// PersistenceAutoPurgeInterval seconds (default 30, or 10 when testing).
// A previously started ticker is stopped first.
func (this *FileStorage) initPurgeTicker() {
	var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval
	if autoPurgeInterval <= 0 {
		autoPurgeInterval = 30
		if Tea.IsTesting() {
			autoPurgeInterval = 10
		}
	}

	if this.purgeTicker != nil {
		this.purgeTicker.Stop()
	}

	// The goroutine loops on its own ticker. If it re-read this.purgeTicker
	// on every iteration, an old goroutine that was busy in purgeLoop during
	// a re-init would continue on the replacement ticker, and two purge
	// loops would run concurrently.
	var ticker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
	this.purgeTicker = ticker
	goman.New(func() {
		for ticker.Next() {
			trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() {
				this.purgeLoop()
			})
		}
	})
}
// initOpenFileCache (re)creates the cache of open file descriptors from
// options.OpenFileCache and then closes the previous cache.
//
// When the option is disabled, or the new cache cannot be created, the field
// is cleared. Readers therefore stop using the old cache once it has been
// closed, instead of keeping a closed cache installed.
func (this *FileStorage) initOpenFileCache() {
	var oldOpenFileCache = this.openFileCache

	var newOpenFileCache *OpenFileCache
	var cacheConfig = this.options.OpenFileCache
	if cacheConfig != nil && cacheConfig.IsOn && cacheConfig.Max > 0 {
		openFileCache, err := NewOpenFileCache(cacheConfig.Max)
		if err != nil {
			remotelogs.Error("CACHE", "open file cache failed: "+err.Error())
		} else {
			newOpenFileCache = openFileCache
		}
	}
	this.openFileCache = newOpenFileCache

	// close the old one
	if oldOpenFileCache != nil {
		oldOpenFileCache.CloseAll()
	}
}
// runMemoryStorageSafety calls f with the current memory storage, if there is
// one. The field is read once, so f never receives nil even if the field is
// cleared meanwhile.
func (this *FileStorage) runMemoryStorageSafety(f func(memoryStorage *MemoryStorage)) {
	if memoryStorage := this.memoryStorage; memoryStorage != nil {
		f(memoryStorage)
	}
}
// checkDiskSpace sets diskIsFull when the file system holding options.Dir has
// fewer than MinDiskSpace bytes available (statfs Bavail * Bsize). If the
// options are not loaded yet or statfs fails, the flag keeps its previous
// value.
func (this *FileStorage) checkDiskSpace() {
	if this.options == nil || len(this.options.Dir) == 0 {
		return
	}
	var stat unix.Statfs_t
	if err := unix.Statfs(this.options.Dir, &stat); err != nil {
		return
	}
	var availableBytes = stat.Bavail * uint64(stat.Bsize)
	this.diskIsFull = availableBytes < MinDiskSpace
}