From 5f0f0ad8aeacf91a1eebdf55fb60212a5613cb84 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Fri, 10 May 2024 22:27:42 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.0):=20=E4=BF=AE=E5=A4=8D=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rpc/client/RpcClientInvokeHandler.java | 12 ++++++------ .../handler/NettyHttpServerHandler.java | 3 ++- .../rpc/client/RpcClientInvokeHandler.java | 17 ++++++++--------- .../common/rpc/server/RequestHandlerActor.java | 14 +++++++------- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java index ee3d5bdc..513ce564 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java @@ -47,6 +47,12 @@ public class RpcClientInvokeHandler implements InvocationHandler { sw.start("request start " + snailJobRequest.getReqId()); + try { + NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString()); + } finally { + sw.stop(); + } + CompletableFuture completableFuture = null; if (async) { RpcContext.setCompletableFuture(snailJobRequest.getReqId(), consumer); @@ -55,12 +61,6 @@ public class RpcClientInvokeHandler implements InvocationHandler { RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); } - try { - NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString()); - } finally { - sw.stop(); - } - SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); if (async) { return null; diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java index 5d60f86f..9e37d340 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.common.rpc.supports.handler; import com.aizuda.snailjob.client.common.config.SnailJobProperties; import com.aizuda.snailjob.client.common.config.SnailJobProperties.DispatcherThreadPool; import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -64,7 +65,7 @@ public class NettyHttpServerHandler extends SimpleChannelInboundHandler(); - RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); - } - try { NettyChannel.send(hostId, hostIp, hostPort, HttpMethod.valueOf(mapping.method().name()), // 拼接 url?a=1&b=1 @@ -164,6 +155,14 @@ public class RpcClientInvokeHandler implements InvocationHandler { sw.stop(); } + CompletableFuture completableFuture = null; + if (async) { +// RpcContext.setCompletableFuture(snailJobRequest.getReqId(), null); + } else { + completableFuture = new CompletableFuture<>(); + RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); + } + SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); if (async) { diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java index c288177f..883f0f77 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java @@ -5,6 +5,9 @@ import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.HeadersEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -67,8 +70,8 @@ public class RequestHandlerActor extends AbstractActor { result = doProcess(uri, content, method, headers); } catch (Exception e) { SnailJobLog.LOCAL.error("http request error. [{}]", nettyHttpRequest.getContent(), e); - result = JsonUtil.toJsonString(new Result<>(0, e.getMessage())); - throw e; + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + result = JsonUtil.toJsonString(new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId())); } finally { writeResponse(channelHandlerContext, keepAlive, result); getContext().stop(getSelf()); @@ -108,8 +111,6 @@ public class RequestHandlerActor extends AbstractActor { SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName); } - - UrlBuilder builder = UrlBuilder.ofHttp(uri); Collection httpRequestHandlers = SpringContext.getContext() .getBeansOfType(HttpRequestHandler.class).values(); @@ -127,11 +128,10 @@ public class RequestHandlerActor extends AbstractActor { * write response */ private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { - // write response FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, - Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson) + Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); response.headers().set(HttpHeaderNames.CONTENT_TYPE, - HttpHeaderValues.APPLICATION_JSON); // HttpHeaderValues.TEXT_PLAIN.toString() + HttpHeaderValues.APPLICATION_JSON); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); if (keepAlive) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);