diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/handler/SyncRemoteConfig.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/handler/SyncRemoteConfig.java index d086140b..cee0ae74 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/handler/SyncRemoteConfig.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/handler/SyncRemoteConfig.java @@ -33,6 +33,7 @@ public class SyncRemoteConfig implements Lifecycle { try { NettyClient client = RequestBuilder.newBuilder() .client(NettyClient.class) + .timeout(1000L) .callback(nettyResult -> { if (Objects.isNull(nettyResult.getData())) { SnailJobLog.LOCAL.error("获取配置结果为null"); diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyHttpClientHandler.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyHttpClientHandler.java index 547fda84..b7f34ba8 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyHttpClientHandler.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyHttpClientHandler.java @@ -4,6 +4,7 @@ import com.aizuda.snailjob.client.common.event.SnailChannelReconnectEvent; import com.aizuda.snailjob.client.common.NettyClient; import com.aizuda.snailjob.common.core.constant.SystemConstants.BEAT; import com.aizuda.snailjob.common.core.context.SpringContext; +import com.aizuda.snailjob.common.core.rpc.RpcContext; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -50,7 +51,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler { +public class RequestBuilder> { private Class clintInterface; @@ -22,11 +23,11 @@ public class RequestBuilder { private boolean async = true; - private long timeout = 60*1000; + private long timeout = 60 * 1000; private TimeUnit unit = TimeUnit.MILLISECONDS; - public static RequestBuilder newBuilder() { + public static > RequestBuilder newBuilder() { return new RequestBuilder<>(); } diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java index 513ce564..201c3ac3 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java @@ -5,14 +5,18 @@ import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.client.common.annotation.Mapping; import com.aizuda.snailjob.client.common.exception.SnailJobClientException; import com.aizuda.snailjob.client.common.exception.SnailJobClientTimeOutException; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.core.rpc.RpcContext; +import com.aizuda.snailjob.common.core.rpc.SnailJobFuture; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.SnailJobRequest; import io.netty.handler.codec.http.HttpMethod; -import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; -import java.util.concurrent.CompletableFuture; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -25,7 +29,7 @@ import java.util.function.Consumer; * @date : 2023-05-11 21:45 * @since 1.3.0 */ -public class RpcClientInvokeHandler implements InvocationHandler { +public class RpcClientInvokeHandler> implements InvocationHandler { private final Consumer consumer; private final boolean async; @@ -47,33 +51,36 @@ public class RpcClientInvokeHandler implements InvocationHandler { sw.start("request start " + snailJobRequest.getReqId()); + SnailJobFuture newFuture = SnailJobFuture.newFuture(snailJobRequest.getReqId(), + timeout, + unit); + RpcContext.setFuture(newFuture); + try { NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString()); } finally { sw.stop(); } - CompletableFuture completableFuture = null; - if (async) { - RpcContext.setCompletableFuture(snailJobRequest.getReqId(), consumer); - } else { - completableFuture = new CompletableFuture<>(); - RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); - } - - SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); + SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); if (async) { + newFuture.whenComplete((r, t) -> { + if (Objects.nonNull(t)) { + consumer.accept( + (R) new NettyResult(StatusEnum.NO.getStatus(), t.getMessage(), null, snailJobRequest.getReqId())); + } else { + consumer.accept(r); + } + }); return null; } else { - Assert.notNull(completableFuture, () -> new SnailJobClientException("completableFuture is null")); + Assert.notNull(newFuture, () -> new SnailJobClientException("completableFuture is null")); try { - return completableFuture.get(timeout, unit); + return newFuture.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw new SnailJobClientException("Request to remote interface exception. path:[{}]", annotation.path()); } catch (TimeoutException e) { throw new SnailJobClientTimeOutException("Request to remote interface timed out. path:[{}]", annotation.path()); - } finally { - RpcContext.remove(snailJobRequest.getReqId()); } } diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcContext.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcContext.java deleted file mode 100644 index 365aca8d..00000000 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcContext.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.aizuda.snailjob.client.common.rpc.client; - -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.common.core.model.NettyResult; -import lombok.extern.slf4j.Slf4j; - -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; - -/** - * 处理请求服务端时需要存储的中间值 - * - * @author: opensnail - * @date : 2023-05-12 09:05 - * @since 1.3.0 - */ -@Slf4j -public final class RpcContext { - private RpcContext() {} - - private static final ConcurrentMap COMPLETABLE_FUTURE = new ConcurrentHashMap<>(); - - private static final ConcurrentMap CALLBACK_CONSUMER = new ConcurrentHashMap<>(); - - public static void invoke(Long requestId, NettyResult nettyResult) { - - try { - // 同步请求同步返回 - Optional.ofNullable(getCompletableFuture(requestId)).ifPresent(completableFuture -> completableFuture.complete(nettyResult)); - - // 异步请求进行回调 - Optional.ofNullable(getConsumer(requestId)).ifPresent(consumer -> consumer.accept(nettyResult)); - } catch (Exception e) { - SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e ); - } finally { - remove(requestId); - } - - } - - public static void remove(Long requestId) { - COMPLETABLE_FUTURE.remove(requestId); - CALLBACK_CONSUMER.remove(requestId); - } - - public static void setCompletableFuture(long id, CompletableFuture completableFuture, Consumer callable) { - if (Objects.nonNull(completableFuture)) { - COMPLETABLE_FUTURE.put(id, completableFuture); - } - - if (Objects.nonNull(callable)) { - CALLBACK_CONSUMER.put(id, callable); - } - - } - - public static void setCompletableFuture(Long id, Consumer callable) { - setCompletableFuture(id, null, callable); - } - - public static void setCompletableFuture(Long id, CompletableFuture completableFuture) { - setCompletableFuture(id, completableFuture, null); - } - - public static CompletableFuture getCompletableFuture(Long id) { - return COMPLETABLE_FUTURE.get(id); - } - - public static Consumer getConsumer(Long id) { - return CALLBACK_CONSUMER.get(id); - } -} diff --git a/snail-job-common/snail-job-common-core/pom.xml b/snail-job-common/snail-job-common-core/pom.xml index d6ff4394..1a732609 100644 --- a/snail-job-common/snail-job-common-core/pom.xml +++ b/snail-job-common/snail-job-common-core/pom.xml @@ -84,6 +84,10 @@ com.aizuda snail-job-common-log + + io.netty + netty-common + diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobRemotingTimeOutException.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobRemotingTimeOutException.java new file mode 100644 index 00000000..7547b3eb --- /dev/null +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobRemotingTimeOutException.java @@ -0,0 +1,36 @@ +package com.aizuda.snailjob.common.core.exception; + +/** + * @author: opensnail + * @date : 2022-03-03 14:49 + */ +public class SnailJobRemotingTimeOutException extends BaseSnailJobException { + + public SnailJobRemotingTimeOutException(String message) { + super(message); + } + + public SnailJobRemotingTimeOutException(String message, Throwable cause) { + super(message, cause); + } + + public SnailJobRemotingTimeOutException(Throwable cause) { + super(cause); + } + + public SnailJobRemotingTimeOutException(String message, Object... arguments) { + super(message, arguments); + } + + public SnailJobRemotingTimeOutException(String message, Object[] arguments, Throwable cause) { + super(message, arguments, cause); + } + + public SnailJobRemotingTimeOutException(String message, Object argument, Throwable cause) { + super(message, argument, cause); + } + + public SnailJobRemotingTimeOutException(String message, Object argument) { + super(message, argument); + } +} diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/rpc/RpcContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/rpc/RpcContext.java new file mode 100644 index 00000000..071806b9 --- /dev/null +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/rpc/RpcContext.java @@ -0,0 +1,84 @@ +package com.aizuda.snailjob.common.core.rpc; + +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.exception.SnailJobRemotingTimeOutException; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.log.SnailJobLog; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * 处理RPC超时和回调 + * + * @author: opensnail + * @date : 2023-05-12 09:05 + * @since 1.3.0 + */ +@Slf4j +public final class RpcContext { + + private RpcContext() { + } + + private static final HashedWheelTimer WHEEL_TIMER; + + static { + WHEEL_TIMER = new HashedWheelTimer( + new CustomizableThreadFactory("snail-job-rpc-timeout-"), 1, + TimeUnit.SECONDS, 1024); + } + + private static final ConcurrentMap COMPLETABLE_FUTURE = new ConcurrentHashMap<>(); + + public static void invoke(Long requestId, NettyResult nettyResult, boolean timeout) { + + try { + // 同步请求同步返回 + Optional.ofNullable(COMPLETABLE_FUTURE.remove(requestId)) + .ifPresent(future -> { + if (timeout) { + future.completeExceptionally(new SnailJobRemotingTimeOutException("Request to remote interface timed out.")); + } else { + future.complete(nettyResult); + } + }); + + } catch (Exception e) { + SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]", requestId, e); + } + } + + public static > void setFuture(SnailJobFuture future) { + if (Objects.nonNull(future)) { + COMPLETABLE_FUTURE.put(future.getRequestId(), future); + } + + // 放入时间轮 + WHEEL_TIMER.newTimeout(new TimeoutCheckTask(future.getRequestId()), future.getTimeout(), future.getUnit()); + } + + public static class TimeoutCheckTask implements TimerTask { + + private final Long requestId; + public TimeoutCheckTask(Long requestId) { + this.requestId = requestId; + } + @Override + public void run(final Timeout timeout) throws Exception { + invoke(requestId, new NettyResult(StatusEnum.NO.getStatus(), "Request to remote interface timed out.", null, requestId), true); + } + } + +} diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/rpc/SnailJobFuture.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/rpc/SnailJobFuture.java new file mode 100644 index 00000000..56e010ea --- /dev/null +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/rpc/SnailJobFuture.java @@ -0,0 +1,27 @@ +package com.aizuda.snailjob.common.core.rpc; + +import com.aizuda.snailjob.common.core.model.Result; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * @author: opensanil + * @date : 2024-05-11 + * @since : sj_1.0.0 + */ +@AllArgsConstructor +@Getter +public class SnailJobFuture> extends CompletableFuture { + private final Long requestId; + private final long timeout; + private final TimeUnit unit; + + public static > SnailJobFuture newFuture(Long requestId, long timeout, TimeUnit unit) { + return new SnailJobFuture<>(requestId, timeout, unit); + } + +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java index 2f2c7a49..2e0110d8 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyHttpClientHandler.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.common.rpc.client; import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.rpc.RpcContext; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import io.netty.channel.ChannelHandlerContext; @@ -32,7 +33,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler(); - RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); - } - SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); if (async) { + // 暂时不支持异步调用 return null; } else { - Assert.notNull(completableFuture, () -> new SnailJobServerException("completableFuture is null")); + Assert.notNull(newFuture, () -> new SnailJobServerException("completableFuture is null")); try { - return (Result) completableFuture.get(Optional.ofNullable(executorTimeout).orElse(20), TimeUnit.SECONDS); + return (Result) newFuture.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw new SnailJobServerException("Request to remote interface exception. path:[{}]", mapping.path()); } catch (TimeoutException e) { throw new SnailJobServerException("Request to remote interface timed out. path:[{}]", mapping.path()); - } finally { - RpcContext.remove(snailJobRequest.getReqId()); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcContext.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcContext.java deleted file mode 100644 index c03b5313..00000000 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcContext.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.aizuda.snailjob.server.common.rpc.client; - -import com.aizuda.snailjob.common.core.model.NettyResult; -import com.aizuda.snailjob.common.log.SnailJobLog; -import lombok.extern.slf4j.Slf4j; - -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; - -/** - * 处理请求服务端时需要存储的中间值 - * - * @author: opensnail - * @date : 2023-05-12 09:05 - * @since 1.3.0 - */ -@Slf4j -public final class RpcContext { - private RpcContext() {} - - private static final ConcurrentMap COMPLETABLE_FUTURE = new ConcurrentHashMap<>(); - - private static final ConcurrentMap CALLBACK_CONSUMER = new ConcurrentHashMap<>(); - - public static void invoke(Long requestId, NettyResult nettyResult) { - - try { - // 同步请求同步返回 - Optional.ofNullable(getCompletableFuture(requestId)).ifPresent(completableFuture -> completableFuture.complete(nettyResult)); - - // 异步请求进行回调 - Optional.ofNullable(getConsumer(requestId)).ifPresent(consumer -> consumer.accept(nettyResult)); - } catch (Exception e) { - SnailJobLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e ); - } finally { - remove(requestId); - } - - } - - public static void remove(Long requestId) { - COMPLETABLE_FUTURE.remove(requestId); - CALLBACK_CONSUMER.remove(requestId); - } - - public static void setCompletableFuture(long id, CompletableFuture completableFuture, Consumer callable) { - if (Objects.nonNull(completableFuture)) { - COMPLETABLE_FUTURE.put(id, completableFuture); - } - - if (Objects.nonNull(callable)) { - CALLBACK_CONSUMER.put(id, callable); - } - - } - - public static void setCompletableFuture(Long id, Consumer callable) { - setCompletableFuture(id, null, callable); - } - - public static void setCompletableFuture(Long id, CompletableFuture completableFuture) { - setCompletableFuture(id, completableFuture, null); - } - - public static CompletableFuture getCompletableFuture(Long id) { - return COMPLETABLE_FUTURE.get(id); - } - - public static Consumer getConsumer(Long id) { - return CALLBACK_CONSUMER.get(id); - } -}