对本地数据库文件进行加锁

This commit is contained in:
刘祥超
2023-03-07 16:22:32 +08:00
parent 44d1a2415c
commit d7e6da8d2c
8 changed files with 188 additions and 34 deletions

View File

@@ -7,9 +7,9 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv" "github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
_ "github.com/mattn/go-sqlite3"
"os" "os"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -450,7 +450,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
remotelogs.Println("CACHE", "upgrading local database finished") remotelogs.Println("CACHE", "upgrading local database finished")
}() }()
db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
if err != nil { if err != nil {
return err return err
} }

View File

@@ -3,7 +3,6 @@
package caches package caches
import ( import (
"database/sql"
"errors" "errors"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/goman"
@@ -82,14 +81,14 @@ func (this *FileListDB) Open(dbPath string) error {
} }
// write db // write db
writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)+"&_secure_delete=FAST") writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST&_locking_mode=EXCLUSIVE")
if err != nil { if err != nil {
return errors.New("open write database failed: " + err.Error()) return errors.New("open write database failed: " + err.Error())
} }
writeDB.SetMaxOpenConns(1) writeDB.SetMaxOpenConns(1)
this.writeDB = dbs.NewDB(writeDB) this.writeDB = writeDB
// TODO 耗时过长,暂时不整理数据库 // TODO 耗时过长,暂时不整理数据库
// TODO 需要根据行数来判断是否VACUUM // TODO 需要根据行数来判断是否VACUUM
@@ -109,7 +108,7 @@ func (this *FileListDB) Open(dbPath string) error {
} }
} }
this.writeBatch = dbs.NewBatch(writeDB, 4) this.writeBatch = dbs.NewBatch(writeDB.RawDB(), 4)
this.writeBatch.OnFail(func(err error) { this.writeBatch.OnFail(func(err error) {
remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")") remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")")
}) })
@@ -124,14 +123,14 @@ func (this *FileListDB) Open(dbPath string) error {
} }
// read db // read db
readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)) readDB, err := dbs.OpenReader("file:" + dbPath + "?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize))
if err != nil { if err != nil {
return errors.New("open read database failed: " + err.Error()) return errors.New("open read database failed: " + err.Error())
} }
readDB.SetMaxOpenConns(runtime.NumCPU()) readDB.SetMaxOpenConns(runtime.NumCPU())
this.readDB = dbs.NewDB(readDB) this.readDB = readDB
if teaconst.EnableDBStat { if teaconst.EnableDBStat {
this.readDB.EnableStat(true) this.readDB.EnableStat(true)

View File

@@ -3,28 +3,27 @@
package iplibrary package iplibrary
import ( import (
"database/sql"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
_ "github.com/mattn/go-sqlite3"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
) )
type IPListDB struct { type IPListDB struct {
db *sql.DB db *dbs.DB
itemTableName string itemTableName string
deleteExpiredItemsStmt *sql.Stmt deleteExpiredItemsStmt *dbs.Stmt
deleteItemStmt *sql.Stmt deleteItemStmt *dbs.Stmt
insertItemStmt *sql.Stmt insertItemStmt *dbs.Stmt
selectItemsStmt *sql.Stmt selectItemsStmt *dbs.Stmt
selectMaxVersionStmt *sql.Stmt selectMaxVersionStmt *dbs.Stmt
cleanTicker *time.Ticker cleanTicker *time.Ticker
@@ -56,7 +55,7 @@ func (this *IPListDB) init() error {
var path = this.dir + "/ip_list.db" var path = this.dir + "/ip_list.db"
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
if err != nil { if err != nil {
return err return err
} }

View File

@@ -3,7 +3,6 @@
package metrics package metrics
import ( import (
"database/sql"
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
@@ -17,7 +16,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
_ "github.com/mattn/go-sqlite3"
"os" "os"
"strconv" "strconv"
"sync" "sync"
@@ -50,11 +48,11 @@ type Task struct {
cleanVersion int32 cleanVersion int32
insertStatStmt *sql.Stmt insertStatStmt *dbs.Stmt
deleteByVersionStmt *sql.Stmt deleteByVersionStmt *dbs.Stmt
deleteByExpiresTimeStmt *sql.Stmt deleteByExpiresTimeStmt *dbs.Stmt
selectTopStmt *sql.Stmt selectTopStmt *dbs.Stmt
sumStmt *sql.Stmt sumStmt *dbs.Stmt
serverIdMap map[int64]zero.Zero // 所有的服务Ids serverIdMap map[int64]zero.Zero // 所有的服务Ids
timeMap map[string]zero.Zero // time => bool timeMap map[string]zero.Zero // time => bool
@@ -92,12 +90,12 @@ func (this *Task) Init() error {
var path = dir + "/metric." + types.String(this.item.Id) + ".db" var path = dir + "/metric." + types.String(this.item.Id) + ".db"
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
if err != nil { if err != nil {
return err return err
} }
db.SetMaxOpenConns(1) db.SetMaxOpenConns(1)
this.db = dbs.NewDB(db) this.db = db
// 恢复数据库 // 恢复数据库
var recoverEnv, _ = os.LookupEnv("EdgeRecover") var recoverEnv, _ = os.LookupEnv("EdgeRecover")

View File

@@ -3,13 +3,12 @@
package agents package agents
import ( import (
"database/sql"
"errors" "errors"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
_ "github.com/mattn/go-sqlite3"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
@@ -21,11 +20,11 @@ const (
) )
type DB struct { type DB struct {
db *sql.DB db *dbs.DB
path string path string
insertAgentIPStmt *sql.Stmt insertAgentIPStmt *dbs.Stmt
listAgentIPsStmt *sql.Stmt listAgentIPsStmt *dbs.Stmt
} }
func NewDB(path string) *DB { func NewDB(path string) *DB {
@@ -52,7 +51,7 @@ func (this *DB) Init() error {
} }
// TODO 思考 data.db 的数据安全性 // TODO 思考 data.db 的数据安全性
db, err := sql.Open("sqlite3", "file:"+this.path+"?cache=shared&mode=rwc&_journal_mode=WAL") db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE")
if err != nil { if err != nil {
return err return err
} }
@@ -135,7 +134,7 @@ func (this *DB) Close() error {
return nil return nil
} }
for _, stmt := range []*sql.Stmt{ for _, stmt := range []*dbs.Stmt{
this.insertAgentIPStmt, this.insertAgentIPStmt,
this.listAgentIPsStmt, this.listAgentIPsStmt,
} { } {

View File

@@ -7,14 +7,56 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
_ "github.com/mattn/go-sqlite3"
"strings"
) )
type DB struct { type DB struct {
rawDB *sql.DB locker *fileutils.Locker
rawDB *sql.DB
enableStat bool enableStat bool
} }
func OpenWriter(dsn string) (*DB, error) {
return open(dsn, true)
}
func OpenReader(dsn string) (*DB, error) {
return open(dsn, false)
}
func open(dsn string, lock bool) (*DB, error) {
// locker
var locker *fileutils.Locker
if lock {
var path = dsn
var queryIndex = strings.Index(dsn, "?")
if queryIndex >= 0 {
path = path[:queryIndex]
}
path = strings.TrimSpace(strings.TrimPrefix(path, "file:"))
locker = fileutils.NewLocker(path)
err := locker.Lock()
if err != nil {
remotelogs.Warn("DB", "lock '"+path+"' failed: "+err.Error())
locker = nil
}
}
// open
rawDB, err := sql.Open("sqlite3", dsn)
if err != nil {
return nil, err
}
var db = NewDB(rawDB)
db.locker = locker
return db, nil
}
func NewDB(rawDB *sql.DB) *DB { func NewDB(rawDB *sql.DB) *DB {
var db = &DB{ var db = &DB{
rawDB: rawDB, rawDB: rawDB,
@@ -30,6 +72,10 @@ func NewDB(rawDB *sql.DB) *DB {
return db return db
} }
func (this *DB) SetMaxOpenConns(n int) {
this.rawDB.SetMaxOpenConns(n)
}
func (this *DB) EnableStat(b bool) { func (this *DB) EnableStat(b bool) {
this.enableStat = b this.enableStat = b
} }
@@ -81,6 +127,13 @@ func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row {
func (this *DB) Close() error { func (this *DB) Close() error {
events.Remove(fmt.Sprintf("db_%p", this)) events.Remove(fmt.Sprintf("db_%p", this))
defer func() {
if this.locker != nil {
_ = this.locker.Release()
}
}()
return this.rawDB.Close() return this.rawDB.Close()
} }

View File

@@ -0,0 +1,82 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fileutils
import (
"os"
"syscall"
"time"
)
type Locker struct {
path string
fp *os.File
}
func NewLocker(path string) *Locker {
return &Locker{
path: path + ".lock",
}
}
func (this *Locker) TryLock() (ok bool, err error) {
if this.fp == nil {
fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return false, err
}
this.fp = fp
}
return this.tryLock()
}
func (this *Locker) Lock() error {
if this.fp == nil {
fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return err
}
this.fp = fp
}
for {
b, err := this.tryLock()
if err != nil {
_ = this.fp.Close()
return err
}
if b {
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
func (this *Locker) Release() error {
err := this.fp.Close()
if err != nil {
return err
}
this.fp = nil
return nil
}
func (this *Locker) tryLock() (ok bool, err error) {
err = syscall.Flock(int(this.fp.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err == nil {
return true, nil
}
errno, isErrNo := err.(syscall.Errno)
if !isErrNo {
return
}
if !errno.Temporary() {
return
}
err = nil // 不提示错误
return
}

View File

@@ -0,0 +1,24 @@
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package fileutils_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
"testing"
)
func TestLocker_Lock(t *testing.T) {
var path = "/tmp/file-test"
var locker = fileutils.NewLocker(path)
err := locker.Lock()
if err != nil {
t.Fatal(err)
}
_ = locker.Release()
var locker2 = fileutils.NewLocker(path)
err = locker2.Lock()
if err != nil {
t.Fatal(err)
}
}