From 580686dd6bee70915d01f28bb9cbef490dc77bbb Mon Sep 17 00:00:00 2001 From: srzou Date: Thu, 19 Sep 2024 22:04:22 +0800 Subject: [PATCH] =?UTF-8?q?fix:(1.2.0-beta2):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E9=87=8D=E8=AF=95=E9=97=B4?= =?UTF-8?q?=E9=9A=94=E4=B8=8D=E7=94=9F=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractClientCallbackHandler.java | 42 +++++++++---------- .../task/support/timer/RetryJobTimerTask.java | 37 ++++++++++++++++ 2 files changed, 57 insertions(+), 22 deletions(-) create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java index 8d132db02..7fd61c16c 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -1,14 +1,14 @@ package com.aizuda.snailjob.server.job.task.support.callback; -import akka.actor.ActorRef; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; -import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum; import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; +import com.aizuda.snailjob.server.job.task.support.timer.RetryJobTimerTask; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; @@ -22,6 +22,7 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; +import java.time.Duration; import java.util.Objects; /** @@ -45,28 +46,25 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan // 判定是否需要重试 boolean needRetry = isNeedRetry(context); - if (needRetry) { - // 更新重试次数 - if (updateRetryCount(context)) { - Job job = context.getJob(); - JobTask jobTask = context.getJobTask(); - RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO( + if (needRetry && updateRetryCount(context)) { + Job job = context.getJob(); + JobTask jobTask = context.getJobTask(); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO( JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); - realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo())); - realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId()); - realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); - realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1); - realJobExecutor.setRetry(Boolean.TRUE); - realJobExecutor.setRetryScene(context.getRetryScene()); - realJobExecutor.setTaskName(jobTask.getTaskName()); - // 这里统一收口传递上下文 - if (StrUtil.isBlank(realJobExecutor.getWfContext())) { - realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId())); - } - ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); - actorRef.tell(realJobExecutor, actorRef); - return; + realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo())); + realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId()); + realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1); + realJobExecutor.setRetry(Boolean.TRUE); + realJobExecutor.setRetryScene(context.getRetryScene()); + realJobExecutor.setTaskName(jobTask.getTaskName()); + // 这里统一收口传递上下文 + if (StrUtil.isBlank(realJobExecutor.getWfContext())) { + realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId())); } + // 注册重试任务重试间隔时间轮 + JobTimerWheel.registerWithJob(() -> new RetryJobTimerTask(realJobExecutor), Duration.ofSeconds(job.getRetryInterval())); + return; } // 不需要重试执行回调 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java new file mode 100644 index 000000000..86301fed0 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java @@ -0,0 +1,37 @@ +package com.aizuda.snailjob.server.job.task.support.timer; + +import akka.actor.ActorRef; +import com.aizuda.snailjob.server.common.TimerTask; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; +import io.netty.util.Timeout; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.text.MessageFormat; +import java.time.LocalDateTime; + +@AllArgsConstructor +@Slf4j +public class RetryJobTimerTask implements TimerTask { + public static final String IDEMPOTENT_KEY_PREFIX = "retry_job_{0}"; + private RealJobExecutorDTO jobExecutorDTO; + + @Override + public void run(final Timeout timeout) throws Exception { + // 执行任务调度 + log.debug("开始执行重试任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobExecutorDTO.getTaskBatchId()); + + try { + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(jobExecutorDTO, actorRef); + } catch (Exception e) { + log.error("重试任务调度执行失败", e); + } + } + + @Override + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobExecutorDTO.getTaskBatchId()); + } +}