From 6ecdfe65eb433ef31e2e2c25442cd76d0f6c62fd Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Thu, 5 Sep 2024 00:10:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:(1.2.0-beta1):=201.=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E9=87=8D=E8=AF=95=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=E6=9B=B4=E6=96=B0=202.=20=E5=8E=BB=E9=99=A4=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1=E7=BB=93=E6=9E=9C=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E7=9A=84=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/enums/LogicalConditionEnum.java | 22 ------------- .../dispatch/JobExecutorResultActor.java | 15 +++++---- .../support/handler/JobTaskBatchHandler.java | 1 - .../job/AbstractJobExecutorResultHandler.java | 31 ++++++++++++------- 4 files changed, 27 insertions(+), 42 deletions(-) delete mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/LogicalConditionEnum.java diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/LogicalConditionEnum.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/LogicalConditionEnum.java deleted file mode 100644 index 533d8c2e..00000000 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/LogicalConditionEnum.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.aizuda.snailjob.server.common.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * @author xiaowoniu - * @date 2024-01-01 22:56:28 - * @since 2.6.0 - */ -@Getter -@AllArgsConstructor -public enum LogicalConditionEnum { - /** - * 逻辑条件 - */ - AND(1, "并"), - OR(2, "或"); - - private final Integer code; - private final String desc; -} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 610d4e58..8918fac1 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -68,15 +68,14 @@ public class JobExecutorResultActor extends AbstractActor { return; } - // 先尝试完成,若已完成则不需要通过获取分布式锁来完成 boolean tryCompleteAndStop = tryCompleteAndStop(result); - if (!tryCompleteAndStop) { - // 存在并发问题 - distributedLockHandler.lockWithDisposableAndRetry(() -> { - tryCompleteAndStop(result); - }, MessageFormat.format(KEY, result.getTaskBatchId(), - result.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); - } +// if (!tryCompleteAndStop) { +// // 存在并发问题 +// distributedLockHandler.lockWithDisposableAndRetry(() -> { +// tryCompleteAndStop(result); +// }, MessageFormat.format(KEY, result.getTaskBatchId(), +// result.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); +// } } catch (Exception e) { SnailJobLog.LOCAL.error(" job executor result exception. [{}]", result, e); } finally { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 3e1d06f0..9ae6c924 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Objects; import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; -import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*; /** * @author: opensnail diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java index eddf2060..83668f7c 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.result.job; import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; @@ -25,7 +26,7 @@ import lombok.RequiredArgsConstructor; import java.time.LocalDateTime; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -45,12 +46,12 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes public void handleResult(final JobExecutorResultContext context) { List jobTasks = jobTaskMapper.selectList( - new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getMrStage) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); + new LambdaQueryWrapper() + .select(JobTask::getTaskStatus, JobTask::getMrStage) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); if (CollUtil.isEmpty(jobTasks) || - jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { + jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { return; } @@ -58,7 +59,7 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes context.setJobTaskList(jobTasks); Map statusCountMap = jobTasks.stream() - .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); + .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); @@ -101,13 +102,21 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes jobTaskBatch.setId(context.getTaskBatchId()); jobTaskBatch.setTaskBatchStatus(taskBatchStatus); jobTaskBatch.setUpdateDt(LocalDateTime.now()); - if (Objects.nonNull(context.getJobOperationReason())) { - jobTaskBatch.setOperationReason(context.getJobOperationReason()); + jobTaskBatch.setOperationReason( + Optional.ofNullable(context.getJobOperationReason()).orElse(JobOperationReasonEnum.NONE.getReason()) + ); + + if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(taskBatchStatus) && context.isRetry()) { + jobTaskBatchMapper.update(jobTaskBatch, + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, context.getTaskBatchId())); + return false; } + return 1 == jobTaskBatchMapper.update(jobTaskBatch, - new LambdaUpdateWrapper() - .eq(JobTaskBatch::getId, context.getTaskBatchId()) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, context.getTaskBatchId()) + .in(!context.isRetry(), JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) ); }