mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	优化代码
This commit is contained in:
		@@ -65,12 +65,13 @@ var sharedWritingFileKeyLocker = sync.Mutex{}
 | 
			
		||||
 | 
			
		||||
var maxOpenFiles = NewMaxOpenFiles()
 | 
			
		||||
 | 
			
		||||
const maxOpenFilesSlowCost = 5000 * time.Microsecond // 0.5ms
 | 
			
		||||
const maxOpenFilesSlowCost = 1000 * time.Microsecond // us
 | 
			
		||||
const protectingLoadWhenDump = false
 | 
			
		||||
 | 
			
		||||
// FileStorage 文件缓存
 | 
			
		||||
//   文件结构:
 | 
			
		||||
//    [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
 | 
			
		||||
//
 | 
			
		||||
//	文件结构:
 | 
			
		||||
//	 [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
 | 
			
		||||
type FileStorage struct {
 | 
			
		||||
	policy        *serverconfigs.HTTPCachePolicy
 | 
			
		||||
	options       *serverconfigs.HTTPFileCacheStorage // 二级缓存
 | 
			
		||||
 
 | 
			
		||||
@@ -95,7 +95,8 @@ func (this *MemoryStorage) Init() error {
 | 
			
		||||
	// 启动定时Flush memory to disk任务
 | 
			
		||||
	if this.parentStorage != nil {
 | 
			
		||||
		// TODO 应该根据磁盘性能决定线程数
 | 
			
		||||
		var threads = 1
 | 
			
		||||
		// TODO 线程数应该可以在缓存策略和节点中设定
 | 
			
		||||
		var threads = runtime.NumCPU()
 | 
			
		||||
 | 
			
		||||
		for i := 0; i < threads; i++ {
 | 
			
		||||
			goman.New(func() {
 | 
			
		||||
 
 | 
			
		||||
@@ -8,6 +8,7 @@ import (
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"runtime/debug"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
@@ -304,3 +305,30 @@ func TestMemoryStorage_Stop(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	t.Log(len(m))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkValuesMap(b *testing.B) {
 | 
			
		||||
	var m = map[uint64]*MemoryItem{}
 | 
			
		||||
	var count = 1_000_000
 | 
			
		||||
	for i := 0; i < count; i++ {
 | 
			
		||||
		m[uint64(i)] = &MemoryItem{
 | 
			
		||||
			ExpiresAt: time.Now().Unix(),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	b.Log(len(m))
 | 
			
		||||
 | 
			
		||||
	var locker = sync.Mutex{}
 | 
			
		||||
	b.ResetTimer()
 | 
			
		||||
 | 
			
		||||
	b.RunParallel(func(pb *testing.PB) {
 | 
			
		||||
		for pb.Next() {
 | 
			
		||||
			locker.Lock()
 | 
			
		||||
			_, ok := m[uint64(rands.Int(0, 1_000_000))]
 | 
			
		||||
			_ = ok
 | 
			
		||||
			locker.Unlock()
 | 
			
		||||
 | 
			
		||||
			locker.Lock()
 | 
			
		||||
			delete(m, uint64(rands.Int(2, 1000000)))
 | 
			
		||||
			locker.Unlock()
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -5,10 +5,46 @@ package compressions_test
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 | 
			
		||||
	stringutil "github.com/iwind/TeaGo/utils/string"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestBrotliWriter_LargeFile(t *testing.T) {
 | 
			
		||||
	var data = []byte{}
 | 
			
		||||
	for i := 0; i < 1024*1024; i++ {
 | 
			
		||||
		data = append(data, stringutil.Rand(32)...)
 | 
			
		||||
	}
 | 
			
		||||
	t.Log(len(data)/1024/1024, "M")
 | 
			
		||||
 | 
			
		||||
	var before = time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		t.Log(time.Since(before).Seconds()*1000, "ms")
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	var buf = &bytes.Buffer{}
 | 
			
		||||
	writer, err := compressions.NewBrotliWriter(buf, 5)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var offset = 0
 | 
			
		||||
	var size = 4096
 | 
			
		||||
	for offset < len(data) {
 | 
			
		||||
		_, err = writer.Write(data[offset : offset+size])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		offset += size
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = writer.Close()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkBrotliWriter_Write(b *testing.B) {
 | 
			
		||||
	var data = []byte(strings.Repeat("A", 1024))
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -121,7 +121,7 @@ func (this *ProvinceManager) loop() error {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	resp, err := rpcClient.RegionProvinceRPC().FindAllRegionProvincesWithCountryId(rpcClient.Context(), &pb.FindAllRegionProvincesWithCountryIdRequest{
 | 
			
		||||
	resp, err := rpcClient.RegionProvinceRPC().FindAllRegionProvincesWithRegionCountryId(rpcClient.Context(), &pb.FindAllRegionProvincesWithRegionCountryIdRequest{
 | 
			
		||||
		RegionCountryId: ChinaCountryId,
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -23,18 +23,19 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const MaxQueueSize = 2048
 | 
			
		||||
const MaxQueueSize = 256 // TODO 可以配置,可以在单个任务里配置
 | 
			
		||||
 | 
			
		||||
// Task 单个指标任务
 | 
			
		||||
// 数据库存储:
 | 
			
		||||
//  data/
 | 
			
		||||
//     metric.$ID.db
 | 
			
		||||
//        stats
 | 
			
		||||
//           id, keys, value, time, serverId, hash
 | 
			
		||||
//  原理:
 | 
			
		||||
//     添加或者有变更时 isUploaded = false
 | 
			
		||||
//     上传时检查 isUploaded 状态
 | 
			
		||||
//     只上传每个服务中排序最前面的 N 个数据
 | 
			
		||||
//
 | 
			
		||||
//	data/
 | 
			
		||||
//	   metric.$ID.db
 | 
			
		||||
//	      stats
 | 
			
		||||
//	         id, keys, value, time, serverId, hash
 | 
			
		||||
//	原理:
 | 
			
		||||
//	   添加或者有变更时 isUploaded = false
 | 
			
		||||
//	   上传时检查 isUploaded 状态
 | 
			
		||||
//	   只上传每个服务中排序最前面的 N 个数据
 | 
			
		||||
type Task struct {
 | 
			
		||||
	item     *serverconfigs.MetricItemConfig
 | 
			
		||||
	isLoaded bool
 | 
			
		||||
@@ -372,7 +373,9 @@ func (this *Task) Upload(pauseDuration time.Duration) error {
 | 
			
		||||
	for _, serverId := range serverIds {
 | 
			
		||||
		for _, currentTime := range times {
 | 
			
		||||
			idStrings, err := func(serverId int64, currentTime string) (ids []string, err error) {
 | 
			
		||||
				var t = trackers.Begin("[METRIC]SELECT_TOP_STMT")
 | 
			
		||||
				rows, err := this.selectTopStmt.Query(serverId, this.item.Version, currentTime)
 | 
			
		||||
				t.End()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
 
 | 
			
		||||
@@ -161,6 +161,7 @@ func (this *HTTPClientPool) Client(req *HTTPRequest,
 | 
			
		||||
			ExpectContinueTimeout: 1 * time.Second,
 | 
			
		||||
			TLSHandshakeTimeout:   5 * time.Second,
 | 
			
		||||
			TLSClientConfig:       tlsConfig,
 | 
			
		||||
			ReadBufferSize:        8 * 1024,
 | 
			
		||||
			Proxy:                 nil,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user