From 3cb31a14d51f81cf3140f384a45af3dea1f0b4a9 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sun, 30 Jun 2024 22:58:02 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0-beta2):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=9B=9E=E8=B0=83=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatch/WorkflowExecutorActor.java | 58 ++++--- .../workflow/CallbackWorkflowExecutor.java | 30 ++-- .../workflow/JobTaskWorkflowExecutor.java | 12 +- .../impl/WorkflowBatchServiceImpl.java | 153 ++++++++++-------- 4 files changed, 144 insertions(+), 109 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 3e9c6bf4..a7871802 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -8,6 +8,7 @@ import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.FailStrategyEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; @@ -212,7 +213,8 @@ public class WorkflowExecutorActor extends AbstractActor { // 决策当前节点要不要执行 Set predecessors = graph.predecessors(workflowNode.getId()); - boolean predecessorsComplete = arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap, workflowNode, parentJobTaskBatchList); + boolean predecessorsComplete = arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap, + workflowNode); if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) { continue; } @@ -230,7 +232,7 @@ public class WorkflowExecutorActor extends AbstractActor { context.setWfContext(workflowTaskBatch.getWfContext()); // 这里父节点取最新的批次判断状态 if (CollUtil.isNotEmpty(parentJobTaskBatchList)) { - context.setParentOperationReason(parentJobTaskBatchList.get(0).getOperationReason()); + fillParentOperationReason(allJobTaskBatchList, parentJobTaskBatchList, parentWorkflowNode, context); } workflowExecutor.execute(context); @@ -240,9 +242,33 @@ public class WorkflowExecutorActor extends AbstractActor { } + private static void fillParentOperationReason(final List allJobTaskBatchList, + final List parentJobTaskBatchList, final WorkflowNode parentWorkflowNode, + final WorkflowExecutorContext context) { + JobTaskBatch jobTaskBatch = allJobTaskBatchList.stream() + .filter(batch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(batch.getOperationReason())) + .findFirst().orElse(null); + + /* + 若当前节点的父节点存在无需处理的节点(比如决策节点的某个未匹中的分支),则需要等待正常的节点来执行此节点,若正常节点已经调度过了, + 此时则没有能触发后继节点继续调度的节点存在了。 因此这里将改变parentOperationReason = 0使之能继续往后处理 + 基于性能的考虑这里在直接在parentJobTaskBatchList列表的头节点插入一个不是跳过的节点,这样就可以正常流转了 + eg: {"-1":[480],"480":[481,488,490],"481":[482],"482":[483],"483":[484],"484":[485],"485":[486],"486":[487],"487":[497,498],"488":[489],"489":[497,498],"490":[491,493,495],"491":[492],"492":[497,498],"493":[494],"494":[497,498],"495":[496],"496":[497,498],"497":[499],"498":[499],"499":[]} + */ + if (parentJobTaskBatchList.stream() + .map(JobTaskBatch::getOperationReason) + .filter(Objects::nonNull) + .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains) + && Objects.nonNull(jobTaskBatch) + && parentWorkflowNode.getNodeType() != WorkflowNodeTypeEnum.DECISION.getType()) { + context.setParentOperationReason(JobOperationReasonEnum.NONE.getReason()); + } else { + context.setParentOperationReason(parentJobTaskBatchList.get(0).getOperationReason()); + } + } + private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set predecessors, - Map> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode, - List parentJobTaskBatchList) { + Map> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode) { // 是否存在无需处理的节点 List isExistedNotSkipJobTaskBatches = new ArrayList<>(); @@ -263,37 +289,21 @@ public class WorkflowExecutorActor extends AbstractActor { boolean isCompleted = jobTaskBatches.stream().anyMatch( jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())); if (isCompleted) { - SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}] parentId:[{}]", nodeId, taskExecute.getParentId(), + SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}] parentId:[{}]", nodeId, + taskExecute.getParentId(), waitExecWorkflowNode.getId()); return Boolean.FALSE; } if (CollUtil.isEmpty(isExistedNotSkipJobTaskBatches)) { isExistedNotSkipJobTaskBatches = jobTaskBatches.stream().filter( - jobTaskBatch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())).toList(); + jobTaskBatch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) + .toList(); } } - // 父节点是否存在无需处理的节点,若存在一个不是的则需要等待正常的节点处理 - // 如果父节点是无需处理则不再继续执行 - if (CollUtil.isNotEmpty(parentJobTaskBatchList) && - parentJobTaskBatchList.stream() - .map(JobTaskBatch::getOperationReason) - .filter(Objects::nonNull) - .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains) - && CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches)) { - /* - 等待正常的节点来执行此节点,若正常节点已经调度过了,此时则没有能触发后继节点继续调度的节点存在了。 - 因此这里将重新选当前节点的前驱节点中选一个作为父节点来触发,使之能够继续往后执行 - 基于性能的考虑这里在直接在parentJobTaskBatchList列表的头节点插入一个不是跳过的节点,这样就可以正常流转了 - eg: {"-1":[480],"480":[481,488,490],"481":[482],"482":[483],"483":[484],"484":[485],"485":[486],"486":[487],"487":[497,498],"488":[489],"489":[497,498],"490":[491,493,495],"491":[492],"492":[497,498],"493":[494],"494":[497,498],"495":[496],"496":[497,498],"497":[499],"498":[499],"499":[]} - */ - log.warn("-->>> isExistedNotSkip:[{}] nodeId:[{}] parentId:[{}]", CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches), waitExecWorkflowNode.getId(), taskExecute.getParentId()); - parentJobTaskBatchList.add(0, isExistedNotSkipJobTaskBatches.get(0)); - } - return Boolean.TRUE; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index e49dcdc3..d223f3df 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -9,14 +9,12 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.dto.CallbackConfig; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.rpc.okhttp.RequestInterceptor; -import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.github.rholder.retry.*; +import com.google.common.collect.Sets; import lombok.RequiredArgsConstructor; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; @@ -26,11 +24,14 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED; + /** * @author xiaowoniu * @date 2023-12-24 08:18:06 @@ -39,7 +40,8 @@ import java.util.concurrent.TimeUnit; @Component @RequiredArgsConstructor public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { - + private static final Set NO_REQUIRED_CONFIG = Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(), + WORKFLOW_DECISION_FAILED.getReason()); private static final String CALLBACK_TIMEOUT = "10"; private final RestTemplate restTemplate; @@ -61,7 +63,12 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { context.setOperationReason(JobOperationReasonEnum.NONE.getReason()); context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus()); - if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) { + if (NO_REQUIRED_CONFIG.contains(context.getParentOperationReason())) { + // 针对无需处理的批次直接新增一个记录 + context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); + context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()); + context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus()); + } else if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) { context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason()); context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus()); @@ -69,9 +76,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { invokeCallback(context); } - // ToDo 执行下一个节点 -// workflowTaskExecutor(context); - } private void invokeCallback(WorkflowExecutorContext context) { @@ -160,8 +164,14 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { SnailJobLog.REMOTE.info("节点[{}]回调成功.\n回调参数:{} \n回调结果:[{}] <|>{}<|>", context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO); } else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) { - SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 任务状态已关闭 <|>{}<|>", + if (NO_REQUIRED_CONFIG.contains(context.getParentOperationReason())) { + SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 当前任务无需处理 <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO); + } else { + SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 任务状态已关闭 <|>{}<|>", + context.getWorkflowNodeId(), jobLogMetaDTO); + } + } else { SnailJobLog.REMOTE.error("节点[{}]回调失败.\n失败原因:{} <|>{}<|>", context.getWorkflowNodeId(), diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java index bae540e5..489474f3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java @@ -63,8 +63,8 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()); context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus()); - // 创建批次和任务节点 - invokeCancelJobTask(context); + // 创建批次和任务节点4 + invokeCancelJobTask(context, "当前节点无需处理"); } else if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) { // 针对无需处理的批次直接新增一个记录 context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); @@ -72,7 +72,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus()); // 创建批次和任务节点 - invokeCancelJobTask(context); + invokeCancelJobTask(context, "任务已关闭"); } else { invokeJobTask(context); } @@ -88,7 +88,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { actorRef.tell(jobTaskPrepare, actorRef); } - private void invokeCancelJobTask(final WorkflowExecutorContext context) { + private void invokeCancelJobTask(final WorkflowExecutorContext context, String cancelReason) { JobTaskBatch jobTaskBatch = generateJobTaskBatch(context); JobTask jobTask = generateJobTask(context, jobTaskBatch); @@ -100,7 +100,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { jobLogMetaDTO.setJobId(context.getJobId()); jobLogMetaDTO.setTaskId(jobTask.getId()); - SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>", - context.getWorkflowNodeId(), jobLogMetaDTO); + SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: {}. <|>{}<|>", + context.getWorkflowNodeId(), cancelReason, jobLogMetaDTO); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowBatchServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowBatchServiceImpl.java index ced0d270..0e75c8f8 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowBatchServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowBatchServiceImpl.java @@ -50,6 +50,7 @@ import java.util.stream.Collectors; @Slf4j @RequiredArgsConstructor public class WorkflowBatchServiceImpl implements WorkflowBatchService { + private static final Integer NOT_HANDLE_STATUS = 99; private static final Integer WORKFLOW_DECISION_FAILED_STATUS = 98; private final WorkflowTaskBatchMapper workflowTaskBatchMapper; @@ -62,7 +63,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { private static boolean isNoOperation(JobTaskBatch i) { return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(i.getOperationReason()) - || i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus(); + || i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus(); } @Override @@ -78,19 +79,20 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { } QueryWrapper wrapper = new QueryWrapper() - .eq("batch.namespace_id", userSessionVO.getNamespaceId()) - .eq(queryVO.getWorkflowId() != null, "batch.workflow_id", queryVO.getWorkflowId()) - .in(CollUtil.isNotEmpty(groupNames), "batch.group_name", groupNames) - .eq(queryVO.getTaskBatchStatus() != null, "batch.task_batch_status", queryVO.getTaskBatchStatus()) - .likeRight(StrUtil.isNotBlank(queryVO.getWorkflowName()), "flow.workflow_name", queryVO.getWorkflowName()) - .between(ObjUtil.isNotNull(queryVO.getDatetimeRange()), - "batch.create_dt", queryVO.getStartDt(), queryVO.getEndDt()) - .eq("batch.deleted", 0) - .orderByDesc("batch.id"); - List batchResponseDOList = workflowTaskBatchMapper.selectWorkflowBatchPageList(pageDTO, wrapper); + .eq("batch.namespace_id", userSessionVO.getNamespaceId()) + .eq(queryVO.getWorkflowId() != null, "batch.workflow_id", queryVO.getWorkflowId()) + .in(CollUtil.isNotEmpty(groupNames), "batch.group_name", groupNames) + .eq(queryVO.getTaskBatchStatus() != null, "batch.task_batch_status", queryVO.getTaskBatchStatus()) + .likeRight(StrUtil.isNotBlank(queryVO.getWorkflowName()), "flow.workflow_name", queryVO.getWorkflowName()) + .between(ObjUtil.isNotNull(queryVO.getDatetimeRange()), + "batch.create_dt", queryVO.getStartDt(), queryVO.getEndDt()) + .eq("batch.deleted", 0) + .orderByDesc("batch.id"); + List batchResponseDOList = workflowTaskBatchMapper.selectWorkflowBatchPageList(pageDTO, + wrapper); List batchResponseVOList = - WorkflowConverter.INSTANCE.convertListToWorkflowBatchList(batchResponseDOList); + WorkflowConverter.INSTANCE.convertListToWorkflowBatchList(batchResponseDOList); return new PageResult<>(pageDTO, batchResponseVOList); } @@ -99,9 +101,9 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { public WorkflowDetailResponseVO getWorkflowBatchDetail(Long id) { WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( - new LambdaQueryWrapper() - .eq(WorkflowTaskBatch::getId, id) - .eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())); + new LambdaQueryWrapper() + .eq(WorkflowTaskBatch::getId, id) + .eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())); if (Objects.isNull(workflowTaskBatch)) { return null; } @@ -110,22 +112,22 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.convert(workflow); List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus()) - .eq(WorkflowNode::getWorkflowId, workflow.getId())); + .eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus()) + .eq(WorkflowNode::getWorkflowId, workflow.getId())); List jobs = jobMapper.selectList( - new LambdaQueryWrapper() - .in(Job::getId, StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId))); + new LambdaQueryWrapper() + .in(Job::getId, StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId))); Map jobMap = StreamUtils.toIdentityMap(jobs, Job::getId); List alJobTaskBatchList = jobTaskBatchMapper.selectList( - new LambdaQueryWrapper() - .eq(JobTaskBatch::getWorkflowTaskBatchId, id) - .orderByDesc(JobTaskBatch::getId)); + new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowTaskBatchId, id) + .orderByDesc(JobTaskBatch::getId)); Map> jobTaskBatchMap = StreamUtils.groupByKey(alJobTaskBatchList, - JobTaskBatch::getWorkflowNodeId); + JobTaskBatch::getWorkflowNodeId); List nodeInfos = WorkflowConverter.INSTANCE.convertList(workflowNodes); String flowInfo = workflowTaskBatch.getFlowInfo(); @@ -133,65 +135,78 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { Set allNoOperationNode = Sets.newHashSet(); Map workflowNodeMap = nodeInfos.stream() - .peek(nodeInfo -> { + .peek(nodeInfo -> { - JobTaskConfig jobTask = nodeInfo.getJobTask(); - if (Objects.nonNull(jobTask)) { - jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName()); - } + JobTaskConfig jobTask = nodeInfo.getJobTask(); + if (Objects.nonNull(jobTask)) { + jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName()); + } - List jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId()); - if (CollUtil.isNotEmpty(jobTaskBatchList)) { - jobTaskBatchList = jobTaskBatchList.stream() - .sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus)) - .collect(Collectors.toList()); - nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatchList)); - - // 取第最新的一条状态 - JobTaskBatch jobTaskBatch = jobTaskBatchList.get(0); - if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason() == jobTaskBatch.getOperationReason()) { - // 前端展示使用 - nodeInfo.setTaskBatchStatus(WORKFLOW_DECISION_FAILED_STATUS); - } else { - nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus()); - } - - if (jobTaskBatchList.stream() - .filter(Objects::nonNull) - .anyMatch(WorkflowBatchServiceImpl::isNoOperation)) { - // 当前节点下面的所有节点都是无需处理的节点 - Set allDescendants = MutableGraphCache.getAllDescendants(graph, nodeInfo.getId()); - allNoOperationNode.addAll(allDescendants); - } else { - // 删除被误添加的节点 - allNoOperationNode.remove(nodeInfo.getId()); - } + List jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId()); + if (CollUtil.isNotEmpty(jobTaskBatchList)) { + jobTaskBatchList = jobTaskBatchList.stream() + .sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus)) + .collect(Collectors.toList()); + nodeInfo.setJobBatchList( + JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatchList)); + // 取第最新的一条状态 + JobTaskBatch jobTaskBatch = jobTaskBatchList.get(0); + if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason() + == jobTaskBatch.getOperationReason()) { + // 前端展示使用 + nodeInfo.setTaskBatchStatus(WORKFLOW_DECISION_FAILED_STATUS); } else { - if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(workflowTaskBatch.getTaskBatchStatus())) { - allNoOperationNode.add(nodeInfo.getId()); - } + nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus()); } - }) - .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, Function.identity())); + + if (jobTaskBatchList.stream() + .filter(Objects::nonNull) + .anyMatch(WorkflowBatchServiceImpl::isNoOperation)) { + // 当前节点下面的所有节点都是无需处理的节点 + Set allDescendants = MutableGraphCache.getAllDescendants(graph, nodeInfo.getId()); + allNoOperationNode.addAll(allDescendants); + } else { + // 删除被误添加的节点 + allNoOperationNode.remove(nodeInfo.getId()); + } + + } else { + if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(workflowTaskBatch.getTaskBatchStatus())) { + allNoOperationNode.add(nodeInfo.getId()); + } + } + }) + .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, Function.identity())); for (Long noOperationNodeId : allNoOperationNode) { WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(noOperationNodeId); - JobBatchResponseVO jobBatchResponseVO = new JobBatchResponseVO(); - JobTaskConfig jobTask = nodeInfo.getJobTask(); - if (Objects.nonNull(jobTask)) { - jobBatchResponseVO.setJobId(jobTask.getJobId()); + List jobTaskBatches = jobTaskBatchMap.get(nodeInfo.getId()); + + if (CollUtil.isNotEmpty(jobTaskBatches)) { + jobTaskBatches = jobTaskBatches.stream() + .sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus)) + .collect(Collectors.toList()); + nodeInfo.setJobBatchList( + JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatches)); + } else { + JobBatchResponseVO jobBatchResponseVO = new JobBatchResponseVO(); + JobTaskConfig jobTask = nodeInfo.getJobTask(); + if (Objects.nonNull(jobTask)) { + jobBatchResponseVO.setJobId(jobTask.getJobId()); + } + // 只为前端展示提供 + nodeInfo.setTaskBatchStatus(NOT_HANDLE_STATUS); + jobBatchResponseVO.setTaskBatchStatus(NOT_HANDLE_STATUS); + jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()); + nodeInfo.setJobBatchList(Lists.newArrayList(jobBatchResponseVO)); } - // 只为前端展示提供 - nodeInfo.setTaskBatchStatus(NOT_HANDLE_STATUS); - jobBatchResponseVO.setTaskBatchStatus(NOT_HANDLE_STATUS); - jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()); - nodeInfo.setJobBatchList(Lists.newArrayList(jobBatchResponseVO)); } + try { // 反序列化构建图 WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT, - new HashMap<>(), workflowNodeMap); + new HashMap<>(), workflowNodeMap); responseVO.setNodeConfig(config); } catch (Exception e) { log.error("反序列化失败. json:[{}]", flowInfo, e);