From fe9048ce8a642b5759530b8199548609ca69a71a Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sat, 13 Jan 2024 23:16:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=9B=9E=E8=B0=83=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/AbstractWorkflowExecutor.java | 9 +-- .../workflow/CallbackWorkflowExecutor.java | 14 ++++- .../support/handler/WorkflowBatchHandler.java | 9 +-- .../RunningWorkflowPrepareHandler.java | 55 +++++++++---------- 4 files changed, 41 insertions(+), 46 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index 033e62929..427950c42 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -57,7 +57,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init @Autowired private JobTaskBatchGenerator jobTaskBatchGenerator; @Autowired - private WorkflowBatchHandler workflowBatchHandler; + protected WorkflowBatchHandler workflowBatchHandler; @Autowired private JobTaskMapper jobTaskMapper; @Autowired @@ -107,12 +107,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init generatorContext.setJobId(context.getJobId()); generatorContext.setTaskExecutorScene(context.getTaskExecutorScene()); jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); - try { - workflowBatchHandler.complete(context.getWorkflowTaskBatchId()); - } catch (IOException e) { - throw new EasyRetryServerException("工作流完成处理异常", e); - } - + workflowBatchHandler.complete(context.getWorkflowTaskBatchId()); return false; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index 1fa388344..e72882f56 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -14,8 +14,10 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.client.RequestInterceptor; import com.aizuda.easy.retry.server.common.dto.CallbackConfig; import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; @@ -32,6 +34,7 @@ import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -108,7 +111,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { context.setJobTaskStatus(jobTaskStatus); context.setEvaluationResult(result); context.setLogMessage(message); - } @@ -129,6 +131,14 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { logMetaDTO.setTaskBatchId(jobTaskBatch.getId()); logMetaDTO.setJobId(SystemConstants.CALLBACK_JOB_ID); logMetaDTO.setTaskId(jobTask.getId()); - EasyRetryLog.REMOTE.info("workflowNodeId:[{}] 回调成功. <|>{}<|>", context.getWorkflowNodeId(), logMetaDTO); + if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) { + EasyRetryLog.REMOTE.info("workflowNodeId:[{}] 回调成功. <|>{}<|>", context.getWorkflowNodeId(), logMetaDTO); + } else { + EasyRetryLog.REMOTE.info("workflowNodeId:[{}] 回调失败. 失败原因:[{}] <|>{}<|>", context.getWorkflowNodeId(), + context.getLogMessage(), logMetaDTO); + + // 尝试完成任务 + workflowBatchHandler.complete(context.getWorkflowTaskBatchId()); + } } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java index 56937ebc6..47a49362f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -52,15 +52,14 @@ import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT public class WorkflowBatchHandler { private final WorkflowTaskBatchMapper workflowTaskBatchMapper; - private final WorkflowNodeMapper workflowNodeMapper; private final JobMapper jobMapper; private final JobTaskBatchMapper jobTaskBatchMapper; - public boolean complete(Long workflowTaskBatchId) throws IOException { + public boolean complete(Long workflowTaskBatchId) { return complete(workflowTaskBatchId, null); } - public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException { + public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) { workflowTaskBatch = Optional.ofNullable(workflowTaskBatch) .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); @@ -83,10 +82,6 @@ public class WorkflowBatchHandler { return false; } - List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus()) - .in(WorkflowNode::getId, graph.nodes())); - Map> currentWorkflowNodeMap = jobTaskBatches.stream() .collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java index a8280573f..917b1a438 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java @@ -39,41 +39,36 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle protected void doHandler(WorkflowTaskPrepareDTO prepare) { log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare)); - try { - // 1. 若DAG已经支持完成了,由于异常原因导致的没有更新成终态此次进行一次更新操作 - int blockStrategy = prepare.getBlockStrategy(); - if (workflowBatchHandler.complete(prepare.getWorkflowTaskBatchId())) { - // 开启新的任务 - blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); - } else { - // 计算超时时间 - long delay = DateUtils.toNowMilli() - prepare.getExecutionAt(); - // 2. 判断DAG是否已经支持超时 - // 计算超时时间,到达超时时间中断任务 - if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) { - log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]", + // 1. 若DAG已经支持完成了,由于异常原因导致的没有更新成终态此次进行一次更新操作 + int blockStrategy = prepare.getBlockStrategy(); + if (workflowBatchHandler.complete(prepare.getWorkflowTaskBatchId())) { + // 开启新的任务 + blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); + } else { + // 计算超时时间 + long delay = DateUtils.toNowMilli() - prepare.getExecutionAt(); + + // 2. 判断DAG是否已经支持超时 + // 计算超时时间,到达超时时间中断任务 + if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) { + log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout())); - // 超时停止任务 - workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason()); - } + // 超时停止任务 + workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason()); } - - // 仅是超时检测的,不执行阻塞策略 - if (prepare.isOnlyTimeoutCheck()) { - return; - } - - // 3. 支持阻塞策略同JOB逻辑一致 - BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getJobTaskStop(blockStrategy); - WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext( - prepare); - blockStrategyInterface.block(workflowBlockStrategyContext); - } catch (IOException e) { - log.error("更新任务状态失败. prepare:[{}]", JsonUtil.toJsonString(prepare), e); - } + // 仅是超时检测的,不执行阻塞策略 + if (prepare.isOnlyTimeoutCheck()) { + return; + } + + // 3. 支持阻塞策略同JOB逻辑一致 + BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getJobTaskStop(blockStrategy); + WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext( + prepare); + blockStrategyInterface.block(workflowBlockStrategyContext); } }