diff --git a/internal/caches/partial_ranges.go b/internal/caches/partial_ranges.go index dad4841..ab359e6 100644 --- a/internal/caches/partial_ranges.go +++ b/internal/caches/partial_ranges.go @@ -6,7 +6,6 @@ import ( "bytes" "encoding/json" "github.com/iwind/TeaGo/types" - "os" "strconv" ) @@ -70,7 +69,7 @@ func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) { // NewPartialRangesFromFile 从文件中加载范围信息 func NewPartialRangesFromFile(path string) (*PartialRanges, error) { - data, err := os.ReadFile(path) + data, err := SharedPartialRangesQueue.Get(path) if err != nil { return nil, err } @@ -172,7 +171,8 @@ func (this *PartialRanges) Bytes() []byte { // WriteToFile 写入到文件中 func (this *PartialRanges) WriteToFile(path string) error { - return os.WriteFile(path, this.Bytes(), 0666) + SharedPartialRangesQueue.Put(path, this.Bytes()) + return nil } // Max 获取最大位置 diff --git a/internal/caches/partial_ranges_queue.go b/internal/caches/partial_ranges_queue.go new file mode 100644 index 0000000..b0e2c51 --- /dev/null +++ b/internal/caches/partial_ranges_queue.go @@ -0,0 +1,144 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" + memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" + "os" + "sync" +) + +var SharedPartialRangesQueue = NewPartialRangesQueue() + +func init() { + if !teaconst.IsMain { + return + } + + SharedPartialRangesQueue.Start() +} + +const partialRangesQueueSharding = 8 + +// PartialRangesQueue ranges file writing queue +type PartialRangesQueue struct { + m [partialRangesQueueSharding]map[string][]byte // { filename => data, ... } + + c chan string // filename1, ... + mu [partialRangesQueueSharding]*sync.RWMutex +} + +// NewPartialRangesQueue Create new queue +func NewPartialRangesQueue() *PartialRangesQueue { + var queueSize = 512 + var memGB = memutils.SystemMemoryGB() + if memGB > 16 { + queueSize = 8 << 10 + } else if memGB > 8 { + queueSize = 4 << 10 + } else if memGB > 4 { + queueSize = 2 << 10 + } else if memGB > 2 { + queueSize = 1 << 10 + } + + var m = [partialRangesQueueSharding]map[string][]byte{} + var muList = [partialRangesQueueSharding]*sync.RWMutex{} + for i := 0; i < partialRangesQueueSharding; i++ { + muList[i] = &sync.RWMutex{} + m[i] = map[string][]byte{} + } + + return &PartialRangesQueue{ + mu: muList, + m: m, + c: make(chan string, queueSize), + } +} + +// Start the queue +func (this *PartialRangesQueue) Start() { + goman.New(func() { + this.Dump() + }) +} + +// Put ranges data to filename +func (this *PartialRangesQueue) Put(filename string, data []byte) { + var index = this.indexForKey(filename) + + this.mu[index].Lock() + this.m[index][filename] = data + this.mu[index].Unlock() + + // always wait to finish + this.c <- filename +} + +// Get ranges data from filename +func (this *PartialRangesQueue) Get(filename string) ([]byte, error) { + var index = this.indexForKey(filename) + + this.mu[index].RLock() + data, ok := this.m[index][filename] + this.mu[index].RUnlock() + + if ok { + return data, nil + } + + return os.ReadFile(filename) +} + +// Delete ranges filename +func (this *PartialRangesQueue) Delete(filename string) { + var index = this.indexForKey(filename) + + this.mu[index].Lock() + delete(this.m[index], filename) + this.mu[index].Unlock() +} + +// Dump ranges to filename from memory +func (this *PartialRangesQueue) Dump() { + for filename := range this.c { + var index = this.indexForKey(filename) + + this.mu[index].Lock() + data, ok := this.m[index][filename] + if ok { + delete(this.m[index], filename) + } + this.mu[index].Unlock() + + if !ok || len(data) == 0 { + continue + } + + err := os.WriteFile(filename, data, 0666) + if err != nil { + remotelogs.Println("PARTIAL_RANGES_QUEUE", "write file '"+filename+"' failed: "+err.Error()) + } + } +} + +// Len count all files +func (this *PartialRangesQueue) Len() int { + var count int + + for i := 0; i < partialRangesQueueSharding; i++ { + this.mu[i].RLock() + count += len(this.m[i]) + this.mu[i].RUnlock() + } + + return count +} + +func (this *PartialRangesQueue) indexForKey(filename string) int { + return int(fnv.HashString(filename) % partialRangesQueueSharding) +} diff --git a/internal/caches/partial_ranges_queue_test.go b/internal/caches/partial_ranges_queue_test.go new file mode 100644 index 0000000..20c6f66 --- /dev/null +++ b/internal/caches/partial_ranges_queue_test.go @@ -0,0 +1,31 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestNewPartialRangesQueue(t *testing.T) { + var a = assert.NewAssertion(t) + + var queue = caches.NewPartialRangesQueue() + queue.Put("a", []byte{1, 2, 3}) + t.Log("add 'a':", queue.Len()) + t.Log(queue.Get("a")) + a.IsTrue(queue.Len() == 1) + + queue.Put("a", nil) + t.Log("add 'a':", queue.Len()) + a.IsTrue(queue.Len() == 1) + + queue.Put("b", nil) + t.Log("add 'b':", queue.Len()) + a.IsTrue(queue.Len() == 2) + + queue.Delete("a") + t.Log("delete 'a':", queue.Len()) + a.IsTrue(queue.Len() == 1) +} diff --git a/internal/caches/reader_partial_file.go b/internal/caches/reader_partial_file.go index d45c0cb..ea69dbf 100644 --- a/internal/caches/reader_partial_file.go +++ b/internal/caches/reader_partial_file.go @@ -145,6 +145,8 @@ func (this *PartialFileReader) IsCompleted() bool { } func (this *PartialFileReader) discard() error { + SharedPartialRangesQueue.Delete(this.rangePath) _ = os.Remove(this.rangePath) + return this.FileReader.discard() } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 6b708b4..2d56eb8 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -1486,6 +1486,7 @@ func (this *FileStorage) removeCacheFile(path string) error { _, statErr := os.Stat(partialPath) if statErr == nil { _ = os.Remove(partialPath) + SharedPartialRangesQueue.Delete(partialPath) } } return err diff --git a/internal/caches/writer_partial_file.go b/internal/caches/writer_partial_file.go index fd02a7d..64a1bfd 100644 --- a/internal/caches/writer_partial_file.go +++ b/internal/caches/writer_partial_file.go @@ -228,6 +228,7 @@ func (this *PartialFileWriter) Discard() error { _ = this.rawWriter.Close() fsutils.WriteEnd() + SharedPartialRangesQueue.Delete(this.rangePath) _ = os.Remove(this.rangePath) err := os.Remove(this.rawWriter.Name()) @@ -261,5 +262,7 @@ func (this *PartialFileWriter) IsNew() bool { func (this *PartialFileWriter) remove() { _ = os.Remove(this.rawWriter.Name()) + + SharedPartialRangesQueue.Delete(this.rangePath) _ = os.Remove(this.rangePath) }