From 524899d7d6162a39c29e2f4ba0cc644f98c93ca9 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 28 Dec 2023 11:18:05 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E6=9D=A1=E4=BB=B6=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/support/dispatch/WorkflowExecutorActor.java | 7 ++++++- .../executor/workflow/ConditionWorkflowExecutor.java | 3 ++- .../task/support/handler/WorkflowBatchHandler.java | 11 ++++++----- .../web/model/response/WorkflowDetailResponseVO.java | 5 +++++ .../web/service/impl/WorkflowBatchServiceImpl.java | 1 + 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index c08a25b7..05a20a3d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -100,7 +100,7 @@ public class WorkflowExecutorActor extends AbstractActor { // 添加父节点,为了判断父节点的处理状态 List jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() - .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId) + .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) .in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))) ); @@ -122,12 +122,17 @@ public class WorkflowExecutorActor extends AbstractActor { } } + // 去掉父节点 + workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect( + Collectors.toList()); + List jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); // 只会条件节点会使用 Boolean evaluationResult = null; for (WorkflowNode workflowNode : workflowNodes) { + // 批次已经存在就不在重复生成 JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(workflowNode.getId()); if (Objects.nonNull(jobTaskBatch) && JobTaskBatchStatusEnum.COMPLETED.contains(jobTaskBatch.getTaskBatchStatus())) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java index 9850e2c6..18a4a97d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java @@ -72,9 +72,10 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { if (StrUtil.isNotBlank(context.getResult())) { contextMap = JsonUtil.parseHashMap(context.getResult()); } - result = doEval(context.getNodeExpression(), contextMap); + result = Optional.ofNullable(doEval(context.getNodeExpression(), contextMap)).orElse(Boolean.FALSE); log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result); } catch (Exception e) { + log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", context.getNodeExpression(), context.getResult(), e); taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason(); jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java index cfb7df6c..1d9e85c4 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -28,6 +28,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; import com.google.common.graph.MutableGraph; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -105,7 +106,7 @@ public class WorkflowBatchHandler { // 条件节点是或的关系一个成功就代表成功 if (WorkflowNodeTypeEnum.CONDITION.getType() == workflowNode.getNodeType()) { for (final Long predecessor : predecessors) { - List jobTaskBatcheList = map.get(predecessor); + List jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList()); Map statusCountMap = jobTaskBatcheList.stream() .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); long successCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.SUCCESS.getStatus(), 0L); @@ -131,7 +132,7 @@ public class WorkflowBatchHandler { } else { for (final Long predecessor : predecessors) { - List jobTaskBatcheList = map.get(predecessor); + List jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList()); Map statusCountMap = jobTaskBatcheList.stream() .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); @@ -256,7 +257,7 @@ public class WorkflowBatchHandler { taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId); taskExecuteDTO.setWorkflowId(successor); taskExecuteDTO.setTriggerType(1); - taskExecuteDTO.setParentId(successor); + taskExecuteDTO.setParentId(parentId); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); actorRef.tell(taskExecuteDTO, actorRef); continue; @@ -277,8 +278,8 @@ public class WorkflowBatchHandler { continue; } - // 已经是终态的需要递归查询是否已经完成 + // 已经是终态的需要递归遍历后继节点是否正常执行 checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap); } } -} \ No newline at end of file +} diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java index 1eacd628..57dfae7c 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java @@ -121,6 +121,11 @@ public class WorkflowDetailResponseVO { */ private Integer taskBatchStatus; + /** + * 定时任务批次id + */ + private Long jobTaskBatchId; + /** * 任务执行时间 */ diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java index 6b02e4a8..384f8d95 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java @@ -120,6 +120,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { .peek(nodeInfo -> { JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(nodeInfo.getId()); if (Objects.nonNull(jobTaskBatch)) { + nodeInfo.setJobTaskBatchId(jobTaskBatch.getId()); nodeInfo.setExecutionAt(DateUtils.toLocalDateTime(jobTaskBatch.getExecutionAt())); nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus()); nodeInfo.setOperationReason(jobTaskBatch.getOperationReason());