fix: sql执行记录根据关键词搜索问题修复等

This commit is contained in:
meilin.huang
2025-06-22 10:52:06 +08:00
parent 7eb4d064ea
commit 54d3a5b368
31 changed files with 305 additions and 322 deletions

View File

@@ -80,32 +80,26 @@ func (d *DataSyncTask) SaveTask(rc *req.Ctx) {
func (d *DataSyncTask) DeleteTask(rc *req.Ctx) {
taskId := rc.PathParam("taskId")
rc.ReqParam = taskId
ids := strings.Split(taskId, ",")
for _, v := range ids {
for _, v := range strings.Split(taskId, ",") {
biz.ErrIsNil(d.dataSyncTaskApp.Delete(rc.MetaCtx, cast.ToUint64(v)))
}
}
func (d *DataSyncTask) ChangeStatus(rc *req.Ctx) {
form, task := req.BindJsonAndCopyTo[*form.DataSyncTaskStatusForm, *entity.DataSyncTask](rc)
_ = d.dataSyncTaskApp.UpdateById(rc.MetaCtx, task)
if task.Status == entity.DataSyncTaskStatusEnable {
task, err := d.dataSyncTaskApp.GetById(task.Id)
biz.ErrIsNil(err, "task not found")
d.dataSyncTaskApp.AddCronJob(rc.MetaCtx, task)
} else {
d.dataSyncTaskApp.RemoveCronJobById(task.Id)
}
// 记录请求日志
form := req.BindJsonAndValid[*form.DataSyncTaskStatusForm](rc)
rc.ReqParam = form
task, err := d.dataSyncTaskApp.GetById(form.Id)
biz.ErrIsNil(err)
task.Status = entity.DataSyncTaskStatus(form.Status)
biz.ErrIsNil(d.dataSyncTaskApp.Save(rc.MetaCtx, task))
}
func (d *DataSyncTask) Run(rc *req.Ctx) {
taskId := d.getTaskId(rc)
rc.ReqParam = taskId
_ = d.dataSyncTaskApp.RunCronJob(rc.MetaCtx, taskId)
biz.ErrIsNil(d.dataSyncTaskApp.Run(rc.MetaCtx, taskId))
}
func (d *DataSyncTask) Stop(rc *req.Ctx) {

View File

@@ -13,19 +13,18 @@ import (
"mayfly-go/pkg/biz"
"mayfly-go/pkg/model"
"mayfly-go/pkg/req"
"mayfly-go/pkg/utils/collx"
"strings"
"github.com/may-fly/cast"
)
type DbTransferTask struct {
dbTransferTask application.DbTransferTask `inject:"T"`
dbTransferFile application.DbTransferFile `inject:"T"`
dbApp application.Db `inject:"T"`
tagApp tagapp.TagTree `inject:"T"`
dbSqlExecApp application.DbSqlExec `inject:"T"`
fileApp fileapp.File `inject:"T"`
dbTransferTaskApp application.DbTransferTask `inject:"T"`
dbTransferFileApp application.DbTransferFile `inject:"T"`
dbApp application.Db `inject:"T"`
tagApp tagapp.TagTree `inject:"T"`
dbSqlExecApp application.DbSqlExec `inject:"T"`
fileApp fileapp.File `inject:"T"`
}
func (d *DbTransferTask) ReqConfs() *req.Confs {
@@ -63,13 +62,13 @@ func (d *DbTransferTask) ReqConfs() *req.Confs {
func (d *DbTransferTask) Tasks(rc *req.Ctx) {
queryCond := req.BindQuery[*entity.DbTransferTaskQuery](rc)
res, err := d.dbTransferTask.GetPageList(queryCond)
res, err := d.dbTransferTaskApp.GetPageList(queryCond)
biz.ErrIsNil(err)
resVo := model.PageResultConv[*entity.DbTransferTask, *vo.DbTransferTaskListVO](res)
for _, item := range resVo.List {
item.RunningState = entity.DbTransferTaskRunStateSuccess
if d.dbTransferTask.IsRunning(item.Id) {
if d.dbTransferTaskApp.IsRunning(item.Id) {
item.RunningState = entity.DbTransferTaskRunStateRunning
}
}
@@ -81,48 +80,45 @@ func (d *DbTransferTask) SaveTask(rc *req.Ctx) {
reqForm, task := req.BindJsonAndCopyTo[*form.DbTransferTaskForm, *entity.DbTransferTask](rc)
rc.ReqParam = reqForm
biz.ErrIsNil(d.dbTransferTask.Save(rc.MetaCtx, task))
biz.ErrIsNil(d.dbTransferTaskApp.Save(rc.MetaCtx, task))
}
func (d *DbTransferTask) DeleteTask(rc *req.Ctx) {
taskId := rc.PathParam("taskId")
rc.ReqParam = taskId
ids := strings.Split(taskId, ",")
uids := collx.ArrayMap[string, uint64](ids, func(val string) uint64 {
return cast.ToUint64(val)
})
biz.ErrIsNil(d.dbTransferTask.DeleteById(rc.MetaCtx, uids...))
for _, id := range strings.Split(taskId, ",") {
biz.ErrIsNil(d.dbTransferTaskApp.Delete(rc.MetaCtx, cast.ToUint64(id)))
}
}
func (d *DbTransferTask) ChangeStatus(rc *req.Ctx) {
form, task := req.BindJsonAndCopyTo[*form.DbTransferTaskStatusForm, *entity.DbTransferTask](rc)
_ = d.dbTransferTask.UpdateById(rc.MetaCtx, task)
task, err := d.dbTransferTask.GetById(task.Id)
biz.ErrIsNil(err, "task not found")
d.dbTransferTask.AddCronJob(rc.MetaCtx, task)
// 记录请求日志
form := req.BindJsonAndValid[*form.DbTransferTaskStatusForm](rc)
rc.ReqParam = form
task, err := d.dbTransferTaskApp.GetById(form.Id)
biz.ErrIsNil(err)
task.Status = form.Status
biz.ErrIsNil(d.dbTransferTaskApp.Save(rc.MetaCtx, task))
}
func (d *DbTransferTask) Run(rc *req.Ctx) {
taskId := uint64(rc.PathParamInt("taskId"))
logId, _ := d.dbTransferTask.CreateLog(rc.MetaCtx, taskId)
go d.dbTransferTask.Run(rc.MetaCtx, taskId, logId)
rc.ReqParam = taskId
logId, err := d.dbTransferTaskApp.Run(rc.MetaCtx, taskId)
biz.ErrIsNil(err)
rc.ResData = logId
}
func (d *DbTransferTask) Stop(rc *req.Ctx) {
biz.ErrIsNil(d.dbTransferTask.Stop(rc.MetaCtx, uint64(rc.PathParamInt("taskId"))))
biz.ErrIsNil(d.dbTransferTaskApp.Stop(rc.MetaCtx, uint64(rc.PathParamInt("taskId"))))
}
func (d *DbTransferTask) Files(rc *req.Ctx) {
queryCond := req.BindQuery[*entity.DbTransferFileQuery](rc)
res, err := d.dbTransferFile.GetPageList(queryCond)
res, err := d.dbTransferFileApp.GetPageList(queryCond)
biz.ErrIsNil(err)
rc.ResData = res
}
@@ -136,7 +132,7 @@ func (d *DbTransferTask) FileDel(rc *req.Ctx) {
for _, v := range ids {
uIds = append(uIds, cast.ToUint64(v))
}
biz.ErrIsNil(d.dbTransferFile.Delete(rc.MetaCtx, uIds...))
biz.ErrIsNil(d.dbTransferFileApp.Delete(rc.MetaCtx, uIds...))
}
func (d *DbTransferTask) FileRun(rc *req.Ctx) {
@@ -144,7 +140,7 @@ func (d *DbTransferTask) FileRun(rc *req.Ctx) {
rc.ReqParam = fm
tFile, err := d.dbTransferFile.GetById(fm.Id)
tFile, err := d.dbTransferFileApp.GetById(fm.Id)
biz.IsTrue(tFile != nil && err == nil, "file not found")
targetDbConn, err := d.dbApp.GetDbConn(rc.MetaCtx, fm.TargetDbId, fm.TargetDbName)

View File

@@ -26,5 +26,5 @@ type DataSyncTaskForm struct {
type DataSyncTaskStatusForm struct {
Id uint64 `binding:"required" json:"taskId"`
Status int `json:"status"`
Status int8 `json:"status"`
}

View File

@@ -30,7 +30,7 @@ type DbTransferTaskForm struct {
}
type DbTransferTaskStatusForm struct {
Id uint64 `binding:"required" json:"taskId" form:"taskId"`
Status int `json:"status" form:"status"`
Status int8 `json:"status" form:"status"`
}
type DbTransferFileForm struct {
Id uint64 `json:"id"`

View File

@@ -8,10 +8,12 @@ import (
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/internal/db/imsg"
"mayfly-go/pkg/base"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/contextx"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/i18n"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
@@ -36,11 +38,7 @@ type DataSyncTask interface {
InitCronJob()
AddCronJob(ctx context.Context, taskEntity *entity.DataSyncTask)
RemoveCronJobById(taskId uint64)
RunCronJob(ctx context.Context, id uint64) error
Run(ctx context.Context, id uint64) error
StopTask(ctx context.Context, id uint64) error
@@ -61,10 +59,6 @@ var (
whereReg = regexp.MustCompile(`(?i)where`)
)
func (app *dataSyncAppImpl) InjectDbDataSyncTaskRepo(repo repository.DataSyncTask) {
app.Repo = repo
}
func (app *dataSyncAppImpl) GetPageList(condition *entity.DataSyncTaskQuery, orderBy ...string) (*model.PageResult[*entity.DataSyncTask], error) {
return app.GetRepo().GetTaskList(condition, orderBy...)
}
@@ -79,55 +73,31 @@ func (app *dataSyncAppImpl) Save(ctx context.Context, taskEntity *entity.DataSyn
taskEntity.TaskKey = ""
err = app.UpdateById(ctx, taskEntity)
}
if err != nil {
return err
}
task, err := app.GetById(taskEntity.Id)
if err != nil {
return err
}
app.AddCronJob(ctx, task)
app.addCronJob(ctx, taskEntity)
return nil
}
func (app *dataSyncAppImpl) Delete(ctx context.Context, id uint64) error {
if err := app.DeleteById(ctx, id); err != nil {
return err
}
app.RemoveCronJobById(id)
return nil
}
func (app *dataSyncAppImpl) AddCronJob(ctx context.Context, taskEntity *entity.DataSyncTask) {
key := taskEntity.TaskKey
// 先移除旧的任务
scheduler.RemoveByKey(key)
// 根据状态添加新的任务
if taskEntity.Status == entity.DataSyncTaskStatusEnable {
taskId := taskEntity.Id
logx.Infof("start add the data sync task job: %s, cron[%s]", taskEntity.TaskName, taskEntity.TaskCron)
if err := scheduler.AddFunByKey(key, taskEntity.TaskCron, func() {
if err := app.RunCronJob(context.Background(), taskId); err != nil {
logx.Errorf("the data sync task failed to execute at a scheduled time: %s", err.Error())
}
}); err != nil {
logx.ErrorTrace("add db data sync job failed", err)
}
}
}
func (app *dataSyncAppImpl) RemoveCronJobById(taskId uint64) {
task, err := app.GetById(taskId)
task, err := app.GetById(id)
if err == nil {
scheduler.RemoveByKey(task.TaskKey)
return errorx.NewBiz("sync task not found")
}
app.MarkStop(taskId)
scheduler.RemoveByKey(task.TaskKey)
app.MarkStop(id)
return app.DeleteById(ctx, id)
}
func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
func (app *dataSyncAppImpl) Run(ctx context.Context, id uint64) error {
if app.IsRunning(id) {
logx.Warn("[%d] the db sync task is running...", id)
return nil
}
task, err := app.GetById(id)
if err != nil {
return errorx.NewBiz("task not found")
@@ -139,10 +109,25 @@ func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
return errorx.NewBiz("the task is in progress")
}
updateStateTask := &entity.DataSyncTask{
RunningState: entity.DataSyncTaskRunStateRunning,
}
updateStateTask.Id = id
if err := app.UpdateById(ctx, updateStateTask); err != nil {
return errorx.NewBiz("failed to update task running state: %s", err.Error())
}
// 标记该任务运行中
app.MarkRunning(id)
go func() {
now := time.Now()
syncLog := &entity.DataSyncLog{
TaskId: task.Id,
CreateTime: &now,
Status: entity.DataSyncTaskStateFail, // 默认失败
}
defer app.endRunning(task, syncLog)
// 通过占位符格式化sql
updSql := ""
orderSql := ""
@@ -179,49 +164,40 @@ func (app *dataSyncAppImpl) RunCronJob(ctx context.Context, id uint64) error {
// 组装查询sql
sqlStr := fmt.Sprintf("%s %s %s %s", task.DataSql, where, updSql, orderSql)
syncLog.DataSqlFull = sqlStr
log, err := app.doDataSync(ctx, sqlStr, task)
err := app.doDataSync(ctx, sqlStr, task, syncLog)
if err != nil {
log.ErrText = fmt.Sprintf("execution failure: %s", err.Error())
logx.ErrorContext(ctx, log.ErrText)
log.Status = entity.DataSyncTaskStateFail
syncLog.ErrText = i18n.T(imsg.DataSyncFailMsg, "msg", err.Error())
logx.ErrorContext(ctx, syncLog.ErrText)
syncLog.Status = entity.DataSyncTaskStateFail
} else {
log.Status = entity.DataSyncTaskStateSuccess
syncLog.Status = entity.DataSyncTaskStateSuccess
}
app.endRunning(task, log)
}()
return nil
}
func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *entity.DataSyncTask) (*entity.DataSyncLog, error) {
now := time.Now()
syncLog := &entity.DataSyncLog{
TaskId: task.Id,
CreateTime: &now,
DataSqlFull: sql,
Status: entity.DataSyncTaskStateRunning,
}
func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *entity.DataSyncTask, syncLog *entity.DataSyncLog) error {
// 获取源数据库连接
srcConn, err := app.dbApp.GetDbConn(ctx, uint64(task.SrcDbId), task.SrcDbName)
if err != nil {
return syncLog, errorx.NewBiz("failed to connect to the source database: %s", err.Error())
return errorx.NewBiz("failed to connect to the source database: %s", err.Error())
}
// 获取目标数据库连接
targetConn, err := app.dbApp.GetDbConn(ctx, uint64(task.TargetDbId), task.TargetDbName)
if err != nil {
return syncLog, errorx.NewBiz("failed to connect to the target database: %s", err.Error())
return errorx.NewBiz("failed to connect to the target database: %s", err.Error())
}
// task.FieldMap为json数组字符串 [{"src":"id","target":"id"}]转为map
var fieldMap []map[string]string
err = json.Unmarshal([]byte(task.FieldMap), &fieldMap)
if err != nil {
return syncLog, errorx.NewBiz("there was an error parsing the field map json: %s", err.Error())
return errorx.NewBiz("there was an error parsing the field map json: %s", err.Error())
}
// 记录本次同步数据总数
@@ -237,7 +213,7 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
targetTableColumns, err := targetConn.GetMetadata().GetColumns(task.TargetTableName)
if err != nil {
return syncLog, errorx.NewBiz("failed to get target table columns: %s", err.Error())
return errorx.NewBiz("failed to get target table columns: %s", err.Error())
}
targetColumnName2Column := collx.ArrayToMap(targetTableColumns, func(column dbi.Column) string {
return column.ColumnName
@@ -257,7 +233,7 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
}
// 记录当前已同步的数据量
syncLog.ErrText = fmt.Sprintf("during the execution of this task, %d has been synchronized", total)
syncLog.ErrText = i18n.T(imsg.DataSyncingMsg, "count", total)
logx.InfoContext(ctx, syncLog.ErrText)
syncLog.ResNum = total
app.saveLog(syncLog)
@@ -274,23 +250,23 @@ func (app *dataSyncAppImpl) doDataSync(ctx context.Context, sql string, task *en
})
if err != nil {
return syncLog, err
return err
}
// 处理剩余的数据
if len(result) > 0 {
if err := app.srcData2TargetDb(result, fieldMap, updFieldName, task, targetConn, targetInsertColumns); err != nil {
return syncLog, err
return err
}
}
logx.InfofContext(ctx, "synchronous task: [%s], finished execution, save records successfully: [%d]", task.TaskName, total)
// 执行成功日志
syncLog.ErrText = fmt.Sprintf("the synchronous task was executed successfully. New data: %d", total)
syncLog.ErrText = i18n.T(imsg.DataSyncSuccessMsg, "count", total)
syncLog.ResNum = total
return syncLog, nil
return nil
}
func (app *dataSyncAppImpl) srcData2TargetDb(srcRes []map[string]any, fieldMap []map[string]string, updFieldName string, task *entity.DataSyncTask, targetDbConn *dbi.DbConn, targetInsertColumns []dbi.Column) (err error) {
@@ -383,11 +359,6 @@ func (app *dataSyncAppImpl) endRunning(taskEntity *entity.DataSyncTask, log *ent
task.RecentState = state
task.UpdFieldVal = taskEntity.UpdFieldVal
task.RunningState = entity.DataSyncTaskRunStateReady
// 运行失败之后设置任务状态为禁用
//if state == entity.DataSyncTaskStateFail {
// taskEntity.Status = entity.DataSyncTaskStatusDisable
// app.RemoveCronJob(taskEntity)
//}
_ = app.UpdateById(context.Background(), task)
// 保存执行日志
app.saveLog(log)
@@ -409,7 +380,7 @@ func (app *dataSyncAppImpl) InitCronJob() {
_ = app.UpdateByCond(context.TODO(), &entity.DataSyncTask{RunningState: entity.DataSyncTaskRunStateReady}, &entity.DataSyncTask{RunningState: entity.DataSyncTaskRunStateRunning})
if err := app.CursorByCond(&entity.DataSyncTaskQuery{Status: entity.DataSyncTaskStatusEnable}, func(dst *entity.DataSyncTask) error {
app.AddCronJob(contextx.NewTraceId(), dst)
app.addCronJob(contextx.NewTraceId(), dst)
return nil
}); err != nil {
logx.ErrorTrace("the db data sync task failed to initialize: %v", err)
@@ -422,11 +393,6 @@ func (app *dataSyncAppImpl) GetTaskLogList(condition *entity.DataSyncLogQuery, o
// MarkRunning 标记任务执行中
func (app *dataSyncAppImpl) MarkRunning(taskId uint64) {
task := new(entity.DataSyncTask)
task.Id = taskId
task.RunningState = entity.DataSyncTaskRunStateRunning
_ = app.UpdateById(context.Background(), task)
cache.Set(fmt.Sprintf("mayfly:db:syncdata:%d", taskId), 1, -1)
}
@@ -439,3 +405,22 @@ func (app *dataSyncAppImpl) MarkStop(taskId uint64) {
func (app *dataSyncAppImpl) IsRunning(taskId uint64) bool {
return cache.GetStr(fmt.Sprintf("mayfly:db:syncdata:%d", taskId)) != ""
}
func (app *dataSyncAppImpl) addCronJob(ctx context.Context, taskEntity *entity.DataSyncTask) {
key := taskEntity.TaskKey
// 先移除旧的任务
scheduler.RemoveByKey(key)
// 根据状态添加新的任务
if taskEntity.Status == entity.DataSyncTaskStatusEnable {
taskId := taskEntity.Id
logx.Infof("start add the data sync task job: %s, cron[%s]", taskEntity.TaskName, taskEntity.TaskCron)
if err := scheduler.AddFunByKey(key, taskEntity.TaskCron, func() {
if err := app.Run(context.Background(), taskId); err != nil {
logx.Errorf("the data sync task failed to execute at a scheduled time: %s", err.Error())
}
}); err != nil {
logx.ErrorTrace("add db data sync job failed", err)
}
}
}

View File

@@ -99,7 +99,7 @@ func (d *dbSqlExecAppImpl) Exec(ctx context.Context, execSqlReq *dto.DbSqlExecRe
stmts, err := sp.Parse(execSql)
// sql解析失败则使用默认方式切割
if err != nil {
sqlparser.SQLSplit(strings.NewReader(execSql), func(oneSql string) error {
sqlparser.SQLSplit(strings.NewReader(execSql), ';', func(oneSql string) error {
var execRes *dto.DbSqlExecRes
var err error
@@ -229,7 +229,7 @@ func (d *dbSqlExecAppImpl) ExecReader(ctx context.Context, execReader *dto.SqlRe
}
tx, _ := dbConn.Begin()
err := sqlparser.SQLSplit(execReader.Reader, func(sql string) error {
err := sqlparser.SQLSplit(execReader.Reader, ';', func(sql string) error {
if executedStatements%50 == 0 {
if needSendMsg {
ws.SendJsonMsg(ws.UserId(la.Id), clientId, msgdto.InfoSysMsg(i18n.T(imsg.SqlScripRunProgress), &progressMsg{
@@ -589,28 +589,35 @@ func (d *dbSqlExecAppImpl) doExec(ctx context.Context, dbConn *dbi.DbConn, sql s
}
func isSelect(sql string) bool {
return strings.Contains(strings.ToLower(sql[:10]), "select")
return strings.Contains(getSqlPrefix(sql), "select")
}
func isUpdate(sql string) bool {
return strings.Contains(strings.ToLower(sql[:10]), "update")
return strings.Contains(getSqlPrefix(sql), "update")
}
func isDelete(sql string) bool {
return strings.Contains(strings.ToLower(sql[:10]), "delete")
return strings.Contains(getSqlPrefix(sql), "delete")
}
func isInsert(sql string) bool {
return strings.Contains(strings.ToLower(sql[:10]), "insert")
return strings.Contains(getSqlPrefix(sql), "insert")
}
func isOtherQuery(sql string) bool {
sqlPrefix := strings.ToLower(sql[:10])
sqlPrefix := getSqlPrefix(sql)
return strings.Contains(sqlPrefix, "explain") || strings.Contains(sqlPrefix, "show") || strings.Contains(sqlPrefix, "with")
}
func isDDL(sql string) bool {
sqlPrefix := strings.ToLower(sql[:10])
sqlPrefix := getSqlPrefix(sql)
return strings.Contains(sqlPrefix, "create") || strings.Contains(sqlPrefix, "alter") ||
strings.Contains(sqlPrefix, "drop") || strings.Contains(sqlPrefix, "truncate") || strings.Contains(sqlPrefix, "rename")
}
func getSqlPrefix(sql string) string {
if len(sql) < 10 {
return strings.ToLower(sql)
}
return strings.ToLower(sql[:10])
}

View File

@@ -42,13 +42,9 @@ type DbTransferTask interface {
InitCronJob()
AddCronJob(ctx context.Context, taskEntity *entity.DbTransferTask)
RemoveCronJobById(taskId uint64)
CreateLog(ctx context.Context, taskId uint64) (uint64, error)
Run(ctx context.Context, taskId uint64, logId uint64)
// Run 执行迁移任务
// return logId, error
Run(ctx context.Context, taskId uint64) (uint64, error)
IsRunning(taskId uint64) bool
@@ -84,46 +80,19 @@ func (app *dbTransferAppImpl) Save(ctx context.Context, taskEntity *entity.DbTra
if err != nil {
return err
}
// 视情况添加或删除任务
task, err := app.GetById(taskEntity.Id)
if err != nil {
return err
}
app.AddCronJob(ctx, task)
app.addCronJob(ctx, taskEntity)
return nil
}
func (app *dbTransferAppImpl) Delete(ctx context.Context, id uint64) error {
if err := app.DeleteById(ctx, id); err != nil {
return err
task, err := app.GetById(id)
if err != nil {
return errorx.NewBiz("db transfer task not found")
}
app.RemoveCronJobById(id)
scheduler.RemoveByKey(task.TaskKey)
app.MarkStop(id)
return nil
}
func (app *dbTransferAppImpl) AddCronJob(ctx context.Context, taskEntity *entity.DbTransferTask) {
key := taskEntity.TaskKey
// 先移除旧的任务
scheduler.RemoveByKey(key)
// 根据状态添加新的任务
if taskEntity.Status == entity.DbTransferTaskStatusEnable && taskEntity.CronAble == entity.DbTransferTaskCronAbleEnable {
if key == "" {
taskEntity.TaskKey = uuid.New().String()
key = taskEntity.TaskKey
_ = app.UpdateById(ctx, taskEntity)
}
taskId := taskEntity.Id
if err := scheduler.AddFunByKey(key, taskEntity.Cron, func() {
logx.Infof("start the synchronization task: %d", taskId)
logId, _ := app.CreateLog(ctx, taskId)
app.Run(ctx, taskId, logId)
}); err != nil {
logx.ErrorTrace("add db transfer cron job failed", err)
}
}
return app.DeleteById(ctx, id)
}
func (app *dbTransferAppImpl) InitCronJob() {
@@ -144,86 +113,80 @@ func (app *dbTransferAppImpl) InitCronJob() {
_ = app.transferFileApp.UpdateByCond(context.TODO(), &entity.DbTransferFile{Status: entity.DbTransferFileStatusFail}, &entity.DbTransferFile{Status: entity.DbTransferFileStatusRunning})
if err := app.CursorByCond(&entity.DbTransferTaskQuery{Status: entity.DbTransferTaskStatusEnable, CronAble: entity.DbTransferTaskCronAbleEnable}, func(dtt *entity.DbTransferTask) error {
app.AddCronJob(contextx.NewTraceId(), dtt)
app.addCronJob(contextx.NewTraceId(), dtt)
return nil
}); err != nil {
logx.ErrorTrace("the db data transfer task failed to initialize", err)
}
}
func (app *dbTransferAppImpl) CreateLog(ctx context.Context, taskId uint64) (uint64, error) {
logId, err := app.logApp.CreateLog(ctx, &sysapp.CreateLogReq{
Description: "DBMS - Execution DB Transfer",
ReqParam: collx.Kvs("taskId", taskId),
Type: sysentity.SyslogTypeRunning,
Resp: "Data transfer starts...",
})
return logId, err
}
func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) (uint64, error) {
if app.IsRunning(taskId) {
return 0, errorx.NewBiz("the db transfer task [%d] is running, please do not repeat the operation", taskId)
}
func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64, logId uint64) {
task, err := app.GetById(taskId)
if err != nil {
logx.Errorf("Create DBMS- Failed to perform data transfer log: %v", err)
return
}
if app.IsRunning(taskId) {
logx.Error("[%d] the task is running...", taskId)
return
return 0, errorx.NewBiz("db transfer task [%d] not found", taskId)
}
logId, _ := app.CreateLog(ctx, taskId)
start := time.Now()
// 修改状态与关联日志id
task.LogId = logId
task.RunningState = entity.DbTransferTaskRunStateRunning
if err = app.UpdateById(ctx, task); err != nil {
logx.Errorf("failed to update task execution status")
return
return logId, err
}
// 标记该任务开始执行
app.MarkRunning(taskId)
// 获取源库连接、目标库连接判断连接可用性否则记录日志xx连接不可用
// 获取源库表信息
srcConn, err := app.dbApp.GetDbConn(ctx, uint64(task.SrcDbId), task.SrcDbName)
if err != nil {
app.EndTransfer(ctx, logId, taskId, "failed to obtain source db connection", err, nil)
return
}
// 获取迁移表信息
var tables []dbi.Table
if task.CheckedKeys == "all" {
tables, err = srcConn.GetMetadata().GetTables()
go func() {
// 获取源库连接、目标库连接判断连接可用性否则记录日志xx连接不可用
// 获取源库表信息
srcConn, err := app.dbApp.GetDbConn(ctx, uint64(task.SrcDbId), task.SrcDbName)
if err != nil {
app.EndTransfer(ctx, logId, taskId, "failed to get source table information", err, nil)
app.EndTransfer(ctx, logId, taskId, "failed to obtain source db connection", err, nil)
return
}
} else {
tableNames := strings.Split(task.CheckedKeys, ",")
tables, err = srcConn.GetMetadata().GetTables(tableNames...)
if err != nil {
app.EndTransfer(ctx, logId, taskId, "failed to get source table information", err, nil)
// 获取迁移表信息
var tables []dbi.Table
if task.CheckedKeys == "all" {
tables, err = srcConn.GetMetadata().GetTables()
if err != nil {
app.EndTransfer(ctx, logId, taskId, "failed to get source table information", err, nil)
return
}
} else {
tableNames := strings.Split(task.CheckedKeys, ",")
tables, err = srcConn.GetMetadata().GetTables(tableNames...)
if err != nil {
app.EndTransfer(ctx, logId, taskId, "failed to get source table information", err, nil)
return
}
}
// 迁移到文件或数据库
switch task.Mode {
case entity.DbTransferTaskModeFile:
app.transfer2File(ctx, taskId, logId, task, srcConn, start, tables)
case entity.DbTransferTaskModeDb:
app.transfer2Db(ctx, taskId, logId, task, srcConn, start, tables)
default:
app.EndTransfer(ctx, logId, taskId, "error in transfer mode, only migrating to files or databases is currently supported", err, nil)
return
}
}
}()
// 迁移到文件或数据库
if task.Mode == entity.DbTransferTaskModeFile {
app.transfer2File(ctx, taskId, logId, task, srcConn, start, tables)
} else if task.Mode == entity.DbTransferTaskModeDb {
defer app.MarkStop(taskId)
defer app.logApp.Flush(logId, true)
app.transfer2Db(ctx, taskId, logId, task, srcConn, start, tables)
} else {
app.EndTransfer(ctx, logId, taskId, "error in transfer mode, only migrating to files or databases is currently supported", err, nil)
return
}
return logId, nil
}
func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, start time.Time, tables []dbi.Table) {
defer app.MarkStop(taskId)
defer app.logApp.Flush(logId, true)
// 获取目标库表信息
targetConn, err := app.dbApp.GetDbConn(ctx, uint64(task.TargetDbId), task.TargetDbName)
if err != nil {
@@ -235,7 +198,7 @@ func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, lo
tableNames := collx.ArrayMap(tables, func(t dbi.Table) string { return t.TableName })
// 分组迁移
tableGroups := collx.ArraySplit[string](tableNames, 2)
tableGroups := collx.ArraySplit(tableNames, 2)
errGroup, _ := errgroup.WithContext(ctx)
for _, tables := range tableGroups {
@@ -288,7 +251,7 @@ func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, lo
return err
}
tx, _ := targetConn.Begin()
err = sqlparser.SQLSplit(pr, func(stmt string) error {
err = sqlparser.SQLSplit(pr, ';', func(stmt string) error {
if _, err := targetConn.TxExec(tx, stmt); err != nil {
app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("执行sql出错: %s", stmt), err, nil)
pw.CloseWithError(err)
@@ -308,9 +271,7 @@ func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, lo
})
}
err = errGroup.Wait()
if err != nil {
if err = errGroup.Wait(); err != nil {
app.EndTransfer(ctx, logId, taskId, "transfer table failed", err, nil)
return
}
@@ -417,6 +378,31 @@ func (d *dbTransferAppImpl) TimerDeleteTransferFile() {
})
}
func (app *dbTransferAppImpl) addCronJob(ctx context.Context, taskEntity *entity.DbTransferTask) {
key := taskEntity.TaskKey
// 先移除旧的任务
scheduler.RemoveByKey(key)
// 根据状态添加新的任务
if taskEntity.Status == entity.DbTransferTaskStatusEnable && taskEntity.CronAble == entity.DbTransferTaskCronAbleEnable {
if key == "" {
taskEntity.TaskKey = uuid.New().String()
key = taskEntity.TaskKey
_ = app.UpdateById(ctx, taskEntity)
}
taskId := taskEntity.Id
if err := scheduler.AddFunByKey(key, taskEntity.Cron, func() {
logx.Infof("start the synchronization task: %d", taskId)
if _, err := app.Run(ctx, taskId); err != nil {
logx.Warn(err.Error())
}
}); err != nil {
logx.ErrorTrace("add db transfer cron job failed", err)
}
}
}
// MarkRunning 标记任务执行中
func (app *dbTransferAppImpl) MarkRunning(taskId uint64) {
cache.Set(fmt.Sprintf("mayfly:db:transfer:%d", taskId), 1, -1)
@@ -432,6 +418,16 @@ func (app *dbTransferAppImpl) IsRunning(taskId uint64) bool {
return cache.GetStr(fmt.Sprintf("mayfly:db:transfer:%d", taskId)) != ""
}
func (app *dbTransferAppImpl) CreateLog(ctx context.Context, taskId uint64) (uint64, error) {
logId, err := app.logApp.CreateLog(ctx, &sysapp.CreateLogReq{
Description: "DBMS - Execution DB Transfer",
ReqParam: collx.Kvs("taskId", taskId),
Type: sysentity.SyslogTypeRunning,
Resp: "Data transfer starts...",
})
return logId, err
}
func (app *dbTransferAppImpl) Log(ctx context.Context, logId uint64, msg string, extra ...any) {
logType := sysentity.SyslogTypeRunning
logx.InfoContext(ctx, msg)
@@ -467,10 +463,3 @@ func (app *dbTransferAppImpl) EndTransfer(ctx context.Context, logId uint64, tas
task.RunningState = transferState
app.UpdateById(context.Background(), task)
}
func (app *dbTransferAppImpl) RemoveCronJobById(taskId uint64) {
task, err := app.GetById(taskId)
if err == nil {
scheduler.RemoveByKey(task.TaskKey)
}
}

View File

@@ -16,6 +16,6 @@ type SqlParser interface {
}
// SQLSplit sql切割
func SQLSplit(r io.Reader, callback utils.StmtCallback) error {
return utils.SplitStmts(r, callback)
func SQLSplit(r io.Reader, delimiter rune, callback utils.StmtCallback) error {
return utils.SplitStmts(r, delimiter, callback)
}

View File

@@ -23,7 +23,7 @@ DROP COLUMN delete_time;
⑸ 其他工作;⑹ 甲方现场要求乙方完成的其它临时工作。', '{"ip":"::1 ","username":"test_user"}', 'errCode: 401, errMsg: 您的密码安全等级较低,请修改后重新登录;\n信息嘻嘻嘻', '-', 0, '2024-04-23 20:00:35', 0, NULL, '');
`
err := SQLSplit(strings.NewReader(allsql), func(sql string) error {
err := SQLSplit(strings.NewReader(allsql), ';', func(sql string) error {
// if strings.Contains(sql, "INSERT") {
// return fmt.Errorf("不能存在INSERT语句")
// }

View File

@@ -10,12 +10,12 @@ type DataSyncTask struct {
model.Model
// 基本信息
TaskName string `json:"taskName" gorm:"not null;size:255;comment:任务名"` // 任务名
TaskCron string `json:"taskCron" gorm:"not null;size:50;comment:任务Cron表达式"` // 任务Cron表达式
Status int8 `json:"status" gorm:"not null;default:1;comment:状态 1启用 2禁用"` // 状态 1启用 2禁用
TaskKey string `json:"taskKey" gorm:"size:100;comment:任务唯一标识"` // 任务唯一标识
RecentState int8 `json:"recentState" gorm:"not null;default:0;comment:最近执行状态 1成功 -1失败"` // 最近执行状态 1成功 -1失败
RunningState int8 `json:"runningState" gorm:"not null;default:2;comment:运行时状态 1运行中、2待运行、3已停止"` // 运行时状态 1运行中、2待运行、3已停止
TaskName string `json:"taskName" gorm:"not null;size:255;comment:任务名"` // 任务名
TaskCron string `json:"taskCron" gorm:"not null;size:50;comment:任务Cron表达式"` // 任务Cron表达式
Status DataSyncTaskStatus `json:"status" gorm:"not null;default:1;comment:状态 1启用 2禁用"` // 状态 1启用 2禁用
TaskKey string `json:"taskKey" gorm:"size:100;comment:任务唯一标识"` // 任务唯一标识
RecentState int8 `json:"recentState" gorm:"not null;default:0;comment:最近执行状态 1成功 -1失败"` // 最近执行状态 1成功 -1失败
RunningState int8 `json:"runningState" gorm:"not null;default:2;comment:运行时状态 1运行中、2待运行、3已停止"` // 运行时状态 1运行中、2待运行、3已停止
// 源数据库信息
SrcDbId int64 `json:"srcDbId" gorm:"not null;comment:源数据库ID"` // 源数据库ID
@@ -55,9 +55,12 @@ func (d *DataSyncLog) TableName() string {
return "t_db_data_sync_log"
}
// DataSyncTaskQuery 数据同步任务状态
type DataSyncTaskStatus int8
const (
DataSyncTaskStatusEnable int8 = 1 // 启用状态
DataSyncTaskStatusDisable int8 = -1 // 禁用状态
DataSyncTaskStatusEnable DataSyncTaskStatus = 1 // 启用状态
DataSyncTaskStatusDisable DataSyncTaskStatus = -1 // 禁用状态
DataSyncTaskStateSuccess int8 = 1 // 执行成功状态
DataSyncTaskStateRunning int8 = 2 // 执行中状态

View File

@@ -18,8 +18,8 @@ type InstanceQuery struct {
type DataSyncTaskQuery struct {
model.PageParam
Name string `json:"name" form:"name"`
Status int8 `json:"status" form:"status"`
Name string `json:"name" form:"name"`
Status DataSyncTaskStatus `json:"status" form:"status"`
}
type DataSyncLogQuery struct {
model.PageParam

View File

@@ -37,4 +37,7 @@ var En = map[i18n.MsgId]string{
LogDataSyncSave: "datasync - Save data sync task",
LogDataSyncDelete: "datasync - Delete data sync task",
LogDataSyncChangeStatus: "datasync - Change status",
DataSyncSuccessMsg: "the synchronous task was executed successfully. New data: {{.count}}",
DataSyncFailMsg: "execution failure: {{.msg}}",
DataSyncingMsg: "during the execution of this task, {{.count}} has been synchronized",
}

View File

@@ -47,4 +47,7 @@ const (
LogDataSyncSave
LogDataSyncDelete
LogDataSyncChangeStatus
DataSyncSuccessMsg
DataSyncFailMsg
DataSyncingMsg
)

View File

@@ -37,4 +37,7 @@ var Zh_CN = map[i18n.MsgId]string{
LogDataSyncSave: "datasync-保存数据同步任务",
LogDataSyncDelete: "datasync-删除数据同步任务",
LogDataSyncChangeStatus: "datasync-启停任务",
DataSyncSuccessMsg: "执行成功,本次同步{{.count}}条",
DataSyncFailMsg: "执行失败: {{.msg}}",
DataSyncingMsg: "执行中,已同步{{.count}}条",
}

View File

@@ -24,7 +24,7 @@ func (d *dbSqlExecRepoImpl) GetPageList(condition *entity.DbSqlExecQuery, orderB
Eq("creator_id", condition.CreatorId).
Eq("flow_biz_key", condition.FlowBizKey).
In("status", condition.Status).
Like("sql", condition.Keyword).
Like("`sql`", condition.Keyword).
Ge("create_time", condition.StartTime).
Le("create_time", condition.EndTime).
RLike("db", condition.Db).OrderBy(orderBy...)

View File

@@ -20,9 +20,9 @@ import (
"strings"
"github.com/may-fly/cast"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
type Mongo struct {
@@ -196,7 +196,7 @@ func (m *Mongo) FindCommand(rc *req.Ctx) {
// 处理_id查询字段,使用ObjectId函数包装
id, ok := filter["_id"].(string)
if ok && id != "" {
objId, err := primitive.ObjectIDFromHex(id)
objId, err := bson.ObjectIDFromHex(id)
if err == nil {
filter["_id"] = objId
}
@@ -222,7 +222,7 @@ func (m *Mongo) UpdateByIdCommand(rc *req.Ctx) {
docId := commandForm.DocId
docIdVal, ok := docId.(string)
if ok {
objId, err := primitive.ObjectIDFromHex(docIdVal)
objId, err := bson.ObjectIDFromHex(docIdVal)
if err == nil {
docId = objId
}
@@ -246,7 +246,7 @@ func (m *Mongo) DeleteByIdCommand(rc *req.Ctx) {
docId := commandForm.DocId
docIdVal, ok := docId.(string)
if ok {
objId, err := primitive.ObjectIDFromHex(docIdVal)
objId, err := bson.ObjectIDFromHex(docIdVal)
if err == nil {
docId = objId
}

View File

@@ -4,7 +4,7 @@ import (
"context"
"mayfly-go/pkg/logx"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/v2/mongo"
)
type MongoConn struct {

View File

@@ -11,8 +11,8 @@ import (
machineapp "mayfly-go/internal/machine/application"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
type MongoInfo struct {
@@ -36,7 +36,7 @@ func (mi *MongoInfo) Conn() (*MongoConn, error) {
mongoOptions.SetDialer(&MongoSshDialer{machineId: mi.SshTunnelMachineId})
}
client, err := mongo.Connect(ctx, mongoOptions)
client, err := mongo.Connect(mongoOptions)
if err != nil {
return nil, err
}

View File

@@ -4,7 +4,7 @@ import "fmt"
const (
AppName = "mayfly-go"
Version = "v1.10.1"
Version = "v1.10.2"
)
func GetAppInfo() string {

View File

@@ -1,22 +1,8 @@
package consts
import "time"
const (
AdminId = 1
MachineConnExpireTime = 60 * time.Minute
DbConnExpireTime = 120 * time.Minute
RedisConnExpireTime = 30 * time.Minute
MongoConnExpireTime = 30 * time.Minute
EsConnExpireTime = 30 * time.Minute
/**** 开发测试使用 ****/
// MachineConnExpireTime = 4 * time.Minute
// DbConnExpireTime = 2 * time.Minute
// RedisConnExpireTime = 2 * time.Minute
// MongoConnExpireTime = 2 * time.Minute
ResourceTypeMachine int8 = 1
ResourceTypeDbInstance int8 = 2
ResourceTypeRedis int8 = 3

View File

@@ -11,8 +11,8 @@ import (
// StmtCallback stmt回调函数
type StmtCallback func(stmt string) error
// SplitStmts 语句切割(用于以;结尾为一条语句,并且去除// -- /**/等注释)主要由阿里通义灵码提供
func SplitStmts(r io.Reader, callback StmtCallback) error {
// SplitStmts 语句切割(用于以指定delimiter结尾为一条语句,并且去除// -- /**/等注释)主要由阿里通义灵码提供
func SplitStmts(r io.Reader, delimiter rune, callback StmtCallback) error {
reader := bufio.NewReaderSize(r, 512*1024)
buffer := new(bytes.Buffer) // 使用 bytes.Buffer 来处理数据
var currentStatement bytes.Buffer
@@ -83,7 +83,7 @@ func SplitStmts(r io.Reader, callback StmtCallback) error {
stringDelimiter = r
currentStatement.WriteRune(r)
buffer.Next(size)
case r == ';' && !inString && !inMultiLineComment && !inSingleLineComment:
case r == delimiter && !inString && !inMultiLineComment && !inSingleLineComment:
sql := strings.TrimSpace(currentStatement.String())
if sql != "" {
if err := callback(sql); err != nil {

View File

@@ -265,7 +265,7 @@ func (r *redisAppImpl) FlowBizHandle(ctx context.Context, bizHandleParam *flowap
handleRes := make([]map[string]any, 0)
hasErr := false
utils.SplitStmts(strings.NewReader(runCmdParam.Cmd), func(stmt string) error {
utils.SplitStmts(strings.NewReader(runCmdParam.Cmd), ';', func(stmt string) error {
cmd := strings.TrimSpace(stmt)
runRes := collx.Kvs("cmd", cmd)
if res, err := redisConn.RunCmd(ctx, collx.ArrayMap[string, any](parseRedisCommand(cmd), func(val string) any { return val })...); err != nil {

View File

@@ -8,7 +8,7 @@ import (
)
func TestParseRedisCommand(t *testing.T) {
utils.SplitStmts(strings.NewReader("del 'key l3'; set key2 key3; set 'key3' 'value3 and value4'; hset field1 key1 'hvalue2 hvalue3'"), func(stmt string) error {
utils.SplitStmts(strings.NewReader("del 'key l3'; set key2 key3; set 'key3' 'value3 and value4'; hset field1 key1 'hvalue2 hvalue3'"), ';', func(stmt string) error {
res := parseRedisCommand(stmt)
fmt.Println(res)
return nil