feat: 2.6.0

1. 调试条件节点
This commit is contained in:
byteblogs168 2023-12-26 18:35:24 +08:00
parent 37fdb3a731
commit e12235fda1
19 changed files with 517 additions and 32 deletions

View File

@ -4,8 +4,6 @@ import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.HashMap;
/**
* 标识某个操作的具体原因
*
@ -27,6 +25,7 @@ public enum JobOperationReasonEnum {
TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"),
MANNER_STOP(8, "手动停止"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"),
JOB_TASK_INTERRUPTED(8, "任务中断"),
;
private final int reason;

View File

@ -19,6 +19,11 @@ public class WorkflowTaskPrepareDTO {
*/
private Integer triggerType;
/**
* 阻塞策略 1丢弃 2覆盖 3并行
*/
private Integer blockStrategy;
/**
* 工作流名称
*/
@ -58,4 +63,14 @@ public class WorkflowTaskPrepareDTO {
* 下次触发时间
*/
private long nextTriggerAt;
/**
* 任务执行时间
*/
private Long executionAt;
/**
* 仅做超时检测
*/
private boolean onlyTimeoutCheck;
}

View File

@ -5,7 +5,14 @@ import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.Bl
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:53
* @since : 1.0.0
*/
/**
 * Strategy applied when a task is still running at the moment its next
 * trigger fires (block handling).
 *
 * @author: www.byteblogs.com
 * @date : 2023-09-25 17:53
 * @since : 1.0.0
 */
public interface BlockStrategy {
/**
 * Apply this block strategy to the running task.
 *
 * @param context strategy context describing the blocked trigger
 */
void block(BlockStrategyContext context);
}

View File

@ -52,6 +52,7 @@ public interface JobTaskConverter {
TaskStopJobContext toStopJobContext(BlockStrategies.BlockStrategyContext context);
TaskStopJobContext toStopJobContext(JobExecutorResultDTO context);
TaskStopJobContext toStopJobContext(Job job);
TaskStopJobContext toStopJobContext(JobTaskPrepareDTO context);
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
@ -41,4 +42,8 @@ public interface WorkflowTaskConverter {
@Mapping(source = "id", target = "workflowNodeId")
)
WorkflowExecutorContext toWorkflowExecutorContext(WorkflowNode workflowNode);
WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowBlockStrategyContext context);
WorkflowBlockStrategyContext toWorkflowBlockStrategyContext(WorkflowTaskPrepareDTO prepareDTO);
}

View File

@ -0,0 +1,32 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional;
/**
* @author: xiaowoniu
* @date : 2023-12-26
* @since : 2.6.0
*/
/**
 * Template base class for workflow block strategies: narrows the generic
 * block context to a workflow-scoped context, delegates to the concrete
 * strategy, and self-registers with {@code WorkflowBlockStrategyFactory}
 * once Spring has finished wiring the bean.
 *
 * @author: xiaowoniu
 * @date : 2023-12-26
 * @since : 2.6.0
 */
public abstract class AbstractWorkflowBlockStrategy implements BlockStrategy, InitializingBean {

    @Override
    @Transactional
    public void block(final BlockStrategyContext context) {
        // Workflow strategies only ever receive workflow contexts; narrow and delegate.
        doBlock((WorkflowBlockStrategyContext) context);
    }

    /**
     * Strategy-specific blocking behaviour.
     *
     * @param workflowBlockStrategyContext the workflow-scoped block context
     */
    protected abstract void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext);

    /**
     * @return the strategy key this implementation handles
     */
    protected abstract BlockStrategyEnum blockStrategyEnum();

    @Override
    public void afterPropertiesSet() throws Exception {
        // Register this bean so the factory can resolve it by strategy code.
        WorkflowBlockStrategyFactory.registerTaskStop(blockStrategyEnum(), this);
    }
}

View File

@ -0,0 +1,32 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author: shuguang.zhang
* @date : 2023-12-26
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
/**
 * CONCURRENCY block strategy: a still-running workflow batch does not block a
 * new trigger — a fresh workflow task batch is generated and both run side by side.
 *
 * @author: shuguang.zhang
 * @date : 2023-12-26
 * @since : 2.6.0
 */
@Component
@RequiredArgsConstructor
public class ConcurrencyWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy {

    private final WorkflowBatchGenerator workflowBatchGenerator;

    @Override
    protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
        // Run in parallel with the existing batch: simply create a new one.
        workflowBatchGenerator.generateJobTaskBatch(
                WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext));
    }

    @Override
    protected BlockStrategyEnum blockStrategyEnum() {
        return BlockStrategyEnum.CONCURRENCY;
    }
}

View File

@ -0,0 +1,34 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author: xiaowoniu
* @date : 2023-12-26
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
/**
 * DISCARD block strategy: when a workflow batch is still running, the new
 * trigger is dropped — recorded as a cancelled workflow batch for auditing.
 *
 * @author: xiaowoniu
 * @date : 2023-12-26
 * @since : 2.6.0
 */
@Component
@RequiredArgsConstructor
public class DiscardWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy {

    private final WorkflowBatchGenerator workflowBatchGenerator;

    @Override
    protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
        // Persist the discarded trigger as a workflow batch in CANCEL state.
        WorkflowTaskBatchGeneratorContext generatorContext =
                WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext);
        generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
        generatorContext.setOperationReason(JobOperationReasonEnum.JOB_DISCARD.getReason());
        workflowBatchGenerator.generateJobTaskBatch(generatorContext);
    }

    @Override
    protected BlockStrategyEnum blockStrategyEnum() {
        return BlockStrategyEnum.DISCARD;
    }
}

View File

@ -0,0 +1,64 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
/**
* @author: xiaowoniu
* @date : 2023-12-26
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
/**
 * OVERLAY block strategy: the running workflow batch is stopped and replaced
 * by a newly generated batch for the incoming trigger.
 *
 * @author: xiaowoniu
 * @date : 2023-12-26
 * @since : 2.6.0
 */
@Component
@RequiredArgsConstructor
public class OverlayWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy {

    private final WorkflowBatchHandler workflowBatchHandler;
    private final WorkflowBatchGenerator workflowBatchGenerator;

    @Override
    protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) {
        // Stop the batch that is currently running...
        workflowBatchHandler.stop(workflowBlockStrategyContext.getWorkflowTaskBatchId(),
                workflowBlockStrategyContext.getOperationReason());
        // ...then generate a fresh batch in its place.
        WorkflowTaskBatchGeneratorContext generatorContext =
                WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext);
        workflowBatchGenerator.generateJobTaskBatch(generatorContext);
    }

    @Override
    protected BlockStrategyEnum blockStrategyEnum() {
        return BlockStrategyEnum.OVERLAY;
    }
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyContext;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author: xiaowoniu
* @date : 2023-12-26
* @since : 2.6.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
/**
 * Block-strategy context that extends the generic job block context with
 * workflow identifiers, so workflow strategies know which workflow and
 * which running batch triggered the block handling.
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class WorkflowBlockStrategyContext extends BlockStrategyContext {
/**
 * Workflow id
 */
private Long workflowId;
/**
 * Workflow task batch id (the batch currently running)
 */
private Long workflowTaskBatchId;
}

View File

@ -0,0 +1,28 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: xiaowoniu
* @date : 2023-12-26
* @since : 2.6.0
*/
/**
 * Registry mapping {@code BlockStrategyEnum} keys to workflow {@code BlockStrategy}
 * beans. Strategies self-register at startup (see AbstractWorkflowBlockStrategy's
 * afterPropertiesSet).
 *
 * NOTE(review): the method names "registerTaskStop"/"getJobTaskStop" appear to be
 * copied from JobTaskStopFactory — they register/resolve block strategies, not stop
 * handlers. Renaming would touch existing callers; flagged for a follow-up.
 */
public final class WorkflowBlockStrategyFactory {
// Concurrent map: registration happens from multiple Spring beans during startup.
private static final ConcurrentHashMap<BlockStrategyEnum, BlockStrategy> CACHE = new ConcurrentHashMap<>();
// Utility class — no instances.
private WorkflowBlockStrategyFactory() {
}
protected static void registerTaskStop(BlockStrategyEnum blockStrategyEnum, BlockStrategy blockStrategy) {
CACHE.put(blockStrategyEnum, blockStrategy);
}
public static BlockStrategy getJobTaskStop(Integer blockStrategy) {
// BlockStrategyEnum.valueOf(int) rejects unknown codes; for a known code with no
// registered bean this returns null — NOTE(review): callers would then NPE.
return CACHE.get(BlockStrategyEnum.valueOf(blockStrategy));
}
}

View File

@ -1,7 +1,9 @@
package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
@ -17,6 +19,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
@ -106,6 +109,26 @@ public class JobExecutorActor extends AbstractActor {
job.getNamespaceId()))) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason();
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
if (Objects.nonNull(taskExecute.getWorkflowNodeId()) && Objects.nonNull(taskExecute.getWorkflowTaskBatchId())) {
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
taskExecuteDTO.setResult(StrUtil.EMPTY);
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
}
});
}
// 更新状态

View File

@ -95,21 +95,27 @@ public class WorkflowExecutorActor extends AbstractActor {
return;
}
// 添加父节点为了判断父节点的处理状态
successors.add(taskExecute.getParentId());
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, successors)
);
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel));
.in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId());
if (JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
// 判断是否继续处理根据失败策略
}
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
// 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次同时可以判断出DAG是否全部执行完成
// 只会条件节点会使用
Boolean evaluationResult = null;
for (WorkflowNode workflowNode : workflowNodes) {

View File

@ -29,6 +29,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -66,8 +67,12 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
} else {
try {
Map<String, Object> contextMap = new HashMap<>();
// 根据配置的表达式执行
result = doEval(context.getNodeExpression(), JsonUtil.parseHashMap(context.getResult()));
if (StrUtil.isNotBlank(context.getResult())) {
contextMap = JsonUtil.parseHashMap(context.getResult());
}
result = doEval(context.getNodeExpression(), contextMap);
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result);
} catch (Exception e) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();

View File

@ -1,6 +1,10 @@
package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
@ -14,7 +18,9 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@ -33,7 +39,23 @@ public class WorkflowBatchGenerator {
// 生成任务批次
WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context);
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
workflowTaskBatchMapper.insert(workflowTaskBatch);
// 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
workflowTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
} else {
// 生成一个新的任务
workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
workflowTaskBatch.setOperationReason(context.getOperationReason());
}
Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new EasyRetryServerException("新增调度任务失败. [{}]", context.getWorkflowId()));
// 非待处理状态无需进入时间轮中
if (JobTaskBatchStatusEnum.WAITING.getStatus() != workflowTaskBatch.getTaskBatchStatus()) {
return;
}
// 开始执行工作流
// 进入时间轮

View File

@ -3,14 +3,22 @@ package com.aizuda.easy.retry.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
@ -18,9 +26,18 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
/**
* @author xiaowoniu
@ -30,6 +47,7 @@ import java.util.Optional;
@Component
@RequiredArgsConstructor
public class WorkflowBatchHandler {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final JobMapper jobMapper;
@ -38,29 +56,103 @@ public class WorkflowBatchHandler {
/**
 * Convenience overload: finalizes the workflow batch identified by
 * {@code workflowTaskBatchId}, letting the two-arg variant load it from the database.
 *
 * @param workflowTaskBatchId workflow batch id
 * @return true when the batch was driven to a terminal state and updated
 * @throws IOException if the persisted DAG graph cannot be deserialized
 */
public boolean complete(Long workflowTaskBatchId) throws IOException {
return complete(workflowTaskBatchId, null);
}
public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch).orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
// 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成
long executedTaskCount = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, graph.nodes())
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, graph.nodes())
);
Long taskNodeCount = workflowNodeMapper.selectCount(new LambdaQueryWrapper<WorkflowNode>()
// .eq(WorkflowNode::getNodeType, 1) // TODO 任务节点 若最后一个节点是条件或者是回调节点 这个地方就有问题
.in(WorkflowNode::getId, graph.nodes()));
// TODO 若最后几个节点都是非任务节点这里直接完成就会有问题
if (executedTaskCount < taskNodeCount) {
if (CollectionUtils.isEmpty(jobTaskBatches)) {
return false;
}
handlerTaskBatch(workflowTaskBatchId, JobTaskBatchStatusEnum.SUCCESS.getStatus(), JobOperationReasonEnum.NONE.getReason());
if (jobTaskBatches.stream().anyMatch(
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()))) {
return false;
}
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, graph.nodes()));
if (jobTaskBatches.size() < workflowNodes.size()) {
return false;
}
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream()
.collect(Collectors.toMap(WorkflowNode::getId, workflowNode -> workflowNode));
Map<Long, List<JobTaskBatch>> map = jobTaskBatches.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getParentWorkflowNodeId));
int taskStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
for (final JobTaskBatch jobTaskBatch : jobTaskBatches) {
Set<Long> predecessors = graph.predecessors(jobTaskBatch.getWorkflowNodeId());
WorkflowNode workflowNode = workflowNodeMap.get(jobTaskBatch.getWorkflowNodeId());
// 条件节点是或的关系一个成功就代表成功
if (WorkflowNodeTypeEnum.CONDITION.getType() == workflowNode.getNodeType()) {
for (final Long predecessor : predecessors) {
List<JobTaskBatch> jobTaskBatcheList = map.get(predecessor);
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
long successCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.SUCCESS.getStatus(), 0L);
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
if (successCount > 0) {
break;
}
if (failCount > 0) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
break;
}
if (stopCount > 0) {
taskStatus = JobTaskBatchStatusEnum.STOP.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
break;
}
}
} else {
for (final Long predecessor : predecessors) {
List<JobTaskBatch> jobTaskBatcheList = map.get(predecessor);
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); if (failCount > 0) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
break;
}
if (stopCount > 0) {
taskStatus = JobTaskBatchStatusEnum.STOP.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
break;
}
}
}
if (taskStatus != JobTaskBatchStatusEnum.SUCCESS.getStatus()) {
break;
}
}
handlerTaskBatch(workflowTaskBatchId, taskStatus, operationReason);
return true;
}
@ -69,11 +161,51 @@ public class WorkflowBatchHandler {
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
jobTaskBatch.setId(workflowTaskBatchId);
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
jobTaskBatch.setTaskBatchStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败"));
() -> new EasyRetryServerException("更新任务失败"));
}
/**
 * Stops a running workflow batch: marks the workflow batch as STOP, then
 * force-stops every not-yet-completed job batch that belongs to it.
 *
 * @param workflowTaskBatchId id of the workflow batch to stop
 * @param operationReason     reason code recorded on the batch; when null or
 *                            NONE it defaults to JOB_OVERLAY
 */
public void stop(Long workflowTaskBatchId, Integer operationReason) {
    if (Objects.isNull(operationReason)
        || operationReason == JobOperationReasonEnum.NONE.getReason()) {
        operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
    }

    WorkflowTaskBatch workflowTaskBatch = new WorkflowTaskBatch();
    workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
    workflowTaskBatch.setOperationReason(operationReason);
    workflowTaskBatch.setId(workflowTaskBatchId);

    // Stop the running workflow batch first.
    Assert.isTrue(1 == workflowTaskBatchMapper.updateById(workflowTaskBatch),
        () -> new EasyRetryServerException("停止工作流批次失败. id:[{}]",
            workflowTaskBatchId));

    // Find the job batches that were already triggered but are not complete yet.
    List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
        .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
        .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));

    // Fix: MyBatis-Plus selectBatchIds with an empty id collection generates
    // invalid SQL — bail out early when nothing is still running.
    if (CollectionUtils.isEmpty(jobTaskBatches)) {
        return;
    }

    List<Job> jobs = jobMapper.selectBatchIds(
        jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet()));
    Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));

    for (final JobTaskBatch jobTaskBatch : jobTaskBatches) {
        Job job = jobMap.get(jobTaskBatch.getJobId());
        if (Objects.nonNull(job)) {
            // Force-stop the individual job batch and record the interrupt reason.
            JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
            TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
            stopJobContext.setTaskBatchId(jobTaskBatch.getId());
            stopJobContext.setJobOperationReason(JobOperationReasonEnum.JOB_TASK_INTERRUPTED.getReason());
            stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
            stopJobContext.setForceStop(Boolean.TRUE);
            instanceInterrupt.stop(stopJobContext);
        }
    }
}
}

View File

@ -1,9 +1,21 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyFactory;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -20,6 +32,7 @@ import java.util.Objects;
@RequiredArgsConstructor
@Slf4j
public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
private final WorkflowBatchHandler workflowBatchHandler;
@Override
@ -28,18 +41,43 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
}
@Override
protected void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO) {
log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(jobPrepareDTO));
protected void doHandler(WorkflowTaskPrepareDTO prepare) {
log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare));
// 1. 若DAG已经支持完成了由于异常原因导致的没有更新成终态此次进行一次更新操作
try {
workflowBatchHandler.complete(jobPrepareDTO.getWorkflowTaskBatchId());
} catch (IOException e) {
// TODO 待处理
}
// 1. 若DAG已经支持完成了由于异常原因导致的没有更新成终态此次进行一次更新操作
int blockStrategy = prepare.getBlockStrategy();
if (workflowBatchHandler.complete(prepare.getWorkflowTaskBatchId())) {
// 开启新的任务
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间
long delay = DateUtils.toNowMilli() - prepare.getExecutionAt();
// 2. 判断DAG是否已经支持超时
// 3. 支持阻塞策略同JOB逻辑一致
// 2. 判断DAG是否已经支持超时
// 计算超时时间到达超时时间中断任务
if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) {
log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]",
prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason());
}
}
// 仅是超时检测的不执行阻塞策略
if (prepare.isOnlyTimeoutCheck()) {
return;
}
// 3. 支持阻塞策略同JOB逻辑一致
BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getJobTaskStop(blockStrategy);
WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext(
prepare);
blockStrategyInterface.block(workflowBlockStrategyContext);
} catch (IOException e) {
log.error("更新任务状态失败. prepare:[{}]", JsonUtil.toJsonString(prepare), e);
}
}

View File

@ -10,10 +10,13 @@ import java.util.concurrent.ConcurrentHashMap;
* @date 2023-10-02 13:04:09
* @since 2.4.0
*/
public class JobTaskStopFactory {
public final class JobTaskStopFactory {
private static final ConcurrentHashMap<TaskTypeEnum, JobTaskStopHandler> CACHE = new ConcurrentHashMap<>();
private JobTaskStopFactory() {
}
public static void registerTaskStop(TaskTypeEnum taskInstanceType, JobTaskStopHandler interrupt) {
CACHE.put(taskInstanceType, interrupt);
}
@ -21,4 +24,8 @@ public class JobTaskStopFactory {
public static JobTaskStopHandler getJobTaskStop(Integer type) {
return CACHE.get(TaskTypeEnum.valueOf(type));
}
public static JobTaskStopFactory createJobTaskStopFactory() {
return new JobTaskStopFactory();
}
}

View File

@ -46,6 +46,16 @@ public class BlockStrategies {
throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
/**
 * Resolves the enum constant matching the given persisted strategy code.
 *
 * @param blockStrategy numeric strategy code stored in the database
 * @return the matching {@code BlockStrategyEnum}
 * @throws EasyRetryServerException if no constant carries this code
 */
public static BlockStrategyEnum valueOf(int blockStrategy) {
for (final BlockStrategyEnum value : BlockStrategyEnum.values()) {
if (value.blockStrategy == blockStrategy) {
return value;
}
}
throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
}
@Data