From 87dd06d34fe7e3889e6a09c8d273b9abe2527b82 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 29 Dec 2023 15:33:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E6=B5=81=E6=96=B0=E5=A2=9E=E5=92=8C=E8=AF=A6?= =?UTF-8?q?=E6=83=85=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/register/scan/RetryableScanner.java | 4 +- .../web/service/impl/WorkflowServiceImpl.java | 89 ++++++++++++++----- 2 files changed, 70 insertions(+), 23 deletions(-) diff --git a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/register/scan/RetryableScanner.java b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/register/scan/RetryableScanner.java index 24395410..0df855fe 100644 --- a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/register/scan/RetryableScanner.java +++ b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/register/scan/RetryableScanner.java @@ -9,7 +9,7 @@ import com.aizuda.easy.retry.client.core.retryer.RetryerInfo; import com.aizuda.easy.retry.client.core.strategy.ExecutorMethod; import com.aizuda.easy.retry.common.core.log.LogUtils; import lombok.extern.slf4j.Slf4j; -import org.springframework.aop.support.AopUtils; +import org.springframework.aop.framework.AopProxyUtils; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -76,7 +76,7 @@ public class RetryableScanner implements Scanner, ApplicationContextAware { Class[] include = retryable.include(); Class[] exclude = retryable.exclude(); - Class executorNotProxy = AopUtils.getTargetClass(executor); + Class executorNotProxy = AopProxyUtils.ultimateTargetClass(executor); String executorClassName = executorNotProxy.getName(); Class idempotentIdGenerate = retryable.idempotentId(); String bizNo = retryable.bizNo(); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java index 775a46a0..6b8777d9 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java @@ -46,6 +46,7 @@ import org.springframework.util.CollectionUtils; import java.io.IOException; import java.util.*; +import java.util.concurrent.LinkedBlockingDeque; import java.util.stream.Collectors; /** @@ -57,6 +58,7 @@ import java.util.stream.Collectors; @Slf4j @RequiredArgsConstructor public class WorkflowServiceImpl implements WorkflowService { + private final WorkflowMapper workflowMapper; private final WorkflowNodeMapper workflowNodeMapper; private final SystemProperties systemProperties; @@ -64,7 +66,7 @@ public class WorkflowServiceImpl implements WorkflowService { @Override @Transactional public boolean saveWorkflow(WorkflowRequestVO workflowRequestVO) { - + log.info("保存工作流信息:{}", JsonUtil.toJsonString(workflowRequestVO)); MutableGraph graph = GraphBuilder.directed().allowsSelfLoops(false).build(); // 添加虚拟头节点 graph.addNode(SystemConstants.ROOT); @@ -82,7 +84,8 @@ public class WorkflowServiceImpl implements WorkflowService { NodeConfig nodeConfig = workflowRequestVO.getNodeConfig(); // 递归构建图 - buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph); + buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(), + workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph); log.info("图构建完成. graph:[{}]", graph); // 保存图信息 @@ -99,6 +102,7 @@ public class WorkflowServiceImpl implements WorkflowService { waitStrategyContext.setNextTriggerAt(time); return waitStrategy.computeTriggerTime(waitStrategyContext); } + @Override public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException { @@ -109,18 +113,21 @@ public class WorkflowServiceImpl implements WorkflowService { WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.toWorkflowDetailResponseVO(workflow); List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .eq(WorkflowNode::getDeleted, 0) - .eq(WorkflowNode::getWorkflowId, id)); + .eq(WorkflowNode::getDeleted, 0) + .eq(WorkflowNode::getWorkflowId, id) + .orderByAsc(WorkflowNode::getPriorityLevel)); List nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes); - Map workflowNodeMap = nodeInfos.stream().collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i)); + Map workflowNodeMap = nodeInfos.stream() + .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i)); String flowInfo = workflow.getFlowInfo(); try { MutableGraph graph = GraphUtils.deserializeJsonToGraph(flowInfo); // 反序列化构建图 - WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap); + WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), + workflowNodeMap); responseVO.setNodeConfig(config); } catch (Exception e) { log.error("反序列化失败. json:[{}]", flowInfo, e); @@ -152,8 +159,8 @@ public class WorkflowServiceImpl implements WorkflowService { Assert.notNull(workflowRequestVO.getId(), () -> new EasyRetryServerException("工作流ID不能为空")); Assert.isTrue(workflowMapper.selectCount(new LambdaQueryWrapper() - .eq(Workflow::getId, workflowRequestVO.getId())) > 0, - () -> new EasyRetryServerException("工作流不存在")); + .eq(Workflow::getId, workflowRequestVO.getId())) > 0, + () -> new EasyRetryServerException("工作流不存在")); MutableGraph graph = GraphBuilder.directed().allowsSelfLoops(false).build(); // 添加虚拟头节点 @@ -163,7 +170,8 @@ public class WorkflowServiceImpl implements WorkflowService { NodeConfig nodeConfig = workflowRequestVO.getNodeConfig(); // 递归构建图 - buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph); + buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(), + workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph); log.info("图构建完成. graph:[{}]", graph); @@ -177,9 +185,9 @@ public class WorkflowServiceImpl implements WorkflowService { } private WorkflowDetailResponseVO.NodeConfig buildNodeConfig(MutableGraph graph, - Long parentId, - Map nodeConfigMap, - Map workflowNodeMap) { + Long parentId, + Map nodeConfigMap, + Map workflowNodeMap) { Set successors = graph.successors(parentId); if (CollectionUtils.isEmpty(successors)) { @@ -201,7 +209,21 @@ public class WorkflowServiceImpl implements WorkflowService { nodeConfigMap.put(successor, currentConfig); if (predecessors.size() >= 2) { - WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(new ArrayList<>(predecessors).get(0)); + // 查找predecessors的公共祖先节点 + Map> sets = new HashMap<>(); + for (final Long predecessor : predecessors) { + Set set = Sets.newHashSet(); + sets.put(predecessor, set); + findCommonAncestor(predecessor, set, graph); + } + + Set intersection = sets.values().stream().findFirst().get(); + for (final Set value : sets.values()) { + intersection = Sets.intersection(value, intersection); + } + + Long commonAncestor = new ArrayList<>(intersection).get(intersection.size() - 1); + WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(graph.successors(commonAncestor).stream().findFirst().get()); parentNodeConfig.setChildNode(currentConfig); mount = false; } else { @@ -218,7 +240,20 @@ public class WorkflowServiceImpl implements WorkflowService { return currentConfig; } - public void buildGraph(List parentIds, String groupName, Long workflowId, NodeConfig nodeConfig, MutableGraph graph) { + private void findCommonAncestor(Long predecessor, Set set, MutableGraph graph) { + + Set predecessors = graph.predecessors(predecessor); + if (CollectionUtils.isEmpty(predecessors)) { + return; + } + + set.addAll(predecessors); + + findCommonAncestor(new ArrayList<>(predecessors).get(0), set, graph); + } + + public void buildGraph(List parentIds, LinkedBlockingDeque deque, String groupName, Long workflowId, + NodeConfig nodeConfig, MutableGraph graph) { if (Objects.isNull(nodeConfig)) { return; @@ -226,7 +261,6 @@ public class WorkflowServiceImpl implements WorkflowService { // 获取节点信息 List conditionNodes = nodeConfig.getConditionNodes(); - List parentIds1 = Lists.newArrayList(); if (!CollectionUtils.isEmpty(conditionNodes)) { for (final NodeInfo nodeInfo : conditionNodes) { WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo); @@ -237,21 +271,34 @@ public class WorkflowServiceImpl implements WorkflowService { workflowNode.setJobId(SystemConstants.CONDITION_JOB_ID); } - Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败")); + Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), + () -> new EasyRetryServerException("新增工作流节点失败")); // 添加节点 graph.addNode(workflowNode.getId()); for (final Long parentId : parentIds) { // 添加边 graph.putEdge(parentId, workflowNode.getId()); } - parentIds1.add(workflowNode.getId()); - log.warn("workflowNodeId:[{}] parentIds1: [{}] parentIds:[{}]", - workflowNode.getId(), JsonUtil.toJsonString(parentIds1),JsonUtil.toJsonString(parentIds)); - buildGraph(Lists.newArrayList(workflowNode.getId()), groupName, workflowId, nodeInfo.getChildNode(), graph); + log.warn("workflowNodeId:[{}] parentIds:[{}]", + workflowNode.getId(), JsonUtil.toJsonString(parentIds)); + NodeConfig childNode = nodeInfo.getChildNode(); + if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) { + buildGraph(Lists.newArrayList(workflowNode.getId()), deque, groupName, workflowId, childNode, + graph); + } else { + // 叶子节点记录一下 + deque.add(workflowNode.getId()); + } } } - buildGraph(parentIds1, groupName, workflowId, nodeConfig.getChildNode(), graph); + NodeConfig childNode = nodeConfig.getChildNode(); + if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) { + // 应该是conditionNodes里面叶子节点的选择 + List list = Lists.newArrayList(); + deque.drainTo(list); + buildGraph(list, deque, groupName, workflowId, childNode, graph); + } }