mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 16:00:25 +08:00 
			
		
		
		
	提升Partial Content的范围数据(ranges)写入效率
This commit is contained in:
		@@ -6,7 +6,6 @@ import (
 | 
				
			|||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
	"encoding/json"
 | 
						"encoding/json"
 | 
				
			||||||
	"github.com/iwind/TeaGo/types"
 | 
						"github.com/iwind/TeaGo/types"
 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -70,7 +69,7 @@ func NewPartialRangesFromJSON(data []byte) (*PartialRanges, error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// NewPartialRangesFromFile 从文件中加载范围信息
 | 
					// NewPartialRangesFromFile 从文件中加载范围信息
 | 
				
			||||||
func NewPartialRangesFromFile(path string) (*PartialRanges, error) {
 | 
					func NewPartialRangesFromFile(path string) (*PartialRanges, error) {
 | 
				
			||||||
	data, err := os.ReadFile(path)
 | 
						data, err := SharedPartialRangesQueue.Get(path)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -172,7 +171,8 @@ func (this *PartialRanges) Bytes() []byte {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// WriteToFile 写入到文件中
 | 
					// WriteToFile 写入到文件中
 | 
				
			||||||
func (this *PartialRanges) WriteToFile(path string) error {
 | 
					func (this *PartialRanges) WriteToFile(path string) error {
 | 
				
			||||||
	return os.WriteFile(path, this.Bytes(), 0666)
 | 
						SharedPartialRangesQueue.Put(path, this.Bytes())
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Max 获取最大位置
 | 
					// Max 获取最大位置
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										144
									
								
								internal/caches/partial_ranges_queue.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										144
									
								
								internal/caches/partial_ranges_queue.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,144 @@
 | 
				
			|||||||
 | 
					// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package caches
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/goman"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
 | 
				
			||||||
 | 
						memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
 | 
				
			||||||
 | 
						"os"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var SharedPartialRangesQueue = NewPartialRangesQueue()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func init() {
 | 
				
			||||||
 | 
						if !teaconst.IsMain {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						SharedPartialRangesQueue.Start()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const partialRangesQueueSharding = 8
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// PartialRangesQueue ranges file writing queue
 | 
				
			||||||
 | 
					type PartialRangesQueue struct {
 | 
				
			||||||
 | 
						m [partialRangesQueueSharding]map[string][]byte // { filename => data, ... }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						c  chan string // filename1, ...
 | 
				
			||||||
 | 
						mu [partialRangesQueueSharding]*sync.RWMutex
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewPartialRangesQueue Create new queue
 | 
				
			||||||
 | 
					func NewPartialRangesQueue() *PartialRangesQueue {
 | 
				
			||||||
 | 
						var queueSize = 512
 | 
				
			||||||
 | 
						var memGB = memutils.SystemMemoryGB()
 | 
				
			||||||
 | 
						if memGB > 16 {
 | 
				
			||||||
 | 
							queueSize = 8 << 10
 | 
				
			||||||
 | 
						} else if memGB > 8 {
 | 
				
			||||||
 | 
							queueSize = 4 << 10
 | 
				
			||||||
 | 
						} else if memGB > 4 {
 | 
				
			||||||
 | 
							queueSize = 2 << 10
 | 
				
			||||||
 | 
						} else if memGB > 2 {
 | 
				
			||||||
 | 
							queueSize = 1 << 10
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var m = [partialRangesQueueSharding]map[string][]byte{}
 | 
				
			||||||
 | 
						var muList = [partialRangesQueueSharding]*sync.RWMutex{}
 | 
				
			||||||
 | 
						for i := 0; i < partialRangesQueueSharding; i++ {
 | 
				
			||||||
 | 
							muList[i] = &sync.RWMutex{}
 | 
				
			||||||
 | 
							m[i] = map[string][]byte{}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &PartialRangesQueue{
 | 
				
			||||||
 | 
							mu: muList,
 | 
				
			||||||
 | 
							m:  m,
 | 
				
			||||||
 | 
							c:  make(chan string, queueSize),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Start the queue
 | 
				
			||||||
 | 
					func (this *PartialRangesQueue) Start() {
 | 
				
			||||||
 | 
						goman.New(func() {
 | 
				
			||||||
 | 
							this.Dump()
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Put ranges data to filename
 | 
				
			||||||
 | 
					func (this *PartialRangesQueue) Put(filename string, data []byte) {
 | 
				
			||||||
 | 
						var index = this.indexForKey(filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						this.mu[index].Lock()
 | 
				
			||||||
 | 
						this.m[index][filename] = data
 | 
				
			||||||
 | 
						this.mu[index].Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// always wait to finish
 | 
				
			||||||
 | 
						this.c <- filename
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Get ranges data from filename
 | 
				
			||||||
 | 
					func (this *PartialRangesQueue) Get(filename string) ([]byte, error) {
 | 
				
			||||||
 | 
						var index = this.indexForKey(filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						this.mu[index].RLock()
 | 
				
			||||||
 | 
						data, ok := this.m[index][filename]
 | 
				
			||||||
 | 
						this.mu[index].RUnlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if ok {
 | 
				
			||||||
 | 
							return data, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return os.ReadFile(filename)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Delete ranges filename
 | 
				
			||||||
 | 
					func (this *PartialRangesQueue) Delete(filename string) {
 | 
				
			||||||
 | 
						var index = this.indexForKey(filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						this.mu[index].Lock()
 | 
				
			||||||
 | 
						delete(this.m[index], filename)
 | 
				
			||||||
 | 
						this.mu[index].Unlock()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Dump ranges to filename from memory
 | 
				
			||||||
 | 
					func (this *PartialRangesQueue) Dump() {
 | 
				
			||||||
 | 
						for filename := range this.c {
 | 
				
			||||||
 | 
							var index = this.indexForKey(filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							this.mu[index].Lock()
 | 
				
			||||||
 | 
							data, ok := this.m[index][filename]
 | 
				
			||||||
 | 
							if ok {
 | 
				
			||||||
 | 
								delete(this.m[index], filename)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							this.mu[index].Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if !ok || len(data) == 0 {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							err := os.WriteFile(filename, data, 0666)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								remotelogs.Println("PARTIAL_RANGES_QUEUE", "write file '"+filename+"' failed: "+err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Len count all files
 | 
				
			||||||
 | 
					func (this *PartialRangesQueue) Len() int {
 | 
				
			||||||
 | 
						var count int
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i := 0; i < partialRangesQueueSharding; i++ {
 | 
				
			||||||
 | 
							this.mu[i].RLock()
 | 
				
			||||||
 | 
							count += len(this.m[i])
 | 
				
			||||||
 | 
							this.mu[i].RUnlock()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return count
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *PartialRangesQueue) indexForKey(filename string) int {
 | 
				
			||||||
 | 
						return int(fnv.HashString(filename) % partialRangesQueueSharding)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										31
									
								
								internal/caches/partial_ranges_queue_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								internal/caches/partial_ranges_queue_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,31 @@
 | 
				
			|||||||
 | 
					// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package caches_test
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"github.com/TeaOSLab/EdgeNode/internal/caches"
 | 
				
			||||||
 | 
						"github.com/iwind/TeaGo/assert"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestNewPartialRangesQueue(t *testing.T) {
 | 
				
			||||||
 | 
						var a = assert.NewAssertion(t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var queue = caches.NewPartialRangesQueue()
 | 
				
			||||||
 | 
						queue.Put("a", []byte{1, 2, 3})
 | 
				
			||||||
 | 
						t.Log("add 'a':", queue.Len())
 | 
				
			||||||
 | 
						t.Log(queue.Get("a"))
 | 
				
			||||||
 | 
						a.IsTrue(queue.Len() == 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						queue.Put("a", nil)
 | 
				
			||||||
 | 
						t.Log("add 'a':", queue.Len())
 | 
				
			||||||
 | 
						a.IsTrue(queue.Len() == 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						queue.Put("b", nil)
 | 
				
			||||||
 | 
						t.Log("add 'b':", queue.Len())
 | 
				
			||||||
 | 
						a.IsTrue(queue.Len() == 2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						queue.Delete("a")
 | 
				
			||||||
 | 
						t.Log("delete 'a':", queue.Len())
 | 
				
			||||||
 | 
						a.IsTrue(queue.Len() == 1)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -145,6 +145,8 @@ func (this *PartialFileReader) IsCompleted() bool {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *PartialFileReader) discard() error {
 | 
					func (this *PartialFileReader) discard() error {
 | 
				
			||||||
 | 
						SharedPartialRangesQueue.Delete(this.rangePath)
 | 
				
			||||||
	_ = os.Remove(this.rangePath)
 | 
						_ = os.Remove(this.rangePath)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return this.FileReader.discard()
 | 
						return this.FileReader.discard()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1486,6 +1486,7 @@ func (this *FileStorage) removeCacheFile(path string) error {
 | 
				
			|||||||
		_, statErr := os.Stat(partialPath)
 | 
							_, statErr := os.Stat(partialPath)
 | 
				
			||||||
		if statErr == nil {
 | 
							if statErr == nil {
 | 
				
			||||||
			_ = os.Remove(partialPath)
 | 
								_ = os.Remove(partialPath)
 | 
				
			||||||
 | 
								SharedPartialRangesQueue.Delete(partialPath)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return err
 | 
						return err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -228,6 +228,7 @@ func (this *PartialFileWriter) Discard() error {
 | 
				
			|||||||
	_ = this.rawWriter.Close()
 | 
						_ = this.rawWriter.Close()
 | 
				
			||||||
	fsutils.WriteEnd()
 | 
						fsutils.WriteEnd()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						SharedPartialRangesQueue.Delete(this.rangePath)
 | 
				
			||||||
	_ = os.Remove(this.rangePath)
 | 
						_ = os.Remove(this.rangePath)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err := os.Remove(this.rawWriter.Name())
 | 
						err := os.Remove(this.rawWriter.Name())
 | 
				
			||||||
@@ -261,5 +262,7 @@ func (this *PartialFileWriter) IsNew() bool {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (this *PartialFileWriter) remove() {
 | 
					func (this *PartialFileWriter) remove() {
 | 
				
			||||||
	_ = os.Remove(this.rawWriter.Name())
 | 
						_ = os.Remove(this.rawWriter.Name())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						SharedPartialRangesQueue.Delete(this.rangePath)
 | 
				
			||||||
	_ = os.Remove(this.rangePath)
 | 
						_ = os.Remove(this.rangePath)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user