From 0440e127cae6bf3e581883f65585e74c5ccbe01f Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sun, 21 Jan 2024 18:18:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=B9=BF=E6=92=AD=E6=A8=A1=E5=BC=8F=E5=B9=B6=E5=8F=91=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/task/support/JobTaskConverter.java | 2 + .../block/job/BlockStrategyContext.java | 16 +++++++ .../dispatch/JobExecutorResultActor.java | 46 ++++++++++--------- .../support/dispatch/JobTaskPrepareActor.java | 6 ++- .../executor/job/RequestClientActor.java | 11 ++--- .../batch/JobTaskBatchGenerator.java | 38 +++++++-------- 6 files changed, 71 insertions(+), 48 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java index 7587de1a..4adb999f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java @@ -107,6 +107,8 @@ public interface JobTaskConverter { ) JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask); + JobExecutorResultDTO toJobExecutorResultDTO(RealJobExecutorDTO realJobExecutorDTO); + RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context); List toJobPartitionTasks(List jobs); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/job/BlockStrategyContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/job/BlockStrategyContext.java index dd72c635..39d2e0f5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/job/BlockStrategyContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/job/BlockStrategyContext.java @@ -35,4 +35,20 @@ public class BlockStrategyContext { */ private Integer taskExecutorScene; + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; + + /** + * 工作流节点id + */ + private Long workflowNodeId; + + /** + * 工作流父节点id + */ + private Long parentWorkflowNodeId; + + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java index 306d94f5..1bb4df8e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -16,6 +16,8 @@ import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.LockExecutor; +import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.easy.retry.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; @@ -31,6 +33,8 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; +import java.text.MessageFormat; +import java.time.Duration; import java.time.LocalDateTime; import java.util.Objects; @@ -43,13 +47,15 @@ import java.util.Objects; @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j public class JobExecutorResultActor extends AbstractActor { - + private static final String KEY = "job_complete_{0}_{1}"; @Autowired private JobTaskMapper jobTaskMapper; @Autowired private TransactionTemplate transactionTemplate; @Autowired private JobTaskBatchHandler jobTaskBatchHandler; + @Autowired + private DistributedLockHandler distributedLockHandler; @Override public Receive createReceive() { @@ -69,30 +75,26 @@ public class JobExecutorResultActor extends AbstractActor { new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), ()-> new EasyRetryServerException("更新任务实例失败")); - - - // 更新批次上的状态 - CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result); - boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO); - if (complete) { - // 尝试停止任务 - // 若是集群任务则客户端会主动关闭 - if (result.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()) { - JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType()); - TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result); - stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE); - stopJobContext.setForceStop(Boolean.TRUE); - instanceInterrupt.stop(stopJobContext); + // 存在并发问题 + distributedLockHandler.lockWithDisposableAndRetry(() -> { + // 更新批次上的状态 + CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result); + boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO); + if (complete) { + // 尝试停止任务 + // 若是集群任务则客户端会主动关闭 + if (result.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()) { + JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType()); + TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result); + stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE); + stopJobContext.setForceStop(Boolean.TRUE); + instanceInterrupt.stop(stopJobContext); + } } - } + }, MessageFormat.format(KEY, result.getTaskBatchId(), result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 3); + } }); - -// LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result); -// // 防止客户端日志还未上报完成,导致日志时序错误 -// logMetaDTO.setTimestamp(DateUtils.toEpochMilli(LocalDateTime.now().plusHours(1))); -// EasyRetryLog.REMOTE.info("taskId:[{}] 任务执行成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO); - } catch (Exception e) { EasyRetryLog.LOCAL.error(" job executor result exception. [{}]", result, e); } finally { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java index c2e827b2..842a5c27 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java @@ -19,6 +19,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.List; + import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE; /** @@ -60,6 +61,9 @@ public class JobTaskPrepareActor extends AbstractActor { prepare.getTaskExecutorScene()); if (TaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getTaskType().getType())) { queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId()); + queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.WORKFLOW.getType()); + } else { + queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.JOB.getType()); } List notCompleteJobTaskBatchList = jobTaskBatchMapper @@ -75,7 +79,7 @@ public class JobTaskPrepareActor extends AbstractActor { for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) { prepare.setExecutionAt(jobTaskBatch.getExecutionAt()); prepare.setTaskBatchId(jobTaskBatch.getId()); - prepare.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId()); + prepare.setWorkflowTaskBatchId(prepare.getWorkflowTaskBatchId()); prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId()); prepare.setOnlyTimeoutCheck(onlyTimeoutCheck); for (JobPrePareHandler prePareHandler : prePareHandlers) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java index a93a5098..24fcc1ba 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java @@ -88,12 +88,14 @@ public class RequestClientActor extends AbstractActor { if (e.getClass().isAssignableFrom(RetryException.class)) { RetryException re = (RetryException) e; throwable = re.getLastFailedAttempt().getExceptionCause(); - taskExecuteFailure(realJobExecutorDTO, throwable.getMessage()); } LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); logMetaDTO.setTimestamp( DateUtils.toNowMilli()); - EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO, throwable); + EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO, throwable); + + taskExecuteFailure(realJobExecutorDTO, throwable.getMessage()); + } } @@ -135,10 +137,7 @@ public class RequestClientActor extends AbstractActor { private static void taskExecuteFailure(RealJobExecutorDTO realJobExecutorDTO, String message) { ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); - JobExecutorResultDTO jobExecutorResultDTO = new JobExecutorResultDTO(); - jobExecutorResultDTO.setTaskId(realJobExecutorDTO.getTaskId()); - jobExecutorResultDTO.setJobId(realJobExecutorDTO.getJobId()); - jobExecutorResultDTO.setTaskBatchId(realJobExecutorDTO.getTaskBatchId()); + JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(realJobExecutorDTO); jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); jobExecutorResultDTO.setMessage(message); actorRef.tell(jobExecutorResultDTO, actorRef); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 7f0de996..0d428288 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -60,25 +60,6 @@ public class JobTaskBatchGenerator { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason()); - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { - @Override - public void afterCompletion(int status) { - if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) { - // 若是工作流则开启下一个任务 - try { - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); - taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene()); - taskExecuteDTO.setParentId(context.getWorkflowNodeId()); - ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); - actorRef.tell(taskExecuteDTO, actorRef); - } catch (Exception e) { - log.error("任务调度执行失败", e); - } - } - } - }); - } else { // 生成一个新的任务 jobTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus())); @@ -98,6 +79,25 @@ public class JobTaskBatchGenerator { // 非待处理状态无需进入时间轮中 if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) { + + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(int status) { + if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) { + // 若是工作流则开启下一个任务 + try { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene()); + taskExecuteDTO.setParentId(context.getWorkflowNodeId()); + ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + } catch (Exception e) { + log.error("任务调度执行失败", e); + } + } + } + }); return jobTaskBatch; }