diff --git a/internal/caches/item.go b/internal/caches/item.go index 321531b..a25362d 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -3,12 +3,21 @@ package caches import "time" type Item struct { - Key string - ExpiredAt int64 - ValueSize int64 - Size int64 + Key string + ExpiredAt int64 + HeaderSize int64 + BodySize int64 + MetaSize int64 } func (this *Item) IsExpired() bool { return this.ExpiredAt < time.Now().Unix() } + +func (this *Item) TotalSize() int64 { + return this.Size() + this.MetaSize + int64(len(this.Key)) +} + +func (this *Item) Size() int64 { + return this.HeaderSize + this.BodySize +} diff --git a/internal/caches/list.go b/internal/caches/list.go index d965ec7..13462de 100644 --- a/internal/caches/list.go +++ b/internal/caches/list.go @@ -118,8 +118,8 @@ func (this *List) Stat(check func(hash string) bool) *Stat { // 检查文件是否存在、内容是否正确等 if check != nil && check(hash) { result.Count++ - result.ValueSize += item.ValueSize - result.Size += item.Size + result.ValueSize += item.Size() + result.Size += item.TotalSize() } } } diff --git a/internal/caches/list_test.go b/internal/caches/list_test.go index dc27df6..1213c25 100644 --- a/internal/caches/list_test.go +++ b/internal/caches/list_test.go @@ -12,14 +12,14 @@ import ( func TestList_Add(t *testing.T) { list := NewList() list.Add("a", &Item{ - Key: "a1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) list.Add("b", &Item{ - Key: "b1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) t.Log(list.m) } @@ -27,14 +27,14 @@ func TestList_Add(t *testing.T) { func TestList_Remove(t *testing.T) { list := NewList() list.Add("a", &Item{ - Key: "a1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) list.Add("b", &Item{ - Key: "b1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) list.Remove("b") t.Log(list.m) @@ -43,24 +43,24 @@ func TestList_Remove(t *testing.T) { func TestList_Purge(t *testing.T) { list := NewList() list.Add("a", &Item{ - Key: "a1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) list.Add("b", &Item{ - Key: "b1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) list.Add("c", &Item{ - Key: "c1", - ExpiredAt: time.Now().Unix() - 3600, - Size: 1024, + Key: "c1", + ExpiredAt: time.Now().Unix() - 3600, + HeaderSize: 1024, }) list.Add("d", &Item{ - Key: "d1", - ExpiredAt: time.Now().Unix() - 2, - Size: 1024, + Key: "d1", + ExpiredAt: time.Now().Unix() - 2, + HeaderSize: 1024, }) list.Purge(100, func(hash string) { t.Log("delete:", hash) @@ -71,24 +71,24 @@ func TestList_Purge(t *testing.T) { func TestList_Stat(t *testing.T) { list := NewList() list.Add("a", &Item{ - Key: "a1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "a1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) list.Add("b", &Item{ - Key: "b1", - ExpiredAt: time.Now().Unix() + 3600, - Size: 1024, + Key: "b1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, }) list.Add("c", &Item{ - Key: "c1", - ExpiredAt: time.Now().Unix(), - Size: 1024, + Key: "c1", + ExpiredAt: time.Now().Unix(), + HeaderSize: 1024, }) list.Add("d", &Item{ - Key: "d1", - ExpiredAt: time.Now().Unix() - 2, - Size: 1024, + Key: "d1", + ExpiredAt: time.Now().Unix() - 2, + HeaderSize: 1024, }) result := list.Stat(func(hash string) bool { // 随机测试 @@ -104,10 +104,10 @@ func TestList_FindKeysWithPrefix(t *testing.T) { for i := 0; i < 1_000_000; i++ { key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ - Key: key, - ExpiredAt: 0, - ValueSize: 0, - Size: 0, + Key: key, + ExpiredAt: 0, + BodySize: 0, + HeaderSize: 0, }) } t.Log(time.Since(before).Seconds()*1000, "ms") diff --git a/internal/caches/reader.go b/internal/caches/reader.go new file mode 100644 index 0000000..47fa058 --- /dev/null +++ b/internal/caches/reader.go @@ -0,0 +1,29 @@ +package caches + +type ReaderFunc func(n int) (goNext bool, err error) + +type Reader interface { + // 初始化 + Init() error + + // 状态码 + Status() int + + // 读取Header + ReadHeader(buf []byte, callback ReaderFunc) error + + // 读取Body + ReadBody(buf []byte, callback ReaderFunc) error + + // 读取某个范围内的Body + ReadBodyRange(buf []byte, start int64, end int64, callback ReaderFunc) error + + // Header Size + HeaderSize() int64 + + // Body Size + BodySize() int64 + + // 关闭 + Close() error +} diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go new file mode 100644 index 0000000..af5aa53 --- /dev/null +++ b/internal/caches/reader_file.go @@ -0,0 +1,291 @@ +package caches + +import ( + "encoding/binary" + "errors" + "github.com/iwind/TeaGo/types" + "io" + "os" +) + +type FileReader struct { + fp *os.File + + status int + headerOffset int64 + headerSize int + bodySize int64 + bodyOffset int64 +} + +func NewFileReader(fp *os.File) *FileReader { + return &FileReader{fp: fp} +} + +func (this *FileReader) Init() error { + isOk := false + + defer func() { + if !isOk { + _ = this.discard() + } + }() + + // 读取状态 + _, err := this.fp.Seek(SizeExpiresAt, io.SeekStart) + if err != nil { + _ = this.discard() + return err + } + buf := make([]byte, 3) + ok, err := this.readToBuff(this.fp, buf) + if err != nil { + return err + } + if !ok { + return ErrNotFound + } + status := types.Int(string(buf)) + if status < 100 || status > 999 { + return errors.New("invalid status") + } + this.status = status + + // URL + _, err = this.fp.Seek(SizeExpiresAt+SizeStatus, io.SeekStart) + if err != nil { + return err + } + + bytes4 := make([]byte, 4) + ok, err = this.readToBuff(this.fp, bytes4) + if err != nil { + return err + } + if !ok { + return ErrNotFound + } + urlLength := binary.BigEndian.Uint32(bytes4) + + // header + ok, err = this.readToBuff(this.fp, bytes4) + if err != nil { + return err + } + if !ok { + return ErrNotFound + } + headerSize := int(binary.BigEndian.Uint32(bytes4)) + if headerSize == 0 { + return nil + } + this.headerSize = headerSize + this.headerOffset = int64(SizeMeta) + int64(urlLength) + + // body + bytes8 := make([]byte, 8) + ok, err = this.readToBuff(this.fp, bytes8) + if err != nil { + return err + } + if !ok { + return ErrNotFound + } + bodySize := int(binary.BigEndian.Uint64(bytes8)) + if bodySize == 0 { + return nil + } + this.bodySize = int64(bodySize) + this.bodyOffset = this.headerOffset + int64(headerSize) + + isOk = true + + return nil +} + +func (this *FileReader) Status() int { + return this.status +} + +func (this *FileReader) HeaderSize() int64 { + return int64(this.headerSize) +} + +func (this *FileReader) BodySize() int64 { + return this.bodySize +} + +func (this *FileReader) ReadHeader(buf []byte, callback ReaderFunc) error { + isOk := false + + defer func() { + if !isOk { + _ = this.discard() + } + }() + + _, err := this.fp.Seek(this.headerOffset, io.SeekStart) + if err != nil { + return err + } + + for { + n, err := this.fp.Read(buf) + if n > 0 { + if n < this.headerSize { + goNext, e := callback(n) + if e != nil { + isOk = true + return e + } + if !goNext { + break + } + this.headerSize -= n + } else { + _, e := callback(this.headerSize) + if e != nil { + isOk = true + return e + } + break + } + } + if err != nil { + if err != io.EOF { + return err + } + break + } + } + + isOk = true + + return nil +} + +func (this *FileReader) ReadBody(buf []byte, callback ReaderFunc) error { + isOk := false + + defer func() { + if !isOk { + _ = this.discard() + } + }() + + _, err := this.fp.Seek(this.bodyOffset, io.SeekStart) + if err != nil { + return err + } + + for { + n, err := this.fp.Read(buf) + if n > 0 { + goNext, e := callback(n) + if e != nil { + isOk = true + return e + } + if !goNext { + break + } + } + if err != nil { + if err != io.EOF { + return err + } + break + } + } + + isOk = true + + return nil +} + +func (this *FileReader) ReadBodyRange(buf []byte, start int64, end int64, callback ReaderFunc) error { + isOk := false + + defer func() { + if !isOk { + _ = this.discard() + } + }() + + offset := start + if start < 0 { + offset = this.bodyOffset + this.bodySize + end + end = this.bodyOffset + this.bodySize - 1 + } else if end < 0 { + offset = this.bodyOffset + start + end = this.bodyOffset + this.bodySize - 1 + } else { + offset = this.bodyOffset + start + end = this.bodyOffset + end + } + if offset < 0 || end < 0 || offset > end { + isOk = true + return ErrInvalidRange + } + _, err := this.fp.Seek(offset, io.SeekStart) + if err != nil { + return err + } + + for { + n, err := this.fp.Read(buf) + if n > 0 { + n2 := int(end-offset) + 1 + if n2 <= n { + _, e := callback(n2) + if e != nil { + isOk = true + return e + } + break + } else { + goNext, e := callback(n) + if e != nil { + isOk = true + return e + } + if !goNext { + break + } + } + + offset += int64(n) + if offset > end { + break + } + } + if err != nil { + if err != io.EOF { + return err + } + break + } + } + + isOk = true + + return nil +} + +func (this *FileReader) Close() error { + return this.fp.Close() +} + +func (this *FileReader) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { + n, err := fp.Read(buf) + if err != nil { + return false, err + } + ok = n == len(buf) + return +} + +func (this *FileReader) discard() error { + _ = this.fp.Close() + return os.Remove(this.fp.Name()) +} diff --git a/internal/caches/reader_file_test.go b/internal/caches/reader_file_test.go new file mode 100644 index 0000000..6a4f77a --- /dev/null +++ b/internal/caches/reader_file_test.go @@ -0,0 +1,151 @@ +package caches + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/Tea" + "os" + "testing" +) + +func TestFileReader(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + _, path := storage.keyPath("my-key") + + fp, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = fp.Close() + }() + reader := NewFileReader(fp) + err = reader.Init() + if err != nil { + t.Fatal(err) + } + + t.Log(reader.Status()) + + buf := make([]byte, 10) + err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + t.Log("header:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + t.Log("body:", string(buf[:n])) + return true, nil + }) +} + +func TestFileReader_Range(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + + /**writer, err := storage.Open("my-number", time.Now().Unix()+30*86400, 200, 6, 10) + if err != nil { + t.Fatal(err) + } + _, err = writer.Write([]byte("Header")) + if err != nil { + t.Fatal(err) + } + _, err = writer.Write([]byte("0123456789")) + if err != nil { + t.Fatal(err) + } + _ = writer.Close()**/ + + _, path := storage.keyPath("my-number") + + fp, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = fp.Close() + }() + reader := NewFileReader(fp) + err = reader.Init() + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, 6) + { + err = reader.ReadBodyRange(buf, 0, 0, func(n int) (goNext bool, err error) { + t.Log("[0, 0]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 7, 7, func(n int) (goNext bool, err error) { + t.Log("[7, 7]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 0, 10, func(n int) (goNext bool, err error) { + t.Log("[0, 10]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 3, 5, func(n int) (goNext bool, err error) { + t.Log("[3, 5]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, -1, -3, func(n int) (goNext bool, err error) { + t.Log("[, -3]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 3, -1, func(n int) (goNext bool, err error) { + t.Log("[3, ]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } +} diff --git a/internal/caches/reader_memory.go b/internal/caches/reader_memory.go new file mode 100644 index 0000000..c57e59d --- /dev/null +++ b/internal/caches/reader_memory.go @@ -0,0 +1,160 @@ +package caches + +import ( + "errors" +) + +type MemoryReader struct { + item *MemoryItem +} + +func NewMemoryReader(item *MemoryItem) *MemoryReader { + return &MemoryReader{item: item} +} + +func (this *MemoryReader) Init() error { + return nil +} + +func (this *MemoryReader) Status() int { + return this.item.Status +} + +func (this *MemoryReader) HeaderSize() int64 { + return int64(len(this.item.HeaderValue)) +} + +func (this *MemoryReader) BodySize() int64 { + return int64(len(this.item.BodyValue)) +} + +func (this *MemoryReader) ReadHeader(buf []byte, callback ReaderFunc) error { + l := len(buf) + if l == 0 { + return errors.New("using empty buffer") + } + + size := len(this.item.HeaderValue) + offset := 0 + for { + left := size - offset + if l <= left { + copy(buf, this.item.HeaderValue[offset:offset+l]) + goNext, e := callback(l) + if e != nil { + return e + } + if !goNext { + break + } + } else { + copy(buf, this.item.HeaderValue[offset:]) + _, e := callback(left) + if e != nil { + return e + } + break + } + offset += l + if offset >= size { + break + } + } + + return nil +} + +func (this *MemoryReader) ReadBody(buf []byte, callback ReaderFunc) error { + l := len(buf) + if l == 0 { + return errors.New("using empty buffer") + } + + size := len(this.item.BodyValue) + offset := 0 + for { + left := size - offset + if l <= left { + copy(buf, this.item.BodyValue[offset:offset+l]) + goNext, e := callback(l) + if e != nil { + return e + } + if !goNext { + break + } + } else { + copy(buf, this.item.BodyValue[offset:]) + _, e := callback(left) + if e != nil { + return e + } + break + } + offset += l + if offset >= size { + break + } + } + return nil +} + +func (this *MemoryReader) ReadBodyRange(buf []byte, start int64, end int64, callback ReaderFunc) error { + offset := start + bodySize := int64(len(this.item.BodyValue)) + if start < 0 { + offset = bodySize + end + end = bodySize - 1 + } else if end < 0 { + offset = start + end = bodySize - 1 + } + + if end >= bodySize { + end = bodySize - 1 + } + + if offset < 0 || end < 0 || offset > end { + return ErrInvalidRange + } + + newData := this.item.BodyValue[offset : end+1] + + l := len(buf) + if l == 0 { + return errors.New("using empty buffer") + } + + size := len(newData) + offset2 := 0 + for { + left := size - offset2 + if l <= left { + copy(buf, newData[offset2:offset2+l]) + goNext, e := callback(l) + if e != nil { + return e + } + if !goNext { + break + } + } else { + copy(buf, newData[offset2:]) + _, e := callback(left) + if e != nil { + return e + } + break + } + offset2 += l + if offset2 >= size { + break + } + } + + return nil +} + +func (this *MemoryReader) Close() error { + return nil +} diff --git a/internal/caches/reader_memory_test.go b/internal/caches/reader_memory_test.go new file mode 100644 index 0000000..77f6a32 --- /dev/null +++ b/internal/caches/reader_memory_test.go @@ -0,0 +1,105 @@ +package caches + +import "testing" + +func TestMemoryReader_Header(t *testing.T) { + item := &MemoryItem{ + ExpiredAt: 0, + HeaderValue: []byte("0123456789"), + BodyValue: nil, + Status: 2000, + } + reader := NewMemoryReader(item) + buf := make([]byte, 6) + err := reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + t.Log("buf:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestMemoryReader_Body(t *testing.T) { + item := &MemoryItem{ + ExpiredAt: 0, + HeaderValue: nil, + BodyValue: []byte("0123456789"), + Status: 2000, + } + reader := NewMemoryReader(item) + buf := make([]byte, 6) + err := reader.ReadBody(buf, func(n int) (goNext bool, err error) { + t.Log("buf:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestMemoryReader_Body_Range(t *testing.T) { + item := &MemoryItem{ + ExpiredAt: 0, + HeaderValue: nil, + BodyValue: []byte("0123456789"), + Status: 2000, + } + reader := NewMemoryReader(item) + buf := make([]byte, 6) + var err error + { + err = reader.ReadBodyRange(buf, 0, 0, func(n int) (goNext bool, err error) { + t.Log("[0, 0]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 7, 7, func(n int) (goNext bool, err error) { + t.Log("[7, 7]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 0, 10, func(n int) (goNext bool, err error) { + t.Log("[0, 10]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 3, 5, func(n int) (goNext bool, err error) { + t.Log("[3, 5]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, -1, -3, func(n int) (goNext bool, err error) { + t.Log("[, -3]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + { + err = reader.ReadBodyRange(buf, 3, -1, func(n int) (goNext bool, err error) { + t.Log("[3, ]", "body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 4a896ac..10f0593 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -10,7 +10,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" "io" "os" @@ -25,17 +24,24 @@ import ( ) const ( - SizeExpiredAt = 10 - SizeKeyLength = 4 - SizeNL = 1 - SizeEnd = 4 + SizeExpiresAt = 4 + SizeStatus = 3 + SizeURLLength = 4 + SizeHeaderLength = 4 + SizeBodyLength = 8 + + SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength ) var ( ErrNotFound = errors.New("cache not found") ErrFileIsWriting = errors.New("the file is writing") + ErrInvalidRange = errors.New("invalid range") ) +// 文件缓存 +// 文件结构: +// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] type FileStorage struct { policy *serverconfigs.HTTPCachePolicy cacheConfig *serverconfigs.HTTPFileCacheStorage @@ -61,10 +67,10 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy { // 初始化 func (this *FileStorage) Init() error { this.list.OnAdd(func(item *Item) { - atomic.AddInt64(&this.totalSize, item.Size) + atomic.AddInt64(&this.totalSize, item.TotalSize()) }) this.list.OnRemove(func(item *Item) { - atomic.AddInt64(&this.totalSize, -item.Size) + atomic.AddInt64(&this.totalSize, -item.TotalSize()) }) this.locker.Lock() @@ -84,7 +90,15 @@ func (this *FileStorage) Init() error { } cost := time.Since(before).Seconds() * 1000 - remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+fmt.Sprintf("%.3f", float64(size)/1024/1024)+" M") + sizeMB := strconv.FormatInt(size, 10) + " Bytes" + if size > 1024*1024*1024 { + sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024) + } else if size > 1024*1024 { + sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024) + } else if size > 1024 { + sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024) + } + remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB) }() // 配置 @@ -131,107 +145,34 @@ func (this *FileStorage) Init() error { return nil } -func (this *FileStorage) Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error { +func (this *FileStorage) OpenReader(key string) (Reader, error) { hash, path := this.keyPath(key) if !this.list.Exist(hash) { - return ErrNotFound + return nil, ErrNotFound } - this.locker.RLock() - defer this.locker.RUnlock() - // TODO 尝试使用mmap加快读取速度 fp, err := os.OpenFile(path, os.O_RDONLY, 0444) if err != nil { if !os.IsNotExist(err) { - return err + return nil, err } - return ErrNotFound + return nil, ErrNotFound } - defer func() { - _ = fp.Close() - }() - // 是否过期 - buf := make([]byte, SizeExpiredAt) - n, err := fp.Read(buf) + reader := NewFileReader(fp) if err != nil { - return err + return nil, err } - if n != len(buf) { - return ErrNotFound - } - - expiredAt := types.Int64(string(buf)) - if expiredAt < time.Now().Unix() { - // 已过期 - _ = fp.Close() - _ = os.Remove(path) - - return ErrNotFound - } - - buf = make([]byte, SizeKeyLength) - n, err = fp.Read(buf) + err = reader.Init() if err != nil { - return err + return nil, err } - if n != len(buf) { - return ErrNotFound - } - keyLength := int(binary.BigEndian.Uint32(buf)) - - offset, err := fp.Seek(-SizeEnd, io.SeekEnd) - if err != nil { - return err - } - buf = make([]byte, SizeEnd) - n, err = fp.Read(buf) - if n != len(buf) { - return ErrNotFound - } - if string(buf) != "\n$$$" { - _ = fp.Close() - _ = os.Remove(path) - return ErrNotFound - } - startOffset := SizeExpiredAt + SizeKeyLength + keyLength + SizeNL - size := int(offset) + SizeEnd - startOffset - valueSize := offset - int64(startOffset) - - _, err = fp.Seek(int64(startOffset), io.SeekStart) - if err != nil { - return err - } - - for { - n, err := fp.Read(readerBuf) - if n > 0 { - size -= n - if size < SizeEnd { // 已经到了末尾区域 - if n <= SizeEnd-size { // 已经到了末尾 - break - } else { - callback(readerBuf[:n-(SizeEnd-size)], valueSize, expiredAt, true) - } - } else { - callback(readerBuf[:n], valueSize, expiredAt, false) - } - } - if err != nil { - if err != io.EOF { - return err - } - - break - } - } - - return nil + return reader, nil } // 打开缓存文件等待写入 -func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { +func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { // 检查是否超出最大值 if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { return nil, errors.New("write file cache failed: too many keys in cache storage") @@ -256,7 +197,7 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { // 先删除 this.list.Remove(hash) - path := dir + "/" + hash + ".cache" + path := dir + "/" + hash + ".cache.tmp" writer, err := os.OpenFile(path, os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0666) if err != nil { return nil, err @@ -291,21 +232,54 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { } // 写入过期时间 - _, err = writer.WriteString(fmt.Sprintf("%d", expiredAt)) + bytes4 := make([]byte, 4) + { + binary.BigEndian.PutUint32(bytes4, uint32(expiredAt)) + _, err = writer.Write(bytes4) + if err != nil { + return nil, err + } + } + + // 写入状态码 + if status > 999 || status < 100 { + status = 200 + } + _, err = writer.WriteString(strconv.Itoa(status)) if err != nil { return nil, err } - // 写入key length - b := make([]byte, SizeKeyLength) - binary.BigEndian.PutUint32(b, uint32(len(key))) - _, err = writer.Write(b) - if err != nil { - return nil, err + // 写入URL长度 + { + binary.BigEndian.PutUint32(bytes4, uint32(len(key))) + _, err = writer.Write(bytes4) + if err != nil { + return nil, err + } } - // 写入key - _, err = writer.WriteString(key + "\n") + // 写入Header Length + { + binary.BigEndian.PutUint32(bytes4, uint32(0)) + _, err = writer.Write(bytes4) + if err != nil { + return nil, err + } + } + + // 写入Body Length + { + b := make([]byte, SizeBodyLength) + binary.BigEndian.PutUint64(b, uint64(0)) + _, err = writer.Write(b) + if err != nil { + return nil, err + } + } + + // 写入URL + _, err = writer.WriteString(key) if err != nil { return nil, err } @@ -315,90 +289,9 @@ func (this *FileStorage) Open(key string, expiredAt int64) (Writer, error) { return NewFileWriter(writer, key, expiredAt), nil } -// 写入缓存数据 -// 目录结构:$root/p$policyId/$hash[:2]/$hash[2:4]/$hash.cache -// 数据结构: [expiredAt] [key length] [key] \n value \n $$$ -func (this *FileStorage) Write(key string, expiredAt int64, valueReader io.Reader) error { - this.locker.Lock() - defer this.locker.Unlock() - - hash := stringutil.Md5(key) - dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4] - - _, err := os.Stat(dir) - if err != nil { - if !os.IsNotExist(err) { - return err - } - err = os.MkdirAll(dir, 0777) - if err != nil { - return err - } - } - path := dir + "/" + hash + ".cache" - writer, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_SYNC|os.O_WRONLY, 0777) - if err != nil { - return err - } - - isOk := false - defer func() { - err = writer.Close() - if err != nil { - isOk = false - } - - // 如果出错了,就删除文件,避免写一半 - if !isOk { - _ = os.Remove(path) - } - }() - - // 写入过期时间 - _, err = writer.WriteString(fmt.Sprintf("%d", expiredAt)) - if err != nil { - return err - } - - // 写入key length - b := make([]byte, SizeKeyLength) - binary.BigEndian.PutUint32(b, uint32(len(key))) - _, err = writer.Write(b) - if err != nil { - return err - } - - // 写入key - _, err = writer.WriteString(key + "\n") - if err != nil { - return err - } - - // 写入数据 - valueSize, err := io.Copy(writer, valueReader) - if err != nil { - return err - } - - // 写入结束符 - _, err = writer.WriteString("\n$$$") - - isOk = true - - // 写入List - this.list.Add(hash, &Item{ - Key: key, - ExpiredAt: expiredAt, - ValueSize: valueSize, - Size: valueSize + SizeExpiredAt + SizeKeyLength + int64(len(key)) + SizeNL + SizeEnd, - }) - - return nil -} - // 添加到List func (this *FileStorage) AddToList(item *Item) { - item.Size = item.ValueSize + SizeExpiredAt + SizeKeyLength + int64(len(item.Key)) + SizeNL + SizeEnd + item.MetaSize = SizeMeta hash := stringutil.Md5(item.Key) this.list.Add(hash, item) } @@ -553,7 +446,18 @@ func (this *FileStorage) initList() error { this.list.Reset() dir := this.dir() - files, err := filepath.Glob(dir + "/*/*/*.cache") + + // 清除tmp + files, err := filepath.Glob(dir + "/*/*/*.cache.tmp") + if err != nil { + return err + } + for _, path := range files { + _ = os.Remove(path) + } + + // 加载缓存 + files, err = filepath.Glob(dir + "/*/*/*.cache") if err != nil { return err } @@ -603,59 +507,88 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) { if err != nil { return nil, err } + + isAllOk := false defer func() { _ = fp.Close() + + if !isAllOk { + _ = os.Remove(path) + } }() - buf := make([]byte, SizeExpiredAt) - n, err := fp.Read(buf) + item := &Item{ + MetaSize: SizeMeta, + } + + bytes4 := make([]byte, 4) + + // 过期时间 + ok, err := this.readToBuff(fp, bytes4) if err != nil { return nil, err } - if n != len(buf) { - // 数据格式错误 - _ = fp.Close() - _ = os.Remove(path) - + if !ok { return nil, ErrNotFound } - expiredAt := types.Int64(string(buf)) - if expiredAt < time.Now().Unix() { - // 已过期 - _ = fp.Close() - _ = os.Remove(path) + item.ExpiredAt = int64(binary.BigEndian.Uint32(bytes4)) + + // 是否已过期 + if item.ExpiredAt < time.Now().Unix() { return nil, ErrNotFound } - buf = make([]byte, SizeKeyLength) - n, err = fp.Read(buf) + // URL Size + _, err = fp.Seek(int64(SizeExpiresAt+SizeStatus), io.SeekStart) if err != nil { return nil, err } - keyLength := binary.BigEndian.Uint32(buf) - - buf = make([]byte, keyLength) - n, err = fp.Read(buf) + ok, err = this.readToBuff(fp, bytes4) if err != nil { return nil, err } - if n != int(keyLength) { - // 数据格式错误 - _ = fp.Close() - _ = os.Remove(path) + if !ok { return nil, ErrNotFound } + urlSize := binary.BigEndian.Uint32(bytes4) - stat, err := fp.Stat() + // Header Size + ok, err = this.readToBuff(fp, bytes4) if err != nil { return nil, err } + if !ok { + return nil, ErrNotFound + } + item.HeaderSize = int64(binary.BigEndian.Uint32(bytes4)) + + // Body Size + bytes8 := make([]byte, 8) + ok, err = this.readToBuff(fp, bytes8) + if err != nil { + return nil, err + } + if !ok { + return nil, ErrNotFound + } + item.BodySize = int64(binary.BigEndian.Uint64(bytes8)) + + // URL + if urlSize > 0 { + data := utils.BytePool1024.Get() + result, ok, err := this.readN(fp, data, int(urlSize)) + utils.BytePool1024.Put(data) + if err != nil { + return nil, err + } + if !ok { + return nil, ErrNotFound + } + item.Key = string(result) + } + + isAllOk = true - item := &Item{} - item.ExpiredAt = expiredAt - item.Key = string(buf) - item.Size = stat.Size() - item.ValueSize = item.Size - SizeExpiredAt - SizeKeyLength - int64(keyLength) - SizeNL - SizeEnd return item, nil } @@ -669,3 +602,31 @@ func (this *FileStorage) purgeLoop() { } }) } + +func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { + n, err := fp.Read(buf) + if err != nil { + return false, err + } + ok = n == len(buf) + return +} + +func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byte, ok bool, err error) { + for { + n, err := fp.Read(buf) + if err != nil { + return nil, false, err + } + if n > 0 { + if n >= total { + result = append(result, buf[:total]...) + ok = true + return result, ok, nil + } else { + total -= n + result = append(result, buf[:n]...) + } + } + } +} diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index 705ac89..a5ba661 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -2,11 +2,14 @@ package caches import ( "bytes" + "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/logs" + "io/ioutil" + "net/http" "runtime" "strconv" "sync" @@ -40,7 +43,7 @@ func TestFileStorage_Init(t *testing.T) { t.Log(len(storage.list.m), "entries left") } -func TestFileStorage_Open(t *testing.T) { +func TestFileStorage_OpenWriter(t *testing.T) { storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ Id: 1, IsOn: true, @@ -57,13 +60,20 @@ func TestFileStorage_Open(t *testing.T) { t.Log(time.Since(now).Seconds()*1000, "ms") }() - writer, err := storage.Open("abc", time.Now().Unix()+3600) + header := []byte("Header") + body := []byte("This is Body") + writer, err := storage.OpenWriter("my-key", time.Now().Unix()+86400, 200) if err != nil { t.Fatal(err) } t.Log(writer) - _, err = writer.Write([]byte("Hello,World")) + _, err = writer.WriteHeader(header) + if err != nil { + t.Fatal(err) + } + + _, err = writer.Write(body) if err != nil { t.Fatal(err) } @@ -72,6 +82,74 @@ func TestFileStorage_Open(t *testing.T) { if err != nil { t.Fatal(err) } + + t.Log("header:", writer.HeaderSize(), "body:", writer.BodySize()) + t.Log("ok") +} + +func TestFileStorage_OpenWriter_HTTP(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + now := time.Now() + defer func() { + t.Log(time.Since(now).Seconds()*1000, "ms") + }() + + writer, err := storage.OpenWriter("my-http-response", time.Now().Unix()+86400, 200) + if err != nil { + t.Fatal(err) + } + t.Log(writer) + + resp := &http.Response{ + StatusCode: 200, + Header: http.Header{ + "Content-Type": []string{"text/html; charset=utf-8"}, + "Last-Modified": []string{"Wed, 06 Jan 2021 10:03:29 GMT"}, + "Server": []string{"CDN-Server"}, + }, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte("THIS IS HTTP BODY"))), + } + + for k, v := range resp.Header { + for _, v1 := range v { + _, err = writer.WriteHeader([]byte(k + ":" + v1 + "\n")) + if err != nil { + t.Fatal(err) + } + } + } + + buf := make([]byte, 1024) + for { + n, err := resp.Body.Read(buf) + if n > 0 { + _, err = writer.Write(buf[:n]) + if err != nil { + t.Fatal(err) + } + } + if err != nil { + break + } + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } + + t.Log("header:", writer.HeaderSize(), "body:", writer.BodySize()) + t.Log("ok") } func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { @@ -99,7 +177,7 @@ func TestFileStorage_Concurrent_Open_DifferentFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.Open("abc"+strconv.Itoa(i), time.Now().Unix()+3600) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(i), time.Now().Unix()+3600, 200) if err != nil { if err != ErrFileIsWriting { t.Fatal(err) @@ -151,7 +229,7 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { go func(i int) { defer wg.Done() - writer, err := storage.Open("abc"+strconv.Itoa(0), time.Now().Unix()+3600) + writer, err := storage.OpenWriter("abc"+strconv.Itoa(0), time.Now().Unix()+3600, 200) if err != nil { if err != ErrFileIsWriting { t.Fatal(err) @@ -179,35 +257,6 @@ func TestFileStorage_Concurrent_Open_SameFile(t *testing.T) { wg.Wait() } -func TestFileStorage_Write(t *testing.T) { - storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ - Id: 1, - IsOn: true, - Options: map[string]interface{}{ - "dir": Tea.Root + "/caches", - }, - }) - err := storage.Init() - if err != nil { - t.Fatal(err) - } - reader := bytes.NewBuffer([]byte(`my_value -my_value2 -my_value3 -my_value4 -my_value5 -my_value6 -my_value7 -my_value8 -my_value9 -my_value10`)) - err = storage.Write("my-key", time.Now().Unix()+3600, reader) - if err != nil { - t.Fatal(err) - } - t.Log("ok") -} - func TestFileStorage_Read(t *testing.T) { storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ Id: 1, @@ -221,9 +270,79 @@ func TestFileStorage_Read(t *testing.T) { t.Fatal(err) } now := time.Now() - t.Log(storage.Read("my-key", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) { - t.Log("[expiredAt]", "["+string(data)+"]") - })) + reader, err := storage.OpenReader("my-key") + if err != nil { + t.Fatal(err) + } + buf := make([]byte, 6) + t.Log(reader.Status()) + err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + t.Log("header:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + t.Log("body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + t.Log(time.Since(now).Seconds()*1000, "ms") +} + +func TestFileStorage_Read_HTTP_Response(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + now := time.Now() + reader, err := storage.OpenReader("my-http-response") + if err != nil { + t.Fatal(err) + } + buf := make([]byte, 32) + t.Log(reader.Status()) + + headerBuf := []byte{} + err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + headerBuf = append(headerBuf, buf...) + for { + nIndex := bytes.Index(headerBuf, []byte{'\n'}) + if nIndex >= 0 { + row := headerBuf[:nIndex] + spaceIndex := bytes.Index(row, []byte{':'}) + if spaceIndex <= 0 { + return false, errors.New("invalid header") + } + + t.Log("header row:", string(row[:spaceIndex]), string(row[spaceIndex+1:])) + headerBuf = headerBuf[nIndex+1:] + } else { + break + } + } + return true, nil + }) + if err != nil { + t.Fatal(err) + } + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + t.Log("body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } t.Log(time.Since(now).Seconds()*1000, "ms") } @@ -240,9 +359,23 @@ func TestFileStorage_Read_NotFound(t *testing.T) { t.Fatal(err) } now := time.Now() - t.Log(storage.Read("my-key-10000", make([]byte, 64), func(data []byte, size int64, expiredAt int64, isEOF bool) { - t.Log("[" + string(data) + "]") - })) + buf := make([]byte, 6) + reader, err := storage.OpenReader("my-key-10000") + if err != nil { + if err == ErrNotFound { + t.Log("cache not fund") + return + } + t.Fatal(err) + } + + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + t.Log("body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } t.Log(time.Since(now).Seconds()*1000, "ms") } @@ -319,38 +452,6 @@ func TestFileStorage_CleanAll(t *testing.T) { t.Log("ok") } -func TestFileStorage_Purge(t *testing.T) { - storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ - Id: 1, - IsOn: true, - Options: map[string]interface{}{ - "dir": Tea.Root + "/caches", - }, - }) - err := storage.Init() - if err != nil { - t.Fatal(err) - } - - _ = storage.Write("a", time.Now().Unix()+3600, bytes.NewReader([]byte("a1"))) - _ = storage.Write("b", time.Now().Unix()+3600, bytes.NewReader([]byte("b1"))) - _ = storage.Write("c", time.Now().Unix()+3600, bytes.NewReader([]byte("c1"))) - _ = storage.Write("d", time.Now().Unix()+3600, bytes.NewReader([]byte("d1"))) - - before := time.Now() - defer func() { - t.Log(time.Since(before).Seconds()*1000, "ms") - }() - - err = storage.Purge([]string{"a", "b1", "c"}, "") - if err != nil { - t.Fatal(err) - } - - t.Log(storage.list.m) - t.Log("ok") -} - func TestFileStorage_Stop(t *testing.T) { storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ Id: 1, @@ -366,6 +467,26 @@ func TestFileStorage_Stop(t *testing.T) { storage.Stop() } +func TestFileStorage_DecodeFile(t *testing.T) { + storage := NewFileStorage(&serverconfigs.HTTPCachePolicy{ + Id: 1, + IsOn: true, + Options: map[string]interface{}{ + "dir": Tea.Root + "/caches", + }, + }) + err := storage.Init() + if err != nil { + t.Fatal(err) + } + _, path := storage.keyPath("my-key") + item, err := storage.decodeFile(path) + if err != nil { + t.Fatal(err) + } + logs.PrintAsJSON(item, t) +} + func BenchmarkFileStorage_Read(b *testing.B) { runtime.GOMAXPROCS(1) @@ -382,9 +503,15 @@ func BenchmarkFileStorage_Read(b *testing.B) { if err != nil { b.Fatal(err) } - buf := make([]byte, 1024) for i := 0; i < b.N; i++ { - _ = storage.Read("my-key", buf, func(data []byte, size int64, expiredAt int64, isEOF bool) { + reader, err := storage.OpenReader("my-key") + if err != nil { + b.Fatal(err) + } + buf := make([]byte, 1024) + _ = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + return true, nil }) + _ = reader.Close() } } diff --git a/internal/caches/storage_interface.go b/internal/caches/storage_interface.go index 89a7abb..23b5167 100644 --- a/internal/caches/storage_interface.go +++ b/internal/caches/storage_interface.go @@ -10,10 +10,10 @@ type StorageInterface interface { Init() error // 读取缓存 - Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error + OpenReader(key string) (Reader, error) // 打开缓存写入器等待写入 - Open(key string, expiredAt int64) (Writer, error) + OpenWriter(key string, expiredAt int64, status int) (Writer, error) // 删除某个键值对应的缓存 Delete(key string) error diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 58b181a..f98e1a1 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -13,8 +13,10 @@ import ( ) type MemoryItem struct { - ExpiredAt int64 - Value []byte + ExpiredAt int64 + HeaderValue []byte + BodyValue []byte + Status int } type MemoryStorage struct { @@ -39,10 +41,10 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage { // 初始化 func (this *MemoryStorage) Init() error { this.list.OnAdd(func(item *Item) { - atomic.AddInt64(&this.totalSize, item.Size) + atomic.AddInt64(&this.totalSize, item.Size()) }) this.list.OnRemove(func(item *Item) { - atomic.AddInt64(&this.totalSize, -item.Size) + atomic.AddInt64(&this.totalSize, -item.Size()) }) if this.purgeDuration <= 0 { @@ -61,31 +63,35 @@ func (this *MemoryStorage) Init() error { } // 读取缓存 -func (this *MemoryStorage) Read(key string, readerBuf []byte, callback func(data []byte, size int64, expiredAt int64, isEOF bool)) error { +func (this *MemoryStorage) OpenReader(key string) (Reader, error) { hash := this.hash(key) this.locker.RLock() item := this.valuesMap[hash] if item == nil { this.locker.RUnlock() - return ErrNotFound + return nil, ErrNotFound } if item.ExpiredAt > utils.UnixTime() { - // 这时如果callback处理比较慢的话,可能会影响性能,但目前没有更好的解决方案 - callback(item.Value, int64(len(item.Value)), item.ExpiredAt, true) this.locker.RUnlock() - return nil + + reader := NewMemoryReader(item) + err := reader.Init() + if err != nil { + return nil, err + } + return reader, nil } this.locker.RUnlock() _ = this.Delete(key) - return ErrNotFound + return nil, ErrNotFound } // 打开缓存写入器等待写入 -func (this *MemoryStorage) Open(key string, expiredAt int64) (Writer, error) { +func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { // 检查是否超出最大值 if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { return nil, errors.New("write memory cache failed: too many keys in cache storage") @@ -100,7 +106,7 @@ func (this *MemoryStorage) Open(key string, expiredAt int64) (Writer, error) { return nil, err } - return NewMemoryWriter(this.valuesMap, key, expiredAt, this.locker), nil + return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil } // 删除某个键值对应的缓存 @@ -172,7 +178,7 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy { // 将缓存添加到列表 func (this *MemoryStorage) AddToList(item *Item) { - item.Size = item.ValueSize + int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/ + item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/ hash := fmt.Sprintf("%d", this.hash(item.Key)) this.list.Add(hash, item) } diff --git a/internal/caches/storage_memory_test.go b/internal/caches/storage_memory_test.go index 5e08fbb..7a93023 100644 --- a/internal/caches/storage_memory_test.go +++ b/internal/caches/storage_memory_test.go @@ -9,21 +9,20 @@ import ( "time" ) -func TestMemoryStorage_Open(t *testing.T) { +func TestMemoryStorage_OpenWriter(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) - writer, err := storage.Open("abc", time.Now().Unix()+60) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200) if err != nil { t.Fatal(err) } + _, _ = writer.WriteHeader([]byte("Header")) _, _ = writer.Write([]byte("Hello")) _, _ = writer.Write([]byte(", World")) t.Log(storage.valuesMap) { - err = storage.Read("abc", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) { - t.Log("read:", string(data)) - }) + reader, err := storage.OpenReader("abc") if err != nil { if err == ErrNotFound { t.Log("not found: abc") @@ -31,12 +30,26 @@ func TestMemoryStorage_Open(t *testing.T) { t.Fatal(err) } } + buf := make([]byte, 1024) + t.Log("status:", reader.Status()) + err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + t.Log("header:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + t.Log("body:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } } { - err = storage.Read("abc 2", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) { - t.Log("read:", string(data)) - }) + _, err := storage.OpenReader("abc 2") if err != nil { if err == ErrNotFound { t.Log("not found: abc2") @@ -46,15 +59,13 @@ func TestMemoryStorage_Open(t *testing.T) { } } - writer, err = storage.Open("abc", time.Now().Unix()+60) + writer, err = storage.OpenWriter("abc", time.Now().Unix()+60, 200) if err != nil { t.Fatal(err) } _, _ = writer.Write([]byte("Hello123")) { - err = storage.Read("abc", make([]byte, 8), func(data []byte, size int64, expiredAt int64, isEOF bool) { - t.Log("read:", string(data)) - }) + reader, err := storage.OpenReader("abc") if err != nil { if err == ErrNotFound { t.Log("not found: abc") @@ -62,13 +73,21 @@ func TestMemoryStorage_Open(t *testing.T) { t.Fatal(err) } } + buf := make([]byte, 1024) + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + t.Log("abc:", string(buf[:n])) + return true, nil + }) + if err != nil { + t.Fatal(err) + } } } func TestMemoryStorage_Delete(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) { - writer, err := storage.Open("abc", time.Now().Unix()+60) + writer, err := storage.OpenWriter("abc", time.Now().Unix()+60, 200) if err != nil { t.Fatal(err) } @@ -76,7 +95,7 @@ func TestMemoryStorage_Delete(t *testing.T) { t.Log(len(storage.valuesMap)) } { - writer, err := storage.Open("abc1", time.Now().Unix()+60) + writer, err := storage.OpenWriter("abc1", time.Now().Unix()+60, 200) if err != nil { t.Fatal(err) } @@ -91,7 +110,7 @@ func TestMemoryStorage_Stat(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.Open("abc", expiredAt) + writer, err := storage.OpenWriter("abc", expiredAt, 200) if err != nil { t.Fatal(err) } @@ -99,12 +118,12 @@ func TestMemoryStorage_Stat(t *testing.T) { t.Log(len(storage.valuesMap)) storage.AddToList(&Item{ Key: "abc", - Size: 5, + BodySize: 5, ExpiredAt: expiredAt, }) } { - writer, err := storage.Open("abc1", expiredAt) + writer, err := storage.OpenWriter("abc1", expiredAt, 200) if err != nil { t.Fatal(err) } @@ -112,7 +131,7 @@ func TestMemoryStorage_Stat(t *testing.T) { t.Log(len(storage.valuesMap)) storage.AddToList(&Item{ Key: "abc1", - Size: 5, + BodySize: 5, ExpiredAt: expiredAt, }) } @@ -128,26 +147,26 @@ func TestMemoryStorage_CleanAll(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.Open("abc", expiredAt) + writer, err := storage.OpenWriter("abc", expiredAt, 200) if err != nil { t.Fatal(err) } _, _ = writer.Write([]byte("Hello")) storage.AddToList(&Item{ Key: "abc", - Size: 5, + BodySize: 5, ExpiredAt: expiredAt, }) } { - writer, err := storage.Open("abc1", expiredAt) + writer, err := storage.OpenWriter("abc1", expiredAt, 200) if err != nil { t.Fatal(err) } _, _ = writer.Write([]byte("Hello")) storage.AddToList(&Item{ Key: "abc1", - Size: 5, + BodySize: 5, ExpiredAt: expiredAt, }) } @@ -162,26 +181,26 @@ func TestMemoryStorage_Purge(t *testing.T) { storage := NewMemoryStorage(&serverconfigs.HTTPCachePolicy{}) expiredAt := time.Now().Unix() + 60 { - writer, err := storage.Open("abc", expiredAt) + writer, err := storage.OpenWriter("abc", expiredAt, 200) if err != nil { t.Fatal(err) } _, _ = writer.Write([]byte("Hello")) storage.AddToList(&Item{ Key: "abc", - Size: 5, + BodySize: 5, ExpiredAt: expiredAt, }) } { - writer, err := storage.Open("abc1", expiredAt) + writer, err := storage.OpenWriter("abc1", expiredAt, 200) if err != nil { t.Fatal(err) } _, _ = writer.Write([]byte("Hello")) storage.AddToList(&Item{ Key: "abc1", - Size: 5, + BodySize: 5, ExpiredAt: expiredAt, }) } @@ -203,14 +222,14 @@ func TestMemoryStorage_Expire(t *testing.T) { for i := 0; i < 1000; i++ { expiredAt := time.Now().Unix() + int64(rands.Int(0, 60)) key := "abc" + strconv.Itoa(i) - writer, err := storage.Open(key, expiredAt) + writer, err := storage.OpenWriter(key, expiredAt, 200) if err != nil { t.Fatal(err) } _, _ = writer.Write([]byte("Hello")) storage.AddToList(&Item{ Key: key, - Size: 5, + BodySize: 5, ExpiredAt: expiredAt, }) } diff --git a/internal/caches/writer.go b/internal/caches/writer.go index 8e60702..5a09a90 100644 --- a/internal/caches/writer.go +++ b/internal/caches/writer.go @@ -2,11 +2,17 @@ package caches // 缓存内容写入接口 type Writer interface { - // 写入数据 + // 写入Header数据 + WriteHeader(data []byte) (n int, err error) + + // 写入Body数据 Write(data []byte) (n int, err error) - // 写入的总数据大小 - Size() int64 + // 写入的Header数据大小 + HeaderSize() int64 + + // 写入的Body数据大小 + BodySize() int64 // 关闭 Close() error diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index e1f0e0f..7419346 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -1,15 +1,19 @@ package caches import ( + "encoding/binary" + "github.com/iwind/TeaGo/types" + "io" "os" + "strings" ) type FileWriter struct { rawWriter *os.File key string - size int64 + headerSize int64 + bodySize int64 expiredAt int64 - isReleased bool } func NewFileWriter(rawWriter *os.File, key string, expiredAt int64) *FileWriter { @@ -21,40 +25,98 @@ func NewFileWriter(rawWriter *os.File, key string, expiredAt int64) *FileWriter } // 写入数据 -func (this *FileWriter) Write(data []byte) (n int, err error) { +func (this *FileWriter) WriteHeader(data []byte) (n int, err error) { n, err = this.rawWriter.Write(data) - this.size += int64(n) + this.headerSize += int64(n) if err != nil { - _ = this.rawWriter.Close() - _ = os.Remove(this.rawWriter.Name()) - this.Release() + _ = this.Discard() } return } +// 写入Header长度数据 +func (this *FileWriter) WriteHeaderLength(headerLength int) error { + bytes4 := make([]byte, 4) + binary.BigEndian.PutUint32(bytes4, uint32(headerLength)) + _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength, io.SeekStart) + if err != nil { + _ = this.Discard() + return err + } + _, err = this.rawWriter.Write(bytes4) + if err != nil { + _ = this.Discard() + return err + } + return nil +} + +// 写入数据 +func (this *FileWriter) Write(data []byte) (n int, err error) { + n, err = this.rawWriter.Write(data) + this.bodySize += int64(n) + if err != nil { + _ = this.Discard() + } + return +} + +// 写入Body长度数据 +func (this *FileWriter) WriteBodyLength(bodyLength int64) error { + bytes8 := make([]byte, 8) + binary.BigEndian.PutUint64(bytes8, uint64(bodyLength)) + _, err := this.rawWriter.Seek(SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength, io.SeekStart) + if err != nil { + _ = this.Discard() + return err + } + _, err = this.rawWriter.Write(bytes8) + if err != nil { + _ = this.Discard() + return err + } + return nil +} + // 关闭 func (this *FileWriter) Close() error { - // 写入结束符 - _, err := this.rawWriter.WriteString("\n$$$") + err := this.WriteHeaderLength(types.Int(this.headerSize)) if err != nil { - _ = os.Remove(this.rawWriter.Name()) + return err + } + err = this.WriteBodyLength(this.bodySize) + if err != nil { + return err } - _ = this.rawWriter.Close() - this.Release() + path := this.rawWriter.Name() + err = this.rawWriter.Close() + if err != nil { + _ = os.Remove(path) + } else { + err = os.Rename(path, strings.Replace(path, ".tmp", "", 1)) + if err != nil { + _ = os.Remove(path) + } + } return err } // 丢弃 func (this *FileWriter) Discard() error { + _ = this.rawWriter.Close() + err := os.Remove(this.rawWriter.Name()) - this.Release() return err } -func (this *FileWriter) Size() int64 { - return this.size +func (this *FileWriter) HeaderSize() int64 { + return this.headerSize +} + +func (this *FileWriter) BodySize() int64 { + return this.bodySize } func (this *FileWriter) ExpiredAt() int64 { @@ -64,11 +126,3 @@ func (this *FileWriter) ExpiredAt() int64 { func (this *FileWriter) Key() string { return this.key } - -// 释放锁,一定要调用 -func (this *FileWriter) Release() { - if this.isReleased { - return - } - this.isReleased = true -} diff --git a/internal/caches/writer_gzip.go b/internal/caches/writer_gzip.go index b416bc9..9ef569e 100644 --- a/internal/caches/writer_gzip.go +++ b/internal/caches/writer_gzip.go @@ -18,6 +18,20 @@ func NewGzipWriter(gw Writer, key string, expiredAt int64) Writer { } } +func (this *gzipWriter) WriteHeader(data []byte) (n int, err error) { + return this.writer.Write(data) +} + +// 写入Header长度数据 +func (this *gzipWriter) WriteHeaderLength(headerLength int) error { + return nil +} + +// 写入Body长度数据 +func (this *gzipWriter) WriteBodyLength(bodyLength int64) error { + return nil +} + func (this *gzipWriter) Write(data []byte) (n int, err error) { return this.writer.Write(data) } @@ -46,6 +60,10 @@ func (this *gzipWriter) ExpiredAt() int64 { return this.expiredAt } -func (this *gzipWriter) Size() int64 { - return this.rawWriter.Size() +func (this *gzipWriter) HeaderSize() int64 { + return this.rawWriter.HeaderSize() +} + +func (this *gzipWriter) BodySize() int64 { + return this.rawWriter.BodySize() } diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 33cc271..6e239e6 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -11,22 +11,25 @@ type MemoryWriter struct { m map[uint64]*MemoryItem locker *sync.RWMutex isFirstWriting bool - size int64 + headerSize int64 + bodySize int64 + status int } -func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, locker *sync.RWMutex) *MemoryWriter { +func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter { return &MemoryWriter{ m: m, key: key, expiredAt: expiredAt, locker: locker, isFirstWriting: true, + status: status, } } // 写入数据 -func (this *MemoryWriter) Write(data []byte) (n int, err error) { - this.size += int64(len(data)) +func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { + this.headerSize += int64(len(data)) hash := this.hash(this.key) this.locker.Lock() @@ -34,14 +37,43 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) { if ok { // 第一次写先清空 if this.isFirstWriting { - item.Value = nil + item.HeaderValue = nil + item.BodyValue = nil this.isFirstWriting = false } - item.Value = append(item.Value, data...) + item.HeaderValue = append(item.HeaderValue, data...) } else { item := &MemoryItem{} - item.Value = append([]byte{}, data...) + item.HeaderValue = append([]byte{}, data...) item.ExpiredAt = this.expiredAt + item.Status = this.status + this.m[hash] = item + this.isFirstWriting = false + } + this.locker.Unlock() + return len(data), nil +} + +// 写入数据 +func (this *MemoryWriter) Write(data []byte) (n int, err error) { + this.bodySize += int64(len(data)) + + hash := this.hash(this.key) + this.locker.Lock() + item, ok := this.m[hash] + if ok { + // 第一次写先清空 + if this.isFirstWriting { + item.HeaderValue = nil + item.BodyValue = nil + this.isFirstWriting = false + } + item.BodyValue = append(item.BodyValue, data...) + } else { + item := &MemoryItem{} + item.BodyValue = append([]byte{}, data...) + item.ExpiredAt = this.expiredAt + item.Status = this.status this.m[hash] = item this.isFirstWriting = false } @@ -50,8 +82,12 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) { } // 数据尺寸 -func (this *MemoryWriter) Size() int64 { - return this.size +func (this *MemoryWriter) HeaderSize() int64 { + return this.headerSize +} + +func (this *MemoryWriter) BodySize() int64 { + return this.bodySize } // 关闭 diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 417f219..88a33d3 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -159,26 +159,38 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + msg.LifeSeconds - writer, err := storage.Open(msg.Key, expiredAt) + writer, err := storage.OpenWriter(msg.Key, expiredAt, 200) if err != nil { this.replyFail(message.RequestId, "prepare writing failed: "+err.Error()) return err } - _, err = writer.Write(msg.Value) + // 写入一个空的Header + _, err = writer.WriteHeader([]byte(":")) if err != nil { - _ = writer.Discard() this.replyFail(message.RequestId, "write failed: "+err.Error()) return err } - err = writer.Close() - if err == nil { - storage.AddToList(&caches.Item{ - Key: msg.Key, - ExpiredAt: expiredAt, - }) + + // 写入数据 + _, err = writer.Write(msg.Value) + if err != nil { + this.replyFail(message.RequestId, "write failed: "+err.Error()) + return err } + err = writer.Close() + if err != nil { + this.replyFail(message.RequestId, "write failed: "+err.Error()) + return err + } + storage.AddToList(&caches.Item{ + Key: msg.Key, + ExpiredAt: expiredAt, + HeaderSize: writer.HeaderSize(), + BodySize: writer.BodySize(), + }) + this.replyOk(message.RequestId, "write ok") return nil @@ -203,21 +215,20 @@ func (this *APIStream) handleReadCache(message *pb.NodeStreamMessage) error { }() } - buf := make([]byte, 1024) - size := 0 - err = storage.Read(msg.Key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) { - size += len(data) - }) + reader, err := storage.OpenReader(msg.Key) if err != nil { if err == caches.ErrNotFound { this.replyFail(message.RequestId, "key not found") return nil } this.replyFail(message.RequestId, "read key failed: "+err.Error()) - return err + return nil } + defer func() { + _ = reader.Close() + }() - this.replyOk(message.RequestId, "value "+strconv.Itoa(size)+" bytes") + this.replyOk(message.RequestId, "value "+strconv.FormatInt(reader.BodySize(), 10)+" bytes") return nil } @@ -373,6 +384,13 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { return } + if resp.StatusCode != 200 { + locker.Lock() + errorMessages = append(errorMessages, "request failed: "+key+": status code '"+strconv.Itoa(resp.StatusCode)+"'") + locker.Unlock() + return + } + defer func() { _ = resp.Body.Close() }() @@ -388,7 +406,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { } expiredAt := time.Now().Unix() + 8600 - writer, err := storage.Open(key, expiredAt) // TODO 可以设置缓存过期时间 + writer, err := storage.OpenWriter(key, expiredAt, 200) // TODO 可以设置缓存过期时间 if err != nil { locker.Lock() errorMessages = append(errorMessages, "open cache writer failed: "+key+": "+err.Error()) @@ -398,6 +416,21 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { buf := make([]byte, 16*1024) isClosed := false + + // 写入Header + for k, v := range resp.Header { + for _, v1 := range v { + _, err = writer.WriteHeader([]byte(k + ":" + v1 + "\n")) + if err != nil { + locker.Lock() + errorMessages = append(errorMessages, "write failed: "+key+": "+err.Error()) + locker.Unlock() + return + } + } + } + + // 写入Body for { n, err := resp.Body.Read(buf) if n > 0 { @@ -411,6 +444,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { } if err != nil { if err == io.EOF { + err = writer.Close() if err == nil { storage.AddToList(&caches.Item{ @@ -484,7 +518,7 @@ func (this *APIStream) handleCheckSystemdService(message *pb.NodeStreamMessage) cmd.Add(systemctl, "is-enabled", shortName) output, err := cmd.Run() if err != nil { - this.replyFail(message.RequestId, "'systemctl' command error: " + err.Error()) + this.replyFail(message.RequestId, "'systemctl' command error: "+err.Error()) return nil } if output == "enabled" { diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index c03e4b2..f41af57 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -2,9 +2,9 @@ package nodes import ( "bytes" + "errors" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" - "github.com/iwind/TeaGo/types" "net/http" "strconv" ) @@ -66,74 +66,12 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { return } - isBroken := false - headerBuf := []byte{} - statusCode := http.StatusOK - statusFound := false - headerFound := false - buf := bytePool32k.Get() - err := storage.Read(key, buf, func(data []byte, valueSize int64, expiredAt int64, isEOF bool) { - if isBroken { - return - } - - // 如果Header已发送完毕 - if headerFound { - _, _ = this.writer.Write(data) - return - } - - headerBuf = append(headerBuf, data...) - - if !statusFound { - lineIndex := bytes.IndexByte(headerBuf, '\n') - if lineIndex < 0 { - return - } - - pieces := bytes.Split(headerBuf[:lineIndex], []byte{' '}) - if len(pieces) < 2 { - isBroken = true - return - } - statusCode = types.Int(string(pieces[1])) - statusFound = true - headerBuf = headerBuf[lineIndex+1:] - - // cache相关变量 - this.varMapping["cache.status"] = "HIT" - } - - for { - lineIndex := bytes.IndexByte(headerBuf, '\n') - if lineIndex < 0 { - break - } - if lineIndex == 0 || lineIndex == 1 { - headerFound = true - - this.processResponseHeaders(statusCode) - this.writer.WriteHeader(statusCode) - - _, _ = this.writer.Write(headerBuf[lineIndex+1:]) - headerBuf = nil - break - } - - // 分解Header - line := headerBuf[:lineIndex] - colonIndex := bytes.IndexByte(line, ':') - if colonIndex <= 0 { - continue - } - this.writer.Header().Set(string(line[:colonIndex]), string(bytes.TrimSpace(line[colonIndex+1:]))) - headerBuf = headerBuf[lineIndex+1:] - } - }) - - bytePool32k.Put(buf) + defer func() { + bytePool32k.Put(buf) + }() + reader, err := storage.OpenReader(key) if err != nil { if err == caches.ErrNotFound { // cache相关变量 @@ -144,11 +82,56 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error()) return } + defer func() { + _ = reader.Close() + }() - if isBroken { + this.varMapping["cache.status"] = "HIT" + + // 读取Header + headerBuf := []byte{} + err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + headerBuf = append(headerBuf, buf[:n]...) + for { + nIndex := bytes.Index(headerBuf, []byte{'\n'}) + if nIndex >= 0 { + row := headerBuf[:nIndex] + spaceIndex := bytes.Index(row, []byte{':'}) + if spaceIndex <= 0 { + return false, errors.New("invalid header '" + string(row) + "'") + } + + this.writer.Header().Set(string(row[:spaceIndex]), string(row[spaceIndex+1:])) + headerBuf = headerBuf[nIndex+1:] + } else { + break + } + } + return true, nil + }) + if err != nil { + remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error()) return } + this.processResponseHeaders(reader.Status()) + this.writer.WriteHeader(reader.Status()) + + // 输出Body + if this.RawReq.Method != http.MethodHead { + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + _, err = this.writer.Write(buf[:n]) + if err != nil { + return false, err + } + return true, nil + }) + if err != nil { + remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error()) + return + } + } + this.cacheRef = nil // 终止读取不再往下传递 return true } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 99773e9..946b204 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -113,7 +113,6 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { if this.cacheWriter != nil { _, err = this.cacheWriter.Write(data) if err != nil { - _ = this.cacheWriter.Discard() this.cacheWriter = nil remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) } @@ -216,9 +215,10 @@ func (this *HTTPWriter) Close() { err := this.cacheWriter.Close() if err == nil { this.cacheStorage.AddToList(&caches.Item{ - Key: this.cacheWriter.Key(), - ExpiredAt: this.cacheWriter.ExpiredAt(), - ValueSize: this.cacheWriter.Size(), + Key: this.cacheWriter.Key(), + ExpiredAt: this.cacheWriter.ExpiredAt(), + HeaderSize: this.cacheWriter.HeaderSize(), + BodySize: this.cacheWriter.BodySize(), }) } } @@ -364,7 +364,7 @@ func (this *HTTPWriter) prepareCache(size int64) { life = 60 } expiredAt := utils.UnixTime() + life - cacheWriter, err := storage.Open(this.req.cacheKey, expiredAt) + cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode()) if err != nil { if err != caches.ErrFileIsWriting { remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) @@ -377,12 +377,14 @@ func (this *HTTPWriter) prepareCache(size int64) { } // 写入Header - headerData := this.HeaderData() - _, err = cacheWriter.Write(headerData) - if err != nil { - remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) - _ = this.cacheWriter.Discard() - this.cacheWriter = nil - return + for k, v := range this.Header() { + for _, v1 := range v { + _, err = cacheWriter.WriteHeader([]byte(k + ":" + v1 + "\n")) + if err != nil { + remotelogs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) + this.cacheWriter = nil + return + } + } } } diff --git a/internal/utils/byte_pool.go b/internal/utils/byte_pool.go index e211078..60b87d2 100644 --- a/internal/utils/byte_pool.go +++ b/internal/utils/byte_pool.go @@ -1,14 +1,11 @@ package utils -import ( - "time" -) +var BytePool1024 = NewBytePool(20480, 1024) // pool for get byte slice type BytePool struct { c chan []byte length int - ticker *Ticker lastSize int } @@ -25,38 +22,9 @@ func NewBytePool(maxSize, length int) *BytePool { c: make(chan []byte, maxSize), length: length, } - pool.start() return pool } -func (this *BytePool) start() { - // 清除Timer - this.ticker = NewTicker(1 * time.Minute) - go func() { - for this.ticker.Next() { - currentSize := len(this.c) - if currentSize <= 32 || this.lastSize == 0 || this.lastSize != currentSize { - this.lastSize = currentSize - continue - } - - i := 0 - For: - for { - select { - case _ = <-this.c: - i++ - if i >= currentSize/2 { - break For - } - default: - break For - } - } - } - }() -} - // 获取一个新的byte slice func (this *BytePool) Get() (b []byte) { select { @@ -83,8 +51,3 @@ func (this *BytePool) Put(b []byte) { func (this *BytePool) Size() int { return len(this.c) } - -// 销毁 -func (this *BytePool) Destroy() { - this.ticker.Stop() -} diff --git a/internal/utils/number.go b/internal/utils/number.go new file mode 100644 index 0000000..4a755df --- /dev/null +++ b/internal/utils/number.go @@ -0,0 +1,15 @@ +package utils + +func MinInt(min1 int, min2 int) int { + if min1 < min2 { + return min1 + } + return min2 +} + +func MaxInt(min1 int, min2 int) int { + if min1 < min2 { + return min2 + } + return min1 +}