diff --git a/internal/caches/hot_item.go b/internal/caches/hot_item.go new file mode 100644 index 0000000..f13b0d2 --- /dev/null +++ b/internal/caches/hot_item.go @@ -0,0 +1,10 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +type HotItem struct { + Key string + ExpiresAt int64 + Hits uint32 + Status int +} diff --git a/internal/caches/reader.go b/internal/caches/reader.go index 1223a20..d95d0d6 100644 --- a/internal/caches/reader.go +++ b/internal/caches/reader.go @@ -9,6 +9,9 @@ type Reader interface { // TypeName 类型名称 TypeName() string + // ExpiresAt 过期时间 + ExpiresAt() int64 + // Status 状态码 Status() int diff --git a/internal/caches/reader_file.go b/internal/caches/reader_file.go index d97ec75..ec77204 100644 --- a/internal/caches/reader_file.go +++ b/internal/caches/reader_file.go @@ -11,6 +11,7 @@ import ( type FileReader struct { fp *os.File + expiresAt int64 status int headerOffset int64 headerSize int @@ -43,6 +44,8 @@ func (this *FileReader) Init() error { return ErrNotFound } + this.expiresAt = int64(binary.BigEndian.Uint32(buf[:SizeExpiresAt])) + status := types.Int(string(buf[SizeExpiresAt : SizeExpiresAt+SizeStatus])) if status < 100 || status > 999 { return errors.New("invalid status") @@ -78,6 +81,10 @@ func (this *FileReader) TypeName() string { return "disk" } +func (this *FileReader) ExpiresAt() int64 { + return this.expiresAt +} + func (this *FileReader) Status() int { return this.status } diff --git a/internal/caches/reader_memory.go b/internal/caches/reader_memory.go index 7996773..70b0a43 100644 --- a/internal/caches/reader_memory.go +++ b/internal/caches/reader_memory.go @@ -20,6 +20,10 @@ func (this *MemoryReader) TypeName() string { return "memory" } +func (this *MemoryReader) ExpiresAt() int64 { + return this.item.ExpiredAt +} + func (this *MemoryReader) Status() int { return this.item.Status } diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index a5fc25c..0b07c3e 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "regexp" + "sort" "strconv" "strings" "sync" @@ -40,6 +41,10 @@ const ( SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength ) +const ( + HotItemSize = 1024 +) + // FileStorage 文件缓存 // 文件结构: // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] @@ -52,13 +57,20 @@ type FileStorage struct { list ListInterface writingKeyMap map[string]bool // key => bool locker sync.RWMutex - ticker *utils.Ticker + purgeTicker *utils.Ticker + + hotMap map[string]*HotItem // key => count + hotMapLocker sync.Mutex + lastHotSize int + hotTicker *utils.Ticker } func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage { return &FileStorage{ policy: policy, writingKeyMap: map[string]bool{}, + hotMap: map[string]*HotItem{}, + lastHotSize: -1, } } @@ -191,8 +203,12 @@ func (this *FileStorage) Init() error { } func (this *FileStorage) OpenReader(key string) (Reader, error) { + return this.openReader(key, true) +} + +func (this *FileStorage) openReader(key string, allowMemory bool) (Reader, error) { // 先尝试内存缓存 - if this.memoryStorage != nil { + if allowMemory && this.memoryStorage != nil { reader, err := this.memoryStorage.OpenReader(key) if err == nil { return reader, err @@ -237,12 +253,40 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) { // 增加点击量 // 1/1000采样 - // TODO 考虑是否在缓存策略里设置 - if rands.Int(0, 1000) == 0 { - var hitErr = this.list.IncreaseHit(hash) - if hitErr != nil { - // 此错误可以忽略 - remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error()) + if allowMemory { + var rate = this.policy.PersistenceHitSampleRate + if rate <= 0 { + rate = 1000 + } + if this.lastHotSize == 0 { + // 自动降低采样率来增加热点数据的缓存几率 + rate = rate / 10 + } + if rands.Int(0, rate) == 0 { + var hitErr = this.list.IncreaseHit(hash) + if hitErr != nil { + // 此错误可以忽略 + remotelogs.Error("CACHE", "increase hit failed: "+hitErr.Error()) + } + + // 增加到热点 + // 这里不收录缓存尺寸过大的文件 + if this.memoryStorage != nil && reader.BodySize() > 0 && reader.BodySize() < 128*1024*1024 { + this.hotMapLocker.Lock() + hotItem, ok := this.hotMap[key] + if ok { + hotItem.Hits++ + hotItem.ExpiresAt = reader.expiresAt + } else if len(this.hotMap) < HotItemSize { // 控制数量 + this.hotMap[key] = &HotItem{ + Key: key, + ExpiresAt: reader.ExpiresAt(), + Status: reader.Status(), + Hits: 1, + } + } + this.hotMapLocker.Unlock() + } } } @@ -574,8 +618,11 @@ func (this *FileStorage) Stop() { } _ = this.list.Reset() - if this.ticker != nil { - this.ticker.Stop() + if this.purgeTicker != nil { + this.purgeTicker.Stop() + } + if this.hotTicker != nil { + this.hotTicker.Stop() } _ = this.list.Close() @@ -645,19 +692,32 @@ func (this *FileStorage) initList() error { autoPurgeInterval = 10 } } - this.ticker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) + this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) events.On(events.EventQuit, func() { remotelogs.Println("CACHE", "quit clean timer") - var ticker = this.ticker + var ticker = this.purgeTicker if ticker != nil { ticker.Stop() } }) go func() { - for this.ticker.Next() { - var tr = trackers.Begin("FILE_CACHE_STORAGE_PURGE_LOOP") - this.purgeLoop() - tr.End() + for this.purgeTicker.Next() { + trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() { + this.purgeLoop() + }) + } + }() + + // 热点处理任务 + this.hotTicker = utils.NewTicker(1 * time.Minute) + if Tea.IsTesting() { + this.hotTicker = utils.NewTicker(10 * time.Second) + } + go func() { + for this.hotTicker.Next() { + trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() { + this.hotLoop() + }) } }() @@ -842,6 +902,102 @@ func (this *FileStorage) purgeLoop() { } } +// 热点数据任务 +func (this *FileStorage) hotLoop() { + var memoryStorage = this.memoryStorage + if memoryStorage == nil { + return + } + + this.hotMapLocker.Lock() + if len(this.hotMap) == 0 { + this.hotMapLocker.Unlock() + this.lastHotSize = 0 + return + } + + this.lastHotSize = len(this.hotMap) + + var result = []*HotItem{} // [ {key: ..., hits: ...}, ... ] + for _, v := range this.hotMap { + result = append(result, v) + } + + this.hotMap = map[string]*HotItem{} + this.hotMapLocker.Unlock() + + // 取Top10 + if len(result) > 0 { + sort.Slice(result, func(i, j int) bool { + return result[i].Hits > result[j].Hits + }) + var size = 1 + if len(result) < 10 { + size = 1 + } else { + size = len(result) / 10 + } + + var buf = make([]byte, 32*1024) + for _, item := range result[:size] { + reader, err := this.openReader(item.Key, false) + if err != nil { + continue + } + if reader == nil { + continue + } + if reader.ExpiresAt() <= time.Now().Unix() { + continue + } + + writer, err := this.memoryStorage.openWriter(item.Key, item.ExpiresAt, item.Status, false) + if err != nil { + if !CanIgnoreErr(err) { + remotelogs.Error("CACHE", "transfer hot item failed: "+err.Error()) + } + _ = reader.Close() + continue + } + if writer == nil { + _ = reader.Close() + continue + } + + err = reader.ReadHeader(buf, func(n int) (goNext bool, err error) { + _, err = writer.WriteHeader(buf[:n]) + return + }) + if err != nil { + _ = reader.Close() + _ = writer.Discard() + continue + } + + err = reader.ReadBody(buf, func(n int) (goNext bool, err error) { + _, err = writer.Write(buf[:n]) + return + }) + if err != nil { + _ = reader.Close() + _ = writer.Discard() + continue + } + + this.memoryStorage.AddToList(&Item{ + Type: writer.ItemType(), + Key: item.Key, + ExpiredAt: item.ExpiresAt, + HeaderSize: writer.HeaderSize(), + BodySize: writer.BodySize(), + }) + + _ = reader.Close() + _ = writer.Close() + } + } +} + func (this *FileStorage) readToBuff(fp *os.File, buf []byte) (ok bool, err error) { n, err := fp.Read(buf) if err != nil { diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index b2b9a7a..bf4cf89 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -144,6 +144,10 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) { // OpenWriter 打开缓存写入器等待写入 func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) { + return this.openWriter(key, expiredAt, status, true) +} + +func (this *MemoryStorage) openWriter(key string, expiredAt int64, status int, isDirty bool) (Writer, error) { this.locker.Lock() defer this.locker.Unlock() @@ -187,7 +191,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) ( } isWriting = true - return NewMemoryWriter(this, key, expiredAt, status, func() { + return NewMemoryWriter(this, key, expiredAt, status, isDirty, func() { this.locker.Lock() delete(this.writingKeyMap, key) this.locker.Unlock() diff --git a/internal/caches/writer_memory.go b/internal/caches/writer_memory.go index e0920bd..3a3b173 100644 --- a/internal/caches/writer_memory.go +++ b/internal/caches/writer_memory.go @@ -13,13 +13,14 @@ type MemoryWriter struct { headerSize int64 bodySize int64 status int + isDirty bool hash uint64 item *MemoryItem endFunc func() } -func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, endFunc func()) *MemoryWriter { +func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, status int, isDirty bool, endFunc func()) *MemoryWriter { w := &MemoryWriter{ storage: memoryStorage, key: key, @@ -30,6 +31,7 @@ func NewMemoryWriter(memoryStorage *MemoryStorage, key string, expiredAt int64, Status: status, }, status: status, + isDirty: isDirty, endFunc: endFunc, } w.hash = w.calculateHash(key) @@ -73,11 +75,13 @@ func (this *MemoryWriter) Close() error { this.storage.locker.Lock() this.item.IsDone = true this.storage.valuesMap[this.hash] = this.item - if this.storage.parentStorage != nil { - select { - case this.storage.dirtyChan <- this.key: - default: + if this.isDirty { + if this.storage.parentStorage != nil { + select { + case this.storage.dirtyChan <- this.key: + default: + } } } this.storage.locker.Unlock() diff --git a/internal/trackers/label.go b/internal/trackers/label.go index 4579179..825ea33 100644 --- a/internal/trackers/label.go +++ b/internal/trackers/label.go @@ -13,6 +13,12 @@ func Begin(label string) *tracker { return &tracker{label: label, startTime: time.Now()} } +func Run(label string, f func()) { + var tr = Begin(label) + f() + tr.End() +} + func (this *tracker) End() { SharedManager.Add(this.label, time.Since(this.startTime).Seconds()*1000) } diff --git a/internal/utils/ticker_test.go b/internal/utils/ticker_test.go index 40dbbac..46875c5 100644 --- a/internal/utils/ticker_test.go +++ b/internal/utils/ticker_test.go @@ -6,6 +6,20 @@ import ( "time" ) +func TestRawTicker(t *testing.T) { + var ticker = time.NewTicker(2 * time.Second) + go func() { + for range ticker.C { + t.Log("tick") + } + t.Log("stop") + }() + + time.Sleep(6 * time.Second) + ticker.Stop() + time.Sleep(1 * time.Second) +} + func TestTicker(t *testing.T) { ticker := NewTicker(3 * time.Second) go func() {