diff --git a/internal/utils/agents/agent_ip.go b/internal/utils/agents/agent_ip.go index 3283d24..81e0aab 100644 --- a/internal/utils/agents/agent_ip.go +++ b/internal/utils/agents/agent_ip.go @@ -3,7 +3,7 @@ package agents type AgentIP struct { - Id int64 - IP string - AgentCode string + Id int64 `json:"id"` + IP string `json:"ip"` + AgentCode string `json:"agentCode"` } diff --git a/internal/utils/agents/db.go b/internal/utils/agents/db.go index 55a34d7..7ffa388 100644 --- a/internal/utils/agents/db.go +++ b/internal/utils/agents/db.go @@ -1,160 +1,9 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . package agents -import ( - "errors" - "github.com/TeaOSLab/EdgeNode/internal/events" - "github.com/TeaOSLab/EdgeNode/internal/remotelogs" - "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" - "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/types" - "log" - "os" - "path/filepath" - "strings" -) - -const ( - tableAgentIPs = "agentIPs" -) - -type DB struct { - db *dbs.DB - path string - - insertAgentIPStmt *dbs.Stmt - listAgentIPsStmt *dbs.Stmt -} - -func NewDB(path string) *DB { - var db = &DB{path: path} - - events.OnClose(func() { - _ = db.Close() - }) - - return db -} - -func (this *DB) Init() error { - // 检查目录是否存在 - var dir = filepath.Dir(this.path) - - _, err := os.Stat(dir) - if err != nil { - err = os.MkdirAll(dir, 0777) - if err != nil { - return err - } - remotelogs.Println("DB", "create database dir '"+dir+"'") - } - - // TODO 思考 data.db 的数据安全性 - db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE") - if err != nil { - return err - } - db.SetMaxOpenConns(1) - - /**_, err = db.Exec("VACUUM") - if err != nil { - return err - }**/ - - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + tableAgentIPs + `" ( - "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, - "ip" varchar(64), - "agentCode" varchar(128) -);`) - if err != nil { - return err - } - - // 预编译语句 - - // agent ip record statements - this.insertAgentIPStmt, err = db.Prepare(`INSERT INTO "` + tableAgentIPs + `" ("id", "ip", "agentCode") VALUES (?, ?, ?)`) - if err != nil { - return err - } - - this.listAgentIPsStmt, err = db.Prepare(`SELECT "id", "ip", "agentCode" FROM "` + tableAgentIPs + `" ORDER BY "id" ASC LIMIT ? OFFSET ?`) - if err != nil { - return err - } - - this.db = db - - return nil -} - -func (this *DB) InsertAgentIP(ipId int64, ip string, agentCode string) error { - if this.db == nil { - return errors.New("db should not be nil") - } - - _, err := this.insertAgentIPStmt.Exec(ipId, ip, agentCode) - if err != nil { - // 不提示ID重复错误 - if strings.Contains(err.Error(), "UNIQUE constraint") { - return nil - } - - return err - } - - return nil -} - -func (this *DB) ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error) { - if this.db == nil { - return nil, errors.New("db should not be nil") - } - rows, err := this.listAgentIPsStmt.Query(size, offset) - if err != nil { - return nil, err - } - defer func() { - _ = rows.Close() - }() - for rows.Next() { - var agentIP = &AgentIP{} - err = rows.Scan(&agentIP.Id, &agentIP.IP, &agentIP.AgentCode) - if err != nil { - return nil, err - } - agentIPs = append(agentIPs, agentIP) - } - return -} - -func (this *DB) Close() error { - if this.db == nil { - return nil - } - - for _, stmt := range []*dbs.Stmt{ - this.insertAgentIPStmt, - this.listAgentIPsStmt, - } { - if stmt != nil { - _ = stmt.Close() - } - } - - return this.db.Close() -} - -// 打印日志 -func (this *DB) log(args ...any) { - if !Tea.IsTesting() { - return - } - if len(args) == 0 { - return - } - - args[0] = "[" + types.String(args[0]) + "]" - log.Println(args...) +type DB interface { + Init() error + InsertAgentIP(ipId int64, ip string, agentCode string) error + ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error) } diff --git a/internal/utils/agents/db_kv.go b/internal/utils/agents/db_kv.go new file mode 100644 index 0000000..ab52971 --- /dev/null +++ b/internal/utils/agents/db_kv.go @@ -0,0 +1,93 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package agents + +import ( + "errors" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" +) + +type KVDB struct { + table *kvstore.Table[*AgentIP] + encoder *AgentIPEncoder[*AgentIP] + lastKey string +} + +func NewKVDB() *KVDB { + var db = &KVDB{} + + events.OnClose(func() { + _ = db.Close() + }) + + return db +} + +func (this *KVDB) Init() error { + store, err := kvstore.DefaultStore() + if err != nil { + return err + } + + db, err := store.NewDB("agents") + if err != nil { + return err + } + + { + this.encoder = &AgentIPEncoder[*AgentIP]{} + table, tableErr := kvstore.NewTable[*AgentIP]("agent_ips", this.encoder) + if tableErr != nil { + return tableErr + } + db.AddTable(table) + this.table = table + } + + return nil +} + +func (this *KVDB) InsertAgentIP(ipId int64, ip string, agentCode string) error { + if this.table == nil { + return errors.New("table should not be nil") + } + + var item = &AgentIP{ + Id: ipId, + IP: ip, + AgentCode: agentCode, + } + var key = this.encoder.EncodeKey(item) + return this.table.Set(key, item) +} + +func (this *KVDB) ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error) { + if this.table == nil { + return nil, errors.New("table should not be nil") + } + + err = this.table. + Query(). + Limit(int(size)). + Offset(this.lastKey). + FindAll(func(tx *kvstore.Tx[*AgentIP], item kvstore.Item[*AgentIP]) (goNext bool, err error) { + this.lastKey = item.Key + agentIPs = append(agentIPs, item.Value) + return true, nil + }) + + return +} + +func (this *KVDB) Close() error { + return nil +} + +func (this *KVDB) Flush() error { + if this.table == nil { + return errors.New("table should not be nil") + } + + return this.table.DB().Store().Flush() +} diff --git a/internal/utils/agents/db_kv_objects.go b/internal/utils/agents/db_kv_objects.go new file mode 100644 index 0000000..25f263b --- /dev/null +++ b/internal/utils/agents/db_kv_objects.go @@ -0,0 +1,36 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package agents + +import ( + "encoding/binary" + "encoding/json" + "errors" +) + +type AgentIPEncoder[T interface{ *AgentIP }] struct { +} + +func (this *AgentIPEncoder[T]) Encode(value T) ([]byte, error) { + return json.Marshal(value) +} + +func (this *AgentIPEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) { + return nil, errors.New("invalid field name '" + fieldName + "'") +} + +func (this *AgentIPEncoder[T]) Decode(valueBytes []byte) (value T, err error) { + err = json.Unmarshal(valueBytes, &value) + return +} + +// EncodeKey generate key for ip item +func (this *AgentIPEncoder[T]) EncodeKey(item *AgentIP) string { + var b = make([]byte, 8) + if item.Id < 0 { + item.Id = 0 + } + + binary.BigEndian.PutUint64(b, uint64(item.Id)) + return string(b) +} diff --git a/internal/utils/agents/db_kv_test.go b/internal/utils/agents/db_kv_test.go new file mode 100644 index 0000000..446e83a --- /dev/null +++ b/internal/utils/agents/db_kv_test.go @@ -0,0 +1,53 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package agents_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/agents" + "strconv" + "testing" +) + +func TestKVDB_InsertAgentIP(t *testing.T) { + var db = agents.NewKVDB() + err := db.Init() + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = db.Flush() + }() + + for i := 1; i <= 5; i++ { + err = db.InsertAgentIP(int64(i), "192.168.2."+strconv.Itoa(i), "example") + if err != nil { + t.Fatal(err) + } + } +} + +func TestKVDB_ListAgentIPs(t *testing.T) { + var db = agents.NewKVDB() + err := db.Init() + if err != nil { + t.Fatal(err) + } + + const count = 10 + + for { + agentIPs, listErr := db.ListAgentIPs(0, count) + if listErr != nil { + t.Fatal(listErr) + } + t.Log("===") + for _, agentIP := range agentIPs { + t.Logf("%+v", agentIP) + } + + if len(agentIPs) < count { + break + } + } +} diff --git a/internal/utils/agents/db_sqlite.go b/internal/utils/agents/db_sqlite.go new file mode 100644 index 0000000..7dc219b --- /dev/null +++ b/internal/utils/agents/db_sqlite.go @@ -0,0 +1,160 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package agents + +import ( + "errors" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" + "log" + "os" + "path/filepath" + "strings" +) + +const ( + tableAgentIPs = "agentIPs" +) + +type SQLiteDB struct { + db *dbs.DB + path string + + insertAgentIPStmt *dbs.Stmt + listAgentIPsStmt *dbs.Stmt +} + +func NewSQLiteDB(path string) *SQLiteDB { + var db = &SQLiteDB{path: path} + + events.OnClose(func() { + _ = db.Close() + }) + + return db +} + +func (this *SQLiteDB) Init() error { + // 检查目录是否存在 + var dir = filepath.Dir(this.path) + + _, err := os.Stat(dir) + if err != nil { + err = os.MkdirAll(dir, 0777) + if err != nil { + return err + } + remotelogs.Println("DB", "create database dir '"+dir+"'") + } + + // TODO 思考 data.db 的数据安全性 + db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE") + if err != nil { + return err + } + db.SetMaxOpenConns(1) + + /**_, err = db.Exec("VACUUM") + if err != nil { + return err + }**/ + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + tableAgentIPs + `" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "ip" varchar(64), + "agentCode" varchar(128) +);`) + if err != nil { + return err + } + + // 预编译语句 + + // agent ip record statements + this.insertAgentIPStmt, err = db.Prepare(`INSERT INTO "` + tableAgentIPs + `" ("id", "ip", "agentCode") VALUES (?, ?, ?)`) + if err != nil { + return err + } + + this.listAgentIPsStmt, err = db.Prepare(`SELECT "id", "ip", "agentCode" FROM "` + tableAgentIPs + `" ORDER BY "id" ASC LIMIT ? OFFSET ?`) + if err != nil { + return err + } + + this.db = db + + return nil +} + +func (this *SQLiteDB) InsertAgentIP(ipId int64, ip string, agentCode string) error { + if this.db == nil { + return errors.New("db should not be nil") + } + + _, err := this.insertAgentIPStmt.Exec(ipId, ip, agentCode) + if err != nil { + // 不提示ID重复错误 + if strings.Contains(err.Error(), "UNIQUE constraint") { + return nil + } + + return err + } + + return nil +} + +func (this *SQLiteDB) ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error) { + if this.db == nil { + return nil, errors.New("db should not be nil") + } + rows, err := this.listAgentIPsStmt.Query(size, offset) + if err != nil { + return nil, err + } + defer func() { + _ = rows.Close() + }() + for rows.Next() { + var agentIP = &AgentIP{} + err = rows.Scan(&agentIP.Id, &agentIP.IP, &agentIP.AgentCode) + if err != nil { + return nil, err + } + agentIPs = append(agentIPs, agentIP) + } + return +} + +func (this *SQLiteDB) Close() error { + if this.db == nil { + return nil + } + + for _, stmt := range []*dbs.Stmt{ + this.insertAgentIPStmt, + this.listAgentIPsStmt, + } { + if stmt != nil { + _ = stmt.Close() + } + } + + return this.db.Close() +} + +// 打印日志 +func (this *SQLiteDB) log(args ...any) { + if !Tea.IsTesting() { + return + } + if len(args) == 0 { + return + } + + args[0] = "[" + types.String(args[0]) + "]" + log.Println(args...) +} diff --git a/internal/utils/agents/manager.go b/internal/utils/agents/manager.go index 52a65ce..e669d22 100644 --- a/internal/utils/agents/manager.go +++ b/internal/utils/agents/manager.go @@ -10,6 +10,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/iwind/TeaGo/Tea" + "os" "sync" "time" ) @@ -33,7 +34,7 @@ type Manager struct { ipMap map[string]string // ip => agentCode locker sync.RWMutex - db *DB + db DB lastId int64 } @@ -44,7 +45,7 @@ func NewManager() *Manager { } } -func (this *Manager) SetDB(db *DB) { +func (this *Manager) SetDB(db DB) { this.db = db } @@ -195,7 +196,14 @@ func (this *Manager) ContainsIP(ip string) bool { } func (this *Manager) loadDB() error { - var db = NewDB(Tea.Root + "/data/agents.db") + var sqlitePath = Tea.Root + "/data/agents.db" + _, sqliteErr := os.Stat(sqlitePath) + var db DB + if sqliteErr == nil { + db = NewSQLiteDB(sqlitePath) + } else { + db = NewKVDB() + } err := db.Init() if err != nil { return err diff --git a/internal/utils/agents/manager_test.go b/internal/utils/agents/manager_test.go index 3995260..7164d39 100644 --- a/internal/utils/agents/manager_test.go +++ b/internal/utils/agents/manager_test.go @@ -15,7 +15,7 @@ func TestNewManager(t *testing.T) { return } - var db = agents.NewDB(Tea.Root + "/data/agents.db") + var db = agents.NewSQLiteDB(Tea.Root + "/data/agents.db") err := db.Init() if err != nil { t.Fatal(err) diff --git a/internal/utils/kvstore/store.go b/internal/utils/kvstore/store.go index 7063c36..6137e0f 100644 --- a/internal/utils/kvstore/store.go +++ b/internal/utils/kvstore/store.go @@ -4,6 +4,7 @@ package kvstore import ( "errors" + "fmt" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" @@ -38,16 +39,16 @@ func NewStore(storeName string) (*Store, error) { return nil, errors.New("invalid store name '" + storeName + "'") } - var root = Tea.Root + "/data/stores" - _, err := os.Stat(root) + var path = Tea.Root + "/data/stores/" + storeName + StoreSuffix + _, err := os.Stat(path) if err != nil && os.IsNotExist(err) { - _ = os.MkdirAll(root, 0777) + _ = os.MkdirAll(path, 0777) } return &Store{ name: storeName, - path: Tea.Root + "/data/stores/" + storeName + StoreSuffix, - locker: fsutils.NewLocker(Tea.Root + "/data/stores/" + storeName + StoreSuffix + "/.fs"), + path: path, + locker: fsutils.NewLocker(path + "/.fs"), }, nil } @@ -96,21 +97,24 @@ func DefaultStore() (*Store, error) { return defaultSore, nil } + var resultErr error storeOnce.Do(func() { store, err := NewStore("default") if err != nil { - remotelogs.Error("KV", "create default store failed: "+err.Error()) + resultErr = fmt.Errorf("create default store failed: %w", err) + remotelogs.Error("KV", resultErr.Error()) return } err = store.Open() if err != nil { - remotelogs.Error("KV", "open default store failed: "+err.Error()) + resultErr = fmt.Errorf("open default store failed: %w", err) + remotelogs.Error("KV", resultErr.Error()) return } defaultSore = store }) - return defaultSore, nil + return defaultSore, resultErr } func (this *Store) Open() error {