mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-12 06:10:25 +08:00
优化本地数据库关闭相关代码
This commit is contained in:
@@ -110,7 +110,7 @@ func (this *FileListDB) Open(dbPath string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.writeBatch = dbs.NewBatch(writeDB.RawDB(), 4)
|
this.writeBatch = dbs.NewBatch(writeDB, 4)
|
||||||
this.writeBatch.OnFail(func(err error) {
|
this.writeBatch.OnFail(func(err error) {
|
||||||
remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")")
|
remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")")
|
||||||
})
|
})
|
||||||
@@ -539,10 +539,6 @@ func (this *FileListDB) Close() error {
|
|||||||
_ = this.listOlderItemsStmt.Close()
|
_ = this.listOlderItemsStmt.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if this.writeBatch != nil {
|
|
||||||
this.writeBatch.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
var errStrings []string
|
var errStrings []string
|
||||||
|
|
||||||
if this.readDB != nil {
|
if this.readDB != nil {
|
||||||
|
|||||||
@@ -14,26 +14,28 @@ type batchItem struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Batch struct {
|
type Batch struct {
|
||||||
db *sql.DB
|
db *DB
|
||||||
n int
|
n int
|
||||||
|
|
||||||
enableStat bool
|
enableStat bool
|
||||||
|
|
||||||
onFail func(err error)
|
onFail func(err error)
|
||||||
|
|
||||||
queue chan *batchItem
|
queue chan *batchItem
|
||||||
close chan bool
|
closeEvent chan bool
|
||||||
|
|
||||||
isClosed bool
|
isClosed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBatch(db *sql.DB, n int) *Batch {
|
func NewBatch(db *DB, n int) *Batch {
|
||||||
return &Batch{
|
var batch = &Batch{
|
||||||
db: db,
|
db: db,
|
||||||
n: n,
|
n: n,
|
||||||
queue: make(chan *batchItem),
|
queue: make(chan *batchItem),
|
||||||
close: make(chan bool, 1),
|
closeEvent: make(chan bool, 1),
|
||||||
}
|
}
|
||||||
|
db.batches = append(db.batches, batch)
|
||||||
|
return batch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Batch) EnableStat(b bool) {
|
func (this *Batch) EnableStat(b bool) {
|
||||||
@@ -68,7 +70,7 @@ For:
|
|||||||
// closed
|
// closed
|
||||||
if this.isClosed {
|
if this.isClosed {
|
||||||
if lastTx != nil {
|
if lastTx != nil {
|
||||||
_ = lastTx.Commit()
|
_ = this.commitTx(lastTx)
|
||||||
lastTx = nil
|
lastTx = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,6 +88,9 @@ For:
|
|||||||
|
|
||||||
err := this.execItem(lastTx, item)
|
err := this.execItem(lastTx, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if IsClosedErr(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
this.processErr(item.query, err)
|
this.processErr(item.query, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,9 +98,12 @@ For:
|
|||||||
|
|
||||||
if count == n {
|
if count == n {
|
||||||
count = 0
|
count = 0
|
||||||
err = lastTx.Commit()
|
err = this.commitTx(lastTx)
|
||||||
lastTx = nil
|
lastTx = nil
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if IsClosedErr(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
this.processErr("commit", err)
|
this.processErr("commit", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -104,16 +112,19 @@ For:
|
|||||||
continue For
|
continue For
|
||||||
}
|
}
|
||||||
count = 0
|
count = 0
|
||||||
err := lastTx.Commit()
|
err := this.commitTx(lastTx)
|
||||||
lastTx = nil
|
lastTx = nil
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if IsClosedErr(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
this.processErr("commit", err)
|
this.processErr("commit", err)
|
||||||
}
|
}
|
||||||
case <-this.close:
|
case <-this.closeEvent:
|
||||||
// closed
|
// closed
|
||||||
|
|
||||||
if lastTx != nil {
|
if lastTx != nil {
|
||||||
_ = lastTx.Commit()
|
_ = this.commitTx(lastTx)
|
||||||
lastTx = nil
|
lastTx = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,17 +133,21 @@ For:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Batch) Close() {
|
func (this *Batch) close() {
|
||||||
this.isClosed = true
|
this.isClosed = true
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case this.close <- true:
|
case this.closeEvent <- true:
|
||||||
default:
|
default:
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Batch) beginTx() *sql.Tx {
|
func (this *Batch) beginTx() *sql.Tx {
|
||||||
|
if !this.db.BeginUpdating() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
tx, err := this.db.Begin()
|
tx, err := this.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
this.processErr("begin transaction", err)
|
this.processErr("begin transaction", err)
|
||||||
@@ -141,9 +156,18 @@ func (this *Batch) beginTx() *sql.Tx {
|
|||||||
return tx
|
return tx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *Batch) commitTx(tx *sql.Tx) error {
|
||||||
|
// always commit without checking database closing status
|
||||||
|
this.db.EndUpdating()
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
func (this *Batch) execItem(tx *sql.Tx, item *batchItem) error {
|
func (this *Batch) execItem(tx *sql.Tx, item *batchItem) error {
|
||||||
if this.isClosed {
|
// check database status
|
||||||
return nil
|
if this.db.BeginUpdating() {
|
||||||
|
defer this.db.EndUpdating()
|
||||||
|
} else {
|
||||||
|
return errDBIsClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
if this.enableStat {
|
if this.enableStat {
|
||||||
|
|||||||
@@ -5,19 +5,31 @@ package dbs
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
|
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errDBIsClosed = errors.New("the database is closed")
|
||||||
|
|
||||||
type DB struct {
|
type DB struct {
|
||||||
locker *fileutils.Locker
|
locker *fileutils.Locker
|
||||||
rawDB *sql.DB
|
rawDB *sql.DB
|
||||||
|
|
||||||
|
statusLocker sync.Mutex
|
||||||
|
countUpdating int32
|
||||||
|
|
||||||
|
isClosing bool
|
||||||
|
|
||||||
enableStat bool
|
enableStat bool
|
||||||
|
|
||||||
|
batches []*Batch
|
||||||
}
|
}
|
||||||
|
|
||||||
func OpenWriter(dsn string) (*DB, error) {
|
func OpenWriter(dsn string) (*DB, error) {
|
||||||
@@ -63,10 +75,10 @@ func NewDB(rawDB *sql.DB) *DB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
events.OnKey(events.EventQuit, fmt.Sprintf("db_%p", db), func() {
|
events.OnKey(events.EventQuit, fmt.Sprintf("db_%p", db), func() {
|
||||||
_ = rawDB.Close()
|
_ = db.Close()
|
||||||
})
|
})
|
||||||
events.OnKey(events.EventTerminated, fmt.Sprintf("db_%p", db), func() {
|
events.OnKey(events.EventTerminated, fmt.Sprintf("db_%p", db), func() {
|
||||||
_ = rawDB.Close()
|
_ = db.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
return db
|
return db
|
||||||
@@ -81,6 +93,13 @@ func (this *DB) EnableStat(b bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *DB) Begin() (*sql.Tx, error) {
|
func (this *DB) Begin() (*sql.Tx, error) {
|
||||||
|
// check database status
|
||||||
|
if this.BeginUpdating() {
|
||||||
|
defer this.EndUpdating()
|
||||||
|
} else {
|
||||||
|
return nil, errDBIsClosed
|
||||||
|
}
|
||||||
|
|
||||||
return this.rawDB.Begin()
|
return this.rawDB.Begin()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,7 +109,7 @@ func (this *DB) Prepare(query string) (*Stmt, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var s = NewStmt(stmt, query)
|
var s = NewStmt(this, stmt, query)
|
||||||
if this.enableStat {
|
if this.enableStat {
|
||||||
s.EnableStat()
|
s.EnableStat()
|
||||||
}
|
}
|
||||||
@@ -98,13 +117,28 @@ func (this *DB) Prepare(query string) (*Stmt, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
func (this *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
||||||
|
// check database status
|
||||||
|
if this.BeginUpdating() {
|
||||||
|
defer this.EndUpdating()
|
||||||
|
} else {
|
||||||
|
return nil, errDBIsClosed
|
||||||
|
}
|
||||||
|
|
||||||
if this.enableStat {
|
if this.enableStat {
|
||||||
defer SharedQueryStatManager.AddQuery(query).End()
|
defer SharedQueryStatManager.AddQuery(query).End()
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.rawDB.ExecContext(ctx, query, args...)
|
return this.rawDB.ExecContext(ctx, query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
|
func (this *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
|
||||||
|
// check database status
|
||||||
|
if this.BeginUpdating() {
|
||||||
|
defer this.EndUpdating()
|
||||||
|
} else {
|
||||||
|
return nil, errDBIsClosed
|
||||||
|
}
|
||||||
|
|
||||||
if this.enableStat {
|
if this.enableStat {
|
||||||
defer SharedQueryStatManager.AddQuery(query).End()
|
defer SharedQueryStatManager.AddQuery(query).End()
|
||||||
}
|
}
|
||||||
@@ -125,7 +159,32 @@ func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row {
|
|||||||
return this.rawDB.QueryRow(query, args...)
|
return this.rawDB.QueryRow(query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close the database
|
||||||
func (this *DB) Close() error {
|
func (this *DB) Close() error {
|
||||||
|
// check database status
|
||||||
|
this.statusLocker.Lock()
|
||||||
|
if this.isClosing {
|
||||||
|
this.statusLocker.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
this.isClosing = true
|
||||||
|
this.statusLocker.Unlock()
|
||||||
|
|
||||||
|
// waiting for updating operations to finish
|
||||||
|
for {
|
||||||
|
this.statusLocker.Lock()
|
||||||
|
var countUpdating = this.countUpdating
|
||||||
|
this.statusLocker.Unlock()
|
||||||
|
if countUpdating <= 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, batch := range this.batches {
|
||||||
|
batch.close()
|
||||||
|
}
|
||||||
|
|
||||||
events.Remove(fmt.Sprintf("db_%p", this))
|
events.Remove(fmt.Sprintf("db_%p", this))
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -137,6 +196,24 @@ func (this *DB) Close() error {
|
|||||||
return this.rawDB.Close()
|
return this.rawDB.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *DB) BeginUpdating() bool {
|
||||||
|
this.statusLocker.Lock()
|
||||||
|
defer this.statusLocker.Unlock()
|
||||||
|
|
||||||
|
if this.isClosing {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
this.countUpdating++
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *DB) EndUpdating() {
|
||||||
|
this.statusLocker.Lock()
|
||||||
|
this.countUpdating--
|
||||||
|
this.statusLocker.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (this *DB) RawDB() *sql.DB {
|
func (this *DB) RawDB() *sql.DB {
|
||||||
return this.rawDB
|
return this.rawDB
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,14 +8,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Stmt struct {
|
type Stmt struct {
|
||||||
|
db *DB
|
||||||
rawStmt *sql.Stmt
|
rawStmt *sql.Stmt
|
||||||
query string
|
query string
|
||||||
|
|
||||||
enableStat bool
|
enableStat bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStmt(rawStmt *sql.Stmt, query string) *Stmt {
|
func NewStmt(db *DB, rawStmt *sql.Stmt, query string) *Stmt {
|
||||||
return &Stmt{
|
return &Stmt{
|
||||||
|
db: db,
|
||||||
rawStmt: rawStmt,
|
rawStmt: rawStmt,
|
||||||
query: query,
|
query: query,
|
||||||
}
|
}
|
||||||
@@ -26,6 +28,13 @@ func (this *Stmt) EnableStat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error) {
|
func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error) {
|
||||||
|
// check database status
|
||||||
|
if this.db.BeginUpdating() {
|
||||||
|
defer this.db.EndUpdating()
|
||||||
|
} else {
|
||||||
|
return nil, errDBIsClosed
|
||||||
|
}
|
||||||
|
|
||||||
if this.enableStat {
|
if this.enableStat {
|
||||||
defer SharedQueryStatManager.AddQuery(this.query).End()
|
defer SharedQueryStatManager.AddQuery(this.query).End()
|
||||||
}
|
}
|
||||||
@@ -33,6 +42,13 @@ func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Res
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Stmt) Exec(args ...interface{}) (sql.Result, error) {
|
func (this *Stmt) Exec(args ...interface{}) (sql.Result, error) {
|
||||||
|
// check database status
|
||||||
|
if this.db.BeginUpdating() {
|
||||||
|
defer this.db.EndUpdating()
|
||||||
|
} else {
|
||||||
|
return nil, errDBIsClosed
|
||||||
|
}
|
||||||
|
|
||||||
if this.enableStat {
|
if this.enableStat {
|
||||||
defer SharedQueryStatManager.AddQuery(this.query).End()
|
defer SharedQueryStatManager.AddQuery(this.query).End()
|
||||||
}
|
}
|
||||||
|
|||||||
7
internal/utils/dbs/utils.go
Normal file
7
internal/utils/dbs/utils.go
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||||
|
|
||||||
|
package dbs
|
||||||
|
|
||||||
|
func IsClosedErr(err error) bool {
|
||||||
|
return err == errDBIsClosed
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user