feat: 2.6.0
1. 完成DAG模型匹配前端
This commit is contained in:
parent
bff0bd9b1d
commit
71a7f14339
@ -450,7 +450,7 @@ CREATE TABLE `workflow`
|
|||||||
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
|
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
|
||||||
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
|
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
|
||||||
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
|
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
|
||||||
`flow_info` JSON NOT NULL COMMENT '流程信息',
|
`flow_info` text DEFAULT NULL COMMENT '流程信息',
|
||||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
|
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
|
||||||
@ -466,14 +466,15 @@ CREATE TABLE `workflow_node`
|
|||||||
(
|
(
|
||||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
|
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
|
||||||
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
|
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
|
||||||
|
`node_name` varchar(64) NOT NULL COMMENT '节点名称',
|
||||||
`group_name` varchar(64) NOT NULL COMMENT '组名称',
|
`group_name` varchar(64) NOT NULL COMMENT '组名称',
|
||||||
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
|
`job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id',
|
||||||
`workflow_id` bigint(20) NOT NULL COMMENT '工作流ID',
|
`workflow_id` bigint(20) NOT NULL COMMENT '工作流ID',
|
||||||
`node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点',
|
`node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点',
|
||||||
`expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
|
`expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
|
||||||
`fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
|
`fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
|
||||||
`workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
|
`workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
|
||||||
`node_expression` text NOT NULL COMMENT '节点表达式',
|
`node_expression` text DEFAULT NULL COMMENT '节点表达式',
|
||||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
|
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
|
||||||
|
@ -34,6 +34,11 @@ public class WorkflowNode implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private String namespaceId;
|
private String namespaceId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 节点名称
|
||||||
|
*/
|
||||||
|
private String nodeName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 组名称
|
* 组名称
|
||||||
*/
|
*/
|
||||||
@ -52,22 +57,22 @@ public class WorkflowNode implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* 1、任务节点 2、条件节点
|
* 1、任务节点 2、条件节点
|
||||||
*/
|
*/
|
||||||
private Byte nodeType;
|
private Integer nodeType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 1、SpEl、2、Aviator 3、QL
|
* 1、SpEl、2、Aviator 3、QL
|
||||||
*/
|
*/
|
||||||
private Byte expressionType;
|
private Integer expressionType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 失败策略 1、跳过 2、阻塞
|
* 失败策略 1、跳过 2、阻塞
|
||||||
*/
|
*/
|
||||||
private Byte failStrategy;
|
private Integer failStrategy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 工作流节点状态 0、关闭、1、开启
|
* 工作流节点状态 0、关闭、1、开启
|
||||||
*/
|
*/
|
||||||
private Byte workflowNodeStatus;
|
private Integer workflowNodeStatus;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 节点表达式
|
* 节点表达式
|
||||||
@ -87,7 +92,7 @@ public class WorkflowNode implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* 逻辑删除 1、删除
|
* 逻辑删除 1、删除
|
||||||
*/
|
*/
|
||||||
private Byte deleted;
|
private Integer deleted;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扩展字段
|
* 扩展字段
|
||||||
|
@ -36,16 +36,13 @@ public class WorkflowRequestVO {
|
|||||||
@NotNull(message = "工作流状态")
|
@NotNull(message = "工作流状态")
|
||||||
private Integer workflowStatus;
|
private Integer workflowStatus;
|
||||||
|
|
||||||
@NotEmpty(message = "节点信息不能为空")
|
/**
|
||||||
private List<NodeInfo> nodeInfos;
|
* DAG节点配置
|
||||||
|
*/
|
||||||
|
private NodeConfig nodeConfig;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public static class NodeInfo {
|
public static class NodeConfig {
|
||||||
|
|
||||||
/**
|
|
||||||
* 优先级
|
|
||||||
*/
|
|
||||||
private Integer priorityLevel;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 1、任务节点 2、条件节点
|
* 1、任务节点 2、条件节点
|
||||||
@ -53,6 +50,31 @@ public class WorkflowRequestVO {
|
|||||||
@NotNull(message = "节点类型不能为空 ")
|
@NotNull(message = "节点类型不能为空 ")
|
||||||
private Integer nodeType;
|
private Integer nodeType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 节点信息
|
||||||
|
*/
|
||||||
|
private List<NodeInfo> conditionNodes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 子节点
|
||||||
|
*/
|
||||||
|
private NodeConfig childNode;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class NodeInfo {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 节点名称
|
||||||
|
*/
|
||||||
|
private String nodeName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 优先级
|
||||||
|
*/
|
||||||
|
private Integer priorityLevel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 任务ID
|
* 任务ID
|
||||||
*/
|
*/
|
||||||
@ -85,7 +107,7 @@ public class WorkflowRequestVO {
|
|||||||
/**
|
/**
|
||||||
* 子节点
|
* 子节点
|
||||||
*/
|
*/
|
||||||
private List<NodeInfo> childreList;
|
private NodeConfig childNode;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil;
|
|||||||
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
|
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
|
||||||
|
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeConfig;
|
||||||
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeInfo;
|
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeInfo;
|
||||||
import com.aizuda.easy.retry.server.web.service.WorkflowService;
|
import com.aizuda.easy.retry.server.web.service.WorkflowService;
|
||||||
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
|
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
|
||||||
@ -13,9 +14,11 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNode
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.graph.ElementOrder;
|
||||||
import com.google.common.graph.GraphBuilder;
|
import com.google.common.graph.GraphBuilder;
|
||||||
import com.google.common.graph.MutableGraph;
|
import com.google.common.graph.MutableGraph;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
@ -23,6 +26,7 @@ import org.springframework.util.CollectionUtils;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author xiaowoniu
|
* @author xiaowoniu
|
||||||
@ -30,6 +34,7 @@ import java.util.Map;
|
|||||||
* @since 2.6.0
|
* @since 2.6.0
|
||||||
*/
|
*/
|
||||||
@Service
|
@Service
|
||||||
|
@Slf4j
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class WorkflowServiceImpl implements WorkflowService {
|
public class WorkflowServiceImpl implements WorkflowService {
|
||||||
private final WorkflowMapper workflowMapper;
|
private final WorkflowMapper workflowMapper;
|
||||||
@ -39,7 +44,7 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
public boolean saveWorkflow(WorkflowRequestVO workflowRequestVO) {
|
public boolean saveWorkflow(WorkflowRequestVO workflowRequestVO) {
|
||||||
|
|
||||||
Long root = -1L;
|
Long root = -1L;
|
||||||
MutableGraph<Long> graph = GraphBuilder.directed().build();
|
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
|
||||||
// 添加虚拟头节点
|
// 添加虚拟头节点
|
||||||
graph.addNode(root);
|
graph.addNode(root);
|
||||||
|
|
||||||
@ -48,12 +53,13 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
workflow.setFlowInfo(StrUtil.EMPTY);
|
workflow.setFlowInfo(StrUtil.EMPTY);
|
||||||
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
|
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
|
||||||
|
|
||||||
// 组装节点信息
|
// 获取DAG节点配置
|
||||||
List<NodeInfo> nodeInfos = workflowRequestVO.getNodeInfos();
|
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
|
||||||
|
|
||||||
// 递归构建图
|
// 递归构建图
|
||||||
buildGraph(root, workflowRequestVO.getGroupName(), workflow.getId(), nodeInfos, graph);
|
buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
|
||||||
|
|
||||||
|
log.info("图构建完成. graph:[{}]", graph);
|
||||||
// 保存图信息
|
// 保存图信息
|
||||||
workflow.setFlowInfo(JsonUtil.toJsonString(convertGraphToAdjacencyList(graph)));
|
workflow.setFlowInfo(JsonUtil.toJsonString(convertGraphToAdjacencyList(graph)));
|
||||||
workflowMapper.updateById(workflow);
|
workflowMapper.updateById(workflow);
|
||||||
@ -71,23 +77,33 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
return adjacencyList;
|
return adjacencyList;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void buildGraph(Long parentId, String groupName, Long workflowId, List<NodeInfo> nodeInfos, MutableGraph<Long> graph) {
|
public void buildGraph(List<Long> parentIds, String groupName, Long workflowId, NodeConfig nodeConfig, MutableGraph<Long> graph) {
|
||||||
|
|
||||||
if (CollectionUtils.isEmpty(nodeInfos)) {
|
if (Objects.isNull(nodeConfig)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final NodeInfo nodeInfo : nodeInfos) {
|
// 获取节点信息
|
||||||
WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo);
|
List<NodeInfo> conditionNodes = nodeConfig.getConditionNodes();
|
||||||
workflowNode.setWorkflowId(workflowId);
|
List<Long> parentIds1 = Lists.newArrayList();
|
||||||
workflowNode.setGroupName(groupName);
|
if (!CollectionUtils.isEmpty(conditionNodes)) {
|
||||||
Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败"));
|
for (final NodeInfo nodeInfo : conditionNodes) {
|
||||||
// 添加节点
|
WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo);
|
||||||
graph.addNode(workflowNode.getId());
|
workflowNode.setWorkflowId(workflowId);
|
||||||
// 添加边
|
workflowNode.setGroupName(groupName);
|
||||||
graph.putEdge(parentId, workflowNode.getId());
|
Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败"));
|
||||||
buildGraph(workflowNode.getId(), groupName, workflowId, nodeInfo.getChildreList(), graph);
|
// 添加节点
|
||||||
|
graph.addNode(workflowNode.getId());
|
||||||
|
for (final Long parentId : parentIds) {
|
||||||
|
// 添加边
|
||||||
|
graph.putEdge(parentId, workflowNode.getId());
|
||||||
|
}
|
||||||
|
parentIds1.add(workflowNode.getId());
|
||||||
|
buildGraph(Lists.newArrayList(workflowNode.getId()), groupName, workflowId, nodeInfo.getChildNode(), graph);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buildGraph(parentIds1, groupName, workflowId, nodeConfig.getChildNode(), graph);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user