diff --git a/internal/caches/item.go b/internal/caches/item.go index a25362d..3d3cd31 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -2,7 +2,15 @@ package caches import "time" +type ItemType = int + +const ( + ItemTypeFile ItemType = 1 + ItemTypeMemory ItemType = 2 +) + type Item struct { + Type ItemType Key string ExpiredAt int64 HeaderSize int64 diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 10f0593..98f9dec 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -43,9 +44,10 @@ var ( // 文件结构: // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] type FileStorage struct { - policy *serverconfigs.HTTPCachePolicy - cacheConfig *serverconfigs.HTTPFileCacheStorage - totalSize int64 + policy *serverconfigs.HTTPCachePolicy + cacheConfig *serverconfigs.HTTPFileCacheStorage // 二级缓存 + memoryStorage *MemoryStorage // 一级缓存 + totalSize int64 list *List locker sync.RWMutex @@ -77,6 +79,7 @@ func (this *FileStorage) Init() error { defer this.locker.Unlock() before := time.Now() + cacheDir := "" defer func() { // 统计 count := 0 @@ -98,7 +101,7 @@ func (this *FileStorage) Init() error { } else if size > 1024 { sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024) } - remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB) + remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB) }() // 配置 @@ -112,6 +115,7 @@ func (this *FileStorage) Init() error { return err } this.cacheConfig = cacheConfig + cacheDir = cacheConfig.Dir if !filepath.IsAbs(this.cacheConfig.Dir) { this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir @@ -142,10 +146,49 @@ func (this *FileStorage) Init() error { return err } + // 加载内存缓存 + if this.cacheConfig.MemoryPolicy != nil { + memoryCapacity := this.cacheConfig.MemoryPolicy.Capacity + if memoryCapacity != nil && memoryCapacity.Count > 0 { + memoryPolicy := &serverconfigs.HTTPCachePolicy{ + Id: this.policy.Id, + IsOn: this.policy.IsOn, + Name: this.policy.Name, + Description: this.policy.Description, + Capacity: memoryCapacity, + MaxKeys: this.policy.MaxKeys, + MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改 + Type: serverconfigs.CachePolicyStorageMemory, + Options: this.policy.Options, + Life: this.policy.Life, + MinLife: this.policy.MinLife, + MaxLife: this.policy.MaxLife, + } + err = memoryPolicy.Init() + if err != nil { + return err + } + memoryStorage := NewMemoryStorage(memoryPolicy) + err = memoryStorage.Init() + if err != nil { + return err + } + this.memoryStorage = memoryStorage + } + } + return nil } func (this *FileStorage) OpenReader(key string) (Reader, error) { + // 先尝试内存缓存 + if this.memoryStorage != nil { + reader, err := this.memoryStorage.OpenReader(key) + if err == nil { + return reader, err + } + } + hash, path := this.keyPath(key) if !this.list.Exist(hash) { return nil, ErrNotFound @@ -173,6 +216,14 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { // 打开缓存文件等待写入 func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { + // 先尝试内存缓存 + if this.memoryStorage != nil { + writer, err := this.memoryStorage.OpenWriter(key, expiredAt, status) + if err == nil { + return writer, nil + } + } + // 检查是否超出最大值 if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys { return nil, errors.New("write file cache failed: too many keys in cache storage") @@ -291,6 +342,13 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr // 添加到List func (this *FileStorage) AddToList(item *Item) { + if this.memoryStorage != nil { + if item.Type == ItemTypeMemory { + this.memoryStorage.AddToList(item) + return + } + } + item.MetaSize = SizeMeta hash := stringutil.Md5(item.Key) this.list.Add(hash, item) @@ -301,6 +359,11 @@ func (this *FileStorage) Delete(key string) error { this.locker.Lock() defer this.locker.Unlock() + // 先尝试内存缓存 + if this.memoryStorage != nil { + _ = this.memoryStorage.Delete(key) + } + hash, path := this.keyPath(key) this.list.Remove(hash) err := os.Remove(path) @@ -325,6 +388,11 @@ func (this *FileStorage) CleanAll() error { this.locker.Lock() defer this.locker.Unlock() + // 先尝试内存缓存 + if this.memoryStorage != nil { + _ = this.memoryStorage.CleanAll() + } + this.list.Reset() // 删除缓存和目录 @@ -378,6 +446,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error { this.locker.Lock() defer this.locker.Unlock() + // 先尝试内存缓存 + if this.memoryStorage != nil { + _ = this.memoryStorage.Purge(keys, urlType) + } + // 目录 if urlType == "dir" { resultKeys := []string{} @@ -412,6 +485,11 @@ func (this *FileStorage) Stop() { this.locker.Lock() defer this.locker.Unlock() + // 先尝试内存缓存 + if this.memoryStorage != nil { + this.memoryStorage.Stop() + } + this.list.Reset() if this.ticker != nil { this.ticker.Stop() @@ -518,6 +596,7 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) { }() item := &Item{ + Type: ItemTypeFile, MetaSize: SizeMeta, } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index f98e1a1..5c24e53 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -17,6 +17,7 @@ type MemoryItem struct { HeaderValue []byte BodyValue []byte Status int + IsDone bool } type MemoryStorage struct { @@ -67,15 +68,14 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) { hash := this.hash(key) this.locker.RLock() + defer this.locker.RUnlock() + item := this.valuesMap[hash] - if item == nil { - this.locker.RUnlock() + if item == nil || !item.IsDone { return nil, ErrNotFound } if item.ExpiredAt > utils.UnixTime() { - this.locker.RUnlock() - reader := NewMemoryReader(item) err := reader.Init() if err != nil { @@ -83,7 +83,6 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) { } return reader, nil } - this.locker.RUnlock() _ = this.Delete(key) @@ -190,7 +189,7 @@ func (this *MemoryStorage) hash(key string) uint64 { // 清理任务 func (this *MemoryStorage) purgeLoop() { - this.list.Purge(1000, func(hash string) { + this.list.Purge(2048, func(hash string) { uintHash, err := strconv.ParseUint(hash, 10, 64) if err == nil { this.locker.Lock() diff --git a/internal/caches/writer.go b/internal/caches/writer.go index 5a09a90..67e12eb 100644 --- a/internal/caches/writer.go +++ b/internal/caches/writer.go @@ -25,4 +25,7 @@ type Writer interface { // 过期时间 ExpiredAt() int64 + + // 内容类型 + ItemType() ItemType } diff --git a/internal/caches/writer_file.go b/internal/caches/writer_file.go index 7419346..77bf2ca 100644 --- a/internal/caches/writer_file.go +++ b/internal/caches/writer_file.go @@ -126,3 +126,8 @@ func (this *FileWriter) ExpiredAt() int64 { func (this *FileWriter) Key() string { return this.key } + +// 内容类型 +func (this *FileWriter) ItemType() ItemType { + return ItemTypeFile +} diff --git a/internal/caches/writer_gzip.go b/internal/caches/writer_gzip.go index 9ef569e..360e80c 100644 --- a/internal/caches/writer_gzip.go +++ b/internal/caches/writer_gzip.go @@ -67,3 +67,8 @@ func (this *gzipWriter) HeaderSize() int64 { func (this *gzipWriter) BodySize() int64 { return this.rawWriter.BodySize() } + +// 内容类型 +func (this *gzipWriter) ItemType() ItemType { + return this.rawWriter.ItemType() +} diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index 6e239e6..52ad525 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -14,10 +14,13 @@ type MemoryWriter struct { headerSize int64 bodySize int64 status int + + hash uint64 + item *MemoryItem } func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, status int, locker *sync.RWMutex) *MemoryWriter { - return &MemoryWriter{ + w := &MemoryWriter{ m: m, key: key, expiredAt: expiredAt, @@ -25,16 +28,20 @@ func NewMemoryWriter(m map[uint64]*MemoryItem, key string, expiredAt int64, stat isFirstWriting: true, status: status, } + w.hash = w.calculateHash(key) + + return w } // 写入数据 func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { this.headerSize += int64(len(data)) - hash := this.hash(this.key) this.locker.Lock() - item, ok := this.m[hash] + item, ok := this.m[this.hash] if ok { + item.IsDone = false + // 第一次写先清空 if this.isFirstWriting { item.HeaderValue = nil @@ -43,13 +50,13 @@ func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { } item.HeaderValue = append(item.HeaderValue, data...) } else { - item := &MemoryItem{} + item = &MemoryItem{} item.HeaderValue = append([]byte{}, data...) item.ExpiredAt = this.expiredAt item.Status = this.status - this.m[hash] = item this.isFirstWriting = false } + this.item = item this.locker.Unlock() return len(data), nil } @@ -58,10 +65,11 @@ func (this *MemoryWriter) WriteHeader(data []byte) (n int, err error) { func (this *MemoryWriter) Write(data []byte) (n int, err error) { this.bodySize += int64(len(data)) - hash := this.hash(this.key) this.locker.Lock() - item, ok := this.m[hash] + item, ok := this.m[this.hash] if ok { + item.IsDone = false + // 第一次写先清空 if this.isFirstWriting { item.HeaderValue = nil @@ -70,13 +78,13 @@ func (this *MemoryWriter) Write(data []byte) (n int, err error) { } item.BodyValue = append(item.BodyValue, data...) } else { - item := &MemoryItem{} + item = &MemoryItem{} item.BodyValue = append([]byte{}, data...) item.ExpiredAt = this.expiredAt item.Status = this.status - this.m[hash] = item this.isFirstWriting = false } + this.item = item this.locker.Unlock() return len(data), nil } @@ -92,14 +100,22 @@ func (this *MemoryWriter) BodySize() int64 { // 关闭 func (this *MemoryWriter) Close() error { + if this.item == nil { + return nil + } + + this.locker.Lock() + this.item.IsDone = true + this.m[this.hash] = this.item + this.locker.Unlock() + return nil } // 丢弃 func (this *MemoryWriter) Discard() error { - hash := this.hash(this.key) this.locker.Lock() - delete(this.m, hash) + delete(this.m, this.hash) this.locker.Unlock() return nil } @@ -114,7 +130,12 @@ func (this *MemoryWriter) ExpiredAt() int64 { return this.expiredAt } +// 内容类型 +func (this *MemoryWriter) ItemType() ItemType { + return ItemTypeMemory +} + // 计算Key Hash -func (this *MemoryWriter) hash(key string) uint64 { +func (this *MemoryWriter) calculateHash(key string) uint64 { return xxhash.Sum64String(key) } diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 8c8a3d7..107c210 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -182,6 +182,7 @@ func (this *APIStream) handleWriteCache(message *pb.NodeStreamMessage) error { return err } storage.AddToList(&caches.Item{ + Type: writer.ItemType(), Key: msg.Key, ExpiredAt: expiredAt, HeaderSize: writer.HeaderSize(), @@ -445,6 +446,7 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error { err = writer.Close() if err == nil { storage.AddToList(&caches.Item{ + Type: writer.ItemType(), Key: key, ExpiredAt: expiredAt, }) diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 946b204..adefdbc 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -215,6 +215,7 @@ func (this *HTTPWriter) Close() { err := this.cacheWriter.Close() if err == nil { this.cacheStorage.AddToList(&caches.Item{ + Type: this.cacheWriter.ItemType(), Key: this.cacheWriter.Key(), ExpiredAt: this.cacheWriter.ExpiredAt(), HeaderSize: this.cacheWriter.HeaderSize(),