feat: 2.0.0

1. 新增调用客户端代理类
2. 新增下线路由剔除功能
3. 新增路由转移功能
This commit is contained in:
byteblogs168 2023-06-19 23:00:35 +08:00
parent b348ba0b0a
commit 510b651bcd
10 changed files with 409 additions and 22 deletions

View File

@ -0,0 +1,83 @@
package com.aizuda.easy.retry.server.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import java.lang.reflect.Proxy;
import java.util.Objects;
/**
* 构建请求类型
*
* @author: www.byteblogs.com
* @date : 2023-06-19 16:47
* @since 2.0.0
*/
public class RequestBuilder<T, R> {
private Class<T> clintInterface;
private String groupName;
private String hostId;
private String hostIp;
private Integer hostPort;
private String contextPath;
public static <T, R> RequestBuilder<T, R> newBuilder() {
return new RequestBuilder<>();
}
public RequestBuilder<T, R> client(Class<T> clintInterface) {
this.clintInterface = clintInterface;
return this;
}
public RequestBuilder<T, R> hostPort(Integer hostPort) {
this.hostPort = hostPort;
return this;
}
public RequestBuilder<T, R> contextPath(String contextPath) {
this.contextPath = contextPath;
return this;
}
public RequestBuilder<T, R> hostId(String hostId) {
this.hostId = hostId;
return this;
}
public RequestBuilder<T, R> hostIp(String hostIp) {
this.hostIp = hostIp;
return this;
}
public RequestBuilder<T, R> groupName(String groupName) {
this.groupName = groupName;
return this;
}
public T build() {
if (Objects.isNull(clintInterface)) {
throw new EasyRetryServerException("clintInterface cannot be null");
}
Assert.notBlank(groupName, () -> new EasyRetryServerException("groupName cannot be null"));
Assert.notBlank(hostId, () -> new EasyRetryServerException("hostId cannot be null"));
Assert.notBlank(hostIp, () -> new EasyRetryServerException("hostIp cannot be null"));
Assert.notNull(hostPort, () -> new EasyRetryServerException("hostPort cannot be null"));
Assert.notBlank(contextPath, () -> new EasyRetryServerException("contextPath cannot be null"));
try {
clintInterface = (Class<T>) Class.forName(clintInterface.getName());
} catch (Exception e) {
throw new EasyRetryServerException("class not found exception to: [{}]", clintInterface.getName());
}
RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler(groupName, hostId, hostIp, hostPort, contextPath);
return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(),
new Class[]{clintInterface}, clientInvokeHandler);
}
}

View File

@ -0,0 +1,16 @@
package com.aizuda.easy.retry.server.client;
/**
* 请求类型
*
* @author: www.byteblogs.com
* @date : 2023-06-19 08:53
* @since 2.0.0
*/
public enum RequestMethod {
GET,
POST;
RequestMethod() {
}
}

View File

@ -0,0 +1,27 @@
package com.aizuda.easy.retry.server.client;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.client.annotation.Body;
import com.aizuda.easy.retry.server.client.annotation.Header;
import com.aizuda.easy.retry.server.client.annotation.Mapping;
/**
* 调用客户端接口
*
* @author: www.byteblogs.com
* @date : 2023-06-19 15:40
* @since 2.0.0
*/
public interface RpcClient {
@Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST)
Result<DispatchRetryResultDTO> dispatch(@Body DispatchRetryDTO dispatchRetryDTO, @Header EasyRetryHeaders headers);
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
Result callback(@Body RetryCallbackDTO retryCallbackDTO);
}

View File

@ -0,0 +1,170 @@
package com.aizuda.easy.retry.server.client;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.URLUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.client.annotation.Body;
import com.aizuda.easy.retry.server.client.annotation.Header;
import com.aizuda.easy.retry.server.client.annotation.Mapping;
import com.aizuda.easy.retry.server.client.annotation.Param;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* 请求处理器
*
* @author: www.byteblogs.com
* @date : 2023-05-11 21:45
* @since 2.0.0
*/
@Slf4j
public class RpcClientInvokeHandler implements InvocationHandler {
public static final String URL = "http://{0}:{1}/{2}";
private final String groupName;
private String hostId;
private String hostIp;
private Integer hostPort;
private String contextPath;
public RpcClientInvokeHandler(
final String groupName,
final String hostId,
final String hostIp,
final Integer hostPort,
final String contextPath) {
this.groupName = groupName;
this.hostId = hostId;
this.hostIp = hostIp;
this.hostPort = hostPort;
this.contextPath = contextPath;
}
@Override
public Result invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
Mapping annotation = method.getAnnotation(Mapping.class);
Assert.notNull(annotation, () -> new EasyRetryServerException("@Mapping cannot be null"));
Set<RegisterNodeInfo> serverNodeSet = CacheRegisterTable.getServerNodeSet(groupName);
// 最多调用size次
int size = serverNodeSet.size();
for (int count = 1; count <= size; count++) {
log.info("Start request client. count:[{}] hostId:[{}] addr:[{}:{}]", count, hostId, hostIp, hostPort);
Result result = requestRemote(method, args, annotation, count);
if (Objects.nonNull(result)) {
return result;
}
}
throw new EasyRetryServerException("No available nodes.");
}
private Result requestRemote(Method method, Object[] args, Mapping mapping, int count) {
try {
Object body = null;
HttpHeaders requestHeaders = new HttpHeaders();
Map<String, Object> paramMap = new HashMap<>();
// 解析参数
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
Parameter parameter = parameters[i];
if (parameter.isAnnotationPresent(Body.class)) {
body = args[i];
} else if ((parameter.isAnnotationPresent(Header.class))) {
requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(args[i]));
} else if ((parameter.isAnnotationPresent(Param.class))) {
paramMap.put(parameter.getAnnotation(Param.class).name(), args[i]);
} else {
throw new EasyRetryServerException("parameter error");
}
}
// 若是POST请求请求体不能是null
if (RequestMethod.POST.name().equals(mapping.method().name())) {
Assert.notNull(body, () -> new EasyRetryServerException("body cannot be null"));
}
// 拼接 url?a=1&b=1
RestTemplate restTemplate = SpringContext.CONTEXT.getBean(RestTemplate.class);
ResponseEntity<Result> response = restTemplate.exchange(
getUrl(mapping, paramMap).toString(),
HttpMethod.valueOf(mapping.method().name()),
new HttpEntity<>(body, requestHeaders),
Result.class);
log.info("Request client success. count:[{}] hostId:[{}] addr:[{}:{}]", count, hostId, hostIp, hostPort);
return Objects.requireNonNull(response.getBody());
} catch (RestClientException ex) {
// 网络异常
if (ex instanceof ResourceAccessException) {
log.error("request client I/O error, count:[{}] hostId:[{}] addr:[{}:{}]", count, hostId, hostIp, hostPort, ex);
// 进行路由剔除处理
CacheRegisterTable.remove(groupName, hostId);
// 重新选一个可用的客户端节点
ClientNodeAllocateHandler clientNodeAllocateHandler = SpringContext.CONTEXT.getBean(
ClientNodeAllocateHandler.class);
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(groupName);
// 这里表示无可用节点
if (Objects.isNull(serverNode)) {
throw ex;
}
this.hostId = serverNode.getHostId();
this.hostPort = serverNode.getHostPort();
this.hostIp = serverNode.getHostIp();
this.contextPath = serverNode.getContextPath();
} else {
// 其他异常继续抛出
log.error("request client error.count:[{}] hostId:[{}] addr:[{}:{}]", count, hostId, hostIp, hostPort, ex);
throw ex;
}
} catch (Exception ex) {
log.error("request client unknown exception. count:[{}] hostId:[{}] addr:[{}:{}]", count, hostId, hostIp, hostPort, ex);
throw ex;
}
return null;
}
@NotNull
private StringBuilder getUrl(Mapping mapping, Map<String, Object> paramMap) {
StringBuilder url = new StringBuilder(MessageFormat.format(URL, hostIp, hostPort.toString(), contextPath));
url.append(mapping.path());
if (!CollectionUtils.isEmpty(paramMap)) {
String query = URLUtil.buildQuery(paramMap, null);
url.append("?").append(query);
}
return url;
}
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.server.client.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author: shuguang.zhang
* @date : 2023-06-19 16:02
*/
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Body {
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.server.client.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author: shuguang.zhang
* @date : 2023-06-19 16:02
*/
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Header {
}

View File

@ -0,0 +1,27 @@
package com.aizuda.easy.retry.server.client.annotation;
import com.aizuda.easy.retry.server.client.RequestMethod;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 接口定义
*
* @author: www.byteblogs.com
* @date : 2023-05-11 22:32
* @since 1.3.0
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Mapping {
RequestMethod method() default RequestMethod.GET;
String path() default "";
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.server.client.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author: shuguang.zhang
* @date : 2023-06-19 16:10
*/
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Param {
String name();
}

View File

@ -3,12 +3,12 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.exec;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.client.RequestBuilder;
import com.aizuda.easy.retry.server.client.RpcClient;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
@ -22,12 +22,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat;
import java.util.Objects;
import java.util.concurrent.Callable;
@ -49,8 +46,6 @@ public class ExecCallbackUnitActor extends AbstractActor {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private RestTemplate restTemplate;
@Override
public Receive createReceive() {
@ -109,7 +104,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
* @param retryTask {@link RetryTask} 需要重试的数据
* @return 重试结果返回值
*/
private Result<Void> callClient(RetryTask retryTask, RegisterNodeInfo serverNode) {
private Result callClient(RetryTask retryTask, RegisterNodeInfo serverNode) {
// 回调参数
RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO();
@ -121,13 +116,22 @@ public class ExecCallbackUnitActor extends AbstractActor {
retryCallbackDTO.setExecutorName(retryTask.getExecutorName());
retryCallbackDTO.setUniqueId(retryTask.getUniqueId());
HttpEntity<RetryCallbackDTO> requestEntity = new HttpEntity<>(retryCallbackDTO);
// HttpEntity<RetryCallbackDTO> requestEntity = new HttpEntity<>(retryCallbackDTO);
//
// String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
// Result result = restTemplate.postForObject(format, requestEntity, Result.class);
String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
Result result = restTemplate.postForObject(format, requestEntity, Result.class);
LogUtils.info(log, "请求客户端 format:[{}] response:[{}}] ", format, JsonUtil.toJsonString(result));
return result;
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
.hostPort(serverNode.getHostPort())
.groupName(serverNode.getGroupName())
.hostId(serverNode.getHostId())
.hostIp(serverNode.getHostIp())
.contextPath(serverNode.getContextPath())
.client(RpcClient.class)
.build();
return rpcClient.callback(retryCallbackDTO);
}

View File

@ -10,6 +10,8 @@ import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.client.RequestBuilder;
import com.aizuda.easy.retry.server.client.RpcClient;
import com.aizuda.easy.retry.server.enums.StatusEnum;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
@ -50,8 +52,6 @@ public class ExecUnitActor extends AbstractActor {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private RestTemplate restTemplate;
@Override
public Receive createReceive() {
@ -137,14 +137,20 @@ public class ExecUnitActor extends AbstractActor {
easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));
HttpEntity<DispatchRetryDTO> requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders);
String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
Result<DispatchRetryResultDTO> result = restTemplate.postForObject(format, requestEntity, Result.class);
LogUtils.info(log, "请求客户端 format:[{}] response:[{}}] ", format, JsonUtil.toJsonString(result));
return result;
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
.hostPort(serverNode.getHostPort())
.groupName(serverNode.getGroupName())
.hostId(serverNode.getHostId())
.hostIp(serverNode.getHostIp())
.contextPath(serverNode.getContextPath())
.client(RpcClient.class)
.build();
// HttpEntity<DispatchRetryDTO> requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders);
//
// String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
// Result<DispatchRetryResultDTO> result = restTemplate.postForObject(format, requestEntity, Result.class);
return rpcClient.dispatch(dispatchRetryDTO, easyRetryHeaders);
}