feat(3.3.0) 客户端基于netty 改造

This commit is contained in:
byteblogs168 2024-04-11 23:24:04 +08:00
parent c870eb2f41
commit d1e9227a13
15 changed files with 412 additions and 58 deletions

View File

@ -0,0 +1,15 @@
package com.aizuda.easy.retry.client.common;
import com.aizuda.easy.retry.client.common.netty.server.EndPointInfo;
import java.util.List;
/**
*
* @author: opensnail
* @date : 2024-04-11 22:34
*/
public interface Scanner {
List<EndPointInfo> doScan();
}

View File

@ -0,0 +1,9 @@
package com.aizuda.easy.retry.client.common.annotation;
/**
* @author opensnail
* @date 2024-04-11 21:34:24
* @since 3.3.0
*/
public @interface SnailEndPoint {
}

View File

@ -48,7 +48,7 @@ public class EasyRetryProperties {
/**
* 指定客户端端口
*/
private Integer port;
private int port = 1789;
/**
* 远程上报滑动窗口配置

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.client.common.netty.server;
import com.aizuda.easy.retry.client.common.netty.RequestMethod;
import lombok.Builder;
import lombok.Data;
import java.lang.reflect.Method;
/**
* @author opensnail
* @date 2024-04-11 22:35:32
* @since 3.3.0
*/
@Builder
@Data
public class EndPointInfo {
private final String executorName;
private final Method method;
private final Object executor;
private final RequestMethod requestMethod;
private final String path;
}

View File

@ -0,0 +1,32 @@
package com.aizuda.easy.retry.client.common.netty.server;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.client.common.netty.RequestMethod;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author opensnail
* @date 2024-04-11 22:58:21
* @since 3.3.0
*/
public final class EndPointInfoCache {
private EndPointInfoCache() {}
private static final ConcurrentHashMap<Pair<String, RequestMethod>, EndPointInfo> ENDPOINT_REPOSITORY = new ConcurrentHashMap<>();
public static void put(EndPointInfo endPointInfo) {
ENDPOINT_REPOSITORY.put(Pair.of(endPointInfo.getPath(), endPointInfo.getRequestMethod()), endPointInfo);
}
public static EndPointInfo get(String path, RequestMethod method) {
return ENDPOINT_REPOSITORY.get(Pair.of(path, method));
}
public static boolean isExisted(String path, RequestMethod method) {
return Objects.nonNull(ENDPOINT_REPOSITORY.get(Pair.of(path, method)));
}
}

View File

@ -0,0 +1,31 @@
package com.aizuda.easy.retry.client.common.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.Builder;
import lombok.Data;
/**
* netty客户端请求模型
*
* @author: www.byteblogs.com
* @date : 2023-07-24 09:32
*/
@Data
@Builder
public class NettyHttpRequest {
private ChannelHandlerContext channelHandlerContext;
private String content;
private boolean keepAlive;
private HttpMethod method;
private String uri;
private HttpHeaders headers;
}

View File

@ -0,0 +1,101 @@
package com.aizuda.easy.retry.client.common.netty.server;
import com.aizuda.easy.retry.client.common.Lifecycle;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* netty server
*
* @author: www.byteblogs.com
* @date : 2022-03-07 15:54
* @since 1.0.0
*/
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
@RequiredArgsConstructor
@Getter
public class NettyHttpServer implements Runnable, Lifecycle {
private final EasyRetryProperties easyRetryProperties;
private Thread thread = null;
private volatile boolean started = false;
@Override
public void run() {
// 防止重复启动
if (started) {
return;
}
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
.addLast(new NettyHttpServerHandler());
}
});
// 在特定端口绑定并启动服务器 默认是1789
ChannelFuture future = bootstrap.bind(easyRetryProperties.getPort()).sync();
EasyRetryLog.LOCAL.info("------> easy-retry client remoting server start success, nettype = {}, port = {}",
NettyHttpServer.class.getName(), easyRetryProperties.getPort());
started = true;
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
EasyRetryLog.LOCAL.info("--------> easy-retry client remoting server stop.");
} catch (Exception e) {
EasyRetryLog.LOCAL.error("--------> easy-retry client remoting server error.", e);
started = false;
throw new EasyRetryClientException("easy-retry client server start error");
} finally {
// 当服务器正常关闭时关闭EventLoopGroups以释放资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
@Override
public void start() {
thread = new Thread(this);
thread.setDaemon(true);
thread.start();
}
@Override
public void close() {
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
}
}

View File

@ -0,0 +1,71 @@
package com.aizuda.easy.retry.client.common.netty.server;
import cn.hutool.core.net.url.UrlBuilder;
import com.aizuda.easy.retry.client.common.netty.RequestMethod;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import org.springframework.util.ReflectionUtils;
/**
* @author: opensnail
* @date : 2024-04-11 16:03
* @since : 3.3.0
*/
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
public NettyHttpServerHandler() {
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest)
throws Exception {
UrlBuilder builder = UrlBuilder.ofHttp(fullHttpRequest.uri());
RequestMethod requestMethod = RequestMethod.valueOf(fullHttpRequest.method().name());
EndPointInfo endPointInfo = EndPointInfoCache.get(builder.getPathStr(), requestMethod);
Class<?>[] paramTypes = endPointInfo.getMethod().getParameterTypes();
String content = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
Object[] args = retryRequest.getArgs();
Object resultObj = null;
try {
if (paramTypes.length > 0) {
resultObj = ReflectionUtils.invokeMethod(endPointInfo.getMethod(), endPointInfo.getExecutor(), args);
} else {
resultObj = ReflectionUtils.invokeMethod(endPointInfo.getMethod(), endPointInfo.getExecutor());
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("http request error. [{}]", content, e);
resultObj = new Result<>(0, e.getMessage());
throw e;
} finally {
writeResponse(channelHandlerContext, HttpUtil.isKeepAlive(fullHttpRequest), JsonUtil.toJsonString(resultObj));
}
}
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
// write response
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE,
HttpHeaderValues.APPLICATION_JSON);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.writeAndFlush(response);
}
}

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.client.common.netty.server;
import com.aizuda.easy.retry.client.common.Lifecycle;
import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author opensnail
* @date 2024-04-11 22:57:03
* @since 3.3.0
*/
@Component
@RequiredArgsConstructor
public class SnailEndPointRegistrar implements Lifecycle {
private final SnailEndPointScanner snailEndPointScanner;
@Override
public void start() {
List<EndPointInfo> endPointInfos = snailEndPointScanner.doScan();
for (EndPointInfo endPointInfo : endPointInfos) {
if (EndPointInfoCache.isExisted(endPointInfo.getPath(), endPointInfo.getRequestMethod())) {
throw new EasyRetryClientException("Duplicate endpoint path: {}" , endPointInfo.getPath());
}
EndPointInfoCache.put(endPointInfo);
}
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,72 @@
package com.aizuda.easy.retry.client.common.netty.server;
import com.aizuda.easy.retry.client.common.annotation.Mapping;
import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* @author opensnail
* @date 2024-04-11 22:29:07
* @since 3.3.0
*/
public class SnailEndPointScanner implements ApplicationContextAware {
private ApplicationContext context;
public List<EndPointInfo> doScan() {
return scanEndPoint();
}
private List<EndPointInfo> scanEndPoint() {
List<EndPointInfo> endPointInfoList = new ArrayList<>();
String[] beanDefinitionNames = context.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = context.getBean(beanDefinitionName);
String executorClassName = bean.getClass().getName();
// 扫描类的注解
SnailEndPoint jobExecutor = bean.getClass().getAnnotation(SnailEndPoint.class);
if (Objects.nonNull(jobExecutor)) {
Map<Method, Mapping> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
(MethodIntrospector.MetadataLookup<Mapping>) method -> AnnotatedElementUtils
.findMergedAnnotation(method, Mapping.class));
} catch (Throwable ex) {
EasyRetryLog.LOCAL.error("{} Mapping加载异常{}", beanDefinitionName, ex);
}
for (Map.Entry<Method, Mapping> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
Mapping mapping = entry.getValue();
endPointInfoList.add(EndPointInfo.builder()
.executorName(executorClassName)
.method(method)
.executor(bean)
.path(mapping.path())
.requestMethod(mapping.method())
.build());
}
}
}
return endPointInfoList;
}
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.context = context;
}
}

View File

@ -1,31 +0,0 @@
{
"groups": [
{
"name": "easy-retry",
"type": "com.aizuda.easy.retry.client.common.config.EasyRetryProperties",
"sourceType": "com.aizuda.easy.retry.client.common.config.EasyRetryProperties"
},
{
"name": "easy-retry.server",
"sourceMethod": "getServer()",
"type": "com.aizuda.easy.retry.client.common.config.EasyRetryProperties$ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.common.config.EasyRetryProperties"
}
],
"properties": [
{
"name": "easy-retry.server.host",
"type": "java.lang.String",
"defaultValue": "127.0.0.1",
"description": "服务端的地址,若服务端集群部署则此处配置域名",
"sourceType": "com.aizuda.easy.retry.client.common.config.EasyRetryProperties$ServerConfig"
},
{
"name": "easy-retry.server.port",
"type": "com.aizuda.easy.retry.client.common.config.EasyRetryProperties.ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.common.config.EasyRetryProperties",
"description": "服务端netty的端口号",
"defaultValue": "1788"
}
]
}

View File

@ -1,7 +1,10 @@
package com.aizuda.easy.retry.client.job.core.client;
import com.aizuda.easy.retry.client.common.annotation.Authentication;
import com.aizuda.easy.retry.client.common.annotation.Mapping;
import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint;
import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager;
import com.aizuda.easy.retry.client.common.netty.RequestMethod;
import com.aizuda.easy.retry.client.job.core.IJobExecutor;
import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
@ -16,12 +19,9 @@ import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
@ -30,13 +30,10 @@ import java.util.concurrent.ThreadPoolExecutor;
* @author: www.byteblogs.com
* @date : 2023-09-27 16:33
*/
@RestController
@RequestMapping("/job")
@Slf4j
@SnailEndPoint
public class JobEndPoint {
@PostMapping("/dispatch/v1")
@Authentication
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
public Result<Boolean> dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) {
try {

View File

@ -5,6 +5,7 @@ import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.job.core.Scanner;
import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -16,10 +17,9 @@ import java.util.Objects;
* @date : 2022-02-10 09:12
*/
@Component
@RequiredArgsConstructor
public class JobExecutorRegistrar implements Lifecycle {
@Autowired
private List<Scanner> scanners;
private final List<Scanner> scanners;
public void registerRetryHandler(JobExecutorInfo jobExecutorInfo) {

View File

@ -35,12 +35,12 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
@Override
public List<JobExecutorInfo> doScan() {
return scanRetryAbleMethod();
return scanJobExecutor();
}
private List<JobExecutorInfo> scanRetryAbleMethod() {
private List<JobExecutorInfo> scanJobExecutor() {
List<JobExecutorInfo> retryerInfoList = new ArrayList<>();
List<JobExecutorInfo> jobExecutorInfoList = new ArrayList<>();
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
@ -59,7 +59,7 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
// 通过实现接口进行注册
if (IJobExecutor.class.isAssignableFrom(bean.getClass())) {
if (!JobExecutorInfoCache.isExisted(executorClassName)) {
retryerInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean));
jobExecutorInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean));
}
}
@ -80,7 +80,7 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
method,
bean
);
retryerInfoList.add(jobExecutorInfo);
jobExecutorInfoList.add(jobExecutorInfo);
}
}
@ -103,11 +103,11 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
executeMethod,
bean
);
retryerInfoList.add(jobExecutorInfo);
jobExecutorInfoList.add(jobExecutorInfo);
}
}
return retryerInfoList;
return jobExecutorInfoList;
}
@Override

View File

@ -14,7 +14,8 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@ -28,10 +29,10 @@ import org.springframework.stereotype.Component;
*/
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
@RequiredArgsConstructor
@Getter
public class NettyHttpServer implements Runnable, Lifecycle {
@Autowired
private SystemProperties systemProperties;
private final SystemProperties systemProperties;
private Thread thread = null;
private volatile boolean started = false;
@ -97,8 +98,4 @@ public class NettyHttpServer implements Runnable, Lifecycle {
thread.interrupt();
}
}
public boolean isStarted() {
return started;
}
}