彻底替换掉memorygrid

This commit is contained in:
GoEdgeLab
2020-11-22 12:11:39 +08:00
parent 4815f378aa
commit 61b3a42a31
21 changed files with 101 additions and 1232 deletions

View File

@@ -1,2 +0,0 @@
# Memory Grid
Cache items in memory, using partitions and LRU.

View File

@@ -1,186 +0,0 @@
package grids
import (
"math"
"sync"
"time"
)
type Cell struct {
LimitSize int64
LimitCount int
mapping map[uint64]*Item // key => item
list *List // { item1, item2, ... }
totalBytes int64
locker sync.RWMutex
}
func NewCell() *Cell {
return &Cell{
mapping: map[uint64]*Item{},
list: NewList(),
}
}
func (this *Cell) Write(hashKey uint64, item *Item) {
if item == nil {
return
}
this.locker.Lock()
oldItem, ok := this.mapping[hashKey]
if ok {
this.list.Remove(oldItem)
if this.LimitSize > 0 {
this.totalBytes -= oldItem.Size()
}
}
// limit count
if this.LimitCount > 0 && len(this.mapping) >= this.LimitCount {
this.locker.Unlock()
return
}
// trim memory
size := item.Size()
shouldTrim := false
if this.LimitSize > 0 && this.LimitSize < this.totalBytes+size {
this.Trim()
shouldTrim = true
}
// compare again
if shouldTrim {
if this.LimitSize > 0 && this.LimitSize < this.totalBytes+size {
this.locker.Unlock()
return
}
}
this.totalBytes += size
this.list.Add(item)
this.mapping[hashKey] = item
this.locker.Unlock()
}
func (this *Cell) Increase64(key []byte, expireAt int64, hashKey uint64, delta int64) (result int64) {
this.locker.Lock()
item, ok := this.mapping[hashKey]
if ok {
// reset to zero if expired
if item.ExpireAt < time.Now().Unix() {
item.ValueInt64 = 0
item.ExpireAt = expireAt
}
item.IncreaseInt64(delta)
result = item.ValueInt64
} else {
item := NewItem(key, ItemInt64)
item.ValueInt64 = delta
item.ExpireAt = expireAt
this.mapping[hashKey] = item
result = delta
}
this.locker.Unlock()
return
}
func (this *Cell) Read(hashKey uint64) *Item {
this.locker.Lock()
item, ok := this.mapping[hashKey]
if ok {
this.list.Remove(item)
this.list.Add(item)
this.locker.Unlock()
if item.ExpireAt < time.Now().Unix() {
return nil
}
return item
}
this.locker.Unlock()
return nil
}
func (this *Cell) Stat() *CellStat {
this.locker.RLock()
defer this.locker.RUnlock()
return &CellStat{
TotalBytes: this.totalBytes,
CountItems: len(this.mapping),
}
}
// trim NOT ACTIVE items
// should called in locker context
func (this *Cell) Trim() {
l := len(this.mapping)
if l == 0 {
return
}
inactiveSize := int(math.Ceil(float64(l) / 10)) // trim 10% items
this.list.Range(func(item *Item) (goNext bool) {
inactiveSize--
delete(this.mapping, item.HashKey())
this.list.Remove(item)
this.totalBytes -= item.Size()
return inactiveSize > 0
})
}
func (this *Cell) Delete(hashKey uint64) {
this.locker.Lock()
item, ok := this.mapping[hashKey]
if ok {
delete(this.mapping, hashKey)
this.list.Remove(item)
this.totalBytes -= item.Size()
}
this.locker.Unlock()
}
// range all items in the cell
func (this *Cell) Range(f func(item *Item)) {
this.locker.Lock()
for _, item := range this.mapping {
f(item)
}
this.locker.Unlock()
}
func (this *Cell) Recycle() {
this.locker.Lock()
if len(this.mapping) == 0 {
this.locker.Unlock()
return
}
timestamp := time.Now().Unix()
for key, item := range this.mapping {
if item.ExpireAt < timestamp {
delete(this.mapping, key)
this.list.Remove(item)
this.totalBytes -= item.Size()
}
}
this.locker.Unlock()
}
func (this *Cell) Reset() {
this.locker.Lock()
this.list.Reset()
this.mapping = map[uint64]*Item{}
this.totalBytes = 0
this.locker.Unlock()
}

View File

@@ -1,6 +0,0 @@
package grids
type CellStat struct {
TotalBytes int64
CountItems int
}

View File

@@ -1,214 +0,0 @@
package grids
import (
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
)
func TestCell_List(t *testing.T) {
cell := NewCell()
cell.Write(1, &Item{
ValueInt64: 1,
})
cell.Write(2, &Item{
ValueInt64: 2,
})
cell.Write(3, &Item{
ValueInt64: 3,
})
{
t.Log("====")
l := cell.list
for e := l.head; e != nil; e = e.Next {
t.Log("element:", e.ValueInt64)
}
}
cell.Write(1, &Item{
ValueInt64: 1,
})
cell.Write(3, &Item{
ValueInt64: 3,
})
cell.Write(3, &Item{
ValueInt64: 3,
})
cell.Write(2, &Item{
ValueInt64: 2,
})
cell.Delete(2)
{
t.Log("====")
l := cell.list
for e := l.head; e != nil; e = e.Next {
t.Log("element:", e.ValueInt64)
}
}
for _, m := range cell.mapping {
t.Log(m.ValueInt64)
}
}
func TestCell_LimitSize(t *testing.T) {
cell := NewCell()
cell.LimitSize = 1024
for i := int64(0); i < 100; i ++ {
key := []byte(fmt.Sprintf("%d", i))
cell.Write(HashKey(key), &Item{
Key: key,
ValueInt64: i,
Type: ItemInt64,
})
}
t.Log("totalBytes:", cell.totalBytes)
{
t.Log("====")
l := cell.list
s := []string{}
for e := l.head; e != nil; e = e.Next {
s = append(s, fmt.Sprintf("%d", e.ValueInt64))
}
t.Log("{ " + strings.Join(s, ", ") + " }")
}
t.Log("mapping:", len(cell.mapping))
s := []string{}
for _, item := range cell.mapping {
s = append(s, fmt.Sprintf("%d", item.ValueInt64))
}
t.Log("{ " + strings.Join(s, ", ") + " }")
}
func TestCell_MemoryUsage(t *testing.T) {
//runtime.GOMAXPROCS(4)
cell := NewCell()
cell.LimitSize = 1024 * 1024 * 1024 * 1
before := time.Now()
wg := sync.WaitGroup{}
wg.Add(4)
for j := 0; j < 4; j ++ {
go func(j int) {
start := j * 50 * 10000
for i := start; i < start+50*10000; i ++ {
key := []byte(strconv.Itoa(i) + "VERY_LONG_STRING")
cell.Write(HashKey(key), &Item{
Key: key,
ValueInt64: int64(i),
Type: ItemInt64,
})
}
wg.Done()
}(j)
}
wg.Wait()
t.Log("items:", len(cell.mapping))
t.Log(time.Since(before).Seconds(), "s", "totalBytes:", cell.totalBytes/1024/1024, "M")
//time.Sleep(10 * time.Second)
}
func BenchmarkCell_Write(b *testing.B) {
runtime.GOMAXPROCS(1)
cell := NewCell()
for i := 0; i < b.N; i ++ {
key := []byte(strconv.Itoa(i) + "_LONG_KEY_LONG_KEY_LONG_KEY_LONG_KEY")
cell.Write(HashKey(key), &Item{
Key: key,
ValueInt64: int64(i),
Type: ItemInt64,
})
}
b.Log("items:", len(cell.mapping))
}
func TestCell_Read(t *testing.T) {
cell := NewCell()
cell.Write(1, &Item{
ValueInt64: 1,
ExpireAt: time.Now().Unix() + 3600,
})
cell.Write(2, &Item{
ValueInt64: 2,
ExpireAt: time.Now().Unix() + 3600,
})
cell.Write(3, &Item{
ValueInt64: 3,
ExpireAt: time.Now().Unix() + 3600,
})
{
s := []string{}
cell.list.Range(func(item *Item) (goNext bool) {
s = append(s, fmt.Sprintf("%d", item.ValueInt64))
return true
})
t.Log("before:", s)
}
t.Log(cell.Read(1).ValueInt64)
{
s := []string{}
cell.list.Range(func(item *Item) (goNext bool) {
s = append(s, fmt.Sprintf("%d", item.ValueInt64))
return true
})
t.Log("after:", s)
}
t.Log(cell.Read(2).ValueInt64)
{
s := []string{}
cell.list.Range(func(item *Item) (goNext bool) {
s = append(s, fmt.Sprintf("%d", item.ValueInt64))
return true
})
t.Log("after:", s)
}
}
func TestCell_Recycle(t *testing.T) {
cell := NewCell()
cell.Write(1, &Item{
ValueInt64: 1,
ExpireAt: time.Now().Unix() - 1,
})
cell.Write(2, &Item{
ValueInt64: 2,
ExpireAt: time.Now().Unix() + 1,
})
cell.Recycle()
{
s := []string{}
cell.list.Range(func(item *Item) (goNext bool) {
s = append(s, fmt.Sprintf("%d", item.ValueInt64))
return true
})
t.Log("after:", s)
}
t.Log(cell.list.Len(), cell.totalBytes)
}

View File

@@ -1,225 +0,0 @@
package grids
import (
"bytes"
"compress/gzip"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/timers"
"math"
"time"
)
// Memory Cache Grid
//
// | Grid |
// | cell1, cell2, ..., cell1024 |
// | item1, item2, ..., item1000000 |
type Grid struct {
cells []*Cell
countCells uint64
recycleIndex int
recycleLooper *timers.Looper
recycleInterval int
gzipLevel int
limitSize int64
limitCount int
}
func NewGrid(countCells int, opt ...interface{}) *Grid {
grid := &Grid{
recycleIndex: -1,
}
for _, o := range opt {
switch x := o.(type) {
case *CompressOpt:
grid.gzipLevel = x.Level
case *LimitSizeOpt:
grid.limitSize = x.Size
case *LimitCountOpt:
grid.limitCount = x.Count
case *RecycleIntervalOpt:
grid.recycleInterval = x.Interval
}
}
cells := []*Cell{}
if countCells <= 0 {
countCells = 1
} else if countCells > 100*10000 {
countCells = 100 * 10000
}
for i := 0; i < countCells; i++ {
cell := NewCell()
cell.LimitSize = int64(math.Floor(float64(grid.limitSize) / float64(countCells)))
cell.LimitCount = int(math.Floor(float64(grid.limitCount)) / float64(countCells))
cells = append(cells, cell)
}
grid.cells = cells
grid.countCells = uint64(len(cells))
grid.recycleTimer()
return grid
}
// get all cells in the grid
func (this *Grid) Cells() []*Cell {
return this.cells
}
func (this *Grid) WriteItem(item *Item) {
if this.countCells <= 0 {
return
}
hashKey := item.HashKey()
this.cellForHashKey(hashKey).Write(hashKey, item)
}
func (this *Grid) WriteInt64(key []byte, value int64, lifeSeconds int64) {
this.WriteItem(&Item{
Key: key,
Type: ItemInt64,
ValueInt64: value,
ExpireAt: UnixTime() + lifeSeconds,
})
}
func (this *Grid) IncreaseInt64(key []byte, delta int64, lifeSeconds int64) (result int64) {
hashKey := HashKey(key)
return this.cellForHashKey(hashKey).Increase64(key, time.Now().Unix()+lifeSeconds, hashKey, delta)
}
func (this *Grid) WriteString(key []byte, value string, lifeSeconds int64) {
this.WriteBytes(key, []byte(value), lifeSeconds)
}
func (this *Grid) WriteBytes(key []byte, value []byte, lifeSeconds int64) {
isCompressed := false
if this.gzipLevel != gzip.NoCompression {
buf := bytes.NewBuffer([]byte{})
writer, err := gzip.NewWriterLevel(buf, this.gzipLevel)
if err != nil {
logs.Error(err)
this.WriteItem(&Item{
Key: key,
Type: ItemBytes,
ValueBytes: value,
ExpireAt: time.Now().Unix() + lifeSeconds,
})
return
}
_, err = writer.Write([]byte(value))
if err != nil {
logs.Error(err)
this.WriteItem(&Item{
Key: key,
Type: ItemBytes,
ValueBytes: value,
ExpireAt: time.Now().Unix() + lifeSeconds,
})
return
}
err = writer.Close()
if err != nil {
logs.Error(err)
this.WriteItem(&Item{
Key: key,
Type: ItemBytes,
ValueBytes: value,
ExpireAt: time.Now().Unix() + lifeSeconds,
})
return
}
value = buf.Bytes()
isCompressed = true
}
this.WriteItem(&Item{
Key: key,
Type: ItemBytes,
ValueBytes: value,
ExpireAt: time.Now().Unix() + lifeSeconds,
IsCompressed: isCompressed,
})
}
func (this *Grid) WriteInterface(key []byte, value interface{}, lifeSeconds int64) {
this.WriteItem(&Item{
Key: key,
Type: ItemInterface,
ValueInterface: value,
ExpireAt: time.Now().Unix() + lifeSeconds,
IsCompressed: false,
})
}
func (this *Grid) Read(key []byte) *Item {
if this.countCells <= 0 {
return nil
}
hashKey := HashKey(key)
return this.cellForHashKey(hashKey).Read(hashKey)
}
func (this *Grid) Stat() *Stat {
stat := &Stat{}
for _, cell := range this.cells {
cellStat := cell.Stat()
stat.CountItems += cellStat.CountItems
stat.TotalBytes += cellStat.TotalBytes
}
return stat
}
func (this *Grid) Delete(key []byte) {
if this.countCells <= 0 {
return
}
hashKey := HashKey(key)
this.cellForHashKey(hashKey).Delete(hashKey)
}
func (this *Grid) Reset() {
for _, cell := range this.cells {
cell.Reset()
}
}
func (this *Grid) Destroy() {
if this.recycleLooper != nil {
this.recycleLooper.Stop()
this.recycleLooper = nil
}
this.cells = nil
}
func (this *Grid) cellForHashKey(hashKey uint64) *Cell {
if hashKey < 0 {
return this.cells[-hashKey%this.countCells]
} else {
return this.cells[hashKey%this.countCells]
}
}
func (this *Grid) recycleTimer() {
duration := 1 * time.Minute
if this.recycleInterval > 0 {
duration = time.Duration(this.recycleInterval) * time.Second
}
this.recycleLooper = timers.Loop(duration, func(looper *timers.Looper) {
if this.countCells == 0 {
return
}
this.recycleIndex++
if this.recycleIndex > int(this.countCells-1) {
this.recycleIndex = 0
}
this.cells[this.recycleIndex].Recycle()
})
}

View File

@@ -1,207 +0,0 @@
package grids
import (
"compress/gzip"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
)
func TestMemoryGrid_Write(t *testing.T) {
grid := NewGrid(5, NewRecycleIntervalOpt(2), NewLimitSizeOpt(10240))
t.Log("123456:", grid.Read([]byte("123456")))
grid.WriteInt64([]byte("abc"), 1, 5)
t.Log(grid.Read([]byte("abc")).ValueInt64)
grid.WriteString([]byte("abc"), "123", 5)
t.Log(string(grid.Read([]byte("abc")).Bytes()))
grid.WriteBytes([]byte("abc"), []byte("123"), 5)
t.Log(grid.Read([]byte("abc")).Bytes())
grid.Delete([]byte("abc"))
t.Log(grid.Read([]byte("abc")))
for i := 0; i < 100; i++ {
grid.WriteInt64([]byte(fmt.Sprintf("%d", i)), 123, 1)
}
t.Log("before recycle:")
for index, cell := range grid.cells {
t.Log("cell:", index, len(cell.mapping), "items")
}
time.Sleep(3 * time.Second)
t.Log("after recycle:")
for index, cell := range grid.cells {
t.Log("cell:", index, len(cell.mapping), "items")
}
grid.Destroy()
}
func TestMemoryGrid_Write_LimitCount(t *testing.T) {
grid := NewGrid(2, NewLimitCountOpt(10))
for i := 0; i < 100; i++ {
grid.WriteInt64([]byte(strconv.Itoa(i)), int64(i), 30)
}
t.Log(grid.Stat().CountItems, "items")
}
func TestMemoryGrid_Compress(t *testing.T) {
grid := NewGrid(5, NewCompressOpt(1))
grid.WriteString([]byte("hello"), strings.Repeat("abcd", 10240), 30)
t.Log(len(string(grid.Read([]byte("hello")).String())))
t.Log(len(grid.Read([]byte("hello")).ValueBytes))
}
func BenchmarkMemoryGrid_Performance(b *testing.B) {
runtime.GOMAXPROCS(1)
grid := NewGrid(1024)
for i := 0; i < b.N; i++ {
key := "key:" + strconv.Itoa(i)
grid.WriteInt64([]byte(key), int64(i), 3600)
}
}
func TestMemoryGrid_Performance(t *testing.T) {
runtime.GOMAXPROCS(1)
grid := NewGrid(1024)
now := time.Now()
s := []byte(strings.Repeat("abcd", 10*1024))
for i := 0; i < 100000; i++ {
grid.WriteBytes([]byte(fmt.Sprintf("key:%d_%d", i, 1)), s, 3600)
item := grid.Read([]byte(fmt.Sprintf("key:%d_%d", i, 1)))
if item != nil {
_ = item.String()
}
}
countItems := 0
for _, cell := range grid.cells {
countItems += len(cell.mapping)
}
t.Log(countItems, "items")
t.Log(time.Since(now).Seconds()*1000, "ms")
}
func TestMemoryGrid_Performance_Concurrent(t *testing.T) {
//runtime.GOMAXPROCS(1)
grid := NewGrid(1024)
now := time.Now()
s := []byte(strings.Repeat("abcd", 10*1024))
wg := sync.WaitGroup{}
wg.Add(runtime.NumCPU())
for c := 0; c < runtime.NumCPU(); c++ {
go func(c int) {
defer wg.Done()
for i := 0; i < 50000; i++ {
grid.WriteBytes([]byte(fmt.Sprintf("key:%d_%d", i, c)), s, 3600)
item := grid.Read([]byte(fmt.Sprintf("key:%d_%d", i, c)))
if item != nil {
_ = item.String()
}
}
}(c)
}
wg.Wait()
countItems := 0
for _, cell := range grid.cells {
countItems += len(cell.mapping)
}
t.Log(countItems, "items")
t.Log(time.Since(now).Seconds()*1000, "ms")
}
func TestMemoryGrid_CompressPerformance(t *testing.T) {
runtime.GOMAXPROCS(1)
grid := NewGrid(1024, NewCompressOpt(gzip.BestCompression))
now := time.Now()
data := []byte(strings.Repeat("abcd", 1024))
for i := 0; i < 100000; i++ {
grid.WriteBytes([]byte(fmt.Sprintf("key:%d", i)), data, 3600)
item := grid.Read([]byte(fmt.Sprintf("key:%d", i+100)))
if item != nil {
_ = item.String()
}
}
countItems := 0
for _, cell := range grid.cells {
countItems += len(cell.mapping)
}
t.Log(countItems, "items")
t.Log(time.Since(now).Seconds()*1000, "ms")
}
func TestMemoryGrid_IncreaseInt64(t *testing.T) {
grid := NewGrid(1024)
grid.WriteInt64([]byte("abc"), 123, 10)
grid.IncreaseInt64([]byte("abc"), 123, 10)
grid.IncreaseInt64([]byte("abc"), 123, 10)
item := grid.Read([]byte("abc"))
if item == nil {
t.Fatal("item == nil")
}
if item.ValueInt64 != 369 {
t.Fatal("not 369")
}
}
func TestMemoryGrid_Destroy(t *testing.T) {
grid := NewGrid(1024)
grid.WriteInt64([]byte("abc"), 123, 10)
t.Log(grid.recycleLooper, grid.cells)
grid.Destroy()
t.Log(grid.recycleLooper, grid.cells)
if grid.recycleLooper != nil {
t.Fatal("looper != nil")
}
}
func TestMemoryGrid_Recycle(t *testing.T) {
cell := NewCell()
timestamp := time.Now().Unix()
for i := 0; i < 300_0000; i++ {
cell.Write(uint64(i), &Item{
ExpireAt: timestamp - 30,
})
}
before := time.Now()
cell.Recycle()
t.Log(time.Since(before).Seconds()*1000, "ms")
t.Log(len(cell.mapping))
runtime.GC()
printMem(t)
}
func printMem(t *testing.T) {
mem := &runtime.MemStats{}
runtime.ReadMemStats(mem)
t.Log(mem.Alloc/1024/1024, "M")
}

View File

@@ -1,88 +0,0 @@
package grids
import (
"bytes"
"compress/gzip"
"github.com/cespare/xxhash"
"github.com/iwind/TeaGo/logs"
"sync/atomic"
"unsafe"
)
type ItemType = int8
const (
ItemInt64 = 1
ItemBytes = 2
ItemInterface = 3
)
func HashKey(key []byte) uint64 {
return xxhash.Sum64(key)
}
type Item struct {
Key []byte
ExpireAt int64
Type ItemType
ValueInt64 int64
ValueBytes []byte
ValueInterface interface{}
IsCompressed bool
// linked list
Prev *Item
Next *Item
size int64
}
func NewItem(key []byte, dataType ItemType) *Item {
return &Item{
Key: key,
Type: dataType,
}
}
func (this *Item) HashKey() uint64 {
return HashKey(this.Key)
}
func (this *Item) IncreaseInt64(delta int64) {
atomic.AddInt64(&this.ValueInt64, delta)
}
func (this *Item) Bytes() []byte {
if this.IsCompressed {
reader, err := gzip.NewReader(bytes.NewBuffer(this.ValueBytes))
if err != nil {
logs.Error(err)
return this.ValueBytes
}
buf := make([]byte, 256)
dataBuf := bytes.NewBuffer([]byte{})
for {
n, err := reader.Read(buf)
if n > 0 {
dataBuf.Write(buf[:n])
}
if err != nil {
break
}
}
return dataBuf.Bytes()
}
return this.ValueBytes
}
func (this *Item) String() string {
return string(this.Bytes())
}
func (this *Item) Size() int64 {
if this.size == 0 {
this.size = int64(int(unsafe.Sizeof(*this)) + len(this.Key) + len(this.ValueBytes))
}
return this.size
}

View File

@@ -1,69 +0,0 @@
package grids
import (
"crypto/md5"
"github.com/cespare/xxhash"
"strconv"
"testing"
)
func TestItem_Size(t *testing.T) {
item := &Item{
ValueInt64: 1024,
Key: []byte("123"),
ValueBytes: []byte("Hello, World"),
}
t.Log(item.Size())
}
func BenchmarkItem_Size(b *testing.B) {
item := &Item{
ValueInt64: 1024,
Key: []byte("123"),
ValueBytes: []byte("Hello, World"),
}
for i := 0; i < b.N; i++ {
_ = item.Size()
}
}
func TestItem_HashKey(t *testing.T) {
t.Log(HashKey([]byte("2")))
}
func TestItem_xxHash(t *testing.T) {
result := xxhash.Sum64([]byte("123456"))
t.Log(result)
}
func TestItem_unique(t *testing.T) {
m := map[uint64]bool{}
for i := 0; i < 1000*10000; i++ {
s := "Hello,World,LONG KEY,LONG KEY,LONG KEY,LONG KEY" + strconv.Itoa(i)
result := xxhash.Sum64([]byte(s))
_, ok := m[result]
if ok {
t.Log("found same", i)
break
} else {
m[result] = true
}
}
t.Log(xxhash.Sum64([]byte("01")))
t.Log(xxhash.Sum64([]byte("10")))
}
func BenchmarkItem_HashKeyMd5(b *testing.B) {
for i := 0; i < b.N; i++ {
h := md5.New()
h.Write([]byte("HELLO_KEY_" + strconv.Itoa(i)))
_ = h.Sum(nil)
}
}
func BenchmarkItem_xxHash(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = xxhash.Sum64([]byte("HELLO_KEY_" + strconv.Itoa(i)))
}
}

View File

@@ -1,68 +0,0 @@
package grids
type List struct {
head *Item
end *Item
}
func NewList() *List {
return &List{}
}
func (this *List) Add(item *Item) {
if item == nil {
return
}
if this.end != nil {
this.end.Next = item
item.Prev = this.end
item.Next = nil
}
this.end = item
if this.head == nil {
this.head = item
}
}
func (this *List) Remove(item *Item) {
if item == nil {
return
}
if item.Prev != nil {
item.Prev.Next = item.Next
}
if item.Next != nil {
item.Next.Prev = item.Prev
}
if item == this.head {
this.head = item.Next
}
if item == this.end {
this.end = item.Prev
}
item.Prev = nil
item.Next = nil
}
func (this *List) Len() int {
l := 0
for e := this.head; e != nil; e = e.Next {
l ++
}
return l
}
func (this *List) Range(f func(item *Item) (goNext bool)) {
for e := this.head; e != nil; e = e.Next {
goNext := f(e)
if !goNext {
break
}
}
}
func (this *List) Reset() {
this.head = nil
this.end = nil
}

View File

@@ -1,64 +0,0 @@
package grids
import "testing"
func TestList(t *testing.T) {
l := &List{}
var e1 *Item = nil
{
e := &Item{
ValueInt64: 1,
}
l.Add(e)
e1 = e
}
var e2 *Item = nil
{
e := &Item{
ValueInt64: 2,
}
l.Add(e)
e2 = e
}
var e3 *Item = nil
{
e := &Item{
ValueInt64: 3,
}
l.Add(e)
e3 = e
}
var e4 *Item = nil
{
e := &Item{
ValueInt64: 4,
}
l.Add(e)
e4 = e
}
l.Remove(e1)
//l.Remove(e2)
//l.Remove(e3)
l.Remove(e4)
for e := l.head; e != nil; e = e.Next {
t.Log(e.ValueInt64)
}
t.Log("e1, e2, e3, e4, head, end:", e1, e2, e3, e4)
if l.head != nil {
t.Log("head:", l.head.ValueInt64)
} else {
t.Log("head: nil")
}
if l.end != nil {
t.Log("end:", l.end.ValueInt64)
} else {
t.Log("end: nil")
}
}

View File

@@ -1,11 +0,0 @@
package grids
type CompressOpt struct {
Level int
}
func NewCompressOpt(level int) *CompressOpt {
return &CompressOpt{
Level: level,
}
}

View File

@@ -1,11 +0,0 @@
package grids
type LimitCountOpt struct {
Count int
}
func NewLimitCountOpt(count int) *LimitCountOpt {
return &LimitCountOpt{
Count: count,
}
}

View File

@@ -1,11 +0,0 @@
package grids
type LimitSizeOpt struct {
Size int64
}
func NewLimitSizeOpt(size int64) *LimitSizeOpt {
return &LimitSizeOpt{
Size: size,
}
}

View File

@@ -1,11 +0,0 @@
package grids
type RecycleIntervalOpt struct {
Interval int
}
func NewRecycleIntervalOpt(interval int) *RecycleIntervalOpt {
return &RecycleIntervalOpt{
Interval: interval,
}
}

View File

@@ -1,6 +0,0 @@
package grids
type Stat struct {
TotalBytes int64
CountItems int
}

View File

@@ -1,26 +0,0 @@
package grids
import (
"time"
)
var unixTime = time.Now().Unix()
var unixTimerIsReady = false
func init() {
ticker := time.NewTicker(500 * time.Millisecond)
go func() {
for range ticker.C {
unixTimerIsReady = true
unixTime = time.Now().Unix()
}
}()
}
// 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景
func UnixTime() int64 {
if unixTimerIsReady {
return unixTime
}
return time.Now().Unix()
}

View File

@@ -1,13 +0,0 @@
package grids
import (
"testing"
"time"
)
func TestUnixTime(t *testing.T) {
for i := 0; i < 5; i++ {
t.Log(UnixTime(), "real:", time.Now().Unix())
time.Sleep(1 * time.Second)
}
}

View File

@@ -1,6 +1,7 @@
package ttlcache package ttlcache
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"time" "time"
) )
@@ -12,11 +13,13 @@ import (
// KeyMap列表数据结构 // KeyMap列表数据结构
// { timestamp1 => [key1, key2, ...] }, ... // { timestamp1 => [key1, key2, ...] }, ...
type Cache struct { type Cache struct {
isDestroyed bool
pieces []*Piece pieces []*Piece
countPieces uint64 countPieces uint64
maxItems int maxItems int
gcPieceIndex int gcPieceIndex int
ticker *utils.Ticker
} }
func NewCache(opt ...OptionInterface) *Cache { func NewCache(opt ...OptionInterface) *Cache {
@@ -49,8 +52,8 @@ func NewCache(opt ...OptionInterface) *Cache {
// start timer // start timer
go func() { go func() {
ticker := time.NewTicker(5 * time.Second) cache.ticker = utils.NewTicker(5 * time.Second)
for range ticker.C { for cache.ticker.Next() {
cache.GC() cache.GC()
} }
}() }()
@@ -59,6 +62,10 @@ func NewCache(opt ...OptionInterface) *Cache {
} }
func (this *Cache) Write(key string, value interface{}, expiredAt int64) { func (this *Cache) Write(key string, value interface{}, expiredAt int64) {
if this.isDestroyed {
return
}
currentTimestamp := time.Now().Unix() currentTimestamp := time.Now().Unix()
if expiredAt <= currentTimestamp { if expiredAt <= currentTimestamp {
return return
@@ -76,6 +83,25 @@ func (this *Cache) Write(key string, value interface{}, expiredAt int64) {
}) })
} }
func (this *Cache) IncreaseInt64(key string, delta int64, expiredAt int64) int64 {
if this.isDestroyed {
return 0
}
currentTimestamp := time.Now().Unix()
if expiredAt <= currentTimestamp {
return 0
}
maxExpiredAt := currentTimestamp + 30*86400
if expiredAt > maxExpiredAt {
expiredAt = maxExpiredAt
}
uint64Key := HashKey([]byte(key))
pieceIndex := uint64Key % this.countPieces
return this.pieces[pieceIndex].IncreaseInt64(uint64Key, delta, expiredAt)
}
func (this *Cache) Read(key string) (item *Item) { func (this *Cache) Read(key string) (item *Item) {
uint64Key := HashKey([]byte(key)) uint64Key := HashKey([]byte(key))
return this.pieces[uint64Key%this.countPieces].Read(uint64Key) return this.pieces[uint64Key%this.countPieces].Read(uint64Key)
@@ -109,3 +135,15 @@ func (this *Cache) GC() {
} }
this.gcPieceIndex = newIndex this.gcPieceIndex = newIndex
} }
func (this *Cache) Destroy() {
this.isDestroyed = true
if this.ticker != nil {
this.ticker.Stop()
this.ticker = nil
}
for _, piece := range this.pieces {
piece.Destroy()
}
}

View File

@@ -36,6 +36,27 @@ func BenchmarkCache_Add(b *testing.B) {
} }
} }
func TestCache_IncreaseInt64(t *testing.T) {
var cache = NewCache()
{
cache.IncreaseInt64("a", 1, time.Now().Unix()+3600)
t.Log(cache.Read("a"))
}
{
cache.IncreaseInt64("a", 1, time.Now().Unix()+3600+1)
t.Log(cache.Read("a"))
}
{
cache.Write("b", 1, time.Now().Unix()+3600+2)
t.Log(cache.Read("b"))
}
{
cache.IncreaseInt64("b", 1, time.Now().Unix()+3600+3)
t.Log(cache.Read("b"))
}
}
func TestCache_Read(t *testing.T) { func TestCache_Read(t *testing.T) {
runtime.GOMAXPROCS(1) runtime.GOMAXPROCS(1)

View File

@@ -2,6 +2,7 @@ package ttlcache
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/types"
"sync" "sync"
"time" "time"
) )
@@ -26,6 +27,26 @@ func (this *Piece) Add(key uint64, item *Item) () {
this.locker.Unlock() this.locker.Unlock()
} }
func (this *Piece) IncreaseInt64(key uint64, delta int64, expiredAt int64) (result int64) {
this.locker.Lock()
item, ok := this.m[key]
if ok {
result := types.Int64(item.Value) + delta
item.Value = result
item.expiredAt = expiredAt
} else {
if len(this.m) < this.maxItems {
result = delta
this.m[key] = &Item{
Value: delta,
expiredAt: expiredAt,
}
}
}
this.locker.Unlock()
return
}
func (this *Piece) Delete(key uint64) { func (this *Piece) Delete(key uint64) {
this.locker.Lock() this.locker.Lock()
delete(this.m, key) delete(this.m, key)
@@ -60,3 +81,9 @@ func (this *Piece) GC() {
} }
this.locker.Unlock() this.locker.Unlock()
} }
func (this *Piece) Destroy() {
this.locker.Lock()
this.m = nil
this.locker.Unlock()
}

View File

@@ -1,7 +1,7 @@
package checkpoints package checkpoints
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/grids" "github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests" "github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
@@ -9,6 +9,7 @@ import (
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"time"
) )
// ${cc.arg} // ${cc.arg}
@@ -16,8 +17,8 @@ import (
type CCCheckpoint struct { type CCCheckpoint struct {
Checkpoint Checkpoint
grid *grids.Grid cache *ttlcache.Cache
once sync.Once once sync.Once
} }
func (this *CCCheckpoint) Init() { func (this *CCCheckpoint) Init() {
@@ -25,20 +26,20 @@ func (this *CCCheckpoint) Init() {
} }
func (this *CCCheckpoint) Start() { func (this *CCCheckpoint) Start() {
if this.grid != nil { if this.cache != nil {
this.grid.Destroy() this.cache.Destroy()
} }
this.grid = grids.NewGrid(32, grids.NewLimitCountOpt(1000_0000)) this.cache = ttlcache.NewCache()
} }
func (this *CCCheckpoint) RequestValue(req *requests.Request, param string, options maps.Map) (value interface{}, sysErr error, userErr error) { func (this *CCCheckpoint) RequestValue(req *requests.Request, param string, options maps.Map) (value interface{}, sysErr error, userErr error) {
value = 0 value = 0
if this.grid == nil { if this.cache == nil {
this.once.Do(func() { this.once.Do(func() {
this.Start() this.Start()
}) })
if this.grid == nil { if this.cache == nil {
return return
} }
} }
@@ -115,7 +116,7 @@ func (this *CCCheckpoint) RequestValue(req *requests.Request, param string, opti
if len(key) == 0 { if len(key) == 0 {
key = this.ip(req) key = this.ip(req)
} }
value = this.grid.IncreaseInt64([]byte(key), 1, period) value = this.cache.IncreaseInt64(key, int64(1), time.Now().Unix()+period)
} }
return return
@@ -204,9 +205,9 @@ func (this *CCCheckpoint) Options() []OptionInterface {
} }
func (this *CCCheckpoint) Stop() { func (this *CCCheckpoint) Stop() {
if this.grid != nil { if this.cache != nil {
this.grid.Destroy() this.cache.Destroy()
this.grid = nil this.cache = nil
} }
} }