From 0d9f885cc5dd0e0c48db58e5dd445cb814108064 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 26 Jan 2024 18:14:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E5=AF=BC=E8=87=B4=E4=BA=8B=E5=8A=A1=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatch/JobExecutorResultActor.java | 39 +++++++-------- .../dispatch/WorkflowExecutorActor.java | 5 +- .../batch/JobTaskBatchGenerator.java | 48 ++++++++++++------- .../handler/ReportLogHttpRequestHandler.java | 1 - 4 files changed, 52 insertions(+), 41 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java index e13fde35b..edc3b6c3f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -62,29 +62,24 @@ public class JobExecutorResultActor extends AbstractActor { return receiveBuilder().match(JobExecutorResultDTO.class, result -> { log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); try { - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(final TransactionStatus status) { - JobTask jobTask = new JobTask(); - jobTask.setTaskStatus(result.getTaskStatus()); - if (Objects.nonNull(result.getResult())) { - jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); - } + JobTask jobTask = new JobTask(); + jobTask.setTaskStatus(result.getTaskStatus()); + if (Objects.nonNull(result.getResult())) { + jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); + } - Assert.isTrue(1 == jobTaskMapper.update(jobTask, - new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), - () -> new EasyRetryServerException("更新任务实例失败")); - // 先尝试完成,若已完成则不需要通过获取分布式锁来完成 - boolean tryCompleteAndStop = tryCompleteAndStop(result); - if (!tryCompleteAndStop) { - // 存在并发问题 - distributedLockHandler.lockWithDisposableAndRetry(() -> { - tryCompleteAndStop(result); - }, MessageFormat.format(KEY, result.getTaskBatchId(), - result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 6); - } - } - }); + Assert.isTrue(1 == jobTaskMapper.update(jobTask, + new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), + () -> new EasyRetryServerException("更新任务实例失败")); + // 先尝试完成,若已完成则不需要通过获取分布式锁来完成 + boolean tryCompleteAndStop = tryCompleteAndStop(result); + if (!tryCompleteAndStop) { + // 存在并发问题 + distributedLockHandler.lockWithDisposableAndRetry(() -> { + tryCompleteAndStop(result); + }, MessageFormat.format(KEY, result.getTaskBatchId(), + result.getJobId()), Duration.ofSeconds(2), Duration.ofSeconds(1), 3); + } } catch (Exception e) { EasyRetryLog.LOCAL.error(" job executor result exception. [{}]", result, e); } finally { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index 578acc116..9550f58ea 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; @@ -80,7 +81,9 @@ public class WorkflowExecutorActor extends AbstractActor { WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId()); Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); - handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason()); + if (SystemConstants.ROOT.equals(taskExecute.getParentId()) && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) { + handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason()); + } // 获取DAG图 String flowInfo = workflowTaskBatch.getFlowInfo(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 0d428288a..4e1849cca 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -80,24 +80,18 @@ public class JobTaskBatchGenerator { // 非待处理状态无需进入时间轮中 if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) { - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { - @Override - public void afterCompletion(int status) { - if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) { - // 若是工作流则开启下一个任务 - try { - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); - taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene()); - taskExecuteDTO.setParentId(context.getWorkflowNodeId()); - ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); - actorRef.tell(taskExecuteDTO, actorRef); - } catch (Exception e) { - log.error("任务调度执行失败", e); - } + if (TransactionSynchronizationManager.isActualTransactionActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(int status) { + openNextNode(context); } - } - }); + }); + } else { + openNextNode(context); + } + + return jobTaskBatch; } @@ -115,4 +109,24 @@ public class JobTaskBatchGenerator { return jobTaskBatch; } + /** + * 工作流开启下一个节点 + * @param context + */ + private static void openNextNode(final JobTaskBatchGeneratorContext context) { + if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) { + // 若是工作流则开启下一个任务 + try { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene()); + taskExecuteDTO.setParentId(context.getWorkflowNodeId()); + ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + } catch (Exception e) { + log.error("任务调度执行失败", e); + } + } + } + } diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/handler/ReportLogHttpRequestHandler.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/handler/ReportLogHttpRequestHandler.java index 3fdb59329..1702ce28a 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/handler/ReportLogHttpRequestHandler.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/handler/ReportLogHttpRequestHandler.java @@ -50,7 +50,6 @@ public class ReportLogHttpRequestHandler extends PostHttpRequestHandler { } @Override - @Transactional public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) { EasyRetryLog.LOCAL.info("Begin Handler Log Report Data. [{}]", content);