From f802ce02385843bf36ae18dcf0cd99bbe3f393a8 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Tue, 26 Dec 2023 23:12:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_mysql.sql | 1 + .../common/core/enums/FailStrategyEnum.java | 44 +++++++++++++++++++ .../datasource/persistence/po/Workflow.java | 5 +++ .../task/dto/WorkflowPartitionTaskDTO.java | 5 +++ .../dispatch/ScanWorkflowTaskActor.java | 6 +-- .../dispatch/WorkflowExecutorActor.java | 22 +++++++--- .../dispatch/WorkflowTaskPrepareActor.java | 19 +++++++- .../batch/WorkflowBatchGenerator.java | 13 +----- .../support/handler/WorkflowBatchHandler.java | 4 ++ .../web/model/request/WorkflowRequestVO.java | 3 ++ 10 files changed, 99 insertions(+), 23 deletions(-) create mode 100644 easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/FailStrategyEnum.java diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 26c41e5d5..a7d8742fd 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -454,6 +454,7 @@ CREATE TABLE `workflow` `trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间', `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长', `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间', + `block_strategy` tinyint(4) NOT NULL DEFAULT '1' COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行', `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', `flow_info` text DEFAULT NULL COMMENT '流程信息', diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/FailStrategyEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/FailStrategyEnum.java new file mode 100644 index 000000000..4a2b93cfa --- /dev/null +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/FailStrategyEnum.java @@ -0,0 +1,44 @@ +package com.aizuda.easy.retry.common.core.enums; + +import lombok.Getter; + +import java.util.Objects; + +/** + * @author xiaowoniu + * @date 2023-12-26 22:16:51 + * @since 2.6.0 + */ +@Getter +public enum FailStrategyEnum { + + SKIP(1, "跳过"), + BLOCK(2, "阻塞"); + + private final Integer code; + private final String desc; + + FailStrategyEnum(Integer code, String desc) { + this.code = code; + this.desc = desc; + } + + public Integer getCode() { + return code; + } + + public String getDesc() { + return desc; + } + + public static FailStrategyEnum valueOf(Integer code) { + for (FailStrategyEnum failStrategyEnum : FailStrategyEnum.values()) { + if (Objects.equals(failStrategyEnum.code, code)) { + return failStrategyEnum; + } + } + + return null; + } + +} diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java index 7871f1538..64c6f4838 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java @@ -50,6 +50,11 @@ public class Workflow implements Serializable { */ private Integer triggerType; + /** + * 阻塞策略 + */ + private Integer blockStrategy; + /** * 触发间隔 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java index db3e20470..82eb5c06f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java @@ -23,6 +23,11 @@ public class WorkflowPartitionTaskDTO extends PartitionTask { */ private String groupName; + /** + * 阻塞策略 + */ + private Integer blockStrategy; + /** * 触发类型 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java index cb4517ffb..c64b92fe2 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java @@ -129,9 +129,9 @@ public class ScanWorkflowTaskActor extends AbstractActor { List workflows = workflowMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()), new LambdaQueryWrapper() - .select(Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType, - Workflow::getTriggerInterval, Workflow::getExecutorTimeout, - Workflow::getId, Workflow::getNamespaceId, Workflow::getFlowInfo) + .select(Workflow::getId, Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType, + Workflow::getTriggerInterval, Workflow::getExecutorTimeout, Workflow::getNamespaceId, + Workflow::getFlowInfo, Workflow::getBlockStrategy) .eq(Workflow::getWorkflowStatus, StatusEnum.YES.getStatus()) .eq(Workflow::getDeleted, StatusEnum.NO.getStatus()) .in(Workflow::getBucketIndex, scanTask.getBuckets()) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index eb997e95c..9da7cf959 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -5,6 +5,7 @@ import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Pair; import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; @@ -34,6 +35,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatc import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; import com.google.common.graph.GraphBuilder; import com.google.common.graph.MutableGraph; import lombok.RequiredArgsConstructor; @@ -91,27 +93,33 @@ public class WorkflowExecutorActor extends AbstractActor { Set successors = graph.successors(taskExecute.getParentId()); if (CollectionUtils.isEmpty(successors)) { - boolean complete = workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); + workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); return; } // 添加父节点,为了判断父节点的处理状态 - successors.add(taskExecute.getParentId()); List jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) - .in(JobTaskBatch::getWorkflowNodeId, successors) + .in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))) ); List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel)); + .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel)); Map jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i)); + Map workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i)); JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId()); - if (JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) { - // 判断是否继续处理,根据失败策略 - } + // 失败策略处理 + if (Objects.nonNull(jobTaskBatch) && JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) { + // 判断是否继续处理,根据失败策略 + WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId()); + // 失败了阻塞策略 + if (Objects.equals(workflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) { + return; + } + } List jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java index dd969c0b8..7900a4c8f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java @@ -6,11 +6,13 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.JobPrePareHandler; import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler; import com.aizuda.easy.retry.server.job.task.support.prepare.TerminalJobPrepareHandler; import com.aizuda.easy.retry.server.job.task.support.prepare.workflow.TerminalWorkflowPrepareHandler; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Lists; @@ -65,8 +67,21 @@ public class WorkflowTaskPrepareActor extends AbstractActor { } } } else { - // 判断任务是否执行超时 - // 任务是否为发起调用 + boolean onlyTimeoutCheck = false; + for (WorkflowTaskBatch workflowTaskBatch : workflowTaskBatches) { + workflowTaskPrepareDTO.setExecutionAt(workflowTaskBatch.getExecutionAt()); + workflowTaskPrepareDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId()); + workflowTaskPrepareDTO.setOnlyTimeoutCheck(onlyTimeoutCheck); + for (WorkflowPrePareHandler prePareHandler : workflowPrePareHandlers) { + if (prePareHandler.matches(workflowTaskBatch.getTaskBatchStatus())) { + prePareHandler.handler(workflowTaskPrepareDTO); + break; + } + } + + // 当存在大量待处理任务时,除了第一个任务需要执行阻塞策略,其他任务只做任务检查 + onlyTimeoutCheck = true; + } } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java index 6c46dc466..7441ed39e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java @@ -38,17 +38,8 @@ public class WorkflowBatchGenerator { // 生成任务批次 WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context); - workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus()); - - // 无执行的节点 - if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) { - workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); - workflowTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason()); - } else { - // 生成一个新的任务 - workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus())); - workflowTaskBatch.setOperationReason(context.getOperationReason()); - } + workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus())); + workflowTaskBatch.setOperationReason(context.getOperationReason()); Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new EasyRetryServerException("新增调度任务失败. [{}]", context.getWorkflowId())); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java index c287db4ef..15eb5e53f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -188,6 +188,10 @@ public class WorkflowBatchHandler { .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)); + if (CollectionUtils.isEmpty(jobTaskBatches)) { + return; + } + List jobs = jobMapper.selectBatchIds( jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet())); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java index 8273ca2c0..c96bd9148 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java @@ -35,6 +35,9 @@ public class WorkflowRequestVO { @NotNull(message = "执行超时时间不能为空") private Integer executorTimeout; + @NotNull(message = "阻塞策略不能为空") + private Integer blockStrategy; + /** * 0、关闭、1、开启 */