feat(3.3.0): 优化客户端RPC

This commit is contained in:
opensnail 2024-04-12 18:44:51 +08:00
parent d1e9227a13
commit ce8d2c550d
22 changed files with 759 additions and 150 deletions

View File

@ -1,11 +0,0 @@
package com.aizuda.easy.retry.client.common.annotation;
/**
* Easy Retry Client 认证
*
* @author: xiaownouniu
* @date : 2024-03-30
* @since : 3.2.0
*/
public @interface Authentication {
}

View File

@ -1,9 +1,30 @@
package com.aizuda.easy.retry.client.common.annotation;
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author opensnail
* @date 2024-04-11 21:34:24
* @since 3.3.0
*/
@Target({ElementType.METHOD, ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Component
public @interface SnailEndPoint {
/**
* Alias for {@link Component#value}.
*/
@AliasFor(annotation = Component.class)
String value() default "";
}

View File

@ -1,37 +0,0 @@
package com.aizuda.easy.retry.client.common.intercepter;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import lombok.RequiredArgsConstructor;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.Optional;
/**
* Easy Retry 认证拦截器
*
* @author: xiaowoniu
* @date : 2022-04-18 09:19
* @since 3.2.0
*/
@Aspect
@Component
@RequiredArgsConstructor
public class AuthenticationInterceptor {
private final EasyRetryProperties easyRetryProperties;
@Before(value = "@annotation(com.aizuda.easy.retry.client.common.annotation.Authentication) || @within(com.aizuda.easy.retry.client.common.annotation.Authentication)")
public void easyRetryAuth() {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String easyRetryAuth = attributes.getRequest().getHeader(SystemConstants.EASY_RETRY_AUTH_TOKEN);
String configToken = Optional.ofNullable(easyRetryProperties.getToken()).orElse(SystemConstants.DEFAULT_TOKEN);
if (!configToken.equals(easyRetryAuth)) {
throw new EasyRetryClientException("认证失败.【请检查配置的Token是否正确】");
}
}
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.client.common.intercepter;
import com.aizuda.easy.retry.client.common.netty.server.EndPointInfo;
import com.aizuda.easy.retry.client.common.netty.server.HttpRequest;
import com.aizuda.easy.retry.client.common.netty.server.HttpResponse;
/**
* @author: opensnail
* @date : 2024-04-12
* @since : 3.3.0
*/
public interface HandlerInterceptor {
boolean preHandle(HttpRequest httpRequest, HttpResponse httpResponse, EndPointInfo handler);
void postHandle(HttpRequest httpRequest, HttpResponse httpResponse, EndPointInfo handler);
void afterCompletion(HttpRequest httpRequest, HttpResponse httpResponse, EndPointInfo handler, Exception ex);
default int order() {
return 1;
}
}

View File

@ -0,0 +1,17 @@
package com.aizuda.easy.retry.client.common.netty;
import com.aizuda.easy.retry.client.common.netty.server.NettyHttpRequest;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-04-12
* @since : 3.3.0
*/
@Component
public class DispatcherRequestHandler {
public void dispatch(NettyHttpRequest nettyHttpRequest) {
}
}

View File

@ -35,11 +35,11 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
.client(NettyClient.class)
.callback(nettyResult ->EasyRetryLog.LOCAL.info("heartbeat check requestId:[{}]", nettyResult.getRequestId()))
.callback(
nettyResult -> EasyRetryLog.LOCAL.info("heartbeat check requestId:[{}]", nettyResult.getRequestId()))
.build();
this.nettyHttpConnectClient = nettyHttpConnectClient;
}
@Override
@ -49,7 +49,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
String content = response.content().toString(CharsetUtil.UTF_8);
HttpHeaders headers = response.headers();
EasyRetryLog.LOCAL.info("Receive server data content:[{}], headers:[{}]", content, headers);
EasyRetryLog.LOCAL.info("Receive server data content:[{}], headers:[{}]", content, headers);
NettyResult nettyResult = JsonUtil.parseObject(content, NettyResult.class);
RpcContext.invoke(nettyResult.getRequestId(), nettyResult);

View File

@ -10,6 +10,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
@ -20,6 +21,7 @@ import org.springframework.stereotype.Component;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@ -29,14 +31,14 @@ import java.util.concurrent.TimeUnit;
* @date : 2022-03-07 18:24
* @since 1.0.0
*/
@Getter
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class NettyHttpConnectClient implements Lifecycle, ApplicationContextAware {
public class NettyHttpConnectClient implements Lifecycle {
private ApplicationContext applicationContext;
private static NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
private static Bootstrap bootstrap = new Bootstrap();
private volatile Channel channel;
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
private static final Bootstrap bootstrap = new Bootstrap();
private Channel channel;
@Override
public void start() {
@ -97,8 +99,10 @@ public class NettyHttpConnectClient implements Lifecycle, ApplicationContextAwar
exceptionHandler(e);
}
channel.close();
// 若连接失败尝试关闭改channel
if (Objects.nonNull(channel)) {
channel.close();
}
}
/**
@ -143,17 +147,7 @@ public class NettyHttpConnectClient implements Lifecycle, ApplicationContextAwar
if (channel != null) {
channel.close();
}
if (nioEventLoopGroup != null) {
nioEventLoopGroup.shutdownGracefully();
}
nioEventLoopGroup.shutdownGracefully();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public Channel getChannel() {
return channel;
}
}

View File

@ -0,0 +1,10 @@
package com.aizuda.easy.retry.client.common.netty.server;
/**
* @author: opensnail
* @date : 2024-04-12
* @since : 3.3.0
*/
public class HttpRequest {
}

View File

@ -0,0 +1,21 @@
package com.aizuda.easy.retry.client.common.netty.server;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* @author: opensnail
* @date : 2024-04-12
* @since :3.3.0
*/
@Data
public class HttpResponse {
private Map<String, Object> headers = new HashMap<>();
public void setHeader(String key, Object value) {
headers.put(key, value);
}
}

View File

@ -1,18 +1,43 @@
package com.aizuda.easy.retry.client.common.netty.server;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.util.ServiceLoaderUtil;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.common.intercepter.HandlerInterceptor;
import com.aizuda.easy.retry.client.common.netty.RequestMethod;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author: opensnail
* @date : 2024-04-11 16:03
@ -20,52 +45,142 @@ import org.springframework.util.ReflectionUtils;
*/
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final ThreadPoolExecutor DISPATCHER_THREAD_POOL = new ThreadPoolExecutor(
16, 16, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new CustomizableThreadFactory("snail-netty-server-"));
public NettyHttpServerHandler() {
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest)
throws Exception {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
UrlBuilder builder = UrlBuilder.ofHttp(fullHttpRequest.uri());
RequestMethod requestMethod = RequestMethod.valueOf(fullHttpRequest.method().name());
EndPointInfo endPointInfo = EndPointInfoCache.get(builder.getPathStr(), requestMethod);
Class<?>[] paramTypes = endPointInfo.getMethod().getParameterTypes();
String content = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
Object[] args = retryRequest.getArgs();
String uri = fullHttpRequest.uri();
String name = fullHttpRequest.method().name();
HttpHeaders headers = fullHttpRequest.headers();
NettyHttpRequest nettyHttpRequest = NettyHttpRequest.builder()
.keepAlive(HttpUtil.isKeepAlive(fullHttpRequest))
.uri(fullHttpRequest.uri())
.channelHandlerContext(channelHandlerContext)
.method(fullHttpRequest.method())
.headers(fullHttpRequest.headers())
.content(fullHttpRequest.content().toString(CharsetUtil.UTF_8))
.build();
// 执行任务
DISPATCHER_THREAD_POOL.execute(() -> {
List<HandlerInterceptor> handlerInterceptors = handlerInterceptors();
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
HttpRequest httpRequest = new HttpRequest();
HttpResponse httpResponse = new HttpResponse();
EndPointInfo endPointInfo = null;
Result resultObj = null;
Exception e;
try {
EasyRetryProperties properties = SpringContext.getBean(EasyRetryProperties.class);
String easyRetryAuth = headers.getAsString(SystemConstants.EASY_RETRY_AUTH_TOKEN);
String configToken = Optional.ofNullable(properties.getToken()).orElse(SystemConstants.DEFAULT_TOKEN);
if (!configToken.equals(easyRetryAuth)) {
throw new EasyRetryClientException("认证失败.【请检查配置的Token是否正确】");
}
UrlBuilder builder = UrlBuilder.ofHttp(uri);
RequestMethod requestMethod = RequestMethod.valueOf(name);
endPointInfo = EndPointInfoCache.get(builder.getPathStr(), requestMethod);
Class<?>[] paramTypes = endPointInfo.getMethod().getParameterTypes();
Object[] args = retryRequest.getArgs();
Object[] deSerialize = (Object[]) deSerialize(JsonUtil.toJsonString(args), endPointInfo.getMethod());
for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) {
handlerInterceptor.preHandle(httpRequest, httpResponse, endPointInfo);
}
if (paramTypes.length > 0) {
resultObj = (Result) ReflectionUtils.invokeMethod(endPointInfo.getMethod(),
endPointInfo.getExecutor(), deSerialize);
} else {
resultObj = (Result) ReflectionUtils.invokeMethod(endPointInfo.getMethod(),
endPointInfo.getExecutor());
}
for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) {
handlerInterceptor.postHandle(httpRequest, httpResponse, endPointInfo);
}
} catch (Exception ex) {
EasyRetryLog.LOCAL.error("http request error. [{}]", content, ex);
resultObj = new NettyResult(0, ex.getMessage(), null, retryRequest.getReqId());
e = ex;
} finally {
NettyResult nettyResult = new NettyResult();
nettyResult.setRequestId(retryRequest.getReqId());
if (Objects.nonNull(resultObj)) {
nettyResult.setData(resultObj.getData());
nettyResult.setMessage(resultObj.getMessage());
nettyResult.setStatus(resultObj.getStatus());
}
for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) {
handlerInterceptor.afterCompletion(httpRequest, httpResponse, endPointInfo, e);
}
writeResponse(channelHandlerContext, HttpUtil.isKeepAlive(fullHttpRequest), httpResponse,
JsonUtil.toJsonString(nettyResult));
Object resultObj = null;
try {
if (paramTypes.length > 0) {
resultObj = ReflectionUtils.invokeMethod(endPointInfo.getMethod(), endPointInfo.getExecutor(), args);
} else {
resultObj = ReflectionUtils.invokeMethod(endPointInfo.getMethod(), endPointInfo.getExecutor());
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("http request error. [{}]", content, e);
resultObj = new Result<>(0, e.getMessage());
throw e;
} finally {
writeResponse(channelHandlerContext, HttpUtil.isKeepAlive(fullHttpRequest), JsonUtil.toJsonString(resultObj));
}
});
}
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, final HttpResponse httpResponse, String responseJson) {
// write response
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE,
HttpHeaderValues.APPLICATION_JSON);
HttpHeaderValues.APPLICATION_JSON);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
Map<String, Object> headers = httpResponse.getHeaders();
headers.forEach((key, value) -> response.headers().set(key, value));
ctx.writeAndFlush(response);
}
public Object deSerialize(String infoStr, Method method) throws JsonProcessingException {
Type[] paramTypes = method.getGenericParameterTypes();
Object[] params = new Object[paramTypes.length];
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = JsonUtil.toJson(infoStr);
if (Objects.isNull(jsonNode)) {
EasyRetryLog.LOCAL.warn("jsonNode is null. infoStr:[{}]", infoStr);
return params;
}
for (int i = 0; i < paramTypes.length; i++) {
JsonNode node = jsonNode.get(i);
if (Objects.nonNull(node)) {
params[i] = mapper.readValue(node.toString(), mapper.constructType(paramTypes[i]));
}
}
return params;
}
private static List<HandlerInterceptor> handlerInterceptors() {
List<HandlerInterceptor> handlerInterceptors = ServiceLoaderUtil.loadList(HandlerInterceptor.class);
if (CollectionUtils.isEmpty(handlerInterceptors)) {
return Collections.emptyList();
}
return handlerInterceptors.stream().sorted(Comparator.comparingInt(HandlerInterceptor::order)).collect(
Collectors.toList());
}
}

View File

@ -3,11 +3,13 @@ package com.aizuda.easy.retry.client.common.netty.server;
import com.aizuda.easy.retry.client.common.annotation.Mapping;
import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.ArrayList;
@ -20,6 +22,7 @@ import java.util.Objects;
* @date 2024-04-11 22:29:07
* @since 3.3.0
*/
@Component
public class SnailEndPointScanner implements ApplicationContextAware {
private ApplicationContext context;
@ -33,7 +36,8 @@ public class SnailEndPointScanner implements ApplicationContextAware {
String[] beanDefinitionNames = context.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = context.getBean(beanDefinitionName);
String executorClassName = bean.getClass().getName();
Class executorNotProxy = AopProxyUtils.ultimateTargetClass(bean);
String executorClassName = executorNotProxy.getName();
// 扫描类的注解
SnailEndPoint jobExecutor = bean.getClass().getAnnotation(SnailEndPoint.class);

View File

@ -1,8 +1,10 @@
package com.aizuda.easy.retry.client.core.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.common.annotation.Authentication;
import com.aizuda.easy.retry.client.common.annotation.Mapping;
import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint;
import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager;
import com.aizuda.easy.retry.client.common.netty.RequestMethod;
import com.aizuda.easy.retry.client.core.IdempotentIdGenerate;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
@ -30,19 +32,19 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import jakarta.validation.ValidatorFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Set;
/**
* 服务端调调用客户端进行重试流量下发配置变更通知等操作
@ -50,9 +52,7 @@ import java.util.Objects;
* @author: www.byteblogs.com
* @date : 2022-03-09 16:33
*/
@RestController
@RequestMapping("/retry")
@Slf4j
@SnailEndPoint
public class RetryEndPoint {
@Autowired
@ -62,9 +62,15 @@ public class RetryEndPoint {
/**
* 服务端调度重试入口
*/
@PostMapping("/dispatch/v1")
@Authentication
public Result<DispatchRetryResultDTO> dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) {
@Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST)
public Result<DispatchRetryResultDTO> dispatch(DispatchRetryDTO executeReqDto) {
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
Validator validator = vf.getValidator();
Set<ConstraintViolation<DispatchRetryDTO>> set = validator.validate(executeReqDto);
for (final ConstraintViolation<DispatchRetryDTO> violation : set) {
return new Result<>(violation.getMessage(), null);
}
RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName());
if (Objects.isNull(retryerInfo)) {
@ -144,16 +150,21 @@ public class RetryEndPoint {
/**
* 同步版本
*/
@PostMapping("/sync/version/v1")
@Authentication
public Result syncVersion(@RequestBody ConfigDTO configDTO) {
@Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST)
public Result syncVersion(ConfigDTO configDTO) {
GroupVersionCache.configDTO = configDTO;
return new Result();
}
@PostMapping("/callback/v1")
@Authentication
public Result callback(@RequestBody @Validated RetryCallbackDTO callbackDTO) {
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
public Result callback(RetryCallbackDTO callbackDTO) {
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
Validator validator = vf.getValidator();
Set<ConstraintViolation<RetryCallbackDTO>> set = validator.validate(callbackDTO);
for (final ConstraintViolation<RetryCallbackDTO> violation : set) {
return new Result<>(violation.getMessage(), null);
}
RetryerInfo retryerInfo = null;
Object[] deSerialize = null;
@ -264,9 +275,15 @@ public class RetryEndPoint {
* @return idempotentId
*/
@PostMapping("/generate/idempotent-id/v1")
@Authentication
public Result<String> idempotentIdGenerate(
@RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {
GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
Validator validator = vf.getValidator();
Set<ConstraintViolation<GenerateRetryIdempotentIdDTO>> set = validator.validate(generateRetryIdempotentIdDTO);
for (final ConstraintViolation<GenerateRetryIdempotentIdDTO> violation : set) {
return new Result<>(violation.getMessage(), null);
}
String scene = generateRetryIdempotentIdDTO.getScene();
String executorName = generateRetryIdempotentIdDTO.getExecutorName();

View File

@ -1,6 +1,5 @@
package com.aizuda.easy.retry.client.core.loader;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ServiceLoaderUtil;
import com.aizuda.easy.retry.client.core.expression.ExpressionInvocationHandler;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;

View File

@ -1,6 +1,5 @@
package com.aizuda.easy.retry.client.job.core.client;
import com.aizuda.easy.retry.client.common.annotation.Authentication;
import com.aizuda.easy.retry.client.common.annotation.Mapping;
import com.aizuda.easy.retry.client.common.annotation.SnailEndPoint;
import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager;
@ -12,6 +11,7 @@ import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import com.aizuda.easy.retry.client.job.core.executor.AbstractJobExecutor;
import com.aizuda.easy.retry.client.job.core.executor.AnnotationJobExecutor;
import com.aizuda.easy.retry.client.job.core.log.JobLogMeta;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.common.core.context.SpringContext;
@ -19,11 +19,15 @@ import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import jakarta.validation.ValidatorFactory;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -34,7 +38,14 @@ import java.util.concurrent.ThreadPoolExecutor;
public class JobEndPoint {
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
public Result<Boolean> dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) {
public Result<Boolean> dispatchJob(DispatchJobRequest dispatchJob) {
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
Validator validator = vf.getValidator();
Set<ConstraintViolation<DispatchJobRequest>> set = validator.validate(dispatchJob);
for (final ConstraintViolation<DispatchJobRequest> violation : set) {
return new Result<>(violation.getMessage(), Boolean.FALSE);
}
try {
JobContext jobContext = buildJobContext(dispatchJob);
@ -44,7 +55,7 @@ public class JobEndPoint {
if (Objects.nonNull(dispatchJob.getRetryCount()) && dispatchJob.getRetryCount() > 0) {
EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}]",
dispatchJob.getRetryCount());
dispatchJob.getRetryCount());
}
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
@ -106,9 +117,16 @@ public class JobEndPoint {
return jobContext;
}
@PostMapping("/stop/v1")
@Authentication
public Result<Boolean> stopJob(@RequestBody @Validated StopJobDTO interruptJob) {
@Mapping(path = "/stop/v1", method = RequestMethod.POST)
public Result<Boolean> stopJob(StopJobDTO interruptJob) {
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
Validator validator = vf.getValidator();
Set<ConstraintViolation<StopJobDTO>> set = validator.validate(interruptJob);
for (final ConstraintViolation<StopJobDTO> violation : set) {
return new Result<>(violation.getMessage(), Boolean.FALSE);
}
ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId());
if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) {
return new Result<>(Boolean.TRUE);

View File

@ -8,6 +8,7 @@ import com.aizuda.easy.retry.client.job.core.dto.JobArgs;
import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@ -54,14 +55,14 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
EasyRetryLog.LOCAL.error("{} JobExecutor加载异常{}", beanDefinitionName, ex);
}
String executorClassName = bean.getClass().getName();
Class executorNotProxy = AopProxyUtils.ultimateTargetClass(bean);
String executorClassName = executorNotProxy.getName();
// 通过实现接口进行注册
if (IJobExecutor.class.isAssignableFrom(bean.getClass())) {
if (!JobExecutorInfoCache.isExisted(executorClassName)) {
jobExecutorInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean));
}
}
// 扫描类的注解

View File

@ -1,11 +1,13 @@
package com.aizuda.easy.retry.common.core.model;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author: www.byteblogs.com
* @date : 2022-02-16 14:07
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class NettyResult extends Result<Object> {

View File

@ -1,13 +1,16 @@
package com.aizuda.easy.retry.server.common.client;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.NetUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.cache.CacheToken;
import com.aizuda.easy.retry.server.common.client.annotation.Body;
@ -17,18 +20,22 @@ import com.aizuda.easy.retry.server.common.client.annotation.Param;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.netty.client.NettyChannel;
import com.aizuda.easy.retry.server.common.netty.client.RpcContext;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import io.netty.handler.codec.http.CombinedHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.ResourceAccessException;
@ -42,8 +49,12 @@ import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 请求处理器
@ -68,8 +79,8 @@ public class RpcClientInvokeHandler implements InvocationHandler {
private final Integer routeKey;
private final String allocKey;
private final Integer executorTimeout;
private final String namespaceId;
private final boolean async;
public RpcClientInvokeHandler(final String groupName, final RegisterNodeInfo registerNodeInfo,
final boolean failRetry, final int retryTimes,
@ -89,6 +100,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
this.allocKey = allocKey;
this.executorTimeout = executorTimeout;
this.namespaceId = namespaceId;
this.async = false;
}
@Override
@ -134,33 +146,56 @@ public class RpcClientInvokeHandler implements InvocationHandler {
Assert.notNull(parasResult.body, () -> new EasyRetryServerException("body cannot be null"));
}
RestTemplate restTemplate = SpringContext.getBean(RestTemplate.class);
Retryer<Result> retryer = buildResultRetryer();
HttpHeaders requestHeaders = parasResult.requestHeaders;
if (Objects.nonNull(executorTimeout)) {
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, String.valueOf(executorTimeout));
}
// 统一设置Token
requestHeaders.set(SystemConstants.EASY_RETRY_AUTH_TOKEN, CacheToken.get(groupName, namespaceId));
EasyRetryRequest easyRetryRequest = new EasyRetryRequest(args);
Result result = retryer.call(() -> {
ResponseEntity<Result> response = restTemplate.exchange(
// 拼接 url?a=1&b=1
getUrl(mapping, parasResult.paramMap).toString(),
// post or get
HttpMethod.valueOf(mapping.method().name()),
// body
new HttpEntity<>(parasResult.body, parasResult.requestHeaders),
// 返回值类型
Result.class);
return Objects.requireNonNull(response.getBody());
StopWatch sw = new StopWatch();
sw.start("request start " + easyRetryRequest.getReqId());
CompletableFuture completableFuture = null;
if (async) {
// RpcContext.setCompletableFuture(easyRetryRequest.getReqId(), null);
} else {
completableFuture = new CompletableFuture<>();
RpcContext.setCompletableFuture(easyRetryRequest.getReqId(), completableFuture);
}
try {
NettyChannel.send(hostId, hostIp, hostPort,
HttpMethod.valueOf(mapping.method().name()), // 拼接 url?a=1&b=1
mapping.path(), easyRetryRequest.toString(), requestHeaders);
} finally {
sw.stop();
}
EasyRetryLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", easyRetryRequest.getReqId(),
sw.getTotalTimeMillis());
if (async) {
return null;
} else {
Assert.notNull(completableFuture, () -> new EasyRetryServerException("completableFuture is null"));
try {
return (Result) completableFuture.get(Optional.ofNullable(executorTimeout).orElse(20), TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw new EasyRetryServerException("Request to remote interface exception. path:[{}]",
mapping.path());
} catch (TimeoutException e) {
throw new EasyRetryServerException("Request to remote interface timed out. path:[{}]",
mapping.path());
}
}
});
log.debug("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
log.debug("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count,
hostId,
hostIp, hostPort, NetUtil.getLocalIpStr());
return result;
@ -239,7 +274,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
private ParseParasResult doParseParams(Method method, Object[] args) {
Object body = null;
HttpHeaders requestHeaders = new HttpHeaders();
DefaultHttpHeaders requestHeaders = new DefaultHttpHeaders();
Map<String, Object> paramMap = new HashMap<>();
// 解析参数
Parameter[] parameters = method.getParameters();
@ -267,7 +302,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
private static class ParseParasResult {
private Object body = null;
private HttpHeaders requestHeaders;
private DefaultHttpHeaders requestHeaders;
private Map<String, Object> paramMap;
}
}

View File

@ -0,0 +1,143 @@
package com.aizuda.easy.retry.server.common.netty.client;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.triple.Pair;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import lombok.extern.slf4j.Slf4j;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
* @date 2023-05-13
* @since 1.3.0
*/
@Slf4j
public class NettyChannel {
private NettyChannel() {
}
private static Bootstrap bootstrap;
private static ConcurrentHashMap<Pair<String, String>, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(16);
public static void setChannel(String hostId, String ip, Channel channel) {
CHANNEL_MAP.put(Pair.of(hostId, ip), channel);
}
public static void removeChannel(Channel channel) {
CHANNEL_MAP.forEach((key, value) -> {
if (value.equals(channel)) {
CHANNEL_MAP.remove(key);
}
});
}
public static void setBootstrap(Bootstrap bootstrap) {
NettyChannel.bootstrap = bootstrap;
}
/**
* 发送数据
*
* @param method 请求方式
* @param url url地址
* @param body 请求的消息体
* @throws InterruptedException
*/
public static void send(String hostId, String hostIp, Integer port, HttpMethod method, String url, String body, HttpHeaders requestHeaders) throws InterruptedException {
Channel channel = CHANNEL_MAP.get(Pair.of(hostId, hostIp));
if (Objects.isNull(channel) || !channel.isActive()) {
channel = connect(hostId, hostIp, port);
if (Objects.isNull(channel)) {
EasyRetryLog.LOCAL.error("send message but channel is null url:[{}] method:[{}] body:[{}] ", url, method, body);
return;
}
}
// 配置HttpRequest的请求数据和一些配置信息
FullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, method, url, Unpooled.wrappedBuffer(body.getBytes(StandardCharsets.UTF_8)));
request.headers()
.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
//开启长连接
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
//设置传递请求内容的长度
.set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes())
;
request.headers().setAll(requestHeaders);
//发送数据
channel.writeAndFlush(request).sync();
}
/**
* 连接客户端
*
* @return
*/
public static Channel connect(String hostId, String ip, Integer port) {
try {
ChannelFuture channelFuture = bootstrap
.remoteAddress(ip, port)
.connect();
boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
Channel channel = channelFuture.channel();
if (notTimeout) {
// 连接成功
if (channel != null && channel.isActive()) {
EasyRetryLog.LOCAL.info("netty client started {} connect to server", channel.localAddress());
NettyChannel.setChannel(hostId, ip, channel);
return channel;
}
Throwable cause = channelFuture.cause();
if (cause != null) {
exceptionHandler(cause);
}
} else {
EasyRetryLog.LOCAL.warn("connect remote host[{}] timeout {}s", channel.remoteAddress(), 30);
}
} catch (Exception e) {
exceptionHandler(e);
}
return null;
}
/**
* 连接失败处理
*
* @param cause
*/
private static void exceptionHandler(Throwable cause) {
if (cause instanceof ConnectException) {
EasyRetryLog.LOCAL.error("connect error:{}", cause.getMessage());
} else if (cause instanceof ClosedChannelException) {
EasyRetryLog.LOCAL.error("connect error:{}", "client has destroy");
} else {
EasyRetryLog.LOCAL.error("connect error:", cause);
}
}
}

View File

@ -0,0 +1,93 @@
package com.aizuda.easy.retry.server.common.netty.client;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
/**
* netty 客户端处理器
*
* @author: www.byteblogs.com
* @date : 2022-03-07 18:30
* @since 1.0.0
*/
@Slf4j
public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
public NettyHttpClientHandler() {
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
FullHttpResponse response = msg;
String content = response.content().toString(CharsetUtil.UTF_8);
HttpHeaders headers = response.headers();
EasyRetryLog.LOCAL.info("Receive server data content:[{}], headers:[{}]", content, headers);
NettyResult nettyResult = JsonUtil.parseObject(content, NettyResult.class);
RpcContext.invoke(nettyResult.getRequestId(), nettyResult);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
EasyRetryLog.LOCAL.debug("channelRegistered");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
EasyRetryLog.LOCAL.debug("channelUnregistered");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
EasyRetryLog.LOCAL.debug("channelActive");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
EasyRetryLog.LOCAL.debug("channelInactive");
NettyChannel.removeChannel(ctx.channel());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
EasyRetryLog.LOCAL.debug("channelReadComplete");
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
EasyRetryLog.LOCAL.debug("channelWritabilityChanged");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
EasyRetryLog.LOCAL.error("easy-retry netty-http client exception", cause);
super.exceptionCaught(ctx, cause);
NettyChannel.removeChannel(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
NettyChannel.removeChannel(ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
EasyRetryLog.LOCAL.debug("userEventTriggered");
}
}

View File

@ -0,0 +1,73 @@
package com.aizuda.easy.retry.server.common.netty.client;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.Getter;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 初始化bootstrap
*
* @author: www.byteblogs.com
* @date : 2022-03-07 18:24
* @since 1.0.0
*/
@Getter
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class NettyHttpConnectClient implements Lifecycle {
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
private static final Bootstrap bootstrap = new Bootstrap();
@Override
public void start() {
try {
bootstrap.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS))
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
.addLast(new NettyHttpClientHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
NettyChannel.setBootstrap(bootstrap);
} catch (Exception e) {
EasyRetryLog.LOCAL.error("Client start exception", e);
}
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,71 @@
package com.aizuda.easy.retry.server.common.netty.client;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
/**
* 处理请求服务端时需要存储的中间值
*
* @author: www.byteblogs.com
* @date : 2023-05-12 09:05
* @since 1.3.0
*/
@Slf4j
public class RpcContext {
private static final ConcurrentMap<Long, CompletableFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
private static final ConcurrentMap<Long, Consumer> CALLBACK_CONSUMER = new ConcurrentHashMap<>();
public static void invoke(Long requestId, NettyResult nettyResult) {
try {
// 同步请求同步返回
Optional.ofNullable(getCompletableFuture(requestId)).ifPresent(completableFuture -> completableFuture.complete(nettyResult));
// 异步请求进行回调
Optional.ofNullable(getConsumer(requestId)).ifPresent(consumer -> consumer.accept(nettyResult));
} catch (Exception e) {
EasyRetryLog.LOCAL.error("回调处理失败 requestId:[{}]",requestId, e );
} finally {
COMPLETABLE_FUTURE.remove(requestId);
CALLBACK_CONSUMER.remove(requestId);
}
}
public static <R> void setCompletableFuture(long id, CompletableFuture<R> completableFuture, Consumer<R> callable) {
if (Objects.nonNull(completableFuture)) {
COMPLETABLE_FUTURE.put(id, completableFuture);
}
if (Objects.nonNull(callable)) {
CALLBACK_CONSUMER.put(id, callable);
}
}
public static <R> void setCompletableFuture(Long id, Consumer<R> callable) {
setCompletableFuture(id, null, callable);
}
public static <R> void setCompletableFuture(Long id, CompletableFuture<R> completableFuture) {
setCompletableFuture(id, completableFuture, null);
}
public static CompletableFuture<Object> getCompletableFuture(Long id) {
return COMPLETABLE_FUTURE.get(id);
}
public static Consumer getConsumer(Long id) {
return CALLBACK_CONSUMER.get(id);
}
}

View File

@ -21,7 +21,7 @@
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<revision>3.2.0</revision>
<revision>3.3.0-SNAPSHOT</revision>
<dingding-talk.version>1.0.0</dingding-talk.version>
<netty-all.version>4.1.94.Final</netty-all.version>
<hutool-all.version>5.8.25</hutool-all.version>