diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index d01c148..9ff2dd7 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -458,7 +458,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error { remotelogs.Println("CACHE", "upgrading local database finished") }() - db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_locking_mode=EXCLUSIVE") + db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE") if err != nil { return err } diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index 06fd312..d796d42 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -73,7 +73,7 @@ func (this *FileListDB) Open(dbPath string) error { // write db // 这里不能加 EXCLUSIVE 锁,不然异步事务可能会失败 - writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST") + writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST") if err != nil { return fmt.Errorf("open write database failed: %w", err) } diff --git a/internal/iplibrary/ip_list_db.go b/internal/iplibrary/ip_list_db.go index e9f4d36..6fae4fe 100644 --- a/internal/iplibrary/ip_list_db.go +++ b/internal/iplibrary/ip_list_db.go @@ -60,7 +60,7 @@ func (this *IPListDB) init() error { var path = this.dir + "/ip_list.db" - db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_locking_mode=EXCLUSIVE") + db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE") if err != nil { return err } diff --git a/internal/metrics/task.go b/internal/metrics/task.go index 087276c..a872ab7 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -91,7 +91,7 @@ func (this *Task) Init() error { var path = dir + "/metric." + types.String(this.item.Id) + ".db" - db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=NORMAL&_locking_mode=EXCLUSIVE") + db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_locking_mode=EXCLUSIVE") if err != nil { return err } diff --git a/internal/utils/dbs/db.go b/internal/utils/dbs/db.go index f3f988c..67b1401 100644 --- a/internal/utils/dbs/db.go +++ b/internal/utils/dbs/db.go @@ -18,6 +18,10 @@ import ( "time" ) +const ( + SyncMode = "NORMAL" +) + var errDBIsClosed = errors.New("the database is closed") type DB struct { @@ -199,6 +203,7 @@ func (this *DB) Close() error { this.statusLocker.Unlock() // waiting for updating operations to finish + var maxLoops = 5_000 for { this.statusLocker.Lock() var countUpdating = this.countUpdating @@ -207,6 +212,11 @@ func (this *DB) Close() error { break } time.Sleep(1 * time.Millisecond) + + maxLoops-- + if maxLoops <= 0 { + break + } } for _, batch := range this.batches { diff --git a/internal/utils/dbs/db_test.go b/internal/utils/dbs/db_test.go index b5de615..c88791d 100644 --- a/internal/utils/dbs/db_test.go +++ b/internal/utils/dbs/db_test.go @@ -3,12 +3,13 @@ package dbs_test import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "net/url" "testing" ) func TestParseDSN(t *testing.T) { - var dsn = "file:/home/cache/p43/.indexes/db-3.db?cache=private&mode=ro&_journal_mode=WAL&_sync=NORMAL&_cache_size=88000" + var dsn = "file:/home/cache/p43/.indexes/db-3.db?cache=private&mode=ro&_journal_mode=WAL&_sync=" + dbs.SyncMode + "&_cache_size=88000" u, err := url.Parse(dsn) if err != nil { t.Fatal(err) diff --git a/internal/utils/dbs/stmt.go b/internal/utils/dbs/stmt.go index 7c34ff2..fa1ab7a 100644 --- a/internal/utils/dbs/stmt.go +++ b/internal/utils/dbs/stmt.go @@ -5,6 +5,7 @@ package dbs import ( "context" "database/sql" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" ) type Stmt struct { @@ -27,7 +28,7 @@ func (this *Stmt) EnableStat() { this.enableStat = true } -func (this *Stmt) ExecContext(ctx context.Context, args ...any) (sql.Result, error) { +func (this *Stmt) ExecContext(ctx context.Context, args ...any) (result sql.Result, err error) { // check database status if this.db.BeginUpdating() { defer this.db.EndUpdating() @@ -38,10 +39,13 @@ func (this *Stmt) ExecContext(ctx context.Context, args ...any) (sql.Result, err if this.enableStat { defer SharedQueryStatManager.AddQuery(this.query).End() } - return this.rawStmt.ExecContext(ctx, args...) + fsutils.WriteBegin() + result, err = this.rawStmt.ExecContext(ctx, args...) + fsutils.WriteEnd() + return } -func (this *Stmt) Exec(args ...any) (sql.Result, error) { +func (this *Stmt) Exec(args ...any) (result sql.Result, err error) { // check database status if this.db.BeginUpdating() { defer this.db.EndUpdating() @@ -52,7 +56,11 @@ func (this *Stmt) Exec(args ...any) (sql.Result, error) { if this.enableStat { defer SharedQueryStatManager.AddQuery(this.query).End() } - return this.rawStmt.Exec(args...) + + fsutils.WriteBegin() + result, err = this.rawStmt.Exec(args...) + fsutils.WriteEnd() + return } func (this *Stmt) QueryContext(ctx context.Context, args ...any) (*sql.Rows, error) {