feat: 2.6.0

1. 修复工作流新增和详情问题
This commit is contained in:
byteblogs168 2023-12-29 15:33:27 +08:00
parent 0fba221787
commit 87dd06d34f
2 changed files with 70 additions and 23 deletions

View File

@ -9,7 +9,7 @@ import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.client.core.strategy.ExecutorMethod;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.support.AopUtils;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@ -76,7 +76,7 @@ public class RetryableScanner implements Scanner, ApplicationContextAware {
Class<? extends Throwable>[] include = retryable.include();
Class<? extends Throwable>[] exclude = retryable.exclude();
Class executorNotProxy = AopUtils.getTargetClass(executor);
Class executorNotProxy = AopProxyUtils.ultimateTargetClass(executor);
String executorClassName = executorNotProxy.getName();
Class<? extends IdempotentIdGenerate> idempotentIdGenerate = retryable.idempotentId();
String bizNo = retryable.bizNo();

View File

@ -46,6 +46,7 @@ import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
/**
@ -57,6 +58,7 @@ import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class WorkflowServiceImpl implements WorkflowService {
private final WorkflowMapper workflowMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final SystemProperties systemProperties;
@ -64,7 +66,7 @@ public class WorkflowServiceImpl implements WorkflowService {
@Override
@Transactional
public boolean saveWorkflow(WorkflowRequestVO workflowRequestVO) {
log.info("保存工作流信息:{}", JsonUtil.toJsonString(workflowRequestVO));
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点
graph.addNode(SystemConstants.ROOT);
@ -82,7 +84,8 @@ public class WorkflowServiceImpl implements WorkflowService {
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
@ -99,6 +102,7 @@ public class WorkflowServiceImpl implements WorkflowService {
waitStrategyContext.setNextTriggerAt(time);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
@Override
public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException {
@ -109,18 +113,21 @@ public class WorkflowServiceImpl implements WorkflowService {
WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.toWorkflowDetailResponseVO(workflow);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getDeleted, 0)
.eq(WorkflowNode::getWorkflowId, id));
.eq(WorkflowNode::getDeleted, 0)
.eq(WorkflowNode::getWorkflowId, id)
.orderByAsc(WorkflowNode::getPriorityLevel));
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes);
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream().collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
String flowInfo = workflow.getFlowInfo();
try {
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap);
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(),
workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
@ -152,8 +159,8 @@ public class WorkflowServiceImpl implements WorkflowService {
Assert.notNull(workflowRequestVO.getId(), () -> new EasyRetryServerException("工作流ID不能为空"));
Assert.isTrue(workflowMapper.selectCount(new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, workflowRequestVO.getId())) > 0,
() -> new EasyRetryServerException("工作流不存在"));
.eq(Workflow::getId, workflowRequestVO.getId())) > 0,
() -> new EasyRetryServerException("工作流不存在"));
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点
@ -163,7 +170,8 @@ public class WorkflowServiceImpl implements WorkflowService {
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
@ -177,9 +185,9 @@ public class WorkflowServiceImpl implements WorkflowService {
}
private WorkflowDetailResponseVO.NodeConfig buildNodeConfig(MutableGraph<Long> graph,
Long parentId,
Map<Long, WorkflowDetailResponseVO.NodeConfig> nodeConfigMap,
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap) {
Long parentId,
Map<Long, WorkflowDetailResponseVO.NodeConfig> nodeConfigMap,
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap) {
Set<Long> successors = graph.successors(parentId);
if (CollectionUtils.isEmpty(successors)) {
@ -201,7 +209,21 @@ public class WorkflowServiceImpl implements WorkflowService {
nodeConfigMap.put(successor, currentConfig);
if (predecessors.size() >= 2) {
WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(new ArrayList<>(predecessors).get(0));
// 查找predecessors的公共祖先节点
Map<Long, Set<Long>> sets = new HashMap<>();
for (final Long predecessor : predecessors) {
Set<Long> set = Sets.newHashSet();
sets.put(predecessor, set);
findCommonAncestor(predecessor, set, graph);
}
Set<Long> intersection = sets.values().stream().findFirst().get();
for (final Set<Long> value : sets.values()) {
intersection = Sets.intersection(value, intersection);
}
Long commonAncestor = new ArrayList<>(intersection).get(intersection.size() - 1);
WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(graph.successors(commonAncestor).stream().findFirst().get());
parentNodeConfig.setChildNode(currentConfig);
mount = false;
} else {
@ -218,7 +240,20 @@ public class WorkflowServiceImpl implements WorkflowService {
return currentConfig;
}
public void buildGraph(List<Long> parentIds, String groupName, Long workflowId, NodeConfig nodeConfig, MutableGraph<Long> graph) {
private void findCommonAncestor(Long predecessor, Set<Long> set, MutableGraph<Long> graph) {
Set<Long> predecessors = graph.predecessors(predecessor);
if (CollectionUtils.isEmpty(predecessors)) {
return;
}
set.addAll(predecessors);
findCommonAncestor(new ArrayList<>(predecessors).get(0), set, graph);
}
public void buildGraph(List<Long> parentIds, LinkedBlockingDeque<Long> deque, String groupName, Long workflowId,
NodeConfig nodeConfig, MutableGraph<Long> graph) {
if (Objects.isNull(nodeConfig)) {
return;
@ -226,7 +261,6 @@ public class WorkflowServiceImpl implements WorkflowService {
// 获取节点信息
List<NodeInfo> conditionNodes = nodeConfig.getConditionNodes();
List<Long> parentIds1 = Lists.newArrayList();
if (!CollectionUtils.isEmpty(conditionNodes)) {
for (final NodeInfo nodeInfo : conditionNodes) {
WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo);
@ -237,21 +271,34 @@ public class WorkflowServiceImpl implements WorkflowService {
workflowNode.setJobId(SystemConstants.CONDITION_JOB_ID);
}
Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败"));
Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode),
() -> new EasyRetryServerException("新增工作流节点失败"));
// 添加节点
graph.addNode(workflowNode.getId());
for (final Long parentId : parentIds) {
// 添加边
graph.putEdge(parentId, workflowNode.getId());
}
parentIds1.add(workflowNode.getId());
log.warn("workflowNodeId:[{}] parentIds1: [{}] parentIds:[{}]",
workflowNode.getId(), JsonUtil.toJsonString(parentIds1),JsonUtil.toJsonString(parentIds));
buildGraph(Lists.newArrayList(workflowNode.getId()), groupName, workflowId, nodeInfo.getChildNode(), graph);
log.warn("workflowNodeId:[{}] parentIds:[{}]",
workflowNode.getId(), JsonUtil.toJsonString(parentIds));
NodeConfig childNode = nodeInfo.getChildNode();
if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) {
buildGraph(Lists.newArrayList(workflowNode.getId()), deque, groupName, workflowId, childNode,
graph);
} else {
// 叶子节点记录一下
deque.add(workflowNode.getId());
}
}
}
buildGraph(parentIds1, groupName, workflowId, nodeConfig.getChildNode(), graph);
NodeConfig childNode = nodeConfig.getChildNode();
if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) {
// 应该是conditionNodes里面叶子节点的选择
List<Long> list = Lists.newArrayList();
deque.drainTo(list);
buildGraph(list, deque, groupName, workflowId, childNode, graph);
}
}