mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-09 20:10:27 +08:00
优化本地数据库相关代码
This commit is contained in:
@@ -458,7 +458,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
|
||||
remotelogs.Println("CACHE", "upgrading local database finished")
|
||||
}()
|
||||
|
||||
db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_locking_mode=EXCLUSIVE")
|
||||
db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func (this *FileListDB) Open(dbPath string) error {
|
||||
|
||||
// write db
|
||||
// 这里不能加 EXCLUSIVE 锁,不然异步事务可能会失败
|
||||
writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST")
|
||||
writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST")
|
||||
if err != nil {
|
||||
return fmt.Errorf("open write database failed: %w", err)
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ func (this *IPListDB) init() error {
|
||||
|
||||
var path = this.dir + "/ip_list.db"
|
||||
|
||||
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_locking_mode=EXCLUSIVE")
|
||||
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ func (this *Task) Init() error {
|
||||
|
||||
var path = dir + "/metric." + types.String(this.item.Id) + ".db"
|
||||
|
||||
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_locking_mode=EXCLUSIVE")
|
||||
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -18,6 +18,10 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
SyncMode = "NORMAL"
|
||||
)
|
||||
|
||||
var errDBIsClosed = errors.New("the database is closed")
|
||||
|
||||
type DB struct {
|
||||
@@ -199,6 +203,7 @@ func (this *DB) Close() error {
|
||||
this.statusLocker.Unlock()
|
||||
|
||||
// waiting for updating operations to finish
|
||||
var maxLoops = 5_000
|
||||
for {
|
||||
this.statusLocker.Lock()
|
||||
var countUpdating = this.countUpdating
|
||||
@@ -207,6 +212,11 @@ func (this *DB) Close() error {
|
||||
break
|
||||
}
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
|
||||
maxLoops--
|
||||
if maxLoops <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for _, batch := range this.batches {
|
||||
|
||||
@@ -3,12 +3,13 @@
|
||||
package dbs_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
|
||||
"net/url"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseDSN(t *testing.T) {
|
||||
var dsn = "file:/home/cache/p43/.indexes/db-3.db?cache=private&mode=ro&_journal_mode=WAL&_sync=NORMAL&_cache_size=88000"
|
||||
var dsn = "file:/home/cache/p43/.indexes/db-3.db?cache=private&mode=ro&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_cache_size=88000"
|
||||
u, err := url.Parse(dsn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
@@ -5,6 +5,7 @@ package dbs
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||||
)
|
||||
|
||||
type Stmt struct {
|
||||
@@ -27,7 +28,7 @@ func (this *Stmt) EnableStat() {
|
||||
this.enableStat = true
|
||||
}
|
||||
|
||||
func (this *Stmt) ExecContext(ctx context.Context, args ...any) (sql.Result, error) {
|
||||
func (this *Stmt) ExecContext(ctx context.Context, args ...any) (result sql.Result, err error) {
|
||||
// check database status
|
||||
if this.db.BeginUpdating() {
|
||||
defer this.db.EndUpdating()
|
||||
@@ -38,10 +39,13 @@ func (this *Stmt) ExecContext(ctx context.Context, args ...any) (sql.Result, err
|
||||
if this.enableStat {
|
||||
defer SharedQueryStatManager.AddQuery(this.query).End()
|
||||
}
|
||||
return this.rawStmt.ExecContext(ctx, args...)
|
||||
fsutils.WriteBegin()
|
||||
result, err = this.rawStmt.ExecContext(ctx, args...)
|
||||
fsutils.WriteEnd()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *Stmt) Exec(args ...any) (sql.Result, error) {
|
||||
func (this *Stmt) Exec(args ...any) (result sql.Result, err error) {
|
||||
// check database status
|
||||
if this.db.BeginUpdating() {
|
||||
defer this.db.EndUpdating()
|
||||
@@ -52,7 +56,11 @@ func (this *Stmt) Exec(args ...any) (sql.Result, error) {
|
||||
if this.enableStat {
|
||||
defer SharedQueryStatManager.AddQuery(this.query).End()
|
||||
}
|
||||
return this.rawStmt.Exec(args...)
|
||||
|
||||
fsutils.WriteBegin()
|
||||
result, err = this.rawStmt.Exec(args...)
|
||||
fsutils.WriteEnd()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *Stmt) QueryContext(ctx context.Context, args ...any) (*sql.Rows, error) {
|
||||
|
||||
Reference in New Issue
Block a user