From c20ccd618c7fdc4ab504e697a90dafb2706bc495 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sun, 29 Oct 2023 00:00:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E4=BF=AE=E5=A4=8D=E5=B9=B6?= =?UTF-8?q?=E8=A1=8C=E9=87=8D=E5=A4=8D=E4=BA=A7=E7=94=9F=E6=96=B0=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E9=97=AE=E9=A2=98=202.=20=E4=BF=AE=E5=A4=8D=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E7=BB=A7=E6=89=BF=E7=9A=84=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E6=97=B6=E6=8A=A5=E9=94=99=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../retry/client/job/core/client/JobEndPoint.java | 2 +- .../job/core/register/scan/JobExecutorScanner.java | 2 +- .../job/task/support/dispatch/JobExecutorActor.java | 12 +++++++++++- .../generator/batch/JobTaskBatchGenerator.java | 3 ++- .../job/task/support/idempotent/TimerIdempotent.java | 3 --- .../server/job/task/support/timer/JobTimerTask.java | 3 --- .../task/support/idempotent/TimerIdempotent.java | 5 +++++ .../easy-retry-server-starter/Dockerfile | 2 +- .../server/web/service/impl/JobServiceImpl.java | 8 ++++++-- 9 files changed, 27 insertions(+), 13 deletions(-) diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java index b8bfd5be3..640cf1bf8 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java @@ -45,7 +45,7 @@ public class JobEndPoint { // 选择执行器 Object executor = jobExecutorInfo.getExecutor(); IJobExecutor jobExecutor; - if (executor.getClass().isAssignableFrom(IJobExecutor.class)) { + if (IJobExecutor.class.isAssignableFrom(executor.getClass())) { jobExecutor = (AbstractJobExecutor) executor; } else { jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java index a08648a1f..1313afff2 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java @@ -58,7 +58,7 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { String executorClassName = bean.getClass().getName(); // 通过实现接口进行注册 - if (bean.getClass().isAssignableFrom(IJobExecutor.class)) { + if (IJobExecutor.class.isAssignableFrom(bean.getClass())) { if (!JobExecutorInfoCache.isExisted(executorClassName)) { retryerInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean)); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index f846e6237..5af6cf441 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -33,6 +33,8 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; @@ -97,7 +99,15 @@ public class JobExecutorActor extends AbstractActor { context.setJobId(job.getId()); jobExecutor.execute(context); } finally { - doHandlerResidentTask(job, taskExecute); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + // 清除时间轮的缓存 + JobTimerWheel.clearCache(taskExecute.getTaskBatchId()); + //方法内容 + doHandlerResidentTask(job, taskExecute); + } + }); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 5b7e47eed..fa33bfbd8 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -16,6 +16,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -39,7 +40,7 @@ public class JobTaskBatchGenerator { JobTaskBatch jobTaskBatch = new JobTaskBatch(); jobTaskBatch.setJobId(context.getJobId()); jobTaskBatch.setGroupName(context.getGroupName()); - + jobTaskBatch.setCreateDt(LocalDateTime.now()); // 无执行的节点 if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java index 0aa84d8a6..e4a990be3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java @@ -1,10 +1,7 @@ package com.aizuda.easy.retry.server.job.task.support.idempotent; import com.aizuda.easy.retry.server.common.IdempotentStrategy; -import scala.collection.immutable.Stream; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; /** diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java index b7df725c7..58c33f3c3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java @@ -28,9 +28,6 @@ public class JobTimerTask implements TimerTask { log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); try { - // 清除时间轮的缓存 - JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); - TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId()); taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName()); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java index 381f68cee..431e53ad5 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.idempotent; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.server.common.IdempotentStrategy; +import lombok.extern.slf4j.Slf4j; import java.util.HashSet; import java.util.Set; @@ -12,6 +13,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * @date 2023-10-19 21:54:57 * @since 2.4.0 */ +@Slf4j public class TimerIdempotent implements IdempotentStrategy { private static final CopyOnWriteArraySet cache = new CopyOnWriteArraySet<>(); @@ -28,6 +30,9 @@ public class TimerIdempotent implements IdempotentStrategy { @Override public boolean isExist(String key, String value) { + if (key == null || value == null) { + log.error("异常监控. key:[{}] value:[{}]", key, value); + } return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))); } diff --git a/easy-retry-server/easy-retry-server-starter/Dockerfile b/easy-retry-server/easy-retry-server-starter/Dockerfile index 5f89619a2..eee3addb5 100644 --- a/easy-retry-server/easy-retry-server-starter/Dockerfile +++ b/easy-retry-server/easy-retry-server-starter/Dockerfile @@ -2,7 +2,7 @@ FROM openjdk:8-jdk-alpine MAINTAINER www.byteblogs.com LABEL server-name=easy-retry-server -ADD ../target/easy-retry-server.jar easy-retry-server.jar +ADD ./target/easy-retry-server.jar easy-retry-server.jar #对外暴漏的端口号 EXPOSE 8080 diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 777c3ec80..55564a6f0 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -1,9 +1,11 @@ package com.aizuda.easy.retry.server.web.service.impl; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.HashUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.util.CronExpression; +import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext; @@ -43,7 +45,8 @@ import java.util.Objects; public class JobServiceImpl implements JobService { private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - + @Autowired + private SystemProperties systemProperties; @Autowired private JobMapper jobMapper; @@ -107,7 +110,7 @@ public class JobServiceImpl implements JobService { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() .select(Job::getId, Job::getJobName); if (StrUtil.isNotBlank(keywords)) { - queryWrapper.like(Job::getJobName, keywords.trim() + "%"); + queryWrapper.like(Job::getJobName, keywords.trim() + "%"); } if (Objects.nonNull(jobId)) { @@ -130,6 +133,7 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); + job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal()); job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); return 1 == jobMapper.insert(job); }