feat: 2.6.0
1. 添加失败策略
This commit is contained in:
parent
326b1ecbe0
commit
f802ce0238
@ -454,6 +454,7 @@ CREATE TABLE `workflow`
|
||||
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
|
||||
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
|
||||
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
|
||||
`block_strategy` tinyint(4) NOT NULL DEFAULT '1' COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
|
||||
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
|
||||
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
|
||||
`flow_info` text DEFAULT NULL COMMENT '流程信息',
|
||||
|
@ -0,0 +1,44 @@
|
||||
package com.aizuda.easy.retry.common.core.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @author xiaowoniu
|
||||
* @date 2023-12-26 22:16:51
|
||||
* @since 2.6.0
|
||||
*/
|
||||
@Getter
|
||||
public enum FailStrategyEnum {
|
||||
|
||||
SKIP(1, "跳过"),
|
||||
BLOCK(2, "阻塞");
|
||||
|
||||
private final Integer code;
|
||||
private final String desc;
|
||||
|
||||
FailStrategyEnum(Integer code, String desc) {
|
||||
this.code = code;
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
public Integer getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
public String getDesc() {
|
||||
return desc;
|
||||
}
|
||||
|
||||
public static FailStrategyEnum valueOf(Integer code) {
|
||||
for (FailStrategyEnum failStrategyEnum : FailStrategyEnum.values()) {
|
||||
if (Objects.equals(failStrategyEnum.code, code)) {
|
||||
return failStrategyEnum;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
@ -50,6 +50,11 @@ public class Workflow implements Serializable {
|
||||
*/
|
||||
private Integer triggerType;
|
||||
|
||||
/**
|
||||
* 阻塞策略
|
||||
*/
|
||||
private Integer blockStrategy;
|
||||
|
||||
/**
|
||||
* 触发间隔
|
||||
*/
|
||||
|
@ -23,6 +23,11 @@ public class WorkflowPartitionTaskDTO extends PartitionTask {
|
||||
*/
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 阻塞策略
|
||||
*/
|
||||
private Integer blockStrategy;
|
||||
|
||||
/**
|
||||
* 触发类型
|
||||
*/
|
||||
|
@ -129,9 +129,9 @@ public class ScanWorkflowTaskActor extends AbstractActor {
|
||||
|
||||
List<Workflow> workflows = workflowMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()),
|
||||
new LambdaQueryWrapper<Workflow>()
|
||||
.select(Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType,
|
||||
Workflow::getTriggerInterval, Workflow::getExecutorTimeout,
|
||||
Workflow::getId, Workflow::getNamespaceId, Workflow::getFlowInfo)
|
||||
.select(Workflow::getId, Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType,
|
||||
Workflow::getTriggerInterval, Workflow::getExecutorTimeout, Workflow::getNamespaceId,
|
||||
Workflow::getFlowInfo, Workflow::getBlockStrategy)
|
||||
.eq(Workflow::getWorkflowStatus, StatusEnum.YES.getStatus())
|
||||
.eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
|
||||
.in(Workflow::getBucketIndex, scanTask.getBuckets())
|
||||
|
@ -5,6 +5,7 @@ import akka.actor.ActorRef;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.lang.Pair;
|
||||
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||
import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||
@ -34,6 +35,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatc
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.graph.GraphBuilder;
|
||||
import com.google.common.graph.MutableGraph;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -91,27 +93,33 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
|
||||
Set<Long> successors = graph.successors(taskExecute.getParentId());
|
||||
if (CollectionUtils.isEmpty(successors)) {
|
||||
boolean complete = workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
|
||||
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
|
||||
return;
|
||||
}
|
||||
|
||||
// 添加父节点,为了判断父节点的处理状态
|
||||
successors.add(taskExecute.getParentId());
|
||||
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
|
||||
.in(JobTaskBatch::getWorkflowNodeId, successors)
|
||||
.in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
|
||||
);
|
||||
|
||||
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
|
||||
.in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel));
|
||||
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel));
|
||||
|
||||
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
|
||||
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId());
|
||||
if (JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
|
||||
// 判断是否继续处理,根据失败策略
|
||||
}
|
||||
|
||||
// 失败策略处理
|
||||
if (Objects.nonNull(jobTaskBatch) && JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
|
||||
// 判断是否继续处理,根据失败策略
|
||||
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
|
||||
// 失败了阻塞策略
|
||||
if (Objects.equals(workflowNode.getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
|
||||
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
|
||||
|
@ -6,11 +6,13 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobPrePareHandler;
|
||||
import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler;
|
||||
import com.aizuda.easy.retry.server.job.task.support.prepare.TerminalJobPrepareHandler;
|
||||
import com.aizuda.easy.retry.server.job.task.support.prepare.workflow.TerminalWorkflowPrepareHandler;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -65,8 +67,21 @@ public class WorkflowTaskPrepareActor extends AbstractActor {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 判断任务是否执行超时
|
||||
// 任务是否为发起调用
|
||||
boolean onlyTimeoutCheck = false;
|
||||
for (WorkflowTaskBatch workflowTaskBatch : workflowTaskBatches) {
|
||||
workflowTaskPrepareDTO.setExecutionAt(workflowTaskBatch.getExecutionAt());
|
||||
workflowTaskPrepareDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
|
||||
workflowTaskPrepareDTO.setOnlyTimeoutCheck(onlyTimeoutCheck);
|
||||
for (WorkflowPrePareHandler prePareHandler : workflowPrePareHandlers) {
|
||||
if (prePareHandler.matches(workflowTaskBatch.getTaskBatchStatus())) {
|
||||
prePareHandler.handler(workflowTaskPrepareDTO);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 当存在大量待处理任务时,除了第一个任务需要执行阻塞策略,其他任务只做任务检查
|
||||
onlyTimeoutCheck = true;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -38,17 +38,8 @@ public class WorkflowBatchGenerator {
|
||||
|
||||
// 生成任务批次
|
||||
WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context);
|
||||
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
|
||||
|
||||
// 无执行的节点
|
||||
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
|
||||
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
||||
workflowTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
|
||||
} else {
|
||||
// 生成一个新的任务
|
||||
workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
|
||||
workflowTaskBatch.setOperationReason(context.getOperationReason());
|
||||
}
|
||||
workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
|
||||
workflowTaskBatch.setOperationReason(context.getOperationReason());
|
||||
|
||||
Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new EasyRetryServerException("新增调度任务失败. [{}]", context.getWorkflowId()));
|
||||
|
||||
|
@ -188,6 +188,10 @@ public class WorkflowBatchHandler {
|
||||
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
|
||||
|
||||
if (CollectionUtils.isEmpty(jobTaskBatches)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Job> jobs = jobMapper.selectBatchIds(
|
||||
jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet()));
|
||||
|
||||
|
@ -35,6 +35,9 @@ public class WorkflowRequestVO {
|
||||
@NotNull(message = "执行超时时间不能为空")
|
||||
private Integer executorTimeout;
|
||||
|
||||
@NotNull(message = "阻塞策略不能为空")
|
||||
private Integer blockStrategy;
|
||||
|
||||
/**
|
||||
* 0、关闭、1、开启
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user