From a5ddbab3600b9dcb6977f7e713f47bb9febd48d1 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 19 Oct 2023 23:54:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E6=96=B0=E5=A2=9E=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/common}/IdempotentStrategy.java | 2 +- .../common/config/SystemProperties.java | 9 ++ .../server/common/enums/SystemModeEnum.java | 15 +++ .../batch/JobTaskBatchGenerator.java | 4 +- .../support/idempotent/TimerIdempotent.java | 37 +++++++ .../prepare/WaitJobPrepareHandler.java | 6 +- .../job/task/support/timer/JobTimerTask.java | 2 +- .../job/task/support/timer/JobTimerWheel.java | 65 +++++++++++ .../support/timer/JobTimerWheelHandler.java | 101 ------------------ .../actor/exec/ExecCallbackUnitActor.java | 2 +- .../dispatch/actor/exec/ExecUnitActor.java | 4 +- .../actor/scan/AbstractScanGroup.java | 2 +- .../dispatch/task/AbstractTaskExecutor.java | 2 +- .../BitSetIdempotentStrategyHandler.java | 2 +- .../support/strategy/FilterStrategies.java | 2 +- .../server/dispatch/ConsumerBucketActor.java | 49 +++++---- .../src/main/resources/application.yml | 1 + .../server/ConfigVersionSyncHandlerTest.java | 19 ++-- 18 files changed, 176 insertions(+), 148 deletions(-) rename easy-retry-server/{easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support => easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common}/IdempotentStrategy.java (81%) create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/SystemModeEnum.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java delete mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/IdempotentStrategy.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/IdempotentStrategy.java similarity index 81% rename from easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/IdempotentStrategy.java rename to easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/IdempotentStrategy.java index 69766f1f..8b1a5392 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/IdempotentStrategy.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/IdempotentStrategy.java @@ -1,4 +1,4 @@ -package com.aizuda.easy.retry.server.retry.task.support; +package com.aizuda.easy.retry.server.common; /** * 幂等策略 diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java index 86c551ef..0928a9f5 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.server.common.config; +import com.aizuda.easy.retry.server.common.enums.SystemModeEnum; import com.aizuda.easy.retry.template.datasource.enums.DbTypeEnum; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -71,6 +72,14 @@ public class SystemProperties { */ private Callback callback = new Callback(); + /** + * 系统模式: + * RETRY: 分布式重试重 + * JOB: 分布式定时任务 + * ALL: 分布式重试重 && 分布式定时任务 + */ + private SystemModeEnum mode = SystemModeEnum.ALL; + /** * 回调配置 */ diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/SystemModeEnum.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/SystemModeEnum.java new file mode 100644 index 00000000..86915216 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/SystemModeEnum.java @@ -0,0 +1,15 @@ +package com.aizuda.easy.retry.server.common.enums; + +/** + * 系统模式: 分布式重试重试、分布式定时任务 + * + * @author www.byteblogs.com + * @date 2023-10-19 22:04:38 + * @since 2.4.0 + */ +public enum SystemModeEnum { + + RETRY, + JOB, + ALL +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index a12f2c5e..391ead6c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -7,7 +7,7 @@ import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask; -import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheelHandler; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import lombok.extern.slf4j.Slf4j; @@ -58,7 +58,7 @@ public class JobTaskBatchGenerator { jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId()); jobTimerTaskDTO.setGroupName(context.getGroupName()); jobTimerTaskDTO.setJobId(context.getJobId()); - JobTimerWheelHandler.register(context.getGroupName(), jobTaskBatch.getId(), + JobTimerWheel.register(jobTaskBatch.getId(), new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java new file mode 100644 index 00000000..afe0a146 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java @@ -0,0 +1,37 @@ +package com.aizuda.easy.retry.server.job.task.support.idempotent; + +import com.aizuda.easy.retry.server.common.IdempotentStrategy; + +import java.util.HashSet; +import java.util.Set; + +/** + * @author www.byteblogs.com + * @date 2023-10-19 21:54:57 + * @since 2.4.0 + */ +public class TimerIdempotent implements IdempotentStrategy { + + private static final Set cache = new HashSet<>(); + + @Override + public boolean set(Long key, Long value) { + return cache.add(key); + } + + @Override + public Long get(Long s) { + throw new UnsupportedOperationException("不支持此操作"); + } + + @Override + public boolean isExist(Long key, Long value) { + return cache.contains(key); + } + + @Override + public boolean clear(Long key, Long value) { + cache.clear(); + return Boolean.TRUE; + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/WaitJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/WaitJobPrepareHandler.java index 630492f0..412d9729 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/WaitJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/WaitJobPrepareHandler.java @@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.prepare; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; -import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheelHandler; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -32,7 +32,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); // 若时间轮中数据不存在则重新加入 - if (!JobTimerWheelHandler.isExisted(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId())) { + if (!JobTimerWheel.isExisted(jobPrepareDTO.getTaskBatchId())) { // 进入时间轮 long delay = jobPrepareDTO.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() @@ -42,7 +42,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId()); jobTimerTaskDTO.setGroupName(jobPrepareDTO.getGroupName()); - JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId(), + JobTimerWheel.register(jobPrepareDTO.getTaskBatchId(), new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java index f01a7362..5b8e80b5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java @@ -49,7 +49,7 @@ public class JobTimerTask implements TimerTask { try { // 清除时间轮的缓存 - JobTimerWheelHandler.clearCache(jobTimerTaskDTO.getGroupName(), jobTimerTaskDTO.getTaskBatchId()); + JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class); Job job = jobMapper.selectOne(new LambdaQueryWrapper() diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java new file mode 100644 index 00000000..8039df28 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java @@ -0,0 +1,65 @@ +package com.aizuda.easy.retry.server.job.task.support.timer; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.job.task.support.idempotent.TimerIdempotent; +import io.netty.util.HashedWheelTimer; +import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-22 17:03 + * @since : 2.4.0 + */ +@Component +@Slf4j +public class JobTimerWheel implements Lifecycle { + + private static final int TICK_DURATION = 100; + private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-"; + private static HashedWheelTimer timer = null; + + private static final TimerIdempotent idempotent = new TimerIdempotent(); + + @Override + public void start() { + timer = new HashedWheelTimer( + new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, + TimeUnit.MILLISECONDS); + timer.start(); + } + + public static void register(Long uniqueId, TimerTask task, long delay, TimeUnit unit) { + + if (!isExisted(uniqueId)) { + delay = delay < 0 ? 0 : delay; + log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId); + try { + timer.newTimeout(task, delay, unit); + idempotent.set(uniqueId, uniqueId); + } catch (Exception e) { + LogUtils.error(log, "加入时间轮失败. uniqueId:[{}]", uniqueId, e); + } + } + } + + public static boolean isExisted(Long uniqueId) { + return idempotent.isExist(uniqueId, uniqueId); + } + + public static void clearCache(Long uniqueId) { + idempotent.clear(uniqueId, uniqueId); + } + + @Override + public void close() { + timer.stop(); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java deleted file mode 100644 index b6da4dc6..00000000 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.aizuda.easy.retry.server.job.task.support.timer; - -import cn.hutool.core.util.StrUtil; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.common.Lifecycle; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.hash.BloomFilter; -import com.google.common.hash.Funnels; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; -import org.springframework.stereotype.Component; - -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -/** - * @author: www.byteblogs.com - * @date : 2023-09-22 17:03 - */ -@Component -@Slf4j -public class JobTimerWheelHandler implements Lifecycle { - - private static final int TICK_DURATION = 100; - private static final String THREAD_NAME_REFIX = "job-task-timer-wheel-"; - - private static HashedWheelTimer timer = null; - - private static Cache cache; - - @Override - public void start() { - timer = new HashedWheelTimer( - new CustomizableThreadFactory(THREAD_NAME_REFIX), TICK_DURATION, - TimeUnit.MILLISECONDS); - - timer.start(); - - cache = CacheBuilder.newBuilder() - // 设置并发级别为cpu核心数 - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .build(); - } - - public static void register(String groupName, Long taskBatchId, TimerTask task, long delay, TimeUnit unit) { - - if (delay < 0) { - delay = 0; - } - - Timeout timeout = getTimeout(groupName, taskBatchId); - if (Objects.isNull(timeout)) { - try { - log.info("加入时间轮. delay:[{}ms] taskId:[{}]", delay, taskBatchId); - timeout = timer.newTimeout(task, delay, unit); - cache.put(getKey(groupName, taskBatchId), timeout); - } catch (Exception e) { - LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]", - groupName, taskBatchId, e); - } - } - - } - - private static String getKey(String groupName, Long uniqueId) { - return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId.toString()); - } - - public static Timeout getTimeout(String groupName, Long uniqueId) { - return cache.getIfPresent(getKey(groupName, uniqueId)); - } - - public static boolean isExisted(String groupName, Long uniqueId) { - return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId))); - } - - public static boolean cancel(String groupName, Long uniqueId) { - String key = getKey(groupName, uniqueId); - Timeout timeout = cache.getIfPresent(key); - if (Objects.isNull(timeout)) { - return false; - } - - cache.invalidate(key); - return timeout.cancel(); - } - - public static void clearCache(String groupName, Long uniqueId) { - cache.invalidate(getKey(groupName, uniqueId)); - } - - @Override - public void close() { - timer.stop(); - cache.invalidateAll(); - } -} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java index ceefcdeb..b8f55e0d 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -14,7 +14,7 @@ import com.aizuda.easy.retry.server.common.client.RequestBuilder; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient; -import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java index 12ad3ddf..2cc915db 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java @@ -5,7 +5,6 @@ import akka.actor.ActorRef; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.client.model.DispatchRetryDTO; import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; -import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.RetryResultStatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; @@ -16,7 +15,7 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.client.RequestBuilder; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient; -import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; @@ -27,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; -import org.springframework.http.HttpHeaders; import org.springframework.stereotype.Component; import java.util.Objects; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index 242e7b3f..53a6623c 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -4,7 +4,7 @@ import akka.actor.AbstractActor; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.config.SystemProperties; -import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java index 935a20df..b3d711aa 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java @@ -4,7 +4,7 @@ import akka.actor.ActorRef; import cn.hutool.core.lang.Pair; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; -import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/BitSetIdempotentStrategyHandler.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/BitSetIdempotentStrategyHandler.java index f2b5b052..819cc0aa 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/BitSetIdempotentStrategyHandler.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/BitSetIdempotentStrategyHandler.java @@ -1,7 +1,7 @@ package com.aizuda.easy.retry.server.retry.task.support.idempotent; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.IdempotentStrategy; import org.springframework.stereotype.Component; import java.util.BitSet; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java index 840064a1..9e3d9ea3 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java @@ -6,7 +6,7 @@ import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.DistributeInstance; import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy; -import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter; import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper; diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java index de80d7fd..71245745 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java @@ -8,13 +8,13 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor; import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.enums.SystemModeEnum; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; -import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.cache.Cache; @@ -70,31 +70,36 @@ public class ConsumerBucketActor extends AbstractActor { return; } - // 查询桶对应组信息 - List groupConfigs = accessTemplate.getGroupConfigAccess().list( - new LambdaQueryWrapper() - .select(GroupConfig::getGroupName) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) - .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()) - ); + if (systemProperties.getMode() == SystemModeEnum.ALL || systemProperties.getMode() == SystemModeEnum.RETRY) { + // 查询桶对应组信息 + List groupConfigs = accessTemplate.getGroupConfigAccess().list( + new LambdaQueryWrapper() + .select(GroupConfig::getGroupName) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()) + ); - if (!CollectionUtils.isEmpty(groupConfigs)) { - for (final GroupConfig groupConfig : groupConfigs) { - CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName()); - ScanTask scanTask = new ScanTask(); - scanTask.setGroupName(groupConfig.getGroupName()); - scanTask.setBuckets(consumerBucket.getBuckets()); - produceScanActorTask(scanTask); + if (!CollectionUtils.isEmpty(groupConfigs)) { + for (final GroupConfig groupConfig : groupConfigs) { + CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName()); + ScanTask scanTask = new ScanTask(); + scanTask.setGroupName(groupConfig.getGroupName()); + scanTask.setBuckets(consumerBucket.getBuckets()); + produceScanActorTask(scanTask); + } } } - // 扫描回调数据 - ScanTask scanTask = new ScanTask(); - scanTask.setBuckets(consumerBucket.getBuckets()); - scanTask.setSize(1000); - scanTask.setStartId(0); - ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); - scanJobActorRef.tell(scanTask, scanJobActorRef); + if (systemProperties.getMode() == SystemModeEnum.ALL || systemProperties.getMode() == SystemModeEnum.JOB) { + // 扫描回调数据 + ScanTask scanTask = new ScanTask(); + scanTask.setBuckets(consumerBucket.getBuckets()); + scanTask.setSize(1000); + scanTask.setStartId(0); + ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); + scanJobActorRef.tell(scanTask, scanJobActorRef); + } + } /** diff --git a/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml b/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml index c081f2fa..05937a1b 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml +++ b/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml @@ -48,6 +48,7 @@ easy-retry: max-count: 288 #回调最大执行次数 trigger-interval: 900 #间隔时间 db-type: mysql #当前使用的数据库 + mode: retry diff --git a/easy-retry-server/easy-retry-server-starter/src/test/java/com/aizuda/easy/retry/server/ConfigVersionSyncHandlerTest.java b/easy-retry-server/easy-retry-server-starter/src/test/java/com/aizuda/easy/retry/server/ConfigVersionSyncHandlerTest.java index a03b7ceb..700cd2a5 100644 --- a/easy-retry-server/easy-retry-server-starter/src/test/java/com/aizuda/easy/retry/server/ConfigVersionSyncHandlerTest.java +++ b/easy-retry-server/easy-retry-server-starter/src/test/java/com/aizuda/easy/retry/server/ConfigVersionSyncHandlerTest.java @@ -1,6 +1,5 @@ package com.aizuda.easy.retry.server; -import com.aizuda.easy.retry.server.retry.task.support.handler.ConfigVersionSyncHandler; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -13,14 +12,14 @@ import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class ConfigVersionSyncHandlerTest { - @Autowired - private ConfigVersionSyncHandler configVersionSyncHandler; - - @SneakyThrows - @Test - public void syncVersion() { - configVersionSyncHandler.addSyncTask( "example_group", 0); - - } +// @Autowired +// private ConfigVersionSyncHandler configVersionSyncHandler; +// +// @SneakyThrows +// @Test +// public void syncVersion() { +// configVersionSyncHandler.addSyncTask( "example_group", 0); +// +// } }