优化本地数据库关闭相关代码

This commit is contained in:
刘祥超
2023-06-23 21:32:38 +08:00
parent 032c118f49
commit 7fd8d7756b
5 changed files with 147 additions and 27 deletions

View File

@@ -110,7 +110,7 @@ func (this *FileListDB) Open(dbPath string) error {
} }
} }
this.writeBatch = dbs.NewBatch(writeDB.RawDB(), 4) this.writeBatch = dbs.NewBatch(writeDB, 4)
this.writeBatch.OnFail(func(err error) { this.writeBatch.OnFail(func(err error) {
remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")") remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")")
}) })
@@ -539,10 +539,6 @@ func (this *FileListDB) Close() error {
_ = this.listOlderItemsStmt.Close() _ = this.listOlderItemsStmt.Close()
} }
if this.writeBatch != nil {
this.writeBatch.Close()
}
var errStrings []string var errStrings []string
if this.readDB != nil { if this.readDB != nil {

View File

@@ -14,26 +14,28 @@ type batchItem struct {
} }
type Batch struct { type Batch struct {
db *sql.DB db *DB
n int n int
enableStat bool enableStat bool
onFail func(err error) onFail func(err error)
queue chan *batchItem queue chan *batchItem
close chan bool closeEvent chan bool
isClosed bool isClosed bool
} }
func NewBatch(db *sql.DB, n int) *Batch { func NewBatch(db *DB, n int) *Batch {
return &Batch{ var batch = &Batch{
db: db, db: db,
n: n, n: n,
queue: make(chan *batchItem), queue: make(chan *batchItem),
close: make(chan bool, 1), closeEvent: make(chan bool, 1),
} }
db.batches = append(db.batches, batch)
return batch
} }
func (this *Batch) EnableStat(b bool) { func (this *Batch) EnableStat(b bool) {
@@ -68,7 +70,7 @@ For:
// closed // closed
if this.isClosed { if this.isClosed {
if lastTx != nil { if lastTx != nil {
_ = lastTx.Commit() _ = this.commitTx(lastTx)
lastTx = nil lastTx = nil
} }
@@ -86,6 +88,9 @@ For:
err := this.execItem(lastTx, item) err := this.execItem(lastTx, item)
if err != nil { if err != nil {
if IsClosedErr(err) {
return
}
this.processErr(item.query, err) this.processErr(item.query, err)
} }
@@ -93,9 +98,12 @@ For:
if count == n { if count == n {
count = 0 count = 0
err = lastTx.Commit() err = this.commitTx(lastTx)
lastTx = nil lastTx = nil
if err != nil { if err != nil {
if IsClosedErr(err) {
return
}
this.processErr("commit", err) this.processErr("commit", err)
} }
} }
@@ -104,16 +112,19 @@ For:
continue For continue For
} }
count = 0 count = 0
err := lastTx.Commit() err := this.commitTx(lastTx)
lastTx = nil lastTx = nil
if err != nil { if err != nil {
if IsClosedErr(err) {
return
}
this.processErr("commit", err) this.processErr("commit", err)
} }
case <-this.close: case <-this.closeEvent:
// closed // closed
if lastTx != nil { if lastTx != nil {
_ = lastTx.Commit() _ = this.commitTx(lastTx)
lastTx = nil lastTx = nil
} }
@@ -122,17 +133,21 @@ For:
} }
} }
func (this *Batch) Close() { func (this *Batch) close() {
this.isClosed = true this.isClosed = true
select { select {
case this.close <- true: case this.closeEvent <- true:
default: default:
} }
} }
func (this *Batch) beginTx() *sql.Tx { func (this *Batch) beginTx() *sql.Tx {
if !this.db.BeginUpdating() {
return nil
}
tx, err := this.db.Begin() tx, err := this.db.Begin()
if err != nil { if err != nil {
this.processErr("begin transaction", err) this.processErr("begin transaction", err)
@@ -141,9 +156,18 @@ func (this *Batch) beginTx() *sql.Tx {
return tx return tx
} }
func (this *Batch) commitTx(tx *sql.Tx) error {
// always commit without checking database closing status
this.db.EndUpdating()
return tx.Commit()
}
func (this *Batch) execItem(tx *sql.Tx, item *batchItem) error { func (this *Batch) execItem(tx *sql.Tx, item *batchItem) error {
if this.isClosed { // check database status
return nil if this.db.BeginUpdating() {
defer this.db.EndUpdating()
} else {
return errDBIsClosed
} }
if this.enableStat { if this.enableStat {

View File

@@ -5,19 +5,31 @@ package dbs
import ( import (
"context" "context"
"database/sql" "database/sql"
"errors"
"fmt" "fmt"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils" "github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"strings" "strings"
"sync"
"time"
) )
var errDBIsClosed = errors.New("the database is closed")
type DB struct { type DB struct {
locker *fileutils.Locker locker *fileutils.Locker
rawDB *sql.DB rawDB *sql.DB
statusLocker sync.Mutex
countUpdating int32
isClosing bool
enableStat bool enableStat bool
batches []*Batch
} }
func OpenWriter(dsn string) (*DB, error) { func OpenWriter(dsn string) (*DB, error) {
@@ -63,10 +75,10 @@ func NewDB(rawDB *sql.DB) *DB {
} }
events.OnKey(events.EventQuit, fmt.Sprintf("db_%p", db), func() { events.OnKey(events.EventQuit, fmt.Sprintf("db_%p", db), func() {
_ = rawDB.Close() _ = db.Close()
}) })
events.OnKey(events.EventTerminated, fmt.Sprintf("db_%p", db), func() { events.OnKey(events.EventTerminated, fmt.Sprintf("db_%p", db), func() {
_ = rawDB.Close() _ = db.Close()
}) })
return db return db
@@ -81,6 +93,13 @@ func (this *DB) EnableStat(b bool) {
} }
func (this *DB) Begin() (*sql.Tx, error) { func (this *DB) Begin() (*sql.Tx, error) {
// check database status
if this.BeginUpdating() {
defer this.EndUpdating()
} else {
return nil, errDBIsClosed
}
return this.rawDB.Begin() return this.rawDB.Begin()
} }
@@ -90,7 +109,7 @@ func (this *DB) Prepare(query string) (*Stmt, error) {
return nil, err return nil, err
} }
var s = NewStmt(stmt, query) var s = NewStmt(this, stmt, query)
if this.enableStat { if this.enableStat {
s.EnableStat() s.EnableStat()
} }
@@ -98,13 +117,28 @@ func (this *DB) Prepare(query string) (*Stmt, error) {
} }
func (this *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { func (this *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
// check database status
if this.BeginUpdating() {
defer this.EndUpdating()
} else {
return nil, errDBIsClosed
}
if this.enableStat { if this.enableStat {
defer SharedQueryStatManager.AddQuery(query).End() defer SharedQueryStatManager.AddQuery(query).End()
} }
return this.rawDB.ExecContext(ctx, query, args...) return this.rawDB.ExecContext(ctx, query, args...)
} }
func (this *DB) Exec(query string, args ...interface{}) (sql.Result, error) { func (this *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
// check database status
if this.BeginUpdating() {
defer this.EndUpdating()
} else {
return nil, errDBIsClosed
}
if this.enableStat { if this.enableStat {
defer SharedQueryStatManager.AddQuery(query).End() defer SharedQueryStatManager.AddQuery(query).End()
} }
@@ -125,7 +159,32 @@ func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row {
return this.rawDB.QueryRow(query, args...) return this.rawDB.QueryRow(query, args...)
} }
// Close the database
func (this *DB) Close() error { func (this *DB) Close() error {
// check database status
this.statusLocker.Lock()
if this.isClosing {
this.statusLocker.Unlock()
return nil
}
this.isClosing = true
this.statusLocker.Unlock()
// waiting for updating operations to finish
for {
this.statusLocker.Lock()
var countUpdating = this.countUpdating
this.statusLocker.Unlock()
if countUpdating <= 0 {
break
}
time.Sleep(1 * time.Millisecond)
}
for _, batch := range this.batches {
batch.close()
}
events.Remove(fmt.Sprintf("db_%p", this)) events.Remove(fmt.Sprintf("db_%p", this))
defer func() { defer func() {
@@ -137,6 +196,24 @@ func (this *DB) Close() error {
return this.rawDB.Close() return this.rawDB.Close()
} }
func (this *DB) BeginUpdating() bool {
this.statusLocker.Lock()
defer this.statusLocker.Unlock()
if this.isClosing {
return false
}
this.countUpdating++
return true
}
func (this *DB) EndUpdating() {
this.statusLocker.Lock()
this.countUpdating--
this.statusLocker.Unlock()
}
func (this *DB) RawDB() *sql.DB { func (this *DB) RawDB() *sql.DB {
return this.rawDB return this.rawDB
} }

View File

@@ -8,14 +8,16 @@ import (
) )
type Stmt struct { type Stmt struct {
db *DB
rawStmt *sql.Stmt rawStmt *sql.Stmt
query string query string
enableStat bool enableStat bool
} }
func NewStmt(rawStmt *sql.Stmt, query string) *Stmt { func NewStmt(db *DB, rawStmt *sql.Stmt, query string) *Stmt {
return &Stmt{ return &Stmt{
db: db,
rawStmt: rawStmt, rawStmt: rawStmt,
query: query, query: query,
} }
@@ -26,6 +28,13 @@ func (this *Stmt) EnableStat() {
} }
func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error) { func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error) {
// check database status
if this.db.BeginUpdating() {
defer this.db.EndUpdating()
} else {
return nil, errDBIsClosed
}
if this.enableStat { if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End() defer SharedQueryStatManager.AddQuery(this.query).End()
} }
@@ -33,6 +42,13 @@ func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Res
} }
func (this *Stmt) Exec(args ...interface{}) (sql.Result, error) { func (this *Stmt) Exec(args ...interface{}) (sql.Result, error) {
// check database status
if this.db.BeginUpdating() {
defer this.db.EndUpdating()
} else {
return nil, errDBIsClosed
}
if this.enableStat { if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End() defer SharedQueryStatManager.AddQuery(this.query).End()
} }

View File

@@ -0,0 +1,7 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package dbs
func IsClosedErr(err error) bool {
return err == errDBIsClosed
}