mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	优化代码
This commit is contained in:
		@@ -11,7 +11,7 @@ func TestOpenFilePool_Get(t *testing.T) {
 | 
			
		||||
	var pool = caches.NewOpenFilePool("a")
 | 
			
		||||
	t.Log(pool.Filename())
 | 
			
		||||
	t.Log(pool.Get())
 | 
			
		||||
	t.Log(pool.Put(caches.NewOpenFile(nil, nil)))
 | 
			
		||||
	t.Log(pool.Put(caches.NewOpenFile(nil, nil, []byte{})))
 | 
			
		||||
	t.Log(pool.Get())
 | 
			
		||||
	t.Log(pool.Get())
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -325,10 +325,11 @@ func (this *FileStorage) openReader(key string, allowMemory bool, useStale bool)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OpenWriter 打开缓存文件等待写入
 | 
			
		||||
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
 | 
			
		||||
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) {
 | 
			
		||||
	// 先尝试内存缓存
 | 
			
		||||
	if this.memoryStorage != nil {
 | 
			
		||||
		writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status)
 | 
			
		||||
	// 我们限定仅小文件优先存在内存中
 | 
			
		||||
	if this.memoryStorage != nil && size > 0 && size < 32*1024*1024 {
 | 
			
		||||
		writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status, size)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			return writer, nil
 | 
			
		||||
		}
 | 
			
		||||
@@ -904,7 +905,7 @@ func (this *FileStorage) hotLoop() {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, false)
 | 
			
		||||
			writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, reader.BodySize(), false)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				if !CanIgnoreErr(err) {
 | 
			
		||||
					remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error())
 | 
			
		||||
 
 | 
			
		||||
@@ -62,7 +62,7 @@ func TestFileStorage_OpenWriter(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	header := []byte("Header")
 | 
			
		||||
	body := []byte("This is Body")
 | 
			
		||||
	writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200)
 | 
			
		||||
	writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200, -1)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -104,7 +104,7 @@ func TestFileStorage_OpenWriter_HTTP(t *testing.T) {
 | 
			
		||||
		t.Log(time.Since(now).Seconds()*1000, "ms")
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200)
 | 
			
		||||
	writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200, -1)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -177,7 +177,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
 | 
			
		||||
		go func(i int) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
			writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200)
 | 
			
		||||
			writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200, -1)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				if err != ErrFileIsWriting {
 | 
			
		||||
					t.Fatal(err)
 | 
			
		||||
@@ -229,7 +229,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
 | 
			
		||||
		go func(i int) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
			writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200)
 | 
			
		||||
			writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200, -1)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				if err != ErrFileIsWriting {
 | 
			
		||||
					t.Fatal(err)
 | 
			
		||||
 
 | 
			
		||||
@@ -13,7 +13,7 @@ type StorageInterface interface {
 | 
			
		||||
	OpenReader(key string, useStale bool) (reader Reader, err error)
 | 
			
		||||
 | 
			
		||||
	// OpenWriter 打开缓存写入器等待写入
 | 
			
		||||
	OpenWriter(key string, expiredAt int64, status int) (Writer, error)
 | 
			
		||||
	OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error)
 | 
			
		||||
 | 
			
		||||
	// Delete 删除某个键值对应的缓存
 | 
			
		||||
	Delete(key string) error
 | 
			
		||||
 
 | 
			
		||||
@@ -145,11 +145,11 @@ func (this *MemoryStorage) OpenReader(key string, useStale bool) (Reader, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OpenWriter 打开缓存写入器等待写入
 | 
			
		||||
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
 | 
			
		||||
	return this.openWriter(key, expiredAt, status, true)
 | 
			
		||||
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, size int64) (Writer, error) {
 | 
			
		||||
	return this.openWriter(key, expiredAt, status, size, true)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, isDirty bool) (Writer, error) {
 | 
			
		||||
func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, size int64, isDirty bool) (Writer, error) {
 | 
			
		||||
	this.locker.Lock()
 | 
			
		||||
	defer this.locker.Unlock()
 | 
			
		||||
 | 
			
		||||
@@ -182,7 +182,10 @@ func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, i
 | 
			
		||||
		return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
 | 
			
		||||
	}
 | 
			
		||||
	capacityBytes := this.memoryCapacityBytes()
 | 
			
		||||
	if capacityBytes > 0 && capacityBytes <= this.totalSize {
 | 
			
		||||
	if size < 0 {
 | 
			
		||||
		size = 0
 | 
			
		||||
	}
 | 
			
		||||
	if capacityBytes > 0 && capacityBytes <= this.totalSize+size {
 | 
			
		||||
		return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -384,7 +387,7 @@ func (this *MemoryStorage) flushItem(key string) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status)
 | 
			
		||||
	writer, err := this.parentStorage.OpenWriter(key, item.ExpiredAt, item.Status, -1)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if !CanIgnoreErr(err) {
 | 
			
		||||
			remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())
 | 
			
		||||
 
 | 
			
		||||
@@ -15,7 +15,7 @@ import (
 | 
			
		||||
func TestMemoryStorage_OpenWriter(t *testing.T) {
 | 
			
		||||
	storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
 | 
			
		||||
 | 
			
		||||
	writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
 | 
			
		||||
	writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -62,7 +62,7 @@ func TestMemoryStorage_OpenWriter(t *testing.T) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200)
 | 
			
		||||
	writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -103,7 +103,7 @@ func TestMemoryStorage_OpenReaderLock(t *testing.T) {
 | 
			
		||||
func TestMemoryStorage_Delete(t *testing.T) {
 | 
			
		||||
	storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -111,7 +111,7 @@ func TestMemoryStorage_Delete(t *testing.T) {
 | 
			
		||||
		t.Log(len(storage.valuesMap))
 | 
			
		||||
	}
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -126,7 +126,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
 | 
			
		||||
	storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
 | 
			
		||||
	expiredAt := time.Now().Unix() + 60
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", expiredAt, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", expiredAt, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -139,7 +139,7 @@ func TestMemoryStorage_Stat(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", expiredAt, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -163,7 +163,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
 | 
			
		||||
	storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
 | 
			
		||||
	expiredAt := time.Now().Unix() + 60
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", expiredAt, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", expiredAt, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -175,7 +175,7 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", expiredAt, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -198,7 +198,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
 | 
			
		||||
	storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}, nil)
 | 
			
		||||
	expiredAt := time.Now().Unix() + 60
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", expiredAt, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc", expiredAt, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -210,7 +210,7 @@ func TestMemoryStorage_Purge(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	{
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", expiredAt, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter("abc1", expiredAt, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -241,7 +241,7 @@ func TestMemoryStorage_Expire(t *testing.T) {
 | 
			
		||||
	for i := 0; i < 1000; i++ {
 | 
			
		||||
		expiredAt := time.Now().Unix() + int64(rands.Int(0, 60))
 | 
			
		||||
		key := "abc" + strconv.Itoa(i)
 | 
			
		||||
		writer, err := storage.OpenWriter(key, expiredAt, 200)
 | 
			
		||||
		writer, err := storage.OpenWriter(key, expiredAt, 200, -1)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,76 +0,0 @@
 | 
			
		||||
package caches
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/compressions"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type compressionWriter struct {
 | 
			
		||||
	rawWriter Writer
 | 
			
		||||
	writer    compressions.Writer
 | 
			
		||||
	key       string
 | 
			
		||||
	expiredAt int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewCompressionWriter(gw Writer, cpWriter compressions.Writer, key string, expiredAt int64) Writer {
 | 
			
		||||
	return &compressionWriter{
 | 
			
		||||
		rawWriter: gw,
 | 
			
		||||
		writer:    cpWriter,
 | 
			
		||||
		key:       key,
 | 
			
		||||
		expiredAt: expiredAt,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) WriteHeader(data []byte) (n int, err error) {
 | 
			
		||||
	return this.writer.Write(data)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteHeaderLength 写入Header长度数据
 | 
			
		||||
func (this *compressionWriter) WriteHeaderLength(headerLength int) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteBodyLength 写入Body长度数据
 | 
			
		||||
func (this *compressionWriter) WriteBodyLength(bodyLength int64) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) Write(data []byte) (n int, err error) {
 | 
			
		||||
	return this.writer.Write(data)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) Close() error {
 | 
			
		||||
	err := this.writer.Close()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return this.rawWriter.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) Discard() error {
 | 
			
		||||
	err := this.writer.Close()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return this.rawWriter.Discard()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) Key() string {
 | 
			
		||||
	return this.key
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) ExpiredAt() int64 {
 | 
			
		||||
	return this.expiredAt
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) HeaderSize() int64 {
 | 
			
		||||
	return this.rawWriter.HeaderSize()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *compressionWriter) BodySize() int64 {
 | 
			
		||||
	return this.rawWriter.BodySize()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ItemType 内容类型
 | 
			
		||||
func (this *compressionWriter) ItemType() ItemType {
 | 
			
		||||
	return this.rawWriter.ItemType()
 | 
			
		||||
}
 | 
			
		||||
@@ -182,7 +182,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expiredAt := time.Now().Unix() + msg.LifeSeconds
 | 
			
		||||
	writer, err := storage.OpenWriter(msg.Key, expiredAt, 200)
 | 
			
		||||
	writer, err := storage.OpenWriter(msg.Key, expiredAt, 200, int64(len(msg.Value)))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		this.replyFail(message.RequestId, "prepare writing failed: "+err.Error())
 | 
			
		||||
		return err
 | 
			
		||||
@@ -462,7 +462,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			expiredAt := time.Now().Unix() + 8600
 | 
			
		||||
			writer, err := storage.OpenWriter(key, expiredAt, 200) // TODO 可以设置缓存过期时间
 | 
			
		||||
			writer, err := storage.OpenWriter(key, expiredAt, 200, resp.ContentLength) // TODO 可以设置缓存过期时间
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				locker.Lock()
 | 
			
		||||
				errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error())
 | 
			
		||||
 
 | 
			
		||||
@@ -21,7 +21,7 @@ import (
 | 
			
		||||
func (this *HTTPRequest) doCacheRead(useStale bool) (shouldStop bool) {
 | 
			
		||||
	this.cacheCanTryStale = false
 | 
			
		||||
 | 
			
		||||
	cachePolicy := this.ReqServer.HTTPCachePolicy
 | 
			
		||||
	var cachePolicy = this.ReqServer.HTTPCachePolicy
 | 
			
		||||
	if cachePolicy == nil || !cachePolicy.IsOn {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -243,7 +243,7 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
 | 
			
		||||
 | 
			
		||||
	var expiredAt = utils.UnixTime() + life
 | 
			
		||||
	var cacheKey = this.req.cacheKey
 | 
			
		||||
	cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode())
 | 
			
		||||
	cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode(), size)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if !caches.CanIgnoreErr(err) {
 | 
			
		||||
			remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
 | 
			
		||||
@@ -537,7 +537,7 @@ func (this *HTTPWriter) Close() {
 | 
			
		||||
		if this.cacheWriter != nil {
 | 
			
		||||
			var cacheKey = this.cacheWriter.Key() + webpSuffix
 | 
			
		||||
 | 
			
		||||
			webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode())
 | 
			
		||||
			webpCacheWriter, _ = this.cacheStorage.OpenWriter(cacheKey, this.cacheWriter.ExpiredAt(), this.StatusCode(), -1)
 | 
			
		||||
			if webpCacheWriter != nil {
 | 
			
		||||
				// 写入Header
 | 
			
		||||
				for k, v := range this.Header() {
 | 
			
		||||
@@ -660,7 +660,7 @@ func (this *HTTPWriter) Close() {
 | 
			
		||||
			// 对比Content-Length
 | 
			
		||||
			var contentLengthString = this.Header().Get("Content-Length")
 | 
			
		||||
			if len(contentLengthString) > 0 {
 | 
			
		||||
				contentLength := types.Int64(contentLengthString)
 | 
			
		||||
				var contentLength = types.Int64(contentLengthString)
 | 
			
		||||
				if contentLength != this.cacheWriter.BodySize() {
 | 
			
		||||
					this.isOk = false
 | 
			
		||||
					_ = this.cacheWriter.Discard()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user