From 1162271bdc66cd5d3e9543a2220b3c716089a430 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Wed, 26 Jun 2024 13:50:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.1.0-beta2):=20=E5=B7=A5=E4=BD=9C?= =?UTF-8?q?=E6=B5=81=E6=B7=BB=E5=8A=A0RECOVERY=E9=98=BB=E5=A1=9E=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ConcurrencyWorkflowBlockStrategy.java | 10 +++--- .../DiscardWorkflowBlockStrategy.java | 10 +++--- .../RecoveryWorkflowBlockStrategy.java | 35 +++++++++++++++++++ .../support/handler/WorkflowBatchHandler.java | 15 +++++--- 4 files changed, 56 insertions(+), 14 deletions(-) create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java index a02ed24c..04f12a45 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java @@ -25,11 +25,11 @@ public class ConcurrencyWorkflowBlockStrategy extends AbstractWorkflowBlockStrat @Override protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { - try { - workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); - } catch (IOException e) { - throw new SnailJobServerException("校验工作流失败", e); - } +// try { +// workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); +// } catch (IOException e) { +// throw new SnailJobServerException("校验工作流失败", e); +// } WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java index b7f1f09f..6cc5c33e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java @@ -27,11 +27,11 @@ public class DiscardWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy @Override protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { - try { - workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); - } catch (IOException e) { - throw new SnailJobServerException("校验工作流失败", e); - } +// try { +// workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); +// } catch (IOException e) { +// throw new SnailJobServerException("校验工作流失败", e); +// } // 生成状态为取消的工作流批次 WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); workflowTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java new file mode 100644 index 00000000..eed4835f --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java @@ -0,0 +1,35 @@ +package com.aizuda.snailjob.server.job.task.support.block.workflow; + +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author: opensnail + * @date : 2024-06-26 + * @since : sj_1.1.0 + */ +@Component +@RequiredArgsConstructor +public class RecoveryWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy { + private final WorkflowBatchHandler workflowBatchHandler; + + @Override + protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { + + try { + workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); + } catch (IOException e) { + throw new SnailJobServerException("校验工作流失败", e); + } + } + + @Override + protected BlockStrategyEnum blockStrategyEnum() { + return BlockStrategyEnum.RECOVERY; + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index 7eb22eb2..b6b635c3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -222,7 +222,14 @@ public class WorkflowBatchHandler { } } - public void checkWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { + /** + * 重新触发未执行成功的工作流节点 + * + * @param workflowTaskBatchId 工作流批次 + * @param workflowTaskBatch 工作流批次信息(若为null, 则会通过workflowTaskBatchId查询) + * @throws IOException + */ + public void recoveryWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { workflowTaskBatch = Optional.ofNullable(workflowTaskBatch) .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("任务不存在")); @@ -241,10 +248,10 @@ public class WorkflowBatchHandler { Map jobTaskBatchMap = StreamUtils.toIdentityMap(jobTaskBatches, JobTaskBatch::getWorkflowNodeId); - checkWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap); + recoveryWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap); } - private void checkWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph graph, Map jobTaskBatchMap) { + private void recoveryWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph graph, Map jobTaskBatchMap) { // 判定条件节点是否已经执行完成 JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId); @@ -289,7 +296,7 @@ public class WorkflowBatchHandler { } // 已经是终态的需要递归遍历后继节点是否正常执行 - checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap); + recoveryWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap); } }