mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2025-11-06 18:10:25 +08:00
提升数据库升级速度
This commit is contained in:
@@ -18,7 +18,7 @@ func main() {
|
|||||||
fmt.Println("[ERROR]" + err.Error())
|
fmt.Println("[ERROR]" + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
results, err := setup.NewSQLDump().Dump(db)
|
results, err := setup.NewSQLDump().Dump(db, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("[ERROR]" + err.Error())
|
fmt.Println("[ERROR]" + err.Error())
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/iwind/TeaGo/lists"
|
"github.com/iwind/TeaGo/lists"
|
||||||
"github.com/iwind/TeaGo/types"
|
"github.com/iwind/TeaGo/types"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
@@ -53,14 +54,20 @@ func NewSQLDump() *SQLDump {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Dump 导出数据
|
// Dump 导出数据
|
||||||
func (this *SQLDump) Dump(db *dbs.DB) (result *SQLDumpResult, err error) {
|
func (this *SQLDump) Dump(db *dbs.DB, includingRecords bool) (result *SQLDumpResult, err error) {
|
||||||
result = &SQLDumpResult{}
|
result = &SQLDumpResult{}
|
||||||
|
|
||||||
tableNames, err := db.TableNames()
|
tableNames, err := db.TableNames()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
for _, tableName := range tableNames {
|
|
||||||
|
fullTableMap, err := this.findFullTables(db, tableNames)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for tableName, table := range fullTableMap {
|
||||||
// 忽略一些分表
|
// 忽略一些分表
|
||||||
if strings.HasPrefix(strings.ToLower(tableName), strings.ToLower("edgeHTTPAccessLogs_")) {
|
if strings.HasPrefix(strings.ToLower(tableName), strings.ToLower("edgeHTTPAccessLogs_")) {
|
||||||
continue
|
continue
|
||||||
@@ -69,10 +76,6 @@ func (this *SQLDump) Dump(db *dbs.DB) (result *SQLDumpResult, err error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
table, err := db.FindFullTable(tableName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sqlTable := &SQLTable{
|
sqlTable := &SQLTable{
|
||||||
Name: table.Name,
|
Name: table.Name,
|
||||||
Engine: table.Engine,
|
Engine: table.Engine,
|
||||||
@@ -102,28 +105,30 @@ func (this *SQLDump) Dump(db *dbs.DB) (result *SQLDumpResult, err error) {
|
|||||||
|
|
||||||
// Records
|
// Records
|
||||||
var records = []*SQLRecord{}
|
var records = []*SQLRecord{}
|
||||||
recordsTable := this.findRecordsTable(tableName)
|
if includingRecords {
|
||||||
if recordsTable != nil {
|
recordsTable := this.findRecordsTable(tableName)
|
||||||
ones, _, err := db.FindOnes("SELECT * FROM " + tableName + " ORDER BY id ASC")
|
if recordsTable != nil {
|
||||||
if err != nil {
|
ones, _, err := db.FindOnes("SELECT * FROM " + tableName + " ORDER BY id ASC")
|
||||||
return result, err
|
if err != nil {
|
||||||
}
|
return result, err
|
||||||
for _, one := range ones {
|
|
||||||
record := &SQLRecord{
|
|
||||||
Id: one.GetInt64("id"),
|
|
||||||
Values: map[string]string{},
|
|
||||||
UniqueFields: recordsTable.UniqueFields,
|
|
||||||
ExceptFields: recordsTable.ExceptFields,
|
|
||||||
}
|
}
|
||||||
for k, v := range one {
|
for _, one := range ones {
|
||||||
// 需要排除的字段
|
record := &SQLRecord{
|
||||||
if lists.ContainsString(record.ExceptFields, k) {
|
Id: one.GetInt64("id"),
|
||||||
continue
|
Values: map[string]string{},
|
||||||
|
UniqueFields: recordsTable.UniqueFields,
|
||||||
|
ExceptFields: recordsTable.ExceptFields,
|
||||||
}
|
}
|
||||||
|
for k, v := range one {
|
||||||
|
// 需要排除的字段
|
||||||
|
if lists.ContainsString(record.ExceptFields, k) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
record.Values[k] = types.String(v)
|
record.Values[k] = types.String(v)
|
||||||
|
}
|
||||||
|
records = append(records, record)
|
||||||
}
|
}
|
||||||
records = append(records, record)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sqlTable.Records = records
|
sqlTable.Records = records
|
||||||
@@ -218,7 +223,7 @@ func (this *SQLDump) applyQueue(db *dbs.DB, newResult *SQLDumpResult, showLog bo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
currentResult, err := this.Dump(db)
|
currentResult, err := this.Dump(db, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -343,7 +348,7 @@ func (this *SQLDump) applyQueue(db *dbs.DB, newResult *SQLDumpResult, showLog bo
|
|||||||
// +
|
// +
|
||||||
for _, record := range newTable.Records {
|
for _, record := range newTable.Records {
|
||||||
var queryArgs = []string{}
|
var queryArgs = []string{}
|
||||||
var queryValues = []interface{}{}
|
var queryValues = []any{}
|
||||||
var valueStrings = []string{}
|
var valueStrings = []string{}
|
||||||
for _, field := range record.UniqueFields {
|
for _, field := range record.UniqueFields {
|
||||||
queryArgs = append(queryArgs, field+"=?")
|
queryArgs = append(queryArgs, field+"=?")
|
||||||
@@ -390,8 +395,8 @@ func (this *SQLDump) applyQueue(db *dbs.DB, newResult *SQLDumpResult, showLog bo
|
|||||||
if showLog {
|
if showLog {
|
||||||
fmt.Println("* record " + newTable.Name + " " + strings.Join(valueStrings, ", "))
|
fmt.Println("* record " + newTable.Name + " " + strings.Join(valueStrings, ", "))
|
||||||
}
|
}
|
||||||
args := []string{}
|
var args = []string{}
|
||||||
values := []interface{}{}
|
var values = []any{}
|
||||||
for k, v := range record.Values {
|
for k, v := range record.Values {
|
||||||
if k == "id" {
|
if k == "id" {
|
||||||
continue
|
continue
|
||||||
@@ -418,6 +423,59 @@ func (this *SQLDump) applyQueue(db *dbs.DB, newResult *SQLDumpResult, showLog bo
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 查找所有表的完整信息
|
||||||
|
func (this *SQLDump) findFullTables(db *dbs.DB, tableNames []string) (map[string]*dbs.Table, error) {
|
||||||
|
var fullTableMap = map[string]*dbs.Table{}
|
||||||
|
if len(tableNames) == 0 {
|
||||||
|
return fullTableMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var locker = &sync.Mutex{}
|
||||||
|
var queue = make(chan string, len(tableNames))
|
||||||
|
for _, tableName := range tableNames {
|
||||||
|
queue <- tableName
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg = &sync.WaitGroup{}
|
||||||
|
var concurrent = 8
|
||||||
|
|
||||||
|
if runtime.NumCPU() > 4 {
|
||||||
|
concurrent = 32
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(concurrent)
|
||||||
|
var lastErr error
|
||||||
|
for i := 0; i < concurrent; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case tableName := <-queue:
|
||||||
|
table, err := db.FindFullTable(tableName)
|
||||||
|
if err != nil {
|
||||||
|
locker.Lock()
|
||||||
|
lastErr = err
|
||||||
|
locker.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
locker.Lock()
|
||||||
|
fullTableMap[tableName] = table
|
||||||
|
locker.Unlock()
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if lastErr != nil {
|
||||||
|
return nil, lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return fullTableMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
// 查找有记录的表
|
// 查找有记录的表
|
||||||
func (this *SQLDump) findRecordsTable(tableName string) *SQLRecordsTable {
|
func (this *SQLDump) findRecordsTable(tableName string) *SQLRecordsTable {
|
||||||
for _, table := range recordsTables {
|
for _, table := range recordsTables {
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ func TestSQLDump_Dump(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
dump := NewSQLDump()
|
dump := NewSQLDump()
|
||||||
result, err := dump.Dump(db)
|
result, err := dump.Dump(db, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -64,7 +64,7 @@ func TestSQLDump_Apply(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
var dump = NewSQLDump()
|
var dump = NewSQLDump()
|
||||||
result, err := dump.Dump(db)
|
result, err := dump.Dump(db, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ func (this *SQLExecutor) Run(showLog bool) error {
|
|||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
sqlDump := NewSQLDump()
|
var sqlDump = NewSQLDump()
|
||||||
_, err = sqlDump.Apply(db, LatestSQLResult, showLog)
|
_, err = sqlDump.Apply(db, LatestSQLResult, showLog)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
Reference in New Issue
Block a user