使用并发队列安装和升级数据表/启动时自动调整MySQL变量

This commit is contained in:
GoEdgeLab
2022-09-25 20:34:19 +08:00
parent d2c925f843
commit dd48aa59b3
4 changed files with 155 additions and 53 deletions

View File

@@ -35,3 +35,37 @@ func QuoteLikePrefix(keyword string) string {
func QuoteLikeSuffix(keyword string) string { func QuoteLikeSuffix(keyword string) string {
return "%" + QuoteLikeKeyword(keyword) return "%" + QuoteLikeKeyword(keyword)
} }
// SetGlobalVarMin 设置变量最小值
func SetGlobalVarMin(db *dbs.DB, variableName string, minValue int) error {
result, err := db.FindOne("SHOW VARIABLES WHERE variable_name=?", variableName)
if err != nil {
return err
}
if len(result) == 0 {
return nil
}
var oldValue = result.GetInt("Value")
if oldValue > 0 /** 小于等于0通常表示不限制 **/ && oldValue < minValue {
_, err = db.Exec("SET GLOBAL "+variableName+"=?", minValue)
return err
}
return nil
}
// SetGlobalVarMax 设置变量最大值
func SetGlobalVarMax(db *dbs.DB, variableName string, maxValue int) error {
result, err := db.FindOne("SHOW VARIABLES WHERE variable_name=?", variableName)
if err != nil {
return err
}
if len(result) == 0 {
return nil
}
var oldValue = result.GetInt("Value")
if oldValue > maxValue {
_, err = db.Exec("SET GLOBAL "+variableName+"=?", maxValue)
return err
}
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/configs" "github.com/TeaOSLab/EdgeAPI/internal/configs"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
dbutils "github.com/TeaOSLab/EdgeAPI/internal/db/utils"
"github.com/TeaOSLab/EdgeAPI/internal/events" "github.com/TeaOSLab/EdgeAPI/internal/events"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
@@ -32,7 +33,6 @@ import (
"net" "net"
"os" "os"
"os/exec" "os/exec"
"regexp"
"runtime" "runtime"
"sort" "sort"
"strconv" "strconv"
@@ -384,27 +384,20 @@ func (this *APINode) setupDB() error {
return err return err
} }
// 调整预处理语句数量 // 设置Innodb事务提交模式
{ {
result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='max_prepared_stmt_count'") result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='innodb_flush_log_at_trx_commit'")
if err != nil { if err == nil && result != nil {
return err var oldValue = result.GetInt("Value")
} if oldValue == 1 {
var value = result.GetString("Value") _, _ = db.Exec("SET GLOBAL innodb_flush_log_at_trx_commit=2")
if regexp.MustCompile(`^\d+$`).MatchString(value) {
var valueInt = types.Int(value)
if valueInt < 65535 {
_, err := db.Exec("SET GLOBAL max_prepared_stmt_count=65535")
if err != nil {
return errors.New("run 'SET GLOBAL max_prepared_stmt_count' on database failed: " + err.Error() + ", \nyou can change the variable in 'my.cnf': \n~~~\n" + `[mysqld]
max_prepared_stmt_count=65535
~~~
then restart mysqld.`)
}
} }
} }
} }
// 调整预处理语句数量
_ = dbutils.SetGlobalVarMin(db, "max_prepared_stmt_count", 65535)
// 调整binlog过期时间 // 调整binlog过期时间
{ {
const binlogExpireDays = 7 const binlogExpireDays = 7
@@ -413,25 +406,22 @@ then restart mysqld.`)
if err == nil { if err == nil {
var versionString = types.String(version) var versionString = types.String(version)
if strings.HasPrefix(versionString, "8.") { if strings.HasPrefix(versionString, "8.") {
result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='binlog_expire_logs_seconds'") _ = dbutils.SetGlobalVarMax(db, "binlog_expire_logs_seconds", binlogExpireDays*86400)
if err == nil && result != nil {
var oldValue = result.GetInt("Value")
if oldValue > binlogExpireDays*86400 {
_, _ = db.Exec("SET GLOBAL binlog_expire_logs_seconds=" + types.String(binlogExpireDays*86400))
}
}
} else if strings.HasPrefix(versionString, "5.") { } else if strings.HasPrefix(versionString, "5.") {
result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='expire_logs_days'") _ = dbutils.SetGlobalVarMax(db, "expire_logs_days", binlogExpireDays)
if err == nil && result != nil {
var oldValue = result.GetInt("Value")
if oldValue > binlogExpireDays {
_, _ = db.Exec("SET GLOBAL expire_logs_days=" + types.String(binlogExpireDays))
}
}
} }
} }
} }
// 设置binlog_cache_size
_ = dbutils.SetGlobalVarMin(db, "binlog_cache_size", 1*1024*1024)
// 设置binlog_stmt_cache_size
_ = dbutils.SetGlobalVarMin(db, "binlog_stmt_cache_size", 1*1024*1024)
// 设置thread_cache_size
_ = dbutils.SetGlobalVarMin(db, "thread_cache_size", 32)
return nil return nil
} }

View File

@@ -1,13 +1,14 @@
package setup package setup
import ( import (
"errors"
"fmt" "fmt"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"regexp" "regexp"
"strings" "strings"
"sync"
) )
var recordsTables = []*SQLRecordsTable{ var recordsTables = []*SQLRecordsTable{
@@ -38,6 +39,11 @@ var recordsTables = []*SQLRecordsTable{
}, },
} }
type sqlItem struct {
sqlString string
args []any
}
type SQLDump struct { type SQLDump struct {
} }
@@ -129,6 +135,78 @@ func (this *SQLDump) Dump(db *dbs.DB) (result *SQLDumpResult, err error) {
// Apply 应用数据 // Apply 应用数据
func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (ops []string, err error) { func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (ops []string, err error) {
// 设置Innodb事务提交模式
{
result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='innodb_flush_log_at_trx_commit'")
if err == nil && result != nil {
var oldValue = result.GetInt("Value")
if oldValue == 1 {
_, _ = db.Exec("SET GLOBAL innodb_flush_log_at_trx_commit=2")
}
}
}
// 执行队列
var execQueue = make(chan *sqlItem, 256)
var threads = 32
var wg = sync.WaitGroup{}
wg.Add(threads + 1 /** applyQueue **/)
var applyOps []string
var applyErr error
go func() {
defer wg.Done()
defer close(execQueue)
applyOps, applyErr = this.applyQueue(db, newResult, showLog, execQueue)
}()
var sqlErrors = []error{}
var sqlErrLocker = &sync.Mutex{}
for i := 0; i < threads; i++ {
go func() {
defer wg.Done()
for item := range execQueue {
_, err := db.Exec(item.sqlString, item.args...)
if err != nil {
sqlErrLocker.Lock()
sqlErrors = append(sqlErrors, errors.New(item.sqlString+": "+err.Error()))
sqlErrLocker.Unlock()
break
}
}
}()
}
wg.Wait()
if applyErr != nil {
return nil, applyErr
}
if len(sqlErrors) == 0 {
// 升级数据
err = UpgradeSQLData(db)
if err != nil {
return nil, errors.New("upgrade data failed: " + err.Error())
}
return applyOps, nil
}
return nil, sqlErrors[0]
}
func (this *SQLDump) applyQueue(db *dbs.DB, newResult *SQLDumpResult, showLog bool, queue chan *sqlItem) (ops []string, err error) {
var execSQL = func(sqlString string, args ...any) {
queue <- &sqlItem{
sqlString: sqlString,
args: args,
}
}
currentResult, err := this.Dump(db) currentResult, err := this.Dump(db)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -143,10 +221,14 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
if showLog { if showLog {
fmt.Println(op) fmt.Println(op)
} }
if len(newTable.Records) == 0 {
execSQL(newTable.Definition)
} else {
_, err = db.Exec(newTable.Definition) _, err = db.Exec(newTable.Definition)
if err != nil { if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error()) return nil, errors.New("'" + op + "' failed: " + err.Error())
} }
}
} else if oldTable.Definition != newTable.Definition { } else if oldTable.Definition != newTable.Definition {
// 对比字段 // 对比字段
// + // +
@@ -291,10 +373,7 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
values = append(values, v) values = append(values, v)
} }
_, err = db.Exec("INSERT INTO "+newTable.Name+" ("+strings.Join(params, ", ")+") VALUES ("+strings.Join(args, ", ")+")", values...) execSQL("INSERT INTO "+newTable.Name+" ("+strings.Join(params, ", ")+") VALUES ("+strings.Join(args, ", ")+")", values...)
if err != nil {
return nil, err
}
} else if !record.ValuesEquals(one) { } else if !record.ValuesEquals(one) {
ops = append(ops, "* record "+newTable.Name+" "+strings.Join(valueStrings, ", ")) ops = append(ops, "* record "+newTable.Name+" "+strings.Join(valueStrings, ", "))
if showLog { if showLog {
@@ -316,10 +395,8 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
values = append(values, v) values = append(values, v)
} }
values = append(values, one.GetInt("id")) values = append(values, one.GetInt("id"))
_, err = db.Exec("UPDATE "+newTable.Name+" SET "+strings.Join(args, ", ")+" WHERE id=?", values...)
if err != nil { execSQL("UPDATE "+newTable.Name+" SET "+strings.Join(args, ", ")+" WHERE id=?", values...)
return nil, err
}
} }
} }
} }
@@ -327,12 +404,6 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
// 减少表格 // 减少表格
// 由于我们不删除任何表格,所以这里什么都不做 // 由于我们不删除任何表格,所以这里什么都不做
// 升级数据
err = UpgradeSQLData(db)
if err != nil {
return nil, errors.New("upgrade data failed: " + err.Error())
}
return return
} }

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestSQLDump_Dump(t *testing.T) { func TestSQLDump_Dump(t *testing.T) {
@@ -62,15 +63,20 @@ func TestSQLDump_Apply(t *testing.T) {
_ = db.Close() _ = db.Close()
}() }()
dump := NewSQLDump() var dump = NewSQLDump()
result, err := dump.Dump(db) result, err := dump.Dump(db)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
var before = time.Now()
defer func() {
t.Log("cost:", time.Since(before))
}()
db2, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{ db2, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{
Driver: "mysql", Driver: "mysql",
Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_new?charset=utf8mb4&timeout=30s", Dsn: "root:123456@tcp(192.168.2.60:3306)/db_edge_new?charset=utf8mb4&timeout=30s",
Prefix: "edge", Prefix: "edge",
}) })
if err != nil { if err != nil {
@@ -84,9 +90,10 @@ func TestSQLDump_Apply(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
t.Log("ok") t.Log("ok")
if len(ops) > 0 { /**if len(ops) > 0 {
for _, op := range ops { for _, op := range ops {
t.Log("", op) t.Log("", op)
} }
} }**/
_ = ops
} }