feat: 2.6.0

1. 完成DAG模型匹配前端
This commit is contained in:
byteblogs168 2023-12-14 18:15:10 +08:00
parent bd178c0f09
commit 1a60c0feca
4 changed files with 77 additions and 33 deletions

View File

@ -450,7 +450,7 @@ CREATE TABLE `workflow`
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长', `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间', `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`flow_info` JSON NOT NULL COMMENT '流程信息', `flow_info` text DEFAULT NULL COMMENT '流程信息',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
@ -466,14 +466,15 @@ CREATE TABLE `workflow_node`
( (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`node_name` varchar(64) NOT NULL COMMENT '节点名称',
`group_name` varchar(64) NOT NULL COMMENT '组名称', `group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务信息id', `job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id',
`workflow_id` bigint(20) NOT NULL COMMENT '工作流ID', `workflow_id` bigint(20) NOT NULL COMMENT '工作流ID',
`node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点', `node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点',
`expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL', `expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
`fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞', `fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
`workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启', `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
`node_expression` text NOT NULL COMMENT '节点表达式', `node_expression` text DEFAULT NULL COMMENT '节点表达式',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',

View File

@ -34,6 +34,11 @@ public class WorkflowNode implements Serializable {
*/ */
private String namespaceId; private String namespaceId;
/**
* 节点名称
*/
private String nodeName;
/** /**
* 组名称 * 组名称
*/ */
@ -52,22 +57,22 @@ public class WorkflowNode implements Serializable {
/** /**
* 1任务节点 2条件节点 * 1任务节点 2条件节点
*/ */
private Byte nodeType; private Integer nodeType;
/** /**
* 1SpEl2Aviator 3QL * 1SpEl2Aviator 3QL
*/ */
private Byte expressionType; private Integer expressionType;
/** /**
* 失败策略 1跳过 2阻塞 * 失败策略 1跳过 2阻塞
*/ */
private Byte failStrategy; private Integer failStrategy;
/** /**
* 工作流节点状态 0关闭1开启 * 工作流节点状态 0关闭1开启
*/ */
private Byte workflowNodeStatus; private Integer workflowNodeStatus;
/** /**
* 节点表达式 * 节点表达式
@ -87,7 +92,7 @@ public class WorkflowNode implements Serializable {
/** /**
* 逻辑删除 1删除 * 逻辑删除 1删除
*/ */
private Byte deleted; private Integer deleted;
/** /**
* 扩展字段 * 扩展字段

View File

@ -36,16 +36,13 @@ public class WorkflowRequestVO {
@NotNull(message = "工作流状态") @NotNull(message = "工作流状态")
private Integer workflowStatus; private Integer workflowStatus;
@NotEmpty(message = "节点信息不能为空") /**
private List<NodeInfo> nodeInfos; * DAG节点配置
*/
private NodeConfig nodeConfig;
@Data @Data
public static class NodeInfo { public static class NodeConfig {
/**
* 优先级
*/
private Integer priorityLevel;
/** /**
* 1任务节点 2条件节点 * 1任务节点 2条件节点
@ -53,6 +50,31 @@ public class WorkflowRequestVO {
@NotNull(message = "节点类型不能为空 ") @NotNull(message = "节点类型不能为空 ")
private Integer nodeType; private Integer nodeType;
/**
* 节点信息
*/
private List<NodeInfo> conditionNodes;
/**
* 子节点
*/
private NodeConfig childNode;
}
@Data
public static class NodeInfo {
/**
* 节点名称
*/
private String nodeName;
/**
* 优先级
*/
private Integer priorityLevel;
/** /**
* 任务ID * 任务ID
*/ */
@ -85,7 +107,7 @@ public class WorkflowRequestVO {
/** /**
* 子节点 * 子节点
*/ */
private List<NodeInfo> childreList; private NodeConfig childNode;
} }

View File

@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO; import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeConfig;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeInfo; import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeInfo;
import com.aizuda.easy.retry.server.web.service.WorkflowService; import com.aizuda.easy.retry.server.web.service.WorkflowService;
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter; import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
@ -13,9 +14,11 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNode
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.graph.ElementOrder;
import com.google.common.graph.GraphBuilder; import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph; import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -23,6 +26,7 @@ import org.springframework.util.CollectionUtils;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -30,6 +34,7 @@ import java.util.Map;
* @since 2.6.0 * @since 2.6.0
*/ */
@Service @Service
@Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class WorkflowServiceImpl implements WorkflowService { public class WorkflowServiceImpl implements WorkflowService {
private final WorkflowMapper workflowMapper; private final WorkflowMapper workflowMapper;
@ -39,7 +44,7 @@ public class WorkflowServiceImpl implements WorkflowService {
public boolean saveWorkflow(WorkflowRequestVO workflowRequestVO) { public boolean saveWorkflow(WorkflowRequestVO workflowRequestVO) {
Long root = -1L; Long root = -1L;
MutableGraph<Long> graph = GraphBuilder.directed().build(); MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点 // 添加虚拟头节点
graph.addNode(root); graph.addNode(root);
@ -48,12 +53,13 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow.setFlowInfo(StrUtil.EMPTY); workflow.setFlowInfo(StrUtil.EMPTY);
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败")); Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
// 组装节点信息 // 获取DAG节点配置
List<NodeInfo> nodeInfos = workflowRequestVO.getNodeInfos(); NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图 // 递归构建图
buildGraph(root, workflowRequestVO.getGroupName(), workflow.getId(), nodeInfos, graph); buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息 // 保存图信息
workflow.setFlowInfo(JsonUtil.toJsonString(convertGraphToAdjacencyList(graph))); workflow.setFlowInfo(JsonUtil.toJsonString(convertGraphToAdjacencyList(graph)));
workflowMapper.updateById(workflow); workflowMapper.updateById(workflow);
@ -71,23 +77,33 @@ public class WorkflowServiceImpl implements WorkflowService {
return adjacencyList; return adjacencyList;
} }
public void buildGraph(Long parentId, String groupName, Long workflowId, List<NodeInfo> nodeInfos, MutableGraph<Long> graph) { public void buildGraph(List<Long> parentIds, String groupName, Long workflowId, NodeConfig nodeConfig, MutableGraph<Long> graph) {
if (CollectionUtils.isEmpty(nodeInfos)) { if (Objects.isNull(nodeConfig)) {
return; return;
} }
for (final NodeInfo nodeInfo : nodeInfos) { // 获取节点信息
WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo); List<NodeInfo> conditionNodes = nodeConfig.getConditionNodes();
workflowNode.setWorkflowId(workflowId); List<Long> parentIds1 = Lists.newArrayList();
workflowNode.setGroupName(groupName); if (!CollectionUtils.isEmpty(conditionNodes)) {
Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败")); for (final NodeInfo nodeInfo : conditionNodes) {
// 添加节点 WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo);
graph.addNode(workflowNode.getId()); workflowNode.setWorkflowId(workflowId);
// 添加边 workflowNode.setGroupName(groupName);
graph.putEdge(parentId, workflowNode.getId()); Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败"));
buildGraph(workflowNode.getId(), groupName, workflowId, nodeInfo.getChildreList(), graph); // 添加节点
graph.addNode(workflowNode.getId());
for (final Long parentId : parentIds) {
// 添加边
graph.putEdge(parentId, workflowNode.getId());
}
parentIds1.add(workflowNode.getId());
buildGraph(Lists.newArrayList(workflowNode.getId()), groupName, workflowId, nodeInfo.getChildNode(), graph);
}
} }
buildGraph(parentIds1, groupName, workflowId, nodeConfig.getChildNode(), graph);
} }