diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index c5aa7fc1..b57d96da 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -450,7 +450,7 @@ CREATE TABLE `workflow` `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长', `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间', `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', - `flow_info` JSON NOT NULL COMMENT '流程信息', + `flow_info` text DEFAULT NULL COMMENT '流程信息', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', @@ -466,14 +466,15 @@ CREATE TABLE `workflow_node` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', + `node_name` varchar(64) NOT NULL COMMENT '节点名称', `group_name` varchar(64) NOT NULL COMMENT '组名称', - `job_id` bigint(20) NOT NULL COMMENT '任务信息id', + `job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id', `workflow_id` bigint(20) NOT NULL COMMENT '工作流ID', `node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点', `expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL', `fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞', `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启', - `node_expression` text NOT NULL COMMENT '节点表达式', + `node_expression` text DEFAULT NULL COMMENT '节点表达式', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java index 235d8163..7f389369 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java @@ -34,6 +34,11 @@ public class WorkflowNode implements Serializable { */ private String namespaceId; + /** + * 节点名称 + */ + private String nodeName; + /** * 组名称 */ @@ -52,22 +57,22 @@ public class WorkflowNode implements Serializable { /** * 1、任务节点 2、条件节点 */ - private Byte nodeType; + private Integer nodeType; /** * 1、SpEl、2、Aviator 3、QL */ - private Byte expressionType; + private Integer expressionType; /** * 失败策略 1、跳过 2、阻塞 */ - private Byte failStrategy; + private Integer failStrategy; /** * 工作流节点状态 0、关闭、1、开启 */ - private Byte workflowNodeStatus; + private Integer workflowNodeStatus; /** * 节点表达式 @@ -87,7 +92,7 @@ public class WorkflowNode implements Serializable { /** * 逻辑删除 1、删除 */ - private Byte deleted; + private Integer deleted; /** * 扩展字段 diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java index f769c8d8..7a1ad75d 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java @@ -36,16 +36,13 @@ public class WorkflowRequestVO { @NotNull(message = "工作流状态") private Integer workflowStatus; - @NotEmpty(message = "节点信息不能为空") - private List nodeInfos; + /** + * DAG节点配置 + */ + private NodeConfig nodeConfig; @Data - public static class NodeInfo { - - /** - * 优先级 - */ - private Integer priorityLevel; + public static class NodeConfig { /** * 1、任务节点 2、条件节点 @@ -53,6 +50,31 @@ public class WorkflowRequestVO { @NotNull(message = "节点类型不能为空 ") private Integer nodeType; + /** + * 节点信息 + */ + private List conditionNodes; + + /** + * 子节点 + */ + private NodeConfig childNode; + + } + + @Data + public static class NodeInfo { + + /** + * 节点名称 + */ + private String nodeName; + + /** + * 优先级 + */ + private Integer priorityLevel; + /** * 任务ID */ @@ -85,7 +107,7 @@ public class WorkflowRequestVO { /** * 子节点 */ - private List childreList; + private NodeConfig childNode; } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java index e607205a..6a2940d5 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java @@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO; +import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeConfig; import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO.NodeInfo; import com.aizuda.easy.retry.server.web.service.WorkflowService; import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter; @@ -13,9 +14,11 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNode import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.google.common.collect.Lists; +import com.google.common.graph.ElementOrder; import com.google.common.graph.GraphBuilder; import com.google.common.graph.MutableGraph; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; @@ -23,6 +26,7 @@ import org.springframework.util.CollectionUtils; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * @author xiaowoniu @@ -30,6 +34,7 @@ import java.util.Map; * @since 2.6.0 */ @Service +@Slf4j @RequiredArgsConstructor public class WorkflowServiceImpl implements WorkflowService { private final WorkflowMapper workflowMapper; @@ -39,7 +44,7 @@ public class WorkflowServiceImpl implements WorkflowService { public boolean saveWorkflow(WorkflowRequestVO workflowRequestVO) { Long root = -1L; - MutableGraph graph = GraphBuilder.directed().build(); + MutableGraph graph = GraphBuilder.directed().allowsSelfLoops(false).build(); // 添加虚拟头节点 graph.addNode(root); @@ -48,12 +53,13 @@ public class WorkflowServiceImpl implements WorkflowService { workflow.setFlowInfo(StrUtil.EMPTY); Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败")); - // 组装节点信息 - List nodeInfos = workflowRequestVO.getNodeInfos(); + // 获取DAG节点配置 + NodeConfig nodeConfig = workflowRequestVO.getNodeConfig(); // 递归构建图 - buildGraph(root, workflowRequestVO.getGroupName(), workflow.getId(), nodeInfos, graph); + buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph); + log.info("图构建完成. graph:[{}]", graph); // 保存图信息 workflow.setFlowInfo(JsonUtil.toJsonString(convertGraphToAdjacencyList(graph))); workflowMapper.updateById(workflow); @@ -71,23 +77,33 @@ public class WorkflowServiceImpl implements WorkflowService { return adjacencyList; } - public void buildGraph(Long parentId, String groupName, Long workflowId, List nodeInfos, MutableGraph graph) { + public void buildGraph(List parentIds, String groupName, Long workflowId, NodeConfig nodeConfig, MutableGraph graph) { - if (CollectionUtils.isEmpty(nodeInfos)) { + if (Objects.isNull(nodeConfig)) { return; } - for (final NodeInfo nodeInfo : nodeInfos) { - WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo); - workflowNode.setWorkflowId(workflowId); - workflowNode.setGroupName(groupName); - Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败")); - // 添加节点 - graph.addNode(workflowNode.getId()); - // 添加边 - graph.putEdge(parentId, workflowNode.getId()); - buildGraph(workflowNode.getId(), groupName, workflowId, nodeInfo.getChildreList(), graph); + // 获取节点信息 + List conditionNodes = nodeConfig.getConditionNodes(); + List parentIds1 = Lists.newArrayList(); + if (!CollectionUtils.isEmpty(conditionNodes)) { + for (final NodeInfo nodeInfo : conditionNodes) { + WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo); + workflowNode.setWorkflowId(workflowId); + workflowNode.setGroupName(groupName); + Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败")); + // 添加节点 + graph.addNode(workflowNode.getId()); + for (final Long parentId : parentIds) { + // 添加边 + graph.putEdge(parentId, workflowNode.getId()); + } + parentIds1.add(workflowNode.getId()); + buildGraph(Lists.newArrayList(workflowNode.getId()), groupName, workflowId, nodeInfo.getChildNode(), graph); + } } + + buildGraph(parentIds1, groupName, workflowId, nodeConfig.getChildNode(), graph); }