From ef150206134a37e913f7803e3a2343cf5bccef69 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sun, 2 Jun 2024 13:13:17 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.0):=20=E4=BF=AE=E5=A4=8D=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E5=A4=9A=E4=B8=AA=E5=86=B3=E7=AD=96=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=97=B6=E5=80=99=E5=90=8E=E7=BB=AD=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=97=A0=E6=B3=95=E6=89=A7=E8=A1=8C=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatch/WorkflowExecutorActor.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 9040bff5..596992b8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -8,11 +8,11 @@ import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.FailStrategyEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; @@ -137,27 +137,27 @@ public class WorkflowExecutorActor extends AbstractActor { return; } + WorkflowNode parentWorkflowNode = workflowNodeMap.get(taskExecute.getParentId()); // 失败策略处理 if (CollUtil.isNotEmpty(parentJobTaskBatchList) && parentJobTaskBatchList.stream() .map(JobTaskBatch::getTaskBatchStatus) .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) { - // 判断是否继续处理,根据失败策略 - WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId()); - // 失败了阻塞策略 - if (Objects.equals(workflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { + + // 根据失败策略判断是否继续处理 + if (Objects.equals(parentWorkflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { return; } } - if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap)) { + if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) { return; } // 去掉父节点 workflowNodes = workflowNodes.stream() - .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect( - Collectors.toList()); + .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())) + .collect(Collectors.toList()); List jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId)); Map jobMap = StreamUtils.toIdentityMap(jobs, Job::getId); @@ -191,12 +191,17 @@ public class WorkflowExecutorActor extends AbstractActor { } private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set brotherNode, - Map> jobTaskBatchMap) { + Map> jobTaskBatchMap, WorkflowNode parentWorkflowNode) { if (SystemConstants.ROOT.equals(taskExecute.getParentId())) { return Boolean.TRUE; } + // 决策节点不需要等待其他的兄弟节点是否完成,一个完成直接流转到后继节点 + if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) { + return Boolean.TRUE; + } + // 判断所有节点是否都完成 for (final Long nodeId : brotherNode) { List jobTaskBatches = jobTaskBatchMap.get(nodeId);