From a13373ccdc4ccbb7a8501628e3d6c4f8126c5991 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sun, 24 Dec 2023 09:00:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E6=8A=BD=E8=B1=A1DAG?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=8A=82=E7=82=B9=E7=9A=84=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E7=AD=96=E7=95=A5=202.=20=E6=8A=BD=E7=A6=BBDAG=E6=89=B9?= =?UTF-8?q?=E6=AC=A1=E5=AE=8C=E6=88=90=E7=9A=84=E5=85=AC=E5=85=B1=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_mysql.sql | 5 +- .../common/core/enums/JobNotifySceneEnum.java | 4 - .../core/enums/WorkflowNodeTypeEnum.java | 43 ++++++++++ .../job/task/dto/WorkflowTaskPrepareDTO.java | 2 + .../job/task/support/WorkflowExecutor.java | 16 ++++ .../support/dispatch/JobExecutorActor.java | 2 +- .../dispatch/WorkflowExecutorActor.java | 46 ++++++++--- .../workflow/AbstractWorkflowExecutor.java | 25 ++++++ .../workflow/CallbackWorkflowExecutor.java | 22 +++++ .../workflow/ConditionWorkflowExecutor.java | 80 +++++++++++++++++++ .../workflow/JobTaskWorkflowExecutor.java | 38 +++++++++ .../workflow/WorkflowExecutorContext.java | 54 +++++++++++++ .../workflow/WorkflowExecutorFactory.java | 26 ++++++ .../batch/JobTaskBatchGenerator.java | 8 +- .../support/handler/WorkflowBatchHandler.java | 79 ++++++++++++++++++ .../RunningWorkflowPrepareHandler.java | 45 +++++++++++ .../workflow/WaiWorkflowPrepareHandler.java | 52 ++++++++++++ 17 files changed, 528 insertions(+), 19 deletions(-) create mode 100644 easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/WorkflowNodeTypeEnum.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowExecutor.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorFactory.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 4209f116e..52d2cb1c0 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -361,7 +361,7 @@ CREATE TABLE `job_task_batch` `group_name` varchar(64) NOT NULL COMMENT '组名称', `job_id` bigint(20) NOT NULL COMMENT '任务id', `workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流节点id', - `parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id', + `parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务父批次id', `workflow_task_batch_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id', `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功', `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因', @@ -374,7 +374,8 @@ CREATE TABLE `job_task_batch` PRIMARY KEY (`id`), KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`), KEY `idx_create_dt` (`create_dt`), - KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) + KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`), + UNIQUE KEY `uk_workflow_task_batch_id_workflow_node_id` (`workflow_task_batch_id`, `workflow_node_id`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次'; diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java index 9e1a08b46..d461436c4 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobNotifySceneEnum.java @@ -11,12 +11,8 @@ import lombok.Getter; @Getter public enum JobNotifySceneEnum { - - JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER); - - /** * 通知场景 */ diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/WorkflowNodeTypeEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/WorkflowNodeTypeEnum.java new file mode 100644 index 000000000..5af55dea2 --- /dev/null +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/WorkflowNodeTypeEnum.java @@ -0,0 +1,43 @@ +package com.aizuda.easy.retry.common.core.enums; + +import lombok.Getter; + +/** + * 1、任务节点 2、条件节点 3、回调节点 + * + * @author xiaowoniu + * @date 2023-12-24 08:13:43 + * @since 2.6.0 + */ +@Getter +public enum WorkflowNodeTypeEnum { + JOB_TASK(1, "JOB任务"), + CONDITION(2, "条件节点"), + CALLBACK(3, "回调节点"), + ; + + private final int type; + private final String desc; + + WorkflowNodeTypeEnum(int type, String desc) { + this.type = type; + this.desc = desc; + } + + public int getType() { + return type; + } + + public String getDesc() { + return desc; + } + + public static WorkflowNodeTypeEnum valueOf(int type) { + for (WorkflowNodeTypeEnum workflowNodeTypeEnum : WorkflowNodeTypeEnum.values()) { + if (workflowNodeTypeEnum.getType() == type) { + return workflowNodeTypeEnum; + } + } + return null; + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java index 5f1df359a..86b0dddb0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java @@ -10,6 +10,8 @@ import lombok.Data; @Data public class WorkflowTaskPrepareDTO { + private Long workflowTaskBatchId; + private Long workflowId; /** diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowExecutor.java new file mode 100644 index 000000000..ab474078a --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowExecutor.java @@ -0,0 +1,16 @@ +package com.aizuda.easy.retry.server.job.task.support; + +import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; +import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext; + +/** + * @author www.byteblogs.com + * @date 2023-09-24 11:40:21 + * @since 2.4.0 + */ +public interface WorkflowExecutor { + + WorkflowNodeTypeEnum getWorkflowNodeType(); + + void execute(WorkflowExecutorContext context); +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index e588e9c89..02b270e16 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -88,7 +88,7 @@ public class JobExecutorActor extends AbstractActor { private void doExecute(final TaskExecuteDTO taskExecute) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - // 自动的校验任务必须是开启状态,手动触发无需校验 + // 自动地校验任务必须是开启状态,手动触发无需校验 if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) { queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus()); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index fb1fcd3c0..03bf5846b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; @@ -17,6 +18,10 @@ import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; +import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext; +import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorFactory; +import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; @@ -40,6 +45,7 @@ import org.springframework.util.CollectionUtils; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -57,6 +63,7 @@ public class WorkflowExecutorActor extends AbstractActor { private final WorkflowNodeMapper workflowNodeMapper; private final JobMapper jobMapper; private final JobTaskBatchMapper jobTaskBatchMapper; + private final WorkflowBatchHandler workflowBatchHandler; @Override public Receive createReceive() { @@ -83,23 +90,40 @@ public class WorkflowExecutorActor extends AbstractActor { Set successors = graph.successors(taskExecute.getParentId()); if (CollectionUtils.isEmpty(successors)) { + boolean complete = workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); return; } + List jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() + .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) + .in(JobTaskBatch::getWorkflowNodeId, successors) + ); + + Map jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i)); + List workflowNodes = workflowNodeMapper.selectBatchIds(successors); List jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); - Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));; + Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); + + // TODO 此次做策略,按照任务节点 条件节点 回调节点做抽象 + // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次,同时可以判断出DAG是否全部执行完成 for (WorkflowNode workflowNode : workflowNodes) { - // 生成任务批次 - JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(jobMap.get(workflowNode.getJobId())); - jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType()); - jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); - jobTaskPrepare.setWorkflowNodeId(workflowNode.getId()); - jobTaskPrepare.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); - jobTaskPrepare.setParentWorkflowNodeId(taskExecute.getParentId()); - // 执行预处理阶段 - ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); - actorRef.tell(jobTaskPrepare, actorRef); + // 批次已经存在就不在重复生成 + if (Objects.nonNull(jobTaskBatchMap.get(workflowNode.getId()))) { + continue; + } + + // 执行DAG中的节点 + WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType()); + + WorkflowExecutorContext context = new WorkflowExecutorContext(); + context.setJob(jobMap.get(workflowNode.getJobId())); + context.setWorkflowNodeId(workflowNode.getWorkflowId()); + context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); + context.setParentWorkflowNodeId(taskExecute.getParentId()); + + workflowExecutor.execute(context); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java new file mode 100644 index 000000000..abdf2e93c --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.job.task.support.executor.workflow; + +import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; +import org.springframework.beans.factory.InitializingBean; + +/** + * @author xiaowoniu + * @date 2023-12-24 08:15:19 + * @since 2.6.0 + */ +public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean { + + @Override + public void execute(WorkflowExecutorContext context) { + + doExecute(context); + } + + protected abstract void doExecute(WorkflowExecutorContext context); + + @Override + public void afterPropertiesSet() throws Exception { + WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java new file mode 100644 index 000000000..5d93e7711 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -0,0 +1,22 @@ +package com.aizuda.easy.retry.server.job.task.support.executor.workflow; + +import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; +import org.springframework.stereotype.Component; + +/** + * @author xiaowoniu + * @date 2023-12-24 08:18:06 + * @since 2.6.0 + */ +@Component +public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { + @Override + public WorkflowNodeTypeEnum getWorkflowNodeType() { + return WorkflowNodeTypeEnum.CALLBACK; + } + + @Override + protected void doExecute(WorkflowExecutorContext context) { + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java new file mode 100644 index 000000000..71782489a --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java @@ -0,0 +1,80 @@ +package com.aizuda.easy.retry.server.job.task.support.executor.workflow; + +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @author xiaowoniu + * @date 2023-12-24 08:17:11 + * @since 2.6.0 + */ +@Component +@RequiredArgsConstructor +public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { + + private final static ExpressionParser ENGINE = new SpelExpressionParser(); + + private final JobTaskBatchGenerator jobTaskBatchGenerator; + + @Override + public WorkflowNodeTypeEnum getWorkflowNodeType() { + return WorkflowNodeTypeEnum.CONDITION; + } + + @Override + protected void doExecute(WorkflowExecutorContext context) { + + // 根据配置的表达式执行 + Boolean result = doEval(context.getExpression(), context.getExpressionContext()); + if (result) { + JobTaskBatchGeneratorContext generatorContext = new JobTaskBatchGeneratorContext(); + generatorContext.setGroupName(context.getGroupName()); + generatorContext.setNamespaceId(context.getNamespaceId()); + generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); + generatorContext.setOperationReason(JobOperationReasonEnum.NONE.getReason()); + // 特殊的job + generatorContext.setJobId(-1L); + generatorContext.setWorkflowNodeId(context.getWorkflowNodeId()); + generatorContext.setParentWorkflowNodeId(context.getParentWorkflowNodeId()); + generatorContext.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); + } else { + // + } + + } + + protected Boolean doEval(String expression, Map context) { + try { + final EvaluationContext evaluationContext = new StandardEvaluationContext(); + context.forEach(evaluationContext::setVariable); + return ENGINE.parseExpression(expression).getValue(evaluationContext, Boolean.class); + } catch (Exception e) { + throw new EasyRetryServerException("SpEL表达式解析异常. expression:[{}] context:[{}]", + expression, JsonUtil.toJsonString(context), e); + } + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java new file mode 100644 index 000000000..289557ca5 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java @@ -0,0 +1,38 @@ +package com.aizuda.easy.retry.server.job.task.support.executor.workflow; + +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import org.springframework.stereotype.Component; + +/** + * @author xiaowoniu + * @date 2023-12-24 08:09:14 + * @since 2.6.0 + */ +@Component +public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { + + @Override + public WorkflowNodeTypeEnum getWorkflowNodeType() { + return WorkflowNodeTypeEnum.JOB_TASK; + } + + @Override + protected void doExecute(WorkflowExecutorContext context) { + // 生成任务批次 + JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob()); + jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType()); + jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); + jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId()); + jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId()); + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + actorRef.tell(jobTaskPrepare, actorRef); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java new file mode 100644 index 000000000..c0641810e --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java @@ -0,0 +1,54 @@ +package com.aizuda.easy.retry.server.job.task.support.executor.workflow; + +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import lombok.Data; + +import java.util.List; +import java.util.Map; + +/** + * @author xiaowoniu + * @date 2023-12-24 + * @since 2.6.0 + */ +@Data +public class WorkflowExecutorContext { + + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 任务id + */ + private Long jobId; + + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; + + /** + * 工作流节点id + */ + private Long workflowNodeId; + + /** + * 工作流父节点id + */ + private Long parentWorkflowNodeId; + + /** + * 任务属性 + */ + private Job job; + + private String expression; + + private Map expressionContext; + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorFactory.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorFactory.java new file mode 100644 index 000000000..acdbd9c91 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorFactory.java @@ -0,0 +1,26 @@ +package com.aizuda.easy.retry.server.job.task.support.executor.workflow; + +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; +import com.aizuda.easy.retry.server.job.task.support.JobExecutor; +import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author xiaowoniu + * @date 2023-12-24 13:04:09 + * @since 2.6.0 + */ +public class WorkflowExecutorFactory { + + private static final ConcurrentHashMap CACHE = new ConcurrentHashMap<>(); + + protected static void registerJobExecutor(WorkflowNodeTypeEnum workflowNodeTypeEnum, WorkflowExecutor executor) { + CACHE.put(workflowNodeTypeEnum, executor); + } + + public static WorkflowExecutor getWorkflowExecutor(Integer type) { + return CACHE.get(WorkflowNodeTypeEnum.valueOf(type)); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index f61b830cf..e6f796fce 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -14,6 +14,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatch import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; @@ -51,7 +52,12 @@ public class JobTaskBatchGenerator { jobTaskBatch.setOperationReason(context.getOperationReason()); } - Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId())); + try { + Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId())); + } catch (DuplicateKeyException ignored) { + // 忽略重复的DAG任务 + return; + } // 非待处理状态无需进入时间轮中 if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java new file mode 100644 index 000000000..cada04b49 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -0,0 +1,79 @@ +package com.aizuda.easy.retry.server.job.task.support.handler; + +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.common.util.GraphUtils; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.graph.MutableGraph; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Optional; + +/** + * @author xiaowoniu + * @date 2023-12-24 07:53:18 + * @since 2.6.0 + */ +@Component +@RequiredArgsConstructor +public class WorkflowBatchHandler { + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + private final WorkflowNodeMapper workflowNodeMapper; + private final JobMapper jobMapper; + private final JobTaskBatchMapper jobTaskBatchMapper; + + public boolean complete(Long workflowTaskBatchId) throws IOException { + return complete(workflowTaskBatchId, null); + } + public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { + workflowTaskBatch = Optional.ofNullable(workflowTaskBatch).orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); + Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); + + String flowInfo = workflowTaskBatch.getFlowInfo(); + MutableGraph graph = GraphUtils.deserializeJsonToGraph(flowInfo); + + // 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成 + long executedTaskCount = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) + .in(JobTaskBatch::getWorkflowNodeId, graph.nodes()) + ); + + Long taskNodeCount = workflowNodeMapper.selectCount(new LambdaQueryWrapper() +// .eq(WorkflowNode::getNodeType, 1) // TODO 任务节点 若最后一个节点是条件或者是回调节点 这个地方就有问题 + .in(WorkflowNode::getId, graph.nodes())); + + // TODO 若最后几个节点都是非任务节点,这里直接完成就会有问题 + if (executedTaskCount < taskNodeCount) { + return false; + } + + handlerTaskBatch(workflowTaskBatchId, JobTaskBatchStatusEnum.SUCCESS.getStatus(), JobOperationReasonEnum.NONE.getReason()); + return true; + + } + + private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) { + + WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch(); + jobTaskBatch.setId(workflowTaskBatchId); + jobTaskBatch.setExecutionAt(DateUtils.toNowMilli()); + jobTaskBatch.setTaskBatchStatus(taskStatus); + jobTaskBatch.setOperationReason(operationReason); + Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch), + () -> new EasyRetryServerException("更新任务失败")); + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java new file mode 100644 index 000000000..e2cd88f57 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java @@ -0,0 +1,45 @@ +package com.aizuda.easy.retry.server.job.task.support.prepare.workflow; + +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author xiaowoniu + * @date 2023-12-23 23:09:07 + * @since 2.6.0 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { + private final WorkflowBatchHandler workflowBatchHandler; + + @Override + public boolean matches(Integer status) { + return JobTaskBatchStatusEnum.RUNNING.getStatus() == status; + } + + @Override + protected void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO) { + log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(jobPrepareDTO)); + + // 1. 若DAG已经支持完成了,由于异常原因导致的没有更新成终态此次进行一次更新操作 + try { + workflowBatchHandler.complete(jobPrepareDTO.getWorkflowTaskBatchId()); + } catch (IOException e) { + // TODO 待处理 + } + + // 2. 判断DAG是否已经支持超时 + // 3. 支持阻塞策略同JOB逻辑一致 + + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java new file mode 100644 index 000000000..52a7877e5 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java @@ -0,0 +1,52 @@ +package com.aizuda.easy.retry.server.job.task.support.prepare.workflow; + +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO; +import com.aizuda.easy.retry.server.job.task.support.prepare.AbstractJobPrePareHandler; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; +import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * 处理处于{@link JobTaskBatchStatusEnum::WAIT}状态的任务 + * + * @author xiaowoniu + * @date 2023-10-05 18:29:22 + * @since 2.6.0 + */ +@Component +@Slf4j +public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { + + @Override + public boolean matches(Integer status) { + return JobTaskBatchStatusEnum.WAITING.getStatus() == status; + } + + @Override + protected void doHandler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) { + log.info("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId()); + + // 若时间轮中数据不存在则重新加入 + if (!JobTimerWheel.isExisted(workflowTaskPrepareDTO.getWorkflowTaskBatchId())) { + log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId()); + + // 进入时间轮 + long delay = workflowTaskPrepareDTO.getNextTriggerAt() - DateUtils.toNowMilli(); + WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO(); + workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId()); + workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId()); + workflowTimerTaskDTO.setTriggerType(workflowTaskPrepareDTO.getTriggerType()); + JobTimerWheel.register(workflowTaskPrepareDTO.getWorkflowTaskBatchId(), + new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); + } + } +}