提升Partial Content的范围数据(ranges)写入效率

This commit is contained in:
刘祥超
2024-04-20 17:44:23 +08:00
parent c9dac96366
commit a7ad2cea8f
6 changed files with 184 additions and 3 deletions

View File

@@ -6,7 +6,6 @@ import (
"bytes"
"encoding/json"
"github.com/iwind/TeaGo/types"
"os"
"strconv"
)
@@ -70,7 +69,7 @@ func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) {
// NewPartialRangesFromFile 从文件中加载范围信息
func NewPartialRangesFromFile(path string) (*PartialRanges, error) {
data, err := os.ReadFile(path)
data, err := SharedPartialRangesQueue.Get(path)
if err != nil {
return nil, err
}
@@ -172,7 +171,8 @@ func (this *PartialRanges) Bytes() []byte {
// WriteToFile 写入到文件中
func (this *PartialRanges) WriteToFile(path string) error {
return os.WriteFile(path, this.Bytes(), 0666)
SharedPartialRangesQueue.Put(path, this.Bytes())
return nil
}
// Max 获取最大位置

View File

@@ -0,0 +1,144 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches
import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
"os"
"sync"
)
var SharedPartialRangesQueue = NewPartialRangesQueue()
func init() {
if !teaconst.IsMain {
return
}
SharedPartialRangesQueue.Start()
}
const partialRangesQueueSharding = 8
// PartialRangesQueue ranges file writing queue
type PartialRangesQueue struct {
m [partialRangesQueueSharding]map[string][]byte // { filename => data, ... }
c chan string // filename1, ...
mu [partialRangesQueueSharding]*sync.RWMutex
}
// NewPartialRangesQueue Create new queue
func NewPartialRangesQueue() *PartialRangesQueue {
var queueSize = 512
var memGB = memutils.SystemMemoryGB()
if memGB > 16 {
queueSize = 8 << 10
} else if memGB > 8 {
queueSize = 4 << 10
} else if memGB > 4 {
queueSize = 2 << 10
} else if memGB > 2 {
queueSize = 1 << 10
}
var m = [partialRangesQueueSharding]map[string][]byte{}
var muList = [partialRangesQueueSharding]*sync.RWMutex{}
for i := 0; i < partialRangesQueueSharding; i++ {
muList[i] = &sync.RWMutex{}
m[i] = map[string][]byte{}
}
return &PartialRangesQueue{
mu: muList,
m: m,
c: make(chan string, queueSize),
}
}
// Start the queue
func (this *PartialRangesQueue) Start() {
goman.New(func() {
this.Dump()
})
}
// Put ranges data to filename
func (this *PartialRangesQueue) Put(filename string, data []byte) {
var index = this.indexForKey(filename)
this.mu[index].Lock()
this.m[index][filename] = data
this.mu[index].Unlock()
// always wait to finish
this.c <- filename
}
// Get ranges data from filename
func (this *PartialRangesQueue) Get(filename string) ([]byte, error) {
var index = this.indexForKey(filename)
this.mu[index].RLock()
data, ok := this.m[index][filename]
this.mu[index].RUnlock()
if ok {
return data, nil
}
return os.ReadFile(filename)
}
// Delete ranges filename
func (this *PartialRangesQueue) Delete(filename string) {
var index = this.indexForKey(filename)
this.mu[index].Lock()
delete(this.m[index], filename)
this.mu[index].Unlock()
}
// Dump ranges to filename from memory
func (this *PartialRangesQueue) Dump() {
for filename := range this.c {
var index = this.indexForKey(filename)
this.mu[index].Lock()
data, ok := this.m[index][filename]
if ok {
delete(this.m[index], filename)
}
this.mu[index].Unlock()
if !ok || len(data) == 0 {
continue
}
err := os.WriteFile(filename, data, 0666)
if err != nil {
remotelogs.Println("PARTIAL_RANGES_QUEUE", "write file '"+filename+"' failed: "+err.Error())
}
}
}
// Len count all files
func (this *PartialRangesQueue) Len() int {
var count int
for i := 0; i < partialRangesQueueSharding; i++ {
this.mu[i].RLock()
count += len(this.m[i])
this.mu[i].RUnlock()
}
return count
}
func (this *PartialRangesQueue) indexForKey(filename string) int {
return int(fnv.HashString(filename) % partialRangesQueueSharding)
}

View File

@@ -0,0 +1,31 @@
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package caches_test
import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestNewPartialRangesQueue(t *testing.T) {
var a = assert.NewAssertion(t)
var queue = caches.NewPartialRangesQueue()
queue.Put("a", []byte{1, 2, 3})
t.Log("add 'a':", queue.Len())
t.Log(queue.Get("a"))
a.IsTrue(queue.Len() == 1)
queue.Put("a", nil)
t.Log("add 'a':", queue.Len())
a.IsTrue(queue.Len() == 1)
queue.Put("b", nil)
t.Log("add 'b':", queue.Len())
a.IsTrue(queue.Len() == 2)
queue.Delete("a")
t.Log("delete 'a':", queue.Len())
a.IsTrue(queue.Len() == 1)
}

View File

@@ -145,6 +145,8 @@ func (this *PartialFileReader) IsCompleted() bool {
}
func (this *PartialFileReader) discard() error {
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
return this.FileReader.discard()
}

View File

@@ -1486,6 +1486,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
_, statErr := os.Stat(partialPath)
if statErr == nil {
_ = os.Remove(partialPath)
SharedPartialRangesQueue.Delete(partialPath)
}
}
return err

View File

@@ -228,6 +228,7 @@ func (this *PartialFileWriter) Discard() error {
_ = this.rawWriter.Close()
fsutils.WriteEnd()
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
err := os.Remove(this.rawWriter.Name())
@@ -261,5 +262,7 @@ func (this *PartialFileWriter) IsNew() bool {
func (this *PartialFileWriter) remove() {
_ = os.Remove(this.rawWriter.Name())
SharedPartialRangesQueue.Delete(this.rangePath)
_ = os.Remove(this.rangePath)
}