diff --git a/internal/grids/README.md b/internal/grids/README.md deleted file mode 100644 index c9cca8f..0000000 --- a/internal/grids/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# Memory Grid -Cache items in memory, using partitions and LRU. \ No newline at end of file diff --git a/internal/grids/cell.go b/internal/grids/cell.go deleted file mode 100644 index c60388d..0000000 --- a/internal/grids/cell.go +++ /dev/null @@ -1,186 +0,0 @@ -package grids - -import ( - "math" - "sync" - "time" -) - -type Cell struct { - LimitSize int64 - LimitCount int - - mapping map[uint64]*Item // key => item - list *List // { item1, item2, ... } - totalBytes int64 - locker sync.RWMutex -} - -func NewCell() *Cell { - return &Cell{ - mapping: map[uint64]*Item{}, - list: NewList(), - } -} - -func (this *Cell) Write(hashKey uint64, item *Item) { - if item == nil { - return - } - this.locker.Lock() - - oldItem, ok := this.mapping[hashKey] - if ok { - this.list.Remove(oldItem) - - if this.LimitSize > 0 { - this.totalBytes -= oldItem.Size() - } - } - - // limit count - if this.LimitCount > 0 && len(this.mapping) >= this.LimitCount { - this.locker.Unlock() - return - } - - // trim memory - size := item.Size() - shouldTrim := false - if this.LimitSize > 0 && this.LimitSize < this.totalBytes+size { - this.Trim() - shouldTrim = true - } - - // compare again - if shouldTrim { - if this.LimitSize > 0 && this.LimitSize < this.totalBytes+size { - this.locker.Unlock() - return - } - } - - this.totalBytes += size - - this.list.Add(item) - this.mapping[hashKey] = item - - this.locker.Unlock() -} - -func (this *Cell) Increase64(key []byte, expireAt int64, hashKey uint64, delta int64) (result int64) { - this.locker.Lock() - item, ok := this.mapping[hashKey] - if ok { - // reset to zero if expired - if item.ExpireAt < time.Now().Unix() { - item.ValueInt64 = 0 - item.ExpireAt = expireAt - } - item.IncreaseInt64(delta) - result = item.ValueInt64 - } else { - item := NewItem(key, ItemInt64) - item.ValueInt64 = delta - item.ExpireAt = expireAt - this.mapping[hashKey] = item - result = delta - } - this.locker.Unlock() - return -} - -func (this *Cell) Read(hashKey uint64) *Item { - this.locker.Lock() - - item, ok := this.mapping[hashKey] - if ok { - this.list.Remove(item) - this.list.Add(item) - - this.locker.Unlock() - - if item.ExpireAt < time.Now().Unix() { - return nil - } - return item - } - - this.locker.Unlock() - return nil -} - -func (this *Cell) Stat() *CellStat { - this.locker.RLock() - defer this.locker.RUnlock() - - return &CellStat{ - TotalBytes: this.totalBytes, - CountItems: len(this.mapping), - } -} - -// trim NOT ACTIVE items -// should called in locker context -func (this *Cell) Trim() { - l := len(this.mapping) - if l == 0 { - return - } - - inactiveSize := int(math.Ceil(float64(l) / 10)) // trim 10% items - this.list.Range(func(item *Item) (goNext bool) { - inactiveSize-- - delete(this.mapping, item.HashKey()) - this.list.Remove(item) - this.totalBytes -= item.Size() - return inactiveSize > 0 - }) -} - -func (this *Cell) Delete(hashKey uint64) { - this.locker.Lock() - item, ok := this.mapping[hashKey] - if ok { - delete(this.mapping, hashKey) - this.list.Remove(item) - this.totalBytes -= item.Size() - } - this.locker.Unlock() -} - -// range all items in the cell -func (this *Cell) Range(f func(item *Item)) { - this.locker.Lock() - for _, item := range this.mapping { - f(item) - } - this.locker.Unlock() -} - -func (this *Cell) Recycle() { - this.locker.Lock() - if len(this.mapping) == 0 { - this.locker.Unlock() - return - } - - timestamp := time.Now().Unix() - for key, item := range this.mapping { - if item.ExpireAt < timestamp { - delete(this.mapping, key) - this.list.Remove(item) - this.totalBytes -= item.Size() - } - } - - this.locker.Unlock() -} - -func (this *Cell) Reset() { - this.locker.Lock() - this.list.Reset() - this.mapping = map[uint64]*Item{} - this.totalBytes = 0 - this.locker.Unlock() -} diff --git a/internal/grids/cell_stat.go b/internal/grids/cell_stat.go deleted file mode 100644 index aa37efa..0000000 --- a/internal/grids/cell_stat.go +++ /dev/null @@ -1,6 +0,0 @@ -package grids - -type CellStat struct { - TotalBytes int64 - CountItems int -} diff --git a/internal/grids/cell_test.go b/internal/grids/cell_test.go deleted file mode 100644 index e499e97..0000000 --- a/internal/grids/cell_test.go +++ /dev/null @@ -1,214 +0,0 @@ -package grids - -import ( - "fmt" - "runtime" - "strconv" - "strings" - "sync" - "testing" - "time" -) - -func TestCell_List(t *testing.T) { - cell := NewCell() - cell.Write(1, &Item{ - ValueInt64: 1, - }) - cell.Write(2, &Item{ - ValueInt64: 2, - }) - cell.Write(3, &Item{ - ValueInt64: 3, - }) - - { - t.Log("====") - l := cell.list - for e := l.head; e != nil; e = e.Next { - t.Log("element:", e.ValueInt64) - } - } - - cell.Write(1, &Item{ - ValueInt64: 1, - }) - cell.Write(3, &Item{ - ValueInt64: 3, - }) - cell.Write(3, &Item{ - ValueInt64: 3, - }) - cell.Write(2, &Item{ - ValueInt64: 2, - }) - cell.Delete(2) - - { - t.Log("====") - l := cell.list - for e := l.head; e != nil; e = e.Next { - t.Log("element:", e.ValueInt64) - } - } - - for _, m := range cell.mapping { - t.Log(m.ValueInt64) - } -} - -func TestCell_LimitSize(t *testing.T) { - cell := NewCell() - cell.LimitSize = 1024 - - for i := int64(0); i < 100; i ++ { - key := []byte(fmt.Sprintf("%d", i)) - cell.Write(HashKey(key), &Item{ - Key: key, - ValueInt64: i, - Type: ItemInt64, - }) - } - - t.Log("totalBytes:", cell.totalBytes) - - { - t.Log("====") - l := cell.list - s := []string{} - for e := l.head; e != nil; e = e.Next { - s = append(s, fmt.Sprintf("%d", e.ValueInt64)) - } - t.Log("{ " + strings.Join(s, ", ") + " }") - } - - t.Log("mapping:", len(cell.mapping)) - s := []string{} - for _, item := range cell.mapping { - s = append(s, fmt.Sprintf("%d", item.ValueInt64)) - } - t.Log("{ " + strings.Join(s, ", ") + " }") -} - -func TestCell_MemoryUsage(t *testing.T) { - //runtime.GOMAXPROCS(4) - - cell := NewCell() - cell.LimitSize = 1024 * 1024 * 1024 * 1 - - before := time.Now() - - wg := sync.WaitGroup{} - wg.Add(4) - - for j := 0; j < 4; j ++ { - go func(j int) { - start := j * 50 * 10000 - for i := start; i < start+50*10000; i ++ { - key := []byte(strconv.Itoa(i) + "VERY_LONG_STRING") - cell.Write(HashKey(key), &Item{ - Key: key, - ValueInt64: int64(i), - Type: ItemInt64, - }) - } - wg.Done() - }(j) - } - - wg.Wait() - t.Log("items:", len(cell.mapping)) - t.Log(time.Since(before).Seconds(), "s", "totalBytes:", cell.totalBytes/1024/1024, "M") - //time.Sleep(10 * time.Second) -} - -func BenchmarkCell_Write(b *testing.B) { - runtime.GOMAXPROCS(1) - - cell := NewCell() - - for i := 0; i < b.N; i ++ { - key := []byte(strconv.Itoa(i) + "_LONG_KEY_LONG_KEY_LONG_KEY_LONG_KEY") - cell.Write(HashKey(key), &Item{ - Key: key, - ValueInt64: int64(i), - Type: ItemInt64, - }) - } - - b.Log("items:", len(cell.mapping)) -} - -func TestCell_Read(t *testing.T) { - cell := NewCell() - - cell.Write(1, &Item{ - ValueInt64: 1, - ExpireAt: time.Now().Unix() + 3600, - }) - cell.Write(2, &Item{ - ValueInt64: 2, - ExpireAt: time.Now().Unix() + 3600, - }) - cell.Write(3, &Item{ - ValueInt64: 3, - ExpireAt: time.Now().Unix() + 3600, - }) - - { - s := []string{} - cell.list.Range(func(item *Item) (goNext bool) { - s = append(s, fmt.Sprintf("%d", item.ValueInt64)) - return true - }) - t.Log("before:", s) - } - - t.Log(cell.Read(1).ValueInt64) - - { - s := []string{} - cell.list.Range(func(item *Item) (goNext bool) { - s = append(s, fmt.Sprintf("%d", item.ValueInt64)) - return true - }) - t.Log("after:", s) - } - - t.Log(cell.Read(2).ValueInt64) - - { - s := []string{} - cell.list.Range(func(item *Item) (goNext bool) { - s = append(s, fmt.Sprintf("%d", item.ValueInt64)) - return true - }) - t.Log("after:", s) - } -} - -func TestCell_Recycle(t *testing.T) { - cell := NewCell() - cell.Write(1, &Item{ - ValueInt64: 1, - ExpireAt: time.Now().Unix() - 1, - }) - - cell.Write(2, &Item{ - ValueInt64: 2, - ExpireAt: time.Now().Unix() + 1, - }) - - cell.Recycle() - - { - s := []string{} - cell.list.Range(func(item *Item) (goNext bool) { - s = append(s, fmt.Sprintf("%d", item.ValueInt64)) - return true - }) - t.Log("after:", s) - } - - t.Log(cell.list.Len(), cell.totalBytes) -} diff --git a/internal/grids/grid.go b/internal/grids/grid.go deleted file mode 100644 index 03a62ef..0000000 --- a/internal/grids/grid.go +++ /dev/null @@ -1,225 +0,0 @@ -package grids - -import ( - "bytes" - "compress/gzip" - "github.com/iwind/TeaGo/logs" - "github.com/iwind/TeaGo/timers" - "math" - "time" -) - -// Memory Cache Grid -// -// | Grid | -// | cell1, cell2, ..., cell1024 | -// | item1, item2, ..., item1000000 | -type Grid struct { - cells []*Cell - countCells uint64 - - recycleIndex int - recycleLooper *timers.Looper - recycleInterval int - - gzipLevel int - - limitSize int64 - limitCount int -} - -func NewGrid(countCells int, opt ...interface{}) *Grid { - grid := &Grid{ - recycleIndex: -1, - } - - for _, o := range opt { - switch x := o.(type) { - case *CompressOpt: - grid.gzipLevel = x.Level - case *LimitSizeOpt: - grid.limitSize = x.Size - case *LimitCountOpt: - grid.limitCount = x.Count - case *RecycleIntervalOpt: - grid.recycleInterval = x.Interval - } - } - - cells := []*Cell{} - if countCells <= 0 { - countCells = 1 - } else if countCells > 100*10000 { - countCells = 100 * 10000 - } - for i := 0; i < countCells; i++ { - cell := NewCell() - cell.LimitSize = int64(math.Floor(float64(grid.limitSize) / float64(countCells))) - cell.LimitCount = int(math.Floor(float64(grid.limitCount)) / float64(countCells)) - - cells = append(cells, cell) - } - grid.cells = cells - grid.countCells = uint64(len(cells)) - - grid.recycleTimer() - return grid -} - -// get all cells in the grid -func (this *Grid) Cells() []*Cell { - return this.cells -} - -func (this *Grid) WriteItem(item *Item) { - if this.countCells <= 0 { - return - } - hashKey := item.HashKey() - this.cellForHashKey(hashKey).Write(hashKey, item) -} - -func (this *Grid) WriteInt64(key []byte, value int64, lifeSeconds int64) { - this.WriteItem(&Item{ - Key: key, - Type: ItemInt64, - ValueInt64: value, - ExpireAt: UnixTime() + lifeSeconds, - }) -} - -func (this *Grid) IncreaseInt64(key []byte, delta int64, lifeSeconds int64) (result int64) { - hashKey := HashKey(key) - return this.cellForHashKey(hashKey).Increase64(key, time.Now().Unix()+lifeSeconds, hashKey, delta) -} - -func (this *Grid) WriteString(key []byte, value string, lifeSeconds int64) { - this.WriteBytes(key, []byte(value), lifeSeconds) -} - -func (this *Grid) WriteBytes(key []byte, value []byte, lifeSeconds int64) { - isCompressed := false - if this.gzipLevel != gzip.NoCompression { - buf := bytes.NewBuffer([]byte{}) - writer, err := gzip.NewWriterLevel(buf, this.gzipLevel) - if err != nil { - logs.Error(err) - this.WriteItem(&Item{ - Key: key, - Type: ItemBytes, - ValueBytes: value, - ExpireAt: time.Now().Unix() + lifeSeconds, - }) - return - } - - _, err = writer.Write([]byte(value)) - if err != nil { - logs.Error(err) - this.WriteItem(&Item{ - Key: key, - Type: ItemBytes, - ValueBytes: value, - ExpireAt: time.Now().Unix() + lifeSeconds, - }) - return - } - - err = writer.Close() - if err != nil { - logs.Error(err) - this.WriteItem(&Item{ - Key: key, - Type: ItemBytes, - ValueBytes: value, - ExpireAt: time.Now().Unix() + lifeSeconds, - }) - return - } - value = buf.Bytes() - isCompressed = true - } - - this.WriteItem(&Item{ - Key: key, - Type: ItemBytes, - ValueBytes: value, - ExpireAt: time.Now().Unix() + lifeSeconds, - IsCompressed: isCompressed, - }) -} - -func (this *Grid) WriteInterface(key []byte, value interface{}, lifeSeconds int64) { - this.WriteItem(&Item{ - Key: key, - Type: ItemInterface, - ValueInterface: value, - ExpireAt: time.Now().Unix() + lifeSeconds, - IsCompressed: false, - }) -} - -func (this *Grid) Read(key []byte) *Item { - if this.countCells <= 0 { - return nil - } - hashKey := HashKey(key) - return this.cellForHashKey(hashKey).Read(hashKey) -} - -func (this *Grid) Stat() *Stat { - stat := &Stat{} - for _, cell := range this.cells { - cellStat := cell.Stat() - stat.CountItems += cellStat.CountItems - stat.TotalBytes += cellStat.TotalBytes - } - return stat -} - -func (this *Grid) Delete(key []byte) { - if this.countCells <= 0 { - return - } - hashKey := HashKey(key) - this.cellForHashKey(hashKey).Delete(hashKey) -} - -func (this *Grid) Reset() { - for _, cell := range this.cells { - cell.Reset() - } -} - -func (this *Grid) Destroy() { - if this.recycleLooper != nil { - this.recycleLooper.Stop() - this.recycleLooper = nil - } - this.cells = nil -} - -func (this *Grid) cellForHashKey(hashKey uint64) *Cell { - if hashKey < 0 { - return this.cells[-hashKey%this.countCells] - } else { - return this.cells[hashKey%this.countCells] - } -} - -func (this *Grid) recycleTimer() { - duration := 1 * time.Minute - if this.recycleInterval > 0 { - duration = time.Duration(this.recycleInterval) * time.Second - } - this.recycleLooper = timers.Loop(duration, func(looper *timers.Looper) { - if this.countCells == 0 { - return - } - this.recycleIndex++ - if this.recycleIndex > int(this.countCells-1) { - this.recycleIndex = 0 - } - this.cells[this.recycleIndex].Recycle() - }) -} diff --git a/internal/grids/grid_test.go b/internal/grids/grid_test.go deleted file mode 100644 index 41fb100..0000000 --- a/internal/grids/grid_test.go +++ /dev/null @@ -1,207 +0,0 @@ -package grids - -import ( - "compress/gzip" - "fmt" - "runtime" - "strconv" - "strings" - "sync" - "testing" - "time" -) - -func TestMemoryGrid_Write(t *testing.T) { - grid := NewGrid(5, NewRecycleIntervalOpt(2), NewLimitSizeOpt(10240)) - t.Log("123456:", grid.Read([]byte("123456"))) - - grid.WriteInt64([]byte("abc"), 1, 5) - t.Log(grid.Read([]byte("abc")).ValueInt64) - - grid.WriteString([]byte("abc"), "123", 5) - t.Log(string(grid.Read([]byte("abc")).Bytes())) - - grid.WriteBytes([]byte("abc"), []byte("123"), 5) - t.Log(grid.Read([]byte("abc")).Bytes()) - - grid.Delete([]byte("abc")) - t.Log(grid.Read([]byte("abc"))) - - for i := 0; i < 100; i++ { - grid.WriteInt64([]byte(fmt.Sprintf("%d", i)), 123, 1) - } - - t.Log("before recycle:") - for index, cell := range grid.cells { - t.Log("cell:", index, len(cell.mapping), "items") - } - - time.Sleep(3 * time.Second) - t.Log("after recycle:") - for index, cell := range grid.cells { - t.Log("cell:", index, len(cell.mapping), "items") - } - - grid.Destroy() -} - -func TestMemoryGrid_Write_LimitCount(t *testing.T) { - grid := NewGrid(2, NewLimitCountOpt(10)) - for i := 0; i < 100; i++ { - grid.WriteInt64([]byte(strconv.Itoa(i)), int64(i), 30) - } - t.Log(grid.Stat().CountItems, "items") -} - -func TestMemoryGrid_Compress(t *testing.T) { - grid := NewGrid(5, NewCompressOpt(1)) - grid.WriteString([]byte("hello"), strings.Repeat("abcd", 10240), 30) - t.Log(len(string(grid.Read([]byte("hello")).String()))) - t.Log(len(grid.Read([]byte("hello")).ValueBytes)) -} - -func BenchmarkMemoryGrid_Performance(b *testing.B) { - runtime.GOMAXPROCS(1) - - grid := NewGrid(1024) - for i := 0; i < b.N; i++ { - key := "key:" + strconv.Itoa(i) - grid.WriteInt64([]byte(key), int64(i), 3600) - } -} - -func TestMemoryGrid_Performance(t *testing.T) { - runtime.GOMAXPROCS(1) - - grid := NewGrid(1024) - - now := time.Now() - - s := []byte(strings.Repeat("abcd", 10*1024)) - - for i := 0; i < 100000; i++ { - grid.WriteBytes([]byte(fmt.Sprintf("key:%d_%d", i, 1)), s, 3600) - item := grid.Read([]byte(fmt.Sprintf("key:%d_%d", i, 1))) - if item != nil { - _ = item.String() - } - } - - countItems := 0 - for _, cell := range grid.cells { - countItems += len(cell.mapping) - } - t.Log(countItems, "items") - - t.Log(time.Since(now).Seconds()*1000, "ms") -} - -func TestMemoryGrid_Performance_Concurrent(t *testing.T) { - //runtime.GOMAXPROCS(1) - - grid := NewGrid(1024) - - now := time.Now() - - s := []byte(strings.Repeat("abcd", 10*1024)) - - wg := sync.WaitGroup{} - wg.Add(runtime.NumCPU()) - - for c := 0; c < runtime.NumCPU(); c++ { - go func(c int) { - defer wg.Done() - for i := 0; i < 50000; i++ { - grid.WriteBytes([]byte(fmt.Sprintf("key:%d_%d", i, c)), s, 3600) - item := grid.Read([]byte(fmt.Sprintf("key:%d_%d", i, c))) - if item != nil { - _ = item.String() - } - } - }(c) - } - - wg.Wait() - countItems := 0 - for _, cell := range grid.cells { - countItems += len(cell.mapping) - } - t.Log(countItems, "items") - - t.Log(time.Since(now).Seconds()*1000, "ms") -} - -func TestMemoryGrid_CompressPerformance(t *testing.T) { - runtime.GOMAXPROCS(1) - - grid := NewGrid(1024, NewCompressOpt(gzip.BestCompression)) - - now := time.Now() - data := []byte(strings.Repeat("abcd", 1024)) - - for i := 0; i < 100000; i++ { - grid.WriteBytes([]byte(fmt.Sprintf("key:%d", i)), data, 3600) - item := grid.Read([]byte(fmt.Sprintf("key:%d", i+100))) - if item != nil { - _ = item.String() - } - } - - countItems := 0 - for _, cell := range grid.cells { - countItems += len(cell.mapping) - } - t.Log(countItems, "items") - - t.Log(time.Since(now).Seconds()*1000, "ms") -} - -func TestMemoryGrid_IncreaseInt64(t *testing.T) { - grid := NewGrid(1024) - grid.WriteInt64([]byte("abc"), 123, 10) - grid.IncreaseInt64([]byte("abc"), 123, 10) - grid.IncreaseInt64([]byte("abc"), 123, 10) - item := grid.Read([]byte("abc")) - if item == nil { - t.Fatal("item == nil") - } - - if item.ValueInt64 != 369 { - t.Fatal("not 369") - } -} - -func TestMemoryGrid_Destroy(t *testing.T) { - grid := NewGrid(1024) - grid.WriteInt64([]byte("abc"), 123, 10) - t.Log(grid.recycleLooper, grid.cells) - grid.Destroy() - t.Log(grid.recycleLooper, grid.cells) - - if grid.recycleLooper != nil { - t.Fatal("looper != nil") - } -} - -func TestMemoryGrid_Recycle(t *testing.T) { - cell := NewCell() - timestamp := time.Now().Unix() - for i := 0; i < 300_0000; i++ { - cell.Write(uint64(i), &Item{ - ExpireAt: timestamp - 30, - }) - } - before := time.Now() - cell.Recycle() - t.Log(time.Since(before).Seconds()*1000, "ms") - t.Log(len(cell.mapping)) - - runtime.GC() - printMem(t) -} - -func printMem(t *testing.T) { - mem := &runtime.MemStats{} - runtime.ReadMemStats(mem) - t.Log(mem.Alloc/1024/1024, "M") -} diff --git a/internal/grids/item.go b/internal/grids/item.go deleted file mode 100644 index d664998..0000000 --- a/internal/grids/item.go +++ /dev/null @@ -1,88 +0,0 @@ -package grids - -import ( - "bytes" - "compress/gzip" - "github.com/cespare/xxhash" - "github.com/iwind/TeaGo/logs" - "sync/atomic" - "unsafe" -) - -type ItemType = int8 - -const ( - ItemInt64 = 1 - ItemBytes = 2 - ItemInterface = 3 -) - -func HashKey(key []byte) uint64 { - return xxhash.Sum64(key) -} - -type Item struct { - Key []byte - ExpireAt int64 - Type ItemType - ValueInt64 int64 - ValueBytes []byte - ValueInterface interface{} - IsCompressed bool - - // linked list - Prev *Item - Next *Item - - size int64 -} - -func NewItem(key []byte, dataType ItemType) *Item { - return &Item{ - Key: key, - Type: dataType, - } -} - -func (this *Item) HashKey() uint64 { - return HashKey(this.Key) -} - -func (this *Item) IncreaseInt64(delta int64) { - atomic.AddInt64(&this.ValueInt64, delta) -} - -func (this *Item) Bytes() []byte { - if this.IsCompressed { - reader, err := gzip.NewReader(bytes.NewBuffer(this.ValueBytes)) - if err != nil { - logs.Error(err) - return this.ValueBytes - } - - buf := make([]byte, 256) - dataBuf := bytes.NewBuffer([]byte{}) - for { - n, err := reader.Read(buf) - if n > 0 { - dataBuf.Write(buf[:n]) - } - if err != nil { - break - } - } - return dataBuf.Bytes() - } - return this.ValueBytes -} - -func (this *Item) String() string { - return string(this.Bytes()) -} - -func (this *Item) Size() int64 { - if this.size == 0 { - this.size = int64(int(unsafe.Sizeof(*this)) + len(this.Key) + len(this.ValueBytes)) - } - return this.size -} diff --git a/internal/grids/item_test.go b/internal/grids/item_test.go deleted file mode 100644 index b7baccb..0000000 --- a/internal/grids/item_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package grids - -import ( - "crypto/md5" - "github.com/cespare/xxhash" - "strconv" - "testing" -) - -func TestItem_Size(t *testing.T) { - item := &Item{ - ValueInt64: 1024, - Key: []byte("123"), - ValueBytes: []byte("Hello, World"), - } - t.Log(item.Size()) -} - -func BenchmarkItem_Size(b *testing.B) { - item := &Item{ - ValueInt64: 1024, - Key: []byte("123"), - ValueBytes: []byte("Hello, World"), - } - for i := 0; i < b.N; i++ { - _ = item.Size() - } -} - -func TestItem_HashKey(t *testing.T) { - t.Log(HashKey([]byte("2"))) -} - -func TestItem_xxHash(t *testing.T) { - result := xxhash.Sum64([]byte("123456")) - t.Log(result) -} - -func TestItem_unique(t *testing.T) { - m := map[uint64]bool{} - for i := 0; i < 1000*10000; i++ { - s := "Hello,World,LONG KEY,LONG KEY,LONG KEY,LONG KEY" + strconv.Itoa(i) - result := xxhash.Sum64([]byte(s)) - _, ok := m[result] - if ok { - t.Log("found same", i) - break - } else { - m[result] = true - } - } - - t.Log(xxhash.Sum64([]byte("01"))) - t.Log(xxhash.Sum64([]byte("10"))) -} - -func BenchmarkItem_HashKeyMd5(b *testing.B) { - for i := 0; i < b.N; i++ { - h := md5.New() - h.Write([]byte("HELLO_KEY_" + strconv.Itoa(i))) - _ = h.Sum(nil) - } -} - -func BenchmarkItem_xxHash(b *testing.B) { - for i := 0; i < b.N; i++ { - _ = xxhash.Sum64([]byte("HELLO_KEY_" + strconv.Itoa(i))) - } -} diff --git a/internal/grids/list.go b/internal/grids/list.go deleted file mode 100644 index 35c7d6a..0000000 --- a/internal/grids/list.go +++ /dev/null @@ -1,68 +0,0 @@ -package grids - -type List struct { - head *Item - end *Item -} - -func NewList() *List { - return &List{} -} - -func (this *List) Add(item *Item) { - if item == nil { - return - } - if this.end != nil { - this.end.Next = item - item.Prev = this.end - item.Next = nil - } - this.end = item - if this.head == nil { - this.head = item - } -} - -func (this *List) Remove(item *Item) { - if item == nil { - return - } - if item.Prev != nil { - item.Prev.Next = item.Next - } - if item.Next != nil { - item.Next.Prev = item.Prev - } - if item == this.head { - this.head = item.Next - } - if item == this.end { - this.end = item.Prev - } - - item.Prev = nil - item.Next = nil -} - -func (this *List) Len() int { - l := 0 - for e := this.head; e != nil; e = e.Next { - l ++ - } - return l -} - -func (this *List) Range(f func(item *Item) (goNext bool)) { - for e := this.head; e != nil; e = e.Next { - goNext := f(e) - if !goNext { - break - } - } -} - -func (this *List) Reset() { - this.head = nil - this.end = nil -} diff --git a/internal/grids/list_test.go b/internal/grids/list_test.go deleted file mode 100644 index d7ed42c..0000000 --- a/internal/grids/list_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package grids - -import "testing" - -func TestList(t *testing.T) { - l := &List{} - - var e1 *Item = nil - { - e := &Item{ - ValueInt64: 1, - } - l.Add(e) - e1 = e - } - - var e2 *Item = nil - { - e := &Item{ - ValueInt64: 2, - } - l.Add(e) - e2 = e - } - - var e3 *Item = nil - { - e := &Item{ - ValueInt64: 3, - } - l.Add(e) - e3 = e - } - - var e4 *Item = nil - { - e := &Item{ - ValueInt64: 4, - } - l.Add(e) - e4 = e - } - - l.Remove(e1) - //l.Remove(e2) - //l.Remove(e3) - l.Remove(e4) - - for e := l.head; e != nil; e = e.Next { - t.Log(e.ValueInt64) - } - - t.Log("e1, e2, e3, e4, head, end:", e1, e2, e3, e4) - if l.head != nil { - t.Log("head:", l.head.ValueInt64) - } else { - t.Log("head: nil") - } - if l.end != nil { - t.Log("end:", l.end.ValueInt64) - } else { - t.Log("end: nil") - } -} diff --git a/internal/grids/opt_compress.go b/internal/grids/opt_compress.go deleted file mode 100644 index 3cdc8bf..0000000 --- a/internal/grids/opt_compress.go +++ /dev/null @@ -1,11 +0,0 @@ -package grids - -type CompressOpt struct { - Level int -} - -func NewCompressOpt(level int) *CompressOpt { - return &CompressOpt{ - Level: level, - } -} diff --git a/internal/grids/opt_limit_count.go b/internal/grids/opt_limit_count.go deleted file mode 100644 index 2858b8d..0000000 --- a/internal/grids/opt_limit_count.go +++ /dev/null @@ -1,11 +0,0 @@ -package grids - -type LimitCountOpt struct { - Count int -} - -func NewLimitCountOpt(count int) *LimitCountOpt { - return &LimitCountOpt{ - Count: count, - } -} diff --git a/internal/grids/opt_limit_size.go b/internal/grids/opt_limit_size.go deleted file mode 100644 index ab513ef..0000000 --- a/internal/grids/opt_limit_size.go +++ /dev/null @@ -1,11 +0,0 @@ -package grids - -type LimitSizeOpt struct { - Size int64 -} - -func NewLimitSizeOpt(size int64) *LimitSizeOpt { - return &LimitSizeOpt{ - Size: size, - } -} diff --git a/internal/grids/opt_recycle_interval.go b/internal/grids/opt_recycle_interval.go deleted file mode 100644 index 73b792b..0000000 --- a/internal/grids/opt_recycle_interval.go +++ /dev/null @@ -1,11 +0,0 @@ -package grids - -type RecycleIntervalOpt struct { - Interval int -} - -func NewRecycleIntervalOpt(interval int) *RecycleIntervalOpt { - return &RecycleIntervalOpt{ - Interval: interval, - } -} diff --git a/internal/grids/stat.go b/internal/grids/stat.go deleted file mode 100644 index 7202811..0000000 --- a/internal/grids/stat.go +++ /dev/null @@ -1,6 +0,0 @@ -package grids - -type Stat struct { - TotalBytes int64 - CountItems int -} diff --git a/internal/grids/time.go b/internal/grids/time.go deleted file mode 100644 index b1fc21e..0000000 --- a/internal/grids/time.go +++ /dev/null @@ -1,26 +0,0 @@ -package grids - -import ( - "time" -) - -var unixTime = time.Now().Unix() -var unixTimerIsReady = false - -func init() { - ticker := time.NewTicker(500 * time.Millisecond) - go func() { - for range ticker.C { - unixTimerIsReady = true - unixTime = time.Now().Unix() - } - }() -} - -// 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景 -func UnixTime() int64 { - if unixTimerIsReady { - return unixTime - } - return time.Now().Unix() -} diff --git a/internal/grids/time_test.go b/internal/grids/time_test.go deleted file mode 100644 index 50c2fa7..0000000 --- a/internal/grids/time_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package grids - -import ( - "testing" - "time" -) - -func TestUnixTime(t *testing.T) { - for i := 0; i < 5; i++ { - t.Log(UnixTime(), "real:", time.Now().Unix()) - time.Sleep(1 * time.Second) - } -} diff --git a/internal/ttlcache/cache.go b/internal/ttlcache/cache.go index f642f39..4913c26 100644 --- a/internal/ttlcache/cache.go +++ b/internal/ttlcache/cache.go @@ -1,6 +1,7 @@ package ttlcache import ( + "github.com/TeaOSLab/EdgeNode/internal/utils" "time" ) @@ -12,11 +13,13 @@ import ( // KeyMap列表数据结构 // { timestamp1 => [key1, key2, ...] }, ... type Cache struct { + isDestroyed bool pieces []*Piece countPieces uint64 maxItems int gcPieceIndex int + ticker *utils.Ticker } func NewCache(opt ...OptionInterface) *Cache { @@ -49,8 +52,8 @@ func NewCache(opt ...OptionInterface) *Cache { // start timer go func() { - ticker := time.NewTicker(5 * time.Second) - for range ticker.C { + cache.ticker = utils.NewTicker(5 * time.Second) + for cache.ticker.Next() { cache.GC() } }() @@ -59,6 +62,10 @@ func NewCache(opt ...OptionInterface) *Cache { } func (this *Cache) Write(key string, value interface{}, expiredAt int64) { + if this.isDestroyed { + return + } + currentTimestamp := time.Now().Unix() if expiredAt <= currentTimestamp { return @@ -76,6 +83,25 @@ func (this *Cache) Write(key string, value interface{}, expiredAt int64) { }) } +func (this *Cache) IncreaseInt64(key string, delta int64, expiredAt int64) int64 { + if this.isDestroyed { + return 0 + } + + currentTimestamp := time.Now().Unix() + if expiredAt <= currentTimestamp { + return 0 + } + + maxExpiredAt := currentTimestamp + 30*86400 + if expiredAt > maxExpiredAt { + expiredAt = maxExpiredAt + } + uint64Key := HashKey([]byte(key)) + pieceIndex := uint64Key % this.countPieces + return this.pieces[pieceIndex].IncreaseInt64(uint64Key, delta, expiredAt) +} + func (this *Cache) Read(key string) (item *Item) { uint64Key := HashKey([]byte(key)) return this.pieces[uint64Key%this.countPieces].Read(uint64Key) @@ -109,3 +135,15 @@ func (this *Cache) GC() { } this.gcPieceIndex = newIndex } + +func (this *Cache) Destroy() { + this.isDestroyed = true + + if this.ticker != nil { + this.ticker.Stop() + this.ticker = nil + } + for _, piece := range this.pieces { + piece.Destroy() + } +} diff --git a/internal/ttlcache/cache_test.go b/internal/ttlcache/cache_test.go index ddeb34b..0ffcf11 100644 --- a/internal/ttlcache/cache_test.go +++ b/internal/ttlcache/cache_test.go @@ -36,6 +36,27 @@ func BenchmarkCache_Add(b *testing.B) { } } +func TestCache_IncreaseInt64(t *testing.T) { + var cache = NewCache() + + { + cache.IncreaseInt64("a", 1, time.Now().Unix()+3600) + t.Log(cache.Read("a")) + } + { + cache.IncreaseInt64("a", 1, time.Now().Unix()+3600+1) + t.Log(cache.Read("a")) + } + { + cache.Write("b", 1, time.Now().Unix()+3600+2) + t.Log(cache.Read("b")) + } + { + cache.IncreaseInt64("b", 1, time.Now().Unix()+3600+3) + t.Log(cache.Read("b")) + } +} + func TestCache_Read(t *testing.T) { runtime.GOMAXPROCS(1) diff --git a/internal/ttlcache/piece.go b/internal/ttlcache/piece.go index 3e3ed13..c1f8782 100644 --- a/internal/ttlcache/piece.go +++ b/internal/ttlcache/piece.go @@ -2,6 +2,7 @@ package ttlcache import ( "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/types" "sync" "time" ) @@ -26,6 +27,26 @@ func (this *Piece) Add(key uint64, item *Item) () { this.locker.Unlock() } +func (this *Piece) IncreaseInt64(key uint64, delta int64, expiredAt int64) (result int64) { + this.locker.Lock() + item, ok := this.m[key] + if ok { + result := types.Int64(item.Value) + delta + item.Value = result + item.expiredAt = expiredAt + } else { + if len(this.m) < this.maxItems { + result = delta + this.m[key] = &Item{ + Value: delta, + expiredAt: expiredAt, + } + } + } + this.locker.Unlock() + return +} + func (this *Piece) Delete(key uint64) { this.locker.Lock() delete(this.m, key) @@ -60,3 +81,9 @@ func (this *Piece) GC() { } this.locker.Unlock() } + +func (this *Piece) Destroy() { + this.locker.Lock() + this.m = nil + this.locker.Unlock() +} diff --git a/internal/waf/checkpoints/cc.go b/internal/waf/checkpoints/cc.go index 89a85be..8de7ef9 100644 --- a/internal/waf/checkpoints/cc.go +++ b/internal/waf/checkpoints/cc.go @@ -1,7 +1,7 @@ package checkpoints import ( - "github.com/TeaOSLab/EdgeNode/internal/grids" + "github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/waf/requests" "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/types" @@ -9,6 +9,7 @@ import ( "regexp" "strings" "sync" + "time" ) // ${cc.arg} @@ -16,8 +17,8 @@ import ( type CCCheckpoint struct { Checkpoint - grid *grids.Grid - once sync.Once + cache *ttlcache.Cache + once sync.Once } func (this *CCCheckpoint) Init() { @@ -25,20 +26,20 @@ func (this *CCCheckpoint) Init() { } func (this *CCCheckpoint) Start() { - if this.grid != nil { - this.grid.Destroy() + if this.cache != nil { + this.cache.Destroy() } - this.grid = grids.NewGrid(32, grids.NewLimitCountOpt(1000_0000)) + this.cache = ttlcache.NewCache() } func (this *CCCheckpoint) RequestValue(req *requests.Request, param string, options maps.Map) (value interface{}, sysErr error, userErr error) { value = 0 - if this.grid == nil { + if this.cache == nil { this.once.Do(func() { this.Start() }) - if this.grid == nil { + if this.cache == nil { return } } @@ -115,7 +116,7 @@ func (this *CCCheckpoint) RequestValue(req *requests.Request, param string, opti if len(key) == 0 { key = this.ip(req) } - value = this.grid.IncreaseInt64([]byte(key), 1, period) + value = this.cache.IncreaseInt64(key, int64(1), time.Now().Unix()+period) } return @@ -204,9 +205,9 @@ func (this *CCCheckpoint) Options() []OptionInterface { } func (this *CCCheckpoint) Stop() { - if this.grid != nil { - this.grid.Destroy() - this.grid = nil + if this.cache != nil { + this.cache.Destroy() + this.cache = nil } }