diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 040b11058..d29c68539 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -185,7 +185,7 @@ public class JobExecutorActor extends AbstractActor { if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) { // 运行中的任务,需要进行超时检查 - JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute), + JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), // 加500ms是为了让尽量保证客户端自己先超时中断,防止客户端上报成功但是服务端已触发超时中断 Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500)); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java index a83107dfe..dc19523bf 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java @@ -5,7 +5,6 @@ import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; -import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; @@ -32,14 +31,15 @@ import java.util.Objects; public class JobTimeoutCheckTask implements TimerTask { private static final String IDEMPOTENT_KEY_PREFIX = "job_timeout_check_{0}"; - private final TaskExecuteDTO taskExecuteDTO; + private final Long taskBatchId; + private final Long jobId; @Override public void run(Timeout timeout) throws Exception { JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class); - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskExecuteDTO.getTaskBatchId()); + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId); if (Objects.isNull(jobTaskBatch)) { - SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskExecuteDTO.getTaskBatchId()); + SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskBatchId); return; } @@ -49,9 +49,9 @@ public class JobTimeoutCheckTask implements TimerTask { } JobMapper jobMapper = SnailSpringContext.getBean(JobMapper.class); - Job job = jobMapper.selectById(taskExecuteDTO.getJobId()); + Job job = jobMapper.selectById(jobId); if (Objects.isNull(job)) { - SnailJobLog.LOCAL.error("job:[{}]不存在", taskExecuteDTO.getJobId()); + SnailJobLog.LOCAL.error("job:[{}]不存在", jobId); return; } @@ -61,17 +61,17 @@ public class JobTimeoutCheckTask implements TimerTask { stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); stopJobContext.setForceStop(Boolean.TRUE); - stopJobContext.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); - stopJobContext.setWorkflowNodeId(taskExecuteDTO.getWorkflowNodeId()); - stopJobContext.setWorkflowTaskBatchId(taskExecuteDTO.getWorkflowTaskBatchId()); + stopJobContext.setTaskBatchId(taskBatchId); + stopJobContext.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId()); + stopJobContext.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId()); instanceInterrupt.stop(stopJobContext); - SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecuteDTO.getTaskBatchId())); - SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskExecuteDTO.getTaskBatchId()); + SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId)); + SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskBatchId); } @Override public String idempotentKey() { - return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskExecuteDTO.getTaskBatchId()); + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskBatchId); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java index b461e5b80..4c6e0743f 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java @@ -117,7 +117,7 @@ public class JobHandler { } // 运行中的任务,需要进行超时检查 - JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(new TaskExecuteDTO(job.getId(), taskBatchId, workflowTaskBatchId, workflowNodeId, JobTaskExecutorSceneEnum.MANUAL_JOB.getType())), + JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskBatchId, job.getId()), // 加500ms是为了让尽量保证客户端自己先超时中断,防止客户端上报成功但是服务端已触发超时中断 Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));