diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java index a02ed24cc..04f12a458 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java @@ -25,11 +25,11 @@ public class ConcurrencyWorkflowBlockStrategy extends AbstractWorkflowBlockStrat @Override protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { - try { - workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); - } catch (IOException e) { - throw new SnailJobServerException("校验工作流失败", e); - } +// try { +// workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); +// } catch (IOException e) { +// throw new SnailJobServerException("校验工作流失败", e); +// } WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java index b7f1f09fd..6cc5c33e3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java @@ -27,11 +27,11 @@ public class DiscardWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy @Override protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { - try { - workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); - } catch (IOException e) { - throw new SnailJobServerException("校验工作流失败", e); - } +// try { +// workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); +// } catch (IOException e) { +// throw new SnailJobServerException("校验工作流失败", e); +// } // 生成状态为取消的工作流批次 WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); workflowTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java new file mode 100644 index 000000000..eed4835f7 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java @@ -0,0 +1,35 @@ +package com.aizuda.snailjob.server.job.task.support.block.workflow; + +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author: opensnail + * @date : 2024-06-26 + * @since : sj_1.1.0 + */ +@Component +@RequiredArgsConstructor +public class RecoveryWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy { + private final WorkflowBatchHandler workflowBatchHandler; + + @Override + protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { + + try { + workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); + } catch (IOException e) { + throw new SnailJobServerException("校验工作流失败", e); + } + } + + @Override + protected BlockStrategyEnum blockStrategyEnum() { + return BlockStrategyEnum.RECOVERY; + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index 7eb22eb2b..b6b635c31 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -222,7 +222,14 @@ public class WorkflowBatchHandler { } } - public void checkWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { + /** + * 重新触发未执行成功的工作流节点 + * + * @param workflowTaskBatchId 工作流批次 + * @param workflowTaskBatch 工作流批次信息(若为null, 则会通过workflowTaskBatchId查询) + * @throws IOException + */ + public void recoveryWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { workflowTaskBatch = Optional.ofNullable(workflowTaskBatch) .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("任务不存在")); @@ -241,10 +248,10 @@ public class WorkflowBatchHandler { Map jobTaskBatchMap = StreamUtils.toIdentityMap(jobTaskBatches, JobTaskBatch::getWorkflowNodeId); - checkWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap); + recoveryWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap); } - private void checkWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph graph, Map jobTaskBatchMap) { + private void recoveryWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph graph, Map jobTaskBatchMap) { // 判定条件节点是否已经执行完成 JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId); @@ -289,7 +296,7 @@ public class WorkflowBatchHandler { } // 已经是终态的需要递归遍历后继节点是否正常执行 - checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap); + recoveryWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap); } }