fix: 机器文件下载问题修复&dbm重构

This commit is contained in:
meilin.huang
2024-01-12 13:15:30 +08:00
parent bc811cbd49
commit bfd346e65a
32 changed files with 454 additions and 322 deletions

View File

@@ -0,0 +1,228 @@
package mysql
import (
"context"
"database/sql"
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/utils/anyx"
"regexp"
"strings"
)
const (
MYSQL_META_FILE = "metasql/mysql_meta.sql"
MYSQL_DBS = "MYSQL_DBS"
MYSQL_TABLE_INFO_KEY = "MYSQL_TABLE_INFO"
MYSQL_INDEX_INFO_KEY = "MYSQL_INDEX_INFO"
MYSQL_COLUMN_MA_KEY = "MYSQL_COLUMN_MA"
)
type MysqlDialect struct {
dc *dbi.DbConn
}
func (md *MysqlDialect) GetDbServer() (*dbi.DbServer, error) {
_, res, err := md.dc.Query("SELECT VERSION() version")
if err != nil {
return nil, err
}
ds := &dbi.DbServer{
Version: anyx.ConvString(res[0]["version"]),
}
return ds, nil
}
func (md *MysqlDialect) GetDbNames() ([]string, error) {
_, res, err := md.dc.Query(dbi.GetLocalSql(MYSQL_META_FILE, MYSQL_DBS))
if err != nil {
return nil, err
}
databases := make([]string, 0)
for _, re := range res {
databases = append(databases, anyx.ConvString(re["dbname"]))
}
return databases, nil
}
// 获取表基础元信息, 如表名等
func (md *MysqlDialect) GetTables() ([]dbi.Table, error) {
_, res, err := md.dc.Query(dbi.GetLocalSql(MYSQL_META_FILE, MYSQL_TABLE_INFO_KEY))
if err != nil {
return nil, err
}
tables := make([]dbi.Table, 0)
for _, re := range res {
tables = append(tables, dbi.Table{
TableName: re["tableName"].(string),
TableComment: anyx.ConvString(re["tableComment"]),
CreateTime: anyx.ConvString(re["createTime"]),
TableRows: anyx.ConvInt(re["tableRows"]),
DataLength: anyx.ConvInt64(re["dataLength"]),
IndexLength: anyx.ConvInt64(re["indexLength"]),
})
}
return tables, nil
}
// 获取列元信息, 如列名等
func (md *MysqlDialect) GetColumns(tableNames ...string) ([]dbi.Column, error) {
tableName := ""
for i := 0; i < len(tableNames); i++ {
if i != 0 {
tableName = tableName + ", "
}
tableName = tableName + "'" + tableNames[i] + "'"
}
_, res, err := md.dc.Query(fmt.Sprintf(dbi.GetLocalSql(MYSQL_META_FILE, MYSQL_COLUMN_MA_KEY), tableName))
if err != nil {
return nil, err
}
columns := make([]dbi.Column, 0)
for _, re := range res {
columns = append(columns, dbi.Column{
TableName: re["tableName"].(string),
ColumnName: re["columnName"].(string),
ColumnType: anyx.ConvString(re["columnType"]),
ColumnComment: anyx.ConvString(re["columnComment"]),
Nullable: anyx.ConvString(re["nullable"]),
ColumnKey: anyx.ConvString(re["columnKey"]),
ColumnDefault: anyx.ConvString(re["columnDefault"]),
NumScale: anyx.ConvString(re["numScale"]),
})
}
return columns, nil
}
// 获取表主键字段名,不存在主键标识则默认第一个字段
func (md *MysqlDialect) GetPrimaryKey(tablename string) (string, error) {
columns, err := md.GetColumns(tablename)
if err != nil {
return "", err
}
if len(columns) == 0 {
return "", errorx.NewBiz("[%s] 表不存在", tablename)
}
for _, v := range columns {
if v.ColumnKey == "PRI" {
return v.ColumnName, nil
}
}
return columns[0].ColumnName, nil
}
// 获取表索引信息
func (md *MysqlDialect) GetTableIndex(tableName string) ([]dbi.Index, error) {
_, res, err := md.dc.Query(dbi.GetLocalSql(MYSQL_META_FILE, MYSQL_INDEX_INFO_KEY), tableName)
if err != nil {
return nil, err
}
indexs := make([]dbi.Index, 0)
for _, re := range res {
indexs = append(indexs, dbi.Index{
IndexName: re["indexName"].(string),
ColumnName: anyx.ConvString(re["columnName"]),
IndexType: anyx.ConvString(re["indexType"]),
IndexComment: anyx.ConvString(re["indexComment"]),
NonUnique: anyx.ConvInt(re["nonUnique"]),
SeqInIndex: anyx.ConvInt(re["seqInIndex"]),
})
}
// 把查询结果以索引名分组,索引字段以逗号连接
result := make([]dbi.Index, 0)
key := ""
for _, v := range indexs {
// 当前的索引名
in := v.IndexName
if key == in {
// 索引字段已根据名称和顺序排序,故取最后一个即可
i := len(result) - 1
// 同索引字段以逗号连接
result[i].ColumnName = result[i].ColumnName + "," + v.ColumnName
} else {
key = in
result = append(result, v)
}
}
return result, nil
}
// 获取建表ddl
func (md *MysqlDialect) GetTableDDL(tableName string) (string, error) {
_, res, err := md.dc.Query(fmt.Sprintf("show create table `%s` ", tableName))
if err != nil {
return "", err
}
return res[0]["Create Table"].(string) + ";", nil
}
func (md *MysqlDialect) WalkTableRecord(tableName string, walkFn dbi.WalkQueryRowsFunc) error {
return md.dc.WalkQueryRows(context.Background(), fmt.Sprintf("SELECT * FROM %s", tableName), walkFn)
}
func (md *MysqlDialect) GetSchemas() ([]string, error) {
return nil, nil
}
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
func (md *MysqlDialect) GetDbProgram() dbi.DbProgram {
return NewDbProgramMysql(md.dc)
}
func (md *MysqlDialect) GetDataType(dbColumnType string) dbi.DataType {
if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) {
return dbi.DataTypeNumber
}
// 日期时间类型
if regexp.MustCompile(`(?i)datetime|timestamp`).MatchString(dbColumnType) {
return dbi.DataTypeDateTime
}
// 日期类型
if regexp.MustCompile(`(?i)date`).MatchString(dbColumnType) {
return dbi.DataTypeDate
}
// 时间类型
if regexp.MustCompile(`(?i)time`).MatchString(dbColumnType) {
return dbi.DataTypeTime
}
return dbi.DataTypeString
}
func (md *MysqlDialect) BatchInsert(tx *sql.Tx, tableName string, columns []string, values [][]any) (int64, error) {
// 生成占位符字符串:如:(?,?)
// 重复字符串并用逗号连接
repeated := strings.Repeat("?,", len(columns))
// 去除最后一个逗号,占位符由括号包裹
placeholder := fmt.Sprintf("(%s)", strings.TrimSuffix(repeated, ","))
// 执行批量insert sqlmysql支持批量insert语法
// insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ...
// 重复占位符字符串n遍
repeated = strings.Repeat(placeholder+",", len(values))
// 去除最后一个逗号
placeholder = strings.TrimSuffix(repeated, ",")
sqlStr := fmt.Sprintf("insert into %s (%s) values %s", md.dc.Info.Type.QuoteIdentifier(tableName), strings.Join(columns, ","), placeholder)
// 执行批量insert sql
// 把二维数组转为一维数组
var args []any
for _, v := range values {
args = append(args, v...)
}
return md.dc.TxExec(tx, sqlStr, args...)
}
func (md *MysqlDialect) FormatStrData(dbColumnValue string, dataType dbi.DataType) string {
// mysql不需要格式化时间日期等
return dbColumnValue
}

View File

@@ -0,0 +1,52 @@
package mysql
import (
"context"
"database/sql"
"fmt"
"mayfly-go/internal/db/dbm/dbi"
machineapp "mayfly-go/internal/machine/application"
"net"
"sync"
"github.com/go-sql-driver/mysql"
)
var (
meta dbi.Meta
once sync.Once
)
func GetMeta() dbi.Meta {
once.Do(func() {
meta = new(MysqlMeta)
})
return meta
}
type MysqlMeta struct {
}
func (md *MysqlMeta) GetSqlDb(d *dbi.DbInfo) (*sql.DB, error) {
// SSH Conect
if d.SshTunnelMachineId > 0 {
sshTunnelMachine, err := machineapp.GetMachineApp().GetSshTunnelMachine(d.SshTunnelMachineId)
if err != nil {
return nil, err
}
mysql.RegisterDialContext(d.Network, func(ctx context.Context, addr string) (net.Conn, error) {
return sshTunnelMachine.GetDialConn("tcp", addr)
})
}
// 设置dataSourceName -> 更多参数参考https://github.com/go-sql-driver/mysql#dsn-data-source-name
dsn := fmt.Sprintf("%s:%s@%s(%s:%d)/%s?timeout=8s", d.Username, d.Password, d.Network, d.Host, d.Port, d.Database)
if d.Params != "" {
dsn = fmt.Sprintf("%s&%s", dsn, d.Params)
}
const driverName = "mysql"
return sql.Open(driverName, dsn)
}
func (md *MysqlMeta) GetDialect(conn *dbi.DbConn) dbi.Dialect {
return &MysqlDialect{conn}
}

View File

@@ -0,0 +1,772 @@
package mysql
import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/singleflight"
"mayfly-go/internal/db/config"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/pkg/logx"
)
var _ dbi.DbProgram = (*DbProgramMysql)(nil)
type DbProgramMysql struct {
dbConn *dbi.DbConn
// mysqlBin 用于集成测试
mysqlBin *config.MysqlBin
// backupPath 用于集成测试
backupPath string
}
func NewDbProgramMysql(dbConn *dbi.DbConn) *DbProgramMysql {
return &DbProgramMysql{
dbConn: dbConn,
}
}
func (svc *DbProgramMysql) dbInfo() *dbi.DbInfo {
dbInfo := svc.dbConn.Info
err := dbInfo.IfUseSshTunnelChangeIpPort()
if err != nil {
logx.Errorf("通过ssh隧道连接db失败: %s", err.Error())
}
return dbInfo
}
func (svc *DbProgramMysql) getMysqlBin() *config.MysqlBin {
if svc.mysqlBin != nil {
return svc.mysqlBin
}
dbInfo := svc.dbInfo()
var mysqlBin *config.MysqlBin
switch dbInfo.Type {
case dbi.DbTypeMariadb:
mysqlBin = config.GetMysqlBin(config.ConfigKeyDbMariadbBin)
case dbi.DbTypeMysql:
mysqlBin = config.GetMysqlBin(config.ConfigKeyDbMysqlBin)
default:
panic(fmt.Sprintf("不兼容 MySQL 的数据库类型: %v", dbInfo.Type))
}
svc.mysqlBin = mysqlBin
return svc.mysqlBin
}
func (svc *DbProgramMysql) getBackupPath() string {
if len(svc.backupPath) > 0 {
return svc.backupPath
}
return config.GetDbBackupRestore().BackupPath
}
func (svc *DbProgramMysql) GetBinlogFilePath(fileName string) string {
return filepath.Join(svc.getBinlogDir(svc.dbInfo().InstanceId), fileName)
}
func (svc *DbProgramMysql) Backup(ctx context.Context, backupHistory *entity.DbBackupHistory) (*entity.BinlogInfo, error) {
dir := svc.getDbBackupDir(backupHistory.DbInstanceId, backupHistory.DbBackupId)
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return nil, err
}
tmpFile := filepath.Join(dir, "backup.tmp")
defer func() {
_ = os.Remove(tmpFile)
}()
dbInfo := svc.dbInfo()
args := []string{
"--host", dbInfo.Host,
"--port", strconv.Itoa(dbInfo.Port),
"--user", dbInfo.Username,
"--password=" + dbInfo.Password,
"--add-drop-database",
"--result-file", tmpFile,
"--single-transaction",
"--master-data=2",
"--databases", backupHistory.DbName,
}
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqldumpPath, args...)
logx.Debugf("backup database using mysqldump binary: %s", cmd.String())
if err := runCmd(cmd); err != nil {
logx.Errorf("运行 mysqldump 程序失败: %v", err)
return nil, errors.Wrap(err, "运行 mysqldump 程序失败")
}
logx.Debugf("Checking dumped file stat: %s", tmpFile)
if _, err := os.Stat(tmpFile); err != nil {
logx.Errorf("未找到备份文件: %v", err)
return nil, errors.Wrapf(err, "未找到备份文件")
}
reader, err := os.Open(tmpFile)
if err != nil {
return nil, err
}
binlogInfo, err := readBinlogInfoFromBackup(reader)
_ = reader.Close()
if err != nil {
return nil, errors.Wrapf(err, "从备份文件中读取 binlog 信息失败")
}
fileName := filepath.Join(dir, fmt.Sprintf("%s.sql", backupHistory.Uuid))
if err := os.Rename(tmpFile, fileName); err != nil {
return nil, errors.Wrap(err, "备份文件改名失败")
}
return binlogInfo, nil
}
func (svc *DbProgramMysql) RestoreBackupHistory(ctx context.Context, dbName string, dbBackupId uint64, dbBackupHistoryUuid string) error {
dbInfo := svc.dbInfo()
args := []string{
"--host", dbInfo.Host,
"--port", strconv.Itoa(dbInfo.Port),
"--database", dbName,
"--user", dbInfo.Username,
"--password=" + dbInfo.Password,
}
fileName := filepath.Join(svc.getDbBackupDir(svc.dbInfo().InstanceId, dbBackupId),
fmt.Sprintf("%v.sql", dbBackupHistoryUuid))
file, err := os.Open(fileName)
if err != nil {
return errors.Wrap(err, "打开备份文件失败")
}
defer func() {
_ = file.Close()
}()
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlPath, args...)
cmd.Stdin = file
logx.Debug("恢复数据库: ", cmd.String())
if err := runCmd(cmd); err != nil {
logx.Errorf("运行 mysql 程序失败: %v", err)
return errors.Wrap(err, "运行 mysql 程序失败")
}
return nil
}
// Download binlog files on server.
func (svc *DbProgramMysql) downloadBinlogFilesOnServer(ctx context.Context, binlogFilesOnServerSorted []*entity.BinlogFile, downloadLatestBinlogFile bool) error {
if len(binlogFilesOnServerSorted) == 0 {
logx.Debug("No binlog file found on server to download")
return nil
}
dbInfo := svc.dbInfo()
if err := os.MkdirAll(svc.getBinlogDir(dbInfo.InstanceId), os.ModePerm); err != nil {
return errors.Wrapf(err, "创建 binlog 目录失败: %q", svc.getBinlogDir(svc.dbInfo().InstanceId))
}
latestBinlogFileOnServer := binlogFilesOnServerSorted[len(binlogFilesOnServerSorted)-1]
for _, fileOnServer := range binlogFilesOnServerSorted {
isLatest := fileOnServer.Name == latestBinlogFileOnServer.Name
if isLatest && !downloadLatestBinlogFile {
continue
}
binlogFilePath := filepath.Join(svc.getBinlogDir(dbInfo.InstanceId), fileOnServer.Name)
logx.Debug("Downloading binlog file from MySQL server.", logx.String("path", binlogFilePath), logx.Bool("isLatest", isLatest))
if err := svc.downloadBinlogFile(ctx, fileOnServer, isLatest); err != nil {
logx.Error("下载 binlog 文件失败", logx.String("path", binlogFilePath), logx.String("error", err.Error()))
return errors.Wrapf(err, "下载 binlog 文件失败: %q", binlogFilePath)
}
}
return nil
}
// Parse the first binlog eventTs of a local binlog file.
func (svc *DbProgramMysql) parseLocalBinlogFirstEventTime(ctx context.Context, filePath string) (eventTime time.Time, parseErr error) {
args := []string{
// Local binlog file path.
filePath,
// Verify checksum binlog events.
"--verify-binlog-checksum",
// Tell mysqlbinlog to suppress the BINLOG statements for row events, which reduces the unneeded output.
"--base64-output=DECODE-ROWS",
}
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, args...)
var stderr strings.Builder
cmd.Stderr = &stderr
pr, err := cmd.StdoutPipe()
if err != nil {
return time.Time{}, err
}
if err := cmd.Start(); err != nil {
return time.Time{}, err
}
defer func() {
_ = cmd.Cancel()
if err := cmd.Wait(); err != nil && parseErr != nil && stderr.Len() > 0 {
parseErr = errors.Wrap(parseErr, stderr.String())
}
}()
for s := bufio.NewScanner(pr); s.Scan(); {
line := s.Text()
eventTimeParsed, found, err := parseBinlogEventTimeInLine(line)
if err != nil {
return time.Time{}, errors.Wrap(err, "解析 binlog 文件失败")
}
if found {
return eventTimeParsed, nil
}
}
return time.Time{}, errors.New("解析 binlog 文件失败")
}
var singleFlightGroup singleflight.Group
// FetchBinlogs downloads binlog files from startingFileName on server to `binlogDir`.
func (svc *DbProgramMysql) FetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error) {
var downloaded bool
key := strconv.FormatUint(svc.dbInfo().InstanceId, 16)
binlogFiles, err, _ := singleFlightGroup.Do(key, func() (interface{}, error) {
downloaded = true
return svc.fetchBinlogs(ctx, downloadLatestBinlogFile, earliestBackupSequence, latestBinlogSequence)
})
if err != nil {
return nil, err
}
if downloaded {
return binlogFiles.([]*entity.BinlogFile), nil
}
if !downloadLatestBinlogFile {
return nil, nil
}
binlogFiles, err, _ = singleFlightGroup.Do(key, func() (interface{}, error) {
return svc.fetchBinlogs(ctx, true, earliestBackupSequence, latestBinlogSequence)
})
if err != nil {
return nil, err
}
return binlogFiles.([]*entity.BinlogFile), err
}
// fetchBinlogs downloads binlog files from startingFileName on server to `binlogDir`.
func (svc *DbProgramMysql) fetchBinlogs(ctx context.Context, downloadLatestBinlogFile bool, earliestBackupSequence, latestBinlogSequence int64) ([]*entity.BinlogFile, error) {
// Read binlog files list on server.
binlogFilesOnServerSorted, err := svc.GetSortedBinlogFilesOnServer(ctx)
if err != nil {
return nil, err
}
if len(binlogFilesOnServerSorted) == 0 {
logx.Debug("No binlog file found on server to download")
return nil, nil
}
indexHistory := -1
for i, file := range binlogFilesOnServerSorted {
if latestBinlogSequence == file.Sequence {
indexHistory = i + 1
break
}
if earliestBackupSequence == file.Sequence {
indexHistory = i
break
}
}
if indexHistory < 0 {
return nil, errors.New(fmt.Sprintf("在数据库服务器上未找到 binlog 文件: %d, %d", earliestBackupSequence, latestBinlogSequence))
}
if indexHistory > len(binlogFilesOnServerSorted)-1 {
indexHistory = len(binlogFilesOnServerSorted) - 1
}
binlogFilesOnServerSorted = binlogFilesOnServerSorted[indexHistory:]
if err := svc.downloadBinlogFilesOnServer(ctx, binlogFilesOnServerSorted, downloadLatestBinlogFile); err != nil {
return nil, err
}
return binlogFilesOnServerSorted, nil
}
// Syncs the binlog specified by `meta` between the instance and local.
// If isLast is true, it means that this is the last binlog file containing the targetTs event.
// It may keep growing as there are ongoing writes to the database. So we just need to check that
// the file size is larger or equal to the binlog file size we queried from the MySQL server earlier.
func (svc *DbProgramMysql) downloadBinlogFile(ctx context.Context, binlogFileToDownload *entity.BinlogFile, isLast bool) error {
dbInfo := svc.dbInfo()
tempBinlogPrefix := filepath.Join(svc.getBinlogDir(dbInfo.InstanceId), "tmp-")
args := []string{
binlogFileToDownload.Name,
"--read-from-remote-server",
// Verify checksum binlog events.
"--verify-binlog-checksum",
"--host", dbInfo.Host,
"--port", strconv.Itoa(dbInfo.Port),
"--user", dbInfo.Username,
"--raw",
// With --raw this is a prefix for the file names.
"--result-file", tempBinlogPrefix,
}
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, args...)
// We cannot set password as a flag. Otherwise, there is warning message
// "mysqlbinlog: [Warning] Using a password on the command line interface can be insecure."
if dbInfo.Password != "" {
cmd.Env = append(cmd.Env, fmt.Sprintf("MYSQL_PWD=%s", dbInfo.Password))
}
logx.Debug("Downloading binlog files using mysqlbinlog:", cmd.String())
binlogFilePathTemp := tempBinlogPrefix + binlogFileToDownload.Name
defer func() {
_ = os.Remove(binlogFilePathTemp)
}()
if err := runCmd(cmd); err != nil {
logx.Errorf("运行 mysqlbinlog 程序失败: %v", err)
return errors.Wrap(err, "运行 mysqlbinlog 程序失败")
}
logx.Debug("Checking downloaded binlog file stat", logx.String("path", binlogFilePathTemp))
binlogFileTempInfo, err := os.Stat(binlogFilePathTemp)
if err != nil {
logx.Error("未找到 binlog 文件", logx.String("path", binlogFilePathTemp), logx.String("error", err.Error()))
return errors.Wrapf(err, "未找到 binlog 文件: %q", binlogFilePathTemp)
}
if !isLast && binlogFileTempInfo.Size() != binlogFileToDownload.Size {
logx.Error("Downloaded archived binlog file size is not equal to size queried on the MySQL server earlier.",
logx.String("binlog", binlogFileToDownload.Name),
logx.Int64("sizeInfo", binlogFileToDownload.Size),
logx.Int64("downloadedSize", binlogFileTempInfo.Size()),
)
return errors.Errorf("下载的 binlog 文件 %q 与服务上的文件大小不一致 %d != %d", binlogFilePathTemp, binlogFileTempInfo.Size(), binlogFileToDownload.Size)
}
binlogFilePath := svc.GetBinlogFilePath(binlogFileToDownload.Name)
if err := os.Rename(binlogFilePathTemp, binlogFilePath); err != nil {
return errors.Wrapf(err, "binlog 文件更名失败: %q -> %q", binlogFilePathTemp, binlogFilePath)
}
firstEventTime, err := svc.parseLocalBinlogFirstEventTime(ctx, binlogFilePath)
if err != nil {
return err
}
binlogFileToDownload.FirstEventTime = firstEventTime
binlogFileToDownload.Downloaded = true
return nil
}
// GetSortedBinlogFilesOnServer returns the information of binlog files in ascending order by their numeric extension.
func (svc *DbProgramMysql) GetSortedBinlogFilesOnServer(_ context.Context) ([]*entity.BinlogFile, error) {
query := "SHOW BINARY LOGS"
columns, rows, err := svc.dbConn.Query(query)
if err != nil {
return nil, errors.Wrapf(err, "SQL 语句 %q 执行失败", query)
}
findFileName := false
findFileSize := false
for _, column := range columns {
switch column.Name {
case "Log_name":
findFileName = true
case "File_size":
findFileSize = true
}
}
if !findFileName || !findFileSize {
return nil, errors.Errorf("SQL 语句 %q 执行结果解析失败", query)
}
var binlogFiles []*entity.BinlogFile
for _, row := range rows {
name, nameOk := row["Log_name"].(string)
size, sizeOk := row["File_size"].(uint64)
if !nameOk || !sizeOk {
return nil, errors.Errorf("SQL 语句 %q 执行结果解析失败", query)
}
_, seq, err := ParseBinlogName(name)
if err != nil {
return nil, errors.Wrapf(err, "SQL 语句 %q 执行结果解析失败", query)
}
binlogFile := &entity.BinlogFile{
Name: name,
Size: int64(size),
Sequence: seq,
}
binlogFiles = append(binlogFiles, binlogFile)
}
return sortBinlogFiles(binlogFiles), nil
}
var regexpBinlogInfo = regexp.MustCompile("CHANGE MASTER TO MASTER_LOG_FILE='([^.]+).([0-9]+)', MASTER_LOG_POS=([0-9]+);")
func readBinlogInfoFromBackup(reader io.Reader) (*entity.BinlogInfo, error) {
matching := false
r := bufio.NewReader(reader)
const maxMatchRow = 100
for i := 0; i < maxMatchRow; i++ {
row, err := r.ReadString('\n')
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if !matching {
if row == "-- Position to start replication or point-in-time recovery from\n" {
matching = true
} else {
continue
}
}
res := regexpBinlogInfo.FindStringSubmatch(row)
if res == nil {
continue
}
seq, err := strconv.ParseInt(res[2], 10, 64)
if err != nil {
return nil, err
}
pos, err := strconv.ParseInt(res[3], 10, 64)
if err != nil {
return nil, err
}
return &entity.BinlogInfo{
FileName: fmt.Sprintf("%s.%s", res[1], res[2]),
Sequence: seq,
Position: pos,
}, nil
}
return nil, errors.New("备份文件中未找到 binlog 信息")
}
// Use command like mysqlbinlog --start-datetime=targetTs binlog.000001 to parse the first binlog event position with timestamp equal or after targetTs.
func (svc *DbProgramMysql) GetBinlogEventPositionAtOrAfterTime(ctx context.Context, binlogName string, targetTime time.Time) (position int64, parseErr error) {
binlogPath := svc.GetBinlogFilePath(binlogName)
args := []string{
// Local binlog file path.
binlogPath,
// Verify checksum binlog events.
"--verify-binlog-checksum",
// Tell mysqlbinlog to suppress the BINLOG statements for row events, which reduces the unneeded output.
"--base64-output=DECODE-ROWS",
// Instruct mysqlbinlog to start output only after encountering the first binlog event with timestamp equal or after targetTime.
"--start-datetime", targetTime.Local().Format(time.DateTime),
}
cmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, args...)
var stderr strings.Builder
cmd.Stderr = &stderr
pr, err := cmd.StdoutPipe()
if err != nil {
return 0, err
}
if err := cmd.Start(); err != nil {
return 0, err
}
defer func() {
_ = cmd.Cancel()
if err := cmd.Wait(); err != nil && parseErr != nil && stderr.Len() > 0 {
parseErr = errors.Wrap(errors.New(stderr.String()), parseErr.Error())
}
}()
for s := bufio.NewScanner(pr); s.Scan(); {
line := s.Text()
posParsed, found, err := parseBinlogEventPosInLine(line)
if err != nil {
return 0, errors.Wrap(err, "binlog 文件解析失败")
}
// When invoking mysqlbinlog with --start-datetime, the first valid event will always be FORMAT_DESCRIPTION_EVENT which should be skipped.
if found && posParsed != 4 {
return posParsed, nil
}
}
return 0, errors.Errorf("在 %s 之后没有 binlog 事件", targetTime.Format(time.DateTime))
}
// ReplayBinlog replays the binlog for `originDatabase` from `startBinlogInfo.Position` to `targetTs`, read binlog from `binlogDir`.
func (svc *DbProgramMysql) ReplayBinlog(ctx context.Context, originalDatabase, targetDatabase string, restoreInfo *dbi.RestoreInfo) (replayErr error) {
const (
// Variable lower_case_table_names related.
// LetterCaseOnDiskLetterCaseCmp stores table and database names using the letter case specified in the CREATE TABLE or CREATE DATABASE statement.
// Name comparisons are case-sensitive.
LetterCaseOnDiskLetterCaseCmp = 0
// LowerCaseOnDiskLowerCaseCmp stores table names in lowercase on disk and name comparisons are not case-sensitive.
LowerCaseOnDiskLowerCaseCmp = 1
// LetterCaseOnDiskLowerCaseCmp stores table and database names are stored on disk using the letter case specified in the CREATE TABLE or CREATE DATABASE statement, but MySQL converts them to lowercase on lookup.
// Name comparisons are not case-sensitive.
LetterCaseOnDiskLowerCaseCmp = 2
)
caseVariable := "lower_case_table_names"
identifierCaseSensitive, err := svc.getServerVariable(ctx, caseVariable)
if err != nil {
return err
}
identifierCaseSensitiveValue, err := strconv.Atoi(identifierCaseSensitive)
if err != nil {
return err
}
var originalDBName string
switch identifierCaseSensitiveValue {
case LetterCaseOnDiskLetterCaseCmp:
originalDBName = originalDatabase
case LowerCaseOnDiskLowerCaseCmp:
originalDBName = strings.ToLower(originalDatabase)
case LetterCaseOnDiskLowerCaseCmp:
originalDBName = strings.ToLower(originalDatabase)
default:
return errors.Errorf("参数 %s 的值 %s 不符合预期: [%d, %d, %d] ", caseVariable, identifierCaseSensitive, 0, 1, 2)
}
// Extract the SQL statements from the binlog and replay them to the pitrDatabase via the mysql client by pipe.
mysqlbinlogArgs := []string{
// Verify checksum binlog events.
"--verify-binlog-checksum",
// Disable binary logging.
"--disable-log-bin",
// Create rewrite rules for databases when playing back from logs written in row-based format, so that we can apply the binlog to PITR database instead of the original database.
"--rewrite-db", fmt.Sprintf("%s->%s", originalDBName, targetDatabase),
// List entries for just this database. It's applied after the --rewrite-db option, so we should provide the rewritten database, i.e., pitrDatabase.
"--database", targetDatabase,
// Decode binary log from first event with position equal to or greater than argument.
"--start-position", fmt.Sprintf("%d", restoreInfo.StartPosition),
// Stop decoding binary log at first event with position equal to or greater than argument.
"--stop-position", fmt.Sprintf("%d", restoreInfo.TargetPosition),
}
dbInfo := svc.dbInfo()
mysqlbinlogArgs = append(mysqlbinlogArgs, restoreInfo.GetBinlogPaths(svc.getBinlogDir(dbInfo.InstanceId))...)
mysqlArgs := []string{
"--host", dbInfo.Host,
"--port", strconv.Itoa(dbInfo.Port),
"--user", dbInfo.Username,
}
if dbInfo.Password != "" {
// The --password parameter of mysql/mysqlbinlog does not support the "--password PASSWORD" format (split by space).
// If provided like that, the program will hang.
mysqlArgs = append(mysqlArgs, fmt.Sprintf("--password=%s", dbInfo.Password))
}
mysqlbinlogCmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlbinlogPath, mysqlbinlogArgs...)
mysqlCmd := exec.CommandContext(ctx, svc.getMysqlBin().MysqlPath, mysqlArgs...)
logx.Debug("Start replay binlog commands.",
logx.String("mysqlbinlog", mysqlbinlogCmd.String()),
logx.String("mysql", mysqlCmd.String()))
defer func() {
if replayErr == nil {
logx.Debug("Replayed binlog successfully.")
}
}()
mysqlRead, err := mysqlbinlogCmd.StdoutPipe()
if err != nil {
return errors.Wrap(err, "创建 mysqlbinlog 输出管道失败")
}
defer func() {
_ = mysqlRead.Close()
}()
var mysqlbinlogErr, mysqlErr strings.Builder
mysqlbinlogCmd.Stderr = &mysqlbinlogErr
mysqlCmd.Stderr = &mysqlErr
mysqlCmd.Stdout = os.Stdout
mysqlCmd.Stdin = mysqlRead
if err := mysqlbinlogCmd.Start(); err != nil {
return errors.Wrap(err, "启动 mysqlbinlog 程序失败")
}
defer func() {
if err := mysqlbinlogCmd.Wait(); err != nil {
if replayErr != nil {
replayErr = errors.Wrap(replayErr, "运行 mysqlbinlog 程序失败")
} else {
replayErr = errors.Errorf("运行 mysqlbinlog 程序失败: %s", mysqlbinlogErr.String())
}
}
}()
if err := mysqlCmd.Start(); err != nil {
return errors.Wrap(err, "启动 mysql 程序失败")
}
if err := mysqlCmd.Wait(); err != nil {
return errors.Errorf("运行 mysql 程序失败: %s", mysqlErr.String())
}
return nil
}
func (svc *DbProgramMysql) getServerVariable(_ context.Context, varName string) (string, error) {
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", varName)
_, rows, err := svc.dbConn.Query(query)
if err != nil {
return "", err
}
if len(rows) == 0 {
return "", sql.ErrNoRows
}
var varNameFound, value string
varNameFound = rows[0]["Variable_name"].(string)
if varName != varNameFound {
return "", errors.Errorf("未找到数据库参数 %s", varName)
}
value = rows[0]["Value"].(string)
return value, nil
}
// CheckBinlogEnabled checks whether binlog is enabled for the current instance.
func (svc *DbProgramMysql) CheckBinlogEnabled(ctx context.Context) error {
value, err := svc.getServerVariable(ctx, "log_bin")
if err != nil {
return err
}
if strings.ToUpper(value) != "ON" {
return errors.Errorf("数据库未启用 binlog")
}
return nil
}
// CheckBinlogRowFormat checks whether the binlog format is ROW.
func (svc *DbProgramMysql) CheckBinlogRowFormat(ctx context.Context) error {
value, err := svc.getServerVariable(ctx, "binlog_format")
if err != nil {
return err
}
if strings.ToUpper(value) != "ROW" {
return errors.Errorf("binlog 格式 %s 不是行模式", value)
}
return nil
}
func runCmd(cmd *exec.Cmd) error {
var stderr strings.Builder
cmd.Stdout = os.Stdout
cmd.Stderr = &stderr
if err := cmd.Start(); err != nil {
return err
}
if err := cmd.Wait(); err != nil {
return errors.New(stderr.String())
}
return nil
}
func (svc *DbProgramMysql) execute(database string, sql string) error {
dbInfo := svc.dbInfo()
args := []string{
"--host", dbInfo.Host,
"--port", strconv.Itoa(dbInfo.Port),
"--user", dbInfo.Username,
"--password=" + dbInfo.Password,
"--execute", sql,
}
if len(database) > 0 {
args = append(args, database)
}
cmd := exec.Command(svc.getMysqlBin().MysqlPath, args...)
logx.Debug("execute sql using mysql binary: ", cmd.String())
if err := runCmd(cmd); err != nil {
logx.Errorf("运行 mysql 程序失败: %v", err)
return errors.Wrap(err, "运行 mysql 程序失败")
}
return nil
}
// sortBinlogFiles will sort binlog files in ascending order by their numeric extension.
// For mysql binlog, after the serial number reaches 999999, the next serial number will not return to 000000, but 1000000,
// so we cannot directly use string to compare lexicographical order.
func sortBinlogFiles(binlogFiles []*entity.BinlogFile) []*entity.BinlogFile {
var sorted []*entity.BinlogFile
sorted = append(sorted, binlogFiles...)
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].Sequence < sorted[j].Sequence
})
return sorted
}
func parseBinlogEventTimeInLine(line string) (eventTs time.Time, found bool, err error) {
// The target line starts with string like "#220421 14:49:26 server id 1"
if !strings.Contains(line, "server id") {
return time.Time{}, false, nil
}
if strings.Contains(line, "end_log_pos 0") {
// https://github.com/mysql/mysql-server/blob/8.0/client/mysqlbinlog.cc#L1209-L1212
// Fake events with end_log_pos=0 could be generated and we need to ignore them.
return time.Time{}, false, nil
}
fields := strings.Fields(line)
// fields should starts with ["#220421", "14:49:26", "server", "id", "1", "end_log_pos", "34794"]
if len(fields) < 7 ||
(len(fields[0]) != 7 || fields[2] != "server" || fields[3] != "id" || fields[5] != "end_log_pos") {
return time.Time{}, false, errors.Errorf("found unexpected mysqlbinlog output line %q when parsing binlog event timestamp", line)
}
datetime, err := time.ParseInLocation("060102 15:04:05", fmt.Sprintf("%s %s", fields[0][1:], fields[1]), time.Local)
if err != nil {
return time.Time{}, false, err
}
return datetime, true, nil
}
func parseBinlogEventPosInLine(line string) (pos int64, found bool, err error) {
// The mysqlbinlog output will contains a line starting with "# at 35065", which is the binlog event's start position.
if !strings.HasPrefix(line, "# at ") {
return 0, false, nil
}
// This is the line containing the start position of the binlog event.
fields := strings.Fields(line)
if len(fields) != 3 {
return 0, false, errors.Errorf("unexpected mysqlbinlog output line %q when parsing binlog event start position", line)
}
pos, err = strconv.ParseInt(fields[2], 10, 0)
if err != nil {
return 0, false, err
}
return pos, true, nil
}
// ParseBinlogName parses the numeric extension and the binary log base name by using split the dot.
// Examples:
// - ("binlog.000001") => ("binlog", 1)
// - ("binlog000001") => ("", err)
func ParseBinlogName(name string) (string, int64, error) {
s := strings.Split(name, ".")
if len(s) != 2 {
return "", 0, errors.Errorf("failed to parse binlog extension, expecting two parts in the binlog file name %q but got %d", name, len(s))
}
seq, err := strconv.ParseInt(s[1], 10, 0)
if err != nil {
return "", 0, errors.Wrapf(err, "failed to parse the sequence number %s", s[1])
}
return s[0], seq, nil
}
// getBinlogDir gets the binlogDir.
func (svc *DbProgramMysql) getBinlogDir(instanceId uint64) string {
return filepath.Join(
svc.getBackupPath(),
fmt.Sprintf("instance-%d", instanceId),
"binlog")
}
func (svc *DbProgramMysql) getDbInstanceBackupRoot(instanceId uint64) string {
return filepath.Join(
svc.getBackupPath(),
fmt.Sprintf("instance-%d", instanceId))
}
func (svc *DbProgramMysql) getDbBackupDir(instanceId, backupId uint64) string {
return filepath.Join(
svc.getBackupPath(),
fmt.Sprintf("instance-%d", instanceId),
fmt.Sprintf("backup-%d", backupId))
}

View File

@@ -0,0 +1,276 @@
//go:build e2e
package mysql
import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/suite"
"mayfly-go/internal/db/config"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/internal/db/infrastructure/persistence"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
)
const (
instanceIdTest = 0
backupIdTest = 0
dbNameBackupTest = "test-backup-01"
tableNameBackupTest = "test-backup"
tableNameRestorePITTest = "test-restore-pit"
tableNameNoBackupTest = "test-not-backup"
)
type DbInstanceSuite struct {
suite.Suite
repositories *repository.Repositories
instanceSvc *DbProgramMysql
dbConn *DbConn
}
func (s *DbInstanceSuite) SetupSuite() {
if err := chdir("mayfly-go", "server"); err != nil {
panic(err)
}
dbInfo := DbInfo{
Type: DbTypeMysql,
Host: "localhost",
Port: 3306,
Username: "test",
Password: "test",
}
dbConn, err := dbInfo.Conn()
s.Require().NoError(err)
s.dbConn = dbConn
s.repositories = &repository.Repositories{
Instance: persistence.GetInstanceRepo(),
Backup: persistence.NewDbBackupRepo(),
BackupHistory: persistence.NewDbBackupHistoryRepo(),
Restore: persistence.NewDbRestoreRepo(),
RestoreHistory: persistence.NewDbRestoreHistoryRepo(),
Binlog: persistence.NewDbBinlogRepo(),
BinlogHistory: persistence.NewDbBinlogHistoryRepo(),
}
s.instanceSvc = NewDbProgramMysql(s.dbConn)
var extName string
if runtime.GOOS == "windows" {
extName = ".exe"
}
path := "db/mysql/bin"
s.instanceSvc.mysqlBin = &config.MysqlBin{
Path: filepath.Join(path),
MysqlPath: filepath.Join(path, "mysql"+extName),
MysqldumpPath: filepath.Join(path, "mysqldump"+extName),
MysqlbinlogPath: filepath.Join(path, "mysqlbinlog"+extName),
}
s.instanceSvc.backupPath = "db/backup"
}
func (s *DbInstanceSuite) TearDownSuite() {
if s.dbConn != nil {
s.dbConn.Close()
s.dbConn = nil
}
}
func (s *DbInstanceSuite) SetupTest() {
sql := strings.Builder{}
require := s.Require()
sql.WriteString(fmt.Sprintf("drop database if exists `%s`;", dbNameBackupTest))
sql.WriteString(fmt.Sprintf("create database `%s`;", dbNameBackupTest))
require.NoError(s.instanceSvc.execute("", sql.String()))
}
func (s *DbInstanceSuite) TearDownTest() {
require := s.Require()
sql := fmt.Sprintf("drop database if exists `%s`", dbNameBackupTest)
require.NoError(s.instanceSvc.execute("", sql))
_ = os.RemoveAll(s.instanceSvc.getDbInstanceBackupRoot(instanceIdTest))
}
func (s *DbInstanceSuite) TestBackup() {
history := &entity.DbBackupHistory{
DbName: dbNameBackupTest,
Uuid: dbNameBackupTest,
}
history.Id = backupIdTest
s.testBackup(history)
}
func (s *DbInstanceSuite) testBackup(backupHistory *entity.DbBackupHistory) {
require := s.Require()
binlogInfo, err := s.instanceSvc.Backup(context.Background(), backupHistory)
require.NoError(err)
fileName := filepath.Join(s.instanceSvc.getDbBackupDir(s.dbConn.Info.InstanceId, backupHistory.Id), dbNameBackupTest+".sql")
_, err = os.Stat(fileName)
require.NoError(err)
backupHistory.BinlogFileName = binlogInfo.FileName
backupHistory.BinlogSequence = binlogInfo.Sequence
backupHistory.BinlogPosition = binlogInfo.Position
}
func TestDbInstance(t *testing.T) {
suite.Run(t, &DbInstanceSuite{})
}
func (s *DbInstanceSuite) TestRestoreDatabase() {
backupHistory := &entity.DbBackupHistory{
DbName: dbNameBackupTest,
Uuid: dbNameBackupTest,
}
s.createTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.testBackup(backupHistory)
s.createTable(dbNameBackupTest, tableNameNoBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "")
s.testRestore(backupHistory)
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败")
}
func (s *DbInstanceSuite) TestRestorePontInTime() {
backupHistory := &entity.DbBackupHistory{
DbName: dbNameBackupTest,
Uuid: dbNameBackupTest,
}
s.createTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.testBackup(backupHistory)
s.createTable(dbNameBackupTest, tableNameRestorePITTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "")
time.Sleep(time.Second)
targetTime := time.Now()
s.dropTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameBackupTest, "运行 mysql 程序失败")
s.createTable(dbNameBackupTest, tableNameNoBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "")
s.testRestore(backupHistory)
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "运行 mysql 程序失败")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败")
s.testReplayBinlog(backupHistory, targetTime)
s.selectTable(dbNameBackupTest, tableNameBackupTest, "")
s.selectTable(dbNameBackupTest, tableNameRestorePITTest, "")
s.selectTable(dbNameBackupTest, tableNameNoBackupTest, "运行 mysql 程序失败")
}
func (s *DbInstanceSuite) testReplayBinlog(backupHistory *entity.DbBackupHistory, targetTime time.Time) {
require := s.Require()
binlogFilesOnServerSorted, err := s.instanceSvc.GetSortedBinlogFilesOnServer(context.Background())
require.NoError(err)
require.True(len(binlogFilesOnServerSorted) > 0, "binlog 文件不存在")
for i, bf := range binlogFilesOnServerSorted {
if bf.Name == backupHistory.BinlogFileName {
binlogFilesOnServerSorted = binlogFilesOnServerSorted[i:]
break
}
require.Less(i, len(binlogFilesOnServerSorted), "binlog 文件没找到")
}
err = s.instanceSvc.downloadBinlogFilesOnServer(context.Background(), binlogFilesOnServerSorted, true)
require.NoError(err)
binlogFileLast := binlogFilesOnServerSorted[len(binlogFilesOnServerSorted)-1]
position, err := s.instanceSvc.GetBinlogEventPositionAtOrAfterTime(context.Background(), binlogFileLast.Name, targetTime)
require.NoError(err)
binlogHistories := make([]*entity.DbBinlogHistory, 0, 2)
binlogHistoryBackup := &entity.DbBinlogHistory{
FileName: backupHistory.BinlogFileName,
Sequence: backupHistory.BinlogSequence,
}
binlogHistories = append(binlogHistories, binlogHistoryBackup)
if binlogHistoryBackup.Sequence != binlogFileLast.Sequence {
require.Equal(binlogFilesOnServerSorted[0].Sequence, binlogHistoryBackup.Sequence)
binlogHistoryLast := &entity.DbBinlogHistory{
FileName: binlogFileLast.Name,
Sequence: binlogFileLast.Sequence,
}
binlogHistories = append(binlogHistories, binlogHistoryLast)
}
restoreInfo := &RestoreInfo{
BackupHistory: backupHistory,
BinlogHistories: binlogHistories,
StartPosition: backupHistory.BinlogPosition,
TargetPosition: position,
TargetTime: targetTime,
}
err = s.instanceSvc.ReplayBinlog(context.Background(), dbNameBackupTest, dbNameBackupTest, restoreInfo)
require.NoError(err)
}
func (s *DbInstanceSuite) testRestore(backupHistory *entity.DbBackupHistory) {
require := s.Require()
err := s.instanceSvc.RestoreBackupHistory(context.Background(), backupHistory.DbName, backupHistory.DbBackupId, backupHistory.Uuid)
require.NoError(err)
}
func (s *DbInstanceSuite) selectTable(database, tableName, wantErr string) {
require := s.Require()
sql := fmt.Sprintf("select * from`%s`;", tableName)
err := s.instanceSvc.execute(database, sql)
if len(wantErr) > 0 {
require.ErrorContains(err, wantErr)
return
}
require.NoError(err)
}
func (s *DbInstanceSuite) createTable(database, tableName, wantErr string) {
require := s.Require()
sql := fmt.Sprintf("create table `%s`(id int);", tableName)
err := s.instanceSvc.execute(database, sql)
if len(wantErr) > 0 {
require.ErrorContains(err, wantErr)
return
}
require.NoError(err)
}
func (s *DbInstanceSuite) dropTable(database, tableName, wantErr string) {
require := s.Require()
sql := fmt.Sprintf("drop table `%s`;", tableName)
err := s.instanceSvc.execute(database, sql)
if len(wantErr) > 0 {
require.ErrorContains(err, wantErr)
return
}
require.NoError(err)
}
func chdir(projectName string, subdir ...string) error {
subdir = append([]string{"/", projectName}, subdir...)
suffix := filepath.Join(subdir...)
wd, err := os.Getwd()
if err != nil {
return err
}
for {
if strings.HasSuffix(wd, suffix) {
if err := os.Chdir(wd); err != nil {
return err
}
return nil
}
upper := filepath.Join(wd, "..")
if upper == wd {
return errors.New(fmt.Sprintf("not found directory: %s", suffix[1:]))
}
wd = upper
}
}

View File

@@ -0,0 +1,26 @@
package mysql
import (
"mayfly-go/internal/db/domain/entity"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func Test_readBinlogInfoFromBackup(t *testing.T) {
text := `
--
-- Position to start replication or point-in-time recovery from
--
-- CHANGE MASTER TO MASTER_LOG_FILE='binlog.000003', MASTER_LOG_POS=379;
`
got, err := readBinlogInfoFromBackup(strings.NewReader(text))
require.NoError(t, err)
require.Equal(t, &entity.BinlogInfo{
FileName: "binlog.000003",
Sequence: 3,
Position: 379,
}, got)
}