fix:(1.2.0-beta2): 修复时间轮幂等问题;优化手动重试逻辑;使用本框架日志组件

This commit is contained in:
srzou 2024-09-20 16:51:54 +08:00
parent ada892157d
commit e171604e7f
6 changed files with 25 additions and 17 deletions

View File

@ -1,7 +1,10 @@
package com.aizuda.snailjob.server.job.task.support.callback; package com.aizuda.snailjob.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum; import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
@ -62,10 +65,16 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
if (StrUtil.isBlank(realJobExecutor.getWfContext())) { if (StrUtil.isBlank(realJobExecutor.getWfContext())) {
realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId())); realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId()));
} }
// 注册重试任务重试间隔时间轮 if (JobRetrySceneEnum.MANUAL.getRetryScene().equals(context.getRetryScene())) {
// 手动重试, 则即时重试
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
} else {
// 注册重试任务重试间隔时间轮
JobTimerWheel.registerWithJob(() -> new RetryJobTimerTask(realJobExecutor), Duration.ofSeconds(job.getRetryInterval())); JobTimerWheel.registerWithJob(() -> new RetryJobTimerTask(realJobExecutor), Duration.ofSeconds(job.getRetryInterval()));
return; return;
} }
}
// 不需要重试执行回调 // 不需要重试执行回调
doCallback(context); doCallback(context);

View File

@ -36,6 +36,7 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
JobTimerWheel.clearCache(idempotentKey());
JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class); JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId); JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId);
if (Objects.isNull(jobTaskBatch)) { if (Objects.isNull(jobTaskBatch)) {

View File

@ -1,13 +1,13 @@
package com.aizuda.snailjob.server.job.task.support.timer; package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -18,7 +18,6 @@ import java.time.LocalDateTime;
* @since 2.4.0 * @since 2.4.0
*/ */
@AllArgsConstructor @AllArgsConstructor
@Slf4j
public class JobTimerTask implements TimerTask<String> { public class JobTimerTask implements TimerTask<String> {
public static final String IDEMPOTENT_KEY_PREFIX = "job_{0}"; public static final String IDEMPOTENT_KEY_PREFIX = "job_{0}";
private JobTimerTaskDTO jobTimerTaskDTO; private JobTimerTaskDTO jobTimerTaskDTO;
@ -26,7 +25,7 @@ public class JobTimerTask implements TimerTask<String> {
@Override @Override
public void run(final Timeout timeout) throws Exception { public void run(final Timeout timeout) throws Exception {
// 执行任务调度 // 执行任务调度
log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); SnailJobLog.LOCAL.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
try { try {
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
@ -39,7 +38,7 @@ public class JobTimerTask implements TimerTask<String> {
actorRef.tell(taskExecuteDTO, actorRef); actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) { } catch (Exception e) {
log.error("任务调度执行失败", e); SnailJobLog.LOCAL.error("任务调度执行失败", e);
} }
} }

View File

@ -1,18 +1,17 @@
package com.aizuda.snailjob.server.job.task.support.timer; package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@AllArgsConstructor @AllArgsConstructor
@Slf4j
public class RetryJobTimerTask implements TimerTask<String> { public class RetryJobTimerTask implements TimerTask<String> {
public static final String IDEMPOTENT_KEY_PREFIX = "retry_job_{0}"; public static final String IDEMPOTENT_KEY_PREFIX = "retry_job_{0}";
private RealJobExecutorDTO jobExecutorDTO; private RealJobExecutorDTO jobExecutorDTO;
@ -20,18 +19,18 @@ public class RetryJobTimerTask implements TimerTask<String> {
@Override @Override
public void run(final Timeout timeout) throws Exception { public void run(final Timeout timeout) throws Exception {
// 执行任务调度 // 执行任务调度
log.debug("开始执行重试任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobExecutorDTO.getTaskBatchId()); SnailJobLog.LOCAL.debug("开始执行重试任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobExecutorDTO.getTaskBatchId());
JobTimerWheel.clearCache(idempotentKey());
try { try {
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(jobExecutorDTO, actorRef); actorRef.tell(jobExecutorDTO, actorRef);
} catch (Exception e) { } catch (Exception e) {
log.error("重试任务调度执行失败", e); SnailJobLog.LOCAL.error("重试任务调度执行失败", e);
} }
} }
@Override @Override
public String idempotentKey() { public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobExecutorDTO.getTaskBatchId()); return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobExecutorDTO.getTaskId());
} }
} }

View File

@ -28,6 +28,7 @@ public class WorkflowTimeoutCheckTask implements TimerTask<String> {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
JobTimerWheel.clearCache(idempotentKey());
WorkflowTaskBatchMapper workflowTaskBatchMapper = SnailSpringContext.getBean(WorkflowTaskBatchMapper.class); WorkflowTaskBatchMapper workflowTaskBatchMapper = SnailSpringContext.getBean(WorkflowTaskBatchMapper.class);
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(workflowTaskBatchId); WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(workflowTaskBatchId);
// 幂等检查 // 幂等检查

View File

@ -2,13 +2,13 @@ package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -19,7 +19,6 @@ import java.time.LocalDateTime;
* @since 2.6.0 * @since 2.6.0
*/ */
@AllArgsConstructor @AllArgsConstructor
@Slf4j
public class WorkflowTimerTask implements TimerTask<String> { public class WorkflowTimerTask implements TimerTask<String> {
public static final String IDEMPOTENT_KEY_PREFIX = "workflow_{0}"; public static final String IDEMPOTENT_KEY_PREFIX = "workflow_{0}";
@ -28,7 +27,7 @@ public class WorkflowTimerTask implements TimerTask<String> {
@Override @Override
public void run(final Timeout timeout) throws Exception { public void run(final Timeout timeout) throws Exception {
// 执行任务调度 // 执行任务调度
log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId()); SnailJobLog.LOCAL.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId());
try { try {
@ -40,7 +39,7 @@ public class WorkflowTimerTask implements TimerTask<String> {
actorRef.tell(taskExecuteDTO, actorRef); actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) { } catch (Exception e) {
log.error("任务调度执行失败", e); SnailJobLog.LOCAL.error("任务调度执行失败", e);
} }
} }