feat(sj_1.0.0): 优化RPC组件

This commit is contained in:
opensnail 2024-05-11 13:14:04 +08:00
parent 5f0f0ad8ae
commit c913337a1a
12 changed files with 195 additions and 185 deletions

View File

@ -33,6 +33,7 @@ public class SyncRemoteConfig implements Lifecycle {
try {
NettyClient client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
.client(NettyClient.class)
.timeout(1000L)
.callback(nettyResult -> {
if (Objects.isNull(nettyResult.getData())) {
SnailJobLog.LOCAL.error("获取配置结果为null");

View File

@ -4,6 +4,7 @@ import com.aizuda.snailjob.client.common.event.SnailChannelReconnectEvent;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.common.core.constant.SystemConstants.BEAT;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -50,7 +51,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
SnailJobLog.LOCAL.debug("Receive server data content:[{}], headers:[{}]", content, headers);
NettyResult nettyResult = JsonUtil.parseObject(content, NettyResult.class);
RpcContext.invoke(nettyResult.getRequestId(), nettyResult);
RpcContext.invoke(nettyResult.getRequestId(), nettyResult, false);
}

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.common.core.model.Result;
import java.lang.reflect.Proxy;
import java.util.Objects;
@ -14,7 +15,7 @@ import java.util.function.Consumer;
* @date : 2023-05-12 16:47
* @since 1.3.0
*/
public class RequestBuilder<T, R> {
public class RequestBuilder<T, R extends Result<Object>> {
private Class<T> clintInterface;
@ -22,11 +23,11 @@ public class RequestBuilder<T, R> {
private boolean async = true;
private long timeout = 60*1000;
private long timeout = 60 * 1000;
private TimeUnit unit = TimeUnit.MILLISECONDS;
public static <T, R> RequestBuilder<T, R> newBuilder() {
public static <T, R extends Result<Object>> RequestBuilder<T, R> newBuilder() {
return new RequestBuilder<>();
}

View File

@ -5,14 +5,18 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.common.exception.SnailJobClientTimeOutException;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
import com.aizuda.snailjob.common.core.rpc.SnailJobFuture;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -25,7 +29,7 @@ import java.util.function.Consumer;
* @date : 2023-05-11 21:45
* @since 1.3.0
*/
public class RpcClientInvokeHandler<R> implements InvocationHandler {
public class RpcClientInvokeHandler<R extends Result<Object>> implements InvocationHandler {
private final Consumer<R> consumer;
private final boolean async;
@ -47,33 +51,36 @@ public class RpcClientInvokeHandler<R> implements InvocationHandler {
sw.start("request start " + snailJobRequest.getReqId());
SnailJobFuture<R> newFuture = SnailJobFuture.newFuture(snailJobRequest.getReqId(),
timeout,
unit);
RpcContext.setFuture(newFuture);
try {
NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString());
} finally {
sw.stop();
}
CompletableFuture<R> completableFuture = null;
if (async) {
RpcContext.setCompletableFuture(snailJobRequest.getReqId(), consumer);
} else {
completableFuture = new CompletableFuture<>();
RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture);
}
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis());
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis());
if (async) {
newFuture.whenComplete((r, t) -> {
if (Objects.nonNull(t)) {
consumer.accept(
(R) new NettyResult(StatusEnum.NO.getStatus(), t.getMessage(), null, snailJobRequest.getReqId()));
} else {
consumer.accept(r);
}
});
return null;
} else {
Assert.notNull(completableFuture, () -> new SnailJobClientException("completableFuture is null"));
Assert.notNull(newFuture, () -> new SnailJobClientException("completableFuture is null"));
try {
return completableFuture.get(timeout, unit);
return newFuture.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new SnailJobClientException("Request to remote interface exception. path:[{}]", annotation.path());
} catch (TimeoutException e) {
throw new SnailJobClientTimeOutException("Request to remote interface timed out. path:[{}]", annotation.path());
} finally {
RpcContext.remove(snailJobRequest.getReqId());
}
}

View File

@ -1,76 +0,0 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.model.NettyResult;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
/**
* 处理请求服务端时需要存储的中间值
*
* @author: opensnail
* @date : 2023-05-12 09:05
* @since 1.3.0
*/
@Slf4j
public final class RpcContext {
private RpcContext() {}
private static final ConcurrentMap<Long, CompletableFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
private static final ConcurrentMap<Long, Consumer> CALLBACK_CONSUMER = new ConcurrentHashMap<>();
public static void invoke(Long requestId, NettyResult nettyResult) {
try {
// 同步请求同步返回
Optional.ofNullable(getCompletableFuture(requestId)).ifPresent(completableFuture -> completableFuture.complete(nettyResult));
// 异步请求进行回调
Optional.ofNullable(getConsumer(requestId)).ifPresent(consumer -> consumer.accept(nettyResult));
} catch (Exception e) {
SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e );
} finally {
remove(requestId);
}
}
public static void remove(Long requestId) {
COMPLETABLE_FUTURE.remove(requestId);
CALLBACK_CONSUMER.remove(requestId);
}
public static <R> void setCompletableFuture(long id, CompletableFuture<R> completableFuture, Consumer<R> callable) {
if (Objects.nonNull(completableFuture)) {
COMPLETABLE_FUTURE.put(id, completableFuture);
}
if (Objects.nonNull(callable)) {
CALLBACK_CONSUMER.put(id, callable);
}
}
public static <R> void setCompletableFuture(Long id, Consumer<R> callable) {
setCompletableFuture(id, null, callable);
}
public static <R> void setCompletableFuture(Long id, CompletableFuture<R> completableFuture) {
setCompletableFuture(id, completableFuture, null);
}
public static CompletableFuture<Object> getCompletableFuture(Long id) {
return COMPLETABLE_FUTURE.get(id);
}
public static Consumer getConsumer(Long id) {
return CALLBACK_CONSUMER.get(id);
}
}

View File

@ -84,6 +84,10 @@
<groupId>com.aizuda</groupId>
<artifactId>snail-job-common-log</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,36 @@
package com.aizuda.snailjob.common.core.exception;
/**
* @author: opensnail
* @date : 2022-03-03 14:49
*/
public class SnailJobRemotingTimeOutException extends BaseSnailJobException {
public SnailJobRemotingTimeOutException(String message) {
super(message);
}
public SnailJobRemotingTimeOutException(String message, Throwable cause) {
super(message, cause);
}
public SnailJobRemotingTimeOutException(Throwable cause) {
super(cause);
}
public SnailJobRemotingTimeOutException(String message, Object... arguments) {
super(message, arguments);
}
public SnailJobRemotingTimeOutException(String message, Object[] arguments, Throwable cause) {
super(message, arguments, cause);
}
public SnailJobRemotingTimeOutException(String message, Object argument, Throwable cause) {
super(message, argument, cause);
}
public SnailJobRemotingTimeOutException(String message, Object argument) {
super(message, argument);
}
}

View File

@ -0,0 +1,84 @@
package com.aizuda.snailjob.common.core.rpc;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobRemotingTimeOutException;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* 处理RPC超时和回调
*
* @author: opensnail
* @date : 2023-05-12 09:05
* @since 1.3.0
*/
@Slf4j
public final class RpcContext {
private RpcContext() {
}
private static final HashedWheelTimer WHEEL_TIMER;
static {
WHEEL_TIMER = new HashedWheelTimer(
new CustomizableThreadFactory("snail-job-rpc-timeout-"), 1,
TimeUnit.SECONDS, 1024);
}
private static final ConcurrentMap<Long, SnailJobFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
public static void invoke(Long requestId, NettyResult nettyResult, boolean timeout) {
try {
// 同步请求同步返回
Optional.ofNullable(COMPLETABLE_FUTURE.remove(requestId))
.ifPresent(future -> {
if (timeout) {
future.completeExceptionally(new SnailJobRemotingTimeOutException("Request to remote interface timed out."));
} else {
future.complete(nettyResult);
}
});
} catch (Exception e) {
SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]", requestId, e);
}
}
public static <R extends Result<Object>> void setFuture(SnailJobFuture<R> future) {
if (Objects.nonNull(future)) {
COMPLETABLE_FUTURE.put(future.getRequestId(), future);
}
// 放入时间轮
WHEEL_TIMER.newTimeout(new TimeoutCheckTask(future.getRequestId()), future.getTimeout(), future.getUnit());
}
public static class TimeoutCheckTask implements TimerTask {
private final Long requestId;
public TimeoutCheckTask(Long requestId) {
this.requestId = requestId;
}
@Override
public void run(final Timeout timeout) throws Exception {
invoke(requestId, new NettyResult(StatusEnum.NO.getStatus(), "Request to remote interface timed out.", null, requestId), true);
}
}
}

View File

@ -0,0 +1,27 @@
package com.aizuda.snailjob.common.core.rpc;
import com.aizuda.snailjob.common.core.model.Result;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* @author: opensanil
* @date : 2024-05-11
* @since : sj_1.0.0
*/
@AllArgsConstructor
@Getter
public class SnailJobFuture<R extends Result<Object>> extends CompletableFuture<R> {
private final Long requestId;
private final long timeout;
private final TimeUnit unit;
public static <R extends Result<Object>> SnailJobFuture<R> newFuture(Long requestId, long timeout, TimeUnit unit) {
return new SnailJobFuture<>(requestId, timeout, unit);
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.common.rpc.client;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import io.netty.channel.ChannelHandlerContext;
@ -32,7 +33,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
SnailJobLog.LOCAL.info("Receive server data content:[{}], headers:[{}]", content, headers);
NettyResult nettyResult = JsonUtil.parseObject(content, NettyResult.class);
RpcContext.invoke(nettyResult.getRequestId(), nettyResult);
RpcContext.invoke(nettyResult.getRequestId(), nettyResult, false);
}

View File

@ -4,8 +4,12 @@ import cn.hutool.core.date.StopWatch;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
import com.aizuda.snailjob.common.core.rpc.SnailJobFuture;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -147,6 +151,11 @@ public class RpcClientInvokeHandler implements InvocationHandler {
sw.start("request start " + snailJobRequest.getReqId());
SnailJobFuture newFuture = SnailJobFuture.newFuture(snailJobRequest.getReqId(),
Optional.ofNullable(executorTimeout).orElse(20),
TimeUnit.SECONDS);
RpcContext.setFuture(newFuture);
try {
NettyChannel.send(hostId, hostIp, hostPort,
HttpMethod.valueOf(mapping.method().name()), // 拼接 url?a=1&b=1
@ -155,30 +164,21 @@ public class RpcClientInvokeHandler implements InvocationHandler {
sw.stop();
}
CompletableFuture completableFuture = null;
if (async) {
// RpcContext.setCompletableFuture(snailJobRequest.getReqId(), null);
} else {
completableFuture = new CompletableFuture<>();
RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture);
}
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(),
sw.getTotalTimeMillis());
if (async) {
// 暂时不支持异步调用
return null;
} else {
Assert.notNull(completableFuture, () -> new SnailJobServerException("completableFuture is null"));
Assert.notNull(newFuture, () -> new SnailJobServerException("completableFuture is null"));
try {
return (Result) completableFuture.get(Optional.ofNullable(executorTimeout).orElse(20), TimeUnit.SECONDS);
return (Result) newFuture.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new SnailJobServerException("Request to remote interface exception. path:[{}]",
mapping.path());
} catch (TimeoutException e) {
throw new SnailJobServerException("Request to remote interface timed out. path:[{}]",
mapping.path());
} finally {
RpcContext.remove(snailJobRequest.getReqId());
}
}

View File

@ -1,76 +0,0 @@
package com.aizuda.snailjob.server.common.rpc.client;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
/**
* 处理请求服务端时需要存储的中间值
*
* @author: opensnail
* @date : 2023-05-12 09:05
* @since 1.3.0
*/
@Slf4j
public final class RpcContext {
private RpcContext() {}
private static final ConcurrentMap<Long, CompletableFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
private static final ConcurrentMap<Long, Consumer> CALLBACK_CONSUMER = new ConcurrentHashMap<>();
public static void invoke(Long requestId, NettyResult nettyResult) {
try {
// 同步请求同步返回
Optional.ofNullable(getCompletableFuture(requestId)).ifPresent(completableFuture -> completableFuture.complete(nettyResult));
// 异步请求进行回调
Optional.ofNullable(getConsumer(requestId)).ifPresent(consumer -> consumer.accept(nettyResult));
} catch (Exception e) {
SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e );
} finally {
remove(requestId);
}
}
public static void remove(Long requestId) {
COMPLETABLE_FUTURE.remove(requestId);
CALLBACK_CONSUMER.remove(requestId);
}
public static <R> void setCompletableFuture(long id, CompletableFuture<R> completableFuture, Consumer<R> callable) {
if (Objects.nonNull(completableFuture)) {
COMPLETABLE_FUTURE.put(id, completableFuture);
}
if (Objects.nonNull(callable)) {
CALLBACK_CONSUMER.put(id, callable);
}
}
public static <R> void setCompletableFuture(Long id, Consumer<R> callable) {
setCompletableFuture(id, null, callable);
}
public static <R> void setCompletableFuture(Long id, CompletableFuture<R> completableFuture) {
setCompletableFuture(id, completableFuture, null);
}
public static CompletableFuture<Object> getCompletableFuture(Long id) {
return COMPLETABLE_FUTURE.get(id);
}
public static Consumer getConsumer(Long id) {
return CALLBACK_CONSUMER.get(id);
}
}