diff --git a/internal/caches/item.go b/internal/caches/item.go index 2e1f248..d5bdb0d 100644 --- a/internal/caches/item.go +++ b/internal/caches/item.go @@ -22,13 +22,14 @@ type Item struct { Type ItemType `json:"-"` Key string `json:"1,omitempty"` ExpiresAt int64 `json:"2,omitempty"` - StaleAt int64 `json:"-"` + StaleAt int64 `json:"3,omitempty"` HeaderSize int64 `json:"-"` - BodySize int64 `json:"-"` + BodySize int64 `json:"4,omitempty"` MetaSize int64 `json:"-"` - Host string `json:"-"` // 主机名 - ServerId int64 `json:"3,omitempty"` // 服务ID + Host string `json:"-"` // 主机名 + ServerId int64 `json:"5,omitempty"` // 服务ID Week int32 `json:"-"` + CreatedAt int64 `json:"6,omitempty"` } func (this *Item) IsExpired() bool { diff --git a/internal/caches/list_file_kv.go b/internal/caches/list_file_kv.go new file mode 100644 index 0000000..09bb0bc --- /dev/null +++ b/internal/caches/list_file_kv.go @@ -0,0 +1,296 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import ( + "fmt" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" + "github.com/iwind/TeaGo/types" + "strings" + "testing" +) + +const countKVStores = 10 + +type KVFileList struct { + dir string + stores [countKVStores]*KVListFileStore + + onAdd func(item *Item) + onRemove func(item *Item) +} + +func NewKVFileList(dir string) *KVFileList { + dir = strings.TrimSuffix(dir, "/") + + var stores = [countKVStores]*KVListFileStore{} + for i := 0; i < countKVStores; i++ { + stores[i] = NewKVListFileStore(dir + "/db-" + types.String(i) + ".store") + } + + return &KVFileList{ + dir: dir, + stores: stores, + } +} + +// Init 初始化 +func (this *KVFileList) Init() error { + for _, store := range this.stores { + err := store.Open() + if err != nil { + return fmt.Errorf("open store '"+store.Path()+"' failed: %w", err) + } + } + + return nil +} + +// Reset 重置数据 +func (this *KVFileList) Reset() error { + // do nothing + return nil +} + +// Add 添加内容 +func (this *KVFileList) Add(hash string, item *Item) error { + err := this.getStore(hash).AddItem(hash, item) + if err != nil { + return err + } + + if this.onAdd != nil { + this.onAdd(item) + } + + return nil +} + +// Exist 检查内容是否存在 +func (this *KVFileList) Exist(hash string) (bool, error) { + return this.getStore(hash).ExistItem(hash) +} + +// ExistQuick 快速检查内容是否存在 +func (this *KVFileList) ExistQuick(hash string) (bool, error) { + return this.getStore(hash).ExistQuickItem(hash) +} + +// CleanPrefix 清除某个前缀的缓存 +func (this *KVFileList) CleanPrefix(prefix string) error { + var group = goman.NewTaskGroup() + var lastErr error + for _, store := range this.stores { + var storeCopy = store + group.Run(func() { + err := storeCopy.CleanItemsWithPrefix(prefix) + if err != nil { + lastErr = err + } + }) + } + group.Wait() + return lastErr +} + +// CleanMatchKey 清除通配符匹配的Key +func (this *KVFileList) CleanMatchKey(key string) error { + var group = goman.NewTaskGroup() + var lastErr error + for _, store := range this.stores { + var storeCopy = store + group.Run(func() { + err := storeCopy.CleanItemsWithWildcardKey(key) + if err != nil { + lastErr = err + } + }) + } + group.Wait() + return lastErr +} + +// CleanMatchPrefix 清除通配符匹配的前缀 +func (this *KVFileList) CleanMatchPrefix(prefix string) error { + var group = goman.NewTaskGroup() + var lastErr error + for _, store := range this.stores { + var storeCopy = store + group.Run(func() { + err := storeCopy.CleanItemsWithWildcardPrefix(prefix) + if err != nil { + lastErr = err + } + }) + } + group.Wait() + return lastErr +} + +// Remove 删除内容 +func (this *KVFileList) Remove(hash string) error { + err := this.getStore(hash).RemoveItem(hash) + if err != nil { + return err + } + + if this.onRemove != nil { + // when remove file item, no any extra information needed + this.onRemove(nil) + } + + return nil +} + +// Purge 清理过期数据 +func (this *KVFileList) Purge(count int, callback func(hash string) error) (int, error) { + count /= countKVStores + if count <= 0 { + count = 100 + } + + var countFound = 0 + var lastErr error + for _, store := range this.stores { + purgeCount, err := store.PurgeItems(count, callback) + countFound += purgeCount + if err != nil { + lastErr = err + } + } + + return countFound, lastErr +} + +// PurgeLFU 清理LFU数据 +func (this *KVFileList) PurgeLFU(count int, callback func(hash string) error) error { + count /= countKVStores + if count <= 0 { + count = 100 + } + + var lastErr error + for _, store := range this.stores { + err := store.PurgeLFUItems(count, callback) + if err != nil { + lastErr = err + } + } + return lastErr +} + +// CleanAll 清除所有缓存 +func (this *KVFileList) CleanAll() error { + var group = goman.NewTaskGroup() + var lastErr error + for _, store := range this.stores { + var storeCopy = store + group.Run(func() { + err := storeCopy.RemoveAllItems() + if err != nil { + lastErr = err + } + }) + } + group.Wait() + return lastErr +} + +// Stat 统计 +func (this *KVFileList) Stat(check func(hash string) bool) (*Stat, error) { + var stat = &Stat{} + + var group = goman.NewTaskGroup() + + var lastErr error + for _, store := range this.stores { + var storeCopy = store + group.Run(func() { + storeStat, err := storeCopy.StatItems() + if err != nil { + lastErr = err + return + } + + group.Lock() + stat.Size += storeStat.Size + stat.ValueSize += storeStat.ValueSize + stat.Count += storeStat.Count + group.Unlock() + }) + } + + group.Wait() + + return stat, lastErr +} + +// Count 总数量 +func (this *KVFileList) Count() (int64, error) { + var count int64 + + var group = goman.NewTaskGroup() + + var lastErr error + for _, store := range this.stores { + var storeCopy = store + group.Run(func() { + countStoreItems, err := storeCopy.CountItems() + if err != nil { + lastErr = err + return + } + + group.Lock() + count += countStoreItems + group.Unlock() + }) + } + + group.Wait() + + return count, lastErr +} + +// OnAdd 添加事件 +func (this *KVFileList) OnAdd(fn func(item *Item)) { + this.onAdd = fn +} + +// OnRemove 删除事件 +func (this *KVFileList) OnRemove(fn func(item *Item)) { + this.onRemove = fn +} + +// Close 关闭 +func (this *KVFileList) Close() error { + var lastErr error + for _, store := range this.stores { + err := store.Close() + if err != nil { + lastErr = err + } + } + return lastErr +} + +// IncreaseHit 增加点击量 +func (this *KVFileList) IncreaseHit(hash string) error { + // do nothing + return nil +} + +func (this *KVFileList) TestInspect(t *testing.T) error { + for _, store := range this.stores { + err := store.TestInspect(t) + if err != nil { + return err + } + } + return nil +} + +func (this *KVFileList) getStore(hash string) *KVListFileStore { + return this.stores[fnv.HashString(hash)%countKVStores] +} diff --git a/internal/caches/list_file_kv_objects.go b/internal/caches/list_file_kv_objects.go new file mode 100644 index 0000000..5654f5d --- /dev/null +++ b/internal/caches/list_file_kv_objects.go @@ -0,0 +1,66 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import ( + "encoding/binary" + "encoding/json" + "strings" +) + +// ItemKVEncoder item encoder +type ItemKVEncoder[T interface{ *Item }] struct { +} + +func NewItemKVEncoder[T interface{ *Item }]() *ItemKVEncoder[T] { + return &ItemKVEncoder[T]{} +} + +func (this *ItemKVEncoder[T]) Encode(value T) ([]byte, error) { + return json.Marshal(value) +} + +func (this *ItemKVEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + switch fieldName { + case "createdAt": + var b = make([]byte, 4) + var createdAt = any(value).(*Item).CreatedAt + binary.BigEndian.PutUint32(b, uint32(createdAt)) + return b, nil + case "staleAt": + var b = make([]byte, 4) + var staleAt = any(value).(*Item).StaleAt + if staleAt < 0 { + staleAt = 0 + } + binary.BigEndian.PutUint32(b, uint32(staleAt)) + return b, nil + case "serverId": + var b = make([]byte, 4) + var serverId = any(value).(*Item).ServerId + if serverId < 0 { + serverId = 0 + } + binary.BigEndian.PutUint32(b, uint32(serverId)) + return b, nil + case "key": + return []byte(any(value).(*Item).Key), nil + case "wildKey": + var key = any(value).(*Item).Key + var dotIndex = strings.Index(key, ".") + if dotIndex > 0 { + var slashIndex = strings.LastIndex(key[:dotIndex], "/") + if slashIndex > 0 { + key = key[:dotIndex][:slashIndex+1] + "*" + key[dotIndex:] + } + } + + return []byte(key), nil + } + return nil, nil +} + +func (this *ItemKVEncoder[T]) Decode(valueBytes []byte) (value T, err error) { + err = json.Unmarshal(valueBytes, &value) + return +} diff --git a/internal/caches/list_file_kv_objects_test.go b/internal/caches/list_file_kv_objects_test.go new file mode 100644 index 0000000..bc13082 --- /dev/null +++ b/internal/caches/list_file_kv_objects_test.go @@ -0,0 +1,69 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestItemKVEncoder_EncodeField(t *testing.T) { + var a = assert.NewAssertion(t) + + var encoder = caches.NewItemKVEncoder[*caches.Item]() + { + key, err := encoder.EncodeField(&caches.Item{ + Key: "https://example.com/index.html", + }, "key") + if err != nil { + t.Fatal(err) + } + t.Log("key:", string(key)) + a.IsTrue(string(key) == "https://example.com/index.html") + } + + { + key, err := encoder.EncodeField(&caches.Item{ + Key: "", + }, "wildKey") + if err != nil { + t.Fatal(err) + } + t.Log("key:", string(key)) + a.IsTrue(string(key) == "") + } + + { + key, err := encoder.EncodeField(&caches.Item{ + Key: "example.com/index.html", + }, "wildKey") + if err != nil { + t.Fatal(err) + } + t.Log("key:", string(key)) + a.IsTrue(string(key) == "example.com/index.html") + } + + { + key, err := encoder.EncodeField(&caches.Item{ + Key: "https://example.com/index.html", + }, "wildKey") + if err != nil { + t.Fatal(err) + } + t.Log("key:", string(key)) + a.IsTrue(string(key) == "https://*.com/index.html") + } + + { + key, err := encoder.EncodeField(&caches.Item{ + Key: "https://www.example.com/index.html", + }, "wildKey") + if err != nil { + t.Fatal(err) + } + t.Log("key:", string(key)) + a.IsTrue(string(key) == "https://*.example.com/index.html") + } +} diff --git a/internal/caches/list_file_kv_store.go b/internal/caches/list_file_kv_store.go new file mode 100644 index 0000000..5b77c7f --- /dev/null +++ b/internal/caches/list_file_kv_store.go @@ -0,0 +1,480 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches + +import ( + "errors" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/cockroachdb/pebble" + "github.com/iwind/TeaGo/types" + "regexp" + "strings" + "testing" +) + +type KVListFileStore struct { + path string + rawStore *kvstore.Store + + // tables + itemsTable *kvstore.Table[*Item] + + isReady bool +} + +func NewKVListFileStore(path string) *KVListFileStore { + return &KVListFileStore{ + path: path, + } +} + +func (this *KVListFileStore) Open() error { + var reg = regexp.MustCompile(`^(.+)/([\w-]+)(\.store)$`) + var matches = reg.FindStringSubmatch(this.path) + if len(matches) != 4 { + return errors.New("invalid path '" + this.path + "'") + } + var dir = matches[1] + var name = matches[2] + + rawStore, err := kvstore.OpenStoreDir(dir, name) + if err != nil { + return err + } + this.rawStore = rawStore + + db, err := rawStore.NewDB("cache") + if err != nil { + return err + } + + { + table, tableErr := kvstore.NewTable[*Item]("items", NewItemKVEncoder[*Item]()) + if tableErr != nil { + return tableErr + } + + err = table.AddFields("staleAt", "key", "wildKey", "createdAt") + if err != nil { + return err + } + + db.AddTable(table) + this.itemsTable = table + } + + this.isReady = true + + return nil +} + +func (this *KVListFileStore) Path() string { + return this.path +} + +func (this *KVListFileStore) AddItem(hash string, item *Item) error { + if !this.isReady { + return nil + } + + var currentTime = fasttime.Now().Unix() + if item.ExpiresAt <= currentTime { + return errors.New("invalid expires time '" + types.String(item.ExpiresAt) + "'") + } + if item.CreatedAt <= 0 { + item.CreatedAt = currentTime + } + if item.StaleAt <= 0 { + item.StaleAt = item.ExpiresAt + DefaultStaleCacheSeconds + } + return this.itemsTable.Set(hash, item) +} + +func (this *KVListFileStore) ExistItem(hash string) (bool, error) { + if !this.isReady { + return false, nil + } + + item, err := this.itemsTable.Get(hash) + if err != nil { + if kvstore.IsKeyNotFound(err) { + return false, nil + } + return false, err + } + if item == nil { + return false, nil + } + + return item.ExpiresAt >= fasttime.NewFastTime().Unix(), nil +} + +func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) { + if !this.isReady { + return false, nil + } + + return this.itemsTable.Exist(hash) +} + +func (this *KVListFileStore) RemoveItem(hash string) error { + if !this.isReady { + return nil + } + + return this.itemsTable.Delete(hash) +} + +func (this *KVListFileStore) RemoveAllItems() error { + if !this.isReady { + return nil + } + + return this.itemsTable.Truncate() +} + +func (this *KVListFileStore) PurgeItems(count int, callback func(hash string) error) (int, error) { + if !this.isReady { + return 0, nil + } + + var countFound int + var currentTime = fasttime.Now().Unix() + var hashList []string + err := this.itemsTable. + Query(). + FieldAsc("staleAt"). + Limit(count). + FindAll(func(tx *kvstore.Tx[*Item], item kvstore.Item[*Item]) (goNext bool, err error) { + if item.Value == nil { + return true, nil + } + if item.Value.StaleAt < currentTime { + countFound++ + hashList = append(hashList, item.Key) + return true, nil + } + return false, nil + }) + if err != nil { + return 0, err + } + + // delete items + if len(hashList) > 0 { + txErr := this.itemsTable.WriteTx(func(tx *kvstore.Tx[*Item]) error { + for _, hash := range hashList { + deleteErr := tx.Delete(hash) + if deleteErr != nil { + return deleteErr + } + } + return nil + }) + if txErr != nil { + return 0, txErr + } + + for _, hash := range hashList { + callbackErr := callback(hash) + if callbackErr != nil { + return 0, callbackErr + } + } + } + + return countFound, nil +} + +func (this *KVListFileStore) PurgeLFUItems(count int, callback func(hash string) error) error { + if !this.isReady { + return nil + } + + var hashList []string + err := this.itemsTable. + Query(). + FieldAsc("createdAt"). + Limit(count). + FindAll(func(tx *kvstore.Tx[*Item], item kvstore.Item[*Item]) (goNext bool, err error) { + if item.Value != nil { + hashList = append(hashList, item.Key) + } + return true, nil + }) + if err != nil { + return err + } + + // delete items + if len(hashList) > 0 { + txErr := this.itemsTable.WriteTx(func(tx *kvstore.Tx[*Item]) error { + for _, hash := range hashList { + deleteErr := tx.Delete(hash) + if deleteErr != nil { + return deleteErr + } + } + return nil + }) + if txErr != nil { + return txErr + } + + for _, hash := range hashList { + callbackErr := callback(hash) + if callbackErr != nil { + return callbackErr + } + } + } + + return nil +} + +func (this *KVListFileStore) CleanItemsWithPrefix(prefix string) error { + if !this.isReady { + return nil + } + + if len(prefix) == 0 { + return nil + } + + var currentTime = fasttime.Now().Unix() + + var fieldOffset []byte + const size = 1000 + for { + var count int + err := this.itemsTable. + Query(). + FieldPrefix("key", prefix). + FieldOffset(fieldOffset). + Limit(size). + ForUpdate(). + FindAll(func(tx *kvstore.Tx[*Item], item kvstore.Item[*Item]) (goNext bool, err error) { + if item.Value == nil { + return true, nil + } + + count++ + fieldOffset = item.FieldKey + + if item.Value.CreatedAt >= currentTime { + return true, nil + } + if item.Value.ExpiresAt == 0 { + return true, nil + } + + item.Value.ExpiresAt = 0 + item.Value.StaleAt = 0 + + setErr := tx.Set(item.Key, item.Value) // TODO improve performance + if setErr != nil { + return false, setErr + } + + return true, nil + }) + if err != nil { + return err + } + + if count < size { + break + } + } + + return nil +} + +func (this *KVListFileStore) CleanItemsWithWildcardPrefix(prefix string) error { + if !this.isReady { + return nil + } + + if len(prefix) == 0 { + return nil + } + + var currentTime = fasttime.Now().Unix() + + var fieldOffset []byte + const size = 1000 + for { + var count int + err := this.itemsTable. + Query(). + FieldPrefix("wildKey", prefix). + FieldOffset(fieldOffset). + Limit(size). + ForUpdate(). + FindAll(func(tx *kvstore.Tx[*Item], item kvstore.Item[*Item]) (goNext bool, err error) { + if item.Value == nil { + return true, nil + } + + count++ + fieldOffset = item.FieldKey + + if item.Value.CreatedAt >= currentTime { + return true, nil + } + if item.Value.ExpiresAt == 0 { + return true, nil + } + + item.Value.ExpiresAt = 0 + item.Value.StaleAt = 0 + + setErr := tx.Set(item.Key, item.Value) // TODO improve performance + if setErr != nil { + return false, setErr + } + + return true, nil + }) + if err != nil { + return err + } + + if count < size { + break + } + } + + return nil +} + +func (this *KVListFileStore) CleanItemsWithWildcardKey(key string) error { + if !this.isReady { + return nil + } + + if len(key) == 0 { + return nil + } + + var currentTime = fasttime.Now().Unix() + + for _, realKey := range []string{key, key + SuffixAll} { + var fieldOffset = append(this.itemsTable.FieldKey("wildKey"), '$') + fieldOffset = append(fieldOffset, realKey...) + const size = 1000 + + var wildKey string + if !strings.HasSuffix(realKey, SuffixAll) { + wildKey = string(append([]byte(realKey), 0, 0)) + } else { + wildKey = realKey + } + + for { + var count int + err := this.itemsTable. + Query(). + FieldPrefix("wildKey", wildKey). + FieldOffset(fieldOffset). + Limit(size). + ForUpdate(). + FindAll(func(tx *kvstore.Tx[*Item], item kvstore.Item[*Item]) (goNext bool, err error) { + if item.Value == nil { + return true, nil + } + + count++ + fieldOffset = item.FieldKey + + if item.Value.CreatedAt >= currentTime { + return true, nil + } + if item.Value.ExpiresAt == 0 { + return true, nil + } + + item.Value.ExpiresAt = 0 + item.Value.StaleAt = 0 + + setErr := tx.Set(item.Key, item.Value) // TODO improve performance + if setErr != nil { + return false, setErr + } + + return true, nil + }) + if err != nil { + return err + } + + if count < size { + break + } + } + } + + return nil +} + +func (this *KVListFileStore) CountItems() (int64, error) { + if !this.isReady { + return 0, nil + } + + return this.itemsTable.Count() +} + +func (this *KVListFileStore) StatItems() (*Stat, error) { + if !this.isReady { + return &Stat{}, nil + } + + var stat = &Stat{} + + err := this.itemsTable. + Query(). + FindAll(func(tx *kvstore.Tx[*Item], item kvstore.Item[*Item]) (goNext bool, err error) { + if item.Value != nil { + stat.Size += item.Value.Size() + stat.ValueSize += item.Value.BodySize + stat.Count++ + } + return true, nil + }) + return stat, err +} + +func (this *KVListFileStore) TestInspect(t *testing.T) error { + if !this.isReady { + return nil + } + + it, err := this.rawStore.RawDB().NewIter(&pebble.IterOptions{}) + if err != nil { + return err + } + defer func() { + _ = it.Close() + }() + + for it.First(); it.Valid(); it.Next() { + valueBytes, valueErr := it.ValueAndErr() + if valueErr != nil { + return valueErr + } + t.Log(string(it.Key()), "=>", string(valueBytes)) + } + return nil +} + +func (this *KVListFileStore) Close() error { + this.isReady = false + + if this.rawStore != nil { + return this.rawStore.Close() + } + + return nil +} diff --git a/internal/caches/list_file_kv_test.go b/internal/caches/list_file_kv_test.go new file mode 100644 index 0000000..f17fc34 --- /dev/null +++ b/internal/caches/list_file_kv_test.go @@ -0,0 +1,317 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package caches_test + +import ( + "fmt" + "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/iwind/TeaGo/Tea" + _ "github.com/iwind/TeaGo/bootstrap" + stringutil "github.com/iwind/TeaGo/utils/string" + "math/rand" + "strconv" + "sync" + "testing" + "time" +) + +var testingKVList *caches.KVFileList + +func TestMain(m *testing.M) { + m.Run() + + if testingKVList != nil { + _ = testingKVList.Close() + } +} + +func testOpenKVFileList(t *testing.T) *caches.KVFileList { + var list = caches.NewKVFileList(Tea.Root + "/data/stores/cache-stores") + err := list.Init() + if err != nil { + t.Fatal(err) + } + + testingKVList = list + return list +} + +func TestNewKVFileList(t *testing.T) { + _ = testOpenKVFileList(t) +} + +func TestKVFileList_Add(t *testing.T) { + var list = testOpenKVFileList(t) + + err := list.Add(stringutil.Md5("123456"), &caches.Item{ + Type: caches.ItemTypeFile, + Key: "https://example.com/index.html", + ExpiresAt: time.Now().Unix() + 60, + StaleAt: 0, + HeaderSize: 0, + BodySize: 4096, + MetaSize: 0, + Host: "", + ServerId: 1, + Week: 0, + }) + if err != nil { + t.Fatal(err) + } +} + +func TestKVFileList_Add_Many(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + var list = testOpenKVFileList(t) + + const start = 0 + const count = 1_000_000 + const concurrent = 100 + + var before = time.Now() + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", fmt.Sprintf("%.2fs", costSeconds), "qps:", fmt.Sprintf("%.2fK/s", float64(count)/1000/costSeconds)) + }() + + var wg = &sync.WaitGroup{} + wg.Add(concurrent) + for c := 0; c < concurrent; c++ { + go func(c int) { + defer wg.Done() + + var segmentStart = start + count/concurrent*c + for i := segmentStart; i < segmentStart+count/concurrent; i++ { + err := list.Add(stringutil.Md5(strconv.Itoa(i)), &caches.Item{ + Type: caches.ItemTypeFile, + Key: "https://www.example.com/index.html" + strconv.Itoa(i), + ExpiresAt: time.Now().Unix() + 60, + StaleAt: 0, + HeaderSize: 0, + BodySize: int64(rand.Int() % 1_000_000), + MetaSize: 0, + Host: "", + ServerId: 1, + Week: 0, + }) + if err != nil { + t.Log(err) + } + } + }(c) + } + wg.Wait() +} + +func TestKVFileList_Add_Many_Suffix(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + var list = testOpenKVFileList(t) + + const start = 0 + const count = 1000 + const concurrent = 100 + + var before = time.Now() + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", fmt.Sprintf("%.2fs", costSeconds), "qps:", fmt.Sprintf("%.2fK/s", float64(count)/1000/costSeconds)) + }() + + var wg = &sync.WaitGroup{} + wg.Add(concurrent) + for c := 0; c < concurrent; c++ { + go func(c int) { + defer wg.Done() + + var segmentStart = start + count/concurrent*c + for i := segmentStart; i < segmentStart+count/concurrent; i++ { + err := list.Add(stringutil.Md5(strconv.Itoa(i)+caches.SuffixAll), &caches.Item{ + Type: caches.ItemTypeFile, + Key: "https://www.example.com/index.html" + strconv.Itoa(i) + caches.SuffixAll + "zip", + ExpiresAt: time.Now().Unix() + 60, + StaleAt: 0, + HeaderSize: 0, + BodySize: int64(rand.Int() % 1_000_000), + MetaSize: 0, + Host: "", + ServerId: 1, + Week: 0, + }) + if err != nil { + t.Log(err) + } + } + }(c) + } + wg.Wait() +} + +func TestKVFileList_Exist(t *testing.T) { + var list = testOpenKVFileList(t) + for _, hash := range []string{ + stringutil.Md5("123456"), + stringutil.Md5("654321"), + } { + b, err := list.Exist(hash) + if err != nil { + t.Fatal(err) + } + t.Log(hash, "=>", b) + } +} + +func TestKVFileList_Remove(t *testing.T) { + var list = testOpenKVFileList(t) + for _, hash := range []string{ + stringutil.Md5("123456"), + stringutil.Md5("654321"), + } { + err := list.Remove(hash) + if err != nil { + t.Fatal(err) + } + } +} + +func TestKVFileList_CleanAll(t *testing.T) { + var list = testOpenKVFileList(t) + err := list.CleanAll() + if err != nil { + t.Fatal(err) + } +} + +func TestKVFileList_Inspect(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + var list = testOpenKVFileList(t) + err := list.TestInspect(t) + if err != nil { + t.Fatal(err) + } +} + +func TestKVFileList_Purge(t *testing.T) { + var list = testOpenKVFileList(t) + var before = time.Now() + count, err := list.Purge(4_000, func(hash string) error { + //t.Log("hash:", hash) + return nil + }) + if err != nil { + t.Fatal(err) + } + t.Log("cost:", fmt.Sprintf("%.2fms", time.Since(before).Seconds()*1000), "count:", count) +} + +func TestKVFileList_PurgeLFU(t *testing.T) { + var list = testOpenKVFileList(t) + var before = time.Now() + err := list.PurgeLFU(20000, func(hash string) error { + t.Log("hash:", hash) + return nil + }) + if err != nil { + t.Fatal(err) + } + t.Log("cost:", fmt.Sprintf("%.2fms", time.Since(before).Seconds()*1000)) +} + +func TestKVFileList_Count(t *testing.T) { + var list = testOpenKVFileList(t) + var before = time.Now() + count, err := list.Count() + if err != nil { + t.Fatal(err) + } + t.Log("cost:", fmt.Sprintf("%.2fms", time.Since(before).Seconds()*1000), "count:", count) +} + +func TestKVFileList_Stat(t *testing.T) { + var list = testOpenKVFileList(t) + var before = time.Now() + stat, err := list.Stat(func(hash string) bool { + return true + }) + if err != nil { + t.Fatal(err) + } + t.Log("cost:", fmt.Sprintf("%.2fms", time.Since(before).Seconds()*1000), "stat:", fmt.Sprintf("%+v", stat)) +} + +func TestKVFileList_CleanPrefix(t *testing.T) { + var list = testOpenKVFileList(t) + var before = time.Now() + + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", fmt.Sprintf("%.2fms", costSeconds*1000)) + }() + + err := list.CleanPrefix("https://www.example.com/index.html") + if err != nil { + t.Fatal(err) + } +} + +func TestKVFileList_CleanMatchPrefix(t *testing.T) { + var list = testOpenKVFileList(t) + var before = time.Now() + + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", fmt.Sprintf("%.2fms", costSeconds*1000)) + }() + + err := list.CleanMatchPrefix("https://*.example.com/index.html") + if err != nil { + t.Fatal(err) + } +} + +func TestKVFileList_CleanMatchKey(t *testing.T) { + var list = testOpenKVFileList(t) + var before = time.Now() + + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", fmt.Sprintf("%.2fms", costSeconds*1000)) + }() + + err := list.CleanMatchKey("https://*.example.com/index.html123") + if err != nil { + t.Fatal(err) + } +} + +func BenchmarkKVFileList_Exist(b *testing.B) { + var list = caches.NewKVFileList(Tea.Root + "/data/stores/cache-stores") + err := list.Init() + if err != nil { + b.Fatal(err) + } + + defer func() { + _ = list.Close() + }() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, existErr := list.Exist(stringutil.Md5(strconv.Itoa(rand.Int() % 2_000_000))) + if existErr != nil { + b.Fatal(existErr) + } + } + }) +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index c04ec96..6a1e9c7 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -258,12 +258,25 @@ func (this *FileStorage) Init() error { return errors.New("[CACHE]cache storage dir can not be empty") } - var list = NewSQLiteFileList(dir + "/p" + types.String(this.policy.Id) + "/.indexes") - err = list.Init() - if err != nil { - return err + // read list + var list ListInterface + var sqliteIndexesDir = dir + "/p" + types.String(this.policy.Id) + "/.indexes" + _, sqliteIndexesDirErr := os.Stat(sqliteIndexesDir) + if sqliteIndexesDirErr == nil || !teaconst.EnableKVCacheStore { + list = NewSQLiteFileList(sqliteIndexesDir) + err = list.Init() + if err != nil { + return err + } + list.(*SQLiteFileList).SetOldDir(dir + "/p" + types.String(this.policy.Id)) + } else { + list = NewKVFileList(dir + "/p" + types.String(this.policy.Id) + "/.stores") + err = list.Init() + if err != nil { + return err + } } - list.(*SQLiteFileList).SetOldDir(dir + "/p" + types.String(this.policy.Id)) + this.list = list // 检查目录是否存在 @@ -1608,7 +1621,8 @@ func (this *FileStorage) subDir(hash string) (dirPath string, dirIsFull bool) { // ScanGarbageCaches 清理目录中“失联”的缓存文件 // “失联”为不在HashMap中的文件 func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) error { - if !this.list.(*SQLiteFileList).HashMapIsLoaded() { + _, isSQLite := this.list.(*SQLiteFileList) + if isSQLite && !this.list.(*SQLiteFileList).HashMapIsLoaded() { return errors.New("cache list is loading") } @@ -1641,8 +1655,8 @@ func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) continue } - dir2Matches, err := filepath.Glob(dir1 + "/*") - if err != nil { + dir2Matches, globErr := filepath.Glob(dir1 + "/*") + if globErr != nil { // ignore error continue } @@ -1665,8 +1679,8 @@ func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) } } - fileMatches, err := filepath.Glob(dir2 + "/*.cache") - if err != nil { + fileMatches, globDir2Err := filepath.Glob(dir2 + "/*.cache") + if globDir2Err != nil { // ignore error continue } @@ -1678,7 +1692,19 @@ func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) continue } - isReady, found := this.list.(*SQLiteFileList).ExistQuick(hash) + var isReady, found bool + switch rawList := this.list.(type) { + case *SQLiteFileList: + isReady, found = rawList.ExistQuick(hash) + case *KVFileList: + isReady = true + var checkErr error + found, checkErr = rawList.ExistQuick(hash) + if checkErr != nil { + return checkErr + } + } + if !isReady { continue } @@ -1688,8 +1714,8 @@ func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) } // 检查文件正在被写入 - stat, err := os.Stat(file) - if err != nil { + stat, statErr := os.Stat(file) + if statErr != nil { continue } if fasttime.Now().Unix()-stat.ModTime().Unix() < 300 /** 5 minutes **/ { @@ -1698,9 +1724,9 @@ func (this *FileStorage) ScanGarbageCaches(fileCallback func(path string) error) if fileCallback != nil { countFound++ - err = fileCallback(file) - if err != nil { - return err + callbackErr := fileCallback(file) + if callbackErr != nil { + return callbackErr } } } diff --git a/internal/const/const.go b/internal/const/const.go index 009ea98..361eb04 100644 --- a/internal/const/const.go +++ b/internal/const/const.go @@ -1,7 +1,7 @@ package teaconst const ( - Version = "1.3.3.1" + Version = "1.3.4" ProductName = "Edge Node" ProcessName = "edge-node" @@ -16,4 +16,6 @@ const ( AccessLogSockName = "edge-node.accesslog" CacheGarbageSockName = "edge-node.cache.garbage" + + EnableKVCacheStore = false // determine store cache keys in KVStore or sqlite ) diff --git a/internal/utils/kvstore/db.go b/internal/utils/kvstore/db.go new file mode 100644 index 0000000..6716af6 --- /dev/null +++ b/internal/utils/kvstore/db.go @@ -0,0 +1,68 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "errors" + "sync" +) + +type DB struct { + store *Store + + name string + namespace string + tableMap map[string]TableInterface + + locker sync.RWMutex +} + +func NewDB(store *Store, dbName string) (*DB, error) { + if !IsValidName(dbName) { + return nil, errors.New("invalid database name '" + dbName + "'") + } + + return &DB{ + store: store, + name: dbName, + namespace: "$" + dbName + "$", + tableMap: map[string]TableInterface{}, + }, nil +} + +func (this *DB) AddTable(table TableInterface) { + table.SetNamespace([]byte(this.Namespace() + table.Name() + "$")) + table.SetDB(this) + + this.locker.Lock() + defer this.locker.Unlock() + + this.tableMap[table.Name()] = table +} + +func (this *DB) Name() string { + return this.name +} + +func (this *DB) Namespace() string { + return this.namespace +} + +func (this *DB) Store() *Store { + return this.store +} + +func (this *DB) Close() error { + this.locker.Lock() + defer this.locker.Unlock() + + var lastErr error + for _, table := range this.tableMap { + err := table.Close() + if err != nil { + lastErr = err + } + } + + return lastErr +} diff --git a/internal/utils/kvstore/db_test.go b/internal/utils/kvstore/db_test.go new file mode 100644 index 0000000..e2b4682 --- /dev/null +++ b/internal/utils/kvstore/db_test.go @@ -0,0 +1,48 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/cockroachdb/pebble" + "testing" +) + +func TestNewDB(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + _, err = store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + testingStore = store + testInspectDB(t) +} + +func testInspectDB(t *testing.T) { + if testingStore == nil { + return + } + it, err := testingStore.RawDB().NewIter(&pebble.IterOptions{}) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = it.Close() + }() + + for it.First(); it.Valid(); it.Next() { + valueBytes, valueErr := it.ValueAndErr() + if valueErr != nil { + t.Fatal(valueErr) + } + t.Log(string(it.Key()), "=>", string(valueBytes)) + } +} diff --git a/internal/utils/kvstore/errors.go b/internal/utils/kvstore/errors.go new file mode 100644 index 0000000..60c16be --- /dev/null +++ b/internal/utils/kvstore/errors.go @@ -0,0 +1,18 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "errors" + "github.com/cockroachdb/pebble" +) + +var ErrTableNotFound = errors.New("table not found") +var ErrKeyTooLong = errors.New("too long key") + +func IsKeyNotFound(err error) bool { + if err == nil { + return false + } + return errors.Is(err, pebble.ErrNotFound) +} diff --git a/internal/utils/kvstore/item.go b/internal/utils/kvstore/item.go new file mode 100644 index 0000000..a6187cc --- /dev/null +++ b/internal/utils/kvstore/item.go @@ -0,0 +1,9 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type Item[T any] struct { + Key string + Value T + FieldKey []byte +} diff --git a/internal/utils/kvstore/iterator_options.go b/internal/utils/kvstore/iterator_options.go new file mode 100644 index 0000000..47d8fe6 --- /dev/null +++ b/internal/utils/kvstore/iterator_options.go @@ -0,0 +1,17 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import "github.com/cockroachdb/pebble" + +type IteratorOptions struct { + LowerBound []byte + UpperBound []byte +} + +func (this *IteratorOptions) RawOptions() *pebble.IterOptions { + return &pebble.IterOptions{ + LowerBound: this.LowerBound, + UpperBound: this.UpperBound, + } +} diff --git a/internal/utils/kvstore/logger.go b/internal/utils/kvstore/logger.go new file mode 100644 index 0000000..caadbed --- /dev/null +++ b/internal/utils/kvstore/logger.go @@ -0,0 +1,17 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type Logger struct { +} + +func NewLogger() *Logger { + return &Logger{} +} + +func (this *Logger) Infof(format string, args ...any) { + +} +func (this *Logger) Fatalf(format string, args ...any) { + +} diff --git a/internal/utils/kvstore/options.go b/internal/utils/kvstore/options.go new file mode 100644 index 0000000..0f9394b --- /dev/null +++ b/internal/utils/kvstore/options.go @@ -0,0 +1,9 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import "github.com/cockroachdb/pebble" + +var DefaultWriteOptions = &pebble.WriteOptions{ + Sync: false, +} diff --git a/internal/utils/kvstore/query.go b/internal/utils/kvstore/query.go new file mode 100644 index 0000000..80c3475 --- /dev/null +++ b/internal/utils/kvstore/query.go @@ -0,0 +1,472 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "bytes" + "errors" +) + +type DataType = int + +const ( + DataTypeKey DataType = 1 + DataTypeField DataType = 2 +) + +type QueryOperator int + +const ( + QueryOperatorGt QueryOperator = 1 + QueryOperatorGte QueryOperator = 2 + QueryOperatorLt QueryOperator = 3 + QueryOperatorLte QueryOperator = 4 +) + +type QueryOperatorInfo struct { + Operator QueryOperator + Value any +} + +type IteratorFunc[T any] func(tx *Tx[T], item Item[T]) (goNext bool, err error) + +type Query[T any] struct { + table *Table[T] + tx *Tx[T] + + dataType int + offsetKey string + limit int + prefix string + reverse bool + forUpdate bool + + keysOnly bool + + fieldName string + fieldReverse bool + fieldOperators []QueryOperatorInfo + fieldPrefix string + fieldOffsetKey []byte +} + +func NewQuery[T any]() *Query[T] { + return &Query[T]{ + limit: -1, + dataType: DataTypeKey, + } +} + +func (this *Query[T]) SetTable(table *Table[T]) *Query[T] { + this.table = table + return this +} + +func (this *Query[T]) SetTx(tx *Tx[T]) *Query[T] { + this.tx = tx + return this +} + +func (this *Query[T]) ForKey() *Query[T] { + this.dataType = DataTypeKey + return this +} + +func (this *Query[T]) ForField() *Query[T] { + this.dataType = DataTypeField + return this +} + +func (this *Query[T]) Limit(limit int) *Query[T] { + this.limit = limit + return this +} + +func (this *Query[T]) Offset(offsetKey string) *Query[T] { + this.offsetKey = offsetKey + return this +} + +func (this *Query[T]) Prefix(prefix string) *Query[T] { + this.prefix = prefix + return this +} + +func (this *Query[T]) Desc() *Query[T] { + this.reverse = true + return this +} + +func (this *Query[T]) ForUpdate() *Query[T] { + this.forUpdate = true + return this +} + +func (this *Query[T]) KeysOnly() *Query[T] { + this.keysOnly = true + return this +} + +func (this *Query[T]) FieldAsc(fieldName string) *Query[T] { + this.fieldName = fieldName + this.fieldReverse = false + return this +} + +func (this *Query[T]) FieldDesc(fieldName string) *Query[T] { + this.fieldName = fieldName + this.fieldReverse = true + return this +} + +func (this *Query[T]) FieldPrefix(fieldName string, fieldPrefix string) *Query[T] { + this.fieldName = fieldName + this.fieldPrefix = fieldPrefix + return this +} + +func (this *Query[T]) FieldOffset(fieldOffset []byte) *Query[T] { + this.fieldOffsetKey = fieldOffset + return this +} + +//func (this *Query[T]) FieldLt(value any) *Query[T] { +// this.fieldOperators = append(this.fieldOperators, QueryOperatorInfo{ +// Operator: QueryOperatorLt, +// Value: value, +// }) +// return this +//} +// +//func (this *Query[T]) FieldLte(value any) *Query[T] { +// this.fieldOperators = append(this.fieldOperators, QueryOperatorInfo{ +// Operator: QueryOperatorLte, +// Value: value, +// }) +// return this +//} +// +//func (this *Query[T]) FieldGt(value any) *Query[T] { +// this.fieldOperators = append(this.fieldOperators, QueryOperatorInfo{ +// Operator: QueryOperatorGt, +// Value: value, +// }) +// return this +//} +// +//func (this *Query[T]) FieldGte(value any) *Query[T] { +// this.fieldOperators = append(this.fieldOperators, QueryOperatorInfo{ +// Operator: QueryOperatorGte, +// Value: value, +// }) +// return this +//} + +func (this *Query[T]) FindAll(fn IteratorFunc[T]) error { + if this.tx != nil { + defer func() { + _ = this.tx.Close() + }() + + var itErr error + if len(this.fieldName) == 0 { + itErr = this.iterateKeys(fn) + } else { + itErr = this.iterateFields(fn) + } + if itErr != nil { + return itErr + } + return this.tx.Commit() + } + + if this.table != nil { + var txFn func(fn func(tx *Tx[T]) error) error + if this.forUpdate { + txFn = this.table.WriteTx + } else { + txFn = this.table.ReadTx + } + + return txFn(func(tx *Tx[T]) error { + this.tx = tx + + if len(this.fieldName) == 0 { + return this.iterateKeys(fn) + } + return this.iterateFields(fn) + }) + } + + return errors.New("current query require 'table' or 'tx'") +} + +func (this *Query[T]) iterateKeys(fn IteratorFunc[T]) error { + if this.limit == 0 { + return nil + } + + var opt = &IteratorOptions{} + + var prefix []byte + switch this.dataType { + case DataTypeKey: + prefix = append(this.table.Namespace(), KeyPrefix...) + case DataTypeField: + prefix = append(this.table.Namespace(), FieldPrefix...) + default: + prefix = append(this.table.Namespace(), KeyPrefix...) + } + + var prefixLen = len(prefix) + + if len(this.prefix) > 0 { + prefix = append(prefix, this.prefix...) + } + + var offsetKey []byte + if this.reverse { + if len(this.offsetKey) > 0 { + offsetKey = append(prefix, this.offsetKey...) + } else { + offsetKey = append(prefix, 0xFF) + } + + opt.LowerBound = prefix + opt.UpperBound = offsetKey + } else { + if len(this.offsetKey) > 0 { + offsetKey = append(prefix, this.offsetKey...) + } else { + offsetKey = prefix + } + opt.LowerBound = offsetKey + opt.UpperBound = append(offsetKey, 0xFF) + } + + var hasOffsetKey = len(this.offsetKey) > 0 + + it, itErr := this.tx.NewIterator(opt) + if itErr != nil { + return itErr + } + defer func() { + _ = it.Close() + }() + + var count int + + var itemFn = func() (goNext bool, err error) { + var keyBytes = it.Key() + + // skip first offset key + if hasOffsetKey { + hasOffsetKey = false + + if bytes.Equal(keyBytes, offsetKey) { + return true, nil + } + } + + // call fn + var value T + if !this.keysOnly { + valueBytes, valueErr := it.ValueAndErr() + if valueErr != nil { + return false, valueErr + } + value, err = this.table.encoder.Decode(valueBytes) + if err != nil { + return false, err + } + } + + goNext, callbackErr := fn(this.tx, Item[T]{ + Key: string(keyBytes[prefixLen:]), + Value: value, + }) + if callbackErr != nil { + return false, callbackErr + } + if !goNext { + return false, nil + } + + // limit + if this.limit > 0 { + count++ + + if count >= this.limit { + return false, nil + } + } + + return true, nil + } + + if this.reverse { + for it.Last(); it.Valid(); it.Prev() { + goNext, itemErr := itemFn() + if itemErr != nil { + return itemErr + } + if !goNext { + break + } + } + } else { + for it.First(); it.Valid(); it.Next() { + goNext, itemErr := itemFn() + if itemErr != nil { + return itemErr + } + if !goNext { + break + } + } + } + + return nil +} + +func (this *Query[T]) iterateFields(fn IteratorFunc[T]) error { + if this.limit == 0 { + return nil + } + + var hasOffsetKey = len(this.offsetKey) > 0 || len(this.fieldOffsetKey) > 0 + + var opt = &IteratorOptions{} + + var prefix = this.table.FieldKey(this.fieldName) + prefix = append(prefix, '$') + + if len(this.fieldPrefix) > 0 { + prefix = append(prefix, this.fieldPrefix...) + } + + var offsetKey []byte + if this.fieldReverse { + if len(this.fieldOffsetKey) > 0 { + offsetKey = this.fieldOffsetKey + } else if len(this.offsetKey) > 0 { + offsetKey = append(prefix, this.offsetKey...) + } else { + offsetKey = append(prefix, 0xFF) + } + opt.LowerBound = prefix + opt.UpperBound = offsetKey + } else { + if len(this.fieldOffsetKey) > 0 { + offsetKey = this.fieldOffsetKey + } else if len(this.offsetKey) > 0 { + offsetKey = append(prefix, this.offsetKey...) + offsetKey = append(offsetKey, 0) + } else { + offsetKey = prefix + } + + opt.LowerBound = offsetKey + opt.UpperBound = append(prefix, 0xFF) + } + + it, itErr := this.tx.NewIterator(opt) + if itErr != nil { + return itErr + } + defer func() { + _ = it.Close() + }() + + var count int + + var itemFn = func() (goNext bool, err error) { + var fieldKeyBytes = it.Key() + + fieldValueBytes, keyBytes, decodeKeyErr := this.table.DecodeFieldKey(this.fieldName, fieldKeyBytes) + if decodeKeyErr != nil { + return false, decodeKeyErr + } + + // skip first offset key + if hasOffsetKey { + hasOffsetKey = false + + if (len(this.fieldOffsetKey) > 0 && bytes.Equal(fieldKeyBytes, this.fieldOffsetKey)) || + bytes.Equal(fieldValueBytes, []byte(this.offsetKey)) { + return true, nil + } + } + + // 执行operators + if len(this.fieldOperators) > 0 { + if !this.matchOperators(fieldValueBytes) { + return true, nil + } + } + + var resultItem = Item[T]{ + Key: string(keyBytes), + FieldKey: fieldKeyBytes, + } + if !this.keysOnly { + value, getErr := this.table.getWithKeyBytes(this.tx, this.table.FullKeyBytes(keyBytes)) + if getErr != nil { + if IsKeyNotFound(getErr) { + return true, nil + } + return false, getErr + } + + resultItem.Value = value + } + + goNext, err = fn(this.tx, resultItem) + if err != nil { + return + } + if !goNext { + return false, nil + } + + // limit + if this.limit > 0 { + count++ + + if count >= this.limit { + return false, nil + } + } + + return true, nil + } + + if this.reverse { + for it.Last(); it.Valid(); it.Prev() { + goNext, itemErr := itemFn() + if itemErr != nil { + return itemErr + } + if !goNext { + break + } + } + } else { + for it.First(); it.Valid(); it.Next() { + goNext, itemErr := itemFn() + if itemErr != nil { + return itemErr + } + if !goNext { + break + } + } + } + + return nil +} + +func (this *Query[T]) matchOperators(fieldValueBytes []byte) bool { + // TODO + return true +} diff --git a/internal/utils/kvstore/query_test.go b/internal/utils/kvstore/query_test.go new file mode 100644 index 0000000..814a3c9 --- /dev/null +++ b/internal/utils/kvstore/query_test.go @@ -0,0 +1,240 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "fmt" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "runtime" + "testing" + "time" +) + +func TestQuery_FindAll(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + var before = time.Now() + defer func() { + t.Log("cost:", time.Since(before).Seconds()*1000, "ms") + }() + + err := table. + Query(). + Limit(10). + //Offset("a1000"). + //Desc(). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log("key:", item.Key, "value:", item.Value) + return true, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestQuery_FindAll_Break(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + var before = time.Now() + defer func() { + t.Log("cost:", time.Since(before).Seconds()*1000, "ms") + }() + + var count int + err := table. + Query(). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log("key:", item.Key, "value:", item.Value) + count++ + return count < 3, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestQuery_FindAll_Desc(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + err := table.Query(). + Desc(). + Limit(10). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log("key:", item.Key, "value:", item.Value) + return true, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestQuery_FindAll_Offset(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + { + t.Log("=== forward ===") + err := table.Query(). + Offset("a3"). + Limit(10). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log("key:", item.Key, "value:", item.Value) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } + + { + t.Log("=== backward ===") + err := table.Query(). + Desc(). + Offset("a3"). + Limit(10). + //KeyOnly(). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log("key:", item.Key, "value:", item.Value) + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } +} + +func TestQuery_FindAll_Count(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + var count int + + var before = time.Now() + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms", "qps:", fmt.Sprintf("%.2fM/s", float64(count)/costSeconds/1_000_000)) + }() + + err := table. + Query(). + KeysOnly(). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + count++ + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + t.Log("count:", count) +} + +func TestQuery_FindAll_Field(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + var before = time.Now() + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms", "qps:", int(1/costSeconds)) + }() + + var lastFieldKey []byte + + t.Log("=======") + { + err := table. + Query(). + FieldAsc("expiresAt"). + //KeysOnly(). + //FieldLt(1710848959). + Limit(3). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log(item.Key, "=>", item.Value) + lastFieldKey = item.FieldKey + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + } + + t.Log("=======") + { + err := table. + Query(). + FieldAsc("expiresAt"). + //KeysOnly(). + //FieldLt(1710848959). + FieldOffset(lastFieldKey). + Limit(3). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log(item.Key, "=>", item.Value) + lastFieldKey = item.FieldKey + return true, nil + }) + if err != nil { + t.Fatal(err) + } + } +} + +func TestQuery_FindAll_Field_Many(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + var before = time.Now() + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms", "qps:", int(1/costSeconds)) + }() + + err := table. + Query(). + FieldAsc("expiresAt"). + KeysOnly(). + Limit(1000). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + t.Log(item.Key, "=>", item.Value) + return true, nil + }) + if err != nil { + t.Fatal(err) + } +} + +func BenchmarkQuery_FindAll(b *testing.B) { + runtime.GOMAXPROCS(4) + + store, err := kvstore.OpenStore("test") + if err != nil { + b.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("db1") + if err != nil { + b.Fatal(err) + } + + table, err := kvstore.NewTable[*testCachedItem]("cache_items", &testCacheItemEncoder[*testCachedItem]{}) + if err != nil { + b.Fatal(err) + } + + db.AddTable(table) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + err = table.Query(). + //Limit(100). + FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) { + return true, nil + }) + if err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/internal/utils/kvstore/store.go b/internal/utils/kvstore/store.go new file mode 100644 index 0000000..db40236 --- /dev/null +++ b/internal/utils/kvstore/store.go @@ -0,0 +1,165 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "errors" + "fmt" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/cockroachdb/pebble" + "github.com/iwind/TeaGo/Tea" + "io" + "os" + "strings" + "sync" +) + +const StoreSuffix = ".store" + +type Store struct { + name string + + path string + rawDB *pebble.DB + + isClosed bool + + dbs []*DB + + locker sync.Mutex +} + +// NewStore create store with name +func NewStore(storeName string) (*Store, error) { + if !IsValidName(storeName) { + return nil, errors.New("invalid store name '" + storeName + "'") + } + + var root = Tea.Root + "/data/stores" + _, err := os.Stat(root) + if err != nil && os.IsNotExist(err) { + _ = os.MkdirAll(root, 0777) + } + + return &Store{ + name: storeName, + path: Tea.Root + "/data/stores/" + storeName + StoreSuffix, + }, nil +} + +func OpenStore(storeName string) (*Store, error) { + store, err := NewStore(storeName) + if err != nil { + return nil, err + } + err = store.Open() + if err != nil { + return nil, err + } + + return store, nil +} + +func OpenStoreDir(dir string, storeName string) (*Store, error) { + if !IsValidName(storeName) { + return nil, errors.New("invalid store name '" + storeName + "'") + } + + _, err := os.Stat(dir) + if err != nil && os.IsNotExist(err) { + _ = os.MkdirAll(dir, 0777) + } + + dir = strings.TrimSuffix(dir, "/") + + var store = &Store{ + name: storeName, + path: dir + "/" + storeName + StoreSuffix, + } + + err = store.Open() + if err != nil { + return nil, err + } + return store, nil +} + +func (this *Store) Open() error { + var opt = &pebble.Options{ + Logger: NewLogger(), + } + + // TODO 需要修改 BytesPerSync 和 WALBytesPerSync 等等默认参数 + + rawDB, err := pebble.Open(this.path, opt) + if err != nil { + return err + } + this.rawDB = rawDB + + // events + events.OnKey(events.EventQuit, fmt.Sprintf("kvstore_%p", this), func() { + _ = this.Close() + }) + events.OnKey(events.EventTerminated, fmt.Sprintf("kvstore_%p", this), func() { + _ = this.Close() + }) + + return nil +} + +func (this *Store) Set(keyBytes []byte, valueBytes []byte) error { + return this.rawDB.Set(keyBytes, valueBytes, DefaultWriteOptions) +} + +func (this *Store) Get(keyBytes []byte) (valueBytes []byte, closer io.Closer, err error) { + return this.rawDB.Get(keyBytes) +} + +func (this *Store) Delete(keyBytes []byte) error { + return this.rawDB.Delete(keyBytes, DefaultWriteOptions) +} + +func (this *Store) NewDB(dbName string) (*DB, error) { + db, err := NewDB(this, dbName) + if err != nil { + return nil, err + } + + this.locker.Lock() + defer this.locker.Unlock() + + this.dbs = append(this.dbs, db) + return db, nil +} + +func (this *Store) RawDB() *pebble.DB { + return this.rawDB +} + +func (this *Store) Close() error { + if this.isClosed { + return nil + } + + this.locker.Lock() + var lastErr error + for _, db := range this.dbs { + err := db.Close() + if err != nil { + lastErr = err + } + } + + this.locker.Unlock() + + if this.rawDB != nil { + this.isClosed = true + return this.rawDB.Close() + } + return lastErr +} + +func (this *Store) IsClosed() bool { + return this.isClosed +} diff --git a/internal/utils/kvstore/store_test.go b/internal/utils/kvstore/store_test.go new file mode 100644 index 0000000..a35a314 --- /dev/null +++ b/internal/utils/kvstore/store_test.go @@ -0,0 +1,162 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/cockroachdb/pebble" + "github.com/iwind/TeaGo/Tea" + _ "github.com/iwind/TeaGo/bootstrap" + "testing" + "time" +) + +func TestMain(m *testing.M) { + m.Run() + + if testingStore != nil { + _ = testingStore.Close() + } +} + +func TestStore_Open(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + t.Log("opened") +} + +func TestStore_RawDB(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + err = store.RawDB().Set([]byte("hello"), []byte("world"), nil) + if err != nil { + t.Fatal(err) + } +} + +func TestOpenStoreDir(t *testing.T) { + store, err := kvstore.OpenStoreDir(Tea.Root+"/data/stores", "test3") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + t.Log("opened") + + _ = store +} + +func TestStore_CloseTwice(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + for i := 0; i < 3; i++ { + err = store.Close() + if err != nil { + t.Fatal(err) + } + } + }() +} + +func TestStore_Count(t *testing.T) { + testCountStore(t) +} + +var testingStore *kvstore.Store + +func testOpenStore(t *testing.T) *kvstore.DB { + var err error + testingStore, err = kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + + db, err := testingStore.NewDB("db1") + if err != nil { + t.Fatal(err) + } + + return db +} + +func testOpenStoreTable[T any](t *testing.T, tableName string, encoder kvstore.ValueEncoder[T]) *kvstore.Table[T] { + var err error + + var before = time.Now() + testingStore, err = kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + t.Log("store open cost:", time.Since(before).Seconds()*1000, "ms") + + db, err := testingStore.NewDB("db1") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewTable[T](tableName, encoder) + if err != nil { + t.Fatal(err) + } + db.AddTable(table) + + return table +} + +func testOpenStoreTableForBenchmark[T any](t *testing.B, tableName string, encoder kvstore.ValueEncoder[T]) *kvstore.Table[T] { + var err error + testingStore, err = kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + + db, err := testingStore.NewDB("db1") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewTable[T](tableName, encoder) + if err != nil { + t.Fatal(err) + } + db.AddTable(table) + + return table +} + +func testCountStore(t *testing.T) { + var err error + testingStore, err = kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + var count int + it, err := testingStore.RawDB().NewIter(&pebble.IterOptions{}) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = it.Close() + }() + for it.First(); it.Valid(); it.Next() { + count++ + } + t.Log("count:", count) +} diff --git a/internal/utils/kvstore/table.go b/internal/utils/kvstore/table.go new file mode 100644 index 0000000..bf69ec7 --- /dev/null +++ b/internal/utils/kvstore/table.go @@ -0,0 +1,379 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "bytes" + "encoding/binary" + "errors" + "github.com/cockroachdb/pebble" + "github.com/iwind/TeaGo/types" + "sync" +) + +const ( + KeyPrefix = "K$" + KeyMaxLength = 8 << 10 + + FieldPrefix = "F$" + + MaxBatchKeys = 8 << 10 // TODO not implemented +) + +type Table[T any] struct { + name string + rawNamespace []byte + db *DB + encoder ValueEncoder[T] + fieldNames []string + + mu *sync.RWMutex +} + +func NewTable[T any](tableName string, encoder ValueEncoder[T]) (*Table[T], error) { + if !IsValidName(tableName) { + return nil, errors.New("invalid table name '" + tableName + "'") + } + + return &Table[T]{ + name: tableName, + encoder: encoder, + mu: &sync.RWMutex{}, + }, nil +} + +func (this *Table[T]) Name() string { + return this.name +} + +func (this *Table[T]) Namespace() []byte { + var dest = make([]byte, len(this.rawNamespace)) + copy(dest, this.rawNamespace) + return dest +} + +func (this *Table[T]) SetNamespace(namespace []byte) { + this.rawNamespace = namespace +} + +func (this *Table[T]) SetDB(db *DB) { + this.db = db +} + +func (this *Table[T]) Set(key string, value T) error { + if len(key) > KeyMaxLength { + return ErrKeyTooLong + } + + valueBytes, err := this.encoder.Encode(value) + if err != nil { + return err + } + + return this.WriteTx(func(tx *Tx[T]) error { + return this.set(tx, key, valueBytes, value) + }) +} + +// ComposeFieldKey compose field key +// $Namespace$FieldName$FieldValueSeparatorKeyValueFieldLength[2] +func (this *Table[T]) ComposeFieldKey(keyBytes []byte, fieldName string, fieldValueBytes []byte) []byte { + // TODO use 'make()' and 'copy()' to pre-alloc memory space + var b = make([]byte, 2) + binary.BigEndian.PutUint16(b, uint16(len(fieldValueBytes))) + var fieldKey = append(this.FieldKey(fieldName), '$') // namespace + fieldKey = append(fieldKey, fieldValueBytes...) // field value + fieldKey = append(fieldKey, 0, 0) // separator + fieldKey = append(fieldKey, keyBytes...) // key value + fieldKey = append(fieldKey, b...) // field value length + return fieldKey +} + +func (this *Table[T]) Exist(key string) (found bool, err error) { + _, closer, err := this.db.store.rawDB.Get(this.FullKey(key)) + if err != nil { + if IsKeyNotFound(err) { + return false, nil + } + return false, err + } + defer func() { + _ = closer.Close() + }() + + return true, nil +} + +func (this *Table[T]) Get(key string) (value T, err error) { + err = this.ReadTx(func(tx *Tx[T]) error { + resultValue, getErr := this.get(tx, key) + if getErr == nil { + value = resultValue + } + return getErr + }) + + return +} + +func (this *Table[T]) Delete(key ...string) error { + if len(key) == 0 { + return nil + } + + return this.WriteTx(func(tx *Tx[T]) error { + return this.deleteKeys(tx, key...) + }) +} + +func (this *Table[T]) ReadTx(fn func(tx *Tx[T]) error) error { + var tx = NewTx[T](this, true) + defer func() { + _ = tx.Close() + }() + + err := fn(tx) + if err != nil { + return err + } + + return tx.Commit() +} + +func (this *Table[T]) WriteTx(fn func(tx *Tx[T]) error) error { + var tx = NewTx[T](this, false) + defer func() { + _ = tx.Close() + }() + + err := fn(tx) + if err != nil { + return err + } + + return tx.Commit() +} + +func (this *Table[T]) Truncate() error { + this.mu.Lock() + defer this.mu.Unlock() + + return this.db.store.rawDB.DeleteRange(this.Namespace(), append(this.Namespace(), 0xFF), DefaultWriteOptions) +} + +func (this *Table[T]) Query() *Query[T] { + var query = NewQuery[T]() + query.SetTable(this) + return query +} + +func (this *Table[T]) Count() (int64, error) { + var count int64 + + var begin = this.FullKeyBytes(nil) + it, err := this.db.store.rawDB.NewIter(&pebble.IterOptions{ + LowerBound: begin, + UpperBound: append(begin, 0xFF), + }) + if err != nil { + return 0, err + } + defer func() { + _ = it.Close() + }() + + for it.First(); it.Valid(); it.Next() { + count++ + } + + return count, err +} + +func (this *Table[T]) FullKey(realKey string) []byte { + return append(this.Namespace(), KeyPrefix+realKey...) +} + +func (this *Table[T]) FullKeyBytes(realKeyBytes []byte) []byte { + var k = append(this.Namespace(), KeyPrefix...) + k = append(k, realKeyBytes...) + return k +} + +func (this *Table[T]) FieldKey(fieldName string) []byte { + var data = append(this.Namespace(), FieldPrefix...) + data = append(data, fieldName...) + return data +} + +func (this *Table[T]) DecodeFieldKey(fieldName string, fieldKey []byte) (fieldValue []byte, key []byte, err error) { + var l = len(fieldKey) + var baseLen = len(this.FieldKey(fieldName)) + 1 /** $ **/ + 2 /** separator length **/ + 2 /** field length **/ + if l < baseLen { + err = errors.New("invalid field key") + return + } + + var fieldValueLen = binary.BigEndian.Uint16(fieldKey[l-2:]) + var data = fieldKey[baseLen-4 : l-2] + + fieldValue = data[:fieldValueLen] + key = data[fieldValueLen+2: /** separator length **/] + + return +} + +func (this *Table[T]) Close() error { + return nil +} + +func (this *Table[T]) deleteKeys(tx *Tx[T], key ...string) error { + var batch = tx.batch + + for _, singleKey := range key { + var keyErr = func(singleKey string) error { + var keyBytes = this.FullKey(singleKey) + + // delete field values + if len(this.fieldNames) > 0 { + valueBytes, closer, getErr := batch.Get(keyBytes) + if getErr != nil { + if IsKeyNotFound(getErr) { + return nil + } + return getErr + } + defer func() { + _ = closer.Close() + }() + + value, decodeErr := this.encoder.Decode(valueBytes) + if decodeErr != nil { + return decodeErr + } + + for _, fieldName := range this.fieldNames { + fieldValueBytes, fieldErr := this.encoder.EncodeField(value, fieldName) + if fieldErr != nil { + return fieldErr + } + + deleteKeyErr := batch.Delete(this.ComposeFieldKey([]byte(singleKey), fieldName, fieldValueBytes), DefaultWriteOptions) + if deleteKeyErr != nil { + return deleteKeyErr + } + } + } + + err := batch.Delete(keyBytes, DefaultWriteOptions) + if err != nil { + return err + } + + return nil + }(singleKey) + if keyErr != nil { + return keyErr + } + } + + return nil +} + +func (this *Table[T]) set(tx *Tx[T], key string, valueBytes []byte, value T) error { + var keyBytes = this.FullKey(key) + + var batch = tx.batch + + // read old value + var oldValue T + var oldFound bool + var countFields = len(this.fieldNames) + if countFields > 0 { + oldValueBytes, closer, getErr := batch.Get(keyBytes) + if getErr != nil { + if !IsKeyNotFound(getErr) { + return getErr + } + } else { + defer func() { + _ = closer.Close() + }() + + var decodeErr error + oldValue, decodeErr = this.encoder.Decode(oldValueBytes) + if decodeErr != nil { + return decodeErr + } + oldFound = true + } + } + + setErr := batch.Set(keyBytes, valueBytes, DefaultWriteOptions) + if setErr != nil { + return setErr + } + + // process fields + if countFields > 0 { + // add new field keys + for _, fieldName := range this.fieldNames { + // 把EncodeField放在TX里,是为了节约内存 + fieldValueBytes, fieldErr := this.encoder.EncodeField(value, fieldName) + if fieldErr != nil { + return fieldErr + } + + if len(fieldValueBytes) > 8<<10 { + return errors.New("field value too long: " + types.String(len(fieldValueBytes))) + } + + var newFieldKeyBytes = this.ComposeFieldKey([]byte(key), fieldName, fieldValueBytes) + + // delete old field key + if oldFound { + oldFieldValueBytes, oldFieldErr := this.encoder.EncodeField(oldValue, fieldName) + if oldFieldErr != nil { + return oldFieldErr + } + var oldFieldKeyBytes = this.ComposeFieldKey([]byte(key), fieldName, oldFieldValueBytes) + if bytes.Equal(oldFieldKeyBytes, newFieldKeyBytes) { + // skip the field + continue + } + deleteFieldErr := batch.Delete(oldFieldKeyBytes, DefaultWriteOptions) + if deleteFieldErr != nil { + return deleteFieldErr + } + } + + // set new field key + setFieldErr := batch.Set(newFieldKeyBytes, nil, DefaultWriteOptions) + if setFieldErr != nil { + return setFieldErr + } + } + } + + return nil +} + +func (this *Table[T]) get(tx *Tx[T], key string) (value T, err error) { + return this.getWithKeyBytes(tx, this.FullKey(key)) +} + +func (this *Table[T]) getWithKeyBytes(tx *Tx[T], keyBytes []byte) (value T, err error) { + valueBytes, closer, err := tx.batch.Get(keyBytes) + if err != nil { + return value, err + } + defer func() { + _ = closer.Close() + }() + + resultValue, decodeErr := this.encoder.Decode(valueBytes) + if decodeErr != nil { + return value, decodeErr + } + value = resultValue + return +} diff --git a/internal/utils/kvstore/table_counter.go b/internal/utils/kvstore/table_counter.go new file mode 100644 index 0000000..ffd44ca --- /dev/null +++ b/internal/utils/kvstore/table_counter.go @@ -0,0 +1,33 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type CounterTable[T int64 | uint64] struct { + *Table[T] +} + +func NewCounterTable[T int64 | uint64](name string) (*CounterTable[T], error) { + table, err := NewTable[T](name, NewIntValueEncoder[T]()) + if err != nil { + return nil, err + } + + return &CounterTable[T]{ + Table: table, + }, nil +} + +func (this *CounterTable[T]) Increase(key string, delta T) (newValue T, err error) { + err = this.Table.WriteTx(func(tx *Tx[T]) error { + value, getErr := tx.Get(key) + if getErr != nil { + if !IsKeyNotFound(getErr) { + return getErr + } + } + + newValue = value + delta + return tx.Set(key, newValue) + }) + return +} diff --git a/internal/utils/kvstore/table_counter_test.go b/internal/utils/kvstore/table_counter_test.go new file mode 100644 index 0000000..d187a92 --- /dev/null +++ b/internal/utils/kvstore/table_counter_test.go @@ -0,0 +1,80 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "runtime" + "testing" +) + +func TestCounterTable_Increase(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewCounterTable[uint64]("users_counter") + if err != nil { + t.Fatal(err) + } + + db.AddTable(table) + + count, err := table.Increase("counter", 1) + if err != nil { + t.Fatal(err) + } + t.Log(count) +} + +func BenchmarkCounterTable_Increase(b *testing.B) { + runtime.GOMAXPROCS(1) + + store, err := kvstore.OpenStore("test") + if err != nil { + b.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + b.Fatal(err) + } + + table, err := kvstore.NewCounterTable[uint64]("users_counter") + if err != nil { + b.Fatal(err) + } + + db.AddTable(table) + + defer func() { + count, incrErr := table.Increase("counter", 1) + if incrErr != nil { + b.Fatal(incrErr) + } + b.Log(count) + }() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, incrErr := table.Increase("counter", 1) + if incrErr != nil { + b.Fatal(incrErr) + } + } + }) +} diff --git a/internal/utils/kvstore/table_field.go b/internal/utils/kvstore/table_field.go new file mode 100644 index 0000000..d784420 --- /dev/null +++ b/internal/utils/kvstore/table_field.go @@ -0,0 +1,41 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "errors" +) + +func (this *Table[T]) AddField(fieldName string) error { + if !IsValidName(fieldName) { + return errors.New("invalid field name '" + fieldName + "'") + } + + // check existence + for _, field := range this.fieldNames { + if field == fieldName { + return nil + } + } + + this.fieldNames = append(this.fieldNames, fieldName) + return nil +} + +func (this *Table[T]) AddFields(fieldName ...string) error { + for _, subFieldName := range fieldName { + err := this.AddField(subFieldName) + if err != nil { + return err + } + } + return nil +} + +func (this *Table[T]) DropField(fieldName string) error { + this.mu.Lock() + defer this.mu.Unlock() + + var start = this.FieldKey(fieldName + "$") + return this.db.store.rawDB.DeleteRange(start, append(start, 0xFF), DefaultWriteOptions) +} diff --git a/internal/utils/kvstore/table_field_test.go b/internal/utils/kvstore/table_field_test.go new file mode 100644 index 0000000..f1adc44 --- /dev/null +++ b/internal/utils/kvstore/table_field_test.go @@ -0,0 +1,251 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "encoding/binary" + "errors" + "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "strconv" + "testing" + "time" +) + +type testCachedItem struct { + Hash string `json:"1"` // as key + URL string `json:"2"` + ExpiresAt int64 `json:"3"` + Tag string `json:"tag"` + HeaderSize int64 `json:"headerSize"` + BodySize int64 `json:"bodySize"` + MetaSize int `json:"metaSize"` + StaleAt int64 `json:"staleAt"` + CreatedAt int64 `json:"createdAt"` + Host string `json:"host"` + ServerId int64 `json:"serverId"` +} + +type testCacheItemEncoder[T interface{ *testCachedItem }] struct { + kvstore.BaseObjectEncoder[T] +} + +func (this *testCacheItemEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + switch fieldName { + case "expiresAt": + var b = make([]byte, 4) + binary.BigEndian.PutUint32(b, uint32(any(value).(*testCachedItem).ExpiresAt)) + return b, nil + case "staleAt": + var b = make([]byte, 4) + binary.BigEndian.PutUint32(b, uint32(any(value).(*testCachedItem).StaleAt)) + return b, nil + case "url": + return []byte(any(value).(*testCachedItem).URL), nil + } + return nil, errors.New("EncodeField: invalid field name '" + fieldName + "'") +} + +func TestTable_AddField(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + err := table.AddFields("expiresAt") + if err != nil { + t.Fatal(err) + } + + var before = time.Now() + for _, item := range []*testCachedItem{ + { + Hash: "a1", + URL: "https://example.com/a1", + ExpiresAt: 1710832067, + }, + { + Hash: "a5", + URL: "https://example.com/a5", + ExpiresAt: time.Now().Unix() + 7200, + }, + { + Hash: "a4", + URL: "https://example.com/a4", + ExpiresAt: time.Now().Unix() + 86400, + }, + { + Hash: "a3", + URL: "https://example.com/a3", + ExpiresAt: time.Now().Unix() + 1800, + }, + { + Hash: "a2", + URL: "https://example.com/a2", + ExpiresAt: time.Now().Unix() + 365*86400, + }, + } { + err = table.Set(item.Hash, item) + if err != nil { + t.Fatal(err) + } + } + + t.Log("set cost:", time.Since(before).Seconds()*1000, "ms") + + testInspectDB(t) +} + +func TestTable_AddField_Many(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + //runtime.GOMAXPROCS(1) + + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + { + err := table.AddFields("expiresAt") + if err != nil { + t.Fatal(err) + } + } + + { + err := table.AddFields("staleAt") + if err != nil { + t.Fatal(err) + } + } + + { + err := table.AddFields("url") + if err != nil { + t.Fatal(err) + } + } + + var before = time.Now() + const from = 0 + const count = 4_000_000 + + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms", "qps:", int64(float64(count)/costSeconds)) + }() + + for i := from; i < from+count; i++ { + var item = &testCachedItem{ + Hash: "a" + strconv.Itoa(i), + URL: "https://example.com/a" + strconv.Itoa(i), + ExpiresAt: 1710832067 + int64(i), + StaleAt: fasttime.Now().Unix() + int64(i), + CreatedAt: fasttime.Now().Unix(), + } + err := table.Set(item.Hash, item) + if err != nil { + t.Fatal(err) + } + } +} + +func TestTable_AddField_Delete_Many(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + //runtime.GOMAXPROCS(1) + + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + { + err := table.AddFields("expiresAt") + if err != nil { + t.Fatal(err) + } + } + + { + err := table.AddFields("staleAt") + if err != nil { + t.Fatal(err) + } + } + + { + err := table.AddFields("url") + if err != nil { + t.Fatal(err) + } + } + + var before = time.Now() + const from = 0 + const count = 1_000_000 + + for i := from; i < from+count; i++ { + var item = &testCachedItem{ + Hash: "a" + strconv.Itoa(i), + } + err := table.Delete(item.Hash) + if err != nil { + t.Fatal(err) + } + } + + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms", "qps:", int64(float64(count)/costSeconds)) + + countLeft, err := table.Count() + if err != nil { + t.Fatal(err) + } + t.Log("left:", countLeft) +} + +func TestTable_DropField(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + var before = time.Now() + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms") + }() + + err := table.DropField("expiresAt") + if err != nil { + t.Fatal(err) + } +} + +/**func TestTable_DeleteFieldValue(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + err := table.AddField("expiresAt") + if err != nil { + t.Fatal(err) + } + + var before = time.Now() + defer func() { + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms") + }() + + err = table.Delete("a2") + if err != nil { + t.Fatal(err) + } + + testInspectDB(t) +} +**/ + +func TestTable_Inspect(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + err := table.AddFields("expiresAt") + if err != nil { + t.Fatal(err) + } + + testInspectDB(t) +} diff --git a/internal/utils/kvstore/table_interface.go b/internal/utils/kvstore/table_interface.go new file mode 100644 index 0000000..fd486e5 --- /dev/null +++ b/internal/utils/kvstore/table_interface.go @@ -0,0 +1,10 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type TableInterface interface { + Name() string + SetNamespace(namespace []byte) + SetDB(db *DB) + Close() error +} diff --git a/internal/utils/kvstore/table_test.go b/internal/utils/kvstore/table_test.go new file mode 100644 index 0000000..81a05f4 --- /dev/null +++ b/internal/utils/kvstore/table_test.go @@ -0,0 +1,403 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "fmt" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/iwind/TeaGo/assert" + "github.com/iwind/TeaGo/types" + "math/rand" + "runtime" + "strconv" + "testing" + "time" +) + +func TestTable_Set(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewTable[string]("users", kvstore.NewStringValueEncoder[string]()) + if err != nil { + t.Fatal(err) + } + + db.AddTable(table) + + const originValue = "b12345" + + err = table.Set("a", originValue) + if err != nil { + t.Fatal(err) + } + + value, err := table.Get("a") + if err != nil { + if kvstore.IsKeyNotFound(err) { + t.Log("not found key") + return + } + t.Fatal(err) + } + t.Log("value:", value) + + var a = assert.NewAssertion(t) + a.IsTrue(originValue == value) +} + +func TestTable_Get(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewTable[string]("users", kvstore.NewStringValueEncoder[string]()) + if err != nil { + t.Fatal(err) + } + + db.AddTable(table) + + for _, key := range []string{"a", "b", "c"} { + value, getErr := table.Get(key) + if getErr != nil { + if kvstore.IsKeyNotFound(getErr) { + t.Log("not found key", key) + continue + } + t.Fatal(getErr) + } + t.Log(key, "=>", "value:", value) + } +} + +func TestTable_Exist(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewTable[string]("users", kvstore.NewStringValueEncoder[string]()) + if err != nil { + t.Fatal(err) + } + + db.AddTable(table) + + for _, key := range []string{"a", "b", "c", "12345"} { + b, checkErr := table.Exist(key) + if checkErr != nil { + t.Fatal(checkErr) + } + t.Log(key, "=>", b) + } +} + +func TestTable_Delete(t *testing.T) { + var a = assert.NewAssertion(t) + + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewTable[string]("users", kvstore.NewStringValueEncoder[string]()) + if err != nil { + t.Fatal(err) + } + + db.AddTable(table) + + value, err := table.Get("a123") + if err != nil { + if !kvstore.IsKeyNotFound(err) { + t.Fatal(err) + } + } else { + t.Log("old value:", value) + } + + err = table.Set("a123", "123456") + if err != nil { + t.Fatal(err) + } + + { + value, err = table.Get("a123") + if err != nil { + t.Fatal(err) + } + a.IsTrue(value == "123456") + } + + err = table.Delete("a123") + if err != nil { + t.Fatal(err) + } + + { + _, err = table.Get("a123") + a.IsTrue(kvstore.IsKeyNotFound(err)) + } +} + +func TestTable_Delete_Empty(t *testing.T) { + var a = assert.NewAssertion(t) + + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewTable[string]("users", kvstore.NewStringValueEncoder[string]()) + if err != nil { + t.Fatal(err) + } + + db.AddTable(table) + + { + err = table.Delete("a1", "a2", "a3", "a4", "") + if err != nil { + t.Fatal(err) + } + } + + { + err = table.Delete() + if err != nil { + t.Fatal(err) + } + } + + // set new + err = table.Set("a123", "123456") + if err != nil { + t.Fatal(err) + } + + // delete again + { + err = table.Delete("a1", "a2", "a3", "a4", "") + if err != nil { + t.Fatal(err) + } + } + + { + err = table.Delete() + if err != nil { + t.Fatal(err) + } + } + + // read + { + var value string + value, err = table.Get("a123") + if err != nil { + t.Fatal(err) + } + a.IsTrue(value == "123456") + } +} + +func TestTable_Count(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + + var before = time.Now() + count, err := table.Count() + if err != nil { + t.Fatal(err) + } + var costSeconds = time.Since(before).Seconds() + t.Log("count:", count, "cost:", costSeconds*1000, "ms", "qps:", fmt.Sprintf("%.2fM/s", float64(count)/costSeconds/1_000_000)) + + // watch memory usage + if testutils.IsSingleTesting() { + //time.Sleep(5 * time.Minute) + } +} + +func TestTable_Truncate(t *testing.T) { + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + var before = time.Now() + err := table.Truncate() + if err != nil { + t.Fatal(err) + } + + var costSeconds = time.Since(before).Seconds() + t.Log("cost:", costSeconds*1000, "ms") + + t.Log("===after truncate===") + testInspectDB(t) +} + +func TestTable_ComposeFieldKey(t *testing.T) { + var a = assert.NewAssertion(t) + + var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{}) + var fieldKeyBytes = table.ComposeFieldKey([]byte("Lily"), "username", []byte("lucy")) + t.Log(string(fieldKeyBytes)) + fieldValueBytes, keyValueBytes, err := table.DecodeFieldKey("username", fieldKeyBytes) + if err != nil { + t.Fatal(err) + } + t.Log("field:", string(fieldValueBytes), "key:", string(keyValueBytes)) + a.IsTrue(string(fieldValueBytes) == "lucy") + a.IsTrue(string(keyValueBytes) == "Lily") +} + +func BenchmarkTable_Set(b *testing.B) { + runtime.GOMAXPROCS(4) + + store, err := kvstore.OpenStore("test") + if err != nil { + b.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + b.Fatal(err) + } + + table, err := kvstore.NewTable[uint8]("users", kvstore.NewIntValueEncoder[uint8]()) + if err != nil { + b.Fatal(err) + } + + db.AddTable(table) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + putErr := table.Set(strconv.Itoa(rand.Int()), 1) + if putErr != nil { + b.Fatal(putErr) + } + } + }) +} + +func BenchmarkTable_Get(b *testing.B) { + runtime.GOMAXPROCS(4) + + store, err := kvstore.OpenStore("test") + if err != nil { + b.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + b.Fatal(err) + } + + table, err := kvstore.NewTable[uint8]("users", kvstore.NewIntValueEncoder[uint8]()) + if err != nil { + b.Fatal(err) + } + + db.AddTable(table) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, putErr := table.Get(types.String(rand.Int())) + if putErr != nil { + if kvstore.IsKeyNotFound(putErr) { + continue + } + b.Fatal(putErr) + } + } + }) +} + +/**func BenchmarkTable_NextId(b *testing.B) { + runtime.GOMAXPROCS(4) + + store, err := kvstore.OpenStore("test") + if err != nil { + b.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + b.Fatal(err) + } + + table, err := kvstore.NewTable[uint8]("users", kvstore.NewIntValueEncoder[uint8]()) + if err != nil { + b.Fatal(err) + } + + db.AddTable(table) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, nextErr := table.NextId("a") + if nextErr != nil { + b.Fatal(nextErr) + } + } + }) +} +**/ diff --git a/internal/utils/kvstore/tx.go b/internal/utils/kvstore/tx.go new file mode 100644 index 0000000..9fa638a --- /dev/null +++ b/internal/utils/kvstore/tx.go @@ -0,0 +1,70 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "errors" + "github.com/cockroachdb/pebble" +) + +type Tx[T any] struct { + table *Table[T] + readOnly bool + + batch *pebble.Batch +} + +func NewTx[T any](table *Table[T], readOnly bool) *Tx[T] { + return &Tx[T]{ + table: table, + readOnly: readOnly, + batch: table.db.store.rawDB.NewIndexedBatch(), + } +} + +func (this *Tx[T]) Set(key string, value T) error { + if this.readOnly { + return errors.New("can not set value in readonly transaction") + } + + if len(key) > KeyMaxLength { + return ErrKeyTooLong + } + + valueBytes, err := this.table.encoder.Encode(value) + if err != nil { + return err + } + + return this.table.set(this, key, valueBytes, value) +} + +func (this *Tx[T]) Get(key string) (value T, err error) { + return this.table.get(this, key) +} + +func (this *Tx[T]) Delete(key string) error { + if this.readOnly { + return errors.New("can not delete value in readonly transaction") + } + + return this.table.deleteKeys(this, key) +} + +func (this *Tx[T]) NewIterator(opt *IteratorOptions) (*pebble.Iterator, error) { + return this.batch.NewIter(opt.RawOptions()) +} + +func (this *Tx[T]) Close() error { + return this.batch.Close() +} + +func (this *Tx[T]) Commit() error { + return this.batch.Commit(DefaultWriteOptions) +} + +func (this *Tx[T]) Query() *Query[T] { + var query = NewQuery[T]() + query.SetTx(this) + return query +} diff --git a/internal/utils/kvstore/tx_test.go b/internal/utils/kvstore/tx_test.go new file mode 100644 index 0000000..d0268d8 --- /dev/null +++ b/internal/utils/kvstore/tx_test.go @@ -0,0 +1,65 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "fmt" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "testing" +) + +func TestTable_ReadTx(t *testing.T) { + store, err := kvstore.OpenStore("test") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = store.Close() + }() + + db, err := store.NewDB("TEST_DB") + if err != nil { + t.Fatal(err) + } + + table, err := kvstore.NewCounterTable[uint64]("users_counter") + if err != nil { + t.Fatal(err) + } + + db.AddTable(table) + + err = table.WriteTx(func(tx *kvstore.Tx[uint64]) error { + for i := 0; i < 1000; i++ { + var key = fmt.Sprintf("a%03d", i) + setErr := tx.Set(key, uint64(i)) + if setErr != nil { + return setErr + } + + value, getErr := tx.Get(key) + if getErr != nil { + return getErr + } + t.Log("write:", key, "=>", value) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + err = table.ReadTx(func(tx *kvstore.Tx[uint64]) error { + for _, key := range []string{"a100", "a101", "a102"} { + value, getErr := tx.Get(key) + if getErr != nil { + return getErr + } + t.Log("read:", key, "=>", value) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/utils/kvstore/utils.go b/internal/utils/kvstore/utils.go new file mode 100644 index 0000000..79008e8 --- /dev/null +++ b/internal/utils/kvstore/utils.go @@ -0,0 +1,50 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "errors" + "os" + "regexp" + "strings" +) + +var nameRegexp = regexp.MustCompile(`^[a-zA-Z0-9_.-]+$`) + +// IsValidName check if store name or database name or table name is valid +func IsValidName(name string) bool { + return nameRegexp.MatchString(name) +} + +// RemoveStore remove store directory +func RemoveStore(path string) error { + var errNotStoreDirectory = errors.New("not store directory") + + if strings.HasSuffix(path, StoreSuffix) { + _, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + // validate store + { + _, err = os.Stat(path + "/CURRENT") + if err != nil { + return errNotStoreDirectory + } + } + { + _, err = os.Stat(path + "/LOCK") + if err != nil { + return errNotStoreDirectory + } + } + + return os.RemoveAll(path) + } + + return errNotStoreDirectory +} diff --git a/internal/utils/kvstore/utils_test.go b/internal/utils/kvstore/utils_test.go new file mode 100644 index 0000000..943b35a --- /dev/null +++ b/internal/utils/kvstore/utils_test.go @@ -0,0 +1,29 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/assert" + _ "github.com/iwind/TeaGo/bootstrap" + "testing" +) + +func TestRemoveDB(t *testing.T) { + err := kvstore.RemoveStore(Tea.Root + "/data/stores/test.store") + if err != nil { + t.Fatal(err) + } +} + +func TestIsValidName(t *testing.T) { + var a = assert.NewAssertion(t) + + a.IsTrue(kvstore.IsValidName("a")) + a.IsTrue(kvstore.IsValidName("aB")) + a.IsTrue(kvstore.IsValidName("aBC1")) + a.IsTrue(kvstore.IsValidName("aBC1._-")) + a.IsFalse(kvstore.IsValidName(" aBC1._-")) + a.IsFalse(kvstore.IsValidName("")) +} diff --git a/internal/utils/kvstore/value_encode_int.go b/internal/utils/kvstore/value_encode_int.go new file mode 100644 index 0000000..f47896a --- /dev/null +++ b/internal/utils/kvstore/value_encode_int.go @@ -0,0 +1,81 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import ( + "encoding/binary" + "github.com/iwind/TeaGo/types" + "golang.org/x/exp/constraints" + "strconv" +) + +type IntValueEncoder[T constraints.Integer] struct { +} + +func NewIntValueEncoder[T constraints.Integer]() *IntValueEncoder[T] { + return &IntValueEncoder[T]{} +} + +func (this *IntValueEncoder[T]) Encode(value T) (data []byte, err error) { + switch v := any(value).(type) { + case int8, int16, int32, int, uint: + data = []byte(types.String(v)) + case int64: + data = []byte(strconv.FormatInt(v, 16)) + case uint8: + return []byte{v}, nil + case uint16: + data = make([]byte, 2) + binary.BigEndian.PutUint16(data, v) + case uint32: + data = make([]byte, 4) + binary.BigEndian.PutUint32(data, v) + case uint64: + data = make([]byte, 8) + binary.BigEndian.PutUint64(data, v) + } + + return +} + +func (this *IntValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + _ = fieldName + return this.Encode(value) +} + +func (this *IntValueEncoder[T]) Decode(valueData []byte) (value T, err error) { + switch any(value).(type) { + case int8: + value = T(types.Int8(string(valueData))) + case int16: + value = T(types.Int16(string(valueData))) + case int32: + value = T(types.Int32(string(valueData))) + case int64: + int64Value, _ := strconv.ParseInt(string(valueData), 16, 64) + value = T(int64Value) + case int: + value = T(types.Int(string(valueData))) + case uint: + value = T(types.Uint(string(valueData))) + case uint8: + if len(valueData) == 1 { + value = T(valueData[0]) + } + case uint16: + if len(valueData) == 2 { + value = T(binary.BigEndian.Uint16(valueData)) + } + case uint32: + if len(valueData) == 4 { + value = T(binary.BigEndian.Uint32(valueData)) + } + case uint64: + if len(valueData) == 8 { + value = T(binary.BigEndian.Uint64(valueData)) + } + } + + return +} + diff --git a/internal/utils/kvstore/value_encoder.go b/internal/utils/kvstore/value_encoder.go new file mode 100644 index 0000000..f6db0d0 --- /dev/null +++ b/internal/utils/kvstore/value_encoder.go @@ -0,0 +1,23 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +import "encoding/json" + +type ValueEncoder[T any] interface { + Encode(value T) ([]byte, error) + EncodeField(value T, fieldName string) ([]byte, error) + Decode(valueBytes []byte) (value T, err error) +} + +type BaseObjectEncoder[T any] struct { +} + +func (this *BaseObjectEncoder[T]) Encode(value T) ([]byte, error) { + return json.Marshal(value) +} + +func (this *BaseObjectEncoder[T]) Decode(valueData []byte) (value T, err error) { + err = json.Unmarshal(valueData, &value) + return +} diff --git a/internal/utils/kvstore/value_encoder_bool.go b/internal/utils/kvstore/value_encoder_bool.go new file mode 100644 index 0000000..303413e --- /dev/null +++ b/internal/utils/kvstore/value_encoder_bool.go @@ -0,0 +1,29 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type BoolValueEncoder[T bool] struct { +} + +func NewBoolValueEncoder[T bool]() *BoolValueEncoder[T] { + return &BoolValueEncoder[T]{} +} + +func (this *BoolValueEncoder[T]) Encode(value T) ([]byte, error) { + if value { + return []byte{1}, nil + } + return []byte{0}, nil +} + +func (this *BoolValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + _ = fieldName + return this.Encode(value) +} + +func (this *BoolValueEncoder[T]) Decode(valueData []byte) (value T, err error) { + if len(valueData) == 1 { + value = valueData[0] == 1 + } + return +} diff --git a/internal/utils/kvstore/value_encoder_bytes.go b/internal/utils/kvstore/value_encoder_bytes.go new file mode 100644 index 0000000..ec3ca16 --- /dev/null +++ b/internal/utils/kvstore/value_encoder_bytes.go @@ -0,0 +1,24 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type BytesValueEncoder[T []byte] struct { +} + +func NewBytesValueEncoder[T []byte]() *BytesValueEncoder[T] { + return &BytesValueEncoder[T]{} +} + +func (this *BytesValueEncoder[T]) Encode(value T) ([]byte, error) { + return value, nil +} + +func (this *BytesValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + _ = fieldName + return this.Encode(value) +} + +func (this *BytesValueEncoder[T]) Decode(valueData []byte) (value T, err error) { + value = valueData + return +} diff --git a/internal/utils/kvstore/value_encoder_string.go b/internal/utils/kvstore/value_encoder_string.go new file mode 100644 index 0000000..1bf390c --- /dev/null +++ b/internal/utils/kvstore/value_encoder_string.go @@ -0,0 +1,24 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore + +type StringValueEncoder[T string] struct { +} + +func NewStringValueEncoder[T string]() *StringValueEncoder[T] { + return &StringValueEncoder[T]{} +} + +func (this *StringValueEncoder[T]) Encode(value T) ([]byte, error) { + return []byte(value), nil +} + +func (this *StringValueEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + _ = fieldName + return this.Encode(value) +} + +func (this *StringValueEncoder[T]) Decode(valueData []byte) (value T, err error) { + value = T(valueData) + return +} diff --git a/internal/utils/kvstore/value_encoder_test.go b/internal/utils/kvstore/value_encoder_test.go new file mode 100644 index 0000000..554ff84 --- /dev/null +++ b/internal/utils/kvstore/value_encoder_test.go @@ -0,0 +1,469 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package kvstore_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestStringValueEncoder_Encode(t *testing.T) { + var a = assert.NewAssertion(t) + + var encoder = kvstore.NewStringValueEncoder[string]() + data, err := encoder.Encode("abcdefg") + if err != nil { + t.Fatal(err) + } + + value, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(value == "abcdefg") +} + +func TestIntValueEncoder_Encode(t *testing.T) { + var a = assert.NewAssertion(t) + + { + var encoder = kvstore.NewIntValueEncoder[int8]() + data, err := encoder.Encode(1) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 1) + t.Log("int8", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int8]() + data, err := encoder.Encode(-1) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == -1) + t.Log("int8", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int16]() + data, err := encoder.Encode(123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123) + t.Log("int16", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int16]() + data, err := encoder.Encode(-123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == -123) + t.Log("int16", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int32]() + data, err := encoder.Encode(123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123) + t.Log("int32", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int32]() + data, err := encoder.Encode(-123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == -123) + t.Log("int32", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int64]() + data, err := encoder.Encode(123456) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123456) + t.Log("int64", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int64]() + data, err := encoder.Encode(1234567890) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 1234567890) + t.Log("int64", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int64]() + data, err := encoder.Encode(-123456) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == -123456) + t.Log("int64", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int]() + data, err := encoder.Encode(123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123) + t.Log("int", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[int]() + data, err := encoder.Encode(-123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == -123) + t.Log("int", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[uint]() + data, err := encoder.Encode(123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123) + t.Log("uint", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[uint8]() + data, err := encoder.Encode(97) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 97) + t.Log("uint8", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[uint16]() + data, err := encoder.Encode(123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123) + t.Log("uint16", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[uint32]() + data, err := encoder.Encode(123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123) + t.Log("uint32", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[uint64]() + data, err := encoder.Encode(123) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 123) + t.Log("uint64", string(data), "=>", data, "=>", v) + } + + { + var encoder = kvstore.NewIntValueEncoder[uint64]() + data, err := encoder.Encode(1234567890) + if err != nil { + t.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + a.IsTrue(v == 1234567890) + t.Log("uint64", string(data), "=>", data, "=>", v) + } +} + +func TestBytesValueEncoder_Encode(t *testing.T) { + var encoder = kvstore.NewBytesValueEncoder[[]byte]() + { + data, err := encoder.Encode(nil) + if err != nil { + t.Fatal(err) + } + value, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + t.Log(data, "=>", value) + } + + { + data, err := encoder.Encode([]byte("ABC")) + if err != nil { + t.Fatal(err) + } + value, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + t.Log(data, "=>", value) + } +} + +func TestBytesValueEncoder_Bool(t *testing.T) { + var encoder = kvstore.NewBoolValueEncoder[bool]() + { + data, err := encoder.Encode(true) + if err != nil { + t.Fatal(err) + } + value, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + t.Log(data, "=>", value) + } + + { + data, err := encoder.Encode(false) + if err != nil { + t.Fatal(err) + } + value, err := encoder.Decode(data) + if err != nil { + t.Fatal(err) + } + t.Log(data, "=>", value) + } + + { + value, err := encoder.Decode(nil) + if err != nil { + t.Fatal(err) + } + t.Log("nil", "=>", value) + } + + { + value, err := encoder.Decode([]byte{1, 2, 3, 4}) + if err != nil { + t.Fatal(err) + } + t.Log("nil", "=>", value) + } +} + +type objectType struct { + Name string `json:"1"` + Age int `json:"2"` +} + +type objectTypeEncoder[T objectType] struct { + kvstore.BaseObjectEncoder[T] +} + +func (this *objectTypeEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + return nil, nil +} + +func TestBaseObjectEncoder_Encode(t *testing.T) { + var encoder = &objectTypeEncoder[objectType]{} + + { + data, err := encoder.Encode(objectType{ + Name: "lily", + Age: 20, + }) + if err != nil { + t.Fatal(err) + } + t.Log("encoded:", string(data)) + } + + { + value, err := encoder.Decode([]byte(`{"1":"lily","2":20}`)) + if err != nil { + t.Fatal(err) + } + t.Logf("decoded: %+v", value) + } +} + +type objectType2 struct { + Name string `json:"1"` + Age int `json:"2"` +} + +type objectTypeEncoder2[T interface{ *objectType2 }] struct { + kvstore.BaseObjectEncoder[T] +} + +func (this *objectTypeEncoder2[T]) EncodeField(value T, fieldName string) ([]byte, error) { + switch fieldName { + case "Name": + return []byte(any(value).(*objectType2).Name), nil + } + return nil, nil +} + +func TestBaseObjectEncoder_Encode2(t *testing.T) { + var encoder = &objectTypeEncoder2[*objectType2]{} + + { + data, err := encoder.Encode(&objectType2{ + Name: "lily", + Age: 20, + }) + if err != nil { + t.Fatal(err) + } + t.Log("encoded:", string(data)) + } + + { + value, err := encoder.Decode([]byte(`{"1":"lily","2":20}`)) + if err != nil { + t.Fatal(err) + } + t.Logf("decoded: %+v", value) + } + + { + field, err := encoder.EncodeField(&objectType2{ + Name: "lily", + Age: 20, + }, "Name") + if err != nil { + t.Fatal(err) + } + t.Log("encoded field:", string(field)) + } +} + +func BenchmarkStringValueEncoder_Encode(b *testing.B) { + for i := 0; i < b.N; i++ { + var encoder = kvstore.NewStringValueEncoder[string]() + data, err := encoder.Encode("1234567890") + if err != nil { + b.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + b.Fatal(err) + } + _ = v + } +} + +func BenchmarkIntValueEncoder_Encode(b *testing.B) { + for i := 0; i < b.N; i++ { + var encoder = kvstore.NewIntValueEncoder[int64]() + data, err := encoder.Encode(1234567890) + if err != nil { + b.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + b.Fatal(err) + } + _ = v + } +} + +func BenchmarkUIntValueEncoder_Encode(b *testing.B) { + for i := 0; i < b.N; i++ { + var encoder = kvstore.NewIntValueEncoder[uint64]() + data, err := encoder.Encode(1234567890) + if err != nil { + b.Fatal(err) + } + v, err := encoder.Decode(data) + if err != nil { + b.Fatal(err) + } + _ = v + } +}