mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-05 09:30:26 +08:00
KV存储增加panic处理
This commit is contained in:
@@ -7,7 +7,6 @@ import (
|
|||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
"github.com/iwind/TeaGo/types"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -20,7 +19,7 @@ type KVListFileStore struct {
|
|||||||
// tables
|
// tables
|
||||||
itemsTable *kvstore.Table[*Item]
|
itemsTable *kvstore.Table[*Item]
|
||||||
|
|
||||||
isReady bool
|
rawIsReady bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKVListFileStore(path string) *KVListFileStore {
|
func NewKVListFileStore(path string) *KVListFileStore {
|
||||||
@@ -64,7 +63,7 @@ func (this *KVListFileStore) Open() error {
|
|||||||
this.itemsTable = table
|
this.itemsTable = table
|
||||||
}
|
}
|
||||||
|
|
||||||
this.isReady = true
|
this.rawIsReady = true
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -74,13 +73,13 @@ func (this *KVListFileStore) Path() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) AddItem(hash string, item *Item) error {
|
func (this *KVListFileStore) AddItem(hash string, item *Item) error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var currentTime = fasttime.Now().Unix()
|
var currentTime = fasttime.Now().Unix()
|
||||||
if item.ExpiresAt <= currentTime {
|
if item.ExpiresAt <= currentTime {
|
||||||
return errors.New("invalid expires time '" + types.String(item.ExpiresAt) + "'")
|
return nil
|
||||||
}
|
}
|
||||||
if item.CreatedAt <= 0 {
|
if item.CreatedAt <= 0 {
|
||||||
item.CreatedAt = currentTime
|
item.CreatedAt = currentTime
|
||||||
@@ -92,7 +91,7 @@ func (this *KVListFileStore) AddItem(hash string, item *Item) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) ExistItem(hash string) (bool, error) {
|
func (this *KVListFileStore) ExistItem(hash string) (bool, error) {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,7 +110,7 @@ func (this *KVListFileStore) ExistItem(hash string) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) {
|
func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -119,7 +118,7 @@ func (this *KVListFileStore) ExistQuickItem(hash string) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) RemoveItem(hash string) error {
|
func (this *KVListFileStore) RemoveItem(hash string) error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,7 +126,7 @@ func (this *KVListFileStore) RemoveItem(hash string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) RemoveAllItems() error {
|
func (this *KVListFileStore) RemoveAllItems() error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -135,7 +134,7 @@ func (this *KVListFileStore) RemoveAllItems() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) PurgeItems(count int, callback func(hash string) error) (int, error) {
|
func (this *KVListFileStore) PurgeItems(count int, callback func(hash string) error) (int, error) {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -188,7 +187,7 @@ func (this *KVListFileStore) PurgeItems(count int, callback func(hash string) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) PurgeLFUItems(count int, callback func(hash string) error) error {
|
func (this *KVListFileStore) PurgeLFUItems(count int, callback func(hash string) error) error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,7 +233,7 @@ func (this *KVListFileStore) PurgeLFUItems(count int, callback func(hash string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) CleanItemsWithPrefix(prefix string) error {
|
func (this *KVListFileStore) CleanItemsWithPrefix(prefix string) error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -292,7 +291,7 @@ func (this *KVListFileStore) CleanItemsWithPrefix(prefix string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) CleanItemsWithWildcardPrefix(prefix string) error {
|
func (this *KVListFileStore) CleanItemsWithWildcardPrefix(prefix string) error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -350,7 +349,7 @@ func (this *KVListFileStore) CleanItemsWithWildcardPrefix(prefix string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) CleanItemsWithWildcardKey(key string) error {
|
func (this *KVListFileStore) CleanItemsWithWildcardKey(key string) error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -419,7 +418,7 @@ func (this *KVListFileStore) CleanItemsWithWildcardKey(key string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) CountItems() (int64, error) {
|
func (this *KVListFileStore) CountItems() (int64, error) {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -427,7 +426,7 @@ func (this *KVListFileStore) CountItems() (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) StatItems() (*Stat, error) {
|
func (this *KVListFileStore) StatItems() (*Stat, error) {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return &Stat{}, nil
|
return &Stat{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -447,7 +446,7 @@ func (this *KVListFileStore) StatItems() (*Stat, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) TestInspect(t *testing.T) error {
|
func (this *KVListFileStore) TestInspect(t *testing.T) error {
|
||||||
if !this.isReady {
|
if !this.isReady() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -470,11 +469,13 @@ func (this *KVListFileStore) TestInspect(t *testing.T) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *KVListFileStore) Close() error {
|
func (this *KVListFileStore) Close() error {
|
||||||
this.isReady = false
|
|
||||||
|
|
||||||
if this.rawStore != nil {
|
if this.rawStore != nil {
|
||||||
return this.rawStore.Close()
|
return this.rawStore.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *KVListFileStore) isReady() bool {
|
||||||
|
return this.rawIsReady && !this.rawStore.IsClosed()
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ package kvstore_test
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
|
||||||
|
"github.com/iwind/TeaGo/assert"
|
||||||
"runtime"
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -25,6 +26,7 @@ func TestQuery_FindAll(t *testing.T) {
|
|||||||
//Desc().
|
//Desc().
|
||||||
FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) {
|
FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) {
|
||||||
t.Log("key:", item.Key, "value:", item.Value)
|
t.Log("key:", item.Key, "value:", item.Value)
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -46,6 +48,12 @@ func TestQuery_FindAll_Break(t *testing.T) {
|
|||||||
FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) {
|
FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) {
|
||||||
t.Log("key:", item.Key, "value:", item.Value)
|
t.Log("key:", item.Key, "value:", item.Value)
|
||||||
count++
|
count++
|
||||||
|
|
||||||
|
if count > 2 {
|
||||||
|
// break test
|
||||||
|
_ = table.DB().Store().Close()
|
||||||
|
}
|
||||||
|
|
||||||
return count < 3, nil
|
return count < 3, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -53,6 +61,34 @@ func TestQuery_FindAll_Break(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestQuery_FindAll_Break_Closed(t *testing.T) {
|
||||||
|
var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{})
|
||||||
|
|
||||||
|
var a = assert.NewAssertion(t)
|
||||||
|
|
||||||
|
var before = time.Now()
|
||||||
|
defer func() {
|
||||||
|
t.Log("cost:", time.Since(before).Seconds()*1000, "ms")
|
||||||
|
}()
|
||||||
|
|
||||||
|
var count int
|
||||||
|
err := table.
|
||||||
|
Query().
|
||||||
|
FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (goNext bool, err error) {
|
||||||
|
t.Log("key:", item.Key, "value:", item.Value)
|
||||||
|
count++
|
||||||
|
|
||||||
|
if count > 2 {
|
||||||
|
// break test
|
||||||
|
_ = table.DB().Store().Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
return count < 3, nil
|
||||||
|
})
|
||||||
|
t.Log("expected error:", err)
|
||||||
|
a.IsTrue(err != nil)
|
||||||
|
}
|
||||||
|
|
||||||
func TestQuery_FindAll_Desc(t *testing.T) {
|
func TestQuery_FindAll_Desc(t *testing.T) {
|
||||||
var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{})
|
var table = testOpenStoreTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{})
|
||||||
|
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ type Store struct {
|
|||||||
|
|
||||||
dbs []*DB
|
dbs []*DB
|
||||||
|
|
||||||
locker sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStore create store with name
|
// NewStore create store with name
|
||||||
@@ -126,8 +126,8 @@ func (this *Store) NewDB(dbName string) (*DB, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.locker.Lock()
|
this.mu.Lock()
|
||||||
defer this.locker.Unlock()
|
defer this.mu.Unlock()
|
||||||
|
|
||||||
this.dbs = append(this.dbs, db)
|
this.dbs = append(this.dbs, db)
|
||||||
return db, nil
|
return db, nil
|
||||||
@@ -142,7 +142,7 @@ func (this *Store) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
this.locker.Lock()
|
this.mu.Lock()
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for _, db := range this.dbs {
|
for _, db := range this.dbs {
|
||||||
err := db.Close()
|
err := db.Close()
|
||||||
@@ -151,12 +151,16 @@ func (this *Store) Close() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.locker.Unlock()
|
this.mu.Unlock()
|
||||||
|
|
||||||
if this.rawDB != nil {
|
if this.rawDB != nil {
|
||||||
this.isClosed = true
|
this.isClosed = true
|
||||||
return this.rawDB.Close()
|
err := this.rawDB.Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return lastErr
|
return lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -60,6 +60,10 @@ func (this *Table[T]) SetDB(db *DB) {
|
|||||||
this.db = db
|
this.db = db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *Table[T]) DB() *DB {
|
||||||
|
return this.db
|
||||||
|
}
|
||||||
|
|
||||||
func (this *Table[T]) Set(key string, value T) error {
|
func (this *Table[T]) Set(key string, value T) error {
|
||||||
if len(key) > KeyMaxLength {
|
if len(key) > KeyMaxLength {
|
||||||
return ErrKeyTooLong
|
return ErrKeyTooLong
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ package kvstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -59,7 +60,17 @@ func (this *Tx[T]) Close() error {
|
|||||||
return this.batch.Close()
|
return this.batch.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Tx[T]) Commit() error {
|
func (this *Tx[T]) Commit() (err error) {
|
||||||
|
defer func() {
|
||||||
|
var panicErr = recover()
|
||||||
|
if panicErr != nil {
|
||||||
|
resultErr, ok := panicErr.(error)
|
||||||
|
if ok {
|
||||||
|
err = fmt.Errorf("commit batch failed: %w", resultErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return this.batch.Commit(DefaultWriteOptions)
|
return this.batch.Commit(DefaultWriteOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user