调整限流缓存时间
This commit is contained in:
parent
64b2046498
commit
a811876e61
@ -29,7 +29,7 @@ public class TestExistsTransactionalRetryService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private RemoteService remoteService;
|
private RemoteService remoteService;
|
||||||
|
|
||||||
@Retryable(scene = "testSimpleInsert", bizNo = "#name", localTimes = 5)
|
@Retryable(scene = "testSimpleInsert", bizNo = "#name", localTimes = 3)
|
||||||
@Transactional
|
@Transactional
|
||||||
public String testSimpleInsert(String name) {
|
public String testSimpleInsert(String name) {
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ public class ExistsTransactionalRetryServiceTest {
|
|||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@Test
|
@Test
|
||||||
public void syncTestSimpleInsert() {
|
public void syncTestSimpleInsert() {
|
||||||
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
||||||
|
|
||||||
Mockito.when(remoteService.call())
|
Mockito.when(remoteService.call())
|
||||||
.thenReturn(new Result(0, "1"))
|
.thenReturn(new Result(0, "1"))
|
||||||
@ -89,7 +89,7 @@ public class ExistsTransactionalRetryServiceTest {
|
|||||||
.thenReturn(new Result(0, "5"))
|
.thenReturn(new Result(0, "5"))
|
||||||
;
|
;
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 300; i++) {
|
||||||
threadPoolExecutor.execute(() -> testExistsTransactionalRetryService.testSimpleInsert(UUID.randomUUID().toString()));
|
threadPoolExecutor.execute(() -> testExistsTransactionalRetryService.testSimpleInsert(UUID.randomUUID().toString()));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
FROM openjdk:8-jdk-alpine
|
FROM openjdk:8-jdk-alpine
|
||||||
MAINTAINER www.byteblogs.com
|
MAINTAINER www.byteblogs.com
|
||||||
|
LABEL server-name=x-retry-server
|
||||||
|
|
||||||
ADD ./target/x-retry-server.jar x-retry-server.jar
|
ADD ./target/x-retry-server.jar x-retry-server.jar
|
||||||
|
|
||||||
|
@ -47,8 +47,6 @@ public class CacheGroupRateLimiter implements Lifecycle {
|
|||||||
CACHE = CacheBuilder.newBuilder()
|
CACHE = CacheBuilder.newBuilder()
|
||||||
// 设置并发级别为cpu核心数
|
// 设置并发级别为cpu核心数
|
||||||
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
||||||
.expireAfterWrite(10, TimeUnit.SECONDS)
|
|
||||||
.expireAfterAccess(10, TimeUnit.SECONDS)
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,12 +124,10 @@ public class DispatchService implements Lifecycle {
|
|||||||
for (ServerNode serverNode : serverNodes) {
|
for (ServerNode serverNode : serverNodes) {
|
||||||
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId());
|
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId());
|
||||||
if (Objects.isNull(rateLimiter)) {
|
if (Objects.isNull(rateLimiter)) {
|
||||||
rateLimiterCache.put(groupName, RateLimiter.create(systemProperties.getLimiter()));
|
rateLimiterCache.put(serverNode.getHostId(), RateLimiter.create(systemProperties.getLimiter()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rateLimiterCache.invalidateAll();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -98,7 +98,6 @@ public class ExecUnitActor extends AbstractActor {
|
|||||||
retryTaskLog.setId(null);
|
retryTaskLog.setId(null);
|
||||||
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
|
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
|
||||||
new XRetryServerException("新增重试日志失败"));
|
new XRetryServerException("新增重试日志失败"));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}).build();
|
}).build();
|
||||||
|
@ -175,7 +175,7 @@ public class FilterStrategies {
|
|||||||
ServerNode serverNode = context.getServerNode();
|
ServerNode serverNode = context.getServerNode();
|
||||||
|
|
||||||
RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId());
|
RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId());
|
||||||
if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
|
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
|
||||||
LogUtils.error("该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId());
|
LogUtils.error("该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId());
|
||||||
return Boolean.FALSE;
|
return Boolean.FALSE;
|
||||||
}
|
}
|
||||||
|
@ -35,5 +35,6 @@ x-retry:
|
|||||||
retryPullPageSize: 100 # 拉取重试数据的每批次的大小
|
retryPullPageSize: 100 # 拉取重试数据的每批次的大小
|
||||||
nettyPort: 1788 # 服务端netty端口
|
nettyPort: 1788 # 服务端netty端口
|
||||||
totalPartition: 32 # 重试和死信表的分区总数
|
totalPartition: 32 # 重试和死信表的分区总数
|
||||||
|
limiter: 10
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user