diff --git a/internal/utils/kvstore/errors.go b/internal/utils/kvstore/errors.go index 659e20c..4468228 100644 --- a/internal/utils/kvstore/errors.go +++ b/internal/utils/kvstore/errors.go @@ -4,12 +4,14 @@ package kvstore import ( "errors" + "fmt" "github.com/cockroachdb/pebble" ) var ErrTableNotFound = errors.New("table not found") var ErrKeyTooLong = errors.New("too long key") -var ErrSkip= errors.New("skip") // skip count in iterator +var ErrSkip = errors.New("skip") // skip count in iterator +var ErrTableClosed = errors.New("table closed") func IsNotFound(err error) bool { return err != nil && errors.Is(err, pebble.ErrNotFound) @@ -22,3 +24,7 @@ func IsSkipError(err error) bool { func Skip() (bool, error) { return true, ErrSkip } + +func NewTableClosedErr(tableName string) error { + return fmt.Errorf("table '"+tableName+"' closed: %w", ErrTableClosed) +} diff --git a/internal/utils/kvstore/query.go b/internal/utils/kvstore/query.go index 6e9649c..b37f034 100644 --- a/internal/utils/kvstore/query.go +++ b/internal/utils/kvstore/query.go @@ -165,6 +165,10 @@ func (this *Query[T]) FieldOffset(fieldOffset []byte) *Query[T] { //} func (this *Query[T]) FindAll(fn IteratorFunc[T]) (err error) { + if this.table != nil && this.table.isClosed { + return NewTableClosedErr(this.table.name) + } + defer func() { var panicErr = recover() if panicErr != nil { diff --git a/internal/utils/kvstore/table.go b/internal/utils/kvstore/table.go index 6ecfc0a..0216b72 100644 --- a/internal/utils/kvstore/table.go +++ b/internal/utils/kvstore/table.go @@ -27,6 +27,7 @@ type Table[T any] struct { db *DB encoder ValueEncoder[T] fieldNames []string + isClosed bool mu *sync.RWMutex } @@ -70,6 +71,10 @@ func (this *Table[T]) Encoder() ValueEncoder[T] { } func (this *Table[T]) Set(key string, value T) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + if len(key) > KeyMaxLength { return ErrKeyTooLong } @@ -85,6 +90,10 @@ func (this *Table[T]) Set(key string, value T) error { } func (this *Table[T]) SetSync(key string, value T) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + if len(key) > KeyMaxLength { return ErrKeyTooLong } @@ -100,6 +109,10 @@ func (this *Table[T]) SetSync(key string, value T) error { } func (this *Table[T]) Insert(key string, value T) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + if len(key) > KeyMaxLength { return ErrKeyTooLong } @@ -129,6 +142,10 @@ func (this *Table[T]) ComposeFieldKey(keyBytes []byte, fieldName string, fieldVa } func (this *Table[T]) Exist(key string) (found bool, err error) { + if this.isClosed { + return false, NewTableClosedErr(this.name) + } + _, closer, err := this.db.store.rawDB.Get(this.FullKey(key)) if err != nil { if IsNotFound(err) { @@ -144,6 +161,11 @@ func (this *Table[T]) Exist(key string) (found bool, err error) { } func (this *Table[T]) Get(key string) (value T, err error) { + if this.isClosed { + err = NewTableClosedErr(this.name) + return + } + err = this.ReadTx(func(tx *Tx[T]) error { resultValue, getErr := this.get(tx, key) if getErr == nil { @@ -156,6 +178,10 @@ func (this *Table[T]) Get(key string) (value T, err error) { } func (this *Table[T]) Delete(key ...string) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + if len(key) == 0 { return nil } @@ -166,6 +192,10 @@ func (this *Table[T]) Delete(key ...string) error { } func (this *Table[T]) ReadTx(fn func(tx *Tx[T]) error) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + var tx = NewTx[T](this, true) defer func() { _ = tx.Close() @@ -180,6 +210,10 @@ func (this *Table[T]) ReadTx(fn func(tx *Tx[T]) error) error { } func (this *Table[T]) WriteTx(fn func(tx *Tx[T]) error) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + var tx = NewTx[T](this, false) defer func() { _ = tx.Close() @@ -194,6 +228,10 @@ func (this *Table[T]) WriteTx(fn func(tx *Tx[T]) error) error { } func (this *Table[T]) WriteTxSync(fn func(tx *Tx[T]) error) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + var tx = NewTx[T](this, false) defer func() { _ = tx.Close() @@ -208,6 +246,10 @@ func (this *Table[T]) WriteTxSync(fn func(tx *Tx[T]) error) error { } func (this *Table[T]) Truncate() error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + this.mu.Lock() defer this.mu.Unlock() @@ -215,6 +257,10 @@ func (this *Table[T]) Truncate() error { } func (this *Table[T]) DeleteRange(start string, end string) error { + if this.isClosed { + return NewTableClosedErr(this.name) + } + return this.db.store.rawDB.DeleteRange(this.FullKeyBytes([]byte(start)), this.FullKeyBytes([]byte(end)), DefaultWriteOptions) } @@ -280,7 +326,7 @@ func (this *Table[T]) DecodeFieldKey(fieldName string, fieldKey []byte) (fieldVa } func (this *Table[T]) Close() error { - // nothing to do + this.isClosed = true return nil } diff --git a/internal/utils/kvstore/table_counter.go b/internal/utils/kvstore/table_counter.go index 6f394d3..02cc7d0 100644 --- a/internal/utils/kvstore/table_counter.go +++ b/internal/utils/kvstore/table_counter.go @@ -18,6 +18,11 @@ func NewCounterTable[T int64 | uint64](name string) (*CounterTable[T], error) { } func (this *CounterTable[T]) Increase(key string, delta T) (newValue T, err error) { + if this.isClosed { + err = NewTableClosedErr(this.name) + return + } + err = this.Table.WriteTx(func(tx *Tx[T]) error { value, getErr := tx.Get(key) if getErr != nil { diff --git a/internal/utils/kvstore/tx.go b/internal/utils/kvstore/tx.go index 5381e11..6b047f6 100644 --- a/internal/utils/kvstore/tx.go +++ b/internal/utils/kvstore/tx.go @@ -75,10 +75,17 @@ func (this *Tx[T]) Insert(key string, value T) error { } func (this *Tx[T]) Get(key string) (value T, err error) { + if this.table.isClosed { + err = NewTableClosedErr(this.table.name) + return + } return this.table.get(this, key) } func (this *Tx[T]) Delete(key string) error { + if this.table.isClosed { + return NewTableClosedErr(this.table.name) + } if this.readOnly { return errors.New("can not delete value in readonly transaction") }