diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java index 7fd61c16c..ccf71f42f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -1,7 +1,10 @@ package com.aizuda.snailjob.server.job.task.support.callback; +import akka.actor.ActorRef; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum; @@ -62,9 +65,15 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan if (StrUtil.isBlank(realJobExecutor.getWfContext())) { realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId())); } - // 注册重试任务重试间隔时间轮 - JobTimerWheel.registerWithJob(() -> new RetryJobTimerTask(realJobExecutor), Duration.ofSeconds(job.getRetryInterval())); - return; + if (JobRetrySceneEnum.MANUAL.getRetryScene().equals(context.getRetryScene())) { + // 手动重试, 则即时重试 + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + } else { + // 注册重试任务,重试间隔时间轮 + JobTimerWheel.registerWithJob(() -> new RetryJobTimerTask(realJobExecutor), Duration.ofSeconds(job.getRetryInterval())); + return; + } } // 不需要重试执行回调 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java index dc19523bf..2a0a289a4 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java @@ -36,6 +36,7 @@ public class JobTimeoutCheckTask implements TimerTask { @Override public void run(Timeout timeout) throws Exception { + JobTimerWheel.clearCache(idempotentKey()); JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class); JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId); if (Objects.isNull(jobTaskBatch)) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java index 1c5b20e8e..fd5b8b680 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java @@ -1,13 +1,13 @@ package com.aizuda.snailjob.server.job.task.support.timer; import akka.actor.ActorRef; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import io.netty.util.Timeout; import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; import java.text.MessageFormat; import java.time.LocalDateTime; @@ -18,7 +18,6 @@ import java.time.LocalDateTime; * @since 2.4.0 */ @AllArgsConstructor -@Slf4j public class JobTimerTask implements TimerTask { public static final String IDEMPOTENT_KEY_PREFIX = "job_{0}"; private JobTimerTaskDTO jobTimerTaskDTO; @@ -26,7 +25,7 @@ public class JobTimerTask implements TimerTask { @Override public void run(final Timeout timeout) throws Exception { // 执行任务调度 - log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); + SnailJobLog.LOCAL.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); try { TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); @@ -39,7 +38,7 @@ public class JobTimerTask implements TimerTask { actorRef.tell(taskExecuteDTO, actorRef); } catch (Exception e) { - log.error("任务调度执行失败", e); + SnailJobLog.LOCAL.error("任务调度执行失败", e); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java index 86301fed0..0b6615dc9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/RetryJobTimerTask.java @@ -1,18 +1,17 @@ package com.aizuda.snailjob.server.job.task.support.timer; import akka.actor.ActorRef; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import io.netty.util.Timeout; import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; import java.text.MessageFormat; import java.time.LocalDateTime; @AllArgsConstructor -@Slf4j public class RetryJobTimerTask implements TimerTask { public static final String IDEMPOTENT_KEY_PREFIX = "retry_job_{0}"; private RealJobExecutorDTO jobExecutorDTO; @@ -20,18 +19,18 @@ public class RetryJobTimerTask implements TimerTask { @Override public void run(final Timeout timeout) throws Exception { // 执行任务调度 - log.debug("开始执行重试任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobExecutorDTO.getTaskBatchId()); - + SnailJobLog.LOCAL.debug("开始执行重试任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobExecutorDTO.getTaskBatchId()); + JobTimerWheel.clearCache(idempotentKey()); try { ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); actorRef.tell(jobExecutorDTO, actorRef); } catch (Exception e) { - log.error("重试任务调度执行失败", e); + SnailJobLog.LOCAL.error("重试任务调度执行失败", e); } } @Override public String idempotentKey() { - return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobExecutorDTO.getTaskBatchId()); + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobExecutorDTO.getTaskId()); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java index 7dfe5f364..29cdbc4d8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java @@ -28,6 +28,7 @@ public class WorkflowTimeoutCheckTask implements TimerTask { @Override public void run(Timeout timeout) throws Exception { + JobTimerWheel.clearCache(idempotentKey()); WorkflowTaskBatchMapper workflowTaskBatchMapper = SnailSpringContext.getBean(WorkflowTaskBatchMapper.class); WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(workflowTaskBatchId); // 幂等检查 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java index 4a284db22..e8f81cd03 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java @@ -2,13 +2,13 @@ package com.aizuda.snailjob.server.job.task.support.timer; import akka.actor.ActorRef; import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO; import io.netty.util.Timeout; import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; import java.text.MessageFormat; import java.time.LocalDateTime; @@ -19,7 +19,6 @@ import java.time.LocalDateTime; * @since 2.6.0 */ @AllArgsConstructor -@Slf4j public class WorkflowTimerTask implements TimerTask { public static final String IDEMPOTENT_KEY_PREFIX = "workflow_{0}"; @@ -28,7 +27,7 @@ public class WorkflowTimerTask implements TimerTask { @Override public void run(final Timeout timeout) throws Exception { // 执行任务调度 - log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId()); + SnailJobLog.LOCAL.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId()); try { @@ -40,7 +39,7 @@ public class WorkflowTimerTask implements TimerTask { actorRef.tell(taskExecuteDTO, actorRef); } catch (Exception e) { - log.error("任务调度执行失败", e); + SnailJobLog.LOCAL.error("任务调度执行失败", e); } }