mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2026-01-04 06:16:36 +08:00
缓存文件列表使用sqlite管理
This commit is contained in:
@@ -49,7 +49,7 @@ type FileStorage struct {
|
||||
memoryStorage *MemoryStorage // 一级缓存
|
||||
totalSize int64
|
||||
|
||||
list *List
|
||||
list ListInterface
|
||||
locker sync.RWMutex
|
||||
ticker *utils.Ticker
|
||||
}
|
||||
@@ -57,7 +57,6 @@ type FileStorage struct {
|
||||
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
|
||||
return &FileStorage{
|
||||
policy: policy,
|
||||
list: NewList(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,41 +67,10 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
|
||||
|
||||
// Init 初始化
|
||||
func (this *FileStorage) Init() error {
|
||||
this.list.OnAdd(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, item.TotalSize())
|
||||
})
|
||||
this.list.OnRemove(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, -item.TotalSize())
|
||||
})
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
before := time.Now()
|
||||
cacheDir := ""
|
||||
defer func() {
|
||||
// 统计
|
||||
count := 0
|
||||
size := int64(0)
|
||||
if this.list != nil {
|
||||
stat := this.list.Stat(func(hash string) bool {
|
||||
return true
|
||||
})
|
||||
count = stat.Count
|
||||
size = stat.Size
|
||||
}
|
||||
|
||||
cost := time.Since(before).Seconds() * 1000
|
||||
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
|
||||
if size > 1024*1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
|
||||
} else if size > 1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
|
||||
} else if size > 1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
|
||||
}
|
||||
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB)
|
||||
}()
|
||||
|
||||
// 配置
|
||||
cacheConfig := &serverconfigs.HTTPFileCacheStorage{}
|
||||
@@ -115,7 +83,7 @@ func (this *FileStorage) Init() error {
|
||||
return err
|
||||
}
|
||||
this.cacheConfig = cacheConfig
|
||||
cacheDir = cacheConfig.Dir
|
||||
cacheDir := cacheConfig.Dir
|
||||
|
||||
if !filepath.IsAbs(this.cacheConfig.Dir) {
|
||||
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
|
||||
@@ -127,6 +95,26 @@ func (this *FileStorage) Init() error {
|
||||
return errors.New("[CACHE]cache storage dir can not be empty")
|
||||
}
|
||||
|
||||
list := NewFileList(dir + "/p" + strconv.FormatInt(this.policy.Id, 10))
|
||||
err = list.Init()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.list = list
|
||||
stat, err := list.Stat(func(hash string) bool {
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.totalSize = stat.Size
|
||||
this.list.OnAdd(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, item.TotalSize())
|
||||
})
|
||||
this.list.OnRemove(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, -item.TotalSize())
|
||||
})
|
||||
|
||||
// 检查目录是否存在
|
||||
_, err = os.Stat(dir)
|
||||
if err != nil {
|
||||
@@ -140,6 +128,34 @@ func (this *FileStorage) Init() error {
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// 统计
|
||||
count := 0
|
||||
size := int64(0)
|
||||
if this.list != nil {
|
||||
stat, err := this.list.Stat(func(hash string) bool {
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("CACHE", "stat cache "+strconv.FormatInt(this.policy.Id, 10)+" failed: "+err.Error())
|
||||
} else {
|
||||
count = stat.Count
|
||||
size = stat.Size
|
||||
}
|
||||
}
|
||||
|
||||
cost := time.Since(before).Seconds() * 1000
|
||||
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
|
||||
if size > 1024*1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
|
||||
} else if size > 1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
|
||||
} else if size > 1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
|
||||
}
|
||||
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB)
|
||||
}()
|
||||
|
||||
// 初始化list
|
||||
err = this.initList()
|
||||
if err != nil {
|
||||
@@ -188,10 +204,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
|
||||
}
|
||||
}
|
||||
|
||||
hash, path := this.keyPath(key)
|
||||
if !this.list.Exist(hash) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
_, path := this.keyPath(key)
|
||||
|
||||
// TODO 尝试使用mmap加快读取速度
|
||||
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
|
||||
@@ -224,17 +237,21 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
|
||||
}
|
||||
|
||||
// 检查是否超出最大值
|
||||
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
|
||||
count, err := this.list.Count()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
|
||||
return nil, errors.New("write file cache failed: too many keys in cache storage")
|
||||
}
|
||||
capacityBytes := this.diskCapacityBytes()
|
||||
if capacityBytes > 0 && capacityBytes <= this.totalSize {
|
||||
return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
|
||||
return nil, errors.New("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
|
||||
}
|
||||
|
||||
hash := stringutil.Md5(key)
|
||||
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
||||
_, err := os.Stat(dir)
|
||||
_, err = os.Stat(dir)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
@@ -246,7 +263,10 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
|
||||
}
|
||||
|
||||
// 先删除
|
||||
this.list.Remove(hash)
|
||||
err = this.list.Remove(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path := dir + "/" + hash + ".cache.tmp"
|
||||
writer, err := os.OpenFile(path, os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0666)
|
||||
@@ -351,7 +371,10 @@ func (this *FileStorage) AddToList(item *Item) {
|
||||
|
||||
item.MetaSize = SizeMeta
|
||||
hash := stringutil.Md5(item.Key)
|
||||
this.list.Add(hash, item)
|
||||
err := this.list.Add(hash, item)
|
||||
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
|
||||
remotelogs.Error("CACHE", "add to list failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Delete 删除某个键值对应的缓存
|
||||
@@ -365,8 +388,11 @@ func (this *FileStorage) Delete(key string) error {
|
||||
}
|
||||
|
||||
hash, path := this.keyPath(key)
|
||||
this.list.Remove(hash)
|
||||
err := os.Remove(path)
|
||||
err := this.list.Remove(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = os.Remove(path)
|
||||
if err == nil || os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
@@ -380,7 +406,7 @@ func (this *FileStorage) Stat() (*Stat, error) {
|
||||
|
||||
return this.list.Stat(func(hash string) bool {
|
||||
return true
|
||||
}), nil
|
||||
})
|
||||
}
|
||||
|
||||
// CleanAll 清除所有的缓存
|
||||
@@ -393,7 +419,10 @@ func (this *FileStorage) CleanAll() error {
|
||||
_ = this.memoryStorage.CleanAll()
|
||||
}
|
||||
|
||||
this.list.Reset()
|
||||
err := this.list.CleanAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 删除缓存和目录
|
||||
// 不能直接删除子目录,比较危险
|
||||
@@ -455,7 +484,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
if urlType == "dir" {
|
||||
resultKeys := []string{}
|
||||
for _, key := range keys {
|
||||
resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...)
|
||||
subKeys, err := this.list.FindKeysWithPrefix(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resultKeys = append(resultKeys, subKeys...)
|
||||
}
|
||||
keys = resultKeys
|
||||
}
|
||||
@@ -463,7 +496,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
// 文件
|
||||
for _, key := range keys {
|
||||
hash, path := this.keyPath(key)
|
||||
if !this.list.Exist(hash) {
|
||||
exists, err := this.list.Exist(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
err := os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
@@ -471,11 +508,14 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
continue
|
||||
}
|
||||
|
||||
err := os.Remove(path)
|
||||
err = os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
this.list.Remove(hash)
|
||||
err = this.list.Remove(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -534,7 +574,10 @@ func (this *FileStorage) hashPath(hash string) (path string) {
|
||||
|
||||
// 初始化List
|
||||
func (this *FileStorage) initList() error {
|
||||
this.list.Reset()
|
||||
err := this.list.Reset()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dir := this.dir()
|
||||
|
||||
@@ -547,33 +590,6 @@ func (this *FileStorage) initList() error {
|
||||
_ = os.Remove(path)
|
||||
}
|
||||
|
||||
// 加载缓存
|
||||
files, err = filepath.Glob(dir + "/*/*/*.cache")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, path := range files {
|
||||
basename := filepath.Base(path)
|
||||
index := strings.LastIndex(basename, ".")
|
||||
if index < 0 {
|
||||
continue
|
||||
}
|
||||
hash := basename[:index]
|
||||
|
||||
// 解析文件信息
|
||||
item, err := this.decodeFile(path)
|
||||
if err != nil {
|
||||
if err != ErrNotFound {
|
||||
remotelogs.Error("CACHE", "decode path '"+path+"': "+err.Error())
|
||||
}
|
||||
continue
|
||||
}
|
||||
if item == nil {
|
||||
continue
|
||||
}
|
||||
this.list.Add(hash, item)
|
||||
}
|
||||
|
||||
// 启动定时清理任务
|
||||
this.ticker = utils.NewTicker(30 * time.Second)
|
||||
events.On(events.EventQuit, func() {
|
||||
@@ -686,12 +702,13 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
|
||||
|
||||
// 清理任务
|
||||
func (this *FileStorage) purgeLoop() {
|
||||
this.list.Purge(1000, func(hash string) {
|
||||
_ = this.list.Purge(1000, func(hash string) error {
|
||||
path := this.hashPath(hash)
|
||||
err := os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user