diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index 4387c14..cf3cb5c 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -81,6 +81,12 @@ func main() { node := nodes.NewNode() node.Start() }) + app.On("dbstat", func() { + teaconst.EnableDBStat = true + + node := nodes.NewNode() + node.Start() + }) app.On("trackers", func() { var sock = gosock.NewTmpSock(teaconst.ProcessName) reply, err := sock.Send(&gosock.Command{Code: "trackers"}) diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index c784441..490ea4c 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -5,10 +5,12 @@ package caches import ( "database/sql" "errors" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" @@ -21,26 +23,26 @@ import ( // FileList 文件缓存列表管理 type FileList struct { dir string - db *sql.DB + db *dbs.DB total int64 onAdd func(item *Item) onRemove func(item *Item) // cacheItems - existsByHashStmt *sql.Stmt // 根据hash检查是否存在 - insertStmt *sql.Stmt // 写入数据 - selectByHashStmt *sql.Stmt // 使用hash查询数据 - deleteByHashStmt *sql.Stmt // 根据hash删除数据 - statStmt *sql.Stmt // 统计 - purgeStmt *sql.Stmt // 清理 - deleteAllStmt *sql.Stmt // 删除所有数据 + existsByHashStmt *dbs.Stmt // 根据hash检查是否存在 + insertStmt *dbs.Stmt // 写入数据 + selectByHashStmt *dbs.Stmt // 使用hash查询数据 + deleteByHashStmt *dbs.Stmt // 根据hash删除数据 + statStmt *dbs.Stmt // 统计 + purgeStmt *dbs.Stmt // 清理 + deleteAllStmt *dbs.Stmt // 删除所有数据 // hits - insertHitStmt *sql.Stmt // 写入数据 - increaseHitStmt *sql.Stmt // 增加点击量 - deleteHitByHashStmt *sql.Stmt // 根据hash删除数据 - lfuHitsStmt *sql.Stmt // 读取老的数据 + insertHitStmt *dbs.Stmt // 写入数据 + increaseHitStmt *dbs.Stmt // 增加点击量 + deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据 + lfuHitsStmt *dbs.Stmt // 读取老的数据 oldTables []string itemsTableName string @@ -80,14 +82,18 @@ func (this *FileList) Init() error { } var dbPath = dir + "/index.db" remotelogs.Println("CACHE", "loading database '"+dbPath+"'") - db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL") + db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_cache_size=16000") if err != nil { return errors.New("open database failed: " + err.Error()) } db.SetMaxOpenConns(1) - this.db = db + this.db = dbs.NewDB(db) + + if teaconst.EnableDBStat { + this.db.EnableStat(true) + } // TODO 耗时过长,暂时不整理数据库 /**_, err = db.Exec("VACUUM") @@ -196,15 +202,13 @@ func (this *FileList) Add(hash string, item *Item) error { item.StaleAt = item.ExpiredAt } + // 放入队列 _, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime()) if err != nil { return err } - _, err = this.insertHitStmt.Exec(hash, timeutil.Format("YW")) - if err != nil { - return err - } + // 这里不增加点击量,以减少对数据库的操作次数 this.memoryCache.Write(hash, 1, item.ExpiredAt) atomic.AddInt64(&this.total, 1) diff --git a/internal/const/vars.go b/internal/const/vars.go index 3a338e2..bee5a9e 100644 --- a/internal/const/vars.go +++ b/internal/const/vars.go @@ -14,4 +14,8 @@ var ( NodeIdString = "" GlobalProductName = nodeconfigs.DefaultProductName + + // Track + + EnableDBStat = false ) diff --git a/internal/metrics/task.go b/internal/metrics/task.go index b163d58..6038856 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -7,17 +7,18 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/Tea" _ "github.com/mattn/go-sqlite3" "os" "strconv" - "strings" "sync" "time" ) @@ -38,7 +39,7 @@ type Task struct { item *serverconfigs.MetricItemConfig isLoaded bool - db *sql.DB + db *dbs.DB statTableName string isStopped bool @@ -92,7 +93,11 @@ func (this *Task) Init() error { return err } db.SetMaxOpenConns(1) - this.db = db + this.db = dbs.NewDB(db) + + if teaconst.EnableDBStat { + this.db.EnableStat(true) + } //创建统计表 _, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" ( @@ -438,10 +443,11 @@ func (this *Task) Upload(pauseDuration time.Duration) error { if len(idStrings) > 0 { // 设置为已上传 - _, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) + // TODO 先不判断是否已经上传,需要改造API进行配合 + /**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) if err != nil { return err - } + }**/ } } diff --git a/internal/utils/dbs/db.go b/internal/utils/dbs/db.go new file mode 100644 index 0000000..e7b6701 --- /dev/null +++ b/internal/utils/dbs/db.go @@ -0,0 +1,69 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package dbs + +import ( + "context" + "database/sql" +) + +type DB struct { + rawDB *sql.DB + + enableStat bool +} + +func NewDB(rawDB *sql.DB) *DB { + return &DB{ + rawDB: rawDB, + } +} + +func (this *DB) EnableStat(b bool) { + this.enableStat = b +} + +func (this *DB) Prepare(query string) (*Stmt, error) { + stmt, err := this.rawDB.Prepare(query) + if err != nil { + return nil, err + } + + var s = NewStmt(stmt, query) + if this.enableStat { + s.EnableStat() + } + return s, nil +} + +func (this *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(query).End() + } + return this.rawDB.ExecContext(ctx, query, args...) +} + +func (this *DB) Exec(query string, args ...interface{}) (sql.Result, error) { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(query).End() + } + return this.rawDB.Exec(query, args...) +} + +func (this *DB) Query(query string, args ...interface{}) (*sql.Rows, error) { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(query).End() + } + return this.rawDB.Query(query, args...) +} + +func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(query).End() + } + return this.rawDB.QueryRow(query, args...) +} + +func (this *DB) Close() error { + return this.rawDB.Close() +} diff --git a/internal/utils/dbs/query_label.go b/internal/utils/dbs/query_label.go new file mode 100644 index 0000000..4a0f14e --- /dev/null +++ b/internal/utils/dbs/query_label.go @@ -0,0 +1,24 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package dbs + +import "time" + +type QueryLabel struct { + manager *QueryStatManager + query string + before time.Time +} + +func NewQueryLabel(manager *QueryStatManager, query string) *QueryLabel { + return &QueryLabel{ + manager: manager, + query: query, + before: time.Now(), + } +} + +func (this *QueryLabel) End() { + var cost = time.Since(this.before).Seconds() + this.manager.AddCost(this.query, cost) +} diff --git a/internal/utils/dbs/query_stat.go b/internal/utils/dbs/query_stat.go new file mode 100644 index 0000000..9e31669 --- /dev/null +++ b/internal/utils/dbs/query_stat.go @@ -0,0 +1,30 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package dbs + +type QueryStat struct { + Query string + CostMin float64 + CostMax float64 + + CostTotal float64 + Calls int64 +} + +func NewQueryStat(query string) *QueryStat { + return &QueryStat{ + Query: query, + } +} + +func (this *QueryStat) AddCost(cost float64) { + if this.CostMin == 0 || this.CostMin > cost { + this.CostMin = cost + } + if this.CostMax == 0 || this.CostMax < cost { + this.CostMax = cost + } + + this.CostTotal += cost + this.Calls++ +} diff --git a/internal/utils/dbs/query_stat_manager.go b/internal/utils/dbs/query_stat_manager.go new file mode 100644 index 0000000..0498807 --- /dev/null +++ b/internal/utils/dbs/query_stat_manager.go @@ -0,0 +1,81 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package dbs + +import ( + "fmt" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/iwind/TeaGo/logs" + "sort" + "strings" + "sync" + "time" +) + +func init() { + var ticker = time.NewTicker(5 * time.Second) + + events.On(events.EventLoaded, func() { + if teaconst.EnableDBStat { + goman.New(func() { + for range ticker.C { + var stats = []string{} + for _, stat := range SharedQueryStatManager.TopN(10) { + var avg = stat.CostTotal / float64(stat.Calls) + stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, stat.Query)) + } + logs.Println("====DB STATS====\n" + strings.Join(stats, "\n")) + } + }) + } + }) +} + +var SharedQueryStatManager = NewQueryStatManager() + +type QueryStatManager struct { + statsMap map[string]*QueryStat // query => *QueryStat + locker sync.Mutex +} + +func NewQueryStatManager() *QueryStatManager { + return &QueryStatManager{ + statsMap: map[string]*QueryStat{}, + } +} + +func (this *QueryStatManager) AddQuery(query string) *QueryLabel { + return NewQueryLabel(this, query) +} + +func (this *QueryStatManager) AddCost(query string, cost float64) { + this.locker.Lock() + defer this.locker.Unlock() + + stat, ok := this.statsMap[query] + if !ok { + stat = NewQueryStat(query) + this.statsMap[query] = stat + } + stat.AddCost(cost) +} + +func (this *QueryStatManager) TopN(n int) []*QueryStat { + this.locker.Lock() + defer this.locker.Unlock() + + var stats = []*QueryStat{} + for _, stat := range this.statsMap { + stats = append(stats, stat) + } + sort.Slice(stats, func(i, j int) bool { + return stats[i].CostMax > stats[j].CostMax + }) + + if len(stats) > n { + return stats[:n] + } + return stats +} diff --git a/internal/utils/dbs/query_stat_manager_test.go b/internal/utils/dbs/query_stat_manager_test.go new file mode 100644 index 0000000..7361821 --- /dev/null +++ b/internal/utils/dbs/query_stat_manager_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package dbs_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" + "github.com/iwind/TeaGo/logs" + "testing" + "time" +) + +func TestQueryStatManager(t *testing.T) { + var manager = dbs.NewQueryStatManager() + { + var label = manager.AddQuery("sql 1") + time.Sleep(1 * time.Second) + label.End() + } + manager.AddQuery("sql 1").End() + manager.AddQuery("sql 2").End() + for _, stat := range manager.TopN(10) { + logs.PrintAsJSON(stat, t) + } +} diff --git a/internal/utils/dbs/stmt.go b/internal/utils/dbs/stmt.go new file mode 100644 index 0000000..a5df4c2 --- /dev/null +++ b/internal/utils/dbs/stmt.go @@ -0,0 +1,72 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package dbs + +import ( + "context" + "database/sql" +) + +type Stmt struct { + rawStmt *sql.Stmt + query string + + enableStat bool +} + +func NewStmt(rawStmt *sql.Stmt, query string) *Stmt { + return &Stmt{ + rawStmt: rawStmt, + query: query, + } +} + +func (this *Stmt) EnableStat() { + this.enableStat = true +} + +func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error) { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(this.query).End() + } + return this.rawStmt.ExecContext(ctx, args...) +} + +func (this *Stmt) Exec(args ...interface{}) (sql.Result, error) { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(this.query).End() + } + return this.rawStmt.Exec(args...) +} + +func (this *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*sql.Rows, error) { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(this.query).End() + } + return this.rawStmt.QueryContext(ctx, args...) +} + +func (this *Stmt) Query(args ...interface{}) (*sql.Rows, error) { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(this.query).End() + } + return this.rawStmt.Query(args...) +} + +func (this *Stmt) QueryRowContext(ctx context.Context, args ...interface{}) *sql.Row { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(this.query).End() + } + return this.rawStmt.QueryRowContext(ctx, args...) +} + +func (this *Stmt) QueryRow(args ...interface{}) *sql.Row { + if this.enableStat { + defer SharedQueryStatManager.AddQuery(this.query).End() + } + return this.rawStmt.QueryRow(args...) +} + +func (this *Stmt) Close() error { + return this.rawStmt.Close() +}