diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/IdempotentStrategy.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/IdempotentStrategy.java index f13a9112..c8e19369 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/IdempotentStrategy.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/IdempotentStrategy.java @@ -6,14 +6,12 @@ package com.aizuda.snailjob.server.common; * @author: opensnail * @date : 2021-11-23 09:20 */ -public interface IdempotentStrategy { +public interface IdempotentStrategy { - boolean set(T key, V value); + boolean set(T key); - V get(T t); + boolean isExist(T key); - boolean isExist(T key, V value); - - boolean clear(T key, V value); + boolean clear(T key); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/TimerTask.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/TimerTask.java similarity index 63% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/TimerTask.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/TimerTask.java index 0f33eba6..8c15a197 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/TimerTask.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/TimerTask.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.job.task.support.timer; +package com.aizuda.snailjob.server.common; /** * @author: opensnail @@ -7,5 +7,5 @@ package com.aizuda.snailjob.server.job.task.support.timer; */ public interface TimerTask extends io.netty.util.TimerTask { - T getUniqueId(); + T idempotentKey(); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index cc4bdd4f..4f187daa 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -32,6 +32,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.timer.JobTimeoutCheckTask; +import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; @@ -54,6 +55,7 @@ import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; +import java.text.MessageFormat; import java.time.Duration; import java.time.LocalDateTime; import java.util.List; @@ -152,7 +154,7 @@ public class JobExecutorActor extends AbstractActor { @Override public void afterCompletion(int status) { // 清除时间轮的缓存 - JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId()); + JobTimerWheel.clearCache(MessageFormat.format(JobTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getTaskBatchId())); if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) { @@ -160,10 +162,6 @@ public class JobExecutorActor extends AbstractActor { JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), // 加500ms是为了让尽量保证客户端自己先超时中断,防止客户端上报成功但是服务端已触发超时中断 Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500)); - -// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(), -// new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), -// job.getExecutorTimeout(), TimeUnit.SECONDS); } //方法内容 @@ -241,9 +239,11 @@ public class JobExecutorActor extends AbstractActor { // 获取时间差的毫秒数 long milliseconds = nextTriggerAt - preTriggerAt; - log.debug("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000); + Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000); + + log.info("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000); job.setNextTriggerAt(nextTriggerAt); - JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000)); + JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration); // JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); ResidentTaskCache.refresh(job.getId(), nextTriggerAt); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 2f15d0d5..9040bff5 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -25,6 +25,7 @@ import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExe import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimeoutCheckTask; +import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask; import com.aizuda.snailjob.template.datasource.persistence.mapper.*; import com.aizuda.snailjob.template.datasource.persistence.po.*; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -36,6 +37,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.text.MessageFormat; import java.time.Duration; import java.time.LocalDateTime; import java.util.List; @@ -89,13 +91,10 @@ public class WorkflowExecutorActor extends AbstractActor { handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason()); Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId()); - JobTimerWheel.clearCache(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId()); + JobTimerWheel.clearCache(MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getWorkflowTaskBatchId())); JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), Duration.ofSeconds(workflow.getExecutorTimeout())); - // 超时检查 -// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(), -// new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.SECONDS); } // 获取DAG图 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java index 175f8e6f..f361165c 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java @@ -1,7 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.idempotent; import com.aizuda.snailjob.server.common.IdempotentStrategy; -import com.aizuda.snailjob.server.common.triple.Pair; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -12,9 +11,9 @@ import java.util.concurrent.TimeUnit; * @date 2023-10-19 21:54:57 * @since 2.4.0 */ -public class TimerIdempotent implements IdempotentStrategy { +public class TimerIdempotent implements IdempotentStrategy { - private static final Cache, Long> cache; + private static final Cache cache; static { cache = CacheBuilder.newBuilder() @@ -25,28 +24,20 @@ public class TimerIdempotent implements IdempotentStrategy { } @Override - public boolean set(Integer type, Long value) { - cache.put(getKey(type, value), value); + public boolean set(String key) { + cache.put(key, key); return Boolean.TRUE; } @Override - public Long get(Integer s) { - throw new UnsupportedOperationException("不支持此操作"); + public boolean isExist(String key) { + return cache.asMap().containsKey(key); } @Override - public boolean isExist(Integer type, Long value) { - return cache.asMap().containsKey(getKey(type, value)); - } - - @Override - public boolean clear(Integer type, Long value) { - cache.invalidate(getKey(type, value)); + public boolean clear(String key) { + cache.invalidate(key); return Boolean.TRUE; } - private static Pair getKey(Integer type, Long value) { - return Pair.of(type, value); - } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java index 48624cfb..572d5d6c 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java @@ -10,6 +10,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.text.MessageFormat; import java.time.Duration; /** @@ -33,7 +34,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { log.debug("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); // 若时间轮中数据不存在则重新加入 - if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId())) { + if (!JobTimerWheel.isExisted(MessageFormat.format(JobTimerTask.IDEMPOTENT_KEY_PREFIX, jobPrepareDTO.getTaskBatchId()))) { log.info("存在待处理任务且时间轮中不存在 taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); // 进入时间轮 @@ -43,8 +44,6 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId()); JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay)); -// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(), -// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaitWorkflowPrepareHandler.java similarity index 80% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java rename to snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaitWorkflowPrepareHandler.java index 0dea42e6..ab827b57 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaitWorkflowPrepareHandler.java @@ -1,7 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.prepare.workflow; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO; @@ -10,6 +9,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.text.MessageFormat; import java.time.Duration; import java.util.Objects; @@ -22,7 +22,7 @@ import java.util.Objects; */ @Component @Slf4j -public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { +public class WaitWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { @Override public boolean matches(Integer status) { @@ -34,7 +34,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { log.debug("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId()); // 若时间轮中数据不存在则重新加入 - if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId())) { + if (!JobTimerWheel.isExisted(MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, workflowTaskPrepareDTO.getWorkflowTaskBatchId()))) { log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId()); // 进入时间轮 @@ -45,8 +45,6 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { workflowTimerTaskDTO.setTaskExecutorScene(workflowTaskPrepareDTO.getTaskExecutorScene()); JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimerTask(workflowTimerTaskDTO), Duration.ofMillis(delay)); -// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(), -// new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java index 7fcc3bf9..a000625b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java @@ -4,6 +4,7 @@ import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; @@ -16,6 +17,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import io.netty.util.Timeout; import lombok.AllArgsConstructor; +import java.text.MessageFormat; import java.util.Objects; /** @@ -26,7 +28,9 @@ import java.util.Objects; * @since sj_1.0.0 */ @AllArgsConstructor -public class JobTimeoutCheckTask implements TimerTask { +public class JobTimeoutCheckTask implements TimerTask { + private static final String IDEMPOTENT_KEY_PREFIX = "job_timeout_check_{0}"; + private final Long taskBatchId; private final Long jobId; @@ -65,7 +69,7 @@ public class JobTimeoutCheckTask implements TimerTask { } @Override - public Long getUniqueId() { - return taskBatchId; + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskBatchId); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java index 4a233e99..1c5b20e8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.timer; import akka.actor.ActorRef; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; @@ -8,6 +9,7 @@ import io.netty.util.Timeout; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import java.text.MessageFormat; import java.time.LocalDateTime; /** @@ -17,8 +19,8 @@ import java.time.LocalDateTime; */ @AllArgsConstructor @Slf4j -public class JobTimerTask implements TimerTask { - +public class JobTimerTask implements TimerTask { + public static final String IDEMPOTENT_KEY_PREFIX = "job_{0}"; private JobTimerTaskDTO jobTimerTaskDTO; @Override @@ -42,7 +44,7 @@ public class JobTimerTask implements TimerTask { } @Override - public Long getUniqueId() { - return jobTimerTaskDTO.getTaskBatchId(); + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobTimerTaskDTO.getTaskBatchId()); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java index 1759e382..0afe2282 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java @@ -1,7 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.timer; import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent; import io.netty.util.HashedWheelTimer; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; @@ -36,60 +36,54 @@ public class JobTimerWheel { timer.start(); } -// @Deprecated -// public static synchronized void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) { -// -// if (!isExisted(taskType, uniqueId)) { -// SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); -// delay = delay < 0 ? 0 : delay; -// try { -// timer.newTimeout(task, delay, unit); -// idempotent.set(taskType, uniqueId); -// } catch (Exception e) { -// SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e); -// } -// } -// } - - public static synchronized void registerWithWorkflow(Supplier> task, Duration delay) { - TimerTask timerTask = task.get(); - register(SyetemTaskTypeEnum.WORKFLOW.getType(), timerTask.getUniqueId(), timerTask, delay); + /** + * 定时任务添加时间轮 + * + * @param task 任务 + * @param delay 延迟时间 + */ + public static synchronized void registerWithWorkflow(Supplier> task, Duration delay) { + TimerTask timerTask = task.get(); + register(timerTask.idempotentKey(), timerTask, delay); } - public static synchronized void registerWithJob(Supplier> task, Duration delay) { - TimerTask timerTask = task.get(); - register(SyetemTaskTypeEnum.JOB.getType(), timerTask.getUniqueId(), timerTask, delay); + /** + * 工作流任务添加时间轮 + * 虽然job和Workflow 添加时间轮方法逻辑一样为了后面做一些不同的逻辑,这里兼容分开写 + * @param task 任务 + * @param delay 延迟时间 + */ + public static synchronized void registerWithJob(Supplier> task, Duration delay) { + TimerTask timerTask = task.get(); + register(timerTask.idempotentKey(), timerTask, delay); } - public static synchronized void register(Integer taskType, Long uniqueId, TimerTask task, Duration delay) { + public static synchronized void register(String idempotentKey, TimerTask task, Duration delay) { - register(taskType, uniqueId, new Consumer() { - @Override - public void accept(final HashedWheelTimer hashedWheelTimer) { - SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); - timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS); - } + register(idempotentKey, hashedWheelTimer -> { + SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, idempotentKey); + timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS); }); } - public static synchronized void register(Integer taskType, Long uniqueId, Consumer consumer) { + public static synchronized void register(String idempotentKey, Consumer consumer) { - if (!isExisted(taskType, uniqueId)) { + if (!isExisted(idempotentKey)) { try { consumer.accept(timer); - idempotent.set(taskType, uniqueId); + idempotent.set(idempotentKey); } catch (Exception e) { - SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e); + SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", idempotentKey, e); } } } - public static boolean isExisted(Integer taskType, Long uniqueId) { - return idempotent.isExist(taskType, uniqueId); + public static boolean isExisted(String idempotentKey) { + return idempotent.isExist(idempotentKey); } - public static void clearCache(Integer taskType, Long uniqueId) { - idempotent.clear(taskType, uniqueId); + public static void clearCache(String idempotentKey) { + idempotent.clear(idempotentKey); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java index 7c3af067..c6918b2e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java @@ -1,25 +1,27 @@ package com.aizuda.snailjob.server.job.task.support.timer; import akka.actor.ActorRef; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import io.netty.util.Timeout; import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; + +import java.text.MessageFormat; /** * @author opensnail * @date 2023-10-20 23:09:13 * @since 2.4.0 */ -@Slf4j @AllArgsConstructor -public class ResidentJobTimerTask implements TimerTask { +public class ResidentJobTimerTask implements TimerTask { + private static final String IDEMPOTENT_KEY_PREFIX = " resident_job_{0}"; private JobTimerTaskDTO jobTimerTaskDTO; private Job job; @@ -28,19 +30,19 @@ public class ResidentJobTimerTask implements TimerTask { public void run(Timeout timeout) throws Exception { try { // 清除时间轮的缓存 - JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId()); + JobTimerWheel.clearCache(idempotentKey()); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType()); // 执行预处理阶段 ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); actorRef.tell(jobTaskPrepare, actorRef); } catch (Exception e) { - log.error("任务调度执行失败", e); + SnailJobLog.LOCAL.error("任务调度执行失败", e); } } @Override - public Long getUniqueId() { - return jobTimerTaskDTO.getTaskBatchId(); + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobTimerTaskDTO.getTaskBatchId()); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java index be9cea99..bd5ab38b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java @@ -4,6 +4,7 @@ import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; @@ -11,6 +12,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import io.netty.util.Timeout; import lombok.AllArgsConstructor; +import java.text.MessageFormat; import java.util.Objects; /** @@ -19,7 +21,9 @@ import java.util.Objects; * @since sj_1.0.0 */ @AllArgsConstructor -public class WorkflowTimeoutCheckTask implements TimerTask { +public class WorkflowTimeoutCheckTask implements TimerTask { + private static final String IDEMPOTENT_KEY_PREFIX = "workflow_timeout_check_{0}"; + private final Long workflowTaskBatchId; @Override @@ -40,7 +44,7 @@ public class WorkflowTimeoutCheckTask implements TimerTask { } @Override - public Long getUniqueId() { - return workflowTaskBatchId; + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, workflowTaskBatchId); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java index 1243ec9b..2c96507a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.timer; import akka.actor.ActorRef; import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO; @@ -9,6 +10,7 @@ import io.netty.util.Timeout; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import java.text.MessageFormat; import java.time.LocalDateTime; /** @@ -18,7 +20,8 @@ import java.time.LocalDateTime; */ @AllArgsConstructor @Slf4j -public class WorkflowTimerTask implements TimerTask { +public class WorkflowTimerTask implements TimerTask { + public static final String IDEMPOTENT_KEY_PREFIX = "workflow_{0}"; private WorkflowTimerTaskDTO workflowTimerTaskDTO; @@ -42,7 +45,7 @@ public class WorkflowTimerTask implements TimerTask { } @Override - public Long getUniqueId() { - return workflowTimerTaskDTO.getWorkflowTaskBatchId(); + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, workflowTimerTaskDTO.getWorkflowTaskBatchId()); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java index ffc15652..c9b6abd2 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java @@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.result; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; -import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.IdempotentStrategy; @@ -10,17 +9,17 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent; import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler; +import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; +import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Scope; @@ -40,23 +39,15 @@ import java.time.LocalDateTime; */ @Component(ActorGenerator.FAILURE_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -@Slf4j +@RequiredArgsConstructor public class FailureActor extends AbstractActor { - @Autowired - private ApplicationContext context; - @Autowired - private AccessTemplate accessTemplate; - @Autowired - private CallbackRetryTaskHandler callbackRetryTaskHandler; - @Autowired - private TransactionTemplate transactionTemplate; - @Autowired - private SystemProperties systemProperties; - @Autowired - @Qualifier("retryIdempotentStrategyHandler") - private IdempotentStrategy, Long> idempotentStrategy; - @Autowired - private RetryTaskLogMapper retryTaskLogMapper; + private final IdempotentStrategy idempotentStrategy = IdempotentHolder.getRetryIdempotent(); + private final ApplicationContext context; + private final AccessTemplate accessTemplate; + private final CallbackRetryTaskHandler callbackRetryTaskHandler; + private final TransactionTemplate transactionTemplate; + private final SystemProperties systemProperties; + private final RetryTaskLogMapper retryTaskLogMapper; @Override public Receive createReceive() { @@ -107,8 +98,8 @@ public class FailureActor extends AbstractActor { SnailJobLog.LOCAL.error("更新重试任务失败", e); } finally { // 清除幂等标识位 - idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId()); - getContext().stop(getSelf()); + idempotentStrategy.clear( + ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString()); getContext().stop(getSelf()); } }).build(); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FinishActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FinishActor.java index 81d43ba6..86ad1221 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FinishActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FinishActor.java @@ -2,21 +2,20 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.result; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; -import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.IdempotentStrategy; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler; +import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; +import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -37,21 +36,13 @@ import java.time.LocalDateTime; */ @Component(ActorGenerator.FINISH_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -@Slf4j +@RequiredArgsConstructor public class FinishActor extends AbstractActor { - - - @Autowired - private AccessTemplate accessTemplate; - @Autowired - private CallbackRetryTaskHandler callbackRetryTaskHandler; - @Autowired - private TransactionTemplate transactionTemplate; - @Autowired - @Qualifier("retryIdempotentStrategyHandler") - private IdempotentStrategy, Long> idempotentStrategy; - @Autowired - private RetryTaskLogMapper retryTaskLogMapper; + private final IdempotentStrategy idempotentStrategy = IdempotentHolder.getRetryIdempotent(); + private final AccessTemplate accessTemplate; + private final CallbackRetryTaskHandler callbackRetryTaskHandler; + private final TransactionTemplate transactionTemplate; + private final RetryTaskLogMapper retryTaskLogMapper; @Override public Receive createReceive() { @@ -89,8 +80,8 @@ public class FinishActor extends AbstractActor { SnailJobLog.LOCAL.error("更新重试任务失败", e); } finally { // 清除幂等标识位 - idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId()); - + idempotentStrategy.clear( + ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString()); getContext().stop(getSelf()); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/NoRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/NoRetryActor.java index 0645d705..16547b67 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/NoRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/NoRetryActor.java @@ -2,18 +2,17 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.result; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; -import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.IdempotentStrategy; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.retry.task.support.RetryContext; +import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder; import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; +import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -29,14 +28,10 @@ import java.time.LocalDateTime; */ @Component(ActorGenerator.NO_RETRY_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -@Slf4j +@RequiredArgsConstructor public class NoRetryActor extends AbstractActor { - - @Autowired - protected AccessTemplate accessTemplate; - @Autowired - @Qualifier("retryIdempotentStrategyHandler") - private IdempotentStrategy, Long> idempotentStrategy; + private final IdempotentStrategy idempotentStrategy = IdempotentHolder.getRetryIdempotent(); + private final AccessTemplate accessTemplate; @Override public Receive createReceive() { @@ -57,8 +52,7 @@ public class NoRetryActor extends AbstractActor { SnailJobLog.LOCAL.error("更新重试任务失败", e); } finally { // 清除幂等标识位 - idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId()); - + idempotentStrategy.clear(ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString()); // 更新DB状态 getContext().stop(getSelf()); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index 90c335b7..d6f01f20 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.scan; import akka.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.util.StreamUtils; @@ -24,13 +23,13 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import java.time.Duration; import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.TimeUnit; +import com.aizuda.snailjob.server.common.TimerTask; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -137,12 +136,8 @@ public abstract class AbstractScanGroup extends AbstractActor { for (PartitionTask partitionTask : partitionTasks) { RetryPartitionTask retryPartitionTask = (RetryPartitionTask) partitionTask; long delay = DateUtils.toEpochMilli(retryPartitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100; - RetryTimerWheel.register( - Pair.of(retryPartitionTask.getGroupName(), retryPartitionTask.getNamespaceId()), - retryPartitionTask.getUniqueId(), - timerTask(retryPartitionTask), - delay, - TimeUnit.MILLISECONDS); + TimerTask timerTask = timerTask(retryPartitionTask); + RetryTimerWheel.register(timerTask.idempotentKey(), timerTask, Duration.ofMillis(delay)); } } @@ -175,7 +170,7 @@ public abstract class AbstractScanGroup extends AbstractActor { protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final RetrySceneConfig retrySceneConfig); - protected abstract TimerTask timerTask(RetryPartitionTask partitionTask); + protected abstract TimerTask timerTask(RetryPartitionTask partitionTask); protected abstract AtomicLong preCostTime(); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java index 08f272ab..868e205c 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.scan; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext; @@ -11,7 +12,6 @@ import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorS import com.aizuda.snailjob.server.retry.task.support.timer.CallbackTimerTask; import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; -import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; @@ -72,7 +72,7 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { } @Override - protected TimerTask timerTask(final RetryPartitionTask partitionTask) { + protected TimerTask timerTask(final RetryPartitionTask partitionTask) { RetryTimerContext retryTimerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(partitionTask); retryTimerContext.setScene(taskActuatorScene()); return new CallbackTimerTask(retryTimerContext); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index ccf7756c..6470d6e5 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.scan; import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext; @@ -12,7 +13,6 @@ import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorS import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext; import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; -import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; @@ -79,7 +79,7 @@ public class ScanRetryTaskActor extends AbstractScanGroup { } @Override - protected TimerTask timerTask(final RetryPartitionTask partitionTask) { + protected TimerTask timerTask(final RetryPartitionTask partitionTask) { RetryTimerContext retryTimerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(partitionTask); retryTimerContext.setScene(taskActuatorScene()); return new RetryTimerTask(retryTimerContext); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java index 5da64085..d9f55ace 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java @@ -3,33 +3,30 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.task; import akka.actor.ActorRef; import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.IdempotentStrategy; import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.support.RetryContext; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder; +import com.aizuda.snailjob.server.retry.task.support.idempotent.RetryIdempotentStrategyHandler; import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; /** * @author opensnail * @date 2023-09-23 08:02:17 * @since 2.4.0 */ -@Slf4j public abstract class AbstractTaskExecutor implements TaskExecutor, InitializingBean { + protected final RetryIdempotentStrategyHandler idempotentStrategy = IdempotentHolder.getRetryIdempotent(); - @Autowired - @Qualifier("retryIdempotentStrategyHandler") - protected IdempotentStrategy, Long> idempotentStrategy; @Autowired protected SystemProperties systemProperties; @Autowired @@ -59,7 +56,7 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing Pair pair = executor.filter(); if (!pair.getKey()) { RetryTask retryTask = retryContext.getRetryTask(); - log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]", + SnailJobLog.LOCAL.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]", retryTask.getGroupName(), retryTask.getUniqueId(), pair.getValue().toString()); @@ -83,7 +80,7 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing String groupName = retryTask.getGroupName(); String namespaceId = retryTask.getNamespaceId(); Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); - idempotentStrategy.set(Pair.of(groupName, namespaceId), retryId); + idempotentStrategy.set(ImmutableTriple.of(groupName, namespaceId, retryId).toString()); ActorRef actorRef = getActorRef(); actorRef.tell(retryExecutor, actorRef); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/IdempotentHolder.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/IdempotentHolder.java new file mode 100644 index 00000000..a5c3e92f --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/IdempotentHolder.java @@ -0,0 +1,25 @@ +package com.aizuda.snailjob.server.retry.task.support.idempotent; + +/** + * @author: opensnail + * @date : 2024-05-23 + * @since : sj_1.0.0 + */ +public class IdempotentHolder { + + private IdempotentHolder() { + } + + public static RetryIdempotentStrategyHandler getRetryIdempotent() { + return SingletonHolder.RETRY_IDEMPOTENT_INSTANCE; + } + + public static TimerIdempotent getTimerIdempotent() { + return SingletonHolder.TIMER_IDEMPOTENT; + } + + private static class SingletonHolder { + private static final RetryIdempotentStrategyHandler RETRY_IDEMPOTENT_INSTANCE = new RetryIdempotentStrategyHandler(); + private static final TimerIdempotent TIMER_IDEMPOTENT = new TimerIdempotent(); + } +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java index 0ec949e6..93f49c5b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java @@ -1,13 +1,10 @@ package com.aizuda.snailjob.server.retry.task.support.idempotent; -import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.server.common.IdempotentStrategy; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.springframework.stereotype.Component; -import java.text.MessageFormat; import java.util.concurrent.TimeUnit; /** @@ -17,10 +14,9 @@ import java.util.concurrent.TimeUnit; * @date : 2021-11-23 09:26 */ @Component -public class RetryIdempotentStrategyHandler implements IdempotentStrategy, Long> { - private static final String KEY_FORMAT = "{0}_{1}_{2}"; +public class RetryIdempotentStrategyHandler implements IdempotentStrategy { - private static final Cache cache; + private static final Cache cache; static { cache = CacheBuilder.newBuilder() @@ -30,28 +26,21 @@ public class RetryIdempotentStrategyHandler implements IdempotentStrategy pair, Long value) { - cache.put(getKey(pair, value), value); + public boolean set(String key) { + cache.put(key, key); return Boolean.TRUE; } + @Override - public Long get(Pair pair) { - throw new SnailJobServerException("不支持的操作"); + public boolean isExist(String key) { + return cache.asMap().containsKey(key); } @Override - public boolean isExist(Pair pair, Long value) { - return cache.asMap().containsKey(getKey(pair, value)); - } - - @Override - public boolean clear(Pair pair, Long value) { - cache.invalidate(getKey(pair, value)); + public boolean clear(String key) { + cache.invalidate(key); return Boolean.TRUE; } - private static String getKey(Pair pair, final Long value) { - return MessageFormat.format(KEY_FORMAT, pair.getKey(), pair.getValue(), value); - } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/TimerIdempotent.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/TimerIdempotent.java index 0fbf0eeb..15526cf2 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/TimerIdempotent.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/TimerIdempotent.java @@ -1,12 +1,10 @@ package com.aizuda.snailjob.server.retry.task.support.idempotent; -import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.server.common.IdempotentStrategy; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; -import java.text.MessageFormat; import java.util.concurrent.TimeUnit; /** @@ -14,10 +12,8 @@ import java.util.concurrent.TimeUnit; * @date 2023-10-19 21:54:57 * @since 2.4.0 */ -@Slf4j -public class TimerIdempotent implements IdempotentStrategy, String> { +public class TimerIdempotent implements IdempotentStrategy { - private static final String KEY_FORMAT = "{0}_{1}_{2}"; private static final Cache cache; static { @@ -28,28 +24,19 @@ public class TimerIdempotent implements IdempotentStrategy pair, String value) { - cache.put(getKey(pair, value), value); + public boolean set(String key) { + cache.put(key, key); return Boolean.TRUE; } - private static String getKey(Pair pair, final String value) { - return MessageFormat.format(KEY_FORMAT, pair.getKey(), pair.getValue(), value); + @Override + public boolean isExist(String key) { + return cache.asMap().containsKey(key); } @Override - public String get(Pair pair) { - throw new UnsupportedOperationException("不支持此操作"); - } - - @Override - public boolean isExist(Pair pair, String value) { - return cache.asMap().containsKey(getKey(pair, value)); - } - - @Override - public boolean clear(Pair pair, String value) { - cache.invalidate(getKey(pair, value)); + public boolean clear(String key) { + cache.invalidate(key); return Boolean.TRUE; } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java index e77d41aa..b2e977dc 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java @@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.common.IdempotentStrategy; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.DistributeInstance; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.retry.task.support.FilterStrategy; import com.aizuda.snailjob.server.retry.task.support.RetryContext; import com.aizuda.snailjob.server.retry.task.support.cache.CacheGroupRateLimiter; @@ -49,7 +50,7 @@ public class FilterStrategies { * * @return {@link BitSetIdempotentFilterStrategies} BitSet幂等的过滤策略 */ - public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy, Long> idempotentStrategy) { + public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy idempotentStrategy) { return new BitSetIdempotentFilterStrategies(idempotentStrategy); } @@ -123,17 +124,16 @@ public class FilterStrategies { */ private static final class BitSetIdempotentFilterStrategies implements FilterStrategy { - private IdempotentStrategy, Long> idempotentStrategy; + private final IdempotentStrategy idempotentStrategy; - public BitSetIdempotentFilterStrategies(IdempotentStrategy, Long> idempotentStrategy) { + public BitSetIdempotentFilterStrategies(IdempotentStrategy idempotentStrategy) { this.idempotentStrategy = idempotentStrategy; } @Override public Pair filter(RetryContext retryContext) { RetryTask retryTask = retryContext.getRetryTask(); - - boolean result = !idempotentStrategy.isExist(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId()); + boolean result = !idempotentStrategy.isExist(ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString()); StringBuilder description = new StringBuilder(); if (!result) { description.append(MessageFormat.format("存在执行中的任务.uniqueId:[{0}]", retryTask.getUniqueId())); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java index 1e30078a..a7bd67ef 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java @@ -1,8 +1,8 @@ package com.aizuda.snailjob.server.retry.task.support.timer; import cn.hutool.core.lang.Pair; +import com.aizuda.snailjob.server.common.TimerTask; import io.netty.util.Timeout; -import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; @@ -13,7 +13,7 @@ import java.time.LocalDateTime; * @since 2.4.0 */ @Slf4j -public abstract class AbstractTimerTask implements TimerTask { +public abstract class AbstractTimerTask implements TimerTask { protected String groupName; protected String uniqueId; @@ -30,7 +30,7 @@ public abstract class AbstractTimerTask implements TimerTask { log.error("重试任务执行失败 groupName:[{}] uniqueId:[{}] namespaceId:[{}]", groupName, uniqueId, namespaceId, e); } finally { // 先清除时间轮的缓存 - RetryTimerWheel.clearCache(Pair.of(groupName, namespaceId), uniqueId); + RetryTimerWheel.clearCache(idempotentKey()); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/CallbackTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/CallbackTimerTask.java index 0f6ba1da..bd0bf8aa 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/CallbackTimerTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/CallbackTimerTask.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.timer; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskActuatorFactory; import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutor; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; @@ -9,8 +10,8 @@ import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.util.Timeout; -import lombok.extern.slf4j.Slf4j; +import java.text.MessageFormat; import java.time.LocalDateTime; import java.util.Objects; @@ -18,10 +19,10 @@ import java.util.Objects; * @author: opensnail * @date : 2023-09-22 17:09 */ -@Slf4j public class CallbackTimerTask extends AbstractTimerTask { + public static final String IDEMPOTENT_KEY_PREFIX = "callback_{0}_{1}_{2}"; - private RetryTimerContext context; + private final RetryTimerContext context; public CallbackTimerTask(RetryTimerContext context) { this.context = context; @@ -32,7 +33,7 @@ public class CallbackTimerTask extends AbstractTimerTask { @Override protected void doRun(final Timeout timeout) { - log.debug("回调任务执行 {}", LocalDateTime.now()); + SnailJobLog.LOCAL.debug("回调任务执行 {}", LocalDateTime.now()); AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class); TaskAccess retryTaskAccess = accessTemplate.getRetryTaskAccess(); RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), context.getNamespaceId(), @@ -48,4 +49,8 @@ public class CallbackTimerTask extends AbstractTimerTask { taskExecutor.actuator(retryTask); } + @Override + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, context.getGroupName(), context.getNamespaceId(), context.getUniqueId()); + } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java index 3a2031c6..8304e687 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java @@ -9,20 +9,18 @@ import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.util.Timeout; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; +import java.text.MessageFormat; import java.util.Objects; /** * @author: opensnail * @date : 2023-09-22 17:09 */ -@Data -@Slf4j public class RetryTimerTask extends AbstractTimerTask { + public static final String IDEMPOTENT_KEY_PREFIX = "retry_{0}_{1}_{2}"; - private RetryTimerContext context; + private final RetryTimerContext context; public RetryTimerTask(RetryTimerContext context) { this.context = context; @@ -47,4 +45,9 @@ public class RetryTimerTask extends AbstractTimerTask { TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene()); taskExecutor.actuator(retryTask); } + + @Override + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, context.getGroupName(), context.getNamespaceId(), context.getUniqueId()); + } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerWheel.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerWheel.java index f50e6060..8309dd97 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerWheel.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerWheel.java @@ -1,15 +1,15 @@ package com.aizuda.snailjob.server.retry.task.support.timer; -import cn.hutool.core.lang.Pair; import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder; import com.aizuda.snailjob.server.retry.task.support.idempotent.TimerIdempotent; import io.netty.util.HashedWheelTimer; import io.netty.util.TimerTask; -import lombok.extern.slf4j.Slf4j; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; -import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -18,9 +18,8 @@ import java.util.concurrent.TimeUnit; * @author: opensnail * @date : 2023-09-22 17:03 */ -@Component -@Slf4j -public class RetryTimerWheel implements Lifecycle { +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class RetryTimerWheel { private static final int TICK_DURATION = 500; private static final String THREAD_NAME_PREFIX = "retry-task-timer-wheel-"; @@ -29,39 +28,33 @@ public class RetryTimerWheel implements Lifecycle { new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX)); - private static final TimerIdempotent idempotent = new TimerIdempotent(); + private static final TimerIdempotent idempotent = IdempotentHolder.getTimerIdempotent(); - @Override - public void start() { + static { timer = new HashedWheelTimer( - new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS, 512, - true, -1, executor); + new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS, 512, + true, -1, executor); timer.start(); } - public static void register(Pair pair, String uniqueId, TimerTask task, long delay, TimeUnit unit) { + public static void register(String idempotentKey, TimerTask task, Duration delay) { - if (!isExisted(pair, uniqueId)) { - delay = delay < 0 ? 0 : delay; + if (!isExisted(idempotentKey)) { try { - timer.newTimeout(task, delay, unit); - idempotent.set(pair, uniqueId); + timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS); + idempotent.set(idempotentKey); } catch (Exception e) { - SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e); + SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", idempotentKey, e); } } } - public static boolean isExisted(Pair pair, String uniqueId) { - return idempotent.isExist(pair, uniqueId); + public static boolean isExisted(String idempotentKey) { + return idempotent.isExist(idempotentKey); } - public static void clearCache(Pair pair, String uniqueId) { - idempotent.clear(pair, uniqueId); + public static void clearCache(String idempotentKey) { + idempotent.clear(idempotentKey); } - @Override - public void close() { - timer.stop(); - } }