From 4b0d6673ccd8d568f2dcd56d266e9965a60db60e Mon Sep 17 00:00:00 2001 From: srzou Date: Tue, 27 Aug 2024 15:23:58 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.2.0-beta1):=E4=BF=AE=E5=A4=8DMap/Map?= =?UTF-8?q?Reduce=E9=87=8D=E8=AF=95=E9=97=AE=E9=A2=98,=E5=8F=8A=E6=89=8B?= =?UTF-8?q?=E5=8A=A8=E6=9A=82=E5=81=9C=E7=9B=B8=E5=85=B3=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/handler/WorkflowBatchHandler.java | 18 +++++++++--------- .../stop/AbstractJobTaskStopHandler.java | 3 +++ .../server/web/service/handler/JobHandler.java | 4 +++- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index e0c1aac63..ad105cab7 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -183,6 +183,15 @@ public class WorkflowBatchHandler { operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason(); } + // 关闭已经触发的任务 + List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() + .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)); + + if (CollUtil.isEmpty(jobTaskBatches)) { + return; + } + WorkflowTaskBatch workflowTaskBatch = new WorkflowTaskBatch(); workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); workflowTaskBatch.setOperationReason(operationReason); @@ -193,15 +202,6 @@ public class WorkflowBatchHandler { workflowTaskBatchId)); SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId)); - // 关闭已经触发的任务 - List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() - .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE) - .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)); - - if (CollUtil.isEmpty(jobTaskBatches)) { - return; - } - List jobs = jobMapper.selectBatchIds(StreamUtils.toSet(jobTaskBatches, JobTaskBatch::getJobId)); Map jobMap = StreamUtils.toIdentityMap(jobs, Job::getId); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java index 10c199b63..6645876d6 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java @@ -60,6 +60,9 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler, if (context.isNeedUpdateTaskStatus()) { for (final JobTask jobTask : jobTasks) { + if (jobTask.getTaskStatus() == JobTaskStatusEnum.SUCCESS.getStatus()){ + continue; + } JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(jobTask); jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.STOP.getStatus()); jobExecutorResultDTO.setMessage("任务停止成功"); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java index 4f5656b26..4c6e0743f 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java @@ -93,7 +93,9 @@ public class JobHandler { String wfContext = getWfContext(workflowTaskBatchId); for (JobTask jobTask : jobTasks) { - if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()) { + // 增加Map及MapReduce重试任务的状态判断,防止重复执行 + if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus() + || jobTask.getTaskStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) { continue; }