From 03c0c59c53637b193df024ebe1065fb2e3753377 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Wed, 27 Dec 2023 23:58:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20DAG=E6=9D=A1=E4=BB=B6?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E6=9C=AA=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ConcurrencyWorkflowBlockStrategy.java | 12 ++ .../DiscardWorkflowBlockStrategy.java | 19 ++-- .../support/handler/WorkflowBatchHandler.java | 103 ++++++++++++++---- 3 files changed, 106 insertions(+), 28 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java index 3721f1fa..475e2950 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java @@ -1,12 +1,16 @@ package com.aizuda.easy.retry.server.job.task.support.block.workflow; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; +import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; +import java.io.IOException; + /** * @author: shuguang.zhang * @date : 2023-12-26 @@ -16,9 +20,17 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ConcurrencyWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy { private final WorkflowBatchGenerator workflowBatchGenerator; + private final WorkflowBatchHandler workflowBatchHandler; @Override protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { + + try { + workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); + } catch (IOException e) { + throw new EasyRetryServerException("校验工作流失败", e); + } + WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java index 8c5b53cf..51fd65c2 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java @@ -5,14 +5,18 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; +import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; +import java.io.IOException; + /** * @author: xiaowoniu * @date : 2023-12-26 @@ -22,18 +26,15 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class DiscardWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy { private final WorkflowBatchGenerator workflowBatchGenerator; + private final WorkflowBatchHandler workflowBatchHandler; @Override protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { - // 重新尝试执行, 重新生成任务批次 - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(workflowBlockStrategyContext.getWorkflowTaskBatchId()); - taskExecuteDTO.setWorkflowId(workflowBlockStrategyContext.getWorkflowId()); - taskExecuteDTO.setTriggerType(workflowBlockStrategyContext.getTriggerType()); - taskExecuteDTO.setParentId(SystemConstants.ROOT); - ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); - actorRef.tell(taskExecuteDTO, actorRef); - + try { + workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null); + } catch (IOException e) { + throw new EasyRetryServerException("校验工作流失败", e); + } // 生成状态为取消的工作流批次 WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); workflowTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java index fd2636f5..cfb7df6c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -1,13 +1,18 @@ package com.aizuda.easy.retry.server.job.task.support.handler; +import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.GraphUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; @@ -59,7 +64,7 @@ public class WorkflowBatchHandler { public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { workflowTaskBatch = Optional.ofNullable(workflowTaskBatch) - .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); + .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); String flowInfo = workflowTaskBatch.getFlowInfo(); @@ -67,8 +72,8 @@ public class WorkflowBatchHandler { // 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成 List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() - .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) - .in(JobTaskBatch::getWorkflowNodeId, graph.nodes()) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) + .in(JobTaskBatch::getWorkflowNodeId, graph.nodes()) ); if (CollectionUtils.isEmpty(jobTaskBatches)) { @@ -76,21 +81,21 @@ public class WorkflowBatchHandler { } if (jobTaskBatches.stream().anyMatch( - jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()))) { + jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()))) { return false; } List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .in(WorkflowNode::getId, graph.nodes())); + .in(WorkflowNode::getId, graph.nodes())); if (jobTaskBatches.size() < workflowNodes.size()) { return false; } Map workflowNodeMap = workflowNodes.stream() - .collect(Collectors.toMap(WorkflowNode::getId, workflowNode -> workflowNode)); + .collect(Collectors.toMap(WorkflowNode::getId, workflowNode -> workflowNode)); Map> map = jobTaskBatches.stream() - .collect(Collectors.groupingBy(JobTaskBatch::getParentWorkflowNodeId)); + .collect(Collectors.groupingBy(JobTaskBatch::getParentWorkflowNodeId)); int taskStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus(); int operationReason = JobOperationReasonEnum.NONE.getReason(); @@ -102,7 +107,7 @@ public class WorkflowBatchHandler { for (final Long predecessor : predecessors) { List jobTaskBatcheList = map.get(predecessor); Map statusCountMap = jobTaskBatcheList.stream() - .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); + .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); long successCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.SUCCESS.getStatus(), 0L); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); @@ -128,9 +133,10 @@ public class WorkflowBatchHandler { for (final Long predecessor : predecessors) { List jobTaskBatcheList = map.get(predecessor); Map statusCountMap = jobTaskBatcheList.stream() - .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); + .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); - long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); if (failCount > 0) { + long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); + if (failCount > 0) { taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); break; @@ -164,13 +170,13 @@ public class WorkflowBatchHandler { jobTaskBatch.setTaskBatchStatus(taskStatus); jobTaskBatch.setOperationReason(operationReason); Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch), - () -> new EasyRetryServerException("更新任务失败")); + () -> new EasyRetryServerException("更新任务失败")); } public void stop(Long workflowTaskBatchId, Integer operationReason) { if (Objects.isNull(operationReason) - || operationReason == JobOperationReasonEnum.NONE.getReason()) { + || operationReason == JobOperationReasonEnum.NONE.getReason()) { operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason(); } @@ -180,20 +186,20 @@ public class WorkflowBatchHandler { workflowTaskBatch.setId(workflowTaskBatchId); // 先停止执行中的批次 Assert.isTrue(1 == workflowTaskBatchMapper.updateById(workflowTaskBatch), - () -> new EasyRetryServerException("停止工作流批次失败. id:[{}]", - workflowTaskBatchId)); + () -> new EasyRetryServerException("停止工作流批次失败. id:[{}]", + workflowTaskBatchId)); // 关闭已经触发的任务 List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() - .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE) - .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)); + .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)); if (CollectionUtils.isEmpty(jobTaskBatches)) { return; } List jobs = jobMapper.selectBatchIds( - jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet())); + jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet())); Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); for (final JobTaskBatch jobTaskBatch : jobTaskBatches) { @@ -213,7 +219,66 @@ public class WorkflowBatchHandler { } } - public void checkWorkflowExecutor() { + public void checkWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { + workflowTaskBatch = Optional.ofNullable(workflowTaskBatch) + .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); + Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); + String flowInfo = workflowTaskBatch.getFlowInfo(); + MutableGraph graph = GraphUtils.deserializeJsonToGraph(flowInfo); + Set successors = graph.successors(SystemConstants.ROOT); + if (CollectionUtils.isEmpty(successors)) { + return; + } + // 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成 + List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId) + .in(JobTaskBatch::getWorkflowNodeId, graph.nodes()) + ); + + Map jobTaskBatchMap = jobTaskBatches.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i)); + + checkWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap); } -} + + private void checkWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph graph, Map jobTaskBatchMap) { + + Set successors = graph.successors(parentId); + if (CollectionUtils.isEmpty(successors)) { + return; + } + + for (Long successor : successors) { + JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(successor); + if (Objects.isNull(jobTaskBatch)) { + // 重新尝试执行, 重新生成任务批次 + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId); + taskExecuteDTO.setWorkflowId(successor); + taskExecuteDTO.setTriggerType(1); + taskExecuteDTO.setParentId(successor); + ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + continue; + } + + if (NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())) { + // 生成任务批次 + Job job = jobMapper.selectById(jobTaskBatch.getJobId()); + JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); + jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType()); + jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); + jobTaskPrepare.setWorkflowNodeId(successor); + jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId); + jobTaskPrepare.setParentWorkflowNodeId(parentId); + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + actorRef.tell(jobTaskPrepare, actorRef); + continue; + } + + // 已经是终态的需要递归查询是否已经完成 + checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap); + } + } +} \ No newline at end of file