feat(sj_1.0.0): 修复工作流多个决策节点时候后续节点无法执行问题
This commit is contained in:
parent
691c5033f8
commit
ef15020613
@ -8,11 +8,11 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
|
|||||||
import com.aizuda.snailjob.common.core.enums.FailStrategyEnum;
|
import com.aizuda.snailjob.common.core.enums.FailStrategyEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
|
||||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||||
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
import com.aizuda.snailjob.server.common.util.DateUtils;
|
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||||
@ -137,27 +137,27 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WorkflowNode parentWorkflowNode = workflowNodeMap.get(taskExecute.getParentId());
|
||||||
// 失败策略处理
|
// 失败策略处理
|
||||||
if (CollUtil.isNotEmpty(parentJobTaskBatchList)
|
if (CollUtil.isNotEmpty(parentJobTaskBatchList)
|
||||||
&& parentJobTaskBatchList.stream()
|
&& parentJobTaskBatchList.stream()
|
||||||
.map(JobTaskBatch::getTaskBatchStatus)
|
.map(JobTaskBatch::getTaskBatchStatus)
|
||||||
.anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
|
.anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
|
||||||
// 判断是否继续处理,根据失败策略
|
|
||||||
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
|
// 根据失败策略判断是否继续处理
|
||||||
// 失败了阻塞策略
|
if (Objects.equals(parentWorkflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) {
|
||||||
if (Objects.equals(workflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap)) {
|
if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 去掉父节点
|
// 去掉父节点
|
||||||
workflowNodes = workflowNodes.stream()
|
workflowNodes = workflowNodes.stream()
|
||||||
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
|
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()))
|
||||||
Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
|
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
|
||||||
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
|
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
|
||||||
@ -191,12 +191,17 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> brotherNode,
|
private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> brotherNode,
|
||||||
Map<Long, List<JobTaskBatch>> jobTaskBatchMap) {
|
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode parentWorkflowNode) {
|
||||||
|
|
||||||
if (SystemConstants.ROOT.equals(taskExecute.getParentId())) {
|
if (SystemConstants.ROOT.equals(taskExecute.getParentId())) {
|
||||||
return Boolean.TRUE;
|
return Boolean.TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 决策节点不需要等待其他的兄弟节点是否完成,一个完成直接流转到后继节点
|
||||||
|
if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
|
||||||
|
return Boolean.TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
// 判断所有节点是否都完成
|
// 判断所有节点是否都完成
|
||||||
for (final Long nodeId : brotherNode) {
|
for (final Long nodeId : brotherNode) {
|
||||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(nodeId);
|
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(nodeId);
|
||||||
|
Loading…
Reference in New Issue
Block a user