diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java index ee3d5bdc1..513ce564c 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/RpcClientInvokeHandler.java @@ -47,6 +47,12 @@ public class RpcClientInvokeHandler implements InvocationHandler { sw.start("request start " + snailJobRequest.getReqId()); + try { + NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString()); + } finally { + sw.stop(); + } + CompletableFuture completableFuture = null; if (async) { RpcContext.setCompletableFuture(snailJobRequest.getReqId(), consumer); @@ -55,12 +61,6 @@ public class RpcClientInvokeHandler implements InvocationHandler { RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); } - try { - NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString()); - } finally { - sw.stop(); - } - SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); if (async) { return null; diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java index 5d60f86fe..9e37d340e 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/supports/handler/NettyHttpServerHandler.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.common.rpc.supports.handler; import com.aizuda.snailjob.client.common.config.SnailJobProperties; import com.aizuda.snailjob.client.common.config.SnailJobProperties.DispatcherThreadPool; import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -64,7 +65,7 @@ public class NettyHttpServerHandler extends SimpleChannelInboundHandler(); - RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); - } - try { NettyChannel.send(hostId, hostIp, hostPort, HttpMethod.valueOf(mapping.method().name()), // 拼接 url?a=1&b=1 @@ -164,6 +155,14 @@ public class RpcClientInvokeHandler implements InvocationHandler { sw.stop(); } + CompletableFuture completableFuture = null; + if (async) { +// RpcContext.setCompletableFuture(snailJobRequest.getReqId(), null); + } else { + completableFuture = new CompletableFuture<>(); + RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); + } + SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); if (async) { diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java index c288177f8..883f0f77f 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java @@ -5,6 +5,9 @@ import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.HeadersEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -67,8 +70,8 @@ public class RequestHandlerActor extends AbstractActor { result = doProcess(uri, content, method, headers); } catch (Exception e) { SnailJobLog.LOCAL.error("http request error. [{}]", nettyHttpRequest.getContent(), e); - result = JsonUtil.toJsonString(new Result<>(0, e.getMessage())); - throw e; + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + result = JsonUtil.toJsonString(new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId())); } finally { writeResponse(channelHandlerContext, keepAlive, result); getContext().stop(getSelf()); @@ -108,8 +111,6 @@ public class RequestHandlerActor extends AbstractActor { SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName); } - - UrlBuilder builder = UrlBuilder.ofHttp(uri); Collection httpRequestHandlers = SpringContext.getContext() .getBeansOfType(HttpRequestHandler.class).values(); @@ -127,11 +128,10 @@ public class RequestHandlerActor extends AbstractActor { * write response */ private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { - // write response FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, - Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson) + Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); response.headers().set(HttpHeaderNames.CONTENT_TYPE, - HttpHeaderValues.APPLICATION_JSON); // HttpHeaderValues.TEXT_PLAIN.toString() + HttpHeaderValues.APPLICATION_JSON); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); if (keepAlive) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);