feat:(1.2.0-beta1): 1.优化定时任务重试结果更新 2. 去除定时任务结果更新的分布式锁

This commit is contained in:
opensnail 2024-09-05 00:10:04 +08:00
parent 6169a0f544
commit 6ecdfe65eb
4 changed files with 27 additions and 42 deletions

View File

@ -1,22 +0,0 @@
package com.aizuda.snailjob.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author xiaowoniu
* @date 2024-01-01 22:56:28
* @since 2.6.0
*/
@Getter
@AllArgsConstructor
public enum LogicalConditionEnum {
/**
* 逻辑条件
*/
AND(1, ""),
OR(2, "");
private final Integer code;
private final String desc;
}

View File

@ -68,15 +68,14 @@ public class JobExecutorResultActor extends AbstractActor {
return;
}
// 先尝试完成若已完成则不需要通过获取分布式锁来完成
boolean tryCompleteAndStop = tryCompleteAndStop(result);
if (!tryCompleteAndStop) {
// 存在并发问题
distributedLockHandler.lockWithDisposableAndRetry(() -> {
tryCompleteAndStop(result);
}, MessageFormat.format(KEY, result.getTaskBatchId(),
result.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
}
// if (!tryCompleteAndStop) {
// // 存在并发问题
// distributedLockHandler.lockWithDisposableAndRetry(() -> {
// tryCompleteAndStop(result);
// }, MessageFormat.format(KEY, result.getTaskBatchId(),
// result.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
// }
} catch (Exception e) {
SnailJobLog.LOCAL.error(" job executor result exception. [{}]", result, e);
} finally {

View File

@ -31,7 +31,6 @@ import java.util.List;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*;
/**
* @author: opensnail

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.result.job;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
@ -25,7 +26,7 @@ import lombok.RequiredArgsConstructor;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@ -45,12 +46,12 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
public void handleResult(final JobExecutorResultContext context) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getMrStage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getMrStage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
if (CollUtil.isEmpty(jobTasks) ||
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return;
}
@ -58,7 +59,7 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
context.setJobTaskList(jobTasks);
Map<Integer, Long> statusCountMap = jobTasks.stream()
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
@ -101,13 +102,21 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
jobTaskBatch.setId(context.getTaskBatchId());
jobTaskBatch.setTaskBatchStatus(taskBatchStatus);
jobTaskBatch.setUpdateDt(LocalDateTime.now());
if (Objects.nonNull(context.getJobOperationReason())) {
jobTaskBatch.setOperationReason(context.getJobOperationReason());
jobTaskBatch.setOperationReason(
Optional.ofNullable(context.getJobOperationReason()).orElse(JobOperationReasonEnum.NONE.getReason())
);
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(taskBatchStatus) && context.isRetry()) {
jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, context.getTaskBatchId()));
return false;
}
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, context.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, context.getTaskBatchId())
.in(!context.isRetry(), JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}