feat&fix: 1.3.0
1.限定创建组和场景仅支持字母数字和下划线 2.优化reqId使用AtomicLong作为请求id 3.优化客户端请求模板
This commit is contained in:
parent
ea40b0fc9b
commit
4916fad455
@ -0,0 +1,80 @@
|
|||||||
|
package com.aizuda.easy.retry.client.core.client.netty;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.IdUtil;
|
||||||
|
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
|
||||||
|
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
|
||||||
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.common.core.util.HostUtils;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||||
|
import io.netty.handler.codec.http.FullHttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaderValues;
|
||||||
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
|
import io.netty.handler.codec.http.HttpVersion;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-05-13
|
||||||
|
* @since 1.3.0
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class NettyChannel {
|
||||||
|
|
||||||
|
private static final String HOST_ID = IdUtil.simpleUUID();
|
||||||
|
private static final String HOST_IP = HostUtils.getIp();
|
||||||
|
|
||||||
|
private static Channel CHANNEL;
|
||||||
|
|
||||||
|
public static void setChannel(Channel channel) {
|
||||||
|
NettyChannel.CHANNEL = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送数据
|
||||||
|
*
|
||||||
|
* @param method 请求方式
|
||||||
|
* @param url url地址
|
||||||
|
* @param body 请求的消息体
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public static void send(HttpMethod method, String url, String body) throws InterruptedException {
|
||||||
|
|
||||||
|
if (Objects.isNull(CHANNEL)) {
|
||||||
|
LogUtils.info(log, "send message but channel is null url:[{}] method:[{}] body:[{}] ", url, method, body);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 配置HttpRequest的请求数据和一些配置信息
|
||||||
|
FullHttpRequest request = new DefaultFullHttpRequest(
|
||||||
|
HttpVersion.HTTP_1_1, method, url, Unpooled.wrappedBuffer(body.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
|
||||||
|
ServerProperties serverProperties = SpringContext.CONTEXT.getBean(ServerProperties.class);
|
||||||
|
|
||||||
|
request.headers()
|
||||||
|
.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
||||||
|
//开启长连接
|
||||||
|
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
|
||||||
|
//设置传递请求内容的长度
|
||||||
|
.set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes())
|
||||||
|
.set(HeadersEnum.HOST_ID.getKey(), HOST_ID)
|
||||||
|
.set(HeadersEnum.HOST_IP.getKey(), HOST_IP)
|
||||||
|
.set(HeadersEnum.GROUP_NAME.getKey(), EasyRetryProperties.getGroup())
|
||||||
|
.set(HeadersEnum.CONTEXT_PATH.getKey(), Optional.ofNullable(serverProperties.getServlet().getContextPath()).orElse("/"))
|
||||||
|
.set(HeadersEnum.HOST_PORT.getKey(), Optional.ofNullable(serverProperties.getPort()).orElse(8080))
|
||||||
|
.set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion())
|
||||||
|
;
|
||||||
|
|
||||||
|
//发送数据
|
||||||
|
CHANNEL.writeAndFlush(request).sync();
|
||||||
|
}
|
||||||
|
}
|
@ -25,11 +25,17 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
|
public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
|
||||||
|
|
||||||
private NettyClient client;
|
private NettyClient client;
|
||||||
public NettyHttpClientHandler() {
|
private NettyHttpConnectClient nettyHttpConnectClient;
|
||||||
|
|
||||||
|
public NettyHttpClientHandler(NettyHttpConnectClient nettyHttpConnectClient) {
|
||||||
|
|
||||||
client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
|
client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
|
||||||
.client(NettyClient.class)
|
.client(NettyClient.class)
|
||||||
.callback(nettyResult -> LogUtils.info(log,"heartbeat check requestId:[{}]", nettyResult.getRequestId()))
|
.callback(nettyResult -> LogUtils.info(log,"heartbeat check requestId:[{}]", nettyResult.getRequestId()))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
this.nettyHttpConnectClient = nettyHttpConnectClient;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -53,13 +59,14 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||||
super.channelUnregistered(ctx);
|
|
||||||
LogUtils.debug(log, "channelUnregistered");
|
LogUtils.debug(log, "channelUnregistered");
|
||||||
ctx.channel().eventLoop().schedule(() -> {
|
ctx.channel().eventLoop().schedule(() -> {
|
||||||
EasyRetryProperties easyRetryProperties = SpringContext.getBeanByType(EasyRetryProperties.class);
|
try {
|
||||||
EasyRetryProperties.ServerConfig server = easyRetryProperties.getServer();
|
nettyHttpConnectClient.reconnect();
|
||||||
LogUtils.info(log, "Reconnecting to:" + server.getHost() + ":" + server.getPort());
|
} catch (Exception e) {
|
||||||
NettyHttpConnectClient.connect();
|
LogUtils.error(log, "reconnect error ", e);
|
||||||
|
}
|
||||||
|
|
||||||
}, 10, TimeUnit.SECONDS);
|
}, 10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
|
||||||
@ -91,7 +98,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
LogUtils.error(log,"easy-retry netty_http client exception", cause);
|
LogUtils.error(log,"easy-retry netty-http client exception", cause);
|
||||||
super.exceptionCaught(ctx, cause);
|
super.exceptionCaught(ctx, cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,15 +1,8 @@
|
|||||||
package com.aizuda.easy.retry.client.core.client.netty;
|
package com.aizuda.easy.retry.client.core.client.netty;
|
||||||
|
|
||||||
import cn.hutool.core.util.IdUtil;
|
|
||||||
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
|
|
||||||
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
|
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
|
||||||
import com.aizuda.easy.retry.client.core.Lifecycle;
|
import com.aizuda.easy.retry.client.core.Lifecycle;
|
||||||
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
|
||||||
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
|
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
|
||||||
import com.aizuda.easy.retry.common.core.util.HostUtils;
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.buffer.Unpooled;
|
|
||||||
import io.netty.channel.*;
|
import io.netty.channel.*;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
||||||
@ -18,34 +11,33 @@ import io.netty.handler.codec.http.*;
|
|||||||
import io.netty.handler.timeout.IdleStateHandler;
|
import io.netty.handler.timeout.IdleStateHandler;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.BeansException;
|
import org.springframework.beans.BeansException;
|
||||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.context.ApplicationContextAware;
|
||||||
import org.springframework.core.Ordered;
|
import org.springframework.core.Ordered;
|
||||||
import org.springframework.core.annotation.Order;
|
import org.springframework.core.annotation.Order;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.net.ConnectException;
|
||||||
import java.util.Objects;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Netty 客户端
|
||||||
|
*
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
* @date : 2022-03-07 18:24
|
* @date : 2022-03-07 18:24
|
||||||
|
* @since 1.0.0
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@Order(Ordered.HIGHEST_PRECEDENCE)
|
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||||
public class NettyHttpConnectClient implements Lifecycle, ApplicationContextAware {
|
public class NettyHttpConnectClient implements Lifecycle, ApplicationContextAware {
|
||||||
|
|
||||||
private static final String HOST_ID = IdUtil.simpleUUID();
|
|
||||||
private static final String HOST_IP = HostUtils.getIp();
|
|
||||||
|
|
||||||
private ApplicationContext applicationContext;
|
private ApplicationContext applicationContext;
|
||||||
private static Channel channel;
|
|
||||||
private static NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
|
private static NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
|
||||||
private static Bootstrap bootstrap = new Bootstrap();
|
private static Bootstrap bootstrap = new Bootstrap();
|
||||||
|
private volatile Channel channel;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
|
|
||||||
@ -64,96 +56,105 @@ public class NettyHttpConnectClient implements Lifecycle, ApplicationContextAwar
|
|||||||
.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS))
|
.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS))
|
||||||
.addLast(new HttpClientCodec())
|
.addLast(new HttpClientCodec())
|
||||||
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
|
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
|
||||||
.addLast(new NettyHttpClientHandler());
|
.addLast(new NettyHttpClientHandler(thisClient));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
|
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
|
||||||
ChannelFuture channelFuture = bootstrap.connect().sync();
|
|
||||||
channel = channelFuture.channel();
|
|
||||||
|
|
||||||
|
// 开启连接服务端
|
||||||
|
connect();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Client start exception", e);
|
log.error("Client start exception", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接客户端
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public void connect() {
|
||||||
|
|
||||||
|
try {
|
||||||
|
ChannelFuture channelFuture = bootstrap.connect();
|
||||||
|
|
||||||
|
boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
|
||||||
|
channel = channelFuture.channel();
|
||||||
|
if (notTimeout) {
|
||||||
|
// 连接成功
|
||||||
|
if (channel != null && channel.isActive()) {
|
||||||
|
log.info("netty client started {} connect to server", channel.localAddress());
|
||||||
|
NettyChannel.setChannel(getChannel());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Throwable cause = channelFuture.cause();
|
||||||
|
if (cause != null) {
|
||||||
|
exceptionHandler(cause);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.warn("connect remote host[{}] timeout {}s", channel.remoteAddress(), 30);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
exceptionHandler(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
channel.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重连
|
||||||
|
*/
|
||||||
|
public void reconnect() {
|
||||||
|
ChannelFuture channelFuture = bootstrap.connect();
|
||||||
|
channelFuture.addListener((ChannelFutureListener) future -> {
|
||||||
|
Throwable cause = future.cause();
|
||||||
|
if (cause != null) {
|
||||||
|
exceptionHandler(cause);
|
||||||
|
} else {
|
||||||
|
channel = channelFuture.channel();
|
||||||
|
if (channel != null && channel.isActive()) {
|
||||||
|
log.info("Netty client {} reconnect to server", channel.localAddress());
|
||||||
|
NettyChannel.setChannel(getChannel());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接失败处理
|
||||||
|
*
|
||||||
|
* @param cause
|
||||||
|
*/
|
||||||
|
private void exceptionHandler(Throwable cause) {
|
||||||
|
if (cause instanceof ConnectException) {
|
||||||
|
log.error("connect error:{}", cause.getMessage());
|
||||||
|
} else if (cause instanceof ClosedChannelException) {
|
||||||
|
log.error("connect error:{}", "client has destroy");
|
||||||
|
} else {
|
||||||
|
log.error("connect error:", cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
nioEventLoopGroup.shutdownGracefully();
|
if (channel != null) {
|
||||||
}
|
channel.close();
|
||||||
|
|
||||||
public static void connect(){
|
|
||||||
channel = bootstrap.connect().addListener((ChannelFutureListener) future -> {
|
|
||||||
if (future.cause() != null){
|
|
||||||
LogUtils.debug(log,"operationComplete", future.cause());
|
|
||||||
}
|
|
||||||
}).channel();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void send(HttpMethod method, String url, String body) throws InterruptedException {
|
|
||||||
|
|
||||||
if (Objects.isNull(channel)) {
|
|
||||||
LogUtils.debug(log,"channel is null");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
if (nioEventLoopGroup != null) {
|
||||||
// 配置HttpRequest的请求数据和一些配置信息
|
nioEventLoopGroup.shutdownGracefully();
|
||||||
FullHttpRequest request = new DefaultFullHttpRequest(
|
|
||||||
HttpVersion.HTTP_1_0, method, url, Unpooled.wrappedBuffer(body.getBytes(StandardCharsets.UTF_8)));
|
|
||||||
|
|
||||||
ServerProperties serverProperties = SpringContext.CONTEXT.getBean(ServerProperties.class);
|
|
||||||
|
|
||||||
request.headers()
|
|
||||||
.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
||||||
//开启长连接
|
|
||||||
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
|
|
||||||
//设置传递请求内容的长度
|
|
||||||
.set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes())
|
|
||||||
.set(HeadersEnum.HOST_ID.getKey(), HOST_ID)
|
|
||||||
.set(HeadersEnum.HOST_IP.getKey(), HOST_IP)
|
|
||||||
.set(HeadersEnum.GROUP_NAME.getKey(), EasyRetryProperties.getGroup())
|
|
||||||
.set(HeadersEnum.CONTEXT_PATH.getKey(), Optional.ofNullable(serverProperties.getServlet().getContextPath()).orElse("/"))
|
|
||||||
.set(HeadersEnum.HOST_PORT.getKey(), Optional.ofNullable(serverProperties.getPort()).orElse(8080))
|
|
||||||
.set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion())
|
|
||||||
;
|
|
||||||
|
|
||||||
//发送数据
|
|
||||||
channel.writeAndFlush(request).sync();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void sendSync(HttpMethod method, String url, String body) throws InterruptedException {
|
|
||||||
|
|
||||||
if (Objects.isNull(channel)) {
|
|
||||||
LogUtils.debug(log,"channel is null");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 配置HttpRequest的请求数据和一些配置信息
|
|
||||||
FullHttpRequest request = new DefaultFullHttpRequest(
|
|
||||||
HttpVersion.HTTP_1_0, method, url, Unpooled.wrappedBuffer(body.getBytes(StandardCharsets.UTF_8)));
|
|
||||||
|
|
||||||
ServerProperties serverProperties = SpringContext.CONTEXT.getBean(ServerProperties.class);
|
|
||||||
|
|
||||||
request.headers()
|
|
||||||
.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
||||||
//开启长连接
|
|
||||||
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
|
|
||||||
//设置传递请求内容的长度
|
|
||||||
.set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes())
|
|
||||||
.set(HeadersEnum.HOST_ID.getKey(), HOST_ID)
|
|
||||||
.set(HeadersEnum.HOST_IP.getKey(), HOST_IP)
|
|
||||||
.set(HeadersEnum.GROUP_NAME.getKey(), EasyRetryProperties.getGroup())
|
|
||||||
.set(HeadersEnum.CONTEXT_PATH.getKey(), Optional.ofNullable(serverProperties.getServlet().getContextPath()).orElse("/"))
|
|
||||||
.set(HeadersEnum.HOST_PORT.getKey(), Optional.ofNullable(serverProperties.getPort()).orElse(8080))
|
|
||||||
.set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion())
|
|
||||||
;
|
|
||||||
|
|
||||||
//发送数据
|
|
||||||
channel.writeAndFlush(request).sync();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||||
this.applicationContext = applicationContext;
|
this.applicationContext = applicationContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Channel getChannel() {
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,11 @@ import java.util.function.Consumer;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class RpcContext {
|
public class RpcContext {
|
||||||
|
|
||||||
private static final ConcurrentMap<String, CompletableFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
|
private static final ConcurrentMap<Long, CompletableFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private static final ConcurrentMap<String, Consumer> CALLBACK_CONSUMER = new ConcurrentHashMap<>();
|
private static final ConcurrentMap<Long, Consumer> CALLBACK_CONSUMER = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public static void invoke(String requestId, NettyResult nettyResult) {
|
public static void invoke(Long requestId, NettyResult nettyResult) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 同步请求同步返回
|
// 同步请求同步返回
|
||||||
@ -41,7 +41,7 @@ public class RpcContext {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <R> void setCompletableFuture(String id, CompletableFuture<R> completableFuture, Consumer<R> callable) {
|
public static <R> void setCompletableFuture(long id, CompletableFuture<R> completableFuture, Consumer<R> callable) {
|
||||||
if (Objects.nonNull(completableFuture)) {
|
if (Objects.nonNull(completableFuture)) {
|
||||||
COMPLETABLE_FUTURE.put(id, completableFuture);
|
COMPLETABLE_FUTURE.put(id, completableFuture);
|
||||||
}
|
}
|
||||||
@ -52,19 +52,19 @@ public class RpcContext {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <R> void setCompletableFuture(String id, Consumer<R> callable) {
|
public static <R> void setCompletableFuture(Long id, Consumer<R> callable) {
|
||||||
setCompletableFuture(id, null, callable);
|
setCompletableFuture(id, null, callable);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <R> void setCompletableFuture(String id, CompletableFuture<R> completableFuture) {
|
public static <R> void setCompletableFuture(Long id, CompletableFuture<R> completableFuture) {
|
||||||
setCompletableFuture(id, completableFuture, null);
|
setCompletableFuture(id, completableFuture, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static CompletableFuture<Object> getCompletableFuture(String id) {
|
public static CompletableFuture<Object> getCompletableFuture(Long id) {
|
||||||
return COMPLETABLE_FUTURE.get(id);
|
return COMPLETABLE_FUTURE.get(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Consumer getConsumer(String id) {
|
public static Consumer getConsumer(Long id) {
|
||||||
return CALLBACK_CONSUMER.get(id);
|
return CALLBACK_CONSUMER.get(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@ package com.aizuda.easy.retry.client.core.client.proxy;
|
|||||||
import cn.hutool.core.date.StopWatch;
|
import cn.hutool.core.date.StopWatch;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import com.aizuda.easy.retry.client.core.annotation.Mapping;
|
import com.aizuda.easy.retry.client.core.annotation.Mapping;
|
||||||
import com.aizuda.easy.retry.client.core.client.netty.NettyHttpConnectClient;
|
import com.aizuda.easy.retry.client.core.client.netty.NettyChannel;
|
||||||
import com.aizuda.easy.retry.client.core.client.netty.RpcContext;
|
import com.aizuda.easy.retry.client.core.client.netty.RpcContext;
|
||||||
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
|
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
@ -28,10 +28,10 @@ import java.util.function.Consumer;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class ClientInvokeHandler<R> implements InvocationHandler {
|
public class ClientInvokeHandler<R> implements InvocationHandler {
|
||||||
|
|
||||||
private Consumer<R> consumer;
|
private final Consumer<R> consumer;
|
||||||
private boolean async;
|
private final boolean async;
|
||||||
private long timeout;
|
private final long timeout;
|
||||||
private TimeUnit unit;
|
private final TimeUnit unit;
|
||||||
|
|
||||||
public ClientInvokeHandler(boolean async, long timeout, TimeUnit unit, Consumer<R> consumer) {
|
public ClientInvokeHandler(boolean async, long timeout, TimeUnit unit, Consumer<R> consumer) {
|
||||||
this.consumer = consumer;
|
this.consumer = consumer;
|
||||||
@ -46,26 +46,23 @@ public class ClientInvokeHandler<R> implements InvocationHandler {
|
|||||||
Mapping annotation = method.getAnnotation(Mapping.class);
|
Mapping annotation = method.getAnnotation(Mapping.class);
|
||||||
EasyRetryRequest easyRetryRequest = new EasyRetryRequest(args);
|
EasyRetryRequest easyRetryRequest = new EasyRetryRequest(args);
|
||||||
|
|
||||||
sw.start("request start " + easyRetryRequest.getRequestId());
|
sw.start("request start " + easyRetryRequest.getReqId());
|
||||||
|
|
||||||
CompletableFuture<R> completableFuture = null;
|
CompletableFuture<R> completableFuture = null;
|
||||||
if (async) {
|
if (async) {
|
||||||
RpcContext.setCompletableFuture(easyRetryRequest.getRequestId(), consumer);
|
RpcContext.setCompletableFuture(easyRetryRequest.getReqId(), consumer);
|
||||||
} else {
|
} else {
|
||||||
completableFuture = new CompletableFuture<>();
|
completableFuture = new CompletableFuture<>();
|
||||||
RpcContext.setCompletableFuture(easyRetryRequest.getRequestId(), completableFuture);
|
RpcContext.setCompletableFuture(easyRetryRequest.getReqId(), completableFuture);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
NettyHttpConnectClient.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(),
|
NettyChannel.send(HttpMethod.valueOf(annotation.method().name()), annotation.path(), easyRetryRequest.toString());
|
||||||
JsonUtil.toJsonString(easyRetryRequest));
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw e;
|
|
||||||
} finally {
|
} finally {
|
||||||
sw.stop();
|
sw.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
LogUtils.info(log,"request complete requestId:[{}] 耗时:[{}ms]", easyRetryRequest.getRequestId(), sw.getTotalTimeMillis());
|
LogUtils.info(log,"request complete requestId:[{}] 耗时:[{}ms]", easyRetryRequest.getReqId(), sw.getTotalTimeMillis());
|
||||||
if (async) {
|
if (async) {
|
||||||
return null;
|
return null;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
package com.aizuda.easy.retry.common.core.model;
|
package com.aizuda.easy.retry.common.core.model;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
import java.util.UUID;
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author www.byteblogs.com
|
* @author www.byteblogs.com
|
||||||
@ -12,21 +14,26 @@ import java.util.UUID;
|
|||||||
@Data
|
@Data
|
||||||
public class EasyRetryRequest {
|
public class EasyRetryRequest {
|
||||||
|
|
||||||
private String requestId;
|
private static final AtomicLong REQUEST_ID = new AtomicLong(0);
|
||||||
|
|
||||||
|
private long reqId;
|
||||||
|
|
||||||
private Object[] args;
|
private Object[] args;
|
||||||
|
|
||||||
public EasyRetryRequest(Object... args) {
|
public EasyRetryRequest(Object... args) {
|
||||||
this.args = args;
|
this.args = args;
|
||||||
this.requestId = generateRequestId();
|
this.reqId = newId();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long newId() {
|
||||||
|
return REQUEST_ID.getAndIncrement();
|
||||||
}
|
}
|
||||||
|
|
||||||
public EasyRetryRequest() {
|
public EasyRetryRequest() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String generateRequestId() {
|
@Override
|
||||||
return UUID.randomUUID().toString();
|
public String toString() {
|
||||||
|
return JsonUtil.toJsonString(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -9,9 +9,9 @@ import lombok.Data;
|
|||||||
@Data
|
@Data
|
||||||
public class NettyResult extends Result<Object> {
|
public class NettyResult extends Result<Object> {
|
||||||
|
|
||||||
private String requestId;
|
private long requestId;
|
||||||
|
|
||||||
public NettyResult(int status, String message, Object data, String requestId) {
|
public NettyResult(int status, String message, Object data, long requestId) {
|
||||||
super(status, message, data);
|
super(status, message, data);
|
||||||
this.requestId = requestId;
|
this.requestId = requestId;
|
||||||
}
|
}
|
||||||
@ -19,7 +19,7 @@ public class NettyResult extends Result<Object> {
|
|||||||
public NettyResult() {
|
public NettyResult() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public NettyResult(Object data, String requestId) {
|
public NettyResult(Object data, long requestId) {
|
||||||
super(data);
|
super(data);
|
||||||
this.requestId = requestId;
|
this.requestId = requestId;
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package com.aizuda.easy.retry.server.server.handler;
|
package com.aizuda.easy.retry.server.server.handler;
|
||||||
|
|
||||||
import cn.hutool.core.net.url.UrlQuery;
|
import cn.hutool.core.net.url.UrlQuery;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
import com.aizuda.easy.retry.server.support.handler.ClientRegisterHandler;
|
import com.aizuda.easy.retry.server.support.handler.ClientRegisterHandler;
|
||||||
import com.aizuda.easy.retry.common.core.model.NettyResult;
|
import com.aizuda.easy.retry.common.core.model.NettyResult;
|
||||||
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
|
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
|
||||||
@ -36,8 +37,8 @@ public class BeatHttpRequestHandler extends GetHttpRequestHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
|
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
|
||||||
log.info("心跳检查 content:[{}]", query.toString());
|
LogUtils.info(log,"Beat check content:[{}]", content);
|
||||||
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
|
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
|
||||||
return JsonUtil.toJsonString(new NettyResult("PONG", retryRequest.getRequestId()));
|
return JsonUtil.toJsonString(new NettyResult("PONG", retryRequest.getReqId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,6 @@ public class ConfigHttpRequestHandler extends GetHttpRequestHandler {
|
|||||||
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
|
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
|
||||||
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
|
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
|
||||||
ConfigDTO configDTO = configAccess.getConfigInfo(groupName);
|
ConfigDTO configDTO = configAccess.getConfigInfo(groupName);
|
||||||
return JsonUtil.toJsonString(new NettyResult(JsonUtil.toJsonString(configDTO), retryRequest.getRequestId()));
|
return JsonUtil.toJsonString(new NettyResult(JsonUtil.toJsonString(configDTO), retryRequest.getReqId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,10 +48,10 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
|
|||||||
Object[] args = retryRequest.getArgs();
|
Object[] args = retryRequest.getArgs();
|
||||||
|
|
||||||
Boolean aBoolean = retryService.batchReportRetry(JsonUtil.parseList(JsonUtil.toJsonString(args[0]), RetryTaskDTO.class));
|
Boolean aBoolean = retryService.batchReportRetry(JsonUtil.parseList(JsonUtil.toJsonString(args[0]), RetryTaskDTO.class));
|
||||||
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "批量上报重试数据处理成功", aBoolean, retryRequest.getRequestId()));
|
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "批量上报重试数据处理成功", aBoolean, retryRequest.getReqId()));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LogUtils.error(log, "批量上报重试数据失败", e);
|
LogUtils.error(log, "批量上报重试数据失败", e);
|
||||||
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), e.getMessage(), null, retryRequest.getRequestId()));
|
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), e.getMessage(), null, retryRequest.getReqId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,6 +102,7 @@ export default {
|
|||||||
message: res.message
|
message: res.message
|
||||||
})
|
})
|
||||||
this.$refs.notify.reset()
|
this.$refs.notify.reset()
|
||||||
|
this.$router.go(-1)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}).catch(() => {
|
}).catch(() => {
|
||||||
|
@ -3,18 +3,21 @@
|
|||||||
|
|
||||||
<a-row class="form-row" :gutter="16">
|
<a-row class="form-row" :gutter="16">
|
||||||
<a-col :lg="6" :md="12" :sm="24">
|
<a-col :lg="6" :md="12" :sm="24">
|
||||||
<a-form-item label="组名称">
|
<a-form-item hidden>
|
||||||
<a-input
|
<a-input
|
||||||
placeholder="请输入组名称"
|
|
||||||
hidden
|
hidden
|
||||||
v-decorator="[
|
v-decorator="[
|
||||||
'id',
|
'id',
|
||||||
]" />
|
]" />
|
||||||
|
</a-form-item>
|
||||||
|
<a-form-item label="组名称">
|
||||||
<a-input
|
<a-input
|
||||||
placeholder="请输入组名称"
|
placeholder="请输入组名称"
|
||||||
|
:maxLength="64"
|
||||||
|
:disabled='this.id && this.id > 0'
|
||||||
v-decorator="[
|
v-decorator="[
|
||||||
'groupName',
|
'groupName',
|
||||||
{rules: [{ required: true, message: '请输入组名称', whitespace: true}]}
|
{rules: [{ required: true, message: '请输入组名称', whitespace: true},{required: true, max: 64, message: '最多支持64个字符!'}, {validator: validate}]}
|
||||||
]" />
|
]" />
|
||||||
</a-form-item>
|
</a-form-item>
|
||||||
</a-col>
|
</a-col>
|
||||||
@ -49,6 +52,7 @@
|
|||||||
<a-form-item label="描述">
|
<a-form-item label="描述">
|
||||||
<a-input
|
<a-input
|
||||||
placeholder="请输入描述"
|
placeholder="请输入描述"
|
||||||
|
:maxLength="256"
|
||||||
v-decorator="[
|
v-decorator="[
|
||||||
'description',
|
'description',
|
||||||
{rules: [{ required: true, message: '请输入描述', whitespace: true}]}
|
{rules: [{ required: true, message: '请输入描述', whitespace: true}]}
|
||||||
@ -142,9 +146,9 @@ export default {
|
|||||||
})
|
})
|
||||||
},
|
},
|
||||||
validate (rule, value, callback) {
|
validate (rule, value, callback) {
|
||||||
const regex = /^user-(.*)$/
|
const regex = /^[A-Za-z0-9_]+$/
|
||||||
if (!regex.test(value)) {
|
if (!regex.test(value)) {
|
||||||
callback(new Error('需要以 user- 开头'))
|
callback(new Error('仅支持数字字母下划线'))
|
||||||
}
|
}
|
||||||
callback()
|
callback()
|
||||||
},
|
},
|
||||||
@ -159,6 +163,7 @@ export default {
|
|||||||
formData.groupStatus = formData.groupStatus.toString()
|
formData.groupStatus = formData.groupStatus.toString()
|
||||||
formData.routeKey = formData.routeKey.toString()
|
formData.routeKey = formData.routeKey.toString()
|
||||||
formData.idGeneratorMode = formData.idGeneratorMode.toString()
|
formData.idGeneratorMode = formData.idGeneratorMode.toString()
|
||||||
|
this.id = formData.id
|
||||||
|
|
||||||
console.log('formData', formData)
|
console.log('formData', formData)
|
||||||
form.setFieldsValue(formData)
|
form.setFieldsValue(formData)
|
||||||
|
@ -319,6 +319,19 @@ export default {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const regex = /^[A-Za-z0-9_]{1,64}$/
|
||||||
|
if (!regex.test(sceneName)) {
|
||||||
|
this.memberLoading = false
|
||||||
|
this.$message.error('场景名称: 仅支持长度为:1~64位字符.格式为:数字、字母、下划线。')
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (description.length > 256) {
|
||||||
|
this.memberLoading = false
|
||||||
|
this.$message.error('描述: 仅支持长度为:1~256位字符')
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
const target = this.formData.find(item => key === item.key)
|
const target = this.formData.find(item => key === item.key)
|
||||||
if (!target) {
|
if (!target) {
|
||||||
this.formData.push({
|
this.formData.push({
|
||||||
|
Loading…
Reference in New Issue
Block a user