diff --git a/internal/db/utils/utils.go b/internal/db/utils/utils.go index e50cd5d6..d8d2e7c5 100644 --- a/internal/db/utils/utils.go +++ b/internal/db/utils/utils.go @@ -35,3 +35,37 @@ func QuoteLikePrefix(keyword string) string { func QuoteLikeSuffix(keyword string) string { return "%" + QuoteLikeKeyword(keyword) } + +// SetGlobalVarMin 设置变量最小值 +func SetGlobalVarMin(db *dbs.DB, variableName string, minValue int) error { + result, err := db.FindOne("SHOW VARIABLES WHERE variable_name=?", variableName) + if err != nil { + return err + } + if len(result) == 0 { + return nil + } + var oldValue = result.GetInt("Value") + if oldValue > 0 /** 小于等于0通常表示不限制 **/ && oldValue < minValue { + _, err = db.Exec("SET GLOBAL "+variableName+"=?", minValue) + return err + } + return nil +} + +// SetGlobalVarMax 设置变量最大值 +func SetGlobalVarMax(db *dbs.DB, variableName string, maxValue int) error { + result, err := db.FindOne("SHOW VARIABLES WHERE variable_name=?", variableName) + if err != nil { + return err + } + if len(result) == 0 { + return nil + } + var oldValue = result.GetInt("Value") + if oldValue > maxValue { + _, err = db.Exec("SET GLOBAL "+variableName+"=?", maxValue) + return err + } + return nil +} diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index 924a1ea6..41d161f2 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/configs" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" + dbutils "github.com/TeaOSLab/EdgeAPI/internal/db/utils" "github.com/TeaOSLab/EdgeAPI/internal/events" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" @@ -32,7 +33,6 @@ import ( "net" "os" "os/exec" - "regexp" "runtime" "sort" "strconv" @@ -384,27 +384,20 @@ func (this *APINode) setupDB() error { return err } - // 调整预处理语句数量 + // 设置Innodb事务提交模式 { - result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='max_prepared_stmt_count'") - if err != nil { - return err - } - var value = result.GetString("Value") - if regexp.MustCompile(`^\d+$`).MatchString(value) { - var valueInt = types.Int(value) - if valueInt < 65535 { - _, err := db.Exec("SET GLOBAL max_prepared_stmt_count=65535") - if err != nil { - return errors.New("run 'SET GLOBAL max_prepared_stmt_count' on database failed: " + err.Error() + ", \nyou can change the variable in 'my.cnf': \n~~~\n" + `[mysqld] -max_prepared_stmt_count=65535 -~~~ -then restart mysqld.`) - } + result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='innodb_flush_log_at_trx_commit'") + if err == nil && result != nil { + var oldValue = result.GetInt("Value") + if oldValue == 1 { + _, _ = db.Exec("SET GLOBAL innodb_flush_log_at_trx_commit=2") } } } + // 调整预处理语句数量 + _ = dbutils.SetGlobalVarMin(db, "max_prepared_stmt_count", 65535) + // 调整binlog过期时间 { const binlogExpireDays = 7 @@ -413,25 +406,22 @@ then restart mysqld.`) if err == nil { var versionString = types.String(version) if strings.HasPrefix(versionString, "8.") { - result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='binlog_expire_logs_seconds'") - if err == nil && result != nil { - var oldValue = result.GetInt("Value") - if oldValue > binlogExpireDays*86400 { - _, _ = db.Exec("SET GLOBAL binlog_expire_logs_seconds=" + types.String(binlogExpireDays*86400)) - } - } + _ = dbutils.SetGlobalVarMax(db, "binlog_expire_logs_seconds", binlogExpireDays*86400) } else if strings.HasPrefix(versionString, "5.") { - result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='expire_logs_days'") - if err == nil && result != nil { - var oldValue = result.GetInt("Value") - if oldValue > binlogExpireDays { - _, _ = db.Exec("SET GLOBAL expire_logs_days=" + types.String(binlogExpireDays)) - } - } + _ = dbutils.SetGlobalVarMax(db, "expire_logs_days", binlogExpireDays) } } } + // 设置binlog_cache_size + _ = dbutils.SetGlobalVarMin(db, "binlog_cache_size", 1*1024*1024) + + // 设置binlog_stmt_cache_size + _ = dbutils.SetGlobalVarMin(db, "binlog_stmt_cache_size", 1*1024*1024) + + // 设置thread_cache_size + _ = dbutils.SetGlobalVarMin(db, "thread_cache_size", 32) + return nil } diff --git a/internal/setup/sql_dump.go b/internal/setup/sql_dump.go index 4c0af150..4e2341f6 100644 --- a/internal/setup/sql_dump.go +++ b/internal/setup/sql_dump.go @@ -1,13 +1,14 @@ package setup import ( + "errors" "fmt" - "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/types" "regexp" "strings" + "sync" ) var recordsTables = []*SQLRecordsTable{ @@ -38,6 +39,11 @@ var recordsTables = []*SQLRecordsTable{ }, } +type sqlItem struct { + sqlString string + args []any +} + type SQLDump struct { } @@ -129,6 +135,78 @@ func (this *SQLDump) Dump(db *dbs.DB) (result *SQLDumpResult, err error) { // Apply 应用数据 func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (ops []string, err error) { + // 设置Innodb事务提交模式 + { + + result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='innodb_flush_log_at_trx_commit'") + if err == nil && result != nil { + var oldValue = result.GetInt("Value") + if oldValue == 1 { + _, _ = db.Exec("SET GLOBAL innodb_flush_log_at_trx_commit=2") + } + } + } + + // 执行队列 + var execQueue = make(chan *sqlItem, 256) + + var threads = 32 + var wg = sync.WaitGroup{} + wg.Add(threads + 1 /** applyQueue **/) + + var applyOps []string + var applyErr error + go func() { + defer wg.Done() + defer close(execQueue) + + applyOps, applyErr = this.applyQueue(db, newResult, showLog, execQueue) + }() + + var sqlErrors = []error{} + var sqlErrLocker = &sync.Mutex{} + for i := 0; i < threads; i++ { + go func() { + defer wg.Done() + + for item := range execQueue { + _, err := db.Exec(item.sqlString, item.args...) + if err != nil { + sqlErrLocker.Lock() + sqlErrors = append(sqlErrors, errors.New(item.sqlString+": "+err.Error())) + sqlErrLocker.Unlock() + break + } + } + }() + } + wg.Wait() + + if applyErr != nil { + return nil, applyErr + } + + if len(sqlErrors) == 0 { + // 升级数据 + err = UpgradeSQLData(db) + if err != nil { + return nil, errors.New("upgrade data failed: " + err.Error()) + } + + return applyOps, nil + } + + return nil, sqlErrors[0] +} + +func (this *SQLDump) applyQueue(db *dbs.DB, newResult *SQLDumpResult, showLog bool, queue chan *sqlItem) (ops []string, err error) { + var execSQL = func(sqlString string, args ...any) { + queue <- &sqlItem{ + sqlString: sqlString, + args: args, + } + } + currentResult, err := this.Dump(db) if err != nil { return nil, err @@ -143,9 +221,13 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) ( if showLog { fmt.Println(op) } - _, err = db.Exec(newTable.Definition) - if err != nil { - return nil, errors.New("'" + op + "' failed: " + err.Error()) + if len(newTable.Records) == 0 { + execSQL(newTable.Definition) + } else { + _, err = db.Exec(newTable.Definition) + if err != nil { + return nil, errors.New("'" + op + "' failed: " + err.Error()) + } } } else if oldTable.Definition != newTable.Definition { // 对比字段 @@ -291,10 +373,7 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) ( values = append(values, v) } - _, err = db.Exec("INSERT INTO "+newTable.Name+" ("+strings.Join(params, ", ")+") VALUES ("+strings.Join(args, ", ")+")", values...) - if err != nil { - return nil, err - } + execSQL("INSERT INTO "+newTable.Name+" ("+strings.Join(params, ", ")+") VALUES ("+strings.Join(args, ", ")+")", values...) } else if !record.ValuesEquals(one) { ops = append(ops, "* record "+newTable.Name+" "+strings.Join(valueStrings, ", ")) if showLog { @@ -316,10 +395,8 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) ( values = append(values, v) } values = append(values, one.GetInt("id")) - _, err = db.Exec("UPDATE "+newTable.Name+" SET "+strings.Join(args, ", ")+" WHERE id=?", values...) - if err != nil { - return nil, err - } + + execSQL("UPDATE "+newTable.Name+" SET "+strings.Join(args, ", ")+" WHERE id=?", values...) } } } @@ -327,12 +404,6 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) ( // 减少表格 // 由于我们不删除任何表格,所以这里什么都不做 - // 升级数据 - err = UpgradeSQLData(db) - if err != nil { - return nil, errors.New("upgrade data failed: " + err.Error()) - } - return } diff --git a/internal/setup/sql_dump_test.go b/internal/setup/sql_dump_test.go index ff72badd..3c22e6d8 100644 --- a/internal/setup/sql_dump_test.go +++ b/internal/setup/sql_dump_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/iwind/TeaGo/dbs" "testing" + "time" ) func TestSQLDump_Dump(t *testing.T) { @@ -62,15 +63,20 @@ func TestSQLDump_Apply(t *testing.T) { _ = db.Close() }() - dump := NewSQLDump() + var dump = NewSQLDump() result, err := dump.Dump(db) if err != nil { t.Fatal(err) } + var before = time.Now() + defer func() { + t.Log("cost:", time.Since(before)) + }() + db2, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{ Driver: "mysql", - Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_new?charset=utf8mb4&timeout=30s", + Dsn: "root:123456@tcp(192.168.2.60:3306)/db_edge_new?charset=utf8mb4&timeout=30s", Prefix: "edge", }) if err != nil { @@ -84,9 +90,10 @@ func TestSQLDump_Apply(t *testing.T) { t.Fatal(err) } t.Log("ok") - if len(ops) > 0 { + /**if len(ops) > 0 { for _, op := range ops { t.Log("", op) } - } + }**/ + _ = ops }