From 2cd71f6e163ec0375bed36ebd7057f21cbfd7289 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 28 Jul 2023 17:39:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.1.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=BB=91=E5=8A=A8=E7=AA=97=E5=8F=A3=202.=20=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E6=8F=90=E4=BE=9B=E5=8F=82=E6=95=B0=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=203.=20=E4=B8=8A=E6=8A=A5=E6=95=B0=E6=8D=AE=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=AB=AF=E6=94=AF=E6=8C=81=E5=A4=B1=E8=B4=A5=E9=87=8D?= =?UTF-8?q?=E8=AF=95=204.=20=E9=87=8D=E8=AF=95=E4=BB=BB=E5=8A=A1=E5=92=8C?= =?UTF-8?q?=E5=9B=9E=E8=B0=83=E4=BB=BB=E5=8A=A1=E6=89=AB=E6=8F=8F=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E4=BB=A5id=E4=BD=9C=E4=B8=BA=E5=81=8F=E7=A7=BB?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E6=BB=9A=E5=8A=A8=E6=89=AB=E6=8F=8F=205.=20?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E5=9B=9E=E8=B0=83=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E6=94=AF=E6=8C=81SpringBean=E5=92=8C=E6=99=AE=E9=80=9A?= =?UTF-8?q?=E7=B1=BB=E6=A8=A1=E5=BC=8F=206.=20=E8=B4=9F=E8=BD=BD=E5=9D=87?= =?UTF-8?q?=E8=A1=A1=E5=99=A8=E6=96=B0=E5=A2=9E=E8=B4=9F=E8=BD=BD=E5=9D=87?= =?UTF-8?q?=E8=A1=A1=E5=91=A8=E6=9C=9F=E6=97=B6=E9=97=B4=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- easy-retry-client-core/pom.xml | 2 - .../client/core/client/RetryEndPoint.java | 104 +++++++++++++++--- .../core/config/EasyRetryProperties.java | 35 +++++- .../retry/client/core/report/AsyncReport.java | 32 +++--- .../client/core/window/SlidingWindow.java | 21 +++- easy-retry-server/pom.xml | 4 + .../retry/server/akka/ActorGenerator.java | 4 +- .../retry/server/akka/AkkaConfiguration.java | 2 +- .../retry/server/config/SystemProperties.java | 7 +- .../persistence/support/RetryTaskAccess.java | 2 +- .../access/retry/MybatisRetryTaskAccess.java | 24 ++-- .../processor/RetryTaskAccessProcessor.java | 4 +- .../server/server/RequestHandlerActor.java | 45 ++------ .../ReportRetryInfoHttpRequestHandler.java | 68 ++++++++++-- .../dispatch/actor/result/FailureActor.java | 6 +- .../dispatch/actor/result/FinishActor.java | 6 +- .../dispatch/actor/result/NoRetryActor.java | 6 +- .../actor/scan/AbstractScanGroup.java | 20 ++-- .../actor/scan/ScanCallbackGroupActor.java | 12 +- .../dispatch/actor/scan/ScanGroupActor.java | 13 ++- .../handler/ConfigVersionSyncHandler.java | 2 +- .../support/handler/ServerNodeBalance.java | 5 +- pom.xml | 7 +- 23 files changed, 303 insertions(+), 128 deletions(-) diff --git a/easy-retry-client-core/pom.xml b/easy-retry-client-core/pom.xml index eb9223931..6c85a4494 100644 --- a/easy-retry-client-core/pom.xml +++ b/easy-retry-client-core/pom.xml @@ -16,7 +16,6 @@ 1.8 - 2.0.0 @@ -38,7 +37,6 @@ com.github.rholder guava-retrying - ${guava-retrying.version} cn.hutool diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java index 76371930f..5742a3927 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java @@ -26,6 +26,8 @@ import com.aizuda.easy.retry.client.model.DispatchRetryDTO; import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; import com.aizuda.easy.retry.client.model.RetryCallbackDTO; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.ReflectionUtils; @@ -60,7 +62,7 @@ public class RetryEndPoint { * 服务端调度重试入口 */ @PostMapping("/dispatch/v1") - public Result dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) { + public Result dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) { RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName()); if (Objects.isNull(retryerInfo)) { @@ -71,7 +73,8 @@ public class RetryEndPoint { Object[] deSerialize = null; try { - deSerialize = (Object[]) retryArgSerializer.deSerialize(executeReqDto.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod()); + deSerialize = (Object[]) retryArgSerializer.deSerialize(executeReqDto.getArgsStr(), + retryerInfo.getExecutor().getClass(), retryerInfo.getMethod()); } catch (JsonProcessingException e) { throw new EasyRetryClientException("参数解析异常", e); } @@ -83,7 +86,8 @@ public class RetryEndPoint { HttpServletRequest request = Objects.requireNonNull(attributes).getRequest(); request.setAttribute("attemptNumber", executeReqDto.getRetryCount()); - RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(), executeReqDto.getExecutorName(), deSerialize); + RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(), + executeReqDto.getExecutorName(), deSerialize); if (RetrySiteSnapshot.isRetryForStatusCode()) { executeRespDto.setStatusCode(RetryResultStatusEnum.STOP.getStatus()); @@ -127,25 +131,87 @@ public class RetryEndPoint { RetryArgSerializer retryArgSerializer = new JacksonSerializer(); - Object[] deSerialize = null; + Object[] deSerialize; try { - deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod()); + deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(), + retryerInfo.getExecutor().getClass(), retryerInfo.getMethod()); } catch (JsonProcessingException e) { throw new EasyRetryClientException("参数解析异常", e); } + try { + // 以Spring Bean模式回调 + return doCallbackForSpringBean(callbackDTO, retryerInfo, deSerialize); + } catch (NoSuchBeanDefinitionException e) { + // 若不是SpringBean 则直接反射以普通类调用 + return doCallbackForOrdinaryClass(callbackDTO, retryerInfo, deSerialize); + } + } + + /** + * 以普通类进行回调 + * + * @param callbackDTO {@link RetryCallbackDTO} 服务端调度重试入参 + * @param retryerInfo {@link RetryerInfo} 定义重试场景的信息 + * @param deSerialize 参数信息 + * @return Result + */ + private Result doCallbackForOrdinaryClass(RetryCallbackDTO callbackDTO, RetryerInfo retryerInfo, + Object[] deSerialize) { Class retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback(); + + try { + RetryCompleteCallback retryCompleteCallback = retryCompleteCallbackClazz.newInstance(); + Method method; + switch (Objects.requireNonNull(RetryStatusEnum.getByStatus(callbackDTO.getRetryStatus()))) { + case FINISH: + method = retryCompleteCallbackClazz.getMethod("doSuccessCallback", String.class, String.class, + Object[].class); + break; + case MAX_COUNT: + method = retryCompleteCallbackClazz.getMethod("doMaxRetryCallback", String.class, String.class, + Object[].class); + break; + default: + throw new EasyRetryClientException("回调状态异常"); + } + + Assert.notNull(method, () -> new EasyRetryClientException("no such method")); + ReflectionUtils.invokeMethod(method, retryCompleteCallback, retryerInfo.getScene(), + retryerInfo.getExecutorClassName(), deSerialize); + return new Result(1, "回调成功"); + } catch (Exception ex) { + return new Result(0, ex.getMessage()); + } + + } + + /** + * 以Spring Bean模式回调 + * + * @param callbackDTO {@link RetryCallbackDTO} 服务端调度重试入参 + * @param retryerInfo {@link RetryerInfo} 定义重试场景的信息 + * @param deSerialize 参数信息 + * @return Result + */ + private Result doCallbackForSpringBean(RetryCallbackDTO callbackDTO, RetryerInfo retryerInfo, Object[] deSerialize) { + Class retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback(); + RetryCompleteCallback retryCompleteCallback = SpringContext.getBeanByType(retryCompleteCallbackClazz); - - if (RetryStatusEnum.FINISH.getStatus().equals(callbackDTO.getRetryStatus())) { - retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize); + switch (Objects.requireNonNull(RetryStatusEnum.getByStatus(callbackDTO.getRetryStatus()))) { + case FINISH: + retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), + deSerialize); + break; + case MAX_COUNT: + retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), + deSerialize); + break; + default: + throw new EasyRetryClientException("回调状态异常"); } - if (RetryStatusEnum.MAX_COUNT.getStatus().equals(callbackDTO.getRetryStatus())) { - retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize); - } - - return new Result(); + return new Result(1, "回调成功"); } /** @@ -155,14 +221,16 @@ public class RetryEndPoint { * @return idempotentId */ @PostMapping("/generate/idempotent-id/v1") - public Result idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) { + public Result idempotentIdGenerate( + @RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) { String scene = generateRetryIdempotentIdDTO.getScene(); String executorName = generateRetryIdempotentIdDTO.getExecutorName(); String argsStr = generateRetryIdempotentIdDTO.getArgsStr(); RetryerInfo retryerInfo = RetryerInfoCache.get(scene, executorName); - Assert.notNull(retryerInfo, ()-> new EasyRetryClientException("重试信息不存在 scene:[{}] executorName:[{}]", scene, executorName)); + Assert.notNull(retryerInfo, + () -> new EasyRetryClientException("重试信息不存在 scene:[{}] executorName:[{}]", scene, executorName)); Method executorMethod = retryerInfo.getMethod(); @@ -170,7 +238,8 @@ public class RetryEndPoint { Object[] deSerialize = null; try { - deSerialize = (Object[]) retryArgSerializer.deSerialize(argsStr, retryerInfo.getExecutor().getClass(), retryerInfo.getMethod()); + deSerialize = (Object[]) retryArgSerializer.deSerialize(argsStr, retryerInfo.getExecutor().getClass(), + retryerInfo.getMethod()); } catch (JsonProcessingException e) { throw new EasyRetryClientException("参数解析异常", e); } @@ -180,7 +249,8 @@ public class RetryEndPoint { Class idempotentIdGenerate = retryerInfo.getIdempotentIdGenerate(); IdempotentIdGenerate generate = idempotentIdGenerate.newInstance(); Method method = idempotentIdGenerate.getMethod("idGenerate", IdempotentIdContext.class); - IdempotentIdContext idempotentIdContext = new IdempotentIdContext(scene, executorName, deSerialize, executorMethod.getName()); + IdempotentIdContext idempotentIdContext = new IdempotentIdContext(scene, executorName, deSerialize, + executorMethod.getName()); idempotentId = (String) ReflectionUtils.invokeMethod(method, generate, idempotentIdContext); } catch (Exception exception) { LogUtils.error(log, "幂等id生成异常:{},{}", scene, argsStr, exception); diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/config/EasyRetryProperties.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/config/EasyRetryProperties.java index 69d3661d2..dda2b5abd 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/config/EasyRetryProperties.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/config/EasyRetryProperties.java @@ -1,18 +1,21 @@ package com.aizuda.easy.retry.client.core.config; import com.aizuda.easy.retry.common.core.context.SpringContext; -import com.aizuda.easy.retry.common.core.util.HostUtils; import lombok.Data; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +import java.time.temporal.ChronoUnit; import java.util.Objects; /** + * easy retry 客户端配置 + * * @author: www.byteblogs.com * @date : 2022-03-04 15:53 + * @since 1.0.0 */ @Configuration @ConfigurationProperties(prefix = "easy-retry") @@ -35,6 +38,11 @@ public class EasyRetryProperties { */ private Integer port; + /** + * 远程上报滑动窗口配置 + */ + private SlidingWindowConfig slidingWindow = new SlidingWindowConfig(); + /** * 服务端配置 */ @@ -53,6 +61,31 @@ public class EasyRetryProperties { private int port = 1788; } + @Data + public static class SlidingWindowConfig { + + /** + * 总量窗口期阈值 + */ + private int totalThreshold = 50; + + /** + * 窗口数量预警 + */ + private int windowTotalThreshold = 150; + + /** + * 窗口期时间长度 + */ + private long duration = 10; + + /** + * 窗口期单位 + */ + private ChronoUnit chronoUnit = ChronoUnit.SECONDS; + + } + public static String getGroup() { EasyRetryProperties properties = SpringContext.CONTEXT.getBean(EasyRetryProperties.class); return Objects.requireNonNull(properties).group; diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java index 44d410a54..0ea4d7de9 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java @@ -1,16 +1,15 @@ package com.aizuda.easy.retry.client.core.report; import com.aizuda.easy.retry.client.core.Lifecycle; +import com.aizuda.easy.retry.client.core.config.EasyRetryProperties; +import com.aizuda.easy.retry.client.core.config.EasyRetryProperties.SlidingWindowConfig; import com.aizuda.easy.retry.client.core.retryer.RetryerInfo; -import com.aizuda.easy.retry.client.core.window.RetryLeapArray; import com.aizuda.easy.retry.client.core.window.SlidingWindow; import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.time.temporal.ChronoUnit; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -23,8 +22,10 @@ import java.util.concurrent.TimeUnit; @Component @Slf4j public class AsyncReport extends AbstractReport implements Lifecycle { - private static SlidingWindow slidingWindow; + @Autowired + private EasyRetryProperties easyRetryProperties; + private SlidingWindow slidingWindow; @Override public boolean supports(boolean async) { return async; @@ -33,7 +34,8 @@ public class AsyncReport extends AbstractReport implements Lifecycle { @Override public boolean doReport(RetryerInfo retryerInfo, Object[] params) { - return syncReport(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), params, retryerInfo.getTimeout(), retryerInfo.getUnit()); + return syncReport(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), params, retryerInfo.getTimeout(), + retryerInfo.getUnit()); } /** @@ -46,17 +48,21 @@ public class AsyncReport extends AbstractReport implements Lifecycle { return Boolean.TRUE; } - @Override public void start() { + SlidingWindowConfig slidingWindowConfig = easyRetryProperties.getSlidingWindow(); + slidingWindow = SlidingWindow - .Builder - .newBuilder() - .withTotalThreshold(10) - .withDuration(5, ChronoUnit.SECONDS) - .withListener(new ReportListener()) - .build(); + .Builder + .newBuilder() + .withTotalThreshold(slidingWindowConfig.getTotalThreshold()) + .withWindowTotalThreshold(slidingWindowConfig.getWindowTotalThreshold()) + .withDuration(slidingWindowConfig.getDuration(), slidingWindowConfig.getChronoUnit()) + .withListener(new ReportListener()) + .build(); + + slidingWindow.start(); } @Override diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java index 4457d0587..15cdb00fa 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java @@ -61,7 +61,15 @@ public class SlidingWindow { */ private final ChronoUnit chronoUnit; - public static final ReentrantLock lock = new ReentrantLock();//创建锁对象 + /** + * 新增窗口锁 + */ + private static final ReentrantLock SAVE_LOCK = new ReentrantLock(); + + /** + * 到达时间窗口期或者总量窗口期锁 + */ + private static final ReentrantLock NOTICE_LOCK = new ReentrantLock(); public SlidingWindow(int totalThreshold, int windowTotalThreshold, @@ -87,7 +95,7 @@ public class SlidingWindow { LocalDateTime now = LocalDateTime.now(); if (isOpenNewWindow(now)) { - lock.lock(); + SAVE_LOCK.lock(); LocalDateTime windowPeriod = now.plus(duration, chronoUnit); try { @@ -111,7 +119,7 @@ public class SlidingWindow { } } finally { - lock.unlock(); + SAVE_LOCK.unlock(); } } else { @@ -167,7 +175,7 @@ public class SlidingWindow { */ private void doHandlerListener(LocalDateTime windowPeriod) { - lock.lock(); + NOTICE_LOCK.lock(); try { @@ -191,7 +199,7 @@ public class SlidingWindow { } catch (Exception e) { log.error("到达总量窗口期通知异常", e); } finally { - lock.unlock(); + NOTICE_LOCK.unlock(); } } @@ -397,6 +405,7 @@ public class SlidingWindow { * @return this */ public Builder withDuration(long duration, ChronoUnit chronoUnit) { + Assert.isTrue(duration > 0, "窗口期不能小于0"); this.duration = duration; this.chronoUnit = chronoUnit; return this; @@ -421,7 +430,7 @@ public class SlidingWindow { public SlidingWindow build() { if (Objects.isNull(threadPoolExecutor)) { threadPoolExecutor = Executors - .newSingleThreadScheduledExecutor(r -> new Thread(r, "SlidingWindowThread")); + .newSingleThreadScheduledExecutor(r -> new Thread(r, "sliding-window-thread")); } if (CollectionUtils.isEmpty(listeners)) { diff --git a/easy-retry-server/pom.xml b/easy-retry-server/pom.xml index f6aea1b08..6f0f32e0a 100644 --- a/easy-retry-server/pom.xml +++ b/easy-retry-server/pom.xml @@ -127,6 +127,10 @@ perf4j ${perf4j.version} + + com.github.rholder + guava-retrying + diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java index 62aa63655..2796c99e1 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java @@ -92,7 +92,7 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef logActor() { - return getNettyActorSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME)); + return getLogActorSystemSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME)); } /** @@ -101,7 +101,7 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef requestHandlerActor() { - return getLogActorSystemSystem().actorOf(getSpringExtension().props(RequestHandlerActor.BEAN_NAME)); + return getNettyActorSystem().actorOf(getSpringExtension().props(RequestHandlerActor.BEAN_NAME)); } public static SpringExtension getSpringExtension() { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java index f1ab1a404..a4ccc52a0 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java @@ -19,7 +19,7 @@ public class AkkaConfiguration { private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM"; private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM"; private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM"; - private static final String NETTY_ACTOR_SYSTEM = "nettyActorSystem"; + private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM"; @Autowired private ApplicationContext applicationContext; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java index 3d44502c5..b10781d7a 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java @@ -39,7 +39,7 @@ public class SystemProperties { /** * 一个客户端每秒最多接收的重试数量指令 */ - private int limiter = 10; + private int limiter = 100; /** * 号段模式下步长配置 @@ -57,6 +57,11 @@ public class SystemProperties { */ private DbTypeEnum dbType = DbTypeEnum.MYSQL; + /** + * 负载均衡周期时间 + */ + private int loadBalanceCycleTime = 10; + /** * 回调配置 */ diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java index c01024d70..3f54b2c16 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java @@ -14,7 +14,7 @@ public interface RetryTaskAccess { /** * 批量查询重试任务 */ - List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType); + List listAvailableTasks(String groupName, LocalDateTime lastAt, final Long lastId, Integer pageSize, Integer taskType); List listRetryTaskByRetryCount(String groupName, Integer retryStatus); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java index dd8605487..c30b01def 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java @@ -30,31 +30,35 @@ public class MybatisRetryTaskAccess extends AbstractRetryTaskAccess { } @Override - public List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) { + public List listAvailableTasks(String groupName, LocalDateTime lastAt, Long lastId, Integer pageSize, + Integer taskType) { setPartition(groupName); return retryTaskMapper.selectPage(new PageDTO<>(0, pageSize), new LambdaQueryWrapper() - .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - .eq(RetryTask::getGroupName, groupName) - .eq(RetryTask::getTaskType, taskType) - .gt(RetryTask::getCreateDt, lastAt) - .orderByAsc(RetryTask::getCreateDt)).getRecords(); + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) + .eq(RetryTask::getGroupName, groupName) + .eq(RetryTask::getTaskType, taskType) + .gt(RetryTask::getId, lastId) + .gt(RetryTask::getCreateDt, lastAt) + .orderByAsc(RetryTask::getId) + .orderByAsc(RetryTask::getCreateDt)) + .getRecords(); } @Override public List listRetryTaskByRetryCount(String groupName, Integer retryStatus) { setPartition(groupName); return retryTaskMapper.selectPage(new PageDTO<>(0, 1000), - new LambdaQueryWrapper() - .eq(RetryTask::getRetryStatus, retryStatus) - .eq(RetryTask::getGroupName, groupName)).getRecords(); + new LambdaQueryWrapper() + .eq(RetryTask::getRetryStatus, retryStatus) + .eq(RetryTask::getGroupName, groupName)).getRecords(); } @Override public int deleteByDelayLevel(String groupName, Integer retryStatus) { setPartition(groupName); return retryTaskMapper.delete(new LambdaQueryWrapper() - .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getRetryStatus, retryStatus)); + .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getRetryStatus, retryStatus)); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java index c84828b3b..11443cd8a 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java @@ -30,8 +30,8 @@ public class RetryTaskAccessProcessor implements RetryTaskAccess { * 批量查询重试任务 */ @Override - public List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) { - return retryTaskAccesses.listAvailableTasks(groupName, lastAt, pageSize, taskType); + public List listAvailableTasks(String groupName, LocalDateTime lastAt, final Long lastId, Integer pageSize, Integer taskType) { + return retryTaskAccesses.listAvailableTasks(groupName, lastAt, lastId, pageSize, taskType); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java index 2b0757885..8f69f4198 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java @@ -1,45 +1,34 @@ package com.aizuda.easy.retry.server.server; import akka.actor.AbstractActor; -import akka.actor.OneForOneStrategy; -import akka.actor.SupervisorStrategy; import cn.hutool.core.net.url.UrlBuilder; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.enums.HeadersEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.common.core.model.EasyRetryRequest; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.dto.NettyHttpRequest; -import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler; import com.aizuda.easy.retry.server.support.Register; -import com.aizuda.easy.retry.server.support.dispatch.actor.log.LogActor; import com.aizuda.easy.retry.server.support.handler.ConfigVersionSyncHandler; import com.aizuda.easy.retry.server.support.register.ClientRegister; import com.aizuda.easy.retry.server.support.register.RegisterContext; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; -import org.springframework.dao.ConcurrencyFailureException; -import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionSystemException; -import scala.concurrent.duration.Duration; import java.util.Collection; @@ -57,25 +46,10 @@ public class RequestHandlerActor extends AbstractActor { public static final String BEAN_NAME = "requestHandlerActor"; - private static final SupervisorStrategy STRATEGY = - new OneForOneStrategy(5, Duration.create("1 minute"), - throwable -> { - if (throwable instanceof DuplicateKeyException - || throwable instanceof TransactionSystemException - || throwable instanceof ConcurrencyFailureException) { - LogUtils.error(log, "RequestHandlerActor handler exception", throwable); - return SupervisorStrategy.restart(); - }else { - return SupervisorStrategy.escalate(); - } - }); - - @Override public Receive createReceive() { return receiveBuilder().match(NettyHttpRequest.class, nettyHttpRequest -> { - final String uri = nettyHttpRequest.getUri(); if (StringUtils.isBlank(uri)) { LogUtils.error(log, "uri can not be null"); @@ -89,19 +63,22 @@ public class RequestHandlerActor extends AbstractActor { final String content = nettyHttpRequest.getContent(); final HttpHeaders headers = nettyHttpRequest.getHeaders(); - String result = doProcess(uri, content, method, headers); + String result = ""; + try { + result = doProcess(uri, content, method, headers); + } catch (Exception e) { + LogUtils.error(log, "http request error. [{}]", nettyHttpRequest.getContent(), e); + result = JsonUtil.toJsonString(new Result<>(0, e.getMessage())); + throw e; + } finally { + writeResponse(channelHandlerContext, keepAlive, result); + getContext().stop(getSelf()); + } - writeResponse(channelHandlerContext, keepAlive, result); }).build(); } - @Override - public SupervisorStrategy supervisorStrategy() { - return STRATEGY; - } - - private String doProcess(String uri, String content, HttpMethod method, HttpHeaders headers) { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/handler/ReportRetryInfoHttpRequestHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/handler/ReportRetryInfoHttpRequestHandler.java index 88b503afa..38c8e09a4 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/handler/ReportRetryInfoHttpRequestHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/handler/ReportRetryInfoHttpRequestHandler.java @@ -13,16 +13,29 @@ import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; import com.aizuda.easy.retry.server.service.convert.TaskContextConverter; import com.aizuda.easy.retry.server.support.generator.TaskGenerator; import com.aizuda.easy.retry.server.support.generator.task.TaskContext; +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.ConcurrencyFailureException; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionSystemException; import org.springframework.transaction.annotation.Transactional; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.BATCH_REPORT; @@ -72,20 +85,55 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler { Map> map = retryTaskList.stream().collect(Collectors.groupingBy(RetryTaskDTO::getSceneName)); - map.forEach(((sceneName, retryTaskDTOS) -> { - TaskContext taskContext = new TaskContext(); - taskContext.setSceneName(sceneName); - taskContext.setGroupName(set.stream().findFirst().get()); - taskContext.setTaskInfos(TaskContextConverter.INSTANCE.toTaskContextInfo(retryTaskList)); + Retryer retryer = RetryerBuilder.newBuilder() + .retryIfException(throwable -> { + // 若是数据库异常则重试 + if (throwable instanceof DuplicateKeyException + || throwable instanceof TransactionSystemException + || throwable instanceof ConcurrencyFailureException + || throwable instanceof IOException) { + return true; + } + return false; + }) + .withStopStrategy(StopStrategies.stopAfterAttempt(5)) + .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(final Attempt attempt) { + if (attempt.hasException()) { + LogUtils.error(log, "数据上报发生异常执行重试. reqId:[{}] count:[{}]", + retryRequest.getReqId(), attempt.getAttemptNumber(), attempt.getExceptionCause()); + } + } + }) + .build(); - // 生成任务 - taskGenerator.taskGenerator(taskContext); - })); + retryer.call(() -> { + map.forEach(((sceneName, retryTaskDTOS) -> { + TaskContext taskContext = new TaskContext(); + taskContext.setSceneName(sceneName); + taskContext.setGroupName(set.stream().findFirst().get()); + taskContext.setTaskInfos(TaskContextConverter.INSTANCE.toTaskContextInfo(retryTaskList)); + + // 生成任务 + taskGenerator.taskGenerator(taskContext); + })); + + return null; + }); return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Batch Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId())); } catch (Exception e) { - LogUtils.error(log, "Batch Report Retry Data Error. <|>{}<|>", args[0], e); - return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), e.getMessage(), Boolean.FALSE, retryRequest.getReqId())); + + Throwable throwable = e; + if (e.getClass().isAssignableFrom(RetryException.class)) { + RetryException re = (RetryException) e; + throwable = re.getLastFailedAttempt().getExceptionCause(); + } + + LogUtils.error(log, "Batch Report Retry Data Error. <|>{}<|>", args[0], throwable); + return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), throwable.getMessage(), Boolean.FALSE, retryRequest.getReqId())); } } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java index 75acd7a2e..a8b9af38b 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java @@ -2,12 +2,14 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.config.SystemProperties; import com.aizuda.easy.retry.server.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; @@ -78,7 +80,9 @@ public class FailureActor extends AbstractActor { callbackRetryTaskHandler.create(retryTask); } - retryTaskAccess.updateRetryTask(retryTask); + Assert.isTrue(1 == retryTaskAccess.updateRetryTask(retryTask), () -> + new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]", + retryTask.getGroupName(), retryTask.getUniqueId())); } }); } catch (Exception e) { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java index 14fafb1ef..5022ee365 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java @@ -2,9 +2,11 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler; import com.aizuda.easy.retry.common.core.log.LogUtils; @@ -55,7 +57,9 @@ public class FinishActor extends AbstractActor { transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - retryTaskAccess.updateRetryTask(retryTask); + Assert.isTrue(1 == retryTaskAccess.updateRetryTask(retryTask), () -> + new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]", + retryTask.getGroupName(), retryTask.getUniqueId())); // 创建一个回调任务 callbackRetryTaskHandler.create(retryTask); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java index a0dda9bb8..bcd2bb13a 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java @@ -1,7 +1,9 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; +import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; import com.aizuda.easy.retry.server.support.RetryContext; @@ -44,7 +46,9 @@ public class NoRetryActor extends AbstractActor { // 不更新重试次数 retryTask.setRetryCount(null); try { - retryTaskAccess.updateRetryTask(retryTask); + Assert.isTrue(1 == retryTaskAccess.updateRetryTask(retryTask), () -> + new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]", + retryTask.getGroupName(), retryTask.getUniqueId())); }catch (Exception e) { LogUtils.error(log,"更新重试任务失败", e); } finally { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java index bde95ba15..528487533 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java @@ -61,19 +61,19 @@ public abstract class AbstractScanGroup extends AbstractActor { protected void doScan(final ScanTaskDTO scanTaskDTO) { - LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); - + LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); + int retryPullPageSize = systemProperties.getRetryPullPageSize(); String groupName = scanTaskDTO.getGroupName(); - LocalDateTime lastAt = Optional.ofNullable(getLastAt(groupName)).orElse(defLastAt); + Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); - // 扫描当前Group 待重试的数据 - List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), + // 扫描当前Group 待处理的任务 + List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize, getTaskType()); if (!CollectionUtils.isEmpty(list)) { - // 更新拉取的最大的创建时间 - putLastAt(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getCreateDt()); + // 更新拉取的最大的id + putLastId(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getId()); for (RetryTask retryTask : list) { @@ -97,7 +97,7 @@ public abstract class AbstractScanGroup extends AbstractActor { Thread.currentThread().interrupt(); } - putLastAt(groupName, defLastAt); + putLastId(groupName, 0L); } } @@ -108,9 +108,9 @@ public abstract class AbstractScanGroup extends AbstractActor { protected abstract Integer getTaskType(); - protected abstract LocalDateTime getLastAt(String groupName); + protected abstract Long getLastId(String groupName); - protected abstract LocalDateTime putLastAt(String groupName, LocalDateTime LocalDateTime); + protected abstract void putLastId(String groupName, Long lastId); private void retryCountIncrement(RetryTask retryTask) { Integer retryCount = retryTask.getRetryCount(); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java index 71cb18339..563b760bd 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java @@ -35,11 +35,11 @@ public class ScanCallbackGroupActor extends AbstractScanGroup { public static final String BEAN_NAME = "ScanCallbackGroupActor"; /** - * 缓存待拉取数据的起点时间 + * 缓存待拉取数据的起点id *

- * LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间 + * LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 id */ - public static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); + private static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); @Override protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) { @@ -73,13 +73,13 @@ public class ScanCallbackGroupActor extends AbstractScanGroup { } @Override - protected LocalDateTime getLastAt(final String groupName) { + protected Long getLastId(final String groupName) { return LAST_AT_MAP.get(groupName); } @Override - protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) { - return LAST_AT_MAP.put(groupName, LocalDateTime); + protected void putLastId(final String groupName, final Long lastId) { + LAST_AT_MAP.put(groupName, lastId); } private WaitStrategy getWaitWaitStrategy() { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java index 2043ab7bc..7a0d90fc8 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java @@ -36,11 +36,11 @@ public class ScanGroupActor extends AbstractScanGroup { public static final String BEAN_NAME = "ScanGroupActor"; /** - * 缓存待拉取数据的起点时间 + * 缓存待拉取数据的起点id *

- * LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间 + * LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的id */ - public static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); + private static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); @Override protected RetryContext> builderRetryContext(final String groupName, @@ -76,15 +76,16 @@ public class ScanGroupActor extends AbstractScanGroup { } @Override - protected LocalDateTime getLastAt(final String groupName) { + protected Long getLastId(final String groupName) { return LAST_AT_MAP.get(groupName); } @Override - protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) { - return LAST_AT_MAP.put(groupName, LocalDateTime); + protected void putLastId(final String groupName, final Long lastId) { + LAST_AT_MAP.put(groupName, lastId); } + private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) { SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(groupName, sceneName); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ConfigVersionSyncHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ConfigVersionSyncHandler.java index 387344b58..d1910c752 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ConfigVersionSyncHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ConfigVersionSyncHandler.java @@ -104,7 +104,7 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable { } finally { try { // 防止刷的过快,休眠1s - TimeUnit.MILLISECONDS.sleep(1000); + TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java index 8bb247a68..47553c31f 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java @@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.support.handler; import cn.hutool.core.lang.Opt; import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.config.SystemProperties; import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; @@ -56,6 +57,8 @@ public class ServerNodeBalance implements Lifecycle, Runnable { @Autowired protected ServerNodeMapper serverNodeMapper; + @Autowired + protected SystemProperties systemProperties; /** * 控制rebalance状态 @@ -253,7 +256,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable { LogUtils.error(log, "check balance error", e); } finally { try { - TimeUnit.SECONDS.sleep(1); + TimeUnit.SECONDS.sleep(systemProperties.getLoadBalanceCycleTime()); } catch (InterruptedException e) { LogUtils.error(log, "check balance interrupt"); Thread.currentThread().interrupt(); diff --git a/pom.xml b/pom.xml index 0e974fd74..236d24046 100644 --- a/pom.xml +++ b/pom.xml @@ -27,6 +27,7 @@ 5.8.19 3.5.3.1 2.0.0 + 2.0.0 @@ -81,7 +82,11 @@ mybatis-plus-boot-starter ${mybatis-plus.version} - + + com.github.rholder + guava-retrying + ${guava-retrying.version} +