From 3a4382a9abebe413e72f7ad3ebefe7c0153473d5 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Thu, 4 Jul 2024 13:03:47 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0-beta2):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/dispatch/JobExecutorActor.java | 33 ++++---- .../task/MapReduceTaskGenerator.java | 77 +++++++++---------- 2 files changed, 56 insertions(+), 54 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index e7b5118fd..b5d55fd6f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -149,20 +149,27 @@ public class JobExecutorActor extends AbstractActor { return; } - // 获取工作流的上下文 - WorkflowTaskBatch workflowTaskBatch = null; - Long workflowTaskBatchId = taskExecute.getWorkflowTaskBatchId(); - if (Objects.nonNull(workflowTaskBatchId)) { - workflowTaskBatch = workflowTaskBatchMapper.selectOne( - new LambdaQueryWrapper() - .select(WorkflowTaskBatch::getWfContext) - .eq(WorkflowTaskBatch::getId, taskExecute.getWorkflowTaskBatchId()) - ); - } + // 事务提交以后再执行任务 + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(final int status) { + // 获取工作流的上下文 + WorkflowTaskBatch workflowTaskBatch = null; + Long workflowTaskBatchId = taskExecute.getWorkflowTaskBatchId(); + if (Objects.nonNull(workflowTaskBatchId)) { + workflowTaskBatch = workflowTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext) + .eq(WorkflowTaskBatch::getId, taskExecute.getWorkflowTaskBatchId()) + ); + } + + // 执行任务 + JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); + jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList, workflowTaskBatch)); + } + }); - // 执行任务 - JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); - jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList, workflowTaskBatch)); } finally { log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute)); final int finalTaskStatus = taskStatus; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 5bd20e1f3..cd5b5503a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -29,6 +29,7 @@ import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; @@ -178,7 +179,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { return finalJobTasks; } - @NotNull private List createMapJobTasks(final JobTaskGenerateContext context) { List mapSubTask = context.getMapSubTask(); if (CollUtil.isEmpty(mapSubTask)) { @@ -194,51 +194,46 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); - List jobTasks = new ArrayList<>(mapSubTask.size()); + return transactionTemplate.execute(status -> { + List jobTasks = new ArrayList<>(mapSubTask.size()); + for (int index = 0; index < mapSubTask.size(); index++) { + Pair clientInfo = getClientNodeInfo(context); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(final TransactionStatus status) { + // 新增任务实例 + JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); + jobTask.setClientInfo(clientInfo.getKey()); + jobTask.setArgsType(context.getArgsType()); + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(context.getArgsStr()); + jobArgsHolder.setMaps(mapSubTask.get(index)); + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); + jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); + jobTask.setTaskStatus(clientInfo.getValue()); + jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); + jobTask.setTaskName(context.getTaskName()); + jobTask.setLeaf(StatusEnum.YES.getStatus()); + jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId()); + jobTask.setRetryCount(0); + jobTask.setCreateDt(LocalDateTime.now()); + jobTask.setUpdateDt(LocalDateTime.now()); + jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); + jobTasks.add(jobTask); + } - for (int index = 0; index < mapSubTask.size(); index++) { - Pair clientInfo = getClientNodeInfo(context); + batchSaveJobTasks(jobTasks); - // 新增任务实例 - JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); - jobTask.setClientInfo(clientInfo.getKey()); - jobTask.setArgsType(context.getArgsType()); - JobArgsHolder jobArgsHolder = new JobArgsHolder(); - jobArgsHolder.setJobParams(context.getArgsStr()); - jobArgsHolder.setMaps(mapSubTask.get(index)); - jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); - jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); - jobTask.setTaskStatus(clientInfo.getValue()); - jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); - jobTask.setTaskName(context.getTaskName()); - jobTask.setLeaf(StatusEnum.YES.getStatus()); - jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId()); - jobTask.setRetryCount(0); - jobTask.setCreateDt(LocalDateTime.now()); - jobTask.setUpdateDt(LocalDateTime.now()); - jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - jobTasks.add(jobTask); - } + // 更新父节点的为非叶子节点 + if (Objects.nonNull(parentJobTask)) { + JobTask parentJobTask1 = new JobTask(); + parentJobTask1.setId(context.getParentId()); + parentJobTask1.setLeaf(StatusEnum.NO.getStatus()); + Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask1), + () -> new SnailJobMapReduceException("更新父节点失败")); + } - batchSaveJobTasks(jobTasks); + return jobTasks; + }); - // 更新父节点的为非叶子节点 - if (Objects.nonNull(parentJobTask)) { - JobTask parentJobTask = new JobTask(); - parentJobTask.setId(context.getParentId()); - parentJobTask.setLeaf(StatusEnum.NO.getStatus()); - Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask), - () -> new SnailJobMapReduceException("更新父节点失败")); - } - - } - }); - - return jobTasks; } private Pair getClientNodeInfo(JobTaskGenerateContext context) {