Files
EdgeAPI/internal/setup/sql_dump.go

450 lines
12 KiB
Go
Raw Normal View History

2020-11-16 23:30:47 +08:00
package setup
import (
"errors"
"fmt"
2020-11-16 23:30:47 +08:00
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
2020-11-16 23:30:47 +08:00
"github.com/iwind/TeaGo/types"
"regexp"
"strings"
"sync"
2020-11-16 23:30:47 +08:00
)
var recordsTables = []*SQLRecordsTable{
{
TableName: "edgeRegionCountries",
UniqueFields: []string{"name"},
ExceptFields: []string{"customName", "customCodes"},
2020-11-16 23:30:47 +08:00
},
{
TableName: "edgeRegionProvinces",
UniqueFields: []string{"name", "countryId"},
ExceptFields: []string{"customName", "customCodes"},
2020-11-16 23:30:47 +08:00
},
{
TableName: "edgeRegionCities",
UniqueFields: []string{"name", "provinceId"},
ExceptFields: []string{"customName", "customCodes"},
},
{
TableName: "edgeRegionTowns",
UniqueFields: []string{"name", "cityId"},
ExceptFields: []string{"customName", "customCodes"},
},
{
TableName: "edgeRegionProviders",
UniqueFields: []string{"name"},
ExceptFields: []string{"customName", "customCodes"},
},
2020-11-16 23:30:47 +08:00
}
type sqlItem struct {
sqlString string
args []any
}
2020-11-16 23:30:47 +08:00
type SQLDump struct {
}
func NewSQLDump() *SQLDump {
return &SQLDump{}
}
// Dump 导出数据
2020-11-16 23:30:47 +08:00
func (this *SQLDump) Dump(db *dbs.DB) (result *SQLDumpResult, err error) {
result = &SQLDumpResult{}
tableNames, err := db.TableNames()
if err != nil {
return result, err
}
for _, tableName := range tableNames {
// 忽略一些分表
2022-03-08 19:55:39 +08:00
if strings.HasPrefix(strings.ToLower(tableName), strings.ToLower("edgeHTTPAccessLogs_")) {
continue
}
if strings.HasPrefix(strings.ToLower(tableName), strings.ToLower("edgeNSAccessLogs_")) {
2020-11-16 23:30:47 +08:00
continue
}
table, err := db.FindFullTable(tableName)
if err != nil {
return nil, err
}
sqlTable := &SQLTable{
Name: table.Name,
Engine: table.Engine,
Charset: table.Collation,
2022-09-23 09:28:19 +08:00
Definition: regexp.MustCompile(` AUTO_INCREMENT=\d+`).ReplaceAllString(table.Code, ""),
2020-11-16 23:30:47 +08:00
}
// 字段
var fields = []*SQLField{}
2020-11-16 23:30:47 +08:00
for _, field := range table.Fields {
fields = append(fields, &SQLField{
Name: field.Name,
Definition: field.Definition(),
})
}
sqlTable.Fields = fields
// 索引
var indexes = []*SQLIndex{}
2020-11-16 23:30:47 +08:00
for _, index := range table.Indexes {
indexes = append(indexes, &SQLIndex{
Name: index.Name,
Definition: index.Definition(),
})
}
sqlTable.Indexes = indexes
// Records
var records = []*SQLRecord{}
2020-11-16 23:30:47 +08:00
recordsTable := this.findRecordsTable(tableName)
if recordsTable != nil {
ones, _, err := db.FindOnes("SELECT * FROM " + tableName + " ORDER BY id ASC")
if err != nil {
return result, err
}
for _, one := range ones {
record := &SQLRecord{
Id: one.GetInt64("id"),
Values: map[string]string{},
UniqueFields: recordsTable.UniqueFields,
ExceptFields: recordsTable.ExceptFields,
2020-11-16 23:30:47 +08:00
}
for k, v := range one {
// 需要排除的字段
if lists.ContainsString(record.ExceptFields, k) {
continue
}
2020-11-16 23:30:47 +08:00
record.Values[k] = types.String(v)
}
records = append(records, record)
}
}
sqlTable.Records = records
result.Tables = append(result.Tables, sqlTable)
}
return
}
// Apply 应用数据
func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (ops []string, err error) {
// 设置Innodb事务提交模式
{
result, err := db.FindOne("SHOW VARIABLES WHERE variable_name='innodb_flush_log_at_trx_commit'")
if err == nil && result != nil {
var oldValue = result.GetInt("Value")
if oldValue == 1 {
_, _ = db.Exec("SET GLOBAL innodb_flush_log_at_trx_commit=2")
}
}
}
// 执行队列
var execQueue = make(chan *sqlItem, 256)
var threads = 32
var wg = sync.WaitGroup{}
wg.Add(threads + 1 /** applyQueue **/)
var applyOps []string
var applyErr error
go func() {
defer wg.Done()
defer close(execQueue)
applyOps, applyErr = this.applyQueue(db, newResult, showLog, execQueue)
}()
var sqlErrors = []error{}
var sqlErrLocker = &sync.Mutex{}
for i := 0; i < threads; i++ {
go func() {
defer wg.Done()
for item := range execQueue {
_, err := db.Exec(item.sqlString, item.args...)
if err != nil {
sqlErrLocker.Lock()
sqlErrors = append(sqlErrors, errors.New(item.sqlString+": "+err.Error()))
sqlErrLocker.Unlock()
break
}
}
}()
}
wg.Wait()
if applyErr != nil {
return nil, applyErr
}
if len(sqlErrors) == 0 {
// 升级数据
err = UpgradeSQLData(db)
if err != nil {
return nil, errors.New("upgrade data failed: " + err.Error())
}
return applyOps, nil
}
return nil, sqlErrors[0]
}
func (this *SQLDump) applyQueue(db *dbs.DB, newResult *SQLDumpResult, showLog bool, queue chan *sqlItem) (ops []string, err error) {
var execSQL = func(sqlString string, args ...any) {
queue <- &sqlItem{
sqlString: sqlString,
args: args,
}
}
2020-11-16 23:30:47 +08:00
currentResult, err := this.Dump(db)
if err != nil {
return nil, err
}
// 新增表格
for _, newTable := range newResult.Tables {
var oldTable = currentResult.FindTable(newTable.Name)
2020-11-16 23:30:47 +08:00
if oldTable == nil {
var op = "+ table " + newTable.Name
ops = append(ops, op)
if showLog {
fmt.Println(op)
}
if len(newTable.Records) == 0 {
execSQL(newTable.Definition)
} else {
_, err = db.Exec(newTable.Definition)
if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error())
}
2020-11-16 23:30:47 +08:00
}
} else if oldTable.Definition != newTable.Definition {
// 对比字段
// +
for _, newField := range newTable.Fields {
var oldField = oldTable.FindField(newField.Name)
2020-11-16 23:30:47 +08:00
if oldField == nil {
var op = "+ " + newTable.Name + " " + newField.Name
ops = append(ops, op)
if showLog {
fmt.Println(op)
}
2020-12-09 20:44:05 +08:00
_, err = db.Exec("ALTER TABLE " + newTable.Name + " ADD `" + newField.Name + "` " + newField.Definition)
2020-11-16 23:30:47 +08:00
if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error())
2020-11-16 23:30:47 +08:00
}
2020-12-09 20:44:05 +08:00
} else if !newField.EqualDefinition(oldField.Definition) {
var op = "* " + newTable.Name + " " + newField.Name
ops = append(ops, op)
if showLog {
fmt.Println(op)
}
2020-12-09 20:44:05 +08:00
_, err = db.Exec("ALTER TABLE " + newTable.Name + " MODIFY `" + newField.Name + "` " + newField.Definition)
2020-11-16 23:30:47 +08:00
if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error())
2020-11-16 23:30:47 +08:00
}
}
}
// 对比索引
// +
for _, newIndex := range newTable.Indexes {
var oldIndex = oldTable.FindIndex(newIndex.Name)
2020-11-16 23:30:47 +08:00
if oldIndex == nil {
var op = "+ index " + newTable.Name + " " + newIndex.Name
ops = append(ops, op)
if showLog {
fmt.Println(op)
}
2020-11-16 23:30:47 +08:00
_, err = db.Exec("ALTER TABLE " + newTable.Name + " ADD " + newIndex.Definition)
if err != nil {
2021-08-06 14:22:17 +08:00
err = this.tryCreateIndex(err, db, newTable.Name, newIndex.Definition)
if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error())
2021-08-06 14:22:17 +08:00
}
2020-11-16 23:30:47 +08:00
}
} else if oldIndex.Definition != newIndex.Definition {
var op = "* index " + newTable.Name + " " + newIndex.Name
ops = append(ops, op)
if showLog {
fmt.Println(op)
}
2020-11-16 23:30:47 +08:00
_, err = db.Exec("ALTER TABLE " + newTable.Name + " DROP KEY " + newIndex.Name)
if err != nil {
return nil, errors.New("'" + op + "' drop old key failed: " + err.Error())
2020-11-16 23:30:47 +08:00
}
_, err = db.Exec("ALTER TABLE " + newTable.Name + " ADD " + newIndex.Definition)
if err != nil {
2021-08-06 14:22:17 +08:00
err = this.tryCreateIndex(err, db, newTable.Name, newIndex.Definition)
if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error())
2021-08-06 14:22:17 +08:00
}
2020-11-16 23:30:47 +08:00
}
}
}
// -
for _, oldIndex := range oldTable.Indexes {
var newIndex = newTable.FindIndex(oldIndex.Name)
2020-11-16 23:30:47 +08:00
if newIndex == nil {
var op = "- index " + oldTable.Name + " " + oldIndex.Name
ops = append(ops, op)
if showLog {
fmt.Println(op)
}
2020-11-16 23:30:47 +08:00
_, err = db.Exec("ALTER TABLE " + oldTable.Name + " DROP KEY " + oldIndex.Name)
if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error())
2020-11-16 23:30:47 +08:00
}
}
}
// 对比字段
// -
for _, oldField := range oldTable.Fields {
var newField = newTable.FindField(oldField.Name)
2020-11-16 23:30:47 +08:00
if newField == nil {
var op = "- field " + oldTable.Name + " " + oldField.Name
ops = append(ops, op)
if showLog {
fmt.Println(op)
}
2020-12-09 20:44:05 +08:00
_, err = db.Exec("ALTER TABLE " + oldTable.Name + " DROP COLUMN `" + oldField.Name + "`")
2020-11-16 23:30:47 +08:00
if err != nil {
return nil, errors.New("'" + op + "' failed: " + err.Error())
2020-11-16 23:30:47 +08:00
}
}
}
}
// 对比记录
// +
for _, record := range newTable.Records {
var queryArgs = []string{}
var queryValues = []interface{}{}
var valueStrings = []string{}
2020-11-16 23:30:47 +08:00
for _, field := range record.UniqueFields {
queryArgs = append(queryArgs, field+"=?")
queryValues = append(queryValues, record.Values[field])
valueStrings = append(valueStrings, record.Values[field])
}
var recordId int64
for field, recordValue := range record.Values {
if field == "id" {
recordId = types.Int64(recordValue)
break
}
}
queryValues = append(queryValues, recordId)
one, err := db.FindOne("SELECT * FROM "+newTable.Name+" WHERE (("+strings.Join(queryArgs, " AND ")+") OR id=?)", queryValues...)
2020-11-16 23:30:47 +08:00
if err != nil {
return nil, err
}
if one == nil {
ops = append(ops, "+ record "+newTable.Name+" "+strings.Join(valueStrings, ", "))
if showLog {
fmt.Println("+ record " + newTable.Name + " " + strings.Join(valueStrings, ", "))
}
var params = []string{}
var args = []string{}
var values = []any{}
2020-11-16 23:30:47 +08:00
for k, v := range record.Values {
// 需要排除的字段
if lists.ContainsString(record.ExceptFields, k) {
continue
}
// ID需要保留因为各个表格之间需要有对应关系
2020-11-16 23:30:47 +08:00
params = append(params, "`"+k+"`")
args = append(args, "?")
values = append(values, v)
}
execSQL("INSERT INTO "+newTable.Name+" ("+strings.Join(params, ", ")+") VALUES ("+strings.Join(args, ", ")+")", values...)
2020-11-16 23:30:47 +08:00
} else if !record.ValuesEquals(one) {
ops = append(ops, "* record "+newTable.Name+" "+strings.Join(valueStrings, ", "))
if showLog {
fmt.Println("* record " + newTable.Name + " " + strings.Join(valueStrings, ", "))
}
2020-11-16 23:30:47 +08:00
args := []string{}
values := []interface{}{}
for k, v := range record.Values {
if k == "id" {
continue
}
// 需要排除的字段
if lists.ContainsString(record.ExceptFields, k) {
continue
}
2020-11-16 23:30:47 +08:00
args = append(args, k+"=?")
values = append(values, v)
}
values = append(values, one.GetInt("id"))
execSQL("UPDATE "+newTable.Name+" SET "+strings.Join(args, ", ")+" WHERE id=?", values...)
2020-11-16 23:30:47 +08:00
}
}
}
// 减少表格
// 由于我们不删除任何表格,所以这里什么都不做
return
}
// 查找有记录的表
func (this *SQLDump) findRecordsTable(tableName string) *SQLRecordsTable {
for _, table := range recordsTables {
if table.TableName == tableName {
return table
}
}
return nil
}
2021-08-06 14:22:17 +08:00
// 创建索引
func (this *SQLDump) tryCreateIndex(err error, db *dbs.DB, tableName string, indexDefinition string) error {
if err == nil {
return nil
}
// 处理Duplicate entry
2021-08-07 16:11:35 +08:00
if strings.Contains(err.Error(), "Error 1062: Duplicate entry") && (strings.HasSuffix(tableName, "Stats") || strings.HasSuffix(tableName, "Values")) {
2021-08-06 14:22:17 +08:00
var tries = 5 // 尝试次数
for i := 0; i < tries; i++ {
_, err = db.Exec("TRUNCATE TABLE " + tableName)
if err != nil {
if i == tries-1 {
return err
}
continue
}
_, err = db.Exec("ALTER TABLE " + tableName + " ADD " + indexDefinition)
if err != nil {
if i == tries-1 {
return err
}
} else {
return nil
}
}
}
return err
}