diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java index 141c3c43..0fcf148d 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java @@ -90,16 +90,14 @@ public class ClientRegister extends AbstractRegister { } public static List getExpireNodes() { - try { - ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); - if (Objects.nonNull(serverNode)) { - List lists = Lists.newArrayList(serverNode); - QUEUE.drainTo(lists, 256); - return lists; - } - } catch (InterruptedException e) { - SnailJobLog.LOCAL.error("client get expireNodes error."); + + ServerNode serverNode = QUEUE.poll(); + if (Objects.nonNull(serverNode)) { + List lists = Lists.newArrayList(serverNode); + QUEUE.drainTo(lists, 256); + return lists; } + return null; } @@ -121,8 +119,7 @@ public class ClientRegister extends AbstractRegister { @Component public class RefreshNodeSchedule extends AbstractSchedule { - private final ExecutorService executorService = Executors.newFixedThreadPool(5); - + private ThreadPoolExecutor refreshNodePool; @Override protected void doExecute() { try { @@ -145,7 +142,7 @@ public class ClientRegister extends AbstractRegister { if (!serverNodes.isEmpty()) { // 并行获取所有服务端需要注册的列表 // 获取列表 并完成注册/本地完成续签 - List allClientList = collectAllClientQueue(serverNodes); + List allClientList = pullRemoteNodeClientRegisterInfo(serverNodes); if (CollUtil.isNotEmpty(allClientList)) { waitRefreshDBClientNodes.addAll(allClientList); } @@ -166,7 +163,7 @@ public class ClientRegister extends AbstractRegister { } } - private List collectAllClientQueue(List serverNodes) { + private List pullRemoteNodeClientRegisterInfo(List serverNodes) { if (CollUtil.isEmpty(serverNodes)) { return Lists.newArrayList(); } @@ -175,7 +172,7 @@ public class ClientRegister extends AbstractRegister { // 存储处理结果 List> futures = new ArrayList<>(size); for (ServerNode serverNode : serverNodes) { - Future future = executorService.submit(() -> { + Future future = refreshNodePool.submit(() -> { try { RegisterNodeInfo nodeInfo = new RegisterNodeInfo(); nodeInfo.setHostId(serverNode.getHostId()); @@ -198,6 +195,7 @@ public class ClientRegister extends AbstractRegister { return futures.stream() .map(future -> { try { + // 后面可以考虑配置 String jsonString = future.get(1, TimeUnit.SECONDS); if (Objects.nonNull(jsonString)) { return JsonUtil.parseList(jsonString, ServerNode.class); @@ -241,6 +239,10 @@ public class ClientRegister extends AbstractRegister { } public void startScheduler() { + // 后面可以考虑配置 + refreshNodePool = new ThreadPoolExecutor(4, 8, 1, TimeUnit.SECONDS, + new LinkedBlockingDeque<>(1000)); + refreshNodePool.allowCoreThreadTimeOut(true); taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S")); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java index 29c9144f..fc1a4fde 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java @@ -31,7 +31,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler