mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-12-15 07:06:36 +08:00
优化缓存管理
This commit is contained in:
@@ -2,11 +2,14 @@ package caches
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
_ "github.com/iwind/TeaGo/bootstrap"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -40,7 +43,7 @@ func TestFileStorage_Init(t *testing.T) {
|
||||
t.Log(len(storage.list.m), "entries left")
|
||||
}
|
||||
|
||||
func TestFileStorage_Open(t *testing.T) {
|
||||
func TestFileStorage_OpenWriter(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
IsOn: true,
|
||||
@@ -57,13 +60,20 @@ func TestFileStorage_Open(t *testing.T) {
|
||||
t.Log(time.Since(now).Seconds()*1000, "ms")
|
||||
}()
|
||||
|
||||
writer, err := storage.Open("abc", time.Now().Unix()+3600)
|
||||
header := []byte("Header")
|
||||
body := []byte("This is Body")
|
||||
writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(writer)
|
||||
|
||||
_, err = writer.Write([]byte("Hello,World"))
|
||||
_, err = writer.WriteHeader(header)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = writer.Write(body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -72,6 +82,74 @@ func TestFileStorage_Open(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log("header:", writer.HeaderSize(), "body:", writer.BodySize())
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileStorage_OpenWriter_HTTP(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
IsOn: true,
|
||||
Options: map[string]interface{}{
|
||||
"dir": Tea.Root + "/caches",
|
||||
},
|
||||
})
|
||||
err := storage.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
now := time.Now()
|
||||
defer func() {
|
||||
t.Log(time.Since(now).Seconds()*1000, "ms")
|
||||
}()
|
||||
|
||||
writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(writer)
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: 200,
|
||||
Header: http.Header{
|
||||
"Content-Type": []string{"text/html; charset=utf-8"},
|
||||
"Last-Modified": []string{"Wed, 06 Jan 2021 10:03:29 GMT"},
|
||||
"Server": []string{"CDN-Server"},
|
||||
},
|
||||
Body: ioutil.NopCloser(bytes.NewBuffer([]byte("THIS IS HTTP BODY"))),
|
||||
}
|
||||
|
||||
for k, v := range resp.Header {
|
||||
for _, v1 := range v {
|
||||
_, err = writer.WriteHeader([]byte(k + ":" + v1 + "\n"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
for {
|
||||
n, err := resp.Body.Read(buf)
|
||||
if n > 0 {
|
||||
_, err = writer.Write(buf[:n])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err = writer.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log("header:", writer.HeaderSize(), "body:", writer.BodySize())
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
|
||||
@@ -99,7 +177,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
writer, err := storage.Open("abc"+strconv.Itoa(i), time.Now().Unix()+3600)
|
||||
writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200)
|
||||
if err != nil {
|
||||
if err != ErrFileIsWriting {
|
||||
t.Fatal(err)
|
||||
@@ -151,7 +229,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
writer, err := storage.Open("abc"+strconv.Itoa(0), time.Now().Unix()+3600)
|
||||
writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200)
|
||||
if err != nil {
|
||||
if err != ErrFileIsWriting {
|
||||
t.Fatal(err)
|
||||
@@ -179,35 +257,6 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestFileStorage_Write(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
IsOn: true,
|
||||
Options: map[string]interface{}{
|
||||
"dir": Tea.Root + "/caches",
|
||||
},
|
||||
})
|
||||
err := storage.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
reader := bytes.NewBuffer([]byte(`my_value
|
||||
my_value2
|
||||
my_value3
|
||||
my_value4
|
||||
my_value5
|
||||
my_value6
|
||||
my_value7
|
||||
my_value8
|
||||
my_value9
|
||||
my_value10`))
|
||||
err = storage.Write("my-key", time.Now().Unix()+3600, reader)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileStorage_Read(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
@@ -221,9 +270,79 @@ func TestFileStorage_Read(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
now := time.Now()
|
||||
t.Log(storage.Read("my-key", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) {
|
||||
t.Log("[expiredAt]", "["+string(data)+"]")
|
||||
}))
|
||||
reader, err := storage.OpenReader("my-key")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
buf := make([]byte, 6)
|
||||
t.Log(reader.Status())
|
||||
err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) {
|
||||
t.Log("header:", string(buf[:n]))
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
|
||||
t.Log("body:", string(buf[:n]))
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(time.Since(now).Seconds()*1000, "ms")
|
||||
}
|
||||
|
||||
func TestFileStorage_Read_HTTP_Response(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
IsOn: true,
|
||||
Options: map[string]interface{}{
|
||||
"dir": Tea.Root + "/caches",
|
||||
},
|
||||
})
|
||||
err := storage.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
now := time.Now()
|
||||
reader, err := storage.OpenReader("my-http-response")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
buf := make([]byte, 32)
|
||||
t.Log(reader.Status())
|
||||
|
||||
headerBuf := []byte{}
|
||||
err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) {
|
||||
headerBuf = append(headerBuf, buf...)
|
||||
for {
|
||||
nIndex := bytes.Index(headerBuf, []byte{'\n'})
|
||||
if nIndex >= 0 {
|
||||
row := headerBuf[:nIndex]
|
||||
spaceIndex := bytes.Index(row, []byte{':'})
|
||||
if spaceIndex <= 0 {
|
||||
return false, errors.New("invalid header")
|
||||
}
|
||||
|
||||
t.Log("header row:", string(row[:spaceIndex]), string(row[spaceIndex+1:]))
|
||||
headerBuf = headerBuf[nIndex+1:]
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
|
||||
t.Log("body:", string(buf[:n]))
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(time.Since(now).Seconds()*1000, "ms")
|
||||
}
|
||||
|
||||
@@ -240,9 +359,23 @@ func TestFileStorage_Read_NotFound(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
now := time.Now()
|
||||
t.Log(storage.Read("my-key-10000", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) {
|
||||
t.Log("[" + string(data) + "]")
|
||||
}))
|
||||
buf := make([]byte, 6)
|
||||
reader, err := storage.OpenReader("my-key-10000")
|
||||
if err != nil {
|
||||
if err == ErrNotFound {
|
||||
t.Log("cache not fund")
|
||||
return
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
|
||||
t.Log("body:", string(buf[:n]))
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(time.Since(now).Seconds()*1000, "ms")
|
||||
}
|
||||
|
||||
@@ -319,38 +452,6 @@ func TestFileStorage_CleanAll(t *testing.T) {
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileStorage_Purge(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
IsOn: true,
|
||||
Options: map[string]interface{}{
|
||||
"dir": Tea.Root + "/caches",
|
||||
},
|
||||
})
|
||||
err := storage.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_ = storage.Write("a", time.Now().Unix()+3600, bytes.NewReader([]byte("a1")))
|
||||
_ = storage.Write("b", time.Now().Unix()+3600, bytes.NewReader([]byte("b1")))
|
||||
_ = storage.Write("c", time.Now().Unix()+3600, bytes.NewReader([]byte("c1")))
|
||||
_ = storage.Write("d", time.Now().Unix()+3600, bytes.NewReader([]byte("d1")))
|
||||
|
||||
before := time.Now()
|
||||
defer func() {
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}()
|
||||
|
||||
err = storage.Purge([]string{"a", "b1", "c"}, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log(storage.list.m)
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileStorage_Stop(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
@@ -366,6 +467,26 @@ func TestFileStorage_Stop(t *testing.T) {
|
||||
storage.Stop()
|
||||
}
|
||||
|
||||
func TestFileStorage_DecodeFile(t *testing.T) {
|
||||
storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{
|
||||
Id: 1,
|
||||
IsOn: true,
|
||||
Options: map[string]interface{}{
|
||||
"dir": Tea.Root + "/caches",
|
||||
},
|
||||
})
|
||||
err := storage.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, path := storage.keyPath("my-key")
|
||||
item, err := storage.decodeFile(path)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
logs.PrintAsJSON(item, t)
|
||||
}
|
||||
|
||||
func BenchmarkFileStorage_Read(b *testing.B) {
|
||||
runtime.GOMAXPROCS(1)
|
||||
|
||||
@@ -382,9 +503,15 @@ func BenchmarkFileStorage_Read(b *testing.B) {
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
buf := make([]byte, 1024)
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = storage.Read("my-key", buf, func(data []byte, size int64, expiredAt int64, isEOF bool) {
|
||||
reader, err := storage.OpenReader("my-key")
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
buf := make([]byte, 1024)
|
||||
_ = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
|
||||
return true, nil
|
||||
})
|
||||
_ = reader.Close()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user