diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index a7871802e..0fb752b36 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -158,16 +158,16 @@ public class WorkflowExecutorActor extends AbstractActor { WorkflowNode parentWorkflowNode = workflowNodeMap.get(taskExecute.getParentId()); // 失败策略处理 - if (CollUtil.isNotEmpty(parentJobTaskBatchList) - && parentJobTaskBatchList.stream() - .map(JobTaskBatch::getTaskBatchStatus) - .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) { - - // 根据失败策略判断是否继续处理 - if (Objects.equals(parentWorkflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { - return; - } - } +// if (CollUtil.isNotEmpty(parentJobTaskBatchList) +// && parentJobTaskBatchList.stream() +// .map(JobTaskBatch::getTaskBatchStatus) +// .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) { +// +// // 根据失败策略判断是否继续处理 +// if (Objects.equals(parentWorkflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { +// return; +// } +// } // 决策节点 if (Objects.nonNull(parentWorkflowNode) @@ -181,11 +181,6 @@ public class WorkflowExecutorActor extends AbstractActor { // 过滤掉非当前决策节点【ParentId】的子节点 && successors.contains(workflowNode.getId())).collect(Collectors.toList()); } else { - // TODO 不通过兄弟节点去控制是否执行后续节点 -// if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) { -// return; -// } - workflowNodes = workflowNodes.stream() // 去掉父节点 .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())) @@ -214,7 +209,7 @@ public class WorkflowExecutorActor extends AbstractActor { // 决策当前节点要不要执行 Set predecessors = graph.predecessors(workflowNode.getId()); boolean predecessorsComplete = arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap, - workflowNode); + workflowNode, workflowNodeMap); if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) { continue; } @@ -268,10 +263,8 @@ public class WorkflowExecutorActor extends AbstractActor { } private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set predecessors, - Map> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode) { + Map> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode, Map workflowNodeMap) { - // 是否存在无需处理的节点 - List isExistedNotSkipJobTaskBatches = new ArrayList<>(); // 判断所有节点是否都完成 for (final Long nodeId : predecessors) { if (SystemConstants.ROOT.equals(nodeId)) { @@ -295,13 +288,17 @@ public class WorkflowExecutorActor extends AbstractActor { return Boolean.FALSE; } - if (CollUtil.isEmpty(isExistedNotSkipJobTaskBatches)) { - isExistedNotSkipJobTaskBatches = jobTaskBatches.stream().filter( - jobTaskBatch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) - .toList(); + // 父节点只要有一个是失败的且失败策略是阻塞的则当前节点不处理 + if (jobTaskBatches.stream() + .map(JobTaskBatch::getTaskBatchStatus) + .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) { + WorkflowNode preWorkflowNode = workflowNodeMap.get(nodeId); + // 根据失败策略判断是否继续处理 + if (Objects.equals(preWorkflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { + return Boolean.FALSE; + } } - } return Boolean.TRUE;