diff --git a/x-retry-client-core/src/main/java/com/x/retry/client/core/annotation/Retryable.java b/x-retry-client-core/src/main/java/com/x/retry/client/core/annotation/Retryable.java index 79ce1c89a..9bb4152dd 100644 --- a/x-retry-client-core/src/main/java/com/x/retry/client/core/annotation/Retryable.java +++ b/x-retry-client-core/src/main/java/com/x/retry/client/core/annotation/Retryable.java @@ -2,6 +2,8 @@ package com.x.retry.client.core.annotation; import com.x.retry.client.core.BizIdGenerate; +import com.x.retry.client.core.callback.RetryCompleteCallback; +import com.x.retry.client.core.callback.SimpleRetryCompleteCallback; import com.x.retry.client.core.generator.SimpleBizIdGenerate; import com.x.retry.client.core.retryer.RetryType; import com.x.retry.client.core.strategy.RetryAnnotationMethod; @@ -53,6 +55,13 @@ public @interface Retryable { */ Class bizId() default SimpleBizIdGenerate.class; + /** + * 服务端重试完成(重试成功、重试到达最大次数)回调客户端 + * + * @return + */ + Class retryCompleteCallback() default SimpleRetryCompleteCallback.class; + /** * bizNo spel表达式 */ diff --git a/x-retry-client-core/src/main/java/com/x/retry/client/core/callback/RetryCompleteCallback.java b/x-retry-client-core/src/main/java/com/x/retry/client/core/callback/RetryCompleteCallback.java new file mode 100644 index 000000000..6d5c0f7d9 --- /dev/null +++ b/x-retry-client-core/src/main/java/com/x/retry/client/core/callback/RetryCompleteCallback.java @@ -0,0 +1,12 @@ +package com.x.retry.client.core.callback; + +/** + * @author: www.byteblogs.com + * @date : 2023-01-10 14:46 + */ +public interface RetryCompleteCallback { + + void doSuccessCallback(String sceneName, String executorName, Object[] params); + + void doMaxRetryCallback(String sceneName, String executorName, Object[] params); +} diff --git a/x-retry-client-core/src/main/java/com/x/retry/client/core/callback/SimpleRetryCompleteCallback.java b/x-retry-client-core/src/main/java/com/x/retry/client/core/callback/SimpleRetryCompleteCallback.java new file mode 100644 index 000000000..2019233f9 --- /dev/null +++ b/x-retry-client-core/src/main/java/com/x/retry/client/core/callback/SimpleRetryCompleteCallback.java @@ -0,0 +1,21 @@ +package com.x.retry.client.core.callback; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author: www.byteblogs.com + * @date : 2023-01-10 14:47 + */ +@Component +@Slf4j +public class SimpleRetryCompleteCallback implements RetryCompleteCallback { + + @Override + public void doSuccessCallback(String sceneName, String executorName, Object[] params) { + } + + @Override + public void doMaxRetryCallback(String sceneName, String executorName, Object[] params) { + } +} diff --git a/x-retry-client-core/src/main/java/com/x/retry/client/core/client/RetryEndPoint.java b/x-retry-client-core/src/main/java/com/x/retry/client/core/client/RetryEndPoint.java index 00ec4e340..5bc19027e 100644 --- a/x-retry-client-core/src/main/java/com/x/retry/client/core/client/RetryEndPoint.java +++ b/x-retry-client-core/src/main/java/com/x/retry/client/core/client/RetryEndPoint.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.x.retry.client.core.RetryArgSerializer; import com.x.retry.client.core.cache.GroupVersionCache; import com.x.retry.client.core.cache.RetryerInfoCache; +import com.x.retry.client.core.callback.RetryCompleteCallback; import com.x.retry.client.core.exception.XRetryClientException; import com.x.retry.client.core.intercepter.RetrySiteSnapshot; import com.x.retry.client.core.retryer.RetryerInfo; @@ -12,7 +13,10 @@ import com.x.retry.client.core.serializer.JacksonSerializer; import com.x.retry.client.core.strategy.RetryStrategy; import com.x.retry.client.model.DispatchRetryDTO; import com.x.retry.client.model.DispatchRetryResultDTO; +import com.x.retry.client.model.RetryCallbackDTO; +import com.x.retry.common.core.context.SpringContext; import com.x.retry.common.core.enums.RetryResultStatusEnum; +import com.x.retry.common.core.enums.RetryStatusEnum; import com.x.retry.common.core.model.Result; import com.x.retry.common.core.util.JsonUtil; import com.x.retry.server.model.dto.ConfigDTO; @@ -90,4 +94,34 @@ public class RetryEndPoint { GroupVersionCache.configDTO = configDTO; return new Result(); } + + @PostMapping("/callback/v1") + public Result callback(@RequestBody RetryCallbackDTO callbackDTO) { + RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName()); + if (Objects.isNull(retryerInfo)) { + throw new XRetryClientException("场景:[{}]配置不存在", callbackDTO.getScene()); + } + + RetryArgSerializer retryArgSerializer = new JacksonSerializer(); + + Object[] deSerialize = null; + try { + deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getExecutorMethod()); + } catch (JsonProcessingException e) { + throw new XRetryClientException("参数解析异常", e); + } + + Class retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback(); + RetryCompleteCallback retryCompleteCallback = SpringContext.getBeanByType(retryCompleteCallbackClazz); + + if (RetryStatusEnum.FINISH.getStatus().equals(callbackDTO.getRetryStatus())) { + retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize); + } + + if (RetryStatusEnum.MAX_RETRY_COUNT.getStatus().equals(callbackDTO.getRetryStatus())) { + retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize); + } + + return new Result(); + } } diff --git a/x-retry-client-core/src/main/java/com/x/retry/client/core/register/scan/RetryableScanner.java b/x-retry-client-core/src/main/java/com/x/retry/client/core/register/scan/RetryableScanner.java index 7b3f1521b..3cf2a0752 100644 --- a/x-retry-client-core/src/main/java/com/x/retry/client/core/register/scan/RetryableScanner.java +++ b/x-retry-client-core/src/main/java/com/x/retry/client/core/register/scan/RetryableScanner.java @@ -3,6 +3,7 @@ package com.x.retry.client.core.register.scan; import com.x.retry.client.core.BizIdGenerate; import com.x.retry.client.core.annotation.Retryable; import com.x.retry.client.core.Scanner; +import com.x.retry.client.core.callback.RetryCompleteCallback; import com.x.retry.client.core.retryer.RetryType; import com.x.retry.client.core.retryer.RetryerInfo; import com.x.retry.client.core.strategy.RetryMethod; @@ -77,6 +78,7 @@ public class RetryableScanner implements Scanner, ApplicationContextAware { int localInterval = retryable.localInterval(); Class retryMethod = retryable.retryMethod(); boolean throwException = retryable.isThrowException(); + Class retryCompleteCallback = retryable.retryCompleteCallback(); return new RetryerInfo(retryable.scene(), executorClassName, @@ -90,7 +92,8 @@ public class RetryableScanner implements Scanner, ApplicationContextAware { bizIdGenerate, bizNo, retryMethod, - throwException + throwException, + retryCompleteCallback ); } diff --git a/x-retry-client-core/src/main/java/com/x/retry/client/core/retryer/RetryerInfo.java b/x-retry-client-core/src/main/java/com/x/retry/client/core/retryer/RetryerInfo.java index b137e3677..eeaea8f92 100644 --- a/x-retry-client-core/src/main/java/com/x/retry/client/core/retryer/RetryerInfo.java +++ b/x-retry-client-core/src/main/java/com/x/retry/client/core/retryer/RetryerInfo.java @@ -1,6 +1,7 @@ package com.x.retry.client.core.retryer; import com.x.retry.client.core.BizIdGenerate; +import com.x.retry.client.core.callback.RetryCompleteCallback; import com.x.retry.client.core.strategy.RetryMethod; import lombok.AllArgsConstructor; import lombok.Data; @@ -29,4 +30,5 @@ public class RetryerInfo { private final String bizNo; private final Class retryMethod; private final boolean isThrowException; + private final Class retryCompleteCallback; } diff --git a/x-retry-common/x-retry-common-client-api/src/main/java/com/x/retry/client/model/RetryCallbackDTO.java b/x-retry-common/x-retry-common-client-api/src/main/java/com/x/retry/client/model/RetryCallbackDTO.java new file mode 100644 index 000000000..bbbee2bc9 --- /dev/null +++ b/x-retry-common/x-retry-common-client-api/src/main/java/com/x/retry/client/model/RetryCallbackDTO.java @@ -0,0 +1,26 @@ +package com.x.retry.client.model; + +import lombok.Data; +import org.hibernate.validator.constraints.NotBlank; + +/** + * 服务端调度重试入参 + * + * @auther www.byteblogs.com + * @date 2022/03/25 10:06 + */ +@Data +public class RetryCallbackDTO { + @NotBlank(message = "group 不能为空") + private String group; + @NotBlank(message = "scene 不能为空") + private String scene; + @NotBlank(message = "参数 不能为空") + private String argsStr; + @NotBlank(message = "bizId 不能为空") + private String bizId; + @NotBlank(message = "executorName 不能为空") + private String executorName; + @NotBlank(message = "retryStatus 不能为空") + private Integer retryStatus; +} diff --git a/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java b/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java index e1a5bbc51..d3b8a4061 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java +++ b/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java @@ -3,6 +3,7 @@ package com.x.retry.server.akka; import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.x.retry.common.core.context.SpringContext; +import com.x.retry.server.support.dispatch.actor.callback.CallbackRetryResultActor; import com.x.retry.server.support.dispatch.actor.exec.ExecUnitActor; import com.x.retry.server.support.dispatch.actor.result.FailureActor; import com.x.retry.server.support.dispatch.actor.result.FinishActor; @@ -46,6 +47,15 @@ public class ActorGenerator { return getDispatchResultActorSystem().actorOf(getSpringExtension().props(NoRetryActor.BEAN_NAME)); } + /** + * 不触发重试actor + * + * @return actor 引用 + */ + public static ActorRef callbackRetryResultActor() { + return getDispatchResultActorSystem().actorOf(getSpringExtension().props(CallbackRetryResultActor.BEAN_NAME)); + } + /** * 生成重试执行的actor * diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java new file mode 100644 index 000000000..52e11efba --- /dev/null +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java @@ -0,0 +1,91 @@ +package com.x.retry.server.support.dispatch.actor.callback; + +import akka.actor.AbstractActor; +import cn.hutool.core.util.IdUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.x.retry.client.model.DispatchRetryDTO; +import com.x.retry.client.model.RetryCallbackDTO; +import com.x.retry.common.core.constant.SystemConstants; +import com.x.retry.common.core.log.LogUtils; +import com.x.retry.common.core.model.Result; +import com.x.retry.common.core.model.XRetryHeaders; +import com.x.retry.common.core.util.JsonUtil; +import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper; +import com.x.retry.server.persistence.mybatis.po.GroupConfig; +import com.x.retry.server.persistence.mybatis.po.RetryTask; +import com.x.retry.server.persistence.mybatis.po.ServerNode; +import com.x.retry.server.persistence.support.ConfigAccess; +import com.x.retry.server.support.ClientLoadBalance; +import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager; +import com.x.retry.server.support.handler.ClientNodeAllocateHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.web.client.RestTemplate; + +import java.text.MessageFormat; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * @author: www.byteblogs.com + * @date : 2023-01-10 08:50 + */ +@Component("CallbackRetryResultActor") +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class CallbackRetryResultActor extends AbstractActor { + + public static final String BEAN_NAME = "CallbackRetryResultActor"; + public static final String URL = "http://{0}:{1}/{2}/retry/callback/v1"; + + @Autowired + private RestTemplate restTemplate; + @Autowired + private ClientNodeAllocateHandler clientNodeAllocateHandler; + + @Override + public Receive createReceive() { + return receiveBuilder().match(RetryTask.class, retryTask->{ + + ServerNode serverNode = clientNodeAllocateHandler.getServerNode(retryTask); + if (Objects.isNull(serverNode)) { + log.warn("暂无可用的客户端节点"); + return; + } + + // 回调参数 + RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO(); + retryCallbackDTO.setBizId(retryTask.getBizId()); + retryCallbackDTO.setRetryStatus(retryTask.getRetryStatus()); + retryCallbackDTO.setArgsStr(retryTask.getArgsStr()); + retryCallbackDTO.setScene(retryTask.getSceneName()); + retryCallbackDTO.setGroup(retryTask.getGroupName()); + retryCallbackDTO.setExecutorName(retryTask.getExecutorName()); + + // 设置header + HttpHeaders requestHeaders = new HttpHeaders(); + XRetryHeaders xRetryHeaders = new XRetryHeaders(); + xRetryHeaders.setXRetry(Boolean.TRUE); + xRetryHeaders.setXRetryId(IdUtil.simpleUUID()); + requestHeaders.add(SystemConstants.X_RETRY_HEAD_KEY, JsonUtil.toJsonString(xRetryHeaders)); + + HttpEntity requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders); + + String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath()); + Result result = restTemplate.postForObject(format, requestEntity, Result.class); + LogUtils.info("回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result)); + + }).build(); + } + + +} diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FailureActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FailureActor.java index d1d53cd38..6035e4e9f 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FailureActor.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FailureActor.java @@ -1,11 +1,13 @@ package com.x.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; +import akka.actor.ActorRef; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.x.retry.common.core.enums.RetryStatusEnum; import com.x.retry.common.core.log.LogUtils; import com.x.retry.common.core.util.Assert; +import com.x.retry.server.akka.ActorGenerator; import com.x.retry.server.exception.XRetryServerException; import com.x.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; import com.x.retry.server.persistence.mybatis.po.RetryTask; @@ -21,6 +23,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.List; +import java.util.Objects; /** * 重试完成执行器 @@ -57,12 +60,19 @@ public class FailureActor extends AbstractActor { SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); + ActorRef actorRef = null; if (sceneConfig.getMaxRetryCount() <= retryTask.getRetryCount()) { retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus()); + actorRef = ActorGenerator.callbackRetryResultActor(); } try { retryTaskAccess.updateRetryTask(retryTask); + + // 重试成功回调客户端 + if (Objects.nonNull(actorRef)) { + actorRef.tell(retryTask, actorRef); + } } catch (Exception e) { LogUtils.error("更新重试任务失败", e); } finally { diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FinishActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FinishActor.java index dbc2e60d7..b7cbc2fc1 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FinishActor.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/FinishActor.java @@ -1,11 +1,13 @@ package com.x.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; +import akka.actor.ActorRef; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.x.retry.common.core.enums.RetryStatusEnum; import com.x.retry.common.core.log.LogUtils; import com.x.retry.common.core.util.Assert; +import com.x.retry.server.akka.ActorGenerator; import com.x.retry.server.exception.XRetryServerException; import com.x.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; import com.x.retry.server.persistence.mybatis.po.RetryTask; @@ -51,10 +53,14 @@ public class FinishActor extends AbstractActor { try { retryTaskAccess.updateRetryTask(retryTask); + + // 重试成功回调客户端 + ActorRef actorRef = ActorGenerator.callbackRetryResultActor(); + actorRef.tell(retryTask, actorRef); }catch (Exception e) { LogUtils.error("更新重试任务失败", e); } finally { - // 更新DB状态 + getContext().stop(getSelf()); // 记录重试日志 diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java index 1581ea720..d92ed31d1 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java @@ -21,6 +21,7 @@ import com.x.retry.server.support.WaitStrategy; import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager; import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import com.x.retry.server.support.dispatch.DispatchService; +import com.x.retry.server.support.handler.ClientNodeAllocateHandler; import com.x.retry.server.support.retry.RetryBuilder; import com.x.retry.server.support.retry.RetryExecutor; import com.x.retry.server.support.strategy.FilterStrategies; @@ -63,7 +64,7 @@ public class ScanGroupActor extends AbstractActor { private ConfigAccess configAccess; @Autowired - private ServerNodeMapper serverNodeMapper; + private ClientNodeAllocateHandler clientNodeAllocateHandler; public static final String BEAN_NAME = "ScanGroupActor"; @@ -108,7 +109,7 @@ public class ScanGroupActor extends AbstractActor { MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); retryContext.setRetryTask(retryTask); retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName)); - retryContext.setServerNode(getServerNode(retryTask)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask)); RetryExecutor> executor = RetryBuilder.>newBuilder() .withStopStrategy(StopStrategies.stopResultStatus()) @@ -152,23 +153,6 @@ public class ScanGroupActor extends AbstractActor { retryTask.setRetryCount(++retryCount); } - /** - * 获取分配的节点 - */ - public ServerNode getServerNode(RetryTask retryTask) { - - GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName()); - List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper().eq(ServerNode::getGroupName, retryTask.getGroupName())); - - if (CollectionUtils.isEmpty(serverNodes)) { - return null; - } - - ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey()); - - String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet()))); - return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get(); - } private void productExecUnitActor(RetryExecutor> retryExecutor) { String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/handler/ClientNodeAllocateHandler.java b/x-retry-server/src/main/java/com/x/retry/server/support/handler/ClientNodeAllocateHandler.java new file mode 100644 index 000000000..b52b8ff2d --- /dev/null +++ b/x-retry-server/src/main/java/com/x/retry/server/support/handler/ClientNodeAllocateHandler.java @@ -0,0 +1,51 @@ +package com.x.retry.server.support.handler; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper; +import com.x.retry.server.persistence.mybatis.po.GroupConfig; +import com.x.retry.server.persistence.mybatis.po.RetryTask; +import com.x.retry.server.persistence.mybatis.po.ServerNode; +import com.x.retry.server.persistence.support.ConfigAccess; +import com.x.retry.server.support.ClientLoadBalance; +import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * @author: www.byteblogs.com + * @date : 2023-01-10 14:18 + */ +@Component +public class ClientNodeAllocateHandler { + + @Autowired + @Qualifier("configAccessProcessor") + private ConfigAccess configAccess; + @Autowired + private ServerNodeMapper serverNodeMapper; + + /** + * 获取分配的节点 + */ + public ServerNode getServerNode(RetryTask retryTask) { + + GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName()); + List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper().eq(ServerNode::getGroupName, retryTask.getGroupName())); + + if (CollectionUtils.isEmpty(serverNodes)) { + return null; + } + + ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey()); + + String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet()))); + return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get(); + } + +}