feat(sj_1.0.0): 修复回调问题

This commit is contained in:
opensnail 2024-05-10 22:27:42 +08:00
parent b34242420c
commit 5f0f0ad8ae
4 changed files with 23 additions and 23 deletions

View File

@ -47,6 +47,12 @@ public class RpcClientInvokeHandler<R> implements InvocationHandler {
sw.start("request start " + snailJobRequest.getReqId()); sw.start("request start " + snailJobRequest.getReqId());
try {
NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString());
} finally {
sw.stop();
}
CompletableFuture<R> completableFuture = null; CompletableFuture<R> completableFuture = null;
if (async) { if (async) {
RpcContext.setCompletableFuture(snailJobRequest.getReqId(), consumer); RpcContext.setCompletableFuture(snailJobRequest.getReqId(), consumer);
@ -55,12 +61,6 @@ public class RpcClientInvokeHandler<R> implements InvocationHandler {
RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture); RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture);
} }
try {
NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), snailJobRequest.toString());
} finally {
sw.stop();
}
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis());
if (async) { if (async) {
return null; return null;

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.common.rpc.supports.handler;
import com.aizuda.snailjob.client.common.config.SnailJobProperties; import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.DispatcherThreadPool; import com.aizuda.snailjob.client.common.config.SnailJobProperties.DispatcherThreadPool;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse; import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -64,7 +65,7 @@ public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttp
nettyResult = dispatcher.dispatch(nettyHttpRequest); nettyResult = dispatcher.dispatch(nettyHttpRequest);
} catch (Exception e) { } catch (Exception e) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
nettyResult = new NettyResult(0, e.getMessage(), null, retryRequest.getReqId()); nettyResult = new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId());
} finally { } finally {
writeResponse(channelHandlerContext, writeResponse(channelHandlerContext,
HttpUtil.isKeepAlive(fullHttpRequest), HttpUtil.isKeepAlive(fullHttpRequest),

View File

@ -147,15 +147,6 @@ public class RpcClientInvokeHandler implements InvocationHandler {
sw.start("request start " + snailJobRequest.getReqId()); sw.start("request start " + snailJobRequest.getReqId());
CompletableFuture completableFuture = null;
// 暂不支持异步
if (async) {
// RpcContext.setCompletableFuture(snailJobRequest.getReqId(), null);
} else {
completableFuture = new CompletableFuture<>();
RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture);
}
try { try {
NettyChannel.send(hostId, hostIp, hostPort, NettyChannel.send(hostId, hostIp, hostPort,
HttpMethod.valueOf(mapping.method().name()), // 拼接 url?a=1&b=1 HttpMethod.valueOf(mapping.method().name()), // 拼接 url?a=1&b=1
@ -164,6 +155,14 @@ public class RpcClientInvokeHandler implements InvocationHandler {
sw.stop(); sw.stop();
} }
CompletableFuture completableFuture = null;
if (async) {
// RpcContext.setCompletableFuture(snailJobRequest.getReqId(), null);
} else {
completableFuture = new CompletableFuture<>();
RpcContext.setCompletableFuture(snailJobRequest.getReqId(), completableFuture);
}
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(),
sw.getTotalTimeMillis()); sw.getTotalTimeMillis());
if (async) { if (async) {

View File

@ -5,6 +5,9 @@ import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.HeadersEnum; import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -67,8 +70,8 @@ public class RequestHandlerActor extends AbstractActor {
result = doProcess(uri, content, method, headers); result = doProcess(uri, content, method, headers);
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.error("http request error. [{}]", nettyHttpRequest.getContent(), e); SnailJobLog.LOCAL.error("http request error. [{}]", nettyHttpRequest.getContent(), e);
result = JsonUtil.toJsonString(new Result<>(0, e.getMessage())); SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
throw e; result = JsonUtil.toJsonString(new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId()));
} finally { } finally {
writeResponse(channelHandlerContext, keepAlive, result); writeResponse(channelHandlerContext, keepAlive, result);
getContext().stop(getSelf()); getContext().stop(getSelf());
@ -108,8 +111,6 @@ public class RequestHandlerActor extends AbstractActor {
SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName); SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName);
} }
UrlBuilder builder = UrlBuilder.ofHttp(uri); UrlBuilder builder = UrlBuilder.ofHttp(uri);
Collection<HttpRequestHandler> httpRequestHandlers = SpringContext.getContext() Collection<HttpRequestHandler> httpRequestHandlers = SpringContext.getContext()
.getBeansOfType(HttpRequestHandler.class).values(); .getBeansOfType(HttpRequestHandler.class).values();
@ -127,11 +128,10 @@ public class RequestHandlerActor extends AbstractActor {
* write response * write response
*/ */
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
// write response
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson) Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, response.headers().set(HttpHeaderNames.CONTENT_TYPE,
HttpHeaderValues.APPLICATION_JSON); // HttpHeaderValues.TEXT_PLAIN.toString() HttpHeaderValues.APPLICATION_JSON);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) { if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);