feat: 2.6.0
1. DAG条件节点未完成
This commit is contained in:
parent
6bac05cfb3
commit
03c0c59c53
@ -1,12 +1,16 @@
|
||||
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
|
||||
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
|
||||
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
|
||||
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
|
||||
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum;
|
||||
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author: shuguang.zhang
|
||||
* @date : 2023-12-26
|
||||
@ -16,9 +20,17 @@ import org.springframework.stereotype.Component;
|
||||
@RequiredArgsConstructor
|
||||
public class ConcurrencyWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy {
|
||||
private final WorkflowBatchGenerator workflowBatchGenerator;
|
||||
private final WorkflowBatchHandler workflowBatchHandler;
|
||||
|
||||
@Override
|
||||
protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
|
||||
|
||||
try {
|
||||
workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null);
|
||||
} catch (IOException e) {
|
||||
throw new EasyRetryServerException("校验工作流失败", e);
|
||||
}
|
||||
|
||||
WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext);
|
||||
workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext);
|
||||
}
|
||||
|
@ -5,14 +5,18 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
|
||||
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
|
||||
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
|
||||
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum;
|
||||
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author: xiaowoniu
|
||||
* @date : 2023-12-26
|
||||
@ -22,18 +26,15 @@ import org.springframework.stereotype.Component;
|
||||
@RequiredArgsConstructor
|
||||
public class DiscardWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy {
|
||||
private final WorkflowBatchGenerator workflowBatchGenerator;
|
||||
private final WorkflowBatchHandler workflowBatchHandler;
|
||||
@Override
|
||||
protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
|
||||
|
||||
// 重新尝试执行, 重新生成任务批次
|
||||
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
|
||||
taskExecuteDTO.setWorkflowTaskBatchId(workflowBlockStrategyContext.getWorkflowTaskBatchId());
|
||||
taskExecuteDTO.setWorkflowId(workflowBlockStrategyContext.getWorkflowId());
|
||||
taskExecuteDTO.setTriggerType(workflowBlockStrategyContext.getTriggerType());
|
||||
taskExecuteDTO.setParentId(SystemConstants.ROOT);
|
||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||
actorRef.tell(taskExecuteDTO, actorRef);
|
||||
|
||||
try {
|
||||
workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null);
|
||||
} catch (IOException e) {
|
||||
throw new EasyRetryServerException("校验工作流失败", e);
|
||||
}
|
||||
// 生成状态为取消的工作流批次
|
||||
WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext);
|
||||
workflowTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
||||
|
@ -1,13 +1,18 @@
|
||||
package com.aizuda.easy.retry.server.job.task.support.handler;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||
import com.aizuda.easy.retry.server.common.util.GraphUtils;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
|
||||
@ -59,7 +64,7 @@ public class WorkflowBatchHandler {
|
||||
|
||||
public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
|
||||
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
|
||||
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
|
||||
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
|
||||
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
|
||||
|
||||
String flowInfo = workflowTaskBatch.getFlowInfo();
|
||||
@ -67,8 +72,8 @@ public class WorkflowBatchHandler {
|
||||
|
||||
// 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成
|
||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
|
||||
.in(JobTaskBatch::getWorkflowNodeId, graph.nodes())
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
|
||||
.in(JobTaskBatch::getWorkflowNodeId, graph.nodes())
|
||||
);
|
||||
|
||||
if (CollectionUtils.isEmpty(jobTaskBatches)) {
|
||||
@ -76,21 +81,21 @@ public class WorkflowBatchHandler {
|
||||
}
|
||||
|
||||
if (jobTaskBatches.stream().anyMatch(
|
||||
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()))) {
|
||||
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
|
||||
.in(WorkflowNode::getId, graph.nodes()));
|
||||
.in(WorkflowNode::getId, graph.nodes()));
|
||||
if (jobTaskBatches.size() < workflowNodes.size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream()
|
||||
.collect(Collectors.toMap(WorkflowNode::getId, workflowNode -> workflowNode));
|
||||
.collect(Collectors.toMap(WorkflowNode::getId, workflowNode -> workflowNode));
|
||||
|
||||
Map<Long, List<JobTaskBatch>> map = jobTaskBatches.stream()
|
||||
.collect(Collectors.groupingBy(JobTaskBatch::getParentWorkflowNodeId));
|
||||
.collect(Collectors.groupingBy(JobTaskBatch::getParentWorkflowNodeId));
|
||||
|
||||
int taskStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
|
||||
int operationReason = JobOperationReasonEnum.NONE.getReason();
|
||||
@ -102,7 +107,7 @@ public class WorkflowBatchHandler {
|
||||
for (final Long predecessor : predecessors) {
|
||||
List<JobTaskBatch> jobTaskBatcheList = map.get(predecessor);
|
||||
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
|
||||
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
|
||||
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
|
||||
long successCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.SUCCESS.getStatus(), 0L);
|
||||
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
|
||||
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
|
||||
@ -128,9 +133,10 @@ public class WorkflowBatchHandler {
|
||||
for (final Long predecessor : predecessors) {
|
||||
List<JobTaskBatch> jobTaskBatcheList = map.get(predecessor);
|
||||
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
|
||||
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
|
||||
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
|
||||
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
|
||||
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); if (failCount > 0) {
|
||||
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
|
||||
if (failCount > 0) {
|
||||
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
|
||||
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
|
||||
break;
|
||||
@ -164,13 +170,13 @@ public class WorkflowBatchHandler {
|
||||
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
||||
jobTaskBatch.setOperationReason(operationReason);
|
||||
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
|
||||
() -> new EasyRetryServerException("更新任务失败"));
|
||||
() -> new EasyRetryServerException("更新任务失败"));
|
||||
|
||||
}
|
||||
|
||||
public void stop(Long workflowTaskBatchId, Integer operationReason) {
|
||||
if (Objects.isNull(operationReason)
|
||||
|| operationReason == JobOperationReasonEnum.NONE.getReason()) {
|
||||
|| operationReason == JobOperationReasonEnum.NONE.getReason()) {
|
||||
operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
|
||||
}
|
||||
|
||||
@ -180,20 +186,20 @@ public class WorkflowBatchHandler {
|
||||
workflowTaskBatch.setId(workflowTaskBatchId);
|
||||
// 先停止执行中的批次
|
||||
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(workflowTaskBatch),
|
||||
() -> new EasyRetryServerException("停止工作流批次失败. id:[{}]",
|
||||
workflowTaskBatchId));
|
||||
() -> new EasyRetryServerException("停止工作流批次失败. id:[{}]",
|
||||
workflowTaskBatchId));
|
||||
|
||||
// 关闭已经触发的任务
|
||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
|
||||
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
|
||||
|
||||
if (CollectionUtils.isEmpty(jobTaskBatches)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Job> jobs = jobMapper.selectBatchIds(
|
||||
jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet()));
|
||||
jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet()));
|
||||
|
||||
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
|
||||
for (final JobTaskBatch jobTaskBatch : jobTaskBatches) {
|
||||
@ -213,7 +219,66 @@ public class WorkflowBatchHandler {
|
||||
}
|
||||
}
|
||||
|
||||
public void checkWorkflowExecutor() {
|
||||
public void checkWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
|
||||
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
|
||||
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
|
||||
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
|
||||
String flowInfo = workflowTaskBatch.getFlowInfo();
|
||||
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
|
||||
Set<Long> successors = graph.successors(SystemConstants.ROOT);
|
||||
if (CollectionUtils.isEmpty(successors)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成
|
||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)
|
||||
.in(JobTaskBatch::getWorkflowNodeId, graph.nodes())
|
||||
);
|
||||
|
||||
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatches.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
|
||||
|
||||
checkWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph<Long> graph, Map<Long, JobTaskBatch> jobTaskBatchMap) {
|
||||
|
||||
Set<Long> successors = graph.successors(parentId);
|
||||
if (CollectionUtils.isEmpty(successors)) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (Long successor : successors) {
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(successor);
|
||||
if (Objects.isNull(jobTaskBatch)) {
|
||||
// 重新尝试执行, 重新生成任务批次
|
||||
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
|
||||
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
|
||||
taskExecuteDTO.setWorkflowId(successor);
|
||||
taskExecuteDTO.setTriggerType(1);
|
||||
taskExecuteDTO.setParentId(successor);
|
||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||
actorRef.tell(taskExecuteDTO, actorRef);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())) {
|
||||
// 生成任务批次
|
||||
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
|
||||
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
|
||||
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
|
||||
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
|
||||
jobTaskPrepare.setWorkflowNodeId(successor);
|
||||
jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId);
|
||||
jobTaskPrepare.setParentWorkflowNodeId(parentId);
|
||||
// 执行预处理阶段
|
||||
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
||||
actorRef.tell(jobTaskPrepare, actorRef);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 已经是终态的需要递归查询是否已经完成
|
||||
checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user