From a811876e6134efd366a7fe5ac9359605b3ede379 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sat, 14 Jan 2023 19:58:59 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=99=90=E6=B5=81=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/example/demo/TestExistsTransactionalRetryService.java | 2 +- .../java/com/example/ExistsTransactionalRetryServiceTest.java | 4 ++-- x-retry-server/Dockerfile | 1 + .../x/retry/server/support/cache/CacheGroupRateLimiter.java | 2 -- .../com/x/retry/server/support/dispatch/DispatchService.java | 4 +--- .../server/support/dispatch/actor/exec/ExecUnitActor.java | 1 - .../com/x/retry/server/support/strategy/FilterStrategies.java | 2 +- x-retry-server/src/main/resources/application.yml | 1 + 8 files changed, 7 insertions(+), 10 deletions(-) diff --git a/example/src/main/java/com/example/demo/TestExistsTransactionalRetryService.java b/example/src/main/java/com/example/demo/TestExistsTransactionalRetryService.java index b964753fe..72e1c9829 100644 --- a/example/src/main/java/com/example/demo/TestExistsTransactionalRetryService.java +++ b/example/src/main/java/com/example/demo/TestExistsTransactionalRetryService.java @@ -29,7 +29,7 @@ public class TestExistsTransactionalRetryService { @Autowired private RemoteService remoteService; - @Retryable(scene = "testSimpleInsert", bizNo = "#name", localTimes = 5) + @Retryable(scene = "testSimpleInsert", bizNo = "#name", localTimes = 3) @Transactional public String testSimpleInsert(String name) { diff --git a/example/src/test/java/com/example/ExistsTransactionalRetryServiceTest.java b/example/src/test/java/com/example/ExistsTransactionalRetryServiceTest.java index 391feb71f..2fdc40e03 100644 --- a/example/src/test/java/com/example/ExistsTransactionalRetryServiceTest.java +++ b/example/src/test/java/com/example/ExistsTransactionalRetryServiceTest.java @@ -79,7 +79,7 @@ public class ExistsTransactionalRetryServiceTest { @SneakyThrows @Test public void syncTestSimpleInsert() { - ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); Mockito.when(remoteService.call()) .thenReturn(new Result(0, "1")) @@ -89,7 +89,7 @@ public class ExistsTransactionalRetryServiceTest { .thenReturn(new Result(0, "5")) ; try { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 300; i++) { threadPoolExecutor.execute(() -> testExistsTransactionalRetryService.testSimpleInsert(UUID.randomUUID().toString())); } } catch (Exception e) { diff --git a/x-retry-server/Dockerfile b/x-retry-server/Dockerfile index 2804f90d7..ac5950b20 100755 --- a/x-retry-server/Dockerfile +++ b/x-retry-server/Dockerfile @@ -1,5 +1,6 @@ FROM openjdk:8-jdk-alpine MAINTAINER www.byteblogs.com +LABEL server-name=x-retry-server ADD ./target/x-retry-server.jar x-retry-server.jar diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java b/x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java index d290a97ec..fe23b16ac 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java @@ -47,8 +47,6 @@ public class CacheGroupRateLimiter implements Lifecycle { CACHE = CacheBuilder.newBuilder() // 设置并发级别为cpu核心数 .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .expireAfterWrite(10, TimeUnit.SECONDS) - .expireAfterAccess(10, TimeUnit.SECONDS) .build(); } diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java index 705dd3407..0e7433099 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java @@ -124,12 +124,10 @@ public class DispatchService implements Lifecycle { for (ServerNode serverNode : serverNodes) { RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId()); if (Objects.isNull(rateLimiter)) { - rateLimiterCache.put(groupName, RateLimiter.create(systemProperties.getLimiter())); + rateLimiterCache.put(serverNode.getHostId(), RateLimiter.create(systemProperties.getLimiter())); } } - rateLimiterCache.invalidateAll(); - } /** diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java index f0cf54421..20f0a5b22 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java @@ -98,7 +98,6 @@ public class ExecUnitActor extends AbstractActor { retryTaskLog.setId(null); Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), new XRetryServerException("新增重试日志失败")); - } }).build(); diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java b/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java index 7cacdcf1e..8c86883c7 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java @@ -175,7 +175,7 @@ public class FilterStrategies { ServerNode serverNode = context.getServerNode(); RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId()); - if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { LogUtils.error("该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId()); return Boolean.FALSE; } diff --git a/x-retry-server/src/main/resources/application.yml b/x-retry-server/src/main/resources/application.yml index 97ae22d90..7dc1b5d24 100644 --- a/x-retry-server/src/main/resources/application.yml +++ b/x-retry-server/src/main/resources/application.yml @@ -35,5 +35,6 @@ x-retry: retryPullPageSize: 100 # 拉取重试数据的每批次的大小 nettyPort: 1788 # 服务端netty端口 totalPartition: 32 # 重试和死信表的分区总数 + limiter: 10