feat: 2.4.0

1. 修复超时任务执行失败失败
This commit is contained in:
byteblogs168 2023-10-27 13:48:38 +08:00
parent 2abdf57f8e
commit bdf059a95c
61 changed files with 146 additions and 90 deletions

View File

@ -19,6 +19,9 @@ public enum JobOperationReasonEnum {
EXECUTE_TIMEOUT(1, "任务执行超时"),
NOT_CLIENT(2, "无客户端节点"),
JOB_CLOSED(3, "JOB已关闭"),
JOB_DISCARD(4, "任务丢弃"),
JOB_OVERLAY(5, "任务被覆盖"),
NOT_EXECUTE_TASK(6, "无可执行任务项"),
;
private final int reason;

View File

@ -27,7 +27,7 @@ public class JobExecutorResultDTO {
private Object result;
private JobOperationReasonEnum jobOperationReasonEnum;
private Integer jobOperationReason;
}

View File

@ -50,6 +50,7 @@ public interface JobTaskConverter {
TaskStopJobContext toStopJobContext(BlockStrategies.BlockStrategyContext context);
TaskStopJobContext toStopJobContext(JobExecutorResultDTO context);
TaskStopJobContext toStopJobContext(JobTaskPrepareDTO context);
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);

View File

@ -64,7 +64,7 @@ public class JobExecutorResultActor extends AbstractActor {
()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态
boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReasonEnum());
boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason());
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭

View File

@ -17,6 +17,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.time.ZoneId;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@ -43,12 +44,12 @@ public class JobTaskBatchGenerator {
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
return;
} else {
// 生成一个新的任务
jobTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
jobTaskBatch.setOperationReason(context.getOperationReason());
}
// 生成一个新的任务
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
// 进入时间轮

View File

@ -22,15 +22,20 @@ public class JobTaskBatchGeneratorContext {
*/
private Long jobId;
/**
* 任务类型
*/
private Integer taskInstanceType;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
/**
* 操作原因
*/
private Integer operationReason;
/**
* 任务批次状态
*/
private Integer taskBatchStatus;
}

View File

@ -18,7 +18,9 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author: www.byteblogs.com
@ -32,21 +34,37 @@ public class JobTaskBatchHandler {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
public boolean complete(Long taskBatchId, JobOperationReasonEnum jobOperationReasonEnum) {
public boolean complete(Long taskBatchId, Integer jobOperationReason) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
.eq(JobTask::getTaskBatchId, taskBatchId));
if (CollectionUtils.isEmpty(jobTasks) || jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
long failCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count();
long stopCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.STOP.getStatus()).count();
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(taskBatchId);
if (CollectionUtils.isEmpty(jobTasks)) {
// todo 此为异常数据没有生成对应的任务项(task) 直接更新为失败
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_EXECUTE_TASK.getReason());
jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, taskBatchId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE));
return false;
}
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
Map<Integer, Long> statusCountMap = jobTasks.stream()
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
if (failCount > 0) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
} else if (stopCount > 0) {
@ -55,8 +73,8 @@ public class JobTaskBatchHandler {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
}
if (Objects.nonNull(jobOperationReasonEnum)) {
jobTaskBatch.setOperationReason(jobOperationReasonEnum.getReason());
if (Objects.nonNull(jobOperationReason)) {
jobTaskBatch.setOperationReason(jobOperationReason);
}
return 1 == jobTaskBatchMapper.update(jobTaskBatch,

View File

@ -5,6 +5,9 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.handler.JobTaskBatchHandler;
@ -40,27 +43,31 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
// 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理
int blockStrategy = prepare.getBlockStrategy();
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
if (jobTaskBatchHandler.complete(prepare.getTaskBatchId(), jobOperationReasonEnum)) {
if (jobTaskBatchHandler.complete(prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间
long delay = System.currentTimeMillis() - prepare.getExecutionAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 计算超时时间到达超时时间覆盖任务
// 计算超时时间到达超时时间中断任务
if (delay > prepare.getExecutorTimeout() * 1000) {
log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, prepare.getExecutorTimeout() * 1000);
blockStrategy = BlockStrategies.BlockStrategyEnum.OVERLAY.getBlockStrategy();
jobOperationReasonEnum = JobOperationReasonEnum.EXECUTE_TIMEOUT;
// 超时停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(prepare.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(prepare);
stopJobContext.setJobOperationReason(JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
}
// 仅是超时检测的不执行阻塞策略
if (prepare.isOnlyTimeoutCheck()) {
return;
}
BlockStrategies.BlockStrategyContext blockStrategyContext = JobTaskConverter.INSTANCE.toBlockStrategyContext(prepare);
blockStrategyContext.setOperationReason(jobOperationReasonEnum);
blockStrategyContext.setOperationReason(jobOperationReasonEnum.getReason());
BlockStrategy blockStrategyInterface = BlockStrategies.BlockStrategyEnum.getBlockStrategy(blockStrategy);
blockStrategyInterface.block(blockStrategyContext);

View File

@ -50,7 +50,7 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(jobTask);
jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.STOP.getStatus());
jobExecutorResultDTO.setMessage("任务停止成功");
jobExecutorResultDTO.setJobOperationReasonEnum(context.getJobOperationReasonEnum());
jobExecutorResultDTO.setJobOperationReason(context.getJobOperationReason());
jobExecutorResultDTO.setTaskType(getTaskType().getType());
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
actorRef.tell(jobExecutorResultDTO, actorRef);

View File

@ -42,7 +42,7 @@ public class TaskStopJobContext {
private List<JobTask> jobTasks;
private JobOperationReasonEnum jobOperationReasonEnum;
private Integer jobOperationReason;
private boolean forceStop;

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.strategy;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -16,6 +17,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Optional;
/**
* @author: www.byteblogs.com
@ -65,7 +67,7 @@ public class BlockStrategies {
*/
private LocalDateTime nextTriggerAt;
private JobOperationReasonEnum operationReason;
private Integer operationReason;
}
@ -74,6 +76,13 @@ public class BlockStrategies {
@Override
public void block(final BlockStrategyContext context) {
log.warn("阻塞策略为丢弃此次执行. taskBatchId:[{}]", context.getTaskBatchId());
// 重新生成任务
JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class);
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
jobTaskBatchGeneratorContext.setOperationReason(JobOperationReasonEnum.JOB_OVERLAY.getReason());
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
}
}
@ -91,7 +100,7 @@ public class BlockStrategies {
// 停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.taskType);
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context);
stopJobContext.setJobOperationReasonEnum(context.getOperationReason());
stopJobContext.setJobOperationReason(Optional.ofNullable(context.getOperationReason()).orElse(JobOperationReasonEnum.JOB_DISCARD.getReason()));
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -112,6 +112,18 @@ const enums = {
'3': {
'name': '任务已关闭',
'color': '#087da1'
},
'4': {
'name': '任务丢弃',
'color': '#3a2f81'
},
'5': {
'name': '任务被覆盖',
'color': '#c2238a'
},
'6': {
'name': '无可执行任务项',
'color': '#23c28a'
}
},
taskStatus: {