diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 9040bff5..596992b8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -8,11 +8,11 @@ import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.FailStrategyEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; @@ -137,27 +137,27 @@ public class WorkflowExecutorActor extends AbstractActor { return; } + WorkflowNode parentWorkflowNode = workflowNodeMap.get(taskExecute.getParentId()); // 失败策略处理 if (CollUtil.isNotEmpty(parentJobTaskBatchList) && parentJobTaskBatchList.stream() .map(JobTaskBatch::getTaskBatchStatus) .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) { - // 判断是否继续处理,根据失败策略 - WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId()); - // 失败了阻塞策略 - if (Objects.equals(workflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { + + // 根据失败策略判断是否继续处理 + if (Objects.equals(parentWorkflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { return; } } - if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap)) { + if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) { return; } // 去掉父节点 workflowNodes = workflowNodes.stream() - .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect( - Collectors.toList()); + .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())) + .collect(Collectors.toList()); List jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId)); Map jobMap = StreamUtils.toIdentityMap(jobs, Job::getId); @@ -191,12 +191,17 @@ public class WorkflowExecutorActor extends AbstractActor { } private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set brotherNode, - Map> jobTaskBatchMap) { + Map> jobTaskBatchMap, WorkflowNode parentWorkflowNode) { if (SystemConstants.ROOT.equals(taskExecute.getParentId())) { return Boolean.TRUE; } + // 决策节点不需要等待其他的兄弟节点是否完成,一个完成直接流转到后继节点 + if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) { + return Boolean.TRUE; + } + // 判断所有节点是否都完成 for (final Long nodeId : brotherNode) { List jobTaskBatches = jobTaskBatchMap.get(nodeId);