feat: 2.6.0

1. 抽象DAG任务节点的执行策略
2. 抽离DAG批次完成的公共方法
This commit is contained in:
byteblogs168 2023-12-24 09:00:14 +08:00
parent 1e3a96fd52
commit a13373ccdc
17 changed files with 528 additions and 19 deletions

View File

@ -361,7 +361,7 @@ CREATE TABLE `job_task_batch`
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流节点id',
`parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id',
`parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id',
`workflow_task_batch_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
@ -374,7 +374,8 @@ CREATE TABLE `job_task_batch`
PRIMARY KEY (`id`),
KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`),
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`),
UNIQUE KEY `uk_workflow_task_batch_id_workflow_node_id` (`workflow_task_batch_id`, `workflow_node_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次';

View File

@ -11,12 +11,8 @@ import lombok.Getter;
@Getter
public enum JobNotifySceneEnum {
JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER);
/**
* 通知场景
*/

View File

@ -0,0 +1,43 @@
package com.aizuda.easy.retry.common.core.enums;
import lombok.Getter;
/**
* 1任务节点 2条件节点 3回调节点
*
* @author xiaowoniu
* @date 2023-12-24 08:13:43
* @since 2.6.0
*/
public enum WorkflowNodeTypeEnum {
    JOB_TASK(1, "JOB任务"),
    CONDITION(2, "条件节点"),
    CALLBACK(3, "回调节点"),
    ;

    /** Numeric code persisted for the workflow node type. */
    private final int type;
    /** Human-readable description. */
    private final String desc;

    WorkflowNodeTypeEnum(int type, String desc) {
        this.type = type;
        this.desc = desc;
    }

    // Explicit getters kept; the previous @Getter annotation generated duplicates
    // of these exact methods, so the annotation was redundant and is removed.
    public int getType() {
        return type;
    }

    public String getDesc() {
        return desc;
    }

    /**
     * Resolves a constant from its numeric code.
     *
     * @param type numeric node type code
     * @return the matching constant, or {@code null} when the code is unknown
     */
    public static WorkflowNodeTypeEnum valueOf(int type) {
        for (WorkflowNodeTypeEnum nodeType : values()) {
            if (nodeType.getType() == type) {
                return nodeType;
            }
        }
        return null;
    }
}

View File

@ -10,6 +10,8 @@ import lombok.Data;
@Data
public class WorkflowTaskPrepareDTO {
private Long workflowTaskBatchId;
private Long workflowId;
/**

View File

@ -0,0 +1,16 @@
package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
/**
* @author www.byteblogs.com
* @date 2023-09-24 11:40:21
* @since 2.4.0
*/
public interface WorkflowExecutor {
/**
 * @return the DAG node type (job task / condition / callback) this executor handles;
 *         used as the registry key in WorkflowExecutorFactory
 */
WorkflowNodeTypeEnum getWorkflowNodeType();
/**
 * Executes one DAG node of a workflow batch.
 *
 * @param context carries the job, node ids and batch ids needed for execution
 */
void execute(WorkflowExecutorContext context);
}

View File

@ -88,7 +88,7 @@ public class JobExecutorActor extends AbstractActor {
private void doExecute(final TaskExecuteDTO taskExecute) {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
// 自动校验任务必须是开启状态手动触发无需校验
// 自动校验任务必须是开启状态手动触发无需校验
if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
}

View File

@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@ -17,6 +18,10 @@ import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
@ -40,6 +45,7 @@ import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -57,6 +63,7 @@ public class WorkflowExecutorActor extends AbstractActor {
private final WorkflowNodeMapper workflowNodeMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowBatchHandler workflowBatchHandler;
@Override
public Receive createReceive() {
@ -83,23 +90,40 @@ public class WorkflowExecutorActor extends AbstractActor {
Set<Long> successors = graph.successors(taskExecute.getParentId());
if (CollectionUtils.isEmpty(successors)) {
boolean complete = workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, successors)
);
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectBatchIds(successors);
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));;
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
// TODO 此次做策略按照任务节点 条件节点 回调节点做抽象
// 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次同时可以判断出DAG是否全部执行完成
for (WorkflowNode workflowNode : workflowNodes) {
// 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(jobMap.get(workflowNode.getJobId()));
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setWorkflowNodeId(workflowNode.getId());
jobTaskPrepare.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
jobTaskPrepare.setParentWorkflowNodeId(taskExecute.getParentId());
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);
// 批次已经存在就不在重复生成
if (Objects.nonNull(jobTaskBatchMap.get(workflowNode.getId()))) {
continue;
}
// 执行DAG中的节点
WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType());
WorkflowExecutorContext context = new WorkflowExecutorContext();
context.setJob(jobMap.get(workflowNode.getJobId()));
context.setWorkflowNodeId(workflowNode.getWorkflowId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setParentWorkflowNodeId(taskExecute.getParentId());
workflowExecutor.execute(context);
}
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import org.springframework.beans.factory.InitializingBean;
/**
* @author xiaowoniu
* @date 2023-12-24 08:15:19
* @since 2.6.0
*/
public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {
@Override
public void execute(WorkflowExecutorContext context) {
// Template method: delegate to the node-type-specific implementation.
doExecute(context);
}
/**
 * Node-type-specific execution logic implemented by subclasses.
 *
 * @param context execution context for the current DAG node
 */
protected abstract void doExecute(WorkflowExecutorContext context);
@Override
public void afterPropertiesSet() throws Exception {
// Spring startup callback: self-register with the factory keyed by node type,
// so WorkflowExecutorFactory.getWorkflowExecutor can look this executor up.
WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this);
}
}

View File

@ -0,0 +1,22 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import org.springframework.stereotype.Component;
/**
* @author xiaowoniu
* @date 2023-12-24 08:18:06
* @since 2.6.0
*/
@Component
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.CALLBACK;
}
@Override
protected void doExecute(WorkflowExecutorContext context) {
// NOTE(review): callback node handling is not implemented yet — this is a no-op.
// Since no job_task_batch record is written here, DAG completion accounting may
// undercount for callback nodes (see the TODOs in WorkflowBatchHandler) — confirm.
}
}

View File

@ -0,0 +1,80 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author xiaowoniu
* @date 2023-12-24 08:17:11
* @since 2.6.0
*/
@Component
@RequiredArgsConstructor
public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
    /** SpEL parser is thread-safe; share a single instance. */
    private static final ExpressionParser ENGINE = new SpelExpressionParser();

    private final JobTaskBatchGenerator jobTaskBatchGenerator;

    @Override
    public WorkflowNodeTypeEnum getWorkflowNodeType() {
        return WorkflowNodeTypeEnum.CONDITION;
    }

    @Override
    protected void doExecute(WorkflowExecutorContext context) {
        // Evaluate the node's configured SpEL expression against the context variables.
        Boolean result = doEval(context.getExpression(), context.getExpressionContext());
        // Expression.getValue may return null; guard against NPE on auto-unboxing by
        // treating null as false instead of `if (result)`.
        if (Boolean.TRUE.equals(result)) {
            // Record a synthetic, already-SUCCESS batch for this condition node so the
            // DAG completion check counts it like any other executed node.
            JobTaskBatchGeneratorContext generatorContext = new JobTaskBatchGeneratorContext();
            generatorContext.setGroupName(context.getGroupName());
            generatorContext.setNamespaceId(context.getNamespaceId());
            generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
            generatorContext.setOperationReason(JobOperationReasonEnum.NONE.getReason());
            // Condition nodes have no real job; use a sentinel job id.
            generatorContext.setJobId(-1L);
            generatorContext.setWorkflowNodeId(context.getWorkflowNodeId());
            generatorContext.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
            generatorContext.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
            jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
        }
        // NOTE(review): when the condition is false no batch record is written, which the
        // completion counting in WorkflowBatchHandler does not yet account for — confirm.
    }

    /**
     * Evaluates a boolean SpEL expression with the given variables.
     *
     * @param expression SpEL expression text configured on the condition node
     * @param context    variables exposed to the expression (bound via #name)
     * @return the evaluation result; may be {@code null} if the expression yields null
     * @throws EasyRetryServerException when parsing or evaluation fails (original cause preserved)
     */
    protected Boolean doEval(String expression, Map<String, Object> context) {
        try {
            final EvaluationContext evaluationContext = new StandardEvaluationContext();
            context.forEach(evaluationContext::setVariable);
            return ENGINE.parseExpression(expression).getValue(evaluationContext, Boolean.class);
        } catch (Exception e) {
            throw new EasyRetryServerException("SpEL表达式解析异常. expression:[{}] context:[{}]",
                expression, JsonUtil.toJsonString(context), e);
        }
    }
}

View File

@ -0,0 +1,38 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import org.springframework.stereotype.Component;
/**
* @author xiaowoniu
* @date 2023-12-24 08:09:14
* @since 2.6.0
*/
@Component
public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
    @Override
    public WorkflowNodeTypeEnum getWorkflowNodeType() {
        return WorkflowNodeTypeEnum.JOB_TASK;
    }

    @Override
    protected void doExecute(WorkflowExecutorContext context) {
        // Build the prepare DTO for this node's job, marking it as workflow-triggered
        // and due immediately.
        JobTaskPrepareDTO prepareDTO = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
        prepareDTO.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
        prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
        prepareDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
        prepareDTO.setWorkflowNodeId(context.getWorkflowNodeId());
        prepareDTO.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
        // Hand the batch over to the prepare actor for the pre-processing phase.
        ActorRef prepareActor = ActorGenerator.jobTaskPrepareActor();
        prepareActor.tell(prepareDTO, prepareActor);
    }
}

View File

@ -0,0 +1,54 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import lombok.Data;
import java.util.List;
import java.util.Map;
/**
* @author xiaowoniu
* @date 2023-12-24
* @since 2.6.0
*/
@Data
public class WorkflowExecutorContext {
/**
 * Namespace id of the workflow batch
 */
private String namespaceId;
/**
 * Group name
 */
private String groupName;
/**
 * Job id
 */
private Long jobId;
/**
 * Workflow task batch id
 */
private Long workflowTaskBatchId;
/**
 * Workflow node id
 */
private Long workflowNodeId;
/**
 * Parent workflow node id
 */
private Long parentWorkflowNodeId;
/**
 * Job to run when this is a JOB_TASK node
 */
private Job job;
/**
 * SpEL expression evaluated by ConditionWorkflowExecutor for CONDITION nodes
 */
private String expression;
/**
 * Variables exposed to the condition expression
 */
private Map<String, Object> expressionContext;
}

View File

@ -0,0 +1,26 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author xiaowoniu
* @date 2023-12-24 13:04:09
* @since 2.6.0
*/
public class WorkflowExecutorFactory {
    /** Registry of node-type -> executor, populated by AbstractWorkflowExecutor at startup. */
    private static final ConcurrentHashMap<WorkflowNodeTypeEnum, WorkflowExecutor> CACHE = new ConcurrentHashMap<>();

    private WorkflowExecutorFactory() {
        // Static registry; no instances.
    }

    /**
     * Registers an executor for its node type (called from AbstractWorkflowExecutor.afterPropertiesSet).
     */
    protected static void registerJobExecutor(WorkflowNodeTypeEnum workflowNodeTypeEnum, WorkflowExecutor executor) {
        CACHE.put(workflowNodeTypeEnum, executor);
    }

    /**
     * Looks up the executor registered for the given numeric node type.
     *
     * @param type numeric workflow node type code (see WorkflowNodeTypeEnum)
     * @return the registered executor, or {@code null} if none is registered for the type
     * @throws NullPointerException when the code maps to no known node type
     */
    public static WorkflowExecutor getWorkflowExecutor(Integer type) {
        WorkflowNodeTypeEnum nodeType = WorkflowNodeTypeEnum.valueOf(type);
        if (nodeType == null) {
            // Previously an unknown code surfaced as an opaque NPE from
            // ConcurrentHashMap.get(null); fail with an explicit message instead
            // (same exception type, so caller-visible behavior is compatible).
            throw new NullPointerException("Unknown workflow node type: " + type);
        }
        return CACHE.get(nodeType);
    }
}

View File

@ -14,6 +14,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatch
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@ -51,7 +52,12 @@ public class JobTaskBatchGenerator {
jobTaskBatch.setOperationReason(context.getOperationReason());
}
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
try {
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
} catch (DuplicateKeyException ignored) {
// 忽略重复的DAG任务
return;
}
// 非待处理状态无需进入时间轮中
if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) {

View File

@ -0,0 +1,79 @@
package com.aizuda.easy.retry.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Optional;
/**
* @author xiaowoniu
* @date 2023-12-24 07:53:18
* @since 2.6.0
*/
@Component
@RequiredArgsConstructor
public class WorkflowBatchHandler {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
/**
 * Convenience overload of {@link #complete(Long, WorkflowTaskBatch)} that loads
 * the batch from the database.
 *
 * @param workflowTaskBatchId id of the workflow task batch to check
 * @return true when the whole DAG has executed and the batch was marked SUCCESS
 */
public boolean complete(Long workflowTaskBatchId) throws IOException {
return complete(workflowTaskBatchId, null);
}
/**
 * Checks whether every node of the batch's DAG has produced a job_task_batch
 * record and, if so, marks the workflow batch SUCCESS.
 *
 * @param workflowTaskBatchId id of the workflow task batch
 * @param workflowTaskBatch   the batch entity if already loaded; may be null
 * @return true when the batch was completed by this call, false if nodes are still pending
 * @throws IOException when the persisted flow graph cannot be deserialized
 */
public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch).orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
// Count how many DAG nodes have already produced a batch record, then compare
// against the total node count to decide whether the whole DAG has finished.
long executedTaskCount = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, graph.nodes())
);
Long taskNodeCount = workflowNodeMapper.selectCount(new LambdaQueryWrapper<WorkflowNode>()
// .eq(WorkflowNode::getNodeType, 1) // TODO: filtering to task nodes only breaks when the last node is a condition or callback node
.in(WorkflowNode::getId, graph.nodes()));
// TODO: if the trailing nodes are non-task nodes, completing here directly is incorrect
if (executedTaskCount < taskNodeCount) {
return false;
}
handlerTaskBatch(workflowTaskBatchId, JobTaskBatchStatusEnum.SUCCESS.getStatus(), JobOperationReasonEnum.NONE.getReason());
return true;
}
/**
 * Moves the workflow batch to the given terminal status and stamps the execution time.
 *
 * @throws EasyRetryServerException when the update affects no row
 */
private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
jobTaskBatch.setId(workflowTaskBatchId);
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
jobTaskBatch.setTaskBatchStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败"));
}
}

View File

@ -0,0 +1,45 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author xiaowoniu
* @date 2023-12-23 23:09:07
* @since 2.6.0
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
    private final WorkflowBatchHandler workflowBatchHandler;

    @Override
    public boolean matches(Integer status) {
        return JobTaskBatchStatusEnum.RUNNING.getStatus() == status;
    }

    /**
     * Handles a workflow batch that is still in RUNNING state.
     *
     * @param jobPrepareDTO the batch being prepared
     */
    @Override
    protected void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO) {
        log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(jobPrepareDTO));
        // 1. If the DAG actually finished but an earlier failure prevented it from
        //    reaching a terminal state, try to complete it now (best-effort).
        try {
            workflowBatchHandler.complete(jobPrepareDTO.getWorkflowTaskBatchId());
        } catch (IOException e) {
            // Fix: the exception was previously swallowed silently. Keep the
            // best-effort semantics (no rethrow) but record the failure so it is
            // diagnosable; a later prepare cycle will retry the completion check.
            log.warn("DAG completion check failed. workflowTaskBatchId:[{}]",
                jobPrepareDTO.getWorkflowTaskBatchId(), e);
        }
        // 2. TODO: detect whether the DAG has timed out
        // 3. TODO: support blocking strategies, consistent with the JOB logic
    }
}

View File

@ -0,0 +1,52 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.prepare.AbstractJobPrePareHandler;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 处理处于{@link JobTaskBatchStatusEnum#WAITING}状态的任务
*
* @author xiaowoniu
* @date 2023-10-05 18:29:22
* @since 2.6.0
*/
@Component
@Slf4j
public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
    @Override
    public boolean matches(Integer status) {
        return JobTaskBatchStatusEnum.WAITING.getStatus() == status;
    }

    @Override
    protected void doHandler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
        Long batchId = workflowTaskPrepareDTO.getWorkflowTaskBatchId();
        log.info("存在待处理任务. workflowTaskBatchId:[{}]", batchId);
        // Guard clause: nothing to do when the batch is already in the timer wheel.
        if (JobTimerWheel.isExisted(batchId)) {
            return;
        }
        log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", batchId);
        // Rebuild the timer payload and re-register the batch with its remaining delay.
        WorkflowTimerTaskDTO timerTaskDTO = new WorkflowTimerTaskDTO();
        timerTaskDTO.setWorkflowTaskBatchId(batchId);
        timerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
        timerTaskDTO.setTriggerType(workflowTaskPrepareDTO.getTriggerType());
        long delay = workflowTaskPrepareDTO.getNextTriggerAt() - DateUtils.toNowMilli();
        JobTimerWheel.register(batchId,
            new WorkflowTimerTask(timerTaskDTO), delay, TimeUnit.MILLISECONDS);
    }
}