feat: 2.6.0
1. Debug condition nodes
parent 031cc93743
commit 524899d7d6
@@ -100,7 +100,7 @@ public class WorkflowExecutorActor extends AbstractActor {

         // Include the parent node so that its processing status can be checked
         List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
-                .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId)
+                .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus)
                 .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
                 .in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
         );
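The only functional change in this hunk is also selecting JobTaskBatch::getTaskBatchStatus, so the caller can see whether a node's batch has already reached a terminal state before generating another one (the next hunk relies on exactly that check). Below is a minimal, standalone sketch of the idea, using hypothetical stand-in types and status codes rather than the project's real JobTaskBatch entity and JobTaskBatchStatusEnum.

import java.util.*;

public class SkipCompletedDemo {

    // Hypothetical terminal statuses; the real project keeps them in JobTaskBatchStatusEnum.COMPLETED.
    static final Set<Integer> COMPLETED = Set.of(3, 4, 5);

    // Stand-in for JobTaskBatch with only the two columns relevant per node.
    record Batch(long workflowNodeId, int taskBatchStatus) {}

    public static void main(String[] args) {
        // What the query returns once the status column is included in the select.
        List<Batch> jobTaskBatchList = List.of(new Batch(7L, 3), new Batch(8L, 1));

        // Index the batches by workflow node id, as the actor does before its node loop.
        Map<Long, Batch> jobTaskBatchMap = new HashMap<>();
        for (Batch batch : jobTaskBatchList) {
            jobTaskBatchMap.put(batch.workflowNodeId(), batch);
        }

        for (long nodeId : List.of(7L, 8L, 9L)) {
            Batch batch = jobTaskBatchMap.get(nodeId);
            if (Objects.nonNull(batch) && COMPLETED.contains(batch.taskBatchStatus())) {
                System.out.println("node " + nodeId + ": batch already finished, skip");
                continue;
            }
            System.out.println("node " + nodeId + ": generate a new batch");
        }
    }
}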
@@ -122,12 +122,17 @@ public class WorkflowExecutorActor extends AbstractActor {
             }
         }

+        // Remove the parent node
+        workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
+            Collectors.toList());

         List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
         Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));

         // Only used by condition nodes
         Boolean evaluationResult = null;
         for (WorkflowNode workflowNode : workflowNodes) {

             // If a batch already exists for this node, do not generate it again
             JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(workflowNode.getId());
             if (Objects.nonNull(jobTaskBatch) && JobTaskBatchStatusEnum.COMPLETED.contains(jobTaskBatch.getTaskBatchStatus())) {
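The added filter drops the parent node again (it was only loaded for the status check in the previous hunk) before batches are generated for the successors, and the following lines index the loaded jobs by id for the per-node loop. A small self-contained sketch of both steps, with hypothetical Node and Job records standing in for WorkflowNode and Job:

import java.util.*;
import java.util.stream.Collectors;

public class ParentFilterDemo {

    record Node(Long id, Long jobId) {}
    record Job(Long id) {}

    public static void main(String[] args) {
        Long parentId = 1L;
        List<Node> workflowNodes = List.of(new Node(1L, 10L), new Node(2L, 20L));

        // Drop the parent node so only the successor nodes are processed further.
        workflowNodes = workflowNodes.stream()
                .filter(node -> !node.id().equals(parentId))
                .collect(Collectors.toList());

        // Index jobs by id; note that Collectors.toMap throws on duplicate keys,
        // which is fine here because job ids are unique.
        List<Job> jobs = List.of(new Job(20L));
        Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::id, job -> job));

        System.out.println(workflowNodes);   // [Node[id=2, jobId=20]]
        System.out.println(jobMap.keySet()); // [20]
    }
}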
@@ -72,9 +72,10 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
             if (StrUtil.isNotBlank(context.getResult())) {
                 contextMap = JsonUtil.parseHashMap(context.getResult());
             }
-            result = doEval(context.getNodeExpression(), contextMap);
+            result = Optional.ofNullable(doEval(context.getNodeExpression(), contextMap)).orElse(Boolean.FALSE);
             log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result);
         } catch (Exception e) {
             log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", context.getNodeExpression(), context.getResult(), e);
             taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
             operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
             jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
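The switch to Optional.ofNullable(...).orElse(Boolean.FALSE) guards against the expression evaluator returning null: using a null Boolean where a primitive boolean is expected triggers an implicit unboxing and a NullPointerException (exactly where the exception surfaces depends on how result is declared). Defaulting to FALSE makes a null evaluation simply count as "condition not met". A tiny sketch of the failure mode and the fix, with a hypothetical doEval stub standing in for the real evaluator:

import java.util.Optional;

public class NullBooleanDemo {

    // Hypothetical stand-in for doEval(expression, context): expression engines
    // commonly return null when the expression yields no boolean value.
    static Boolean doEval(String expression) {
        return null;
    }

    public static void main(String[] args) {
        // boolean broken = doEval("foo > 1");   // NullPointerException at the implicit unboxing
        // if (doEval("foo > 1")) { }            // same problem when branching on the raw result

        // Defaulting a null evaluation to FALSE keeps the condition node deterministic.
        boolean result = Optional.ofNullable(doEval("foo > 1")).orElse(Boolean.FALSE);
        System.out.println(result); // false
    }
}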
@@ -28,6 +28,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
 import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
 import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.common.collect.Lists;
 import com.google.common.graph.MutableGraph;
 import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Component;
@@ -105,7 +106,7 @@ public class WorkflowBatchHandler {
         // Condition nodes are OR'ed: one successful predecessor counts as success
         if (WorkflowNodeTypeEnum.CONDITION.getType() == workflowNode.getNodeType()) {
             for (final Long predecessor : predecessors) {
-                List<JobTaskBatch> jobTaskBatcheList = map.get(predecessor);
+                List<JobTaskBatch> jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList());
                 Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
                     .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
                 long successCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.SUCCESS.getStatus(), 0L);
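Switching to map.getOrDefault(predecessor, Lists.newArrayList()) (the reason for the new Guava Lists import above) protects the stream from a NullPointerException when a predecessor has not produced any batch yet: counting over an empty list simply yields zero successes, so the OR check still works. A runnable sketch of the same pattern, using plain integer status codes instead of JobTaskBatchStatusEnum and a JDK ArrayList instead of Guava:

import java.util.*;
import java.util.stream.Collectors;

public class PredecessorCountDemo {

    public static void main(String[] args) {
        // Batch statuses grouped by predecessor node id (1 = success, 2 = fail in this sketch).
        Map<Long, List<Integer>> map = new HashMap<>();
        map.put(1L, List.of(1, 2));

        Long predecessorWithoutBatches = 2L;

        // map.get(...) would return null here and the stream below would throw;
        // getOrDefault substitutes an empty list instead.
        List<Integer> jobTaskBatchList = map.getOrDefault(predecessorWithoutBatches, new ArrayList<>());

        Map<Integer, Long> statusCountMap = jobTaskBatchList.stream()
                .collect(Collectors.groupingBy(status -> status, Collectors.counting()));

        // No batches means zero successes rather than an exception.
        long successCount = statusCountMap.getOrDefault(1, 0L);
        System.out.println(successCount); // 0
    }
}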
@@ -131,7 +132,7 @@ public class WorkflowBatchHandler {
         } else {

             for (final Long predecessor : predecessors) {
-                List<JobTaskBatch> jobTaskBatcheList = map.get(predecessor);
+                List<JobTaskBatch> jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList());
                 Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
                     .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
                 long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
@@ -256,7 +257,7 @@ public class WorkflowBatchHandler {
             taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
             taskExecuteDTO.setWorkflowId(successor);
             taskExecuteDTO.setTriggerType(1);
-            taskExecuteDTO.setParentId(successor);
+            taskExecuteDTO.setParentId(parentId);
             ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
             actorRef.tell(taskExecuteDTO, actorRef);
             continue;
@@ -277,8 +278,8 @@ public class WorkflowBatchHandler {
                     continue;
                 }

-                // Nodes already in a terminal state still need a recursive check of whether execution has completed
+                // Nodes already in a terminal state need a recursive walk of their successors to verify they executed normally
                checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap);
            }
        }
    }
 }
@@ -121,6 +121,11 @@ public class WorkflowDetailResponseVO {
      */
     private Integer taskBatchStatus;

+    /**
+     * Job task batch id
+     */
+    private Long jobTaskBatchId;
+
     /**
      * Task execution time
      */
@@ -120,6 +120,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
                 .peek(nodeInfo -> {
                     JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(nodeInfo.getId());
                     if (Objects.nonNull(jobTaskBatch)) {
+                        nodeInfo.setJobTaskBatchId(jobTaskBatch.getId());
                         nodeInfo.setExecutionAt(DateUtils.toLocalDateTime(jobTaskBatch.getExecutionAt()));
                         nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus());
                         nodeInfo.setOperationReason(jobTaskBatch.getOperationReason());
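Together with the new jobTaskBatchId field on WorkflowDetailResponseVO above, this one-line addition exposes the batch id of each workflow node to the caller, which is what makes inspecting a condition node's run from the workflow detail view possible. A hypothetical, self-contained sketch of the enrichment step, with plain classes standing in for the workflow node response object and JobTaskBatch:

import java.util.*;

public class NodeEnrichmentDemo {

    // Minimal stand-ins for the workflow node response object and JobTaskBatch.
    static class NodeInfo { Long id; Long jobTaskBatchId; Integer taskBatchStatus; }
    static class Batch { Long id; Integer taskBatchStatus; Long workflowNodeId; }

    public static void main(String[] args) {
        Batch batch = new Batch();
        batch.id = 42L;
        batch.taskBatchStatus = 3;
        batch.workflowNodeId = 7L;
        Map<Long, Batch> jobTaskBatchMap = Map.of(batch.workflowNodeId, batch);

        NodeInfo nodeInfo = new NodeInfo();
        nodeInfo.id = 7L;

        // Same idea as the .peek(...) block: only when a batch exists for the node,
        // copy its id and status onto the response object.
        Batch match = jobTaskBatchMap.get(nodeInfo.id);
        if (Objects.nonNull(match)) {
            nodeInfo.jobTaskBatchId = match.id;              // the field added in this commit
            nodeInfo.taskBatchStatus = match.taskBatchStatus;
        }
        System.out.println(nodeInfo.jobTaskBatchId + " / " + nodeInfo.taskBatchStatus); // 42 / 3
    }
}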