From 60a4016101b3ed041b024d3b5fd9decf28f3a046 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Fri, 29 Mar 2024 19:28:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8MMAP=E6=8F=90=E5=8D=87?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E8=AF=BB=E5=8F=96=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/reader_file_mmap.go | 29 ++++++++++++++++++++++++++ internal/caches/storage_file.go | 31 ++++++++++++++++++++-------- internal/nodes/http_request_cache.go | 17 +++++++++++---- internal/utils/mem/system.go | 21 ++++++++++++++++++- internal/utils/mem/system_test.go | 1 + 5 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 internal/caches/reader_file_mmap.go diff --git a/internal/caches/reader_file_mmap.go b/internal/caches/reader_file_mmap.go new file mode 100644 index 0000000..4a26537 --- /dev/null +++ b/internal/caches/reader_file_mmap.go @@ -0,0 +1,29 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . +//go:build !plus + +package caches + +import ( + "errors" + "io" + "os" +) + +func IsValidForMMAP(fp *os.File) (ok bool, stat os.FileInfo) { + // stub + return +} + +type MMAPFileReader struct { + FileReader +} + +func NewMMAPFileReader(fp *os.File, stat os.FileInfo) (*MMAPFileReader, error) { + // stub + return &MMAPFileReader{}, errors.New("not implemented") +} + +func (this *MMAPFileReader) CopyBodyTo(writer io.Writer) (int, error) { + // stub + return 0, errors.New("not implemented") +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 36f0e16..7f64a9f 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -125,7 +125,7 @@ func (this *FileStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolic if err != nil { return false } - var oldOptions = &serverconfigs.HTTPFileCacheStorage{} + var oldOptions = serverconfigs.NewHTTPFileCacheStorage() err = json.Unmarshal(oldOptionsJSON, oldOptions) if err != nil { return false @@ -135,7 +135,7 @@ func (this *FileStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolic if err != nil { return false } - var newOptions = &serverconfigs.HTTPFileCacheStorage{} + var newOptions = serverconfigs.NewHTTPFileCacheStorage() err = json.Unmarshal(newOptionsJSON, newOptions) if err != nil { return false @@ -158,7 +158,7 @@ func (this *FileStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) if err != nil { return } - var newOptions = &serverconfigs.HTTPFileCacheStorage{} + var newOptions = serverconfigs.NewHTTPFileCacheStorage() err = json.Unmarshal(newOptionsJSON, newOptions) if err != nil { remotelogs.Error("CACHE", "update policy '"+types.String(this.policy.Id)+"' failed: decode options failed: "+err.Error()) @@ -223,7 +223,7 @@ func (this *FileStorage) Init() error { var before = time.Now() // 配置 - var options = &serverconfigs.HTTPFileCacheStorage{} + var options = serverconfigs.NewHTTPFileCacheStorage() optionsJSON, err := json.Marshal(this.policy.Options) if err != nil { return err @@ -370,7 +370,6 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, } } - // TODO 尝试使用mmap加快读取速度 var isOk = false var openFile *OpenFile var openFileCache = this.openFileCache // 因为中间可能有修改,所以先赋值再获取 @@ -378,6 +377,7 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, openFile = openFileCache.Get(path) } var fp *os.File + var err error if openFile == nil { fp, err = os.OpenFile(path, os.O_RDONLY, 0444) @@ -404,11 +404,24 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool, partialFileReader.openFileCache = openFileCache reader = partialFileReader } else { - var fileReader = NewFileReader(fp) - fileReader.openFile = openFile - fileReader.openFileCache = openFileCache - reader = fileReader + var options = this.options // copy + if options != nil && options.EnableMMAP { + if isValid, stat := IsValidForMMAP(fp); isValid { + reader, err = NewMMAPFileReader(fp, stat) + if err != nil { + return nil, err + } + } + } + + if reader == nil { + var fileReader = NewFileReader(fp) + fileReader.openFile = openFile + fileReader.openFileCache = openFileCache + reader = fileReader + } } + err = reader.Init() if err != nil { return nil, err diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index a1357fd..555483b 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -588,18 +588,27 @@ func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) { this.writer.Prepare(resp, fileSize, reader.Status(), false) this.writer.WriteHeader(reader.Status()) - var pool = this.bytePool(fileSize) - var bodyBuf = pool.Get() if storage.CanSendfile() { + var pool = this.bytePool(fileSize) + var bodyBuf = pool.Get() if fp, canSendFile := this.writer.canSendfile(); canSendFile { this.writer.sentBodyBytes, err = io.CopyBuffer(this.writer.rawWriter, fp, bodyBuf) } else { _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf) } + pool.Put(bodyBuf) } else { - _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf) + mmapReader, isMMAPReader := reader.(*caches.MMAPFileReader) + if isMMAPReader { + _, err = mmapReader.CopyBodyTo(this.writer) + } else { + var pool = this.bytePool(fileSize) + var bodyBuf = pool.Get() + _, err = io.CopyBuffer(this.writer, resp.Body, bodyBuf) + pool.Put(bodyBuf) + } } - pool.Put(bodyBuf) + if err == io.EOF { err = nil } diff --git a/internal/utils/mem/system.go b/internal/utils/mem/system.go index 0ab450b..74c64cb 100644 --- a/internal/utils/mem/system.go +++ b/internal/utils/mem/system.go @@ -4,11 +4,14 @@ package memutils import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/shirou/gopsutil/v3/mem" + "time" ) var systemTotalMemory = -1 var systemMemoryBytes uint64 +var availableMemoryGB int func init() { if !teaconst.IsMain { @@ -16,6 +19,16 @@ func init() { } _ = SystemMemoryGB() + + goman.New(func() { + var ticker = time.NewTicker(10 * time.Second) + for range ticker.C { + stat, err := mem.VirtualMemory() + if err == nil { + availableMemoryGB = int(stat.Available >> 30) + } + } + }) } // SystemMemoryGB 系统内存GB数量 @@ -32,7 +45,8 @@ func SystemMemoryGB() int { systemMemoryBytes = stat.Total - systemTotalMemory = int(stat.Total / (1 << 30)) + availableMemoryGB = int(stat.Available >> 30) + systemTotalMemory = int(stat.Total >> 30) if systemTotalMemory <= 0 { systemTotalMemory = 1 } @@ -46,3 +60,8 @@ func SystemMemoryGB() int { func SystemMemoryBytes() uint64 { return systemMemoryBytes } + +// AvailableMemoryGB 获取当下可用内存GB数 +func AvailableMemoryGB() int { + return availableMemoryGB +} diff --git a/internal/utils/mem/system_test.go b/internal/utils/mem/system_test.go index b05e0d5..6169e4b 100644 --- a/internal/utils/mem/system_test.go +++ b/internal/utils/mem/system_test.go @@ -14,4 +14,5 @@ func TestSystemMemoryGB(t *testing.T) { t.Log(memutils.SystemMemoryBytes()) t.Log(memutils.SystemMemoryBytes()) t.Log(memutils.SystemMemoryBytes()>>30, "GB") + t.Log("available:", memutils.AvailableMemoryGB()) }