From e12235fda10dc6f6caa9e645d2276ff423d5c193 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Tue, 26 Dec 2023 18:35:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E6=9D=A1=E4=BB=B6=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/enums/JobOperationReasonEnum.java | 3 +- .../job/task/dto/WorkflowTaskPrepareDTO.java | 15 ++ .../job/task/support/BlockStrategy.java | 7 + .../job/task/support/JobTaskConverter.java | 1 + .../task/support/WorkflowTaskConverter.java | 5 + .../AbstractWorkflowBlockStrategy.java | 32 ++++ .../ConcurrencyWorkflowBlockStrategy.java | 32 ++++ .../DiscardWorkflowBlockStrategy.java | 34 ++++ .../OverlayWorkflowBlockStrategy.java | 64 +++++++ .../WorkflowBlockStrategyContext.java | 25 +++ .../WorkflowBlockStrategyFactory.java | 28 ++++ .../support/dispatch/JobExecutorActor.java | 23 +++ .../dispatch/WorkflowExecutorActor.java | 16 +- .../workflow/ConditionWorkflowExecutor.java | 7 +- .../batch/WorkflowBatchGenerator.java | 24 ++- .../support/handler/WorkflowBatchHandler.java | 158 ++++++++++++++++-- .../RunningWorkflowPrepareHandler.java | 56 ++++++- .../task/support/stop/JobTaskStopFactory.java | 9 +- .../support/strategy/BlockStrategies.java | 10 ++ 19 files changed, 517 insertions(+), 32 deletions(-) create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java index 0f4f6616..20855bae 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java @@ -4,8 +4,6 @@ import cn.hutool.core.util.StrUtil; import lombok.AllArgsConstructor; import lombok.Getter; -import java.util.HashMap; - /** * 标识某个操作的具体原因 * @@ -27,6 +25,7 @@ public enum JobOperationReasonEnum { TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"), MANNER_STOP(8, "手动停止"), WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"), + JOB_TASK_INTERRUPTED(8, "任务中断"), ; private final int reason; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java index 86b0dddb..6a179c0b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java @@ -19,6 +19,11 @@ public class WorkflowTaskPrepareDTO { */ private Integer triggerType; + /** + * 阻塞策略 1、丢弃 2、覆盖 3、并行 + */ + private Integer blockStrategy; + /** * 工作流名称 */ @@ -58,4 +63,14 @@ public class WorkflowTaskPrepareDTO { * 下次触发时间 */ private long nextTriggerAt; + + /** + * 任务执行时间 + */ + private Long executionAt; + + /** + * 仅做超时检测 + */ + private boolean onlyTimeoutCheck; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/BlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/BlockStrategy.java index 8d755be2..4602129b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/BlockStrategy.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/BlockStrategy.java @@ -5,7 +5,14 @@ import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.Bl /** * @author: www.byteblogs.com * @date : 2023-09-25 17:53 + * @since : 1.0.0 */ public interface BlockStrategy { + + /** + * 阻塞策略 + * + * @param context 策略上下文 + */ void block(BlockStrategyContext context); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java index 816fbab6..59378728 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java @@ -52,6 +52,7 @@ public interface JobTaskConverter { TaskStopJobContext toStopJobContext(BlockStrategies.BlockStrategyContext context); TaskStopJobContext toStopJobContext(JobExecutorResultDTO context); + TaskStopJobContext toStopJobContext(Job job); TaskStopJobContext toStopJobContext(JobTaskPrepareDTO context); JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java index eb4b44fd..7d375b12 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support; import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyContext; import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; @@ -41,4 +42,8 @@ public interface WorkflowTaskConverter { @Mapping(source = "id", target = "workflowNodeId") ) WorkflowExecutorContext toWorkflowExecutorContext(WorkflowNode workflowNode); + + WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowBlockStrategyContext context); + + WorkflowBlockStrategyContext toWorkflowBlockStrategyContext(WorkflowTaskPrepareDTO prepareDTO); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java new file mode 100644 index 00000000..8efcdea0 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java @@ -0,0 +1,32 @@ +package com.aizuda.easy.retry.server.job.task.support.block.workflow; + +import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyContext; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.transaction.annotation.Transactional; + +/** + * @author: xiaowoniu + * @date : 2023-12-26 + * @since : 2.6.0 + */ +public abstract class AbstractWorkflowBlockStrategy implements BlockStrategy, InitializingBean { + + @Override + @Transactional + public void block(final BlockStrategyContext context) { + WorkflowBlockStrategyContext workflowBlockStrategyContext = (WorkflowBlockStrategyContext) context; + + doBlock(workflowBlockStrategyContext); + } + + protected abstract void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext); + + protected abstract BlockStrategyEnum blockStrategyEnum(); + + @Override + public void afterPropertiesSet() throws Exception { + WorkflowBlockStrategyFactory.registerTaskStop(blockStrategyEnum(), this); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java new file mode 100644 index 00000000..c6edb4fa --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java @@ -0,0 +1,32 @@ +package com.aizuda.easy.retry.server.job.task.support.block.workflow; + +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +/** + * @author: shuguang.zhang + * @date : 2023-12-26 + * @since : 2.6.0 + */ +@Component +@RequiredArgsConstructor +public class ConcurrencyWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy { + private final WorkflowBatchGenerator workflowBatchGenerator; + + @Override + protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { + WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); + workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext); + } + + @Override + protected BlockStrategyEnum blockStrategyEnum() { + return BlockStrategyEnum.CONCURRENCY; + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java new file mode 100644 index 00000000..a1a8c575 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java @@ -0,0 +1,34 @@ +package com.aizuda.easy.retry.server.job.task.support.block.workflow; + +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +/** + * @author: xiaowoniu + * @date : 2023-12-26 + * @since : 2.6.0 + */ +@Component +@RequiredArgsConstructor +public class DiscardWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy { + private final WorkflowBatchGenerator workflowBatchGenerator; + @Override + protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { + // 生成状态为取消的工作流批次 + WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(workflowBlockStrategyContext); + workflowTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); + workflowTaskBatchGeneratorContext.setOperationReason(JobOperationReasonEnum.JOB_DISCARD.getReason()); + workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext); + } + + @Override + protected BlockStrategyEnum blockStrategyEnum() { + return BlockStrategyEnum.DISCARD; + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java new file mode 100644 index 00000000..f201ad4e --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java @@ -0,0 +1,64 @@ +package com.aizuda.easy.retry.server.job.task.support.block.workflow; + +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; +import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE; + +/** + * @author: xiaowoniu + * @date : 2023-12-26 + * @since : 2.6.0 + */ +@Component +@RequiredArgsConstructor +public class OverlayWorkflowBlockStrategy extends AbstractWorkflowBlockStrategy { + + private final WorkflowBatchHandler workflowBatchHandler; + private final WorkflowBatchGenerator workflowBatchGenerator; + + @Override + protected void doBlock(final WorkflowBlockStrategyContext workflowBlockStrategyContext) { + + // 停止当前批次 + workflowBatchHandler.stop(workflowBlockStrategyContext.getWorkflowTaskBatchId(), workflowBlockStrategyContext.getOperationReason()); + + // 重新生成一个批次 + WorkflowTaskBatchGeneratorContext workflowTaskBatchGeneratorContext = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext( + workflowBlockStrategyContext); + workflowBatchGenerator.generateJobTaskBatch(workflowTaskBatchGeneratorContext); + + } + + @Override + protected BlockStrategyEnum blockStrategyEnum() { + return BlockStrategyEnum.OVERLAY; + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java new file mode 100644 index 00000000..ac4fb4b4 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.job.task.support.block.workflow; + +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyContext; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author: xiaowoniu + * @date : 2023-12-26 + * @since : 2.6.0 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class WorkflowBlockStrategyContext extends BlockStrategyContext { + + /** + * 工作流id + */ + private Long workflowId; + + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java new file mode 100644 index 00000000..779c77f4 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java @@ -0,0 +1,28 @@ +package com.aizuda.easy.retry.server.job.task.support.block.workflow; + +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author: xiaowoniu + * @date : 2023-12-26 + * @since : 2.6.0 + */ +public final class WorkflowBlockStrategyFactory { + private static final ConcurrentHashMap CACHE = new ConcurrentHashMap<>(); + + private WorkflowBlockStrategyFactory() { + } + + protected static void registerTaskStop(BlockStrategyEnum blockStrategyEnum, BlockStrategy blockStrategy) { + CACHE.put(blockStrategyEnum, blockStrategy); + } + + public static BlockStrategy getJobTaskStop(Integer blockStrategy) { + return CACHE.get(BlockStrategyEnum.valueOf(blockStrategy)); + } + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 02b270e1..9ae7e3a9 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -1,7 +1,9 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; +import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; @@ -17,6 +19,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.JobExecutor; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; @@ -106,6 +109,26 @@ public class JobExecutorActor extends AbstractActor { job.getNamespaceId()))) { taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason(); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(int status) { + if (Objects.nonNull(taskExecute.getWorkflowNodeId()) && Objects.nonNull(taskExecute.getWorkflowTaskBatchId())) { + // 若是工作流则开启下一个任务 + try { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); + taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); + taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId()); + taskExecuteDTO.setResult(StrUtil.EMPTY); + ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + } catch (Exception e) { + log.error("任务调度执行失败", e); + } + } + } + }); + } // 更新状态 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index 54bb7353..eb997e95 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -95,21 +95,27 @@ public class WorkflowExecutorActor extends AbstractActor { return; } + // 添加父节点,为了判断父节点的处理状态 + successors.add(taskExecute.getParentId()); List jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) .in(JobTaskBatch::getWorkflowNodeId, successors) ); - Map jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i)); - List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel)); + .in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel)); + + Map jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i)); + JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId()); + if (JobTaskBatchStatusEnum.SUCCESS.getStatus() != jobTaskBatch.getTaskBatchStatus()) { + // 判断是否继续处理,根据失败策略 + } + + List jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); - // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次,同时可以判断出DAG是否全部执行完成 - // 只会条件节点会使用 Boolean evaluationResult = null; for (WorkflowNode workflowNode : workflowNodes) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java index ccc9d4f9..9850e2c6 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java @@ -29,6 +29,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Component; +import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -66,8 +67,12 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); } else { try { + Map contextMap = new HashMap<>(); // 根据配置的表达式执行 - result = doEval(context.getNodeExpression(), JsonUtil.parseHashMap(context.getResult())); + if (StrUtil.isNotBlank(context.getResult())) { + contextMap = JsonUtil.parseHashMap(context.getResult()); + } + result = doEval(context.getNodeExpression(), contextMap); log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result); } catch (Exception e) { taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java index 79b2104b..6c46dc46 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java @@ -1,6 +1,10 @@ package com.aizuda.easy.retry.server.job.task.support.generator.batch; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO; @@ -14,7 +18,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; +import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -33,7 +39,23 @@ public class WorkflowBatchGenerator { // 生成任务批次 WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context); workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus()); - workflowTaskBatchMapper.insert(workflowTaskBatch); + + // 无执行的节点 + if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) { + workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); + workflowTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason()); + } else { + // 生成一个新的任务 + workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus())); + workflowTaskBatch.setOperationReason(context.getOperationReason()); + } + + Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new EasyRetryServerException("新增调度任务失败. [{}]", context.getWorkflowId())); + + // 非待处理状态无需进入时间轮中 + if (JobTaskBatchStatusEnum.WAITING.getStatus() != workflowTaskBatch.getTaskBatchStatus()) { + return; + } // 开始执行工作流 // 进入时间轮 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java index cada04b4..c287db4e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -3,14 +3,22 @@ package com.aizuda.easy.retry.server.job.task.support.handler; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; +import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.GraphUtils; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; @@ -18,9 +26,18 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.graph.MutableGraph; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; +import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE; /** * @author xiaowoniu @@ -30,6 +47,7 @@ import java.util.Optional; @Component @RequiredArgsConstructor public class WorkflowBatchHandler { + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; private final WorkflowNodeMapper workflowNodeMapper; private final JobMapper jobMapper; @@ -38,29 +56,103 @@ public class WorkflowBatchHandler { public boolean complete(Long workflowTaskBatchId) throws IOException { return complete(workflowTaskBatchId, null); } + public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { - workflowTaskBatch = Optional.ofNullable(workflowTaskBatch).orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); + workflowTaskBatch = Optional.ofNullable(workflowTaskBatch) + .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); String flowInfo = workflowTaskBatch.getFlowInfo(); MutableGraph graph = GraphUtils.deserializeJsonToGraph(flowInfo); // 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成 - long executedTaskCount = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() - .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) - .in(JobTaskBatch::getWorkflowNodeId, graph.nodes()) + List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) + .in(JobTaskBatch::getWorkflowNodeId, graph.nodes()) ); - Long taskNodeCount = workflowNodeMapper.selectCount(new LambdaQueryWrapper() -// .eq(WorkflowNode::getNodeType, 1) // TODO 任务节点 若最后一个节点是条件或者是回调节点 这个地方就有问题 - .in(WorkflowNode::getId, graph.nodes())); - - // TODO 若最后几个节点都是非任务节点,这里直接完成就会有问题 - if (executedTaskCount < taskNodeCount) { + if (CollectionUtils.isEmpty(jobTaskBatches)) { return false; } - handlerTaskBatch(workflowTaskBatchId, JobTaskBatchStatusEnum.SUCCESS.getStatus(), JobOperationReasonEnum.NONE.getReason()); + if (jobTaskBatches.stream().anyMatch( + jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()))) { + return false; + } + + List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() + .in(WorkflowNode::getId, graph.nodes())); + if (jobTaskBatches.size() < workflowNodes.size()) { + return false; + } + + Map workflowNodeMap = workflowNodes.stream() + .collect(Collectors.toMap(WorkflowNode::getId, workflowNode -> workflowNode)); + + Map> map = jobTaskBatches.stream() + .collect(Collectors.groupingBy(JobTaskBatch::getParentWorkflowNodeId)); + + int taskStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus(); + int operationReason = JobOperationReasonEnum.NONE.getReason(); + for (final JobTaskBatch jobTaskBatch : jobTaskBatches) { + Set predecessors = graph.predecessors(jobTaskBatch.getWorkflowNodeId()); + WorkflowNode workflowNode = workflowNodeMap.get(jobTaskBatch.getWorkflowNodeId()); + // 条件节点是或的关系一个成功就代表成功 + if (WorkflowNodeTypeEnum.CONDITION.getType() == workflowNode.getNodeType()) { + for (final Long predecessor : predecessors) { + List jobTaskBatcheList = map.get(predecessor); + Map statusCountMap = jobTaskBatcheList.stream() + .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); + long successCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.SUCCESS.getStatus(), 0L); + long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); + long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); + if (successCount > 0) { + break; + } + + if (failCount > 0) { + taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); + operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); + break; + } + + if (stopCount > 0) { + taskStatus = JobTaskBatchStatusEnum.STOP.getStatus(); + operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); + break; + } + + } + } else { + + for (final Long predecessor : predecessors) { + List jobTaskBatcheList = map.get(predecessor); + Map statusCountMap = jobTaskBatcheList.stream() + .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); + long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); + long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); if (failCount > 0) { + taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); + operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); + break; + } + + if (stopCount > 0) { + taskStatus = JobTaskBatchStatusEnum.STOP.getStatus(); + operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); + break; + } + } + } + + + if (taskStatus != JobTaskBatchStatusEnum.SUCCESS.getStatus()) { + break; + } + + } + + handlerTaskBatch(workflowTaskBatchId, taskStatus, operationReason); + return true; } @@ -69,11 +161,51 @@ public class WorkflowBatchHandler { WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch(); jobTaskBatch.setId(workflowTaskBatchId); - jobTaskBatch.setExecutionAt(DateUtils.toNowMilli()); jobTaskBatch.setTaskBatchStatus(taskStatus); jobTaskBatch.setOperationReason(operationReason); Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch), - () -> new EasyRetryServerException("更新任务失败")); + () -> new EasyRetryServerException("更新任务失败")); } + + public void stop(Long workflowTaskBatchId, Integer operationReason) { + if (Objects.isNull(operationReason) + || operationReason == JobOperationReasonEnum.NONE.getReason()) { + operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason(); + } + + WorkflowTaskBatch workflowTaskBatch = new WorkflowTaskBatch(); + workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); + workflowTaskBatch.setOperationReason(operationReason); + workflowTaskBatch.setId(workflowTaskBatchId); + // 先停止执行中的批次 + Assert.isTrue(1 == workflowTaskBatchMapper.updateById(workflowTaskBatch), + () -> new EasyRetryServerException("停止工作流批次失败. id:[{}]", + workflowTaskBatchId)); + + // 关闭已经触发的任务 + List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() + .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)); + + List jobs = jobMapper.selectBatchIds( + jobTaskBatches.stream().map(JobTaskBatch::getJobId).collect(Collectors.toSet())); + + Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); + for (final JobTaskBatch jobTaskBatch : jobTaskBatches) { + + Job job = jobMap.get(jobTaskBatch.getJobId()); + if (Objects.nonNull(job)) { + // 停止任务 + JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); + TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job); + stopJobContext.setTaskBatchId(jobTaskBatch.getId()); + stopJobContext.setJobOperationReason(JobOperationReasonEnum.JOB_TASK_INTERRUPTED.getReason()); + stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); + stopJobContext.setForceStop(Boolean.TRUE); + instanceInterrupt.stop(stopJobContext); + } + + } + } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java index eb753e4d..8c72a333 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java @@ -1,9 +1,21 @@ package com.aizuda.easy.retry.server.job.task.support.prepare.workflow; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyContext; +import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyFactory; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; +import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies.BlockStrategyEnum; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -20,6 +32,7 @@ import java.util.Objects; @RequiredArgsConstructor @Slf4j public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { + private final WorkflowBatchHandler workflowBatchHandler; @Override @@ -28,18 +41,43 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle } @Override - protected void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO) { - log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(jobPrepareDTO)); + protected void doHandler(WorkflowTaskPrepareDTO prepare) { + log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare)); - // 1. 若DAG已经支持完成了,由于异常原因导致的没有更新成终态此次进行一次更新操作 try { - workflowBatchHandler.complete(jobPrepareDTO.getWorkflowTaskBatchId()); - } catch (IOException e) { - // TODO 待处理 - } + // 1. 若DAG已经支持完成了,由于异常原因导致的没有更新成终态此次进行一次更新操作 + int blockStrategy = prepare.getBlockStrategy(); + if (workflowBatchHandler.complete(prepare.getWorkflowTaskBatchId())) { + // 开启新的任务 + blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); + } else { + // 计算超时时间 + long delay = DateUtils.toNowMilli() - prepare.getExecutionAt(); - // 2. 判断DAG是否已经支持超时 - // 3. 支持阻塞策略同JOB逻辑一致 + // 2. 判断DAG是否已经支持超时 + // 计算超时时间,到达超时时间中断任务 + if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) { + log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]", + prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout())); + // 超时停止任务 + workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason()); + } + } + + // 仅是超时检测的,不执行阻塞策略 + if (prepare.isOnlyTimeoutCheck()) { + return; + } + + // 3. 支持阻塞策略同JOB逻辑一致 + BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getJobTaskStop(blockStrategy); + WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext( + prepare); + blockStrategyInterface.block(workflowBlockStrategyContext); + } catch (IOException e) { + log.error("更新任务状态失败. prepare:[{}]", JsonUtil.toJsonString(prepare), e); + + } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/JobTaskStopFactory.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/JobTaskStopFactory.java index 39045a42..c241e91d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/JobTaskStopFactory.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/JobTaskStopFactory.java @@ -10,10 +10,13 @@ import java.util.concurrent.ConcurrentHashMap; * @date 2023-10-02 13:04:09 * @since 2.4.0 */ -public class JobTaskStopFactory { +public final class JobTaskStopFactory { private static final ConcurrentHashMap CACHE = new ConcurrentHashMap<>(); + private JobTaskStopFactory() { + } + public static void registerTaskStop(TaskTypeEnum taskInstanceType, JobTaskStopHandler interrupt) { CACHE.put(taskInstanceType, interrupt); } @@ -21,4 +24,8 @@ public class JobTaskStopFactory { public static JobTaskStopHandler getJobTaskStop(Integer type) { return CACHE.get(TaskTypeEnum.valueOf(type)); } + + public static JobTaskStopFactory createJobTaskStopFactory() { + return new JobTaskStopFactory(); + } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/BlockStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/BlockStrategies.java index c3dbde31..d2356351 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/BlockStrategies.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/BlockStrategies.java @@ -46,6 +46,16 @@ public class BlockStrategies { throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy); } + public static BlockStrategyEnum valueOf(int blockStrategy) { + for (final BlockStrategyEnum value : BlockStrategyEnum.values()) { + if (value.blockStrategy == blockStrategy) { + return value; + } + } + + throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy); + } + } @Data