diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java index b8bfd5be..640cf1bf 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java @@ -45,7 +45,7 @@ public class JobEndPoint { // 选择执行器 Object executor = jobExecutorInfo.getExecutor(); IJobExecutor jobExecutor; - if (executor.getClass().isAssignableFrom(IJobExecutor.class)) { + if (IJobExecutor.class.isAssignableFrom(executor.getClass())) { jobExecutor = (AbstractJobExecutor) executor; } else { jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java index a08648a1..1313afff 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java @@ -58,7 +58,7 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { String executorClassName = bean.getClass().getName(); // 通过实现接口进行注册 - if (bean.getClass().isAssignableFrom(IJobExecutor.class)) { + if (IJobExecutor.class.isAssignableFrom(bean.getClass())) { if (!JobExecutorInfoCache.isExisted(executorClassName)) { retryerInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean)); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index f846e623..5af6cf44 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -33,6 +33,8 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; @@ -97,7 +99,15 @@ public class JobExecutorActor extends AbstractActor { context.setJobId(job.getId()); jobExecutor.execute(context); } finally { - doHandlerResidentTask(job, taskExecute); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + // 清除时间轮的缓存 + JobTimerWheel.clearCache(taskExecute.getTaskBatchId()); + //方法内容 + doHandlerResidentTask(job, taskExecute); + } + }); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 5b7e47ee..fa33bfbd 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -16,6 +16,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -39,7 +40,7 @@ public class JobTaskBatchGenerator { JobTaskBatch jobTaskBatch = new JobTaskBatch(); jobTaskBatch.setJobId(context.getJobId()); jobTaskBatch.setGroupName(context.getGroupName()); - + jobTaskBatch.setCreateDt(LocalDateTime.now()); // 无执行的节点 if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java index 0aa84d8a..e4a990be 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java @@ -1,10 +1,7 @@ package com.aizuda.easy.retry.server.job.task.support.idempotent; import com.aizuda.easy.retry.server.common.IdempotentStrategy; -import scala.collection.immutable.Stream; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; /** diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java index b7df725c..58c33f3c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java @@ -28,9 +28,6 @@ public class JobTimerTask implements TimerTask { log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); try { - // 清除时间轮的缓存 - JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); - TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId()); taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName()); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java index 381f68ce..431e53ad 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.idempotent; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.server.common.IdempotentStrategy; +import lombok.extern.slf4j.Slf4j; import java.util.HashSet; import java.util.Set; @@ -12,6 +13,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * @date 2023-10-19 21:54:57 * @since 2.4.0 */ +@Slf4j public class TimerIdempotent implements IdempotentStrategy { private static final CopyOnWriteArraySet cache = new CopyOnWriteArraySet<>(); @@ -28,6 +30,9 @@ public class TimerIdempotent implements IdempotentStrategy { @Override public boolean isExist(String key, String value) { + if (key == null || value == null) { + log.error("异常监控. key:[{}] value:[{}]", key, value); + } return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))); } diff --git a/easy-retry-server/easy-retry-server-starter/Dockerfile b/easy-retry-server/easy-retry-server-starter/Dockerfile index 5f89619a..eee3addb 100644 --- a/easy-retry-server/easy-retry-server-starter/Dockerfile +++ b/easy-retry-server/easy-retry-server-starter/Dockerfile @@ -2,7 +2,7 @@ FROM openjdk:8-jdk-alpine MAINTAINER www.byteblogs.com LABEL server-name=easy-retry-server -ADD ../target/easy-retry-server.jar easy-retry-server.jar +ADD ./target/easy-retry-server.jar easy-retry-server.jar #对外暴漏的端口号 EXPOSE 8080 diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 777c3ec8..55564a6f 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -1,9 +1,11 @@ package com.aizuda.easy.retry.server.web.service.impl; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.HashUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.util.CronExpression; +import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext; @@ -43,7 +45,8 @@ import java.util.Objects; public class JobServiceImpl implements JobService { private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - + @Autowired + private SystemProperties systemProperties; @Autowired private JobMapper jobMapper; @@ -107,7 +110,7 @@ public class JobServiceImpl implements JobService { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() .select(Job::getId, Job::getJobName); if (StrUtil.isNotBlank(keywords)) { - queryWrapper.like(Job::getJobName, keywords.trim() + "%"); + queryWrapper.like(Job::getJobName, keywords.trim() + "%"); } if (Objects.nonNull(jobId)) { @@ -130,6 +133,7 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); + job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal()); job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); return 1 == jobMapper.insert(job); }