fix:(1.2.0-beta2): 修复定时任务重试间隔不生效

This commit is contained in:
srzou 2024-09-19 22:04:22 +08:00
parent 216d26cf2d
commit 580686dd6b
2 changed files with 57 additions and 22 deletions

View File

@ -1,14 +1,14 @@
package com.aizuda.snailjob.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.RetryJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -22,6 +22,7 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.util.Objects;
/**
@ -45,28 +46,25 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
// 判定是否需要重试
boolean needRetry = isNeedRetry(context);
if (needRetry) {
// 更新重试次数
if (updateRetryCount(context)) {
Job job = context.getJob();
JobTask jobTask = context.getJobTask();
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(
if (needRetry && updateRetryCount(context)) {
Job job = context.getJob();
JobTask jobTask = context.getJobTask();
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(
JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo()));
realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId());
realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
realJobExecutor.setTaskName(jobTask.getTaskName());
// 这里统一收口传递上下文
if (StrUtil.isBlank(realJobExecutor.getWfContext())) {
realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId()));
}
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
return;
realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo()));
realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId());
realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
realJobExecutor.setTaskName(jobTask.getTaskName());
// 这里统一收口传递上下文
if (StrUtil.isBlank(realJobExecutor.getWfContext())) {
realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId()));
}
// 注册重试任务重试间隔时间轮
JobTimerWheel.registerWithJob(() -> new RetryJobTimerTask(realJobExecutor), Duration.ofSeconds(job.getRetryInterval()));
return;
}
// 不需要重试执行回调

View File

@ -0,0 +1,37 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime;
@AllArgsConstructor
@Slf4j
public class RetryJobTimerTask implements TimerTask<String> {
public static final String IDEMPOTENT_KEY_PREFIX = "retry_job_{0}";
private RealJobExecutorDTO jobExecutorDTO;
@Override
public void run(final Timeout timeout) throws Exception {
// 执行任务调度
log.debug("开始执行重试任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobExecutorDTO.getTaskBatchId());
try {
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(jobExecutorDTO, actorRef);
} catch (Exception e) {
log.error("重试任务调度执行失败", e);
}
}
@Override
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobExecutorDTO.getTaskBatchId());
}
}