diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/TaskExecuteDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/TaskExecuteDTO.java index d5969c466..8bc24b85f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/TaskExecuteDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/TaskExecuteDTO.java @@ -22,4 +22,15 @@ public class TaskExecuteDTO { */ private Integer taskExecutorScene; + public TaskExecuteDTO() { + } + + public TaskExecuteDTO(Long jobId, Long taskBatchId, Long workflowTaskBatchId, Long workflowNodeId, Integer taskExecutorScene) { + this.jobId = jobId; + this.taskBatchId = taskBatchId; + this.workflowTaskBatchId = workflowTaskBatchId; + this.workflowNodeId = workflowNodeId; + this.taskExecutorScene = taskExecutorScene; + } + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index d29c68539..040b11058 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -185,7 +185,7 @@ public class JobExecutorActor extends AbstractActor { if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) { // 运行中的任务,需要进行超时检查 - JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), + JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute), // 加500ms是为了让尽量保证客户端自己先超时中断,防止客户端上报成功但是服务端已触发超时中断 Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500)); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java index 6645876d6..0b3d6e18b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java @@ -68,6 +68,8 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler, jobExecutorResultDTO.setMessage("任务停止成功"); jobExecutorResultDTO.setJobOperationReason(context.getJobOperationReason()); jobExecutorResultDTO.setTaskType(getTaskType().getType()); + jobExecutorResultDTO.setWorkflowNodeId(context.getWorkflowNodeId()); + jobExecutorResultDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); actorRef.tell(jobExecutorResultDTO, actorRef); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/TaskStopJobContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/TaskStopJobContext.java index 7593e745a..339e39254 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/TaskStopJobContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/TaskStopJobContext.java @@ -49,6 +49,13 @@ public class TaskStopJobContext { private boolean forceStop; + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; + + private Long workflowNodeId; + protected List getJobTasks() { return jobTasks; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java index a7d49a9dc..a83107dfe 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java @@ -5,6 +5,7 @@ import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; +import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; @@ -31,15 +32,14 @@ import java.util.Objects; public class JobTimeoutCheckTask implements TimerTask { private static final String IDEMPOTENT_KEY_PREFIX = "job_timeout_check_{0}"; - private final Long taskBatchId; - private final Long jobId; + private final TaskExecuteDTO taskExecuteDTO; @Override public void run(Timeout timeout) throws Exception { JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class); - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId); + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskExecuteDTO.getTaskBatchId()); if (Objects.isNull(jobTaskBatch)) { - SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskBatchId); + SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskExecuteDTO.getTaskBatchId()); return; } @@ -49,9 +49,9 @@ public class JobTimeoutCheckTask implements TimerTask { } JobMapper jobMapper = SnailSpringContext.getBean(JobMapper.class); - Job job = jobMapper.selectById(jobId); + Job job = jobMapper.selectById(taskExecuteDTO.getJobId()); if (Objects.isNull(job)) { - SnailJobLog.LOCAL.error("job:[{}]不存在", jobId); + SnailJobLog.LOCAL.error("job:[{}]不存在", taskExecuteDTO.getJobId()); return; } @@ -61,15 +61,17 @@ public class JobTimeoutCheckTask implements TimerTask { stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); stopJobContext.setForceStop(Boolean.TRUE); - stopJobContext.setTaskBatchId(taskBatchId); + stopJobContext.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); + stopJobContext.setWorkflowNodeId(taskExecuteDTO.getWorkflowNodeId()); + stopJobContext.setWorkflowTaskBatchId(taskExecuteDTO.getWorkflowTaskBatchId()); instanceInterrupt.stop(stopJobContext); - SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId)); - SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskBatchId); + SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecuteDTO.getTaskBatchId())); + SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskExecuteDTO.getTaskBatchId()); } @Override public String idempotentKey() { - return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskBatchId); + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskExecuteDTO.getTaskBatchId()); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java index 4c6e0743f..b461e5b80 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java @@ -117,7 +117,7 @@ public class JobHandler { } // 运行中的任务,需要进行超时检查 - JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskBatchId, job.getId()), + JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(new TaskExecuteDTO(job.getId(), taskBatchId, workflowTaskBatchId, workflowNodeId, JobTaskExecutorSceneEnum.MANUAL_JOB.getType())), // 加500ms是为了让尽量保证客户端自己先超时中断,防止客户端上报成功但是服务端已触发超时中断 Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));