feat: 完成重试完成和重试最大次数时回调客户端

This commit is contained in:
byteblogs168 2023-01-10 15:47:20 +08:00
parent 33013948cd
commit 1e3660b0c0
13 changed files with 280 additions and 21 deletions

View File

@ -2,6 +2,8 @@ package com.x.retry.client.core.annotation;
import com.x.retry.client.core.BizIdGenerate;
import com.x.retry.client.core.callback.RetryCompleteCallback;
import com.x.retry.client.core.callback.SimpleRetryCompleteCallback;
import com.x.retry.client.core.generator.SimpleBizIdGenerate;
import com.x.retry.client.core.retryer.RetryType;
import com.x.retry.client.core.strategy.RetryAnnotationMethod;
@ -53,6 +55,13 @@ public @interface Retryable {
*/
Class<? extends BizIdGenerate> bizId() default SimpleBizIdGenerate.class;
/**
* 服务端重试完成(重试成功重试到达最大次数)回调客户端
*
* @return
*/
Class<? extends RetryCompleteCallback> retryCompleteCallback() default SimpleRetryCompleteCallback.class;
/**
* bizNo spel表达式
*/

View File

@ -0,0 +1,12 @@
package com.x.retry.client.core.callback;
/**
* @author: www.byteblogs.com
* @date : 2023-01-10 14:46
*/
public interface RetryCompleteCallback {
void doSuccessCallback(String sceneName, String executorName, Object[] params);
void doMaxRetryCallback(String sceneName, String executorName, Object[] params);
}

View File

@ -0,0 +1,21 @@
package com.x.retry.client.core.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author: www.byteblogs.com
* @date : 2023-01-10 14:47
*/
@Component
@Slf4j
public class SimpleRetryCompleteCallback implements RetryCompleteCallback {
@Override
public void doSuccessCallback(String sceneName, String executorName, Object[] params) {
}
@Override
public void doMaxRetryCallback(String sceneName, String executorName, Object[] params) {
}
}

View File

@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.x.retry.client.core.RetryArgSerializer;
import com.x.retry.client.core.cache.GroupVersionCache;
import com.x.retry.client.core.cache.RetryerInfoCache;
import com.x.retry.client.core.callback.RetryCompleteCallback;
import com.x.retry.client.core.exception.XRetryClientException;
import com.x.retry.client.core.intercepter.RetrySiteSnapshot;
import com.x.retry.client.core.retryer.RetryerInfo;
@ -12,7 +13,10 @@ import com.x.retry.client.core.serializer.JacksonSerializer;
import com.x.retry.client.core.strategy.RetryStrategy;
import com.x.retry.client.model.DispatchRetryDTO;
import com.x.retry.client.model.DispatchRetryResultDTO;
import com.x.retry.client.model.RetryCallbackDTO;
import com.x.retry.common.core.context.SpringContext;
import com.x.retry.common.core.enums.RetryResultStatusEnum;
import com.x.retry.common.core.enums.RetryStatusEnum;
import com.x.retry.common.core.model.Result;
import com.x.retry.common.core.util.JsonUtil;
import com.x.retry.server.model.dto.ConfigDTO;
@ -90,4 +94,34 @@ public class RetryEndPoint {
GroupVersionCache.configDTO = configDTO;
return new Result();
}
@PostMapping("/callback/v1")
public Result callback(@RequestBody RetryCallbackDTO callbackDTO) {
RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName());
if (Objects.isNull(retryerInfo)) {
throw new XRetryClientException("场景:[{}]配置不存在", callbackDTO.getScene());
}
RetryArgSerializer retryArgSerializer = new JacksonSerializer();
Object[] deSerialize = null;
try {
deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getExecutorMethod());
} catch (JsonProcessingException e) {
throw new XRetryClientException("参数解析异常", e);
}
Class<? extends RetryCompleteCallback> retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback();
RetryCompleteCallback retryCompleteCallback = SpringContext.getBeanByType(retryCompleteCallbackClazz);
if (RetryStatusEnum.FINISH.getStatus().equals(callbackDTO.getRetryStatus())) {
retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
}
if (RetryStatusEnum.MAX_RETRY_COUNT.getStatus().equals(callbackDTO.getRetryStatus())) {
retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
}
return new Result();
}
}

View File

@ -3,6 +3,7 @@ package com.x.retry.client.core.register.scan;
import com.x.retry.client.core.BizIdGenerate;
import com.x.retry.client.core.annotation.Retryable;
import com.x.retry.client.core.Scanner;
import com.x.retry.client.core.callback.RetryCompleteCallback;
import com.x.retry.client.core.retryer.RetryType;
import com.x.retry.client.core.retryer.RetryerInfo;
import com.x.retry.client.core.strategy.RetryMethod;
@ -77,6 +78,7 @@ public class RetryableScanner implements Scanner, ApplicationContextAware {
int localInterval = retryable.localInterval();
Class<? extends RetryMethod> retryMethod = retryable.retryMethod();
boolean throwException = retryable.isThrowException();
Class<? extends RetryCompleteCallback> retryCompleteCallback = retryable.retryCompleteCallback();
return new RetryerInfo(retryable.scene(),
executorClassName,
@ -90,7 +92,8 @@ public class RetryableScanner implements Scanner, ApplicationContextAware {
bizIdGenerate,
bizNo,
retryMethod,
throwException
throwException,
retryCompleteCallback
);
}

View File

@ -1,6 +1,7 @@
package com.x.retry.client.core.retryer;
import com.x.retry.client.core.BizIdGenerate;
import com.x.retry.client.core.callback.RetryCompleteCallback;
import com.x.retry.client.core.strategy.RetryMethod;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -29,4 +30,5 @@ public class RetryerInfo {
private final String bizNo;
private final Class<? extends RetryMethod> retryMethod;
private final boolean isThrowException;
private final Class<? extends RetryCompleteCallback> retryCompleteCallback;
}

View File

@ -0,0 +1,26 @@
package com.x.retry.client.model;
import lombok.Data;
import org.hibernate.validator.constraints.NotBlank;
/**
* 服务端调度重试入参
*
* @auther www.byteblogs.com
* @date 2022/03/25 10:06
*/
@Data
public class RetryCallbackDTO {
@NotBlank(message = "group 不能为空")
private String group;
@NotBlank(message = "scene 不能为空")
private String scene;
@NotBlank(message = "参数 不能为空")
private String argsStr;
@NotBlank(message = "bizId 不能为空")
private String bizId;
@NotBlank(message = "executorName 不能为空")
private String executorName;
@NotBlank(message = "retryStatus 不能为空")
private Integer retryStatus;
}

View File

@ -3,6 +3,7 @@ package com.x.retry.server.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.x.retry.common.core.context.SpringContext;
import com.x.retry.server.support.dispatch.actor.callback.CallbackRetryResultActor;
import com.x.retry.server.support.dispatch.actor.exec.ExecUnitActor;
import com.x.retry.server.support.dispatch.actor.result.FailureActor;
import com.x.retry.server.support.dispatch.actor.result.FinishActor;
@ -46,6 +47,15 @@ public class ActorGenerator {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(NoRetryActor.BEAN_NAME));
}
/**
* 不触发重试actor
*
* @return actor 引用
*/
public static ActorRef callbackRetryResultActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(CallbackRetryResultActor.BEAN_NAME));
}
/**
* 生成重试执行的actor
*

View File

@ -0,0 +1,91 @@
package com.x.retry.server.support.dispatch.actor.callback;
import akka.actor.AbstractActor;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.x.retry.client.model.DispatchRetryDTO;
import com.x.retry.client.model.RetryCallbackDTO;
import com.x.retry.common.core.constant.SystemConstants;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.model.Result;
import com.x.retry.common.core.model.XRetryHeaders;
import com.x.retry.common.core.util.JsonUtil;
import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.x.retry.server.persistence.mybatis.po.GroupConfig;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
import com.x.retry.server.persistence.mybatis.po.ServerNode;
import com.x.retry.server.persistence.support.ConfigAccess;
import com.x.retry.server.support.ClientLoadBalance;
import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager;
import com.x.retry.server.support.handler.ClientNodeAllocateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
* @author: www.byteblogs.com
* @date : 2023-01-10 08:50
*/
@Component("CallbackRetryResultActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class CallbackRetryResultActor extends AbstractActor {
public static final String BEAN_NAME = "CallbackRetryResultActor";
public static final String URL = "http://{0}:{1}/{2}/retry/callback/v1";
@Autowired
private RestTemplate restTemplate;
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTask.class, retryTask->{
ServerNode serverNode = clientNodeAllocateHandler.getServerNode(retryTask);
if (Objects.isNull(serverNode)) {
log.warn("暂无可用的客户端节点");
return;
}
// 回调参数
RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO();
retryCallbackDTO.setBizId(retryTask.getBizId());
retryCallbackDTO.setRetryStatus(retryTask.getRetryStatus());
retryCallbackDTO.setArgsStr(retryTask.getArgsStr());
retryCallbackDTO.setScene(retryTask.getSceneName());
retryCallbackDTO.setGroup(retryTask.getGroupName());
retryCallbackDTO.setExecutorName(retryTask.getExecutorName());
// 设置header
HttpHeaders requestHeaders = new HttpHeaders();
XRetryHeaders xRetryHeaders = new XRetryHeaders();
xRetryHeaders.setXRetry(Boolean.TRUE);
xRetryHeaders.setXRetryId(IdUtil.simpleUUID());
requestHeaders.add(SystemConstants.X_RETRY_HEAD_KEY, JsonUtil.toJsonString(xRetryHeaders));
HttpEntity<RetryCallbackDTO> requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders);
String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
Result result = restTemplate.postForObject(format, requestEntity, Result.class);
LogUtils.info("回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result));
}).build();
}
}

View File

@ -1,11 +1,13 @@
package com.x.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.x.retry.common.core.enums.RetryStatusEnum;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.util.Assert;
import com.x.retry.server.akka.ActorGenerator;
import com.x.retry.server.exception.XRetryServerException;
import com.x.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
@ -21,6 +23,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;
/**
* 重试完成执行器
@ -57,12 +60,19 @@ public class FailureActor extends AbstractActor {
SceneConfig sceneConfig =
configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
ActorRef actorRef = null;
if (sceneConfig.getMaxRetryCount() <= retryTask.getRetryCount()) {
retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus());
actorRef = ActorGenerator.callbackRetryResultActor();
}
try {
retryTaskAccess.updateRetryTask(retryTask);
// 重试成功回调客户端
if (Objects.nonNull(actorRef)) {
actorRef.tell(retryTask, actorRef);
}
} catch (Exception e) {
LogUtils.error("更新重试任务失败", e);
} finally {

View File

@ -1,11 +1,13 @@
package com.x.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.x.retry.common.core.enums.RetryStatusEnum;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.util.Assert;
import com.x.retry.server.akka.ActorGenerator;
import com.x.retry.server.exception.XRetryServerException;
import com.x.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
@ -51,10 +53,14 @@ public class FinishActor extends AbstractActor {
try {
retryTaskAccess.updateRetryTask(retryTask);
// 重试成功回调客户端
ActorRef actorRef = ActorGenerator.callbackRetryResultActor();
actorRef.tell(retryTask, actorRef);
}catch (Exception e) {
LogUtils.error("更新重试任务失败", e);
} finally {
// 更新DB状态
getContext().stop(getSelf());
// 记录重试日志

View File

@ -21,6 +21,7 @@ import com.x.retry.server.support.WaitStrategy;
import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager;
import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.x.retry.server.support.dispatch.DispatchService;
import com.x.retry.server.support.handler.ClientNodeAllocateHandler;
import com.x.retry.server.support.retry.RetryBuilder;
import com.x.retry.server.support.retry.RetryExecutor;
import com.x.retry.server.support.strategy.FilterStrategies;
@ -63,7 +64,7 @@ public class ScanGroupActor extends AbstractActor {
private ConfigAccess configAccess;
@Autowired
private ServerNodeMapper serverNodeMapper;
private ClientNodeAllocateHandler clientNodeAllocateHandler;
public static final String BEAN_NAME = "ScanGroupActor";
@ -108,7 +109,7 @@ public class ScanGroupActor extends AbstractActor {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName));
retryContext.setServerNode(getServerNode(retryTask));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask));
RetryExecutor<Result<DispatchRetryResultDTO>> executor = RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
.withStopStrategy(StopStrategies.stopResultStatus())
@ -152,23 +153,6 @@ public class ScanGroupActor extends AbstractActor {
retryTask.setRetryCount(++retryCount);
}
/**
* 获取分配的节点
*/
public ServerNode getServerNode(RetryTask retryTask) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName());
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, retryTask.getGroupName()));
if (CollectionUtils.isEmpty(serverNodes)) {
return null;
}
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey());
String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet())));
return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get();
}
private void productExecUnitActor(RetryExecutor<Result<DispatchRetryResultDTO>> retryExecutor) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();

View File

@ -0,0 +1,51 @@
package com.x.retry.server.support.handler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.x.retry.server.persistence.mybatis.po.GroupConfig;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
import com.x.retry.server.persistence.mybatis.po.ServerNode;
import com.x.retry.server.persistence.support.ConfigAccess;
import com.x.retry.server.support.ClientLoadBalance;
import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
* @author: www.byteblogs.com
* @date : 2023-01-10 14:18
*/
@Component
public class ClientNodeAllocateHandler {
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private ServerNodeMapper serverNodeMapper;
/**
* 获取分配的节点
*/
public ServerNode getServerNode(RetryTask retryTask) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName());
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, retryTask.getGroupName()));
if (CollectionUtils.isEmpty(serverNodes)) {
return null;
}
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey());
String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet())));
return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get();
}
}