fix:(1.2.0-beta2): 修复工作流中定时任务超时,未更新工作流状态

This commit is contained in:
srzou 2024-09-19 10:37:42 +08:00
parent 7325636d8f
commit 8f0e28d4e7
6 changed files with 34 additions and 12 deletions

View File

@ -22,4 +22,15 @@ public class TaskExecuteDTO {
*/
private Integer taskExecutorScene;
public TaskExecuteDTO() {
}
public TaskExecuteDTO(Long jobId, Long taskBatchId, Long workflowTaskBatchId, Long workflowNodeId, Integer taskExecutorScene) {
this.jobId = jobId;
this.taskBatchId = taskBatchId;
this.workflowTaskBatchId = workflowTaskBatchId;
this.workflowNodeId = workflowNodeId;
this.taskExecutorScene = taskExecutorScene;
}
}

View File

@ -185,7 +185,7 @@ public class JobExecutorActor extends AbstractActor {
if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) {
// 运行中的任务需要进行超时检查
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute),
// 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断
Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));
}

View File

@ -68,6 +68,8 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
jobExecutorResultDTO.setMessage("任务停止成功");
jobExecutorResultDTO.setJobOperationReason(context.getJobOperationReason());
jobExecutorResultDTO.setTaskType(getTaskType().getType());
jobExecutorResultDTO.setWorkflowNodeId(context.getWorkflowNodeId());
jobExecutorResultDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
actorRef.tell(jobExecutorResultDTO, actorRef);
}

View File

@ -49,6 +49,13 @@ public class TaskStopJobContext {
private boolean forceStop;
/**
* 工作流任务批次id
*/
private Long workflowTaskBatchId;
private Long workflowNodeId;
protected List<JobTask> getJobTasks() {
return jobTasks;
}

View File

@ -5,6 +5,7 @@ import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
@ -31,15 +32,14 @@ import java.util.Objects;
public class JobTimeoutCheckTask implements TimerTask<String> {
private static final String IDEMPOTENT_KEY_PREFIX = "job_timeout_check_{0}";
private final Long taskBatchId;
private final Long jobId;
private final TaskExecuteDTO taskExecuteDTO;
@Override
public void run(Timeout timeout) throws Exception {
JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId);
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskExecuteDTO.getTaskBatchId());
if (Objects.isNull(jobTaskBatch)) {
SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskBatchId);
SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskExecuteDTO.getTaskBatchId());
return;
}
@ -49,9 +49,9 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
}
JobMapper jobMapper = SnailSpringContext.getBean(JobMapper.class);
Job job = jobMapper.selectById(jobId);
Job job = jobMapper.selectById(taskExecuteDTO.getJobId());
if (Objects.isNull(job)) {
SnailJobLog.LOCAL.error("job:[{}]不存在", jobId);
SnailJobLog.LOCAL.error("job:[{}]不存在", taskExecuteDTO.getJobId());
return;
}
@ -61,15 +61,17 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
stopJobContext.setForceStop(Boolean.TRUE);
stopJobContext.setTaskBatchId(taskBatchId);
stopJobContext.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
stopJobContext.setWorkflowNodeId(taskExecuteDTO.getWorkflowNodeId());
stopJobContext.setWorkflowTaskBatchId(taskExecuteDTO.getWorkflowTaskBatchId());
instanceInterrupt.stop(stopJobContext);
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId));
SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskBatchId);
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecuteDTO.getTaskBatchId()));
SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskExecuteDTO.getTaskBatchId());
}
@Override
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskBatchId);
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskExecuteDTO.getTaskBatchId());
}
}

View File

@ -117,7 +117,7 @@ public class JobHandler {
}
// 运行中的任务需要进行超时检查
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskBatchId, job.getId()),
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(new TaskExecuteDTO(job.getId(), taskBatchId, workflowTaskBatchId, workflowNodeId, JobTaskExecutorSceneEnum.MANUAL_JOB.getType())),
// 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断
Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));