diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index b257a16..4812d8a 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -7,9 +7,9 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/ttlcache" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/TeaOSLab/EdgeNode/internal/utils/fnv" "github.com/iwind/TeaGo/types" - _ "github.com/mattn/go-sqlite3" "os" "sync/atomic" "time" @@ -450,7 +450,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error { remotelogs.Println("CACHE", "upgrading local database finished") }() - db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") + db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE") if err != nil { return err } diff --git a/internal/caches/list_file_db.go b/internal/caches/list_file_db.go index 7c3ae32..3e25117 100644 --- a/internal/caches/list_file_db.go +++ b/internal/caches/list_file_db.go @@ -3,7 +3,6 @@ package caches import ( - "database/sql" "errors" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/goman" @@ -82,14 +81,14 @@ func (this *FileListDB) Open(dbPath string) error { } // write db - writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)+"&_secure_delete=FAST") + writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST&_locking_mode=EXCLUSIVE") if err != nil { return errors.New("open write database failed: " + err.Error()) } writeDB.SetMaxOpenConns(1) - this.writeDB = dbs.NewDB(writeDB) + this.writeDB = writeDB // TODO 耗时过长,暂时不整理数据库 // TODO 需要根据行数来判断是否VACUUM @@ -109,7 +108,7 @@ func (this *FileListDB) Open(dbPath string) error { } } - this.writeBatch = dbs.NewBatch(writeDB, 4) + this.writeBatch = dbs.NewBatch(writeDB.RawDB(), 4) this.writeBatch.OnFail(func(err error) { remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")") }) @@ -124,14 +123,14 @@ func (this *FileListDB) Open(dbPath string) error { } // read db - readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)) + readDB, err := dbs.OpenReader("file:" + dbPath + "?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize)) if err != nil { return errors.New("open read database failed: " + err.Error()) } readDB.SetMaxOpenConns(runtime.NumCPU()) - this.readDB = dbs.NewDB(readDB) + this.readDB = readDB if teaconst.EnableDBStat { this.readDB.EnableStat(true) diff --git a/internal/iplibrary/ip_list_db.go b/internal/iplibrary/ip_list_db.go index 37e6dc0..4278b85 100644 --- a/internal/iplibrary/ip_list_db.go +++ b/internal/iplibrary/ip_list_db.go @@ -3,28 +3,27 @@ package iplibrary import ( - "database/sql" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/iwind/TeaGo/Tea" - _ "github.com/mattn/go-sqlite3" "os" "path/filepath" "time" ) type IPListDB struct { - db *sql.DB + db *dbs.DB itemTableName string - deleteExpiredItemsStmt *sql.Stmt - deleteItemStmt *sql.Stmt - insertItemStmt *sql.Stmt - selectItemsStmt *sql.Stmt - selectMaxVersionStmt *sql.Stmt + deleteExpiredItemsStmt *dbs.Stmt + deleteItemStmt *dbs.Stmt + insertItemStmt *dbs.Stmt + selectItemsStmt *dbs.Stmt + selectMaxVersionStmt *dbs.Stmt cleanTicker *time.Ticker @@ -56,7 +55,7 @@ func (this *IPListDB) init() error { var path = this.dir + "/ip_list.db" - db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") + db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE") if err != nil { return err } diff --git a/internal/metrics/task.go b/internal/metrics/task.go index 5bc33b6..ec5b551 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -3,7 +3,6 @@ package metrics import ( - "database/sql" "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" @@ -17,7 +16,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" - _ "github.com/mattn/go-sqlite3" "os" "strconv" "sync" @@ -50,11 +48,11 @@ type Task struct { cleanVersion int32 - insertStatStmt *sql.Stmt - deleteByVersionStmt *sql.Stmt - deleteByExpiresTimeStmt *sql.Stmt - selectTopStmt *sql.Stmt - sumStmt *sql.Stmt + insertStatStmt *dbs.Stmt + deleteByVersionStmt *dbs.Stmt + deleteByExpiresTimeStmt *dbs.Stmt + selectTopStmt *dbs.Stmt + sumStmt *dbs.Stmt serverIdMap map[int64]zero.Zero // 所有的服务Ids timeMap map[string]zero.Zero // time => bool @@ -92,12 +90,12 @@ func (this *Task) Init() error { var path = dir + "/metric." + types.String(this.item.Id) + ".db" - db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF") + db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE") if err != nil { return err } db.SetMaxOpenConns(1) - this.db = dbs.NewDB(db) + this.db = db // 恢复数据库 var recoverEnv, _ = os.LookupEnv("EdgeRecover") diff --git a/internal/utils/agents/db.go b/internal/utils/agents/db.go index 5798f33..46624b3 100644 --- a/internal/utils/agents/db.go +++ b/internal/utils/agents/db.go @@ -3,13 +3,12 @@ package agents import ( - "database/sql" "errors" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" - _ "github.com/mattn/go-sqlite3" "log" "os" "path/filepath" @@ -21,11 +20,11 @@ const ( ) type DB struct { - db *sql.DB + db *dbs.DB path string - insertAgentIPStmt *sql.Stmt - listAgentIPsStmt *sql.Stmt + insertAgentIPStmt *dbs.Stmt + listAgentIPsStmt *dbs.Stmt } func NewDB(path string) *DB { @@ -52,7 +51,7 @@ func (this *DB) Init() error { } // TODO 思考 data.db 的数据安全性 - db, err := sql.Open("sqlite3", "file:"+this.path+"?cache=shared&mode=rwc&_journal_mode=WAL") + db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE") if err != nil { return err } @@ -135,7 +134,7 @@ func (this *DB) Close() error { return nil } - for _, stmt := range []*sql.Stmt{ + for _, stmt := range []*dbs.Stmt{ this.insertAgentIPStmt, this.listAgentIPsStmt, } { diff --git a/internal/utils/dbs/db.go b/internal/utils/dbs/db.go index 044883a..365d847 100644 --- a/internal/utils/dbs/db.go +++ b/internal/utils/dbs/db.go @@ -7,14 +7,56 @@ import ( "database/sql" "fmt" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/utils/fileutils" + _ "github.com/mattn/go-sqlite3" + "strings" ) type DB struct { - rawDB *sql.DB + locker *fileutils.Locker + rawDB *sql.DB enableStat bool } +func OpenWriter(dsn string) (*DB, error) { + return open(dsn, true) +} + +func OpenReader(dsn string) (*DB, error) { + return open(dsn, false) +} + +func open(dsn string, lock bool) (*DB, error) { + // locker + var locker *fileutils.Locker + if lock { + var path = dsn + var queryIndex = strings.Index(dsn, "?") + if queryIndex >= 0 { + path = path[:queryIndex] + } + path = strings.TrimSpace(strings.TrimPrefix(path, "file:")) + locker = fileutils.NewLocker(path) + err := locker.Lock() + if err != nil { + remotelogs.Warn("DB", "lock '"+path+"' failed: "+err.Error()) + locker = nil + } + } + + // open + rawDB, err := sql.Open("sqlite3", dsn) + if err != nil { + return nil, err + } + + var db = NewDB(rawDB) + db.locker = locker + return db, nil +} + func NewDB(rawDB *sql.DB) *DB { var db = &DB{ rawDB: rawDB, @@ -30,6 +72,10 @@ func NewDB(rawDB *sql.DB) *DB { return db } +func (this *DB) SetMaxOpenConns(n int) { + this.rawDB.SetMaxOpenConns(n) +} + func (this *DB) EnableStat(b bool) { this.enableStat = b } @@ -81,6 +127,13 @@ func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row { func (this *DB) Close() error { events.Remove(fmt.Sprintf("db_%p", this)) + + defer func() { + if this.locker != nil { + _ = this.locker.Release() + } + }() + return this.rawDB.Close() } diff --git a/internal/utils/fileutils/locker.go b/internal/utils/fileutils/locker.go new file mode 100644 index 0000000..e48b5a1 --- /dev/null +++ b/internal/utils/fileutils/locker.go @@ -0,0 +1,82 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fileutils + +import ( + "os" + "syscall" + "time" +) + +type Locker struct { + path string + fp *os.File +} + +func NewLocker(path string) *Locker { + return &Locker{ + path: path + ".lock", + } +} + +func (this *Locker) TryLock() (ok bool, err error) { + if this.fp == nil { + fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return false, err + } + this.fp = fp + } + return this.tryLock() +} + +func (this *Locker) Lock() error { + if this.fp == nil { + fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + this.fp = fp + } + + for { + b, err := this.tryLock() + if err != nil { + _ = this.fp.Close() + return err + } + if b { + return nil + } + time.Sleep(100 * time.Millisecond) + } +} + +func (this *Locker) Release() error { + err := this.fp.Close() + if err != nil { + return err + } + this.fp = nil + return nil +} + +func (this *Locker) tryLock() (ok bool, err error) { + err = syscall.Flock(int(this.fp.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err == nil { + return true, nil + } + + errno, isErrNo := err.(syscall.Errno) + if !isErrNo { + return + } + + if !errno.Temporary() { + return + } + + err = nil // 不提示错误 + + return +} diff --git a/internal/utils/fileutils/locker_test.go b/internal/utils/fileutils/locker_test.go new file mode 100644 index 0000000..d6b98d3 --- /dev/null +++ b/internal/utils/fileutils/locker_test.go @@ -0,0 +1,24 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package fileutils_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/fileutils" + "testing" +) + +func TestLocker_Lock(t *testing.T) { + var path = "/tmp/file-test" + var locker = fileutils.NewLocker(path) + err := locker.Lock() + if err != nil { + t.Fatal(err) + } + _ = locker.Release() + + var locker2 = fileutils.NewLocker(path) + err = locker2.Lock() + if err != nil { + t.Fatal(err) + } +}