// Source file: EdgeNode/internal/utils/dbs/db.go

// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package dbs
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
	"github.com/TeaOSLab/EdgeNode/internal/events"
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
	"github.com/TeaOSLab/EdgeNode/internal/utils/fs"
	_ "github.com/mattn/go-sqlite3"
	"strings"
	"sync"
	"time"
)
// errDBIsClosed is returned by the updating methods (Begin, Exec, ExecContext)
// when BeginUpdating reports that the database is closing.
var errDBIsClosed = errors.New("the database is closed")
type DB struct {
2023-07-28 11:10:57 +08:00
locker *fsutils.Locker
2023-03-07 16:22:32 +08:00
rawDB *sql.DB
2023-07-27 10:03:18 +08:00
dsn string
statusLocker sync.Mutex
countUpdating int32
isClosing bool
enableStat bool
batches []*Batch
}
2023-03-07 16:22:32 +08:00
// OpenWriter opens the database identified by dsn and asks open to take the
// file lock for it.
func OpenWriter(dsn string) (*DB, error) {
	const withLock = true
	return open(dsn, withLock)
}
// OpenReader opens the database identified by dsn without taking the file
// lock.
func OpenReader(dsn string) (*DB, error) {
	const withLock = false
	return open(dsn, withLock)
}
func open(dsn string, lock bool) (*DB, error) {
2023-07-27 10:03:18 +08:00
if teaconst.IsQuiting {
return nil, errors.New("can not open database when process is quiting")
}
2023-03-07 16:22:32 +08:00
// locker
2023-07-28 11:10:57 +08:00
var locker *fsutils.Locker
2023-03-07 16:22:32 +08:00
if lock {
var path = dsn
var queryIndex = strings.Index(dsn, "?")
if queryIndex >= 0 {
path = path[:queryIndex]
}
path = strings.TrimSpace(strings.TrimPrefix(path, "file:"))
2023-07-28 11:10:57 +08:00
locker = fsutils.NewLocker(path)
2023-03-07 16:22:32 +08:00
err := locker.Lock()
if err != nil {
remotelogs.Warn("DB", "lock '"+path+"' failed: "+err.Error())
locker = nil
}
}
// open
rawDB, err := sql.Open("sqlite3", dsn)
if err != nil {
return nil, err
}
2023-07-27 10:03:18 +08:00
var db = NewDB(rawDB, dsn)
2023-03-07 16:22:32 +08:00
db.locker = locker
return db, nil
}
2023-07-27 10:03:18 +08:00
func NewDB(rawDB *sql.DB, dsn string) *DB {
var db = &DB{
rawDB: rawDB,
2023-07-27 10:03:18 +08:00
dsn: dsn,
}
2022-09-26 16:14:24 +08:00
events.OnKey(events.EventQuit, fmt.Sprintf("db_%p", db), func() {
_ = db.Close()
2022-09-26 16:14:24 +08:00
})
events.OnKey(events.EventTerminated, fmt.Sprintf("db_%p", db), func() {
_ = db.Close()
})
return db
}
2023-03-07 16:22:32 +08:00
// SetMaxOpenConns forwards n to SetMaxOpenConns on the underlying *sql.DB.
func (this *DB) SetMaxOpenConns(n int) {
	var raw = this.rawDB
	raw.SetMaxOpenConns(n)
}
// EnableStat turns query statistics on or off for this DB. When enabled, the
// query methods record each query through SharedQueryStatManager and
// statements returned by Prepare have statistics enabled as well.
func (this *DB) EnableStat(b bool) {
	this.enableStat = b
}
// Begin starts a transaction on the underlying database.
//
// It returns errDBIsClosed when the database is closing. The call is counted
// as an updating operation only until Begin itself returns, not for the
// lifetime of the returned transaction.
func (this *DB) Begin() (*sql.Tx, error) {
	// refuse new work once Close has started
	if !this.BeginUpdating() {
		return nil, errDBIsClosed
	}
	defer this.EndUpdating()

	return this.rawDB.Begin()
}
// Prepare prepares query on the underlying database and wraps the result with
// NewStmt. Statistics are enabled on the returned statement when they are
// enabled on this DB. An error from the underlying Prepare is returned as is.
func (this *DB) Prepare(query string) (*Stmt, error) {
	rawStmt, prepareErr := this.rawDB.Prepare(query)
	if prepareErr != nil {
		return nil, prepareErr
	}

	var wrapped = NewStmt(this, rawStmt, query)
	if this.enableStat {
		wrapped.EnableStat()
	}
	return wrapped, nil
}
// ExecContext executes query with args on the underlying database using ctx.
//
// It returns errDBIsClosed when the database is closing. While it runs, the
// call is counted as an updating operation, so Close waits for it to finish.
func (this *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	// refuse new work once Close has started
	if !this.BeginUpdating() {
		return nil, errDBIsClosed
	}
	defer this.EndUpdating()

	if this.enableStat {
		// AddQuery runs now; End runs when this method returns.
		var stat = SharedQueryStatManager.AddQuery(query)
		defer stat.End()
	}

	return this.rawDB.ExecContext(ctx, query, args...)
}
// Exec executes query with args on the underlying database.
//
// It returns errDBIsClosed when the database is closing. While it runs, the
// call is counted as an updating operation, so Close waits for it to finish.
func (this *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
	// refuse new work once Close has started
	if !this.BeginUpdating() {
		return nil, errDBIsClosed
	}
	defer this.EndUpdating()

	if this.enableStat {
		// AddQuery runs now; End runs when this method returns.
		var stat = SharedQueryStatManager.AddQuery(query)
		defer stat.End()
	}

	return this.rawDB.Exec(query, args...)
}
// Query runs query with args on the underlying database and returns the rows.
// Unlike the Exec methods, it does not check whether the database is closing.
func (this *DB) Query(query string, args ...interface{}) (*sql.Rows, error) {
	if this.enableStat {
		// AddQuery runs now; End runs when this method returns.
		var stat = SharedQueryStatManager.AddQuery(query)
		defer stat.End()
	}

	return this.rawDB.Query(query, args...)
}
// QueryRow runs query with args on the underlying database and returns the
// resulting row. It does not check whether the database is closing.
func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row {
	if this.enableStat {
		// AddQuery runs now; End runs when this method returns.
		var stat = SharedQueryStatManager.AddQuery(query)
		defer stat.End()
	}

	return this.rawDB.QueryRow(query, args...)
}
// Close the database
func (this *DB) Close() error {
// check database status
this.statusLocker.Lock()
if this.isClosing {
this.statusLocker.Unlock()
return nil
}
this.isClosing = true
this.statusLocker.Unlock()
// waiting for updating operations to finish
for {
this.statusLocker.Lock()
var countUpdating = this.countUpdating
this.statusLocker.Unlock()
if countUpdating <= 0 {
break
}
time.Sleep(1 * time.Millisecond)
}
for _, batch := range this.batches {
batch.close()
}
events.Remove(fmt.Sprintf("db_%p", this))
2023-03-07 16:22:32 +08:00
defer func() {
if this.locker != nil {
_ = this.locker.Release()
}
}()
2023-07-27 10:03:18 +08:00
// print log
2023-08-01 15:36:04 +08:00
/**if len(this.dsn) > 0 {
2023-07-27 10:03:18 +08:00
u, _ := url.Parse(this.dsn)
if u != nil && len(u.Path) > 0 {
remotelogs.Debug("DB", "close '"+u.Path)
}
2023-08-01 15:36:04 +08:00
}**/
2023-07-27 10:03:18 +08:00
return this.rawDB.Close()
}
// BeginUpdating registers the start of an updating operation.
//
// It returns false, without counting anything, when the database is closing.
// Otherwise it increments the in-flight counter and returns true; the caller
// must then call EndUpdating once the operation is done.
func (this *DB) BeginUpdating() bool {
	this.statusLocker.Lock()
	var accepted = !this.isClosing
	if accepted {
		this.countUpdating++
	}
	this.statusLocker.Unlock()

	return accepted
}
// EndUpdating registers the end of an updating operation by decrementing the
// in-flight counter under the status lock.
func (this *DB) EndUpdating() {
	this.statusLocker.Lock()
	defer this.statusLocker.Unlock()

	this.countUpdating--
}
// RawDB returns the underlying *sql.DB. Operations performed directly on it
// bypass the closing check and the statistics done by the DB methods.
func (this *DB) RawDB() *sql.DB {
	var raw = this.rawDB
	return raw
}