diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql
index 4209f116..52d2cb1c 100644
--- a/doc/sql/easy_retry_mysql.sql
+++ b/doc/sql/easy_retry_mysql.sql
@@ -361,7 +361,7 @@ CREATE TABLE `job_task_batch`
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流节点id',
- `parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id',
+ `parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务父批次id',
`workflow_task_batch_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
@@ -374,7 +374,8 @@ CREATE TABLE `job_task_batch`
PRIMARY KEY (`id`),
KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`),
KEY `idx_create_dt` (`create_dt`),
- KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
+ KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`),
+ UNIQUE KEY `uk_workflow_task_batch_id_workflow_node_id` (`workflow_task_batch_id`, `workflow_node_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次';
diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java
index 9e1a08b4..d461436c 100644
--- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java
+++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java
@@ -11,12 +11,8 @@ import lombok.Getter;
@Getter
public enum JobNotifySceneEnum {
-
-
JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER);
-
-
/**
* 通知场景
*/
diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/WorkflowNodeTypeEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/WorkflowNodeTypeEnum.java
new file mode 100644
index 00000000..5af55dea
--- /dev/null
+++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/WorkflowNodeTypeEnum.java
@@ -0,0 +1,43 @@
+package com.aizuda.easy.retry.common.core.enums;
+
+import lombok.Getter;
+
+/**
+ * 1、任务节点 2、条件节点 3、回调节点
+ *
+ * @author xiaowoniu
+ * @date 2023-12-24 08:13:43
+ * @since 2.6.0
+ */
+@Getter
+public enum WorkflowNodeTypeEnum {
+ JOB_TASK(1, "JOB任务"),
+ CONDITION(2, "条件节点"),
+ CALLBACK(3, "回调节点"),
+ ;
+
+ private final int type;
+ private final String desc;
+
+ WorkflowNodeTypeEnum(int type, String desc) {
+ this.type = type;
+ this.desc = desc;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public static WorkflowNodeTypeEnum valueOf(int type) {
+ for (WorkflowNodeTypeEnum workflowNodeTypeEnum : WorkflowNodeTypeEnum.values()) {
+ if (workflowNodeTypeEnum.getType() == type) {
+ return workflowNodeTypeEnum;
+ }
+ }
+ return null;
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
index 5f1df359..86b0dddb 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
@@ -10,6 +10,8 @@ import lombok.Data;
@Data
public class WorkflowTaskPrepareDTO {
+ private Long workflowTaskBatchId;
+
private Long workflowId;
/**
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowExecutor.java
new file mode 100644
index 00000000..ab474078
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowExecutor.java
@@ -0,0 +1,16 @@
+package com.aizuda.easy.retry.server.job.task.support;
+
+import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
+import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
+
+/**
+ * @author www.byteblogs.com
+ * @date 2023-09-24 11:40:21
+ * @since 2.4.0
+ */
+public interface WorkflowExecutor {
+
+ WorkflowNodeTypeEnum getWorkflowNodeType();
+
+ void execute(WorkflowExecutorContext context);
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
index e588e9c8..02b270e1 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
@@ -88,7 +88,7 @@ public class JobExecutorActor extends AbstractActor {
private void doExecute(final TaskExecuteDTO taskExecute) {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
- // 自动的校验任务必须是开启状态,手动触发无需校验
+ // 自动地校验任务必须是开启状态,手动触发无需校验
if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
index fb1fcd3c..03bf5846 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
@@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
+import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@@ -17,6 +18,10 @@ import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
+import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
+import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorFactory;
+import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
@@ -40,6 +45,7 @@ import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -57,6 +63,7 @@ public class WorkflowExecutorActor extends AbstractActor {
private final WorkflowNodeMapper workflowNodeMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
+ private final WorkflowBatchHandler workflowBatchHandler;
@Override
public Receive createReceive() {
@@ -83,23 +90,40 @@ public class WorkflowExecutorActor extends AbstractActor {
Set<Long> successors = graph.successors(taskExecute.getParentId());
if (CollectionUtils.isEmpty(successors)) {
+ boolean complete = workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}
+ List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
+ .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId)
+ .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
+ .in(JobTaskBatch::getWorkflowNodeId, successors)
+ );
+
+ Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
+
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectBatchIds(successors);
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
- Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));;
+ Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
+
+ // TODO 此次做策略,按照任务节点 条件节点 回调节点做抽象
+ // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次,同时可以判断出DAG是否全部执行完成
for (WorkflowNode workflowNode : workflowNodes) {
- // 生成任务批次
- JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(jobMap.get(workflowNode.getJobId()));
- jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
- jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
- jobTaskPrepare.setWorkflowNodeId(workflowNode.getId());
- jobTaskPrepare.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
- jobTaskPrepare.setParentWorkflowNodeId(taskExecute.getParentId());
- // 执行预处理阶段
- ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
- actorRef.tell(jobTaskPrepare, actorRef);
+ // 批次已经存在就不在重复生成
+ if (Objects.nonNull(jobTaskBatchMap.get(workflowNode.getId()))) {
+ continue;
+ }
+
+ // 执行DAG中的节点
+ WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType());
+
+ WorkflowExecutorContext context = new WorkflowExecutorContext();
+ context.setJob(jobMap.get(workflowNode.getJobId()));
+ context.setWorkflowNodeId(workflowNode.getWorkflowId());
+ context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
+ context.setParentWorkflowNodeId(taskExecute.getParentId());
+
+ workflowExecutor.execute(context);
}
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
new file mode 100644
index 00000000..abdf2e93
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
@@ -0,0 +1,25 @@
+package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
+
+import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
+import org.springframework.beans.factory.InitializingBean;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-24 08:15:19
+ * @since 2.6.0
+ */
+public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {
+
+ @Override
+ public void execute(WorkflowExecutorContext context) {
+
+ doExecute(context);
+ }
+
+ protected abstract void doExecute(WorkflowExecutorContext context);
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this);
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java
new file mode 100644
index 00000000..5d93e771
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java
@@ -0,0 +1,22 @@
+package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
+
+import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-24 08:18:06
+ * @since 2.6.0
+ */
+@Component
+public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
+ @Override
+ public WorkflowNodeTypeEnum getWorkflowNodeType() {
+ return WorkflowNodeTypeEnum.CALLBACK;
+ }
+
+ @Override
+ protected void doExecute(WorkflowExecutorContext context) {
+
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java
new file mode 100644
index 00000000..71782489
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java
@@ -0,0 +1,80 @@
+package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
+
+import akka.actor.ActorRef;
+import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
+import com.aizuda.easy.retry.common.core.util.JsonUtil;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
+import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
+import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
+import lombok.RequiredArgsConstructor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-24 08:17:11
+ * @since 2.6.0
+ */
+@Component
+@RequiredArgsConstructor
+public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
+
+ private final static ExpressionParser ENGINE = new SpelExpressionParser();
+
+ private final JobTaskBatchGenerator jobTaskBatchGenerator;
+
+ @Override
+ public WorkflowNodeTypeEnum getWorkflowNodeType() {
+ return WorkflowNodeTypeEnum.CONDITION;
+ }
+
+ @Override
+ protected void doExecute(WorkflowExecutorContext context) {
+
+ // 根据配置的表达式执行
+ Boolean result = doEval(context.getExpression(), context.getExpressionContext());
+ if (result) {
+ JobTaskBatchGeneratorContext generatorContext = new JobTaskBatchGeneratorContext();
+ generatorContext.setGroupName(context.getGroupName());
+ generatorContext.setNamespaceId(context.getNamespaceId());
+ generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
+ generatorContext.setOperationReason(JobOperationReasonEnum.NONE.getReason());
+ // 特殊的job
+ generatorContext.setJobId(-1L);
+ generatorContext.setWorkflowNodeId(context.getWorkflowNodeId());
+ generatorContext.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
+ generatorContext.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
+ jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
+ } else {
+ //
+ }
+
+ }
+
+ protected Boolean doEval(String expression, Map<String, Object> context) {
+ try {
+ final EvaluationContext evaluationContext = new StandardEvaluationContext();
+ context.forEach(evaluationContext::setVariable);
+ return ENGINE.parseExpression(expression).getValue(evaluationContext, Boolean.class);
+ } catch (Exception e) {
+ throw new EasyRetryServerException("SpEL表达式解析异常. expression:[{}] context:[{}]",
+ expression, JsonUtil.toJsonString(context), e);
+ }
+
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java
new file mode 100644
index 00000000..289557ca
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java
@@ -0,0 +1,38 @@
+package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
+
+import akka.actor.ActorRef;
+import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-24 08:09:14
+ * @since 2.6.0
+ */
+@Component
+public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
+
+ @Override
+ public WorkflowNodeTypeEnum getWorkflowNodeType() {
+ return WorkflowNodeTypeEnum.JOB_TASK;
+ }
+
+ @Override
+ protected void doExecute(WorkflowExecutorContext context) {
+ // 生成任务批次
+ JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
+ jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
+ jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
+ jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
+ jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
+ jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
+ // 执行预处理阶段
+ ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
+ actorRef.tell(jobTaskPrepare, actorRef);
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java
new file mode 100644
index 00000000..c0641810
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java
@@ -0,0 +1,54 @@
+package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
+
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
+import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-24
+ * @since 2.6.0
+ */
+@Data
+public class WorkflowExecutorContext {
+
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 任务id
+ */
+ private Long jobId;
+
+ /**
+ * 工作流任务批次id
+ */
+ private Long workflowTaskBatchId;
+
+ /**
+ * 工作流节点id
+ */
+ private Long workflowNodeId;
+
+ /**
+ * 工作流父节点id
+ */
+ private Long parentWorkflowNodeId;
+
+ /**
+ * 任务属性
+ */
+ private Job job;
+
+ private String expression;
+
+ private Map<String, Object> expressionContext;
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorFactory.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorFactory.java
new file mode 100644
index 00000000..acdbd9c9
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorFactory.java
@@ -0,0 +1,26 @@
+package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
+
+import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
+import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
+import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
+import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-24 13:04:09
+ * @since 2.6.0
+ */
+public class WorkflowExecutorFactory {
+
+ private static final ConcurrentHashMap<WorkflowNodeTypeEnum, WorkflowExecutor> CACHE = new ConcurrentHashMap<>();
+
+ protected static void registerJobExecutor(WorkflowNodeTypeEnum workflowNodeTypeEnum, WorkflowExecutor executor) {
+ CACHE.put(workflowNodeTypeEnum, executor);
+ }
+
+ public static WorkflowExecutor getWorkflowExecutor(Integer type) {
+ return CACHE.get(WorkflowNodeTypeEnum.valueOf(type));
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java
index f61b830c..e6f796fc 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java
@@ -14,6 +14,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatch
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@@ -51,7 +52,12 @@ public class JobTaskBatchGenerator {
jobTaskBatch.setOperationReason(context.getOperationReason());
}
- Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
+ try {
+ Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
+ } catch (DuplicateKeyException ignored) {
+ // 忽略重复的DAG任务
+ return;
+ }
// 非待处理状态无需进入时间轮中
if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java
new file mode 100644
index 00000000..cada04b4
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java
@@ -0,0 +1,79 @@
+package com.aizuda.easy.retry.server.job.task.support.handler;
+
+import cn.hutool.core.lang.Assert;
+import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.common.util.GraphUtils;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.common.graph.MutableGraph;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-24 07:53:18
+ * @since 2.6.0
+ */
+@Component
+@RequiredArgsConstructor
+public class WorkflowBatchHandler {
+ private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
+ private final WorkflowNodeMapper workflowNodeMapper;
+ private final JobMapper jobMapper;
+ private final JobTaskBatchMapper jobTaskBatchMapper;
+
+ public boolean complete(Long workflowTaskBatchId) throws IOException {
+ return complete(workflowTaskBatchId, null);
+ }
+ public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
+ workflowTaskBatch = Optional.ofNullable(workflowTaskBatch).orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
+ Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
+
+ String flowInfo = workflowTaskBatch.getFlowInfo();
+ MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
+
+ // 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成
+ long executedTaskCount = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
+ .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
+ .in(JobTaskBatch::getWorkflowNodeId, graph.nodes())
+ );
+
+ Long taskNodeCount = workflowNodeMapper.selectCount(new LambdaQueryWrapper<WorkflowNode>()
+// .eq(WorkflowNode::getNodeType, 1) // TODO 任务节点 若最后一个节点是条件或者是回调节点 这个地方就有问题
+ .in(WorkflowNode::getId, graph.nodes()));
+
+ // TODO 若最后几个节点都是非任务节点,这里直接完成就会有问题
+ if (executedTaskCount < taskNodeCount) {
+ return false;
+ }
+
+ handlerTaskBatch(workflowTaskBatchId, JobTaskBatchStatusEnum.SUCCESS.getStatus(), JobOperationReasonEnum.NONE.getReason());
+ return true;
+
+ }
+
+ private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
+
+ WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
+ jobTaskBatch.setId(workflowTaskBatchId);
+ jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
+ jobTaskBatch.setTaskBatchStatus(taskStatus);
+ jobTaskBatch.setOperationReason(operationReason);
+ Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
+ () -> new EasyRetryServerException("更新任务失败"));
+
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java
new file mode 100644
index 00000000..e2cd88f5
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java
@@ -0,0 +1,45 @@
+package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
+
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.common.core.util.JsonUtil;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-23 23:09:07
+ * @since 2.6.0
+ */
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
+ private final WorkflowBatchHandler workflowBatchHandler;
+
+ @Override
+ public boolean matches(Integer status) {
+ return JobTaskBatchStatusEnum.RUNNING.getStatus() == status;
+ }
+
+ @Override
+ protected void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO) {
+ log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(jobPrepareDTO));
+
+ // 1. 若DAG已经支持完成了,由于异常原因导致的没有更新成终态此次进行一次更新操作
+ try {
+ workflowBatchHandler.complete(jobPrepareDTO.getWorkflowTaskBatchId());
+ } catch (IOException e) {
+ // TODO 待处理
+ }
+
+ // 2. 判断DAG是否已经支持超时
+ // 3. 支持阻塞策略同JOB逻辑一致
+
+
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java
new file mode 100644
index 00000000..52a7877e
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java
@@ -0,0 +1,52 @@
+package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
+
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
+import com.aizuda.easy.retry.server.job.task.support.prepare.AbstractJobPrePareHandler;
+import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
+import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
+import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 处理处于{@link JobTaskBatchStatusEnum::WAIT}状态的任务
+ *
+ * @author xiaowoniu
+ * @date 2023-10-05 18:29:22
+ * @since 2.6.0
+ */
+@Component
+@Slf4j
+public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
+
+ @Override
+ public boolean matches(Integer status) {
+ return JobTaskBatchStatusEnum.WAITING.getStatus() == status;
+ }
+
+ @Override
+ protected void doHandler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
+ log.info("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
+
+ // 若时间轮中数据不存在则重新加入
+ if (!JobTimerWheel.isExisted(workflowTaskPrepareDTO.getWorkflowTaskBatchId())) {
+ log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
+
+ // 进入时间轮
+ long delay = workflowTaskPrepareDTO.getNextTriggerAt() - DateUtils.toNowMilli();
+ WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
+ workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId());
+ workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
+ workflowTimerTaskDTO.setTriggerType(workflowTaskPrepareDTO.getTriggerType());
+ JobTimerWheel.register(workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
+ new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
+ }
+ }
+}