From b34242420c94e6d8f0d2e0fe9ea8b0e2acb23cd1 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Fri, 10 May 2024 18:27:37 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.0):=20=E4=BC=98=E5=8C=96RPC?= =?UTF-8?q?=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/rpc/client/RpcClientInvokeHandler.java | 2 ++ .../snailjob/client/common/rpc/client/RpcContext.java | 11 ++++++++--- .../common/rpc/client/RpcClientInvokeHandler.java | 6 +++--- .../snailjob/server/common/rpc/client/RpcContext.java | 11 ++++++++--- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java index 6d7d0d820..ee3d5bdc1 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java @@ -72,6 +72,8 @@ public class RpcClientInvokeHandler implements InvocationHandler { throw new SnailJobClientException("Request to remote interface exception. path:[{}]", annotation.path()); } catch (TimeoutException e) { throw new SnailJobClientTimeOutException("Request to remote interface timed out. path:[{}]", annotation.path()); + } finally { + RpcContext.remove(snailJobRequest.getReqId()); } } diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcContext.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcContext.java index 513463d73..365aca8dc 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcContext.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcContext.java @@ -19,7 +19,8 @@ import java.util.function.Consumer; * @since 1.3.0 */ @Slf4j -public class RpcContext { +public final class RpcContext { + private RpcContext() {} private static final ConcurrentMap COMPLETABLE_FUTURE = new ConcurrentHashMap<>(); @@ -36,12 +37,16 @@ public class RpcContext { } catch (Exception e) { SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e ); } finally { - COMPLETABLE_FUTURE.remove(requestId); - CALLBACK_CONSUMER.remove(requestId); + remove(requestId); } } + public static void remove(Long requestId) { + COMPLETABLE_FUTURE.remove(requestId); + CALLBACK_CONSUMER.remove(requestId); + } + public static void setCompletableFuture(long id, CompletableFuture completableFuture, Consumer callable) { if (Objects.nonNull(completableFuture)) { COMPLETABLE_FUTURE.put(id, completableFuture); diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java index 6b9b19cfa..2c3b83e68 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java @@ -60,7 +60,6 @@ public class RpcClientInvokeHandler implements InvocationHandler { private String hostId; private String hostIp; private Integer hostPort; - private String contextPath; private final boolean failRetry; private final int retryTimes; private final int retryInterval; @@ -80,7 +79,6 @@ public class RpcClientInvokeHandler implements InvocationHandler { this.hostId = registerNodeInfo.getHostId(); this.hostPort = registerNodeInfo.getHostPort(); this.hostIp = registerNodeInfo.getHostIp(); - this.contextPath = registerNodeInfo.getContextPath(); this.failRetry = failRetry; this.retryTimes = retryTimes; this.retryInterval = retryInterval; @@ -150,6 +148,7 @@ public class RpcClientInvokeHandler implements InvocationHandler { sw.start("request start " + snailJobRequest.getReqId()); CompletableFuture completableFuture = null; + // 暂不支持异步 if (async) { // RpcContext.setCompletableFuture(snailJobRequest.getReqId(), null); } else { @@ -179,6 +178,8 @@ public class RpcClientInvokeHandler implements InvocationHandler { } catch (TimeoutException e) { throw new SnailJobServerException("Request to remote interface timed out. path:[{}]", mapping.path()); + } finally { + RpcContext.remove(snailJobRequest.getReqId()); } } @@ -210,7 +211,6 @@ public class RpcClientInvokeHandler implements InvocationHandler { this.hostId = serverNode.getHostId(); this.hostPort = serverNode.getHostPort(); this.hostIp = serverNode.getHostIp(); - this.contextPath = serverNode.getContextPath(); } else { // 其他异常继续抛出 diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcContext.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcContext.java index 4a204d5cc..c03b5313c 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcContext.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcContext.java @@ -19,7 +19,8 @@ import java.util.function.Consumer; * @since 1.3.0 */ @Slf4j -public class RpcContext { +public final class RpcContext { + private RpcContext() {} private static final ConcurrentMap COMPLETABLE_FUTURE = new ConcurrentHashMap<>(); @@ -36,12 +37,16 @@ public class RpcContext { } catch (Exception e) { SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e ); } finally { - COMPLETABLE_FUTURE.remove(requestId); - CALLBACK_CONSUMER.remove(requestId); + remove(requestId); } } + public static void remove(Long requestId) { + COMPLETABLE_FUTURE.remove(requestId); + CALLBACK_CONSUMER.remove(requestId); + } + public static void setCompletableFuture(long id, CompletableFuture completableFuture, Consumer callable) { if (Objects.nonNull(completableFuture)) { COMPLETABLE_FUTURE.put(id, completableFuture);