diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/Authentication.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/Authentication.java deleted file mode 100644 index b489efd7e..000000000 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/Authentication.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.aizuda.easy.retry.client.common.annotation; - -/** - * Easy Retry Client 认证 - * - * @author: xiaownouniu - * @date : 2024-03-30 - * @since : 3.2.0 - */ -public @interface Authentication { -} diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/SnailEndPoint.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/SnailEndPoint.java index 065eac8af..bb8711b00 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/SnailEndPoint.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/SnailEndPoint.java @@ -1,9 +1,30 @@ package com.aizuda.easy.retry.client.common.annotation; +import org.springframework.core.annotation.AliasFor; +import org.springframework.stereotype.Component; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + /** * @author opensnail * @date 2024-04-11 21:34:24 * @since 3.3.0 */ +@Target({ElementType.METHOD, ElementType.TYPE, ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +@Component public @interface SnailEndPoint { + + /** + * Alias for {@link Component#value}. + */ + @AliasFor(annotation = Component.class) + String value() default ""; } diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/AuthenticationInterceptor.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/AuthenticationInterceptor.java deleted file mode 100644 index 5ba4f7125..000000000 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/AuthenticationInterceptor.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.aizuda.easy.retry.client.common.intercepter; - -import com.aizuda.easy.retry.client.common.config.EasyRetryProperties; -import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException; -import com.aizuda.easy.retry.common.core.constant.SystemConstants; -import lombok.RequiredArgsConstructor; -import org.aspectj.lang.annotation.Aspect; -import org.aspectj.lang.annotation.Before; -import org.springframework.stereotype.Component; -import org.springframework.web.context.request.RequestContextHolder; -import org.springframework.web.context.request.ServletRequestAttributes; - -import java.util.Optional; - -/** - * Easy Retry 认证拦截器 - * - * @author: xiaowoniu - * @date : 2022-04-18 09:19 - * @since 3.2.0 - */ -@Aspect -@Component -@RequiredArgsConstructor -public class AuthenticationInterceptor { - private final EasyRetryProperties easyRetryProperties; - @Before(value = "@annotation(com.aizuda.easy.retry.client.common.annotation.Authentication) || @within(com.aizuda.easy.retry.client.common.annotation.Authentication)") - public void easyRetryAuth() { - ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); - String easyRetryAuth = attributes.getRequest().getHeader(SystemConstants.EASY_RETRY_AUTH_TOKEN); - String configToken = Optional.ofNullable(easyRetryProperties.getToken()).orElse(SystemConstants.DEFAULT_TOKEN); - if (!configToken.equals(easyRetryAuth)) { - throw new EasyRetryClientException("认证失败.【请检查配置的Token是否正确】"); - } - } - -} diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/HandlerInterceptor.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/HandlerInterceptor.java new file mode 100644 index 000000000..cae7ea603 --- /dev/null +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/HandlerInterceptor.java @@ -0,0 +1,23 @@ +package com.aizuda.easy.retry.client.common.intercepter; + +import com.aizuda.easy.retry.client.common.netty.server.EndPointInfo; +import com.aizuda.easy.retry.client.common.netty.server.HttpRequest; +import com.aizuda.easy.retry.client.common.netty.server.HttpResponse; + +/** + * @author: opensnail + * @date : 2024-04-12 + * @since : 3.3.0 + */ +public interface HandlerInterceptor { + + boolean preHandle(HttpRequest httpRequest, HttpResponse httpResponse, EndPointInfo handler); + + void postHandle(HttpRequest httpRequest, HttpResponse httpResponse, EndPointInfo handler); + + void afterCompletion(HttpRequest httpRequest, HttpResponse httpResponse, EndPointInfo handler, Exception ex); + + default int order() { + return 1; + } +} diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/DispatcherRequestHandler.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/DispatcherRequestHandler.java new file mode 100644 index 000000000..8031d3351 --- /dev/null +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/DispatcherRequestHandler.java @@ -0,0 +1,17 @@ +package com.aizuda.easy.retry.client.common.netty; + +import com.aizuda.easy.retry.client.common.netty.server.NettyHttpRequest; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-04-12 + * @since : 3.3.0 + */ +@Component +public class DispatcherRequestHandler { + + public void dispatch(NettyHttpRequest nettyHttpRequest) { + + } +} diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyHttpClientHandler.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyHttpClientHandler.java index c51e08eff..bd3ecb4f2 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyHttpClientHandler.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyHttpClientHandler.java @@ -35,11 +35,11 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandlernewBuilder() .client(NettyClient.class) - .callback(nettyResult ->EasyRetryLog.LOCAL.info("heartbeat check requestId:[{}]", nettyResult.getRequestId())) + .callback( + nettyResult -> EasyRetryLog.LOCAL.info("heartbeat check requestId:[{}]", nettyResult.getRequestId())) .build(); this.nettyHttpConnectClient = nettyHttpConnectClient; - } @Override @@ -49,7 +49,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler headers = new HashMap<>(); + + public void setHeader(String key, Object value) { + headers.put(key, value); + } +} diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/NettyHttpServerHandler.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/NettyHttpServerHandler.java index 9c88f14cf..f2e7d3e75 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/NettyHttpServerHandler.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/NettyHttpServerHandler.java @@ -1,18 +1,43 @@ package com.aizuda.easy.retry.client.common.netty.server; import cn.hutool.core.net.url.UrlBuilder; +import cn.hutool.core.util.ServiceLoaderUtil; +import com.aizuda.easy.retry.client.common.config.EasyRetryProperties; +import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException; +import com.aizuda.easy.retry.client.common.intercepter.HandlerInterceptor; import com.aizuda.easy.retry.client.common.netty.RequestMethod; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.model.EasyRetryRequest; +import com.aizuda.easy.retry.common.core.model.NettyResult; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.log.EasyRetryLog; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.util.CollectionUtils; import org.springframework.util.ReflectionUtils; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + /** * @author: opensnail * @date : 2024-04-11 16:03 @@ -20,52 +45,142 @@ import org.springframework.util.ReflectionUtils; */ public class NettyHttpServerHandler extends SimpleChannelInboundHandler { + private static final ThreadPoolExecutor DISPATCHER_THREAD_POOL = new ThreadPoolExecutor( + 16, 16, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), + new CustomizableThreadFactory("snail-netty-server-")); + public NettyHttpServerHandler() { } @Override - protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) - throws Exception { + protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) { - UrlBuilder builder = UrlBuilder.ofHttp(fullHttpRequest.uri()); - RequestMethod requestMethod = RequestMethod.valueOf(fullHttpRequest.method().name()); - - EndPointInfo endPointInfo = EndPointInfoCache.get(builder.getPathStr(), requestMethod); - - Class[] paramTypes = endPointInfo.getMethod().getParameterTypes(); String content = fullHttpRequest.content().toString(CharsetUtil.UTF_8); - EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class); - Object[] args = retryRequest.getArgs(); + String uri = fullHttpRequest.uri(); + String name = fullHttpRequest.method().name(); + HttpHeaders headers = fullHttpRequest.headers(); + + NettyHttpRequest nettyHttpRequest = NettyHttpRequest.builder() + .keepAlive(HttpUtil.isKeepAlive(fullHttpRequest)) + .uri(fullHttpRequest.uri()) + .channelHandlerContext(channelHandlerContext) + .method(fullHttpRequest.method()) + .headers(fullHttpRequest.headers()) + .content(fullHttpRequest.content().toString(CharsetUtil.UTF_8)) + .build(); + + // 执行任务 + DISPATCHER_THREAD_POOL.execute(() -> { + + List handlerInterceptors = handlerInterceptors(); + EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class); + HttpRequest httpRequest = new HttpRequest(); + HttpResponse httpResponse = new HttpResponse(); + EndPointInfo endPointInfo = null; + Result resultObj = null; + Exception e; + try { + EasyRetryProperties properties = SpringContext.getBean(EasyRetryProperties.class); + String easyRetryAuth = headers.getAsString(SystemConstants.EASY_RETRY_AUTH_TOKEN); + String configToken = Optional.ofNullable(properties.getToken()).orElse(SystemConstants.DEFAULT_TOKEN); + if (!configToken.equals(easyRetryAuth)) { + throw new EasyRetryClientException("认证失败.【请检查配置的Token是否正确】"); + } + + UrlBuilder builder = UrlBuilder.ofHttp(uri); + RequestMethod requestMethod = RequestMethod.valueOf(name); + + endPointInfo = EndPointInfoCache.get(builder.getPathStr(), requestMethod); + Class[] paramTypes = endPointInfo.getMethod().getParameterTypes(); + Object[] args = retryRequest.getArgs(); + + Object[] deSerialize = (Object[]) deSerialize(JsonUtil.toJsonString(args), endPointInfo.getMethod()); + + for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) { + handlerInterceptor.preHandle(httpRequest, httpResponse, endPointInfo); + } + + if (paramTypes.length > 0) { + resultObj = (Result) ReflectionUtils.invokeMethod(endPointInfo.getMethod(), + endPointInfo.getExecutor(), deSerialize); + } else { + resultObj = (Result) ReflectionUtils.invokeMethod(endPointInfo.getMethod(), + endPointInfo.getExecutor()); + } + + for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) { + handlerInterceptor.postHandle(httpRequest, httpResponse, endPointInfo); + } + } catch (Exception ex) { + EasyRetryLog.LOCAL.error("http request error. [{}]", content, ex); + resultObj = new NettyResult(0, ex.getMessage(), null, retryRequest.getReqId()); + e = ex; + } finally { + NettyResult nettyResult = new NettyResult(); + nettyResult.setRequestId(retryRequest.getReqId()); + if (Objects.nonNull(resultObj)) { + nettyResult.setData(resultObj.getData()); + nettyResult.setMessage(resultObj.getMessage()); + nettyResult.setStatus(resultObj.getStatus()); + } + + for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) { + handlerInterceptor.afterCompletion(httpRequest, httpResponse, endPointInfo, e); + } + + writeResponse(channelHandlerContext, HttpUtil.isKeepAlive(fullHttpRequest), httpResponse, + JsonUtil.toJsonString(nettyResult)); - Object resultObj = null; - try { - if (paramTypes.length > 0) { - resultObj = ReflectionUtils.invokeMethod(endPointInfo.getMethod(), endPointInfo.getExecutor(), args); - } else { - resultObj = ReflectionUtils.invokeMethod(endPointInfo.getMethod(), endPointInfo.getExecutor()); } - } catch (Exception e) { - EasyRetryLog.LOCAL.error("http request error. [{}]", content, e); - resultObj = new Result<>(0, e.getMessage()); - throw e; - } finally { - writeResponse(channelHandlerContext, HttpUtil.isKeepAlive(fullHttpRequest), JsonUtil.toJsonString(resultObj)); - } - + }); } - private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { + private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, final HttpResponse httpResponse, String responseJson) { // write response FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, - Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); + Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); response.headers().set(HttpHeaderNames.CONTENT_TYPE, - HttpHeaderValues.APPLICATION_JSON); + HttpHeaderValues.APPLICATION_JSON); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); if (keepAlive) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); } + Map headers = httpResponse.getHeaders(); + headers.forEach((key, value) -> response.headers().set(key, value)); ctx.writeAndFlush(response); } + public Object deSerialize(String infoStr, Method method) throws JsonProcessingException { + + Type[] paramTypes = method.getGenericParameterTypes(); + + Object[] params = new Object[paramTypes.length]; + + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = JsonUtil.toJson(infoStr); + if (Objects.isNull(jsonNode)) { + EasyRetryLog.LOCAL.warn("jsonNode is null. infoStr:[{}]", infoStr); + return params; + } + + for (int i = 0; i < paramTypes.length; i++) { + JsonNode node = jsonNode.get(i); + if (Objects.nonNull(node)) { + params[i] = mapper.readValue(node.toString(), mapper.constructType(paramTypes[i])); + } + } + + return params; + } + + private static List handlerInterceptors() { + List handlerInterceptors = ServiceLoaderUtil.loadList(HandlerInterceptor.class); + if (CollectionUtils.isEmpty(handlerInterceptors)) { + return Collections.emptyList(); + } + + return handlerInterceptors.stream().sorted(Comparator.comparingInt(HandlerInterceptor::order)).collect( + Collectors.toList()); + } } diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/SnailEndPointScanner.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/SnailEndPointScanner.java index a33cff8a9..917f52181 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/SnailEndPointScanner.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/server/SnailEndPointScanner.java @@ -3,11 +3,13 @@ package com.aizuda.easy.retry.client.common.netty.server; import com.aizuda.easy.retry.client.common.annotation.Mapping; import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint; import com.aizuda.easy.retry.common.log.EasyRetryLog; +import org.springframework.aop.framework.AopProxyUtils; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.MethodIntrospector; import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.ArrayList; @@ -20,6 +22,7 @@ import java.util.Objects; * @date 2024-04-11 22:29:07 * @since 3.3.0 */ +@Component public class SnailEndPointScanner implements ApplicationContextAware { private ApplicationContext context; @@ -33,7 +36,8 @@ public class SnailEndPointScanner implements ApplicationContextAware { String[] beanDefinitionNames = context.getBeanNamesForType(Object.class, false, true); for (String beanDefinitionName : beanDefinitionNames) { Object bean = context.getBean(beanDefinitionName); - String executorClassName = bean.getClass().getName(); + Class executorNotProxy = AopProxyUtils.ultimateTargetClass(bean); + String executorClassName = executorNotProxy.getName(); // 扫描类的注解 SnailEndPoint jobExecutor = bean.getClass().getAnnotation(SnailEndPoint.class); diff --git a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java index e4e91c6b7..eb5c83825 100644 --- a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java +++ b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java @@ -1,8 +1,10 @@ package com.aizuda.easy.retry.client.core.client; import cn.hutool.core.lang.Assert; -import com.aizuda.easy.retry.client.common.annotation.Authentication; +import com.aizuda.easy.retry.client.common.annotation.Mapping; +import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint; import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager; +import com.aizuda.easy.retry.client.common.netty.RequestMethod; import com.aizuda.easy.retry.client.core.IdempotentIdGenerate; import com.aizuda.easy.retry.client.core.RetryArgSerializer; import com.aizuda.easy.retry.client.common.cache.GroupVersionCache; @@ -30,19 +32,19 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.log.enums.LogTypeEnum; import com.aizuda.easy.retry.server.model.dto.ConfigDTO; import com.fasterxml.jackson.core.JsonProcessingException; -import lombok.extern.slf4j.Slf4j; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.Validator; +import jakarta.validation.ValidatorFactory; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.ReflectionUtils; -import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; import java.lang.reflect.Method; import java.util.Objects; +import java.util.Set; /** * 服务端调调用客户端进行重试流量下发、配置变更通知等操作 @@ -50,9 +52,7 @@ import java.util.Objects; * @author: www.byteblogs.com * @date : 2022-03-09 16:33 */ -@RestController -@RequestMapping("/retry") -@Slf4j +@SnailEndPoint public class RetryEndPoint { @Autowired @@ -62,9 +62,15 @@ public class RetryEndPoint { /** * 服务端调度重试入口 */ - @PostMapping("/dispatch/v1") - @Authentication - public Result dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) { + @Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST) + public Result dispatch(DispatchRetryDTO executeReqDto) { + + ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); + Validator validator = vf.getValidator(); + Set> set = validator.validate(executeReqDto); + for (final ConstraintViolation violation : set) { + return new Result<>(violation.getMessage(), null); + } RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName()); if (Objects.isNull(retryerInfo)) { @@ -144,16 +150,21 @@ public class RetryEndPoint { /** * 同步版本 */ - @PostMapping("/sync/version/v1") - @Authentication - public Result syncVersion(@RequestBody ConfigDTO configDTO) { + @Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST) + public Result syncVersion(ConfigDTO configDTO) { GroupVersionCache.configDTO = configDTO; return new Result(); } - @PostMapping("/callback/v1") - @Authentication - public Result callback(@RequestBody @Validated RetryCallbackDTO callbackDTO) { + @Mapping(path = "/retry/callback/v1", method = RequestMethod.POST) + public Result callback(RetryCallbackDTO callbackDTO) { + + ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); + Validator validator = vf.getValidator(); + Set> set = validator.validate(callbackDTO); + for (final ConstraintViolation violation : set) { + return new Result<>(violation.getMessage(), null); + } RetryerInfo retryerInfo = null; Object[] deSerialize = null; @@ -264,9 +275,15 @@ public class RetryEndPoint { * @return idempotentId */ @PostMapping("/generate/idempotent-id/v1") - @Authentication public Result idempotentIdGenerate( - @RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) { + GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) { + + ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); + Validator validator = vf.getValidator(); + Set> set = validator.validate(generateRetryIdempotentIdDTO); + for (final ConstraintViolation violation : set) { + return new Result<>(violation.getMessage(), null); + } String scene = generateRetryIdempotentIdDTO.getScene(); String executorName = generateRetryIdempotentIdDTO.getExecutorName(); diff --git a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/loader/EasyRetrySpiLoader.java b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/loader/EasyRetrySpiLoader.java index e960b8b56..f31ea0a33 100644 --- a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/loader/EasyRetrySpiLoader.java +++ b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/loader/EasyRetrySpiLoader.java @@ -1,6 +1,5 @@ package com.aizuda.easy.retry.client.core.loader; -import cn.hutool.core.lang.Pair; import cn.hutool.core.util.ServiceLoaderUtil; import com.aizuda.easy.retry.client.core.expression.ExpressionInvocationHandler; import com.aizuda.easy.retry.common.core.expression.ExpressionEngine; diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java index fe851c461..1c688167f 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java @@ -1,6 +1,5 @@ package com.aizuda.easy.retry.client.job.core.client; -import com.aizuda.easy.retry.client.common.annotation.Authentication; import com.aizuda.easy.retry.client.common.annotation.Mapping; import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint; import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager; @@ -12,6 +11,7 @@ import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo; import com.aizuda.easy.retry.client.job.core.executor.AbstractJobExecutor; import com.aizuda.easy.retry.client.job.core.executor.AnnotationJobExecutor; import com.aizuda.easy.retry.client.job.core.log.JobLogMeta; +import com.aizuda.easy.retry.client.model.DispatchRetryDTO; import com.aizuda.easy.retry.client.model.StopJobDTO; import com.aizuda.easy.retry.client.model.request.DispatchJobRequest; import com.aizuda.easy.retry.common.core.context.SpringContext; @@ -19,11 +19,15 @@ import com.aizuda.easy.retry.common.core.model.JobContext; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.log.enums.LogTypeEnum; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.Validator; +import jakarta.validation.ValidatorFactory; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; /** @@ -34,7 +38,14 @@ import java.util.concurrent.ThreadPoolExecutor; public class JobEndPoint { @Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST) - public Result dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) { + public Result dispatchJob(DispatchJobRequest dispatchJob) { + + ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); + Validator validator = vf.getValidator(); + Set> set = validator.validate(dispatchJob); + for (final ConstraintViolation violation : set) { + return new Result<>(violation.getMessage(), Boolean.FALSE); + } try { JobContext jobContext = buildJobContext(dispatchJob); @@ -44,7 +55,7 @@ public class JobEndPoint { if (Objects.nonNull(dispatchJob.getRetryCount()) && dispatchJob.getRetryCount() > 0) { EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}]", - dispatchJob.getRetryCount()); + dispatchJob.getRetryCount()); } JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo()); @@ -106,9 +117,16 @@ public class JobEndPoint { return jobContext; } - @PostMapping("/stop/v1") - @Authentication - public Result stopJob(@RequestBody @Validated StopJobDTO interruptJob) { + @Mapping(path = "/stop/v1", method = RequestMethod.POST) + public Result stopJob(StopJobDTO interruptJob) { + + ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); + Validator validator = vf.getValidator(); + Set> set = validator.validate(interruptJob); + for (final ConstraintViolation violation : set) { + return new Result<>(violation.getMessage(), Boolean.FALSE); + } + ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId()); if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) { return new Result<>(Boolean.TRUE); diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java index 936366cf3..b6caa699d 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/register/scan/JobExecutorScanner.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.client.job.core.dto.JobArgs; import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo; import com.aizuda.easy.retry.common.log.EasyRetryLog; import lombok.extern.slf4j.Slf4j; +import org.springframework.aop.framework.AopProxyUtils; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -54,14 +55,14 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { EasyRetryLog.LOCAL.error("{} JobExecutor加载异常:{}", beanDefinitionName, ex); } - String executorClassName = bean.getClass().getName(); + Class executorNotProxy = AopProxyUtils.ultimateTargetClass(bean); + String executorClassName = executorNotProxy.getName(); // 通过实现接口进行注册 if (IJobExecutor.class.isAssignableFrom(bean.getClass())) { if (!JobExecutorInfoCache.isExisted(executorClassName)) { jobExecutorInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean)); } - } // 扫描类的注解 diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/model/NettyResult.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/model/NettyResult.java index cf229814e..0898f293a 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/model/NettyResult.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/model/NettyResult.java @@ -1,11 +1,13 @@ package com.aizuda.easy.retry.common.core.model; import lombok.Data; +import lombok.EqualsAndHashCode; /** * @author: www.byteblogs.com * @date : 2022-02-16 14:07 */ +@EqualsAndHashCode(callSuper = true) @Data public class NettyResult extends Result { diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java index b37959f64..7e8816909 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java @@ -1,13 +1,16 @@ package com.aizuda.easy.retry.server.common.client; +import cn.hutool.core.date.StopWatch; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.URLUtil; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.model.EasyRetryRequest; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.NetUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheToken; import com.aizuda.easy.retry.server.common.client.annotation.Body; @@ -17,18 +20,22 @@ import com.aizuda.easy.retry.server.common.client.annotation.Param; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.common.netty.client.NettyChannel; +import com.aizuda.easy.retry.server.common.netty.client.RpcContext; import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryListener; import com.github.rholder.retry.Retryer; import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; +import io.netty.handler.codec.http.CombinedHttpHeaders; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.util.CollectionUtils; import org.springframework.web.client.ResourceAccessException; @@ -42,8 +49,12 @@ import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * 请求处理器 @@ -68,8 +79,8 @@ public class RpcClientInvokeHandler implements InvocationHandler { private final Integer routeKey; private final String allocKey; private final Integer executorTimeout; - private final String namespaceId; + private final boolean async; public RpcClientInvokeHandler(final String groupName, final RegisterNodeInfo registerNodeInfo, final boolean failRetry, final int retryTimes, @@ -89,6 +100,7 @@ public class RpcClientInvokeHandler implements InvocationHandler { this.allocKey = allocKey; this.executorTimeout = executorTimeout; this.namespaceId = namespaceId; + this.async = false; } @Override @@ -134,33 +146,56 @@ public class RpcClientInvokeHandler implements InvocationHandler { Assert.notNull(parasResult.body, () -> new EasyRetryServerException("body cannot be null")); } - RestTemplate restTemplate = SpringContext.getBean(RestTemplate.class); - Retryer retryer = buildResultRetryer(); HttpHeaders requestHeaders = parasResult.requestHeaders; - if (Objects.nonNull(executorTimeout)) { - requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, String.valueOf(executorTimeout)); - } - // 统一设置Token requestHeaders.set(SystemConstants.EASY_RETRY_AUTH_TOKEN, CacheToken.get(groupName, namespaceId)); + EasyRetryRequest easyRetryRequest = new EasyRetryRequest(args); Result result = retryer.call(() -> { - ResponseEntity response = restTemplate.exchange( - // 拼接 url?a=1&b=1 - getUrl(mapping, parasResult.paramMap).toString(), - // post or get - HttpMethod.valueOf(mapping.method().name()), - // body - new HttpEntity<>(parasResult.body, parasResult.requestHeaders), - // 返回值类型 - Result.class); - return Objects.requireNonNull(response.getBody()); + StopWatch sw = new StopWatch(); + + sw.start("request start " + easyRetryRequest.getReqId()); + + CompletableFuture completableFuture = null; + if (async) { +// RpcContext.setCompletableFuture(easyRetryRequest.getReqId(), null); + } else { + completableFuture = new CompletableFuture<>(); + RpcContext.setCompletableFuture(easyRetryRequest.getReqId(), completableFuture); + } + + try { + NettyChannel.send(hostId, hostIp, hostPort, + HttpMethod.valueOf(mapping.method().name()), // 拼接 url?a=1&b=1 + mapping.path(), easyRetryRequest.toString(), requestHeaders); + } finally { + sw.stop(); + } + + EasyRetryLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", easyRetryRequest.getReqId(), + sw.getTotalTimeMillis()); + if (async) { + return null; + } else { + Assert.notNull(completableFuture, () -> new EasyRetryServerException("completableFuture is null")); + try { + return (Result) completableFuture.get(Optional.ofNullable(executorTimeout).orElse(20), TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw new EasyRetryServerException("Request to remote interface exception. path:[{}]", + mapping.path()); + } catch (TimeoutException e) { + throw new EasyRetryServerException("Request to remote interface timed out. path:[{}]", + mapping.path()); + } + } + }); - log.debug("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, + log.debug("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, + hostId, hostIp, hostPort, NetUtil.getLocalIpStr()); return result; @@ -239,7 +274,7 @@ public class RpcClientInvokeHandler implements InvocationHandler { private ParseParasResult doParseParams(Method method, Object[] args) { Object body = null; - HttpHeaders requestHeaders = new HttpHeaders(); + DefaultHttpHeaders requestHeaders = new DefaultHttpHeaders(); Map paramMap = new HashMap<>(); // 解析参数 Parameter[] parameters = method.getParameters(); @@ -267,7 +302,7 @@ public class RpcClientInvokeHandler implements InvocationHandler { private static class ParseParasResult { private Object body = null; - private HttpHeaders requestHeaders; + private DefaultHttpHeaders requestHeaders; private Map paramMap; } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyChannel.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyChannel.java new file mode 100644 index 000000000..d8b3001a5 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyChannel.java @@ -0,0 +1,143 @@ +package com.aizuda.easy.retry.server.common.netty.client; + +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.triple.Pair; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import lombok.extern.slf4j.Slf4j; + +import java.net.ConnectException; +import java.nio.channels.ClosedChannelException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @author www.byteblogs.com + * @date 2023-05-13 + * @since 1.3.0 + */ +@Slf4j +public class NettyChannel { + private NettyChannel() { + } + + private static Bootstrap bootstrap; + private static ConcurrentHashMap, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(16); + + + public static void setChannel(String hostId, String ip, Channel channel) { + CHANNEL_MAP.put(Pair.of(hostId, ip), channel); + } + + public static void removeChannel(Channel channel) { + CHANNEL_MAP.forEach((key, value) -> { + if (value.equals(channel)) { + CHANNEL_MAP.remove(key); + } + }); + } + + public static void setBootstrap(Bootstrap bootstrap) { + NettyChannel.bootstrap = bootstrap; + } + + /** + * 发送数据 + * + * @param method 请求方式 + * @param url url地址 + * @param body 请求的消息体 + * @throws InterruptedException + */ + public static void send(String hostId, String hostIp, Integer port, HttpMethod method, String url, String body, HttpHeaders requestHeaders) throws InterruptedException { + + Channel channel = CHANNEL_MAP.get(Pair.of(hostId, hostIp)); + if (Objects.isNull(channel) || !channel.isActive()) { + channel = connect(hostId, hostIp, port); + if (Objects.isNull(channel)) { + EasyRetryLog.LOCAL.error("send message but channel is null url:[{}] method:[{}] body:[{}] ", url, method, body); + return; + } + } + + // 配置HttpRequest的请求数据和一些配置信息 + FullHttpRequest request = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, method, url, Unpooled.wrappedBuffer(body.getBytes(StandardCharsets.UTF_8))); + + request.headers() + .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) + //开启长连接 + .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + //设置传递请求内容的长度 + .set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()) + ; + request.headers().setAll(requestHeaders); + + //发送数据 + channel.writeAndFlush(request).sync(); + } + + /** + * 连接客户端 + * + * @return + */ + public static Channel connect(String hostId, String ip, Integer port) { + + try { + ChannelFuture channelFuture = bootstrap + .remoteAddress(ip, port) + .connect(); + + boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS); + Channel channel = channelFuture.channel(); + if (notTimeout) { + // 连接成功 + if (channel != null && channel.isActive()) { + EasyRetryLog.LOCAL.info("netty client started {} connect to server", channel.localAddress()); + NettyChannel.setChannel(hostId, ip, channel); + return channel; + } + + Throwable cause = channelFuture.cause(); + if (cause != null) { + exceptionHandler(cause); + } + } else { + EasyRetryLog.LOCAL.warn("connect remote host[{}] timeout {}s", channel.remoteAddress(), 30); + } + } catch (Exception e) { + exceptionHandler(e); + } + + return null; + } + + /** + * 连接失败处理 + * + * @param cause + */ + private static void exceptionHandler(Throwable cause) { + if (cause instanceof ConnectException) { + EasyRetryLog.LOCAL.error("connect error:{}", cause.getMessage()); + } else if (cause instanceof ClosedChannelException) { + EasyRetryLog.LOCAL.error("connect error:{}", "client has destroy"); + } else { + EasyRetryLog.LOCAL.error("connect error:", cause); + } + } + +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyHttpClientHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyHttpClientHandler.java new file mode 100644 index 000000000..8432ae24f --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyHttpClientHandler.java @@ -0,0 +1,93 @@ +package com.aizuda.easy.retry.server.common.netty.client; + +import com.aizuda.easy.retry.common.core.model.NettyResult; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; + +/** + * netty 客户端处理器 + * + * @author: www.byteblogs.com + * @date : 2022-03-07 18:30 + * @since 1.0.0 + */ +@Slf4j +public class NettyHttpClientHandler extends SimpleChannelInboundHandler { + + public NettyHttpClientHandler() { + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { + + FullHttpResponse response = msg; + String content = response.content().toString(CharsetUtil.UTF_8); + HttpHeaders headers = response.headers(); + + EasyRetryLog.LOCAL.info("Receive server data content:[{}], headers:[{}]", content, headers); + NettyResult nettyResult = JsonUtil.parseObject(content, NettyResult.class); + RpcContext.invoke(nettyResult.getRequestId(), nettyResult); + + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + super.channelRegistered(ctx); + EasyRetryLog.LOCAL.debug("channelRegistered"); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + EasyRetryLog.LOCAL.debug("channelUnregistered"); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + EasyRetryLog.LOCAL.debug("channelActive"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + EasyRetryLog.LOCAL.debug("channelInactive"); + NettyChannel.removeChannel(ctx.channel()); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + EasyRetryLog.LOCAL.debug("channelReadComplete"); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + super.channelWritabilityChanged(ctx); + EasyRetryLog.LOCAL.debug("channelWritabilityChanged"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + EasyRetryLog.LOCAL.error("easy-retry netty-http client exception", cause); + super.exceptionCaught(ctx, cause); + NettyChannel.removeChannel(ctx.channel()); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + super.handlerRemoved(ctx); + NettyChannel.removeChannel(ctx.channel()); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + EasyRetryLog.LOCAL.debug("userEventTriggered"); + } +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyHttpConnectClient.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyHttpConnectClient.java new file mode 100644 index 000000000..38e1eda53 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/NettyHttpConnectClient.java @@ -0,0 +1,73 @@ +package com.aizuda.easy.retry.server.common.netty.client; + +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.Getter; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.net.ConnectException; +import java.nio.channels.ClosedChannelException; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * 初始化bootstrap + * + * @author: www.byteblogs.com + * @date : 2022-03-07 18:24 + * @since 1.0.0 + */ +@Getter +@Component +@Order(Ordered.HIGHEST_PRECEDENCE) +public class NettyHttpConnectClient implements Lifecycle { + + private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); + private static final Bootstrap bootstrap = new Bootstrap(); + + @Override + public void start() { + + try { + bootstrap.group(nioEventLoopGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS)) + .addLast(new HttpClientCodec()) + .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) + .addLast(new NettyHttpClientHandler()); + } + }) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); + NettyChannel.setBootstrap(bootstrap); + } catch (Exception e) { + EasyRetryLog.LOCAL.error("Client start exception", e); + } + } + + @Override + public void close() { + } + + + +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/RpcContext.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/RpcContext.java new file mode 100644 index 000000000..34e36380e --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/netty/client/RpcContext.java @@ -0,0 +1,71 @@ +package com.aizuda.easy.retry.server.common.netty.client; + +import com.aizuda.easy.retry.common.core.model.NettyResult; +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; + +/** + * 处理请求服务端时需要存储的中间值 + * + * @author: www.byteblogs.com + * @date : 2023-05-12 09:05 + * @since 1.3.0 + */ +@Slf4j +public class RpcContext { + + private static final ConcurrentMap COMPLETABLE_FUTURE = new ConcurrentHashMap<>(); + + private static final ConcurrentMap CALLBACK_CONSUMER = new ConcurrentHashMap<>(); + + public static void invoke(Long requestId, NettyResult nettyResult) { + + try { + // 同步请求同步返回 + Optional.ofNullable(getCompletableFuture(requestId)).ifPresent(completableFuture -> completableFuture.complete(nettyResult)); + + // 异步请求进行回调 + Optional.ofNullable(getConsumer(requestId)).ifPresent(consumer -> consumer.accept(nettyResult)); + } catch (Exception e) { + EasyRetryLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e ); + } finally { + COMPLETABLE_FUTURE.remove(requestId); + CALLBACK_CONSUMER.remove(requestId); + } + + } + + public static void setCompletableFuture(long id, CompletableFuture completableFuture, Consumer callable) { + if (Objects.nonNull(completableFuture)) { + COMPLETABLE_FUTURE.put(id, completableFuture); + } + + if (Objects.nonNull(callable)) { + CALLBACK_CONSUMER.put(id, callable); + } + + } + + public static void setCompletableFuture(Long id, Consumer callable) { + setCompletableFuture(id, null, callable); + } + + public static void setCompletableFuture(Long id, CompletableFuture completableFuture) { + setCompletableFuture(id, completableFuture, null); + } + + public static CompletableFuture getCompletableFuture(Long id) { + return COMPLETABLE_FUTURE.get(id); + } + + public static Consumer getConsumer(Long id) { + return CALLBACK_CONSUMER.get(id); + } +} diff --git a/pom.xml b/pom.xml index 59d1ea416..7f061e1a1 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ 17 17 17 - 3.2.0 + 3.3.0-SNAPSHOT 1.0.0 4.1.94.Final 5.8.25