fix:(1.2.0-beta2): 复原任务超时检查设置,使用jobTask对象获取相关参数

This commit is contained in:
srzou 2024-09-19 21:43:21 +08:00
parent 8f0e28d4e7
commit 216d26cf2d
3 changed files with 14 additions and 14 deletions

View File

@ -185,7 +185,7 @@ public class JobExecutorActor extends AbstractActor {
if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) { if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) {
// 运行中的任务需要进行超时检查 // 运行中的任务需要进行超时检查
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute), JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
// 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断 // 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断
Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500)); Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));
} }

View File

@ -5,7 +5,6 @@ import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
@ -32,14 +31,15 @@ import java.util.Objects;
public class JobTimeoutCheckTask implements TimerTask<String> { public class JobTimeoutCheckTask implements TimerTask<String> {
private static final String IDEMPOTENT_KEY_PREFIX = "job_timeout_check_{0}"; private static final String IDEMPOTENT_KEY_PREFIX = "job_timeout_check_{0}";
private final TaskExecuteDTO taskExecuteDTO; private final Long taskBatchId;
private final Long jobId;
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class); JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskExecuteDTO.getTaskBatchId()); JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId);
if (Objects.isNull(jobTaskBatch)) { if (Objects.isNull(jobTaskBatch)) {
SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskExecuteDTO.getTaskBatchId()); SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskBatchId);
return; return;
} }
@ -49,9 +49,9 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
} }
JobMapper jobMapper = SnailSpringContext.getBean(JobMapper.class); JobMapper jobMapper = SnailSpringContext.getBean(JobMapper.class);
Job job = jobMapper.selectById(taskExecuteDTO.getJobId()); Job job = jobMapper.selectById(jobId);
if (Objects.isNull(job)) { if (Objects.isNull(job)) {
SnailJobLog.LOCAL.error("job:[{}]不存在", taskExecuteDTO.getJobId()); SnailJobLog.LOCAL.error("job:[{}]不存在", jobId);
return; return;
} }
@ -61,17 +61,17 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
stopJobContext.setForceStop(Boolean.TRUE); stopJobContext.setForceStop(Boolean.TRUE);
stopJobContext.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); stopJobContext.setTaskBatchId(taskBatchId);
stopJobContext.setWorkflowNodeId(taskExecuteDTO.getWorkflowNodeId()); stopJobContext.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId());
stopJobContext.setWorkflowTaskBatchId(taskExecuteDTO.getWorkflowTaskBatchId()); stopJobContext.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId());
instanceInterrupt.stop(stopJobContext); instanceInterrupt.stop(stopJobContext);
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecuteDTO.getTaskBatchId())); SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId));
SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskExecuteDTO.getTaskBatchId()); SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskBatchId);
} }
@Override @Override
public String idempotentKey() { public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskExecuteDTO.getTaskBatchId()); return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskBatchId);
} }
} }

View File

@ -117,7 +117,7 @@ public class JobHandler {
} }
// 运行中的任务需要进行超时检查 // 运行中的任务需要进行超时检查
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(new TaskExecuteDTO(job.getId(), taskBatchId, workflowTaskBatchId, workflowNodeId, JobTaskExecutorSceneEnum.MANUAL_JOB.getType())), JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskBatchId, job.getId()),
// 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断 // 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断
Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500)); Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));