feat:(1.3.0-beta1): 优化服务端续租

This commit is contained in:
opensnail 2024-12-28 22:50:49 +08:00
parent 45af9d09b9
commit ef11025fee
2 changed files with 17 additions and 15 deletions

View File

@ -90,16 +90,14 @@ public class ClientRegister extends AbstractRegister {
}
public static List<ServerNode> getExpireNodes() {
try {
ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS);
if (Objects.nonNull(serverNode)) {
List<ServerNode> lists = Lists.newArrayList(serverNode);
QUEUE.drainTo(lists, 256);
return lists;
}
} catch (InterruptedException e) {
SnailJobLog.LOCAL.error("client get expireNodes error.");
ServerNode serverNode = QUEUE.poll();
if (Objects.nonNull(serverNode)) {
List<ServerNode> lists = Lists.newArrayList(serverNode);
QUEUE.drainTo(lists, 256);
return lists;
}
return null;
}
@ -121,8 +119,7 @@ public class ClientRegister extends AbstractRegister {
@Component
public class RefreshNodeSchedule extends AbstractSchedule {
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private ThreadPoolExecutor refreshNodePool;
@Override
protected void doExecute() {
try {
@ -145,7 +142,7 @@ public class ClientRegister extends AbstractRegister {
if (!serverNodes.isEmpty()) {
// 并行获取所有服务端需要注册的列表
// 获取列表 并完成注册/本地完成续签
List<ServerNode> allClientList = collectAllClientQueue(serverNodes);
List<ServerNode> allClientList = pullRemoteNodeClientRegisterInfo(serverNodes);
if (CollUtil.isNotEmpty(allClientList)) {
waitRefreshDBClientNodes.addAll(allClientList);
}
@ -166,7 +163,7 @@ public class ClientRegister extends AbstractRegister {
}
}
private List<ServerNode> collectAllClientQueue(List<ServerNode> serverNodes) {
private List<ServerNode> pullRemoteNodeClientRegisterInfo(List<ServerNode> serverNodes) {
if (CollUtil.isEmpty(serverNodes)) {
return Lists.newArrayList();
}
@ -175,7 +172,7 @@ public class ClientRegister extends AbstractRegister {
// 存储处理结果
List<Future<String>> futures = new ArrayList<>(size);
for (ServerNode serverNode : serverNodes) {
Future<String> future = executorService.submit(() -> {
Future<String> future = refreshNodePool.submit(() -> {
try {
RegisterNodeInfo nodeInfo = new RegisterNodeInfo();
nodeInfo.setHostId(serverNode.getHostId());
@ -198,6 +195,7 @@ public class ClientRegister extends AbstractRegister {
return futures.stream()
.map(future -> {
try {
// 后面可以考虑配置
String jsonString = future.get(1, TimeUnit.SECONDS);
if (Objects.nonNull(jsonString)) {
return JsonUtil.parseList(jsonString, ServerNode.class);
@ -241,6 +239,10 @@ public class ClientRegister extends AbstractRegister {
}
public void startScheduler() {
// 后面可以考虑配置
refreshNodePool = new ThreadPoolExecutor(4, 8, 1, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000));
refreshNodePool.allowCoreThreadTimeOut(true);
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S"));
}
}

View File

@ -31,7 +31,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
String content = response.content().toString(CharsetUtil.UTF_8);
HttpHeaders headers = response.headers();
SnailJobLog.LOCAL.info("Receive server data content:[{}], headers:[{}]", content, headers);
SnailJobLog.LOCAL.debug("Receive server data content:[{}], headers:[{}]", content, headers);
SnailJobRpcResult snailJobRpcResult = JsonUtil.parseObject(content, SnailJobRpcResult.class);
RpcContext.invoke(snailJobRpcResult.getReqId(), snailJobRpcResult, false);