From aac4c2b42b5567ea19580329d9c3b74a23a681d6 Mon Sep 17 00:00:00 2001 From: "meilin.huang" <954537473@qq.com> Date: Sun, 1 Jun 2025 20:39:54 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=9C=BA=E5=99=A8=E8=AE=A1=E5=88=92?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E3=80=81=E6=95=B0=E6=8D=AE=E5=BA=93=E8=BF=81?= =?UTF-8?q?=E7=A7=BB=E4=BB=BB=E5=8A=A1=E5=88=9D=E5=A7=8B=E5=8C=96=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/db/application/db_data_sync.go | 31 +++---------------- server/internal/db/application/db_transfer.go | 31 +++---------------- .../machine/application/machine_cronjob.go | 27 +++------------- server/pkg/base/app.go | 26 ++++++++++++++++ server/pkg/base/repo.go | 12 +++++++ 5 files changed, 53 insertions(+), 74 deletions(-) diff --git a/server/internal/db/application/db_data_sync.go b/server/internal/db/application/db_data_sync.go index 66b1e9c9..bb2dbe4c 100644 --- a/server/internal/db/application/db_data_sync.go +++ b/server/internal/db/application/db_data_sync.go @@ -408,32 +408,11 @@ func (app *dataSyncAppImpl) InitCronJob() { // 修改执行中状态为待执行 _ = app.UpdateByCond(context.TODO(), &entity.DataSyncTask{RunningState: entity.DataSyncTaskRunStateReady}, &entity.DataSyncTask{RunningState: entity.DataSyncTaskRunStateRunning}) - // 把所有正常任务添加到定时任务中 - cond := new(entity.DataSyncTaskQuery) - cond.PageNum = 1 - cond.PageSize = 100 - cond.Status = entity.DataSyncTaskStatusEnable - - tasks, err := app.GetPageList(cond) - if err != nil { - logx.ErrorTrace("the data synchronization task failed to initialize", err) - return - } - - total := tasks.Total - add := 0 - - for { - for _, job := range tasks.List { - app.AddCronJob(contextx.NewTraceId(), job) - add++ - } - if add >= int(total) { - return - } - - cond.PageNum = cond.PageNum + 1 - tasks, _ = app.GetPageList(cond) + if err := app.CursorByCond(&entity.DataSyncTaskQuery{Status: entity.DataSyncTaskStatusEnable}, func(dst *entity.DataSyncTask) error { + app.AddCronJob(contextx.NewTraceId(), dst) + return nil + }); err != nil { + logx.ErrorTrace("the db data sync task failed to initialize: %v", err) } } diff --git a/server/internal/db/application/db_transfer.go b/server/internal/db/application/db_transfer.go index e6c1d54f..81babe60 100644 --- a/server/internal/db/application/db_transfer.go +++ b/server/internal/db/application/db_transfer.go @@ -143,32 +143,11 @@ func (app *dbTransferAppImpl) InitCronJob() { // 把所有运行中的文件状态设置为失败 _ = app.transferFileApp.UpdateByCond(context.TODO(), &entity.DbTransferFile{Status: entity.DbTransferFileStatusFail}, &entity.DbTransferFile{Status: entity.DbTransferFileStatusRunning}) - // 把所有需要定时执行的任务添加到定时任务中 - cond := new(entity.DbTransferTaskQuery) - cond.PageNum = 1 - cond.PageSize = 100 - - cond.Status = entity.DbTransferTaskStatusEnable - cond.CronAble = entity.DbTransferTaskCronAbleEnable - jobs := []entity.DbTransferTask{} - - pr, _ := app.GetPageList(cond) - if nil == pr || pr.Total == 0 { - return - } - total := pr.Total - add := 0 - - for { - for _, job := range jobs { - app.AddCronJob(contextx.NewTraceId(), &job) - add++ - } - if add >= int(total) { - return - } - cond.PageNum++ - _, _ = app.GetPageList(cond) + if err := app.CursorByCond(&entity.DbTransferTaskQuery{Status: entity.DbTransferTaskStatusEnable, CronAble: entity.DbTransferTaskCronAbleEnable}, func(dtt *entity.DbTransferTask) error { + app.AddCronJob(contextx.NewTraceId(), dtt) + return nil + }); err != nil { + logx.ErrorTrace("the db data transfer task failed to initialize", err) } } diff --git a/server/internal/machine/application/machine_cronjob.go b/server/internal/machine/application/machine_cronjob.go index b0f58baf..6df7b3cc 100644 --- a/server/internal/machine/application/machine_cronjob.go +++ b/server/internal/machine/application/machine_cronjob.go @@ -101,28 +101,11 @@ func (m *machineCronJobAppImpl) InitCronJob() { } }() - pageParam := model.PageParam{ - PageSize: 100, - PageNum: 1, - } - - var mcjs []*entity.MachineCronJob - cond := &entity.MachineCronJob{Status: entity.MachineCronJobStatusEnable} - pr, _ := m.GetPageList(cond, pageParam) - total := pr.Total - add := 0 - - for { - for _, mcj := range mcjs { - m.addCronJob(mcj) - add++ - } - if add >= int(total) { - return - } - - pageParam.PageNum = pageParam.PageNum + 1 - m.GetPageList(cond, pageParam) + if err := m.CursorByCond(&entity.MachineCronJob{Status: entity.MachineCronJobStatusEnable}, func(mcj *entity.MachineCronJob) error { + m.addCronJob(mcj) + return nil + }); err != nil { + logx.ErrorTrace("the machine cronjob failed to initialize: %v", err) } } diff --git a/server/pkg/base/app.go b/server/pkg/base/app.go index 8d213420..2c9f6b49 100644 --- a/server/pkg/base/app.go +++ b/server/pkg/base/app.go @@ -62,6 +62,9 @@ type App[T model.ModelI] interface { // @param cond 可为*model.QueryCond也可以为普通查询model CountByCond(cond any) int64 + // CursorByCond 根据指定条件遍历model表数据 + CursorByCond(cond any, handler func(T) error) error + // Tx 执行事务操作 Tx(ctx context.Context, funcs ...func(context.Context) error) (err error) } @@ -152,6 +155,29 @@ func (ai *AppImpl[T, R]) CountByCond(cond any) int64 { return ai.GetRepo().CountByCond(cond) } +func (ai *AppImpl[T, R]) CursorByCond(cond any, handler func(T) error) error { + offset := 0 + batchSize := 200 + for { + data, err := ai.GetRepo().SelectByCondWithOffset(cond, batchSize, offset) + if err != nil { + return err + } + if len(data) == 0 { + break + } + + for _, item := range data { + if err := handler(item); err != nil { + return err + } + } + + offset += len(data) + } + return nil +} + // Tx 执行事务操作 func (ai *AppImpl[T, R]) Tx(ctx context.Context, funcs ...func(context.Context) error) (err error) { dbCtx := ctx diff --git a/server/pkg/base/repo.go b/server/pkg/base/repo.go index fe1136f9..0d7aaed9 100644 --- a/server/pkg/base/repo.go +++ b/server/pkg/base/repo.go @@ -89,6 +89,9 @@ type Repo[T model.ModelI] interface { // CountByCond 根据指定条件统计model表的数量 CountByCond(cond any) int64 + + // SelectByCondWithOffset 根据条件查询数据并支持 offset + limit 分页 + SelectByCondWithOffset(cond any, limit int, offset int) ([]T, error) } var _ (Repo[*model.Model]) = (*RepoImpl[*model.Model])(nil) @@ -251,6 +254,15 @@ func (br *RepoImpl[T]) CountByCond(cond any) int64 { return gormx.CountByCond(br.GetModel(), toQueryCond(cond)) } +func (br *RepoImpl[T]) SelectByCondWithOffset(cond any, limit int, offset int) ([]T, error) { + var models []T + err := gormx.NewQuery(br.GetModel(), toQueryCond(cond)).GenGdb().Limit(limit).Offset(offset).Find(&models).Error + if err != nil { + return nil, err + } + return models, nil +} + // NewModel 新建模型实例 func (br *RepoImpl[T]) NewModel() T { newModel := reflect.New(br.getModelType()).Interface()