feat(server): 增加工作流循环调用检测

- 新增 TreeCycleDetectionWithCache 类用于检测工作流中的循环调用- 在 WorkflowServiceImpl 中调用循环检测逻辑,确保工作流中不存在循环调用
- 优化 WorkflowExecutorActor 和 WorkflowBatchHandler 中的任务批次处理逻辑
This commit is contained in:
csc 2025-06-30 14:43:47 +08:00
parent 65d4882397
commit b66a481bc0
4 changed files with 113 additions and 6 deletions

View File

@ -0,0 +1,101 @@
package com.aizuda.snailjob.server.common.util;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.Data;
import java.util.*;
/**
* 树形结构循环检测
* 检测工作流中调用工作流其中是否存在循环调用
*/
public class TreeCycleDetectionWithCache {
@Data
public static class Node {
/**
* 工作流ID
*/
long id;
public Node(long id) {
this.id = id;
}
/**
* 从数据库中该工作流中调用的所有子工作流
*/
public List<Node> getChildrenFromDB() {
WorkflowNodeMapper workflowNodeMapper = SnailSpringContext.getBean(WorkflowNodeMapper.class);
assert workflowNodeMapper != null;
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getJobId, SystemConstants.WORKFLOW_JOB_ID)
.eq(WorkflowNode::getWorkflowId, id));
ArrayList<Node> nodes = new ArrayList<>(workflowNodes.size());
for (WorkflowNode workflowNode : workflowNodes) {
WorkflowConfig nodeInfo = JsonUtil.parseObject(workflowNode.getNodeInfo(), WorkflowConfig.class);
nodes.add(new Node(nodeInfo.getId()));
}
return nodes;
}
}
/**
* 缓存子节点,防止多次查询数据库
*/
private final Map<Node, List<Node>> childCache = new HashMap<>();
/**
* 缓存已访问的节点防止重复处理
*/
private final Set<Node> visited = new HashSet<>();
/**
* 递归栈用于检测环
*/
private final Set<Node> recursionStack = new HashSet<>();
/**
* 检测环
*/
public boolean hasCycle(Node root) {
return dfs(root);
}
private boolean dfs(Node node) {
if (recursionStack.contains(node)) {
return true; // 发现环
}
if (visited.contains(node)) {
return false; // 已经处理过无需重复处理
}
visited.add(node);
recursionStack.add(node);
// 获取子节点可能来自缓存
List<Node> children = getCachedChildren(node);
for (Node child : children) {
if (dfs(child)) {
return true;
}
}
recursionStack.remove(node);
return false;
}
private List<Node> getCachedChildren(Node node) {
if (!childCache.containsKey(node)) {
childCache.put(node, node.getChildrenFromDB());
}
return childCache.get(node);
}
}

View File

@ -101,7 +101,7 @@ public class WorkflowExecutorActor extends AbstractActor {
if (SystemConstants.ROOT.equals(taskExecute.getParentId())
&& JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(),
JobOperationReasonEnum.NONE.getReason());
null);
Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId());
JobTimerWheel.clearCache(
@ -325,7 +325,7 @@ public class WorkflowExecutorActor extends AbstractActor {
* @param taskStatus 任务批次状态
* @param operationReason 操作原因
*/
private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, Integer operationReason) {
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
jobTaskBatch.setId(taskExecute.getWorkflowTaskBatchId());

View File

@ -137,7 +137,7 @@ public class WorkflowBatchHandler {
// 判定最后的工作流批次状态
int taskStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
// int operationReason = JobOperationReasonEnum.NONE.getReason();
// 判定所有的叶子节点是否完成
List<Long> leaves = MutableGraphCache.getLeaves(workflowTaskBatchId, flowInfo);
@ -175,7 +175,7 @@ public class WorkflowBatchHandler {
}
}
handlerTaskBatch(workflowTaskBatchId, taskStatus, operationReason);
handlerTaskBatch(workflowTaskBatchId, taskStatus, null);
return true;
@ -184,12 +184,12 @@ public class WorkflowBatchHandler {
/**
* 修改工作流任务批次状态
*/
private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, Integer operationReason) {
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
jobTaskBatch.setId(workflowTaskBatchId);
jobTaskBatch.setTaskBatchStatus(taskStatus);
// jobTaskBatch.setOperationReason(operationReason);
jobTaskBatch.setOperationReason(operationReason);
workflowTaskBatchMapper.updateById(jobTaskBatch);
}

View File

@ -246,6 +246,12 @@ public class WorkflowServiceImpl implements WorkflowService {
log.info("Graph construction complete. graph:[{}]", graph);
TreeCycleDetectionWithCache detector = new TreeCycleDetectionWithCache();
TreeCycleDetectionWithCache.Node root = new TreeCycleDetectionWithCache.Node(workflowRequestVO.getId());
//是否存在循环
boolean hasCycle = detector.hasCycle(root);
Assert.isFalse(hasCycle, () -> new SnailJobServerException("工作流中存在循环调用"));
// 保存图信息
workflow = WorkflowConverter.INSTANCE.convert(workflowRequestVO);
workflow.setId(workflowRequestVO.getId());