feat: 2.6.0

1. 优化查询DAG详情
This commit is contained in:
byteblogs168 2023-12-19 00:03:12 +08:00
parent 594013f65b
commit 9c99330adb
6 changed files with 119 additions and 50 deletions

View File

@ -0,0 +1,29 @@
package com.aizuda.easy.retry.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author xiaowoniu
* @date 2023-12-15 22:42:17
* @since 2.6.0
*/
@Getter
@AllArgsConstructor
public enum WorkflowNodeType {
TASK(1, "任务节点"),
Condition(1, "条件节点"),
callback(1, "回调节点");
private final Integer nodeType;
private final String desc;
public static WorkflowNodeType get(Integer nodeType) {
for (WorkflowNodeType workflowNodeType : WorkflowNodeType.values()) {
if (workflowNodeType.nodeType.equals(nodeType)) {
return workflowNodeType;
}
}
return null;
}
}

View File

@ -1,33 +1,19 @@
package com.aizuda.easy.retry.server.web.controller;
import cn.hutool.core.util.ReUtil;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.web.annotation.LoginRequired;
import com.aizuda.easy.retry.server.web.annotation.LoginUser;
import com.aizuda.easy.retry.server.web.annotation.RoleEnum;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.GroupConfigQueryVO;
import com.aizuda.easy.retry.server.web.model.request.GroupConfigRequestVO;
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
import com.aizuda.easy.retry.server.web.model.response.GroupConfigResponseVO;
import com.aizuda.easy.retry.server.web.annotation.LoginRequired;
import com.aizuda.easy.retry.server.web.annotation.RoleEnum;
import com.aizuda.easy.retry.server.web.service.GroupConfigService;
import com.aizuda.easy.retry.template.datasource.enums.DbTypeEnum;
import com.google.common.collect.Lists;
import com.zaxxer.hikari.HikariDataSource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.helpers.MessageFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* 重试组接口

View File

@ -41,8 +41,9 @@ public class WorkflowController {
}
@PutMapping
public void updateWorkflow() {
@LoginRequired(role = RoleEnum.USER)
public Boolean updateWorkflow(@RequestBody @Validated WorkflowRequestVO workflowRequestVO) {
return workflowService.updateWorkflow(workflowRequestVO);
}
@GetMapping("{id}")

View File

@ -17,6 +17,8 @@ import java.util.List;
@Data
public class WorkflowRequestVO {
private Long id;
@NotBlank(message = "组名称不能为空")
@Pattern(regexp = "^[A-Za-z0-9_]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母和下划线")
private String groupName;

View File

@ -21,4 +21,6 @@ public interface WorkflowService {
WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException;
PageResult<List<WorkflowResponseVO>> listPage(WorkflowQueryVO queryVO);
Boolean updateWorkflow(WorkflowRequestVO workflowRequestVO);
}

View File

@ -11,16 +11,13 @@ import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeConfig;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeInfo;
import com.aizuda.easy.retry.server.web.model.response.JobResponseVO;
import com.aizuda.easy.retry.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.easy.retry.server.web.model.response.WorkflowResponseVO;
import com.aizuda.easy.retry.server.web.service.WorkflowService;
import com.aizuda.easy.retry.server.web.service.convert.JobResponseVOConverter;
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -28,6 +25,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
@ -101,8 +99,9 @@ public class WorkflowServiceImpl implements WorkflowService {
String flowInfo = workflow.getFlowInfo();
try {
MutableGraph<Long> graph = deserializeJsonToGraph(flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = deserializeJsonToGraph(flowInfo, workflowNodeMap);
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, root, new HashMap<>(), workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
@ -128,50 +127,100 @@ public class WorkflowServiceImpl implements WorkflowService {
return new PageResult<>(pageDTO, jobResponseList);
}
@Override
public Boolean updateWorkflow(WorkflowRequestVO workflowRequestVO) {
Assert.notNull(workflowRequestVO.getId(), () -> new EasyRetryServerException("工作流ID不能为空"));
Assert.isTrue(workflowMapper.selectCount(new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, workflowRequestVO.getId())) > 0,
() -> new EasyRetryServerException("工作流不存在"));
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点
graph.addNode(root);
// 获取DAG节点配置
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
Workflow workflow = new Workflow();
workflow.setId(workflowRequestVO.getId());
workflow.setFlowInfo(JsonUtil.toJsonString(convertGraphToAdjacencyList(graph)));
Assert.isTrue(workflowMapper.updateById(workflow) > 0, () -> new EasyRetryServerException("更新失败"));
return Boolean.TRUE;
}
// 从JSON反序列化为Guava图
private static WorkflowDetailResponseVO.NodeConfig deserializeJsonToGraph(String jsonGraph,
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap) throws IOException {
private static MutableGraph<Long> deserializeJsonToGraph(String jsonGraph) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
// 将JSON字符串转换为Map<String, Iterable<String>>
// 将JSON字符串转换为Map<Long, Iterable<Long>>
Map<Long, Iterable<Long>> adjacencyList = objectMapper.readValue(
jsonGraph, new TypeReference<Map<Long, Iterable<Long>>>() {});
Map<Long, WorkflowDetailResponseVO.NodeConfig> configMap = new HashMap<>();
WorkflowDetailResponseVO.NodeConfig rootConfig = new WorkflowDetailResponseVO.NodeConfig();
jsonGraph, new TypeReference<Map<Long, Iterable<Long>>>() {
});
// 创建Guava图并添加节点和边
MutableGraph<Long> graph = GraphBuilder.directed().build();
for (Map.Entry<Long, Iterable<Long>> entry : adjacencyList.entrySet()) {
Long node = entry.getKey();
Iterable<Long> successors = entry.getValue();
WorkflowDetailResponseVO.NodeConfig previousConfig = configMap.getOrDefault(node, new WorkflowDetailResponseVO.NodeConfig());
WorkflowDetailResponseVO.NodeConfig currentConfig = new WorkflowDetailResponseVO.NodeConfig();
graph.addNode(node);
for (Long successor : successors) {
WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(successor);
// 第一层节点
if (node == root) {
rootConfig.setNodeType(nodeInfo.getNodeType());
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = Optional.ofNullable(
rootConfig.getConditionNodes()).orElse(Lists.newArrayList());
nodeInfos.add(nodeInfo);
rootConfig.setConditionNodes(nodeInfos);
configMap.put(nodeInfo.getId(), rootConfig);
} else {
currentConfig.setNodeType(nodeInfo.getNodeType());
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = Optional.ofNullable(
currentConfig.getConditionNodes()).orElse(Lists.newArrayList());
nodeInfos.add(nodeInfo);
currentConfig.setConditionNodes(nodeInfos);
configMap.put(nodeInfo.getId(), currentConfig);
previousConfig.setChildNode(currentConfig);
}
graph.putEdge(node, successor);
}
}
return rootConfig;
return graph;
}
private WorkflowDetailResponseVO.NodeConfig buildNodeConfig(MutableGraph<Long> graph,
Long parentId,
Map<Long, WorkflowDetailResponseVO.NodeConfig> nodeConfigMap,
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap) {
Set<Long> successors = graph.successors(parentId);
if (CollectionUtils.isEmpty(successors)) {
return null;
}
WorkflowDetailResponseVO.NodeInfo previousNodeInfo = workflowNodeMap.get(parentId);
WorkflowDetailResponseVO.NodeConfig currentConfig = new WorkflowDetailResponseVO.NodeConfig();
currentConfig.setConditionNodes(Lists.newArrayList());
boolean mount = false;
for (Long successor : successors) {
Set<Long> predecessors = graph.predecessors(successor);
WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(successor);
currentConfig.setNodeType(nodeInfo.getNodeType());
currentConfig.getConditionNodes().add(nodeInfo);
nodeConfigMap.put(successor, currentConfig);
if (predecessors.size() >= 2) {
WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(new ArrayList<>(predecessors).get(0));
parentNodeConfig.setChildNode(currentConfig);
mount = false;
} else {
mount = true;
}
buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
}
if (parentId != root && mount) {
previousNodeInfo.setChildNode(currentConfig);
}
return currentConfig;
}
private Map<Long, Iterable<Long>> convertGraphToAdjacencyList(MutableGraph<Long> graph) {
Map<Long, Iterable<Long>> adjacencyList = new HashMap<>();