feat(sj_1.1.0-beta2): 工作流添加RECOVERY阻塞策略

This commit is contained in:
opensnail 2024-06-26 13:50:14 +08:00
parent b60892b76f
commit 1162271bdc
4 changed files with 56 additions and 14 deletions

View File

@ -25,11 +25,11 @@ public class ConcurrencyWorkflowBlockStrategy extends AbstractWorkflowBlockStrat
@Override
protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
try {
workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null);
} catch (IOException e) {
throw new SnailJobServerException("校验工作流失败", e);
}
// try {
// workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null);
// } catch (IOException e) {
// throw new SnailJobServerException("校验工作流失败", e);
// }
WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext);
workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext);

View File

@ -27,11 +27,11 @@ public class DiscardWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy
@Override
protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
try {
workflowBatchHandler.checkWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null);
} catch (IOException e) {
throw new SnailJobServerException("校验工作流失败", e);
}
// try {
// workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null);
// } catch (IOException e) {
// throw new SnailJobServerException("校验工作流失败", e);
// }
// 生成状态为取消的工作流批次
WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext);
workflowTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());

View File

@ -0,0 +1,35 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: opensnail
* @date : 2024-06-26
* @since : sj_1.1.0
*/
@Component
@RequiredArgsConstructor
public class RecoveryWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy {
private final WorkflowBatchHandler workflowBatchHandler;
@Override
protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
try {
workflowBatchHandler.recoveryWorkflowExecutor(workflowBlockStrategyContext.getWorkflowTaskBatchId(), null);
} catch (IOException e) {
throw new SnailJobServerException("校验工作流失败", e);
}
}
@Override
protected BlockStrategyEnum blockStrategyEnum() {
return BlockStrategyEnum.RECOVERY;
}
}

View File

@ -222,7 +222,14 @@ public class WorkflowBatchHandler {
}
}
public void checkWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
/**
* 重新触发未执行成功的工作流节点
*
* @param workflowTaskBatchId 工作流批次
* @param workflowTaskBatch 工作流批次信息(若为null, 则会通过workflowTaskBatchId查询)
* @throws IOException
*/
public void recoveryWorkflowExecutor(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("任务不存在"));
@ -241,10 +248,10 @@ public class WorkflowBatchHandler {
Map<Long, JobTaskBatch> jobTaskBatchMap = StreamUtils.toIdentityMap(jobTaskBatches, JobTaskBatch::getWorkflowNodeId);
checkWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap);
recoveryWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap);
}
private void checkWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph<Long> graph, Map<Long, JobTaskBatch> jobTaskBatchMap) {
private void recoveryWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph<Long> graph, Map<Long, JobTaskBatch> jobTaskBatchMap) {
// 判定条件节点是否已经执行完成
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId);
@ -289,7 +296,7 @@ public class WorkflowBatchHandler {
}
// 已经是终态的需要递归遍历后继节点是否正常执行
checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap);
recoveryWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap);
}
}