增加对数据库操作的统计命令:edge-node dbstat/减少几个不必要的查询操作

This commit is contained in:
GoEdgeLab
2022-03-13 19:27:38 +08:00
parent c1890be289
commit 5279877cd2
10 changed files with 343 additions and 23 deletions

View File

@@ -81,6 +81,12 @@ func main() {
node := nodes.NewNode()
node.Start()
})
app.On("dbstat", func() {
teaconst.EnableDBStat = true
node := nodes.NewNode()
node.Start()
})
app.On("trackers", func() {
var sock = gosock.NewTmpSock(teaconst.ProcessName)
reply, err := sock.Send(&gosock.Command{Code: "trackers"})

View File

@@ -5,10 +5,12 @@ package caches
import (
"database/sql"
"errors"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
@@ -21,26 +23,26 @@ import (
// FileList 文件缓存列表管理
type FileList struct {
dir string
db *sql.DB
db *dbs.DB
total int64
onAdd func(item *Item)
onRemove func(item *Item)
// cacheItems
existsByHashStmt *sql.Stmt // 根据hash检查是否存在
insertStmt *sql.Stmt // 写入数据
selectByHashStmt *sql.Stmt // 使用hash查询数据
deleteByHashStmt *sql.Stmt // 根据hash删除数据
statStmt *sql.Stmt // 统计
purgeStmt *sql.Stmt // 清理
deleteAllStmt *sql.Stmt // 删除所有数据
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
insertStmt *dbs.Stmt // 写入数据
selectByHashStmt *dbs.Stmt // 使用hash查询数据
deleteByHashStmt *dbs.Stmt // 根据hash删除数据
statStmt *dbs.Stmt // 统计
purgeStmt *dbs.Stmt // 清理
deleteAllStmt *dbs.Stmt // 删除所有数据
// hits
insertHitStmt *sql.Stmt // 写入数据
increaseHitStmt *sql.Stmt // 增加点击量
deleteHitByHashStmt *sql.Stmt // 根据hash删除数据
lfuHitsStmt *sql.Stmt // 读取老的数据
insertHitStmt *dbs.Stmt // 写入数据
increaseHitStmt *dbs.Stmt // 增加点击量
deleteHitByHashStmt *dbs.Stmt // 根据hash删除数据
lfuHitsStmt *dbs.Stmt // 读取老的数据
oldTables []string
itemsTableName string
@@ -80,14 +82,18 @@ func (this *FileList) Init() error {
}
var dbPath = dir + "/index.db"
remotelogs.Println("CACHE", "loading database '"+dbPath+"'")
db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL")
db, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_cache_size=16000")
if err != nil {
return errors.New("open database failed: " + err.Error())
}
db.SetMaxOpenConns(1)
this.db = db
this.db = dbs.NewDB(db)
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
// TODO 耗时过长,暂时不整理数据库
/**_, err = db.Exec("VACUUM")
@@ -196,15 +202,13 @@ func (this *FileList) Add(hash string, item *Item) error {
item.StaleAt = item.ExpiredAt
}
// 放入队列
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
if err != nil {
return err
}
_, err = this.insertHitStmt.Exec(hash, timeutil.Format("YW"))
if err != nil {
return err
}
// 这里不增加点击量,以减少对数据库的操作次数
this.memoryCache.Write(hash, 1, item.ExpiredAt)
atomic.AddInt64(&this.total, 1)

View File

@@ -14,4 +14,8 @@ var (
NodeIdString = ""
GlobalProductName = nodeconfigs.DefaultProductName
// Track
EnableDBStat = false
)

View File

@@ -7,17 +7,18 @@ import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/TeaOSLab/EdgeNode/internal/zero"
"github.com/iwind/TeaGo/Tea"
_ "github.com/mattn/go-sqlite3"
"os"
"strconv"
"strings"
"sync"
"time"
)
@@ -38,7 +39,7 @@ type Task struct {
item *serverconfigs.MetricItemConfig
isLoaded bool
db *sql.DB
db *dbs.DB
statTableName string
isStopped bool
@@ -92,7 +93,11 @@ func (this *Task) Init() error {
return err
}
db.SetMaxOpenConns(1)
this.db = db
this.db = dbs.NewDB(db)
if teaconst.EnableDBStat {
this.db.EnableStat(true)
}
//创建统计表
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.statTableName + `" (
@@ -438,10 +443,11 @@ func (this *Task) Upload(pauseDuration time.Duration) error {
if len(idStrings) > 0 {
// 设置为已上传
_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`)
// TODO 先不判断是否已经上传需要改造API进行配合
/**_, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`)
if err != nil {
return err
}
}**/
}
}

69
internal/utils/dbs/db.go Normal file
View File

@@ -0,0 +1,69 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package dbs
import (
"context"
"database/sql"
)
type DB struct {
rawDB *sql.DB
enableStat bool
}
func NewDB(rawDB *sql.DB) *DB {
return &DB{
rawDB: rawDB,
}
}
func (this *DB) EnableStat(b bool) {
this.enableStat = b
}
func (this *DB) Prepare(query string) (*Stmt, error) {
stmt, err := this.rawDB.Prepare(query)
if err != nil {
return nil, err
}
var s = NewStmt(stmt, query)
if this.enableStat {
s.EnableStat()
}
return s, nil
}
func (this *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(query).End()
}
return this.rawDB.ExecContext(ctx, query, args...)
}
func (this *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(query).End()
}
return this.rawDB.Exec(query, args...)
}
func (this *DB) Query(query string, args ...interface{}) (*sql.Rows, error) {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(query).End()
}
return this.rawDB.Query(query, args...)
}
func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(query).End()
}
return this.rawDB.QueryRow(query, args...)
}
func (this *DB) Close() error {
return this.rawDB.Close()
}

View File

@@ -0,0 +1,24 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package dbs
import "time"
type QueryLabel struct {
manager *QueryStatManager
query string
before time.Time
}
func NewQueryLabel(manager *QueryStatManager, query string) *QueryLabel {
return &QueryLabel{
manager: manager,
query: query,
before: time.Now(),
}
}
func (this *QueryLabel) End() {
var cost = time.Since(this.before).Seconds()
this.manager.AddCost(this.query, cost)
}

View File

@@ -0,0 +1,30 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package dbs
type QueryStat struct {
Query string
CostMin float64
CostMax float64
CostTotal float64
Calls int64
}
func NewQueryStat(query string) *QueryStat {
return &QueryStat{
Query: query,
}
}
func (this *QueryStat) AddCost(cost float64) {
if this.CostMin == 0 || this.CostMin > cost {
this.CostMin = cost
}
if this.CostMax == 0 || this.CostMax < cost {
this.CostMax = cost
}
this.CostTotal += cost
this.Calls++
}

View File

@@ -0,0 +1,81 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package dbs
import (
"fmt"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/iwind/TeaGo/logs"
"sort"
"strings"
"sync"
"time"
)
func init() {
var ticker = time.NewTicker(5 * time.Second)
events.On(events.EventLoaded, func() {
if teaconst.EnableDBStat {
goman.New(func() {
for range ticker.C {
var stats = []string{}
for _, stat := range SharedQueryStatManager.TopN(10) {
var avg = stat.CostTotal / float64(stat.Calls)
stats = append(stats, fmt.Sprintf("%.2fms/%.2fms/%.2fms - %d - %s", stat.CostMin*1000, stat.CostMax*1000, avg*1000, stat.Calls, stat.Query))
}
logs.Println("====DB STATS====\n" + strings.Join(stats, "\n"))
}
})
}
})
}
var SharedQueryStatManager = NewQueryStatManager()
type QueryStatManager struct {
statsMap map[string]*QueryStat // query => *QueryStat
locker sync.Mutex
}
func NewQueryStatManager() *QueryStatManager {
return &QueryStatManager{
statsMap: map[string]*QueryStat{},
}
}
func (this *QueryStatManager) AddQuery(query string) *QueryLabel {
return NewQueryLabel(this, query)
}
func (this *QueryStatManager) AddCost(query string, cost float64) {
this.locker.Lock()
defer this.locker.Unlock()
stat, ok := this.statsMap[query]
if !ok {
stat = NewQueryStat(query)
this.statsMap[query] = stat
}
stat.AddCost(cost)
}
func (this *QueryStatManager) TopN(n int) []*QueryStat {
this.locker.Lock()
defer this.locker.Unlock()
var stats = []*QueryStat{}
for _, stat := range this.statsMap {
stats = append(stats, stat)
}
sort.Slice(stats, func(i, j int) bool {
return stats[i].CostMax > stats[j].CostMax
})
if len(stats) > n {
return stats[:n]
}
return stats
}

View File

@@ -0,0 +1,24 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package dbs_test
import (
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/logs"
"testing"
"time"
)
func TestQueryStatManager(t *testing.T) {
var manager = dbs.NewQueryStatManager()
{
var label = manager.AddQuery("sql 1")
time.Sleep(1 * time.Second)
label.End()
}
manager.AddQuery("sql 1").End()
manager.AddQuery("sql 2").End()
for _, stat := range manager.TopN(10) {
logs.PrintAsJSON(stat, t)
}
}

View File

@@ -0,0 +1,72 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package dbs
import (
"context"
"database/sql"
)
type Stmt struct {
rawStmt *sql.Stmt
query string
enableStat bool
}
func NewStmt(rawStmt *sql.Stmt, query string) *Stmt {
return &Stmt{
rawStmt: rawStmt,
query: query,
}
}
func (this *Stmt) EnableStat() {
this.enableStat = true
}
func (this *Stmt) ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error) {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
return this.rawStmt.ExecContext(ctx, args...)
}
func (this *Stmt) Exec(args ...interface{}) (sql.Result, error) {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
return this.rawStmt.Exec(args...)
}
func (this *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*sql.Rows, error) {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
return this.rawStmt.QueryContext(ctx, args...)
}
func (this *Stmt) Query(args ...interface{}) (*sql.Rows, error) {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
return this.rawStmt.Query(args...)
}
func (this *Stmt) QueryRowContext(ctx context.Context, args ...interface{}) *sql.Row {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
return this.rawStmt.QueryRowContext(ctx, args...)
}
func (this *Stmt) QueryRow(args ...interface{}) *sql.Row {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(this.query).End()
}
return this.rawStmt.QueryRow(args...)
}
func (this *Stmt) Close() error {
return this.rawStmt.Close()
}