mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 06:40:25 +08:00
实现open file cache
This commit is contained in:
7
go.mod
7
go.mod
@@ -2,7 +2,9 @@ module github.com/TeaOSLab/EdgeNode
|
||||
|
||||
go 1.15
|
||||
|
||||
replace github.com/TeaOSLab/EdgeCommon => ../EdgeCommon
|
||||
replace (
|
||||
github.com/TeaOSLab/EdgeCommon => ../EdgeCommon
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/TeaOSLab/EdgeCommon v0.0.0-00010101000000-000000000000
|
||||
@@ -11,6 +13,7 @@ require (
|
||||
github.com/cespare/xxhash v1.1.0
|
||||
github.com/chai2010/webp v1.1.0 // indirect
|
||||
github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f
|
||||
github.com/fsnotify/fsnotify v1.5.1 // indirect
|
||||
github.com/go-yaml/yaml v2.1.0+incompatible
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/iwind/TeaGo v0.0.0-20211026123858-7de7a21cad24
|
||||
@@ -28,7 +31,7 @@ require (
|
||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||
golang.org/x/image v0.0.0-20211028202545-6944b10bf410
|
||||
golang.org/x/net v0.0.0-20211215060638-4ddde0e984e9
|
||||
golang.org/x/sys v0.0.0-20211214234402-4825e8c3871d
|
||||
golang.org/x/sys v0.0.0-20220111092808-5a964db01320
|
||||
golang.org/x/text v0.3.7
|
||||
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
|
||||
google.golang.org/grpc v1.43.0
|
||||
|
||||
7
go.sum
7
go.sum
@@ -43,6 +43,8 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
|
||||
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
|
||||
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
|
||||
@@ -198,8 +200,11 @@ golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211214234402-4825e8c3871d h1:1oIt9o40TWWI9FUaveVpUvBe13FNqBNVXy3ue2fcfkw=
|
||||
golang.org/x/sys v0.0.0-20211214234402-4825e8c3871d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY=
|
||||
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
@@ -271,5 +276,3 @@ gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclp
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
rogchap.com/v8go v0.7.0 h1:kgjbiO4zE5itA962ze6Hqmbs4HgZbGzmueCXsZtremg=
|
||||
rogchap.com/v8go v0.7.0/go.mod h1:MxgP3pL2MW4dpme/72QRs8sgNMmM0pRc8DPhcuLWPAs=
|
||||
|
||||
@@ -85,7 +85,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
|
||||
for _, policy := range this.policyMap {
|
||||
storage, ok := this.storageMap[policy.Id]
|
||||
if !ok {
|
||||
storage := this.NewStorageWithPolicy(policy)
|
||||
storage = this.NewStorageWithPolicy(policy)
|
||||
if storage == nil {
|
||||
remotelogs.Error("CACHE", "can not find storage type '"+policy.Type+"'")
|
||||
continue
|
||||
@@ -106,7 +106,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
|
||||
delete(this.storageMap, policy.Id)
|
||||
|
||||
// 启动新的
|
||||
storage := this.NewStorageWithPolicy(policy)
|
||||
storage = this.NewStorageWithPolicy(policy)
|
||||
if storage == nil {
|
||||
remotelogs.Error("CACHE", "can not find storage type '"+policy.Type+"'")
|
||||
continue
|
||||
|
||||
31
internal/caches/open_file.go
Normal file
31
internal/caches/open_file.go
Normal file
@@ -0,0 +1,31 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package caches
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
type OpenFile struct {
|
||||
fp *os.File
|
||||
meta []byte
|
||||
version int64
|
||||
}
|
||||
|
||||
func NewOpenFile(fp *os.File, meta []byte) *OpenFile {
|
||||
return &OpenFile{
|
||||
fp: fp,
|
||||
meta: meta,
|
||||
}
|
||||
}
|
||||
|
||||
func (this *OpenFile) SeekStart() error {
|
||||
_, err := this.fp.Seek(0, io.SeekStart)
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *OpenFile) Close() error {
|
||||
this.meta = nil
|
||||
return this.fp.Close()
|
||||
}
|
||||
155
internal/caches/open_file_cache.go
Normal file
155
internal/caches/open_file_cache.go
Normal file
@@ -0,0 +1,155 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package caches
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type OpenFileCache struct {
|
||||
poolMap map[string]*OpenFilePool // file path => Pool
|
||||
poolList *linkedlist.List
|
||||
watcher *fsnotify.Watcher
|
||||
|
||||
locker sync.Mutex
|
||||
|
||||
maxSize int
|
||||
count int
|
||||
}
|
||||
|
||||
func NewOpenFileCache(maxSize int) (*OpenFileCache, error) {
|
||||
if maxSize <= 0 {
|
||||
maxSize = 16384
|
||||
}
|
||||
|
||||
var cache = &OpenFileCache{
|
||||
maxSize: maxSize,
|
||||
poolMap: map[string]*OpenFilePool{},
|
||||
poolList: linkedlist.NewList(),
|
||||
}
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cache.watcher = watcher
|
||||
|
||||
goman.New(func() {
|
||||
for event := range watcher.Events {
|
||||
if event.Op&fsnotify.Chmod != fsnotify.Chmod {
|
||||
cache.Close(event.Name)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
func (this *OpenFileCache) Get(filename string) *OpenFile {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
pool, ok := this.poolMap[filename]
|
||||
if ok {
|
||||
file, consumed := pool.Get()
|
||||
if consumed {
|
||||
this.count--
|
||||
}
|
||||
return file
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *OpenFileCache) Put(filename string, file *OpenFile) {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
pool, ok := this.poolMap[filename]
|
||||
var success bool
|
||||
if ok {
|
||||
success = pool.Put(file)
|
||||
} else {
|
||||
_ = this.watcher.Add(filename)
|
||||
pool = NewOpenFilePool(filename)
|
||||
this.poolMap[filename] = pool
|
||||
success = pool.Put(file)
|
||||
}
|
||||
this.poolList.Push(pool.linkItem)
|
||||
|
||||
// 检查长度
|
||||
if success {
|
||||
this.count++
|
||||
|
||||
// 如果超过当前容量,则关闭多余的
|
||||
for this.count > this.maxSize {
|
||||
var head = this.poolList.Head()
|
||||
if head == nil {
|
||||
break
|
||||
}
|
||||
|
||||
var headPool = head.Value.(*OpenFilePool)
|
||||
headFile, consumed := headPool.Get()
|
||||
if consumed {
|
||||
this.count--
|
||||
if headFile != nil {
|
||||
_ = headFile.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if headPool.Len() == 0 {
|
||||
delete(this.poolMap, headPool.filename)
|
||||
this.poolList.Remove(head)
|
||||
_ = this.watcher.Remove(headPool.filename)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *OpenFileCache) Close(filename string) {
|
||||
this.locker.Lock()
|
||||
|
||||
pool, ok := this.poolMap[filename]
|
||||
if ok {
|
||||
delete(this.poolMap, filename)
|
||||
this.poolList.Remove(pool.linkItem)
|
||||
_ = this.watcher.Remove(filename)
|
||||
this.count -= pool.Len()
|
||||
}
|
||||
|
||||
this.locker.Unlock()
|
||||
|
||||
// 在locker之外,提升性能
|
||||
if ok {
|
||||
pool.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (this *OpenFileCache) CloseAll() {
|
||||
this.locker.Lock()
|
||||
for _, pool := range this.poolMap {
|
||||
pool.Close()
|
||||
}
|
||||
this.poolMap = map[string]*OpenFilePool{}
|
||||
this.poolList.Reset()
|
||||
_ = this.watcher.Close()
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
func (this *OpenFileCache) Debug() {
|
||||
var ticker = time.NewTicker(5 * time.Second)
|
||||
goman.New(func() {
|
||||
for range ticker.C {
|
||||
logs.Println("==== " + types.String(this.count) + " ====")
|
||||
this.poolList.Range(func(item *linkedlist.Item) (goNext bool) {
|
||||
logs.Println(filepath.Base(item.Value.(*OpenFilePool).Filename()), item.Value.(*OpenFilePool).Len())
|
||||
return true
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
76
internal/caches/open_file_pool.go
Normal file
76
internal/caches/open_file_pool.go
Normal file
@@ -0,0 +1,76 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package caches
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
|
||||
)
|
||||
|
||||
type OpenFilePool struct {
|
||||
c chan *OpenFile
|
||||
linkItem *linkedlist.Item
|
||||
filename string
|
||||
version int64
|
||||
}
|
||||
|
||||
func NewOpenFilePool(filename string) *OpenFilePool {
|
||||
var pool = &OpenFilePool{
|
||||
filename: filename,
|
||||
c: make(chan *OpenFile, 1024),
|
||||
version: utils.UnixTimeMilli(),
|
||||
}
|
||||
pool.linkItem = linkedlist.NewItem(pool)
|
||||
return pool
|
||||
}
|
||||
|
||||
func (this *OpenFilePool) Filename() string {
|
||||
return this.filename
|
||||
}
|
||||
|
||||
func (this *OpenFilePool) Get() (*OpenFile, bool) {
|
||||
select {
|
||||
case file := <-this.c:
|
||||
err := file.SeekStart()
|
||||
if err != nil {
|
||||
_ = file.Close()
|
||||
return nil, true
|
||||
}
|
||||
file.version = this.version
|
||||
|
||||
return file, true
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
func (this *OpenFilePool) Put(file *OpenFile) bool {
|
||||
if file.version > 0 && file.version != this.version {
|
||||
_ = file.Close()
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case this.c <- file:
|
||||
return true
|
||||
default:
|
||||
// 多余的直接关闭
|
||||
_ = file.Close()
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (this *OpenFilePool) Len() int {
|
||||
return len(this.c)
|
||||
}
|
||||
|
||||
func (this *OpenFilePool) Close() {
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case file := <-this.c:
|
||||
_ = file.Close()
|
||||
default:
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
}
|
||||
17
internal/caches/open_file_pool_test.go
Normal file
17
internal/caches/open_file_pool_test.go
Normal file
@@ -0,0 +1,17 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package caches_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestOpenFilePool_Get(t *testing.T) {
|
||||
var pool = caches.NewOpenFilePool("a")
|
||||
t.Log(pool.Filename())
|
||||
t.Log(pool.Get())
|
||||
t.Log(pool.Put(caches.NewOpenFile(nil, nil)))
|
||||
t.Log(pool.Get())
|
||||
t.Log(pool.Get())
|
||||
}
|
||||
@@ -11,6 +11,10 @@ import (
|
||||
type FileReader struct {
|
||||
fp *os.File
|
||||
|
||||
openFile *OpenFile
|
||||
openFileCache *OpenFileCache
|
||||
|
||||
meta []byte
|
||||
expiresAt int64
|
||||
status int
|
||||
headerOffset int64
|
||||
@@ -35,13 +39,17 @@ func (this *FileReader) Init() error {
|
||||
}
|
||||
}()
|
||||
|
||||
var buf = make([]byte, SizeMeta)
|
||||
ok, err := this.readToBuff(this.fp, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
var buf = this.meta
|
||||
if len(buf) == 0 {
|
||||
buf = make([]byte, SizeMeta)
|
||||
ok, err := this.readToBuff(this.fp, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
}
|
||||
this.meta = buf
|
||||
}
|
||||
|
||||
this.expiresAt = int64(binary.BigEndian.Uint32(buf[:SizeExpiresAt]))
|
||||
@@ -323,6 +331,14 @@ func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callba
|
||||
}
|
||||
|
||||
func (this *FileReader) Close() error {
|
||||
if this.openFileCache != nil {
|
||||
if this.openFile != nil {
|
||||
this.openFileCache.Put(this.fp.Name(), this.openFile)
|
||||
} else {
|
||||
this.openFileCache.Put(this.fp.Name(), NewOpenFile(this.fp, this.meta))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return this.fp.Close()
|
||||
}
|
||||
|
||||
|
||||
@@ -14,12 +14,12 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
stringutil "github.com/iwind/TeaGo/utils/string"
|
||||
"golang.org/x/text/language"
|
||||
"golang.org/x/text/message"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -67,6 +67,8 @@ type FileStorage struct {
|
||||
hotMapLocker sync.Mutex
|
||||
lastHotSize int
|
||||
hotTicker *utils.Ticker
|
||||
|
||||
openFileCache *OpenFileCache
|
||||
}
|
||||
|
||||
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
|
||||
@@ -100,13 +102,13 @@ func (this *FileStorage) Init() error {
|
||||
return err
|
||||
}
|
||||
this.cacheConfig = cacheConfig
|
||||
cacheDir := cacheConfig.Dir
|
||||
|
||||
if !filepath.IsAbs(this.cacheConfig.Dir) {
|
||||
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
|
||||
}
|
||||
|
||||
dir := this.cacheConfig.Dir
|
||||
this.cacheConfig.Dir = filepath.Clean(this.cacheConfig.Dir)
|
||||
var dir = this.cacheConfig.Dir
|
||||
|
||||
if len(dir) == 0 {
|
||||
return errors.New("[CACHE]cache storage dir can not be empty")
|
||||
@@ -159,7 +161,7 @@ func (this *FileStorage) Init() error {
|
||||
} else if size > 1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
|
||||
}
|
||||
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
|
||||
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+this.cacheConfig.Dir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
|
||||
}()
|
||||
|
||||
// 初始化list
|
||||
@@ -202,6 +204,15 @@ func (this *FileStorage) Init() error {
|
||||
}
|
||||
}
|
||||
|
||||
// open file cache
|
||||
if this.cacheConfig.OpenFileCache != nil && this.cacheConfig.OpenFileCache.IsOn && this.cacheConfig.OpenFileCache.Max > 0 {
|
||||
this.openFileCache, err = NewOpenFileCache(this.cacheConfig.OpenFileCache.Max)
|
||||
logs.Println("start open file cache")
|
||||
if err != nil {
|
||||
remotelogs.Error("CACHE", "open file cache failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -238,7 +249,18 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
|
||||
|
||||
// TODO 尝试使用mmap加快读取速度
|
||||
var isOk = false
|
||||
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
|
||||
var openFile *OpenFile
|
||||
if this.openFileCache != nil {
|
||||
openFile = this.openFileCache.Get(path)
|
||||
}
|
||||
var fp *os.File
|
||||
var err error
|
||||
var meta []byte
|
||||
if openFile != nil {
|
||||
fp, meta = openFile.fp, openFile.meta
|
||||
} else {
|
||||
fp, err = os.OpenFile(path, os.O_RDONLY, 0444)
|
||||
}
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
@@ -253,8 +275,10 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
|
||||
}()
|
||||
|
||||
reader := NewFileReader(fp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
reader.openFile = openFile
|
||||
reader.openFileCache = this.openFileCache
|
||||
if len(meta) > 0 {
|
||||
reader.meta = meta
|
||||
}
|
||||
err = reader.Init()
|
||||
if err != nil {
|
||||
@@ -623,6 +647,8 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
|
||||
// Stop 停止
|
||||
func (this *FileStorage) Stop() {
|
||||
events.Remove(this)
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
@@ -640,6 +666,10 @@ func (this *FileStorage) Stop() {
|
||||
}
|
||||
|
||||
_ = this.list.Close()
|
||||
|
||||
if this.openFileCache != nil {
|
||||
this.openFileCache.CloseAll()
|
||||
}
|
||||
}
|
||||
|
||||
// TotalDiskSize 消耗的磁盘尺寸
|
||||
@@ -702,11 +732,20 @@ func (this *FileStorage) initList() error {
|
||||
}
|
||||
}
|
||||
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
|
||||
events.On(events.EventQuit, func() {
|
||||
events.OnKey(events.EventQuit, this, func() {
|
||||
remotelogs.Println("CACHE", "quit clean timer")
|
||||
var ticker = this.purgeTicker
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
|
||||
{
|
||||
var ticker = this.purgeTicker
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
}
|
||||
}
|
||||
{
|
||||
var ticker = this.hotTicker
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
}
|
||||
}
|
||||
})
|
||||
goman.New(func() {
|
||||
@@ -733,98 +772,6 @@ func (this *FileStorage) initList() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 解析文件信息
|
||||
func (this *FileStorage) decodeFile(path string) (*Item, error) {
|
||||
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
isAllOk := false
|
||||
defer func() {
|
||||
_ = fp.Close()
|
||||
|
||||
if !isAllOk {
|
||||
_ = os.Remove(path)
|
||||
}
|
||||
}()
|
||||
|
||||
item := &Item{
|
||||
Type: ItemTypeFile,
|
||||
MetaSize: SizeMeta,
|
||||
}
|
||||
|
||||
bytes4 := make([]byte, 4)
|
||||
|
||||
// 过期时间
|
||||
ok, err := this.readToBuff(fp, bytes4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
item.ExpiredAt = int64(binary.BigEndian.Uint32(bytes4))
|
||||
|
||||
// 是否已过期
|
||||
if item.ExpiredAt < time.Now().Unix() {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
// URL Size
|
||||
_, err = fp.Seek(int64(SizeExpiresAt+SizeStatus), io.SeekStart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ok, err = this.readToBuff(fp, bytes4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
urlSize := binary.BigEndian.Uint32(bytes4)
|
||||
|
||||
// Header Size
|
||||
ok, err = this.readToBuff(fp, bytes4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
item.HeaderSize = int64(binary.BigEndian.Uint32(bytes4))
|
||||
|
||||
// Body Size
|
||||
bytes8 := make([]byte, 8)
|
||||
ok, err = this.readToBuff(fp, bytes8)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
item.BodySize = int64(binary.BigEndian.Uint64(bytes8))
|
||||
|
||||
// URL
|
||||
if urlSize > 0 {
|
||||
data := utils.BytePool1k.Get()
|
||||
result, ok, err := this.readN(fp, data, int(urlSize))
|
||||
utils.BytePool1k.Put(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
item.Key = string(result)
|
||||
}
|
||||
|
||||
isAllOk = true
|
||||
|
||||
return item, nil
|
||||
}
|
||||
|
||||
// 清理任务
|
||||
func (this *FileStorage) purgeLoop() {
|
||||
// 计算是否应该开启LFU清理
|
||||
@@ -1008,34 +955,6 @@ func (this *FileStorage) hotLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) {
|
||||
n, err := fp.Read(buf)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
ok = n == len(buf)
|
||||
return
|
||||
}
|
||||
|
||||
func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byte, ok bool, err error) {
|
||||
for {
|
||||
n, err := fp.Read(buf)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if n > 0 {
|
||||
if n >= total {
|
||||
result = append(result, buf[:total]...)
|
||||
ok = true
|
||||
return result, ok, nil
|
||||
} else {
|
||||
total -= n
|
||||
result = append(result, buf[:n]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *FileStorage) diskCapacityBytes() int64 {
|
||||
c1 := this.policy.CapacityBytes()
|
||||
if SharedManager.MaxDiskCapacity != nil {
|
||||
|
||||
14
internal/utils/linkedlist/item.go
Normal file
14
internal/utils/linkedlist/item.go
Normal file
@@ -0,0 +1,14 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package linkedlist
|
||||
|
||||
type Item struct {
|
||||
prev *Item
|
||||
next *Item
|
||||
|
||||
Value interface{}
|
||||
}
|
||||
|
||||
func NewItem(value interface{}) *Item {
|
||||
return &Item{Value: value}
|
||||
}
|
||||
93
internal/utils/linkedlist/list.go
Normal file
93
internal/utils/linkedlist/list.go
Normal file
@@ -0,0 +1,93 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package linkedlist
|
||||
|
||||
type List struct {
|
||||
head *Item
|
||||
end *Item
|
||||
count int
|
||||
}
|
||||
|
||||
func NewList() *List {
|
||||
return &List{}
|
||||
}
|
||||
|
||||
func (this *List) Head() *Item {
|
||||
return this.head
|
||||
}
|
||||
|
||||
func (this *List) End() *Item {
|
||||
return this.end
|
||||
}
|
||||
|
||||
func (this *List) Push(item *Item) {
|
||||
if item == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 如果已经在末尾了,则do nothing
|
||||
if this.end == item {
|
||||
return
|
||||
}
|
||||
|
||||
if item.prev != nil || item.next != nil || this.head == item {
|
||||
this.Remove(item)
|
||||
}
|
||||
this.add(item)
|
||||
}
|
||||
|
||||
func (this *List) Remove(item *Item) {
|
||||
if item == nil {
|
||||
return
|
||||
}
|
||||
if item.prev != nil {
|
||||
item.prev.next = item.next
|
||||
}
|
||||
if item.next != nil {
|
||||
item.next.prev = item.prev
|
||||
}
|
||||
if item == this.head {
|
||||
this.head = item.next
|
||||
}
|
||||
if item == this.end {
|
||||
this.end = item.prev
|
||||
}
|
||||
|
||||
item.prev = nil
|
||||
item.next = nil
|
||||
this.count--
|
||||
}
|
||||
|
||||
func (this *List) Len() int {
|
||||
return this.count
|
||||
}
|
||||
|
||||
func (this *List) Range(f func(item *Item) (goNext bool)) {
|
||||
for e := this.head; e != nil; e = e.next {
|
||||
goNext := f(e)
|
||||
if !goNext {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *List) Reset() {
|
||||
this.head = nil
|
||||
this.end = nil
|
||||
}
|
||||
|
||||
func (this *List) add(item *Item) {
|
||||
if item == nil {
|
||||
return
|
||||
}
|
||||
if this.end != nil {
|
||||
this.end.next = item
|
||||
item.prev = this.end
|
||||
item.next = nil
|
||||
}
|
||||
this.end = item
|
||||
if this.head == nil {
|
||||
this.head = item
|
||||
}
|
||||
this.count++
|
||||
}
|
||||
82
internal/utils/linkedlist/list_test.go
Normal file
82
internal/utils/linkedlist/list_test.go
Normal file
@@ -0,0 +1,82 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package linkedlist_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/linkedlist"
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestNewList_Memory(t *testing.T) {
|
||||
var stat1 = &runtime.MemStats{}
|
||||
runtime.ReadMemStats(stat1)
|
||||
|
||||
var list = linkedlist.NewList()
|
||||
for i := 0; i < 1_000_000; i++ {
|
||||
var item = &linkedlist.Item{}
|
||||
list.Push(item)
|
||||
}
|
||||
|
||||
var stat2 = &runtime.MemStats{}
|
||||
runtime.ReadMemStats(stat2)
|
||||
t.Log((stat2.HeapInuse-stat1.HeapInuse)/1024/1024, "MB")
|
||||
t.Log(list.Len())
|
||||
|
||||
var count = 0
|
||||
list.Range(func(item *linkedlist.Item) (goNext bool) {
|
||||
count++
|
||||
return true
|
||||
})
|
||||
t.Log(count)
|
||||
}
|
||||
|
||||
func TestList_Push(t *testing.T) {
|
||||
var list = linkedlist.NewList()
|
||||
list.Push(linkedlist.NewItem(1))
|
||||
list.Push(linkedlist.NewItem(2))
|
||||
|
||||
var item3 = linkedlist.NewItem(3)
|
||||
list.Push(item3)
|
||||
|
||||
var item4 = linkedlist.NewItem(4)
|
||||
list.Push(item4)
|
||||
list.Range(func(item *linkedlist.Item) (goNext bool) {
|
||||
t.Log(item.Value)
|
||||
return true
|
||||
})
|
||||
|
||||
t.Log("=== after push3 ===")
|
||||
list.Push(item3)
|
||||
list.Range(func(item *linkedlist.Item) (goNext bool) {
|
||||
t.Log(item.Value)
|
||||
return true
|
||||
})
|
||||
|
||||
t.Log("=== after push4 ===")
|
||||
list.Push(item4)
|
||||
list.Push(item3)
|
||||
list.Push(item3)
|
||||
list.Push(item3)
|
||||
list.Push(item4)
|
||||
list.Push(item4)
|
||||
list.Range(func(item *linkedlist.Item) (goNext bool) {
|
||||
t.Log(item.Value)
|
||||
return true
|
||||
})
|
||||
|
||||
t.Log("=== after remove ===")
|
||||
list.Remove(item3)
|
||||
list.Range(func(item *linkedlist.Item) (goNext bool) {
|
||||
t.Log(item.Value)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkList_Add(b *testing.B) {
|
||||
var list = linkedlist.NewList()
|
||||
for i := 0; i < b.N; i++ {
|
||||
var item = &linkedlist.Item{}
|
||||
list.Push(item)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user