feat(sj_1.0.0): 优化RPC组件

This commit is contained in:
opensnail 2024-05-10 18:27:37 +08:00
parent a7f0398746
commit b34242420c
4 changed files with 21 additions and 9 deletions

View File

@ -72,6 +72,8 @@ public class RpcClientInvokeHandler<R> implements InvocationHandler {
throw new SnailJobClientException("Request to remote interface exception. path:[{}]", annotation.path());
} catch (TimeoutException e) {
throw new SnailJobClientTimeOutException("Request to remote interface timed out. path:[{}]", annotation.path());
} finally {
RpcContext.remove(snailJobRequest.getReqId());
}
}

View File

@ -19,7 +19,8 @@ import java.util.function.Consumer;
* @since 1.3.0
*/
@Slf4j
public class RpcContext {
public final class RpcContext {
private RpcContext() {}
private static final ConcurrentMap<Long, CompletableFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
@ -36,12 +37,16 @@ public class RpcContext {
} catch (Exception e) {
SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e );
} finally {
COMPLETABLE_FUTURE.remove(requestId);
CALLBACK_CONSUMER.remove(requestId);
remove(requestId);
}
}
public static void remove(Long requestId) {
COMPLETABLE_FUTURE.remove(requestId);
CALLBACK_CONSUMER.remove(requestId);
}
public static <R> void setCompletableFuture(long id, CompletableFuture<R> completableFuture, Consumer<R> callable) {
if (Objects.nonNull(completableFuture)) {
COMPLETABLE_FUTURE.put(id, completableFuture);

View File

@ -60,7 +60,6 @@ public class RpcClientInvokeHandler implements InvocationHandler {
private String hostId;
private String hostIp;
private Integer hostPort;
private String contextPath;
private final boolean failRetry;
private final int retryTimes;
private final int retryInterval;
@ -80,7 +79,6 @@ public class RpcClientInvokeHandler implements InvocationHandler {
this.hostId = registerNodeInfo.getHostId();
this.hostPort = registerNodeInfo.getHostPort();
this.hostIp = registerNodeInfo.getHostIp();
this.contextPath = registerNodeInfo.getContextPath();
this.failRetry = failRetry;
this.retryTimes = retryTimes;
this.retryInterval = retryInterval;
@ -150,6 +148,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
sw.start("request start " + snailJobRequest.getReqId());
CompletableFuture completableFuture = null;
// 暂不支持异步
if (async) {
// RpcContext.setCompletableFuture(snailJobRequest.getReqId(), null);
} else {
@ -179,6 +178,8 @@ public class RpcClientInvokeHandler implements InvocationHandler {
} catch (TimeoutException e) {
throw new SnailJobServerException("Request to remote interface timed out. path:[{}]",
mapping.path());
} finally {
RpcContext.remove(snailJobRequest.getReqId());
}
}
@ -210,7 +211,6 @@ public class RpcClientInvokeHandler implements InvocationHandler {
this.hostId = serverNode.getHostId();
this.hostPort = serverNode.getHostPort();
this.hostIp = serverNode.getHostIp();
this.contextPath = serverNode.getContextPath();
} else {
// 其他异常继续抛出

View File

@ -19,7 +19,8 @@ import java.util.function.Consumer;
* @since 1.3.0
*/
@Slf4j
public class RpcContext {
public final class RpcContext {
private RpcContext() {}
private static final ConcurrentMap<Long, CompletableFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
@ -36,12 +37,16 @@ public class RpcContext {
} catch (Exception e) {
SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e );
} finally {
COMPLETABLE_FUTURE.remove(requestId);
CALLBACK_CONSUMER.remove(requestId);
remove(requestId);
}
}
public static void remove(Long requestId) {
COMPLETABLE_FUTURE.remove(requestId);
CALLBACK_CONSUMER.remove(requestId);
}
public static <R> void setCompletableFuture(long id, CompletableFuture<R> completableFuture, Consumer<R> callable) {
if (Objects.nonNull(completableFuture)) {
COMPLETABLE_FUTURE.put(id, completableFuture);