feat: 2.6.0

1. 调试条件节点
This commit is contained in:
byteblogs168 2023-12-25 18:30:40 +08:00
parent 66a8ca9f7c
commit 0b60a2ee64
11 changed files with 97 additions and 20 deletions

View File

@ -103,7 +103,7 @@ public interface SystemConstants {
Long ROOT = -1L; Long ROOT = -1L;
/** /**
* 根节点 * 系统内置的条件任务ID
*/ */
Long CONDITION_JOB_ID = -1000L; Long CONDITION_JOB_ID = -1000L;
} }

View File

@ -26,7 +26,7 @@ public enum JobOperationReasonEnum {
NOT_EXECUTE_TASK(6, "无可执行任务项"), NOT_EXECUTE_TASK(6, "无可执行任务项"),
TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"), TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"),
MANNER_STOP(8, "手动停止"), MANNER_STOP(8, "手动停止"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "手动停止"), WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"),
; ;
private final int reason; private final int reason;

View File

@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowE
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
import org.mapstruct.Mapping; import org.mapstruct.Mapping;
@ -35,4 +36,9 @@ public interface WorkflowTaskConverter {
WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context); WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context);
JobTaskBatchGeneratorContext toJobTaskBatchGeneratorContext(WorkflowExecutorContext context); JobTaskBatchGeneratorContext toJobTaskBatchGeneratorContext(WorkflowExecutorContext context);
@Mappings(
@Mapping(source = "id", target = "workflowNodeId")
)
WorkflowExecutorContext toWorkflowExecutorContext(WorkflowNode workflowNode);
} }

View File

@ -19,6 +19,7 @@ import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorFactory; import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
@ -106,7 +107,6 @@ public class WorkflowExecutorActor extends AbstractActor {
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
// TODO 此次做策略按照任务节点 条件节点 回调节点做抽象
// 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次同时可以判断出DAG是否全部执行完成 // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次同时可以判断出DAG是否全部执行完成
for (WorkflowNode workflowNode : workflowNodes) { for (WorkflowNode workflowNode : workflowNodes) {
// 批次已经存在就不在重复生成 // 批次已经存在就不在重复生成
@ -117,9 +117,8 @@ public class WorkflowExecutorActor extends AbstractActor {
// 执行DAG中的节点 // 执行DAG中的节点
WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType()); WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType());
WorkflowExecutorContext context = new WorkflowExecutorContext(); WorkflowExecutorContext context = WorkflowTaskConverter.INSTANCE.toWorkflowExecutorContext(workflowNode);
context.setJob(jobMap.get(workflowNode.getJobId())); context.setJob(jobMap.get(workflowNode.getJobId()));
context.setWorkflowNodeId(workflowNode.getWorkflowId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setParentWorkflowNodeId(taskExecute.getParentId()); context.setParentWorkflowNodeId(taskExecute.getParentId());

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -11,6 +12,7 @@ import org.springframework.beans.factory.InitializingBean;
public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean { public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {
@Override @Override
@Transactional
public void execute(WorkflowExecutorContext context) { public void execute(WorkflowExecutorContext context) {
doExecute(context); doExecute(context);

View File

@ -1,27 +1,39 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow; package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext; import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map; import java.util.Map;
import java.util.Optional;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -33,24 +45,26 @@ import java.util.Map;
@Slf4j @Slf4j
public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
private final static ExpressionParser ENGINE = new SpelExpressionParser(); private static final ExpressionParser ENGINE = new SpelExpressionParser();
private final JobTaskBatchGenerator jobTaskBatchGenerator; private final JobTaskBatchGenerator jobTaskBatchGenerator;
private final JobTaskMapper jobTaskMapper;
@Override @Override
public WorkflowNodeTypeEnum getWorkflowNodeType() { public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.CONDITION; return WorkflowNodeTypeEnum.CONDITION;
} }
@Override @Override
protected void doExecute(WorkflowExecutorContext context) { public void doExecute(WorkflowExecutorContext context) {
int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus(); int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason(); int operationReason = JobOperationReasonEnum.NONE.getReason();
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
String message = StrUtil.EMPTY;
try { try {
// 根据配置的表达式执行 // 根据配置的表达式执行
Boolean result = doEval(context.getExpression(), JsonUtil.parseHashMap(context.getResult())); Boolean result = doEval(context.getExpression(), JsonUtil.parseHashMap(context.getResult()));
if (result) { if (Boolean.TRUE.equals(result)) {
// 若是工作流则开启下一个任务 // 若是工作流则开启下一个任务
try { try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
@ -67,15 +81,39 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
} catch (Exception e) { } catch (Exception e) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason(); operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
} }
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context); JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus); generatorContext.setTaskBatchStatus(taskBatchStatus);
generatorContext.setOperationReason(operationReason); generatorContext.setOperationReason(operationReason);
// 特殊的job
generatorContext.setJobId(SystemConstants.CONDITION_JOB_ID); generatorContext.setJobId(SystemConstants.CONDITION_JOB_ID);
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
// 生成执行任务实例
JobTask jobTask = new JobTask();
jobTask.setGroupName(context.getGroupName());
jobTask.setNamespaceId(context.getNamespaceId());
jobTask.setJobId(SystemConstants.CONDITION_JOB_ID);
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
jobTask.setArgsStr(context.getExpression());
jobTask.setTaskStatus(jobTaskStatus);
jobTask.setResultMessage(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
// 保存执行的日志
JobLogDTO jobLogDTO = new JobLogDTO();
jobLogDTO.setMessage(message);
jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.CONDITION_JOB_ID);
jobLogDTO.setGroupName(context.getGroupName());
jobLogDTO.setNamespaceId(context.getNamespaceId());
jobLogDTO.setTaskBatchId(jobTaskBatch.getId());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
} }
protected Boolean doEval(String expression, Map<String, Object> context) { protected Boolean doEval(String expression, Map<String, Object> context) {

View File

@ -54,4 +54,24 @@ public class WorkflowExecutorContext {
*/ */
private String result; private String result;
/**
* 1SpEl2Aviator 3QL
*/
private Integer expressionType;
/**
* 失败策略 1跳过 2阻塞
*/
private Integer failStrategy;
/**
* 工作流节点状态 0关闭1开启
*/
private Integer workflowNodeStatus;
/**
* 节点表达式
*/
private String nodeExpression;
} }

View File

@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.DuplicateKeyException;
@ -36,7 +37,7 @@ public class JobTaskBatchGenerator {
private JobTaskBatchMapper jobTaskBatchMapper; private JobTaskBatchMapper jobTaskBatchMapper;
@Transactional @Transactional
public void generateJobTaskBatch(JobTaskBatchGeneratorContext context) { public JobTaskBatch generateJobTaskBatch(JobTaskBatchGeneratorContext context) {
// 生成一个新的任务 // 生成一个新的任务
JobTaskBatch jobTaskBatch = JobTaskConverter.INSTANCE.toJobTaskBatch(context); JobTaskBatch jobTaskBatch = JobTaskConverter.INSTANCE.toJobTaskBatch(context);
@ -56,12 +57,16 @@ public class JobTaskBatchGenerator {
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId())); Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
} catch (DuplicateKeyException ignored) { } catch (DuplicateKeyException ignored) {
// 忽略重复的DAG任务 // 忽略重复的DAG任务
return; return jobTaskBatchMapper.selectOne(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
.eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
);
} }
// 非待处理状态无需进入时间轮中 // 非待处理状态无需进入时间轮中
if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) { if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
return; return jobTaskBatch;
} }
// 进入时间轮 // 进入时间轮
@ -75,6 +80,7 @@ public class JobTaskBatchGenerator {
JobTimerWheel.register(jobTaskBatch.getId(), JobTimerWheel.register(jobTaskBatch.getId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
return jobTaskBatch;
} }
} }

View File

@ -46,10 +46,10 @@ public class WorkflowRequestVO {
*/ */
private String description; private String description;
/** /**
* DAG节点配置 * DAG节点配置
*/ */
@NotNull(message = "DAG节点配置不能为空")
private NodeConfig nodeConfig; private NodeConfig nodeConfig;
@Data @Data

View File

@ -118,10 +118,12 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream() Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.peek(nodeInfo -> { .peek(nodeInfo -> {
JobTaskBatch taskBatch = jobTaskBatchMap.getOrDefault(nodeInfo.getId(), new JobTaskBatch()); JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(nodeInfo.getId());
nodeInfo.setExecutionAt(DateUtils.toLocalDateTime(taskBatch.getExecutionAt())); if (Objects.nonNull(jobTaskBatch)) {
nodeInfo.setTaskBatchStatus(taskBatch.getTaskBatchStatus()); nodeInfo.setExecutionAt(DateUtils.toLocalDateTime(jobTaskBatch.getExecutionAt()));
nodeInfo.setOperationReason(taskBatch.getOperationReason()); nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus());
nodeInfo.setOperationReason(jobTaskBatch.getOperationReason());
}
}) })
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i)); .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));

View File

@ -5,6 +5,7 @@ import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
@ -232,6 +233,9 @@ public class WorkflowServiceImpl implements WorkflowService {
workflowNode.setWorkflowId(workflowId); workflowNode.setWorkflowId(workflowId);
workflowNode.setGroupName(groupName); workflowNode.setGroupName(groupName);
workflowNode.setNodeType(nodeConfig.getNodeType()); workflowNode.setNodeType(nodeConfig.getNodeType());
if (WorkflowNodeTypeEnum.CONDITION.getType() == nodeConfig.getNodeType()) {
workflowNode.setJobId(SystemConstants.CONDITION_JOB_ID);
}
Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败")); Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败"));
// 添加节点 // 添加节点
graph.addNode(workflowNode.getId()); graph.addNode(workflowNode.getId());