mirror of
				https://github.com/TeaOSLab/EdgeNode.git
				synced 2025-11-04 07:40:56 +08:00 
			
		
		
		
	Agent识别库增加KV存储
This commit is contained in:
		@@ -3,7 +3,7 @@
 | 
			
		||||
package agents
 | 
			
		||||
 | 
			
		||||
type AgentIP struct {
 | 
			
		||||
	Id        int64
 | 
			
		||||
	IP        string
 | 
			
		||||
	AgentCode string
 | 
			
		||||
	Id        int64  `json:"id"`
 | 
			
		||||
	IP        string `json:"ip"`
 | 
			
		||||
	AgentCode string `json:"agentCode"`
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,160 +1,9 @@
 | 
			
		||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
			
		||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
			
		||||
 | 
			
		||||
package agents
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
 | 
			
		||||
	"github.com/iwind/TeaGo/Tea"
 | 
			
		||||
	"github.com/iwind/TeaGo/types"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strings"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	tableAgentIPs = "agentIPs"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type DB struct {
 | 
			
		||||
	db   *dbs.DB
 | 
			
		||||
	path string
 | 
			
		||||
 | 
			
		||||
	insertAgentIPStmt *dbs.Stmt
 | 
			
		||||
	listAgentIPsStmt  *dbs.Stmt
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewDB(path string) *DB {
 | 
			
		||||
	var db = &DB{path: path}
 | 
			
		||||
 | 
			
		||||
	events.OnClose(func() {
 | 
			
		||||
		_ = db.Close()
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return db
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *DB) Init() error {
 | 
			
		||||
	// 检查目录是否存在
 | 
			
		||||
	var dir = filepath.Dir(this.path)
 | 
			
		||||
 | 
			
		||||
	_, err := os.Stat(dir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err = os.MkdirAll(dir, 0777)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		remotelogs.Println("DB", "create database dir '"+dir+"'")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO 思考 data.db 的数据安全性
 | 
			
		||||
	db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	db.SetMaxOpenConns(1)
 | 
			
		||||
 | 
			
		||||
	/**_, err = db.Exec("VACUUM")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}**/
 | 
			
		||||
 | 
			
		||||
	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + tableAgentIPs + `" (
 | 
			
		||||
  "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
 | 
			
		||||
  "ip" varchar(64),
 | 
			
		||||
  "agentCode" varchar(128)
 | 
			
		||||
);`)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 预编译语句
 | 
			
		||||
 | 
			
		||||
	// agent ip record statements
 | 
			
		||||
	this.insertAgentIPStmt, err = db.Prepare(`INSERT INTO "` + tableAgentIPs + `" ("id", "ip", "agentCode") VALUES (?, ?, ?)`)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	this.listAgentIPsStmt, err = db.Prepare(`SELECT "id", "ip", "agentCode" FROM "` + tableAgentIPs + `" ORDER BY "id" ASC LIMIT ? OFFSET ?`)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	this.db = db
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *DB) InsertAgentIP(ipId int64, ip string, agentCode string) error {
 | 
			
		||||
	if this.db == nil {
 | 
			
		||||
		return errors.New("db should not be nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err := this.insertAgentIPStmt.Exec(ipId, ip, agentCode)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		// 不提示ID重复错误
 | 
			
		||||
		if strings.Contains(err.Error(), "UNIQUE constraint") {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *DB) ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error) {
 | 
			
		||||
	if this.db == nil {
 | 
			
		||||
		return nil, errors.New("db should not be nil")
 | 
			
		||||
	}
 | 
			
		||||
	rows, err := this.listAgentIPsStmt.Query(size, offset)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		_ = rows.Close()
 | 
			
		||||
	}()
 | 
			
		||||
	for rows.Next() {
 | 
			
		||||
		var agentIP = &AgentIP{}
 | 
			
		||||
		err = rows.Scan(&agentIP.Id, &agentIP.IP, &agentIP.AgentCode)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		agentIPs = append(agentIPs, agentIP)
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *DB) Close() error {
 | 
			
		||||
	if this.db == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, stmt := range []*dbs.Stmt{
 | 
			
		||||
		this.insertAgentIPStmt,
 | 
			
		||||
		this.listAgentIPsStmt,
 | 
			
		||||
	} {
 | 
			
		||||
		if stmt != nil {
 | 
			
		||||
			_ = stmt.Close()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return this.db.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 打印日志
 | 
			
		||||
func (this *DB) log(args ...any) {
 | 
			
		||||
	if !Tea.IsTesting() {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if len(args) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	args[0] = "[" + types.String(args[0]) + "]"
 | 
			
		||||
	log.Println(args...)
 | 
			
		||||
type DB interface {
 | 
			
		||||
	Init() error
 | 
			
		||||
	InsertAgentIP(ipId int64, ip string, agentCode string) error
 | 
			
		||||
	ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										93
									
								
								internal/utils/agents/db_kv.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								internal/utils/agents/db_kv.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,93 @@
 | 
			
		||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
			
		||||
 | 
			
		||||
package agents
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type KVDB struct {
 | 
			
		||||
	table   *kvstore.Table[*AgentIP]
 | 
			
		||||
	encoder *AgentIPEncoder[*AgentIP]
 | 
			
		||||
	lastKey string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewKVDB() *KVDB {
 | 
			
		||||
	var db = &KVDB{}
 | 
			
		||||
 | 
			
		||||
	events.OnClose(func() {
 | 
			
		||||
		_ = db.Close()
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return db
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *KVDB) Init() error {
 | 
			
		||||
	store, err := kvstore.DefaultStore()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	db, err := store.NewDB("agents")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	{
 | 
			
		||||
		this.encoder = &AgentIPEncoder[*AgentIP]{}
 | 
			
		||||
		table, tableErr := kvstore.NewTable[*AgentIP]("agent_ips", this.encoder)
 | 
			
		||||
		if tableErr != nil {
 | 
			
		||||
			return tableErr
 | 
			
		||||
		}
 | 
			
		||||
		db.AddTable(table)
 | 
			
		||||
		this.table = table
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *KVDB) InsertAgentIP(ipId int64, ip string, agentCode string) error {
 | 
			
		||||
	if this.table == nil {
 | 
			
		||||
		return errors.New("table should not be nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var item = &AgentIP{
 | 
			
		||||
		Id:        ipId,
 | 
			
		||||
		IP:        ip,
 | 
			
		||||
		AgentCode: agentCode,
 | 
			
		||||
	}
 | 
			
		||||
	var key = this.encoder.EncodeKey(item)
 | 
			
		||||
	return this.table.Set(key, item)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *KVDB) ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error) {
 | 
			
		||||
	if this.table == nil {
 | 
			
		||||
		return nil, errors.New("table should not be nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = this.table.
 | 
			
		||||
		Query().
 | 
			
		||||
		Limit(int(size)).
 | 
			
		||||
		Offset(this.lastKey).
 | 
			
		||||
		FindAll(func(tx *kvstore.Tx[*AgentIP], item kvstore.Item[*AgentIP]) (goNext bool, err error) {
 | 
			
		||||
			this.lastKey = item.Key
 | 
			
		||||
			agentIPs = append(agentIPs, item.Value)
 | 
			
		||||
			return true, nil
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *KVDB) Close() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *KVDB) Flush() error {
 | 
			
		||||
	if this.table == nil {
 | 
			
		||||
		return errors.New("table should not be nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return this.table.DB().Store().Flush()
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										36
									
								
								internal/utils/agents/db_kv_objects.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								internal/utils/agents/db_kv_objects.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,36 @@
 | 
			
		||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
			
		||||
 | 
			
		||||
package agents
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/binary"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"errors"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AgentIPEncoder[T interface{ *AgentIP }] struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *AgentIPEncoder[T]) Encode(value T) ([]byte, error) {
 | 
			
		||||
	return json.Marshal(value)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *AgentIPEncoder[T]) EncodeField(value T, fieldName string) ([]byte, error) {
 | 
			
		||||
	return nil, errors.New("invalid field name '" + fieldName + "'")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *AgentIPEncoder[T]) Decode(valueBytes []byte) (value T, err error) {
 | 
			
		||||
	err = json.Unmarshal(valueBytes, &value)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EncodeKey generate key for ip item
 | 
			
		||||
func (this *AgentIPEncoder[T]) EncodeKey(item *AgentIP) string {
 | 
			
		||||
	var b = make([]byte, 8)
 | 
			
		||||
	if item.Id < 0 {
 | 
			
		||||
		item.Id = 0
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	binary.BigEndian.PutUint64(b, uint64(item.Id))
 | 
			
		||||
	return string(b)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										53
									
								
								internal/utils/agents/db_kv_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								internal/utils/agents/db_kv_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,53 @@
 | 
			
		||||
// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
 | 
			
		||||
 | 
			
		||||
package agents_test
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/agents"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestKVDB_InsertAgentIP(t *testing.T) {
 | 
			
		||||
	var db = agents.NewKVDB()
 | 
			
		||||
	err := db.Init()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		_ = db.Flush()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	for i := 1; i <= 5; i++ {
 | 
			
		||||
		err = db.InsertAgentIP(int64(i), "192.168.2."+strconv.Itoa(i), "example")
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestKVDB_ListAgentIPs(t *testing.T) {
 | 
			
		||||
	var db = agents.NewKVDB()
 | 
			
		||||
	err := db.Init()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	const count = 10
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		agentIPs, listErr := db.ListAgentIPs(0, count)
 | 
			
		||||
		if listErr != nil {
 | 
			
		||||
			t.Fatal(listErr)
 | 
			
		||||
		}
 | 
			
		||||
		t.Log("===")
 | 
			
		||||
		for _, agentIP := range agentIPs {
 | 
			
		||||
			t.Logf("%+v", agentIP)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if len(agentIPs) < count {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										160
									
								
								internal/utils/agents/db_sqlite.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										160
									
								
								internal/utils/agents/db_sqlite.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,160 @@
 | 
			
		||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
 | 
			
		||||
 | 
			
		||||
package agents
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
 | 
			
		||||
	"github.com/iwind/TeaGo/Tea"
 | 
			
		||||
	"github.com/iwind/TeaGo/types"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strings"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	tableAgentIPs = "agentIPs"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type SQLiteDB struct {
 | 
			
		||||
	db   *dbs.DB
 | 
			
		||||
	path string
 | 
			
		||||
 | 
			
		||||
	insertAgentIPStmt *dbs.Stmt
 | 
			
		||||
	listAgentIPsStmt  *dbs.Stmt
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewSQLiteDB(path string) *SQLiteDB {
 | 
			
		||||
	var db = &SQLiteDB{path: path}
 | 
			
		||||
 | 
			
		||||
	events.OnClose(func() {
 | 
			
		||||
		_ = db.Close()
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return db
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *SQLiteDB) Init() error {
 | 
			
		||||
	// 检查目录是否存在
 | 
			
		||||
	var dir = filepath.Dir(this.path)
 | 
			
		||||
 | 
			
		||||
	_, err := os.Stat(dir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err = os.MkdirAll(dir, 0777)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		remotelogs.Println("DB", "create database dir '"+dir+"'")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO 思考 data.db 的数据安全性
 | 
			
		||||
	db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	db.SetMaxOpenConns(1)
 | 
			
		||||
 | 
			
		||||
	/**_, err = db.Exec("VACUUM")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}**/
 | 
			
		||||
 | 
			
		||||
	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + tableAgentIPs + `" (
 | 
			
		||||
  "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
 | 
			
		||||
  "ip" varchar(64),
 | 
			
		||||
  "agentCode" varchar(128)
 | 
			
		||||
);`)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 预编译语句
 | 
			
		||||
 | 
			
		||||
	// agent ip record statements
 | 
			
		||||
	this.insertAgentIPStmt, err = db.Prepare(`INSERT INTO "` + tableAgentIPs + `" ("id", "ip", "agentCode") VALUES (?, ?, ?)`)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	this.listAgentIPsStmt, err = db.Prepare(`SELECT "id", "ip", "agentCode" FROM "` + tableAgentIPs + `" ORDER BY "id" ASC LIMIT ? OFFSET ?`)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	this.db = db
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *SQLiteDB) InsertAgentIP(ipId int64, ip string, agentCode string) error {
 | 
			
		||||
	if this.db == nil {
 | 
			
		||||
		return errors.New("db should not be nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err := this.insertAgentIPStmt.Exec(ipId, ip, agentCode)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		// 不提示ID重复错误
 | 
			
		||||
		if strings.Contains(err.Error(), "UNIQUE constraint") {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *SQLiteDB) ListAgentIPs(offset int64, size int64) (agentIPs []*AgentIP, err error) {
 | 
			
		||||
	if this.db == nil {
 | 
			
		||||
		return nil, errors.New("db should not be nil")
 | 
			
		||||
	}
 | 
			
		||||
	rows, err := this.listAgentIPsStmt.Query(size, offset)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		_ = rows.Close()
 | 
			
		||||
	}()
 | 
			
		||||
	for rows.Next() {
 | 
			
		||||
		var agentIP = &AgentIP{}
 | 
			
		||||
		err = rows.Scan(&agentIP.Id, &agentIP.IP, &agentIP.AgentCode)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		agentIPs = append(agentIPs, agentIP)
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *SQLiteDB) Close() error {
 | 
			
		||||
	if this.db == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, stmt := range []*dbs.Stmt{
 | 
			
		||||
		this.insertAgentIPStmt,
 | 
			
		||||
		this.listAgentIPsStmt,
 | 
			
		||||
	} {
 | 
			
		||||
		if stmt != nil {
 | 
			
		||||
			_ = stmt.Close()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return this.db.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 打印日志
 | 
			
		||||
func (this *SQLiteDB) log(args ...any) {
 | 
			
		||||
	if !Tea.IsTesting() {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if len(args) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	args[0] = "[" + types.String(args[0]) + "]"
 | 
			
		||||
	log.Println(args...)
 | 
			
		||||
}
 | 
			
		||||
@@ -10,6 +10,7 @@ import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/rpc"
 | 
			
		||||
	"github.com/iwind/TeaGo/Tea"
 | 
			
		||||
	"os"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
@@ -33,7 +34,7 @@ type Manager struct {
 | 
			
		||||
	ipMap  map[string]string // ip => agentCode
 | 
			
		||||
	locker sync.RWMutex
 | 
			
		||||
 | 
			
		||||
	db *DB
 | 
			
		||||
	db DB
 | 
			
		||||
 | 
			
		||||
	lastId int64
 | 
			
		||||
}
 | 
			
		||||
@@ -44,7 +45,7 @@ func NewManager() *Manager {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *Manager) SetDB(db *DB) {
 | 
			
		||||
func (this *Manager) SetDB(db DB) {
 | 
			
		||||
	this.db = db
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -195,7 +196,14 @@ func (this *Manager) ContainsIP(ip string) bool {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *Manager) loadDB() error {
 | 
			
		||||
	var db = NewDB(Tea.Root + "/data/agents.db")
 | 
			
		||||
	var sqlitePath = Tea.Root + "/data/agents.db"
 | 
			
		||||
	_, sqliteErr := os.Stat(sqlitePath)
 | 
			
		||||
	var db DB
 | 
			
		||||
	if sqliteErr == nil {
 | 
			
		||||
		db = NewSQLiteDB(sqlitePath)
 | 
			
		||||
	} else {
 | 
			
		||||
		db = NewKVDB()
 | 
			
		||||
	}
 | 
			
		||||
	err := db.Init()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
 
 | 
			
		||||
@@ -15,7 +15,7 @@ func TestNewManager(t *testing.T) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var db = agents.NewDB(Tea.Root + "/data/agents.db")
 | 
			
		||||
	var db = agents.NewSQLiteDB(Tea.Root + "/data/agents.db")
 | 
			
		||||
	err := db.Init()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,7 @@ package kvstore
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/events"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
 | 
			
		||||
	fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
 | 
			
		||||
@@ -38,16 +39,16 @@ func NewStore(storeName string) (*Store, error) {
 | 
			
		||||
		return nil, errors.New("invalid store name '" + storeName + "'")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var root = Tea.Root + "/data/stores"
 | 
			
		||||
	_, err := os.Stat(root)
 | 
			
		||||
	var path = Tea.Root + "/data/stores/" + storeName + StoreSuffix
 | 
			
		||||
	_, err := os.Stat(path)
 | 
			
		||||
	if err != nil && os.IsNotExist(err) {
 | 
			
		||||
		_ = os.MkdirAll(root, 0777)
 | 
			
		||||
		_ = os.MkdirAll(path, 0777)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &Store{
 | 
			
		||||
		name:   storeName,
 | 
			
		||||
		path:   Tea.Root + "/data/stores/" + storeName + StoreSuffix,
 | 
			
		||||
		locker: fsutils.NewLocker(Tea.Root + "/data/stores/" + storeName + StoreSuffix + "/.fs"),
 | 
			
		||||
		path:   path,
 | 
			
		||||
		locker: fsutils.NewLocker(path + "/.fs"),
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -96,21 +97,24 @@ func DefaultStore() (*Store, error) {
 | 
			
		||||
		return defaultSore, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var resultErr error
 | 
			
		||||
	storeOnce.Do(func() {
 | 
			
		||||
		store, err := NewStore("default")
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			remotelogs.Error("KV", "create default store failed: "+err.Error())
 | 
			
		||||
			resultErr = fmt.Errorf("create default store failed: %w", err)
 | 
			
		||||
			remotelogs.Error("KV", resultErr.Error())
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		err = store.Open()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			remotelogs.Error("KV", "open default store failed: "+err.Error())
 | 
			
		||||
			resultErr = fmt.Errorf("open default store failed: %w", err)
 | 
			
		||||
			remotelogs.Error("KV", resultErr.Error())
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		defaultSore = store
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return defaultSore, nil
 | 
			
		||||
	return defaultSore, resultErr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (this *Store) Open() error {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user