diff --git a/internal/caches/item.go b/internal/caches/item.go index bdd2717..ab2f9f7 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -1,6 +1,8 @@ package caches -import "time" +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils" +) type ItemType = int @@ -19,7 +21,7 @@ type Item struct { } func (this *Item) IsExpired() bool { - return this.ExpiredAt < time.Now().Unix() + return this.ExpiredAt < utils.UnixTime() } func (this *Item) TotalSize() int64 { diff --git a/internal/caches/list_memory.go b/internal/caches/list_memory.go index 407182f..00241a6 100644 --- a/internal/caches/list_memory.go +++ b/internal/caches/list_memory.go @@ -1,32 +1,48 @@ package caches import ( + "github.com/iwind/TeaGo/logs" + "strconv" "strings" "sync" + "testing" ) // MemoryList 内存缓存列表管理 type MemoryList struct { - m map[string]*Item // hash => item + itemMaps map[string]map[string]*Item // prefix => { hash => item } + prefixes []string locker sync.RWMutex onAdd func(item *Item) onRemove func(item *Item) + + purgeIndex int } func NewMemoryList() ListInterface { return &MemoryList{ - m: map[string]*Item{}, + itemMaps: map[string]map[string]*Item{}, } } func (this *MemoryList) Init() error { - // 内存列表不需要初始化 + this.prefixes = []string{"000"} + for i := 100; i <= 999; i++ { + this.prefixes = append(this.prefixes, strconv.Itoa(i)) + } + + for _, prefix := range this.prefixes { + this.itemMaps[prefix] = map[string]*Item{} + } + return nil } func (this *MemoryList) Reset() error { this.locker.Lock() - this.m = map[string]*Item{} + for key := range this.itemMaps { + this.itemMaps[key] = map[string]*Item{} + } this.locker.Unlock() return nil } @@ -34,8 +50,15 @@ func (this *MemoryList) Reset() error { func (this *MemoryList) Add(hash string, item *Item) error { this.locker.Lock() + prefix := this.prefix(hash) + itemMap, ok := this.itemMaps[prefix] + if !ok { + itemMap = map[string]*Item{} + this.itemMaps[prefix] = itemMap + } + // 先删除,为了可以正确触发统计 - oldItem, ok := this.m[hash] + oldItem, ok := itemMap[hash] if ok { if this.onRemove != nil { this.onRemove(oldItem) @@ -46,7 +69,8 @@ func (this *MemoryList) Add(hash string, item *Item) error { if this.onAdd != nil { this.onAdd(item) } - this.m[hash] = item + + itemMap[hash] = item this.locker.Unlock() return nil } @@ -55,7 +79,12 @@ func (this *MemoryList) Exist(hash string) (bool, error) { this.locker.RLock() defer this.locker.RUnlock() - item, ok := this.m[hash] + prefix := this.prefix(hash) + itemMap, ok := this.itemMaps[prefix] + if !ok { + return false, nil + } + item, ok := itemMap[hash] if !ok { return false, nil } @@ -69,9 +98,11 @@ func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err er defer this.locker.RUnlock() // TODO 需要优化性能,支持千万级数据低于1s的处理速度 - for _, item := range this.m { - if strings.HasPrefix(item.Key, prefix) { - keys = append(keys, item.Key) + for _, itemMap := range this.itemMaps { + for _, item := range itemMap { + if strings.HasPrefix(item.Key, prefix) { + keys = append(keys, item.Key) + } } } return @@ -80,12 +111,18 @@ func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err er func (this *MemoryList) Remove(hash string) error { this.locker.Lock() - item, ok := this.m[hash] + itemMap, ok := this.itemMaps[this.prefix(hash)] + if !ok { + this.locker.Unlock() + return nil + } + + item, ok := itemMap[hash] if ok { if this.onRemove != nil { this.onRemove(item) } - delete(this.m, hash) + delete(itemMap, hash) } this.locker.Unlock() @@ -98,7 +135,20 @@ func (this *MemoryList) Remove(hash string) error { func (this *MemoryList) Purge(count int, callback func(hash string) error) error { this.locker.Lock() deletedHashList := []string{} - for hash, item := range this.m { + + if this.purgeIndex >= len(this.prefixes) { + this.purgeIndex = 0 + } + prefix := this.prefixes[this.purgeIndex] + + this.purgeIndex++ + + itemMap, ok := this.itemMaps[prefix] + if !ok { + this.locker.Unlock() + return nil + } + for hash, item := range itemMap { if count <= 0 { break } @@ -107,7 +157,7 @@ func (this *MemoryList) Purge(count int, callback func(hash string) error) error if this.onRemove != nil { this.onRemove(item) } - delete(this.m, hash) + delete(itemMap, hash) deletedHashList = append(deletedHashList, hash) } @@ -139,13 +189,15 @@ func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) { Count: 0, Size: 0, } - for hash, item := range this.m { - if !item.IsExpired() { - // 检查文件是否存在、内容是否正确等 - if check != nil && check(hash) { - result.Count++ - result.ValueSize += item.Size() - result.Size += item.TotalSize() + for _, itemMap := range this.itemMaps { + for hash, item := range itemMap { + if !item.IsExpired() { + // 检查文件是否存在、内容是否正确等 + if check != nil && check(hash) { + result.Count++ + result.ValueSize += item.Size() + result.Size += item.TotalSize() + } } } } @@ -155,9 +207,12 @@ func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) { // Count 总数量 func (this *MemoryList) Count() (int64, error) { this.locker.RLock() - count := int64(len(this.m)) + var count = 0 + for _, itemMap := range this.itemMaps { + count += len(itemMap) + } this.locker.RUnlock() - return count, nil + return int64(count), nil } // OnAdd 添加事件 @@ -169,3 +224,27 @@ func (this *MemoryList) OnAdd(f func(item *Item)) { func (this *MemoryList) OnRemove(f func(item *Item)) { this.onRemove = f } + +func (this *MemoryList) print(t *testing.T) { + this.locker.Lock() + for _, itemMap := range this.itemMaps { + if len(itemMap) > 0 { + logs.PrintAsJSON(itemMap, t) + } + } + this.locker.Unlock() +} + +func (this *MemoryList) prefix(hash string) string { + var prefix string + if len(hash) > 3 { + prefix = hash[:3] + } else { + prefix = hash + } + _, ok := this.itemMaps[prefix] + if !ok { + prefix = "000" + } + return prefix +} diff --git a/internal/caches/list_memory_test.go b/internal/caches/list_memory_test.go index 48a325b..daafe30 100644 --- a/internal/caches/list_memory_test.go +++ b/internal/caches/list_memory_test.go @@ -3,6 +3,8 @@ package caches import ( "fmt" "github.com/cespare/xxhash" + "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/rands" "math/rand" "strconv" "testing" @@ -11,6 +13,7 @@ import ( func TestMemoryList_Add(t *testing.T) { list := NewMemoryList().(*MemoryList) + _ = list.Init() _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -21,11 +24,18 @@ func TestMemoryList_Add(t *testing.T) { ExpiredAt: time.Now().Unix() + 3600, HeaderSize: 1024, }) - t.Log(list.m) + _ = list.Add("123456", &Item{ + Key: "c1", + ExpiredAt: time.Now().Unix() + 3600, + HeaderSize: 1024, + }) + t.Log(list.prefixes) + logs.PrintAsJSON(list.itemMaps, t) } func TestMemoryList_Remove(t *testing.T) { list := NewMemoryList().(*MemoryList) + _ = list.Init() _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -37,11 +47,12 @@ func TestMemoryList_Remove(t *testing.T) { HeaderSize: 1024, }) _ = list.Remove("b") - t.Log(list.m) + list.print(t) } func TestMemoryList_Purge(t *testing.T) { list := NewMemoryList().(*MemoryList) + _ = list.Init() _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -66,11 +77,35 @@ func TestMemoryList_Purge(t *testing.T) { t.Log("delete:", hash) return nil }) - t.Log(list.m) + list.print(t) + + for i := 0; i < 1000; i++ { + _ = list.Purge(100, func(hash string) error { + t.Log("delete:", hash) + return nil + }) + t.Log(list.purgeIndex) + } +} + +func TestMemoryList_Purge_Large_List(t *testing.T) { + list := NewMemoryList().(*MemoryList) + _ = list.Init() + + for i := 0; i < 1_000_000; i++ { + _ = list.Add("a"+strconv.Itoa(i), &Item{ + Key: "a" + strconv.Itoa(i), + ExpiredAt: time.Now().Unix() + int64(rands.Int(0, 24*3600)), + HeaderSize: 1024, + }) + } + + time.Sleep(1 * time.Hour) } func TestMemoryList_Stat(t *testing.T) { list := NewMemoryList() + _ = list.Init() _ = list.Add("a", &Item{ Key: "a1", ExpiredAt: time.Now().Unix() + 3600, @@ -101,9 +136,10 @@ func TestMemoryList_Stat(t *testing.T) { func TestMemoryList_FindKeysWithPrefix(t *testing.T) { list := NewMemoryList() + _ = list.Init() before := time.Now() for i := 0; i < 1_000_000; i++ { - key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" + key := "http://www.teaos.cn/hello/" + strconv.Itoa(i/10000) + "/" + strconv.Itoa(i) + ".html" _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ Key: key, ExpiredAt: 0, @@ -114,7 +150,7 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") before = time.Now() - keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000") + keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/50") if err != nil { t.Fatal(err) } @@ -124,6 +160,7 @@ func TestMemoryList_FindKeysWithPrefix(t *testing.T) { func TestMemoryList_GC(t *testing.T) { list := NewMemoryList().(*MemoryList) + _ = list.Init() for i := 0; i < 1_000_000; i++ { key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html" _ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{ @@ -134,8 +171,9 @@ func TestMemoryList_GC(t *testing.T) { }) } time.Sleep(10 * time.Second) - t.Log("clean...", len(list.m)) + t.Log("clean...", len(list.itemMaps)) _ = list.CleanAll() + t.Log("cleanAll...", len(list.itemMaps)) before := time.Now() //runtime.GC() t.Log("gc cost:", time.Since(before).Seconds()*1000, "ms") diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 804a0b7..a20435c 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -54,7 +54,7 @@ func (this *MemoryStorage) Init() error { }) if this.purgeDuration <= 0 { - this.purgeDuration = 30 * time.Second + this.purgeDuration = 10 * time.Second } // 启动定时清理任务