feat: 2.6.0

1. 添加失败策略
This commit is contained in:
byteblogs168 2023-12-26 23:12:35 +08:00
parent e12235fda1
commit 2c52d20304
10 changed files with 99 additions and 23 deletions

View File

@ -454,6 +454,7 @@ CREATE TABLE `workflow`
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间', `trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长', `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间', `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
`block_strategy` tinyint(4) NOT NULL DEFAULT '1' COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`flow_info` text DEFAULT NULL COMMENT '流程信息', `flow_info` text DEFAULT NULL COMMENT '流程信息',

View File

@ -0,0 +1,44 @@
package com.aizuda.easy.retry.common.core.enums;
import lombok.Getter;
import java.util.Objects;
/**
* @author xiaowoniu
* @date 2023-12-26 22:16:51
* @since 2.6.0
*/
@Getter
public enum FailStrategyEnum {
SKIP(1, "跳过"),
BLOCK(2, "阻塞");
private final Integer code;
private final String desc;
FailStrategyEnum(Integer code, String desc) {
this.code = code;
this.desc = desc;
}
public Integer getCode() {
return code;
}
public String getDesc() {
return desc;
}
public static FailStrategyEnum valueOf(Integer code) {
for (FailStrategyEnum failStrategyEnum : FailStrategyEnum.values()) {
if (Objects.equals(failStrategyEnum.code, code)) {
return failStrategyEnum;
}
}
return null;
}
}

View File

@ -50,6 +50,11 @@ public class Workflow implements Serializable {
*/ */
private Integer triggerType; private Integer triggerType;
/**
* 阻塞策略
*/
private Integer blockStrategy;
/** /**
* 触发间隔 * 触发间隔
*/ */

View File

@ -23,6 +23,11 @@ public class WorkflowPartitionTaskDTO extends PartitionTask {
*/ */
private String groupName; private String groupName;
/**
* 阻塞策略
*/
private Integer blockStrategy;
/** /**
* 触发类型 * 触发类型
*/ */

View File

@ -129,9 +129,9 @@ public class ScanWorkflowTaskActor extends AbstractActor {
List<Workflow> workflows = workflowMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()), List<Workflow> workflows = workflowMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()),
new LambdaQueryWrapper<Workflow>() new LambdaQueryWrapper<Workflow>()
.select(Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType, .select(Workflow::getId, Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType,
Workflow::getTriggerInterval, Workflow::getExecutorTimeout, Workflow::getTriggerInterval, Workflow::getExecutorTimeout, Workflow::getNamespaceId,
Workflow::getId, Workflow::getNamespaceId, Workflow::getFlowInfo) Workflow::getFlowInfo, Workflow::getBlockStrategy)
.eq(Workflow::getWorkflowStatus, StatusEnum.YES.getStatus()) .eq(Workflow::getWorkflowStatus, StatusEnum.YES.getStatus())
.eq(Workflow::getDeleted, StatusEnum.NO.getStatus()) .eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
.in(Workflow::getBucketIndex, scanTask.getBuckets()) .in(Workflow::getBucketIndex, scanTask.getBuckets())

View File

@ -5,6 +5,7 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair; import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -34,6 +35,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatc
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.common.graph.GraphBuilder; import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph; import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -91,27 +93,33 @@ public class WorkflowExecutorActor extends AbstractActor {
Set<Long> successors = graph.successors(taskExecute.getParentId()); Set<Long> successors = graph.successors(taskExecute.getParentId());
if (CollectionUtils.isEmpty(successors)) { if (CollectionUtils.isEmpty(successors)) {
boolean complete = workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return; return;
} }
// 添加父节点为了判断父节点的处理状态 // 添加父节点为了判断父节点的处理状态
successors.add(taskExecute.getParentId());
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>() List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId) .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, successors) .in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
); );
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>() List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel)); .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i)); Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId()); JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId());
if (JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
// 判断是否继续处理根据失败策略
}
// 失败策略处理
if (Objects.nonNull(jobTaskBatch) && JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
// 判断是否继续处理根据失败策略
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
// 失败了阻塞策略
if (Objects.equals(workflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) {
return;
}
}
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));

View File

@ -6,11 +6,13 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobPrePareHandler;
import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler; import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler;
import com.aizuda.easy.retry.server.job.task.support.prepare.TerminalJobPrepareHandler; import com.aizuda.easy.retry.server.job.task.support.prepare.TerminalJobPrepareHandler;
import com.aizuda.easy.retry.server.job.task.support.prepare.workflow.TerminalWorkflowPrepareHandler; import com.aizuda.easy.retry.server.job.task.support.prepare.workflow.TerminalWorkflowPrepareHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -65,8 +67,21 @@ public class WorkflowTaskPrepareActor extends AbstractActor {
} }
} }
} else { } else {
// 判断任务是否执行超时 boolean onlyTimeoutCheck = false;
// 任务是否为发起调用 for (WorkflowTaskBatch workflowTaskBatch : workflowTaskBatches) {
workflowTaskPrepareDTO.setExecutionAt(workflowTaskBatch.getExecutionAt());
workflowTaskPrepareDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
workflowTaskPrepareDTO.setOnlyTimeoutCheck(onlyTimeoutCheck);
for (WorkflowPrePareHandler prePareHandler : workflowPrePareHandlers) {
if (prePareHandler.matches(workflowTaskBatch.getTaskBatchStatus())) {
prePareHandler.handler(workflowTaskPrepareDTO);
break;
}
}
// 当存在大量待处理任务时除了第一个任务需要执行阻塞策略其他任务只做任务检查
onlyTimeoutCheck = true;
}
} }
} }

View File

@ -38,17 +38,8 @@ public class WorkflowBatchGenerator {
// 生成任务批次 // 生成任务批次
WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context); WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context);
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus()); workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
workflowTaskBatch.setOperationReason(context.getOperationReason());
// 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
workflowTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
} else {
// 生成一个新的任务
workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
workflowTaskBatch.setOperationReason(context.getOperationReason());
}
Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new EasyRetryServerException("新增调度任务失败. [{}]", context.getWorkflowId())); Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new EasyRetryServerException("新增调度任务失败. [{}]", context.getWorkflowId()));

View File

@ -188,6 +188,10 @@ public class WorkflowBatchHandler {
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE) .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)); .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
if (CollectionUtils.isEmpty(jobTaskBatches)) {
return;
}
List<Job> jobs = jobMapper.selectBatchIds( List<Job> jobs = jobMapper.selectBatchIds(
jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet())); jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet()));

View File

@ -35,6 +35,9 @@ public class WorkflowRequestVO {
@NotNull(message = "执行超时时间不能为空") @NotNull(message = "执行超时时间不能为空")
private Integer executorTimeout; private Integer executorTimeout;
@NotNull(message = "阻塞策略不能为空")
private Integer blockStrategy;
/** /**
* 0关闭1开启 * 0关闭1开启
*/ */