feat: 系统升级支持数据库自动迁移,避免手动执行升级脚本

This commit is contained in:
meilin.huang
2025-02-13 21:11:23 +08:00
parent efb2b7368c
commit aa393590b2
78 changed files with 1965 additions and 1827 deletions

View File

@@ -159,7 +159,7 @@ func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
break
}
}
return errorx.NewBiz("get column data type")
return errorx.NewBiz("get column data type... ignore~")
})
updSql = fmt.Sprintf("and %s > %s", task.UpdField, updFieldDataType.DataType.SQLValue(task.UpdFieldVal))
@@ -394,7 +394,7 @@ func (app *dataSyncAppImpl) saveLog(log *entity.DataSyncLog) {
func (app *dataSyncAppImpl) InitCronJob() {
defer func() {
if err := recover(); err != nil {
logx.ErrorTrace("the data synchronization task failed to initialize: %s", err.(error))
logx.ErrorTrace("the data synchronization task failed to initialize", err)
}
}()
@@ -410,7 +410,12 @@ func (app *dataSyncAppImpl) InitCronJob() {
cond.Status = entity.DataSyncTaskStatusEnable
jobs := new([]entity.DataSyncTask)
pr, _ := app.GetPageList(cond, pageParam, jobs)
pr, err := app.GetPageList(cond, pageParam, jobs)
if err != nil {
logx.ErrorTrace("the data synchronization task failed to initialize", err)
return
}
total := pr.Total
add := 0

View File

@@ -7,13 +7,13 @@ import (
type Db struct {
model.Model
Code string `orm:"column(code)" json:"code"`
Name string `orm:"column(name)" json:"name"`
GetDatabaseMode DbGetDatabaseMode `json:"getDatabaseMode"` // 获取数据库方式
Database string `orm:"column(database)" json:"database"`
Remark string `json:"remark"`
InstanceId uint64
AuthCertName string `json:"authCertName"`
Code string `json:"code" gorm:"size:32;not null;index:idx_code"`
Name string `json:"name" gorm:"size:255;not null;"`
GetDatabaseMode DbGetDatabaseMode `json:"getDatabaseMode" gorm:"comment:库名获取方式(-1.实时获取、1.指定库名)"` // 获取数据库方式
Database string `json:"database" gorm:"size:2000;"`
Remark string `json:"remark" gorm:"size:255;"`
InstanceId uint64 `json:"instanceId" gorm:"not null;"`
AuthCertName string `json:"authCertName" gorm:"size:255;"`
}
type DbGetDatabaseMode int8

View File

@@ -5,34 +5,35 @@ import (
"time"
)
// DataSyncTask 数据同步
type DataSyncTask struct {
model.Model
// 基本信息
TaskName string `orm:"column(task_name)" json:"taskName"` // 任务名
TaskCron string `orm:"column(task_cron)" json:"taskCron"` // 任务Cron表达式
Status int8 `orm:"column(status)" json:"status"` // 状态 1启用 2禁用
TaskKey string `orm:"column(key)" json:"taskKey"` // 任务唯一标识
RecentState int8 `orm:"column(recent_state)" json:"recentState"` // 最近执行状态 1成功 -1失败
RunningState int8 `orm:"column(running_state)" json:"runningState"` // 运行时状态 1运行中、2待运行、3已停止
TaskName string `json:"taskName" gorm:"not null;size:255;comment:任务名"` // 任务名
TaskCron string `json:"taskCron" gorm:"not null;size:50;comment:任务Cron表达式"` // 任务Cron表达式
Status int8 `json:"status" gorm:"not null;default:1;comment:状态 1启用 2禁用"` // 状态 1启用 2禁用
TaskKey string `json:"taskKey" gorm:"size:100;comment:任务唯一标识"` // 任务唯一标识
RecentState int8 `json:"recentState" gorm:"not null;default:0;comment:最近执行状态 1成功 -1失败"` // 最近执行状态 1成功 -1失败
RunningState int8 `json:"runningState" gorm:"not null;default:2;comment:运行时状态 1运行中、2待运行、3已停止"` // 运行时状态 1运行中、2待运行、3已停止
// 源数据库信息
SrcDbId int64 `orm:"column(src_db_id)" json:"srcDbId"`
SrcDbName string `orm:"column(src_db_name)" json:"srcDbName"`
SrcTagPath string `orm:"column(src_tag_path)" json:"srcTagPath"`
DataSql string `orm:"column(data_sql)" json:"dataSql"` // 数据源查询sql
PageSize int `orm:"column(page_size)" json:"pageSize"` // 配置分页sql查询的条数
UpdField string `orm:"column(upd_field)" json:"updField"` // 更新字段, 选择由哪个字段为更新字段查询数据源的时候会带上这个字段where update_time > {最近更新的最大值}
UpdFieldVal string `orm:"column(upd_field_val)" json:"updFieldVal"` // 更新字段当前值
UpdFieldSrc string `orm:"column(upd_field_src)" json:"updFieldSrc"` // 更新值来源, 如select name as user_name from user; 则updFieldSrc的值为user_name
SrcDbId int64 `json:"srcDbId" gorm:"not null;comment:源数据库ID"` // 源数据库ID
SrcDbName string `json:"srcDbName" gorm:"size:100;comment:源数据库名"` // 源数据库名
SrcTagPath string `json:"srcTagPath" gorm:"size:200;comment:源数据库tag路径"` // 源数据库tag路径
DataSql string `json:"dataSql" gorm:"not null;type:text;comment:数据查询sql"` // 数据源查询sql
PageSize int `json:"pageSize" gorm:"not null;comment:数据同步分页大小"` // 配置分页sql查询的条数
UpdField string `json:"updField" gorm:"not null;size:100;default:'id';comment:更新字段,默认'id'"` // 更新字段, 选择由哪个字段为更新字段查询数据源的时候会带上这个字段where update_time > {最近更新的最大值}
UpdFieldVal string `json:"updFieldVal" gorm:"size:100;comment:当前更新值"` // 更新字段当前值
UpdFieldSrc string `json:"updFieldSrc" gorm:"comment:更新值来源, 如select name as user_name from user; 则updFieldSrc的值为user_name"` // 更新值来源, 如select name as user_name from user; 则updFieldSrc的值为user_name
// 目标数据库信息
TargetDbId int64 `orm:"column(target_db_id)" json:"targetDbId"`
TargetDbName string `orm:"column(target_db_name)" json:"targetDbName"`
TargetTagPath string `orm:"column(target_tag_path)" json:"targetTagPath"`
TargetTableName string `orm:"column(target_table_name)" json:"targetTableName"`
FieldMap string `orm:"column(field_map)" json:"fieldMap"` // 字段映射json
DuplicateStrategy int `orm:"column(duplicate_strategy)" json:"duplicateStrategy"` // 冲突策略 -11忽略2覆盖
TargetDbId int64 `json:"targetDbId" gorm:"not null;comment:目标数据库ID"` // 目标数据库ID
TargetDbName string `json:"targetDbName" gorm:"size:150;comment:目标数据库名"` // 目标数据库名
TargetTagPath string `json:"targetTagPath" gorm:"size:255;comment:目标数据库tag路径"` // 目标数据库tag路径
TargetTableName string `json:"targetTableName" gorm:"size:150;comment:目标数据库表名"` // 目标数据库表名
FieldMap string `json:"fieldMap" gorm:"type:text;comment:字段映射json"` // 字段映射json
DuplicateStrategy int `json:"duplicateStrategy" gorm:"not null;default:-1;comment:唯一键冲突策略 -11忽略2覆盖"` // 冲突策略 -11忽略2覆盖
}
func (d *DataSyncTask) TableName() string {
@@ -41,12 +42,13 @@ func (d *DataSyncTask) TableName() string {
type DataSyncLog struct {
model.IdModel
TaskId uint64 `orm:"column(task_id)" json:"taskId"` // 任务表id
CreateTime *time.Time `orm:"column(create_time)" json:"createTime"`
DataSqlFull string `orm:"column(data_sql_full)" json:"dataSqlFull"` // 执行的完整sql
ResNum int `orm:"column(res_num)" json:"resNum"` // 收到数据条数
ErrText string `orm:"column(err_text)" json:"errText"` // 错误日志
Status int8 `orm:"column(status)" json:"status"` // 状态:1.成功 -1.失败
CreateTime *time.Time `json:"createTime" gorm:"not null;"` // 创建时间
TaskId uint64 `json:"taskId" gorm:"not null;comment:同步任务表id"` // 任务表id
DataSqlFull string `json:"dataSqlFull" gorm:"not null;type:text;comment:执行的完整sql"` // 执行的完整sql
ResNum int `json:"resNum" gorm:"comment:收到数据条数"` // 收到数据条数
ErrText string `json:"errText" gorm:"type:text;comment:日志"` // 日志
Status int8 `json:"status" gorm:"not null;default:1;comment:状态:1.成功 0.失败"` // 状态:1.成功 0.失败
}
func (d *DataSyncLog) TableName() string {

View File

@@ -5,18 +5,19 @@ import (
"mayfly-go/pkg/model"
)
// DbInstance 数据库实例信息
type DbInstance struct {
model.Model
Code string `json:"code"`
Name string `json:"name"`
Type string `json:"type"` // 类型mysql oracle等
Host string `json:"host"`
Code string `json:"code" gorm:"size:32;not null;"`
Name string `json:"name" gorm:"size:32;not null;"`
Type string `json:"type" gorm:"size:32;not null;"` // 类型mysql oracle等
Host string `json:"host" gorm:"size:255;not null;"`
Port int `json:"port"`
Network string `json:"network"`
Extra *string `json:"extra"` // 连接需要的其他额外参数json格式, 如oracle需要sid等
Params *string `json:"params"` // 使用指针类型,可更新为零值(空字符串)
Remark *string `json:"remark"`
Network string `json:"network" gorm:"size:20;"`
Extra *string `json:"extra" gorm:"size:1000;comment:连接需要的额外参数如oracle数据库需要sid等"` // 连接需要的其他额外参数json格式, 如oracle需要sid等
Params *string `json:"params" gorm:"size:255;comment:其他连接参数"` // 使用指针类型,可更新为零值(空字符串)
Remark *string `json:"remark" gorm:"size:255;"`
SshTunnelMachineId int `json:"sshTunnelMachineId"` // ssh隧道机器id
}

View File

@@ -4,12 +4,13 @@ import (
"mayfly-go/pkg/model"
)
// DbSql 用户保存的数据库sql
type DbSql struct {
model.Model `orm:"-"`
DbId uint64 `json:"dbId"`
Db string `json:"db"`
Type int `json:"type"` // 类型
Sql string `json:"sql"`
Name string `json:"name"`
DbId uint64 `json:"dbId" gorm:"not null;"`
Db string `json:"db" gorm:"size:100;not null;"`
Type int `json:"type" gorm:"not null;"` // 类型
Sql string `json:"sql" gorm:"size:4000;comment:sql语句"`
Name string `json:"name" gorm:"size:255;not null;comment:sql模板名"`
}

View File

@@ -8,17 +8,17 @@ import (
type DbSqlExec struct {
model.Model `orm:"-"`
DbId uint64 `json:"dbId"`
Db string `json:"db"`
Table string `json:"table"`
Type int8 `json:"type"` // 类型
Sql string `json:"sql"` // 执行的sql
OldValue string `json:"oldValue"`
Remark string `json:"remark"`
Status int8 `json:"status"` // 执行状态
Res string `json:"res"` // 执行结果
DbId uint64 `json:"dbId" gorm:"not null;"`
Db string `json:"db" gorm:"size:150;not null;"`
Table string `json:"table" gorm:"size:150;"`
Type int8 `json:"type" gorm:"not null;"` // 类型
Sql string `json:"sql" gorm:"size:5000;not null;"` // 执行的sql
OldValue string `json:"oldValue" gorm:"size:5000;"`
Remark string `json:"remark" gorm:"size:255;"`
Status int8 `json:"status"` // 执行状态
Res string `json:"res" gorm:"size:1000;"` // 执行结果
FlowBizKey string `json:"flowBizKey"` // 流程业务key
FlowBizKey string `json:"flowBizKey" gorm:"size:50;index:idx_flow_biz_key;comment:流程关联的业务key"` // 流程业务key
}
const (

View File

@@ -7,34 +7,33 @@ import (
type DbTransferTask struct {
model.Model
RunningState int8 `orm:"column(running_state)" json:"runningState"` // 运行状态
TaskName string `json:"taskName" gorm:"size:255;not null;"` // 任务名称
TaskKey string `json:"taskKey" gorm:"size:100;not null;"` // 定时任务唯一uuid key
CronAble int8 `json:"cronAble" gorm:"default:-1;not null;"` // 是否定时 1是 -1否
Cron string `json:"cron" gorm:"size:32;"` // 定时任务cron表达式
Mode int8 `json:"mode"` // 数据迁移方式1、迁移到数据库 2、迁移到文件
TargetFileDbType string `json:"targetFileDbType" gorm:"size:32;"` // 目标文件数据库类型
FileSaveDays int `json:"fileSaveDays"` // 文件保存天数
Status int8 `json:"status"` // 启用状态 1启用 -1禁用
RunningState int8 `json:"runningState"` // 运行状态
LogId uint64 `json:"logId"`
TaskName string `orm:"column(task_name)" json:"taskName"` // 任务名称
Status int8 `orm:"column(status)" json:"status"` // 启用状态 1启用 -1禁用
CronAble int8 `orm:"column(cron_able)" json:"cronAble"` // 是否定时 1是 -1否
Cron string `orm:"column(cron)" json:"cron"` // 定时任务cron表达式
Mode int8 `orm:"column(mode)" json:"mode"` // 数据迁移方式1、迁移到数据库 2、迁移到文件
TargetFileDbType string `orm:"column(target_file_db_type)" json:"targetFileDbType"` // 目标文件数据库类型
FileSaveDays int `json:"fileSaveDays"` // 文件保存天数
TaskKey string `orm:"column(key)" json:"taskKey"` // 定时任务唯一uuid key
CheckedKeys string `orm:"column(checked_keys)" json:"checkedKeys"` // 选中需要迁移的表
DeleteTable int `orm:"column(delete_table)" json:"deleteTable"` // 创建表前是否删除表
NameCase int `orm:"column(name_case)" json:"nameCase"` // 表名、字段大小写转换 1无 2大写 3小写
Strategy int `orm:"column(strategy)" json:"strategy"` // 迁移策略 1全量 2增量
CheckedKeys string `json:"checkedKeys" gorm:"type:text;"` // 选中需要迁移的表
DeleteTable int8 `json:"deleteTable"` // 创建表前是否删除表
NameCase int8 `json:"nameCase"` // 表名、字段大小写转换 1无 2大写 3小写
Strategy int8 `json:"strategy"` // 迁移策略 1全量 2增量
SrcDbId int64 `orm:"column(src_db_id)" json:"srcDbId"` // 源库id
SrcDbName string `orm:"column(src_db_name)" json:"srcDbName"` // 源库名
SrcTagPath string `orm:"column(src_tag_path)" json:"srcTagPath"` // 源库tagPath
SrcDbType string `orm:"column(src_db_type)" json:"srcDbType"` // 源库类型
SrcInstName string `orm:"column(src_inst_name)" json:"srcInstName"` // 源库实例名
TargetDbId int `orm:"column(target_db_id)" json:"targetDbId"` // 目标库id
TargetDbName string `orm:"column(target_db_name)" json:"targetDbName"` // 目标库名
TargetDbType string `orm:"column(target_tag_path)" json:"targetDbType"` // 目标库类型
TargetInstName string `orm:"column(target_db_type)" json:"targetInstName"` // 目标库实例名
TargetTagPath string `orm:"column(target_inst_name)" json:"targetTagPath"` // 目标库tagPath
SrcDbId int64 `json:"srcDbId" gorm:"not null;"` // 源库id
SrcDbName string `json:"srcDbName" gorm:"size:255;not null;"` // 源库名
SrcTagPath string `json:"srcTagPath" gorm:"size:255;"` // 源库tagPath
SrcDbType string `json:"srcDbType" gorm:"size:32;not null;"` // 源库类型
SrcInstName string `json:"srcInstName" gorm:"size:255;"` // 源库实例名
TargetDbId int `json:"targetDbId" gorm:"not null;"` // 目标库id
TargetDbName string `json:"targetDbName" gorm:"size:255;not null;"` // 目标库名
TargetDbType string `json:"targetDbType" gorm:"size:32;not null;"` // 目标库类型
TargetInstName string `json:"targetInstName" gorm:"size:255;"` // 目标库实例名
TargetTagPath string `json:"targetTagPath" gorm:"size:255;"` // 目标库tagPath
}
func (d *DbTransferTask) TableName() string {

View File

@@ -5,15 +5,16 @@ import (
"time"
)
// DbTransferFile 数据库迁移文件管理
type DbTransferFile struct {
model.IdModel
IsDeleted int8 `orm:"column(is_deleted)" json:"-"` // 是否删除 1是 0否
CreateTime *time.Time `orm:"column(create_time)" json:"createTime"` // 创建时间,默认当前时间戳
Status int8 `orm:"column(status)" json:"status"` // 状态 1、执行中 2、执行成功 3、执行失败
TaskId uint64 `orm:"column(task_id)" json:"taskId"` // 迁移任务ID
LogId uint64 `orm:"column(log_id)" json:"logId"` // 日志ID
FileDbType string `orm:"column(file_db_type)" json:"fileDbType"` // sql文件数据库类型
FileKey string `orm:"column(file_key)" json:"fileKey"` // 文件
IsDeleted int8 `json:"-" gorm:"default:0;"` // 是否删除 1是 0否
CreateTime *time.Time `json:"createTime"` // 创建时间,默认当前时间戳
Status int8 `json:"status" gorm:"default:1;comment:状态 1、执行中 2、执行成功 3、执行失败"` // 状态 1、执行中 2、执行成功 3、执行失败
TaskId uint64 `json:"taskId" gorm:"comment:迁移任务ID"` // 迁移任务ID
LogId uint64 `json:"logId" gorm:"comment:日志ID"` // 日志ID
FileDbType string `json:"fileDbType" gorm:"size:32;comment:sql文件数据库类型"` // sql文件数据库类型
FileKey string `json:"fileKey" gorm:"size:50;comment:文件"` // 文件
}
func (d *DbTransferFile) TableName() string {