From 64b20464986d9120c7881f1492b8d400c90a2cd6 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sat, 14 Jan 2023 19:58:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E9=99=90=E5=88=B6=E5=8D=95?= =?UTF-8?q?=E7=82=B9=E9=87=8D=E8=AF=95=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../x/retry/server/akka/ActorGenerator.java | 10 +++ .../retry/server/config/SystemProperties.java | 5 ++ .../support/cache/CacheGroupRateLimiter.java | 59 ++++++++++++++++++ .../MaxAttemptsPersistenceRetryContext.java | 6 ++ .../support/dispatch/DispatchService.java | 48 ++++++++++++-- .../dispatch/actor/exec/ExecUnitActor.java | 31 +++------- .../dispatch/actor/result/NoRetryActor.java | 57 +++++++++++++++++ .../dispatch/actor/scan/ScanGroupActor.java | 37 ++++++++++- .../support/strategy/FilterStrategies.java | 62 +++++++++++++++---- .../server/XRetryServerApplicationTests.java | 27 ++++++++ 10 files changed, 303 insertions(+), 39 deletions(-) create mode 100644 x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java create mode 100644 x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/NoRetryActor.java diff --git a/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java b/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java index 3ac1f6886..e1a5bbc51 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java +++ b/x-retry-server/src/main/java/com/x/retry/server/akka/ActorGenerator.java @@ -6,6 +6,7 @@ import com.x.retry.common.core.context.SpringContext; import com.x.retry.server.support.dispatch.actor.exec.ExecUnitActor; import com.x.retry.server.support.dispatch.actor.result.FailureActor; import com.x.retry.server.support.dispatch.actor.result.FinishActor; +import com.x.retry.server.support.dispatch.actor.result.NoRetryActor; import com.x.retry.server.support.dispatch.actor.scan.ScanGroupActor; /** @@ -36,6 +37,15 @@ public class ActorGenerator { return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FailureActor.BEAN_NAME)); } + /** + * 不触发重试actor + * + * @return actor 引用 + */ + public static ActorRef noRetryActor() { + return getDispatchResultActorSystem().actorOf(getSpringExtension().props(NoRetryActor.BEAN_NAME)); + } + /** * 生成重试执行的actor * diff --git a/x-retry-server/src/main/java/com/x/retry/server/config/SystemProperties.java b/x-retry-server/src/main/java/com/x/retry/server/config/SystemProperties.java index a938c0a01..382c39770 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/config/SystemProperties.java +++ b/x-retry-server/src/main/java/com/x/retry/server/config/SystemProperties.java @@ -35,4 +35,9 @@ public class SystemProperties { */ private int totalPartition = 32; + /** + * 一个客户端每秒最多接收的重试数量指令 + */ + private int limiter = 10; + } diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java b/x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java new file mode 100644 index 000000000..d290a97ec --- /dev/null +++ b/x-retry-server/src/main/java/com/x/retry/server/support/cache/CacheGroupRateLimiter.java @@ -0,0 +1,59 @@ +package com.x.retry.server.support.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.RateLimiter; +import com.x.retry.common.core.log.LogUtils; +import com.x.retry.server.support.Lifecycle; +import lombok.Data; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * 缓存组组限流组件 + * + * @author www.byteblogs.com + * @date 2022-21-58 + * @since 2.0 + */ +@Component +@Data +public class CacheGroupRateLimiter implements Lifecycle { + + private static Cache CACHE; + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static Cache getAll() { + return CACHE; + } + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static RateLimiter getRateLimiterByKey(String hostId) { + return CACHE.getIfPresent(hostId); + } + + @Override + public void start() { + LogUtils.info("CacheGroupRateLimiter start"); + CACHE = CacheBuilder.newBuilder() + // 设置并发级别为cpu核心数 + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterWrite(10, TimeUnit.SECONDS) + .expireAfterAccess(10, TimeUnit.SECONDS) + .build(); + } + + @Override + public void close() { + LogUtils.info("CacheGroupRateLimiter stop"); + } +} diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/context/MaxAttemptsPersistenceRetryContext.java b/x-retry-server/src/main/java/com/x/retry/server/support/context/MaxAttemptsPersistenceRetryContext.java index d76e4e572..18bc61ecd 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/context/MaxAttemptsPersistenceRetryContext.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/context/MaxAttemptsPersistenceRetryContext.java @@ -1,6 +1,7 @@ package com.x.retry.server.support.context; import com.x.retry.server.persistence.mybatis.po.RetryTask; +import com.x.retry.server.persistence.mybatis.po.ServerNode; import com.x.retry.server.support.RetryContext; import com.x.retry.server.support.WaitStrategy; import lombok.Data; @@ -36,6 +37,11 @@ public class MaxAttemptsPersistenceRetryContext implements RetryContext { */ private Set sceneBlacklist; + /** + * 需要调度的节点 + */ + private ServerNode serverNode; + @Override public void setCallResult(V v) { this.callResult = v; diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java index 5ba339ad6..705dd3407 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/DispatchService.java @@ -2,18 +2,21 @@ package com.x.retry.server.support.dispatch; import akka.actor.ActorRef; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.util.concurrent.RateLimiter; import com.x.retry.common.core.enums.NodeTypeEnum; import com.x.retry.common.core.log.LogUtils; import com.x.retry.common.core.util.HostUtils; import com.google.common.cache.Cache; import com.x.retry.common.core.util.JsonUtil; import com.x.retry.server.akka.ActorGenerator; +import com.x.retry.server.config.SystemProperties; import com.x.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash; import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.x.retry.server.persistence.mybatis.po.GroupConfig; import com.x.retry.server.persistence.mybatis.po.ServerNode; import com.x.retry.server.persistence.support.ConfigAccess; import com.x.retry.server.support.Lifecycle; +import com.x.retry.server.support.cache.CacheGroupRateLimiter; import com.x.retry.server.support.cache.CacheGroupScanActor; import com.x.retry.server.support.handler.ServerRegisterNodeHandler; import org.springframework.beans.factory.annotation.Autowired; @@ -67,6 +70,9 @@ public class DispatchService implements Lifecycle { @Qualifier("configAccessProcessor") private ConfigAccess configAccess; + @Autowired + private SystemProperties systemProperties; + @Override public void start() { @@ -77,9 +83,8 @@ public class DispatchService implements Lifecycle { LogUtils.info("当前节点[{}] 分配的组:[{}]", HostUtils.getIp(), JsonUtil.toJsonString(currentHostGroupList)); if (!CollectionUtils.isEmpty(currentHostGroupList)) { - Cache actorRefCache = CacheGroupScanActor.getAll(); for (GroupConfig groupConfigContext : currentHostGroupList) { - produceScanActorTask(actorRefCache, groupConfigContext); + produceScanActorTask(groupConfigContext); } } @@ -94,20 +99,51 @@ public class DispatchService implements Lifecycle { /** * 扫描任务生成器 * - * @param actorRefCache 扫描任务actor缓存器 * @param groupConfig {@link GroupConfig} 组上下文 */ - private void produceScanActorTask(Cache actorRefCache, GroupConfig groupConfig) { + private void produceScanActorTask(GroupConfig groupConfig) { String groupName = groupConfig.getGroupName(); + + ActorRef scanActorRef = cacheActorRef(groupName); + + // 缓存按照 + cacheRateLimiter(groupName); + + // rebalance 和 group scan 流程合一 + scanActorRef.tell(groupConfig, scanActorRef); + } + + /** + * 缓存限流对象 + */ + private void cacheRateLimiter(String groupName) { + List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper() + .eq(ServerNode::getGroupName, groupName)); + Cache rateLimiterCache = CacheGroupRateLimiter.getAll(); + for (ServerNode serverNode : serverNodes) { + RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId()); + if (Objects.isNull(rateLimiter)) { + rateLimiterCache.put(groupName, RateLimiter.create(systemProperties.getLimiter())); + } + } + + rateLimiterCache.invalidateAll(); + + } + + /** + * 缓存Actor对象 + */ + private ActorRef cacheActorRef(String groupName) { + Cache actorRefCache = CacheGroupScanActor.getAll(); ActorRef scanActorRef = actorRefCache.getIfPresent(groupName); if (Objects.isNull(scanActorRef)) { scanActorRef = ActorGenerator.scanGroupActor(); // 缓存扫描器actor actorRefCache.put(groupName, scanActorRef); } - // rebalance 和 group scan 流程合一 - scanActorRef.tell(groupConfig, scanActorRef); + return scanActorRef; } /** diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java index 233dfb5a9..f0cf54421 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/exec/ExecUnitActor.java @@ -2,6 +2,7 @@ package com.x.retry.server.support.dispatch.actor.exec; import akka.actor.AbstractActor; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.util.concurrent.RateLimiter; import com.x.retry.client.model.DispatchRetryDTO; import com.x.retry.client.model.DispatchRetryResultDTO; import com.x.retry.common.core.log.LogUtils; @@ -19,6 +20,8 @@ import com.x.retry.server.persistence.support.ConfigAccess; import com.x.retry.server.support.ClientLoadBalance; import com.x.retry.server.support.IdempotentStrategy; import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager; +import com.x.retry.server.support.cache.CacheGroupRateLimiter; +import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import com.x.retry.server.support.retry.RetryExecutor; import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeanUtils; @@ -36,6 +39,7 @@ import java.util.List; import java.util.Objects; import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -55,21 +59,11 @@ public class ExecUnitActor extends AbstractActor { @Autowired @Qualifier("bitSetIdempotentStrategyHandler") private IdempotentStrategy idempotentStrategy; - @Autowired private RetryTaskLogMapper retryTaskLogMapper; - - @Autowired - private ServerNodeMapper serverNodeMapper; - @Autowired private RestTemplate restTemplate; - @Autowired - @Qualifier("configAccessProcessor") - private ConfigAccess configAccess; - - @Override public Receive createReceive() { return receiveBuilder().match(RetryExecutor.class, retryExecutor -> { @@ -77,13 +71,14 @@ public class ExecUnitActor extends AbstractActor { RetryTaskLog retryTaskLog = new RetryTaskLog(); retryTaskLog.setErrorMessage(StringUtils.EMPTY); - RetryTask retryTask = retryExecutor.getRetryContext().getRetryTask(); + MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext(); + RetryTask retryTask = context.getRetryTask(); + ServerNode serverNode = context.getServerNode(); try { - List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper().eq(ServerNode::getGroupName, retryTask.getGroupName())); - if (!CollectionUtils.isEmpty(serverNodes)) { - Object call = retryExecutor.call((Callable>) () -> callClient(retryTask, retryTaskLog, serverNodes)); + if (Objects.nonNull(serverNode)) { + Object call = retryExecutor.call((Callable>) () -> callClient(retryTask, retryTaskLog, serverNode)); } else { retryTaskLog.setErrorMessage("暂无可用的客户端POD"); } @@ -115,13 +110,7 @@ public class ExecUnitActor extends AbstractActor { * @param retryTask {@link RetryTask} 需要重试的数据 * @return 重试结果返回值 */ - private Result callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, List serverNodes) { - - GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName()); - ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey()); - - String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet()))); - ServerNode serverNode = serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get(); + private Result callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, ServerNode serverNode) { DispatchRetryDTO dispatchRetryDTO = new DispatchRetryDTO(); dispatchRetryDTO.setBizId(retryTask.getBizId()); diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/NoRetryActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/NoRetryActor.java new file mode 100644 index 000000000..115682ced --- /dev/null +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/result/NoRetryActor.java @@ -0,0 +1,57 @@ +package com.x.retry.server.support.dispatch.actor.result; + +import akka.actor.AbstractActor; +import cn.hutool.json.JSON; +import com.x.retry.common.core.log.LogUtils; +import com.x.retry.common.core.util.JsonUtil; +import com.x.retry.server.persistence.mybatis.po.RetryTask; +import com.x.retry.server.persistence.support.RetryTaskAccess; +import com.x.retry.server.support.WaitStrategy; +import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext; +import com.x.retry.server.support.retry.RetryExecutor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +/** + * 不重试,只更新下次触发时间 + * + * @author: shuguang.zhang + * @date : 2022-04-14 16:11 + */ +@Component("NoRetryActor") +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class NoRetryActor extends AbstractActor { + + public static final String BEAN_NAME = "NoRetryActor"; + + @Autowired + @Qualifier("retryTaskAccessProcessor") + private RetryTaskAccess retryTaskAccess; + + @Override + public Receive createReceive() { + return receiveBuilder().match(RetryExecutor.class, retryExecutor -> { + + MaxAttemptsPersistenceRetryContext retryContext = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext(); + RetryTask retryTask = retryContext.getRetryTask(); + WaitStrategy waitStrategy = retryContext.getWaitStrategy(); + retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(retryContext)); + + // 不更新重试次数 + retryTask.setRetryCount(null); + try { + retryTaskAccess.updateRetryTask(retryTask); + }catch (Exception e) { + LogUtils.error("更新重试任务失败", e); + } finally { + // 更新DB状态 + getContext().stop(getSelf()); + } + + }).build(); + } + +} diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java index b6eb9b68b..9d2bf235d 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/dispatch/actor/scan/ScanGroupActor.java @@ -2,18 +2,23 @@ package com.x.retry.server.support.dispatch.actor.scan; import akka.actor.AbstractActor; import akka.actor.ActorRef; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.x.retry.common.core.log.LogUtils; import com.x.retry.client.model.DispatchRetryResultDTO; import com.x.retry.common.core.model.Result; import com.x.retry.server.akka.ActorGenerator; import com.x.retry.server.config.SystemProperties; +import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.x.retry.server.persistence.mybatis.po.GroupConfig; import com.x.retry.server.persistence.mybatis.po.RetryTask; import com.x.retry.server.persistence.mybatis.po.SceneConfig; +import com.x.retry.server.persistence.mybatis.po.ServerNode; import com.x.retry.server.persistence.support.ConfigAccess; import com.x.retry.server.persistence.support.RetryTaskAccess; +import com.x.retry.server.support.ClientLoadBalance; import com.x.retry.server.support.IdempotentStrategy; import com.x.retry.server.support.WaitStrategy; +import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager; import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import com.x.retry.server.support.dispatch.DispatchService; import com.x.retry.server.support.retry.RetryBuilder; @@ -30,6 +35,8 @@ import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; /** * @author www.byteblogs.com @@ -55,6 +62,9 @@ public class ScanGroupActor extends AbstractActor { @Qualifier("configAccessProcessor") private ConfigAccess configAccess; + @Autowired + private ServerNodeMapper serverNodeMapper; + public static final String BEAN_NAME = "ScanGroupActor"; @Override @@ -97,6 +107,7 @@ public class ScanGroupActor extends AbstractActor { MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); retryContext.setRetryTask(retryTask); retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName)); + retryContext.setServerNode(getServerNode(retryTask)); RetryExecutor> executor = RetryBuilder.>newBuilder() .withStopStrategy(StopStrategies.stopResultStatus()) @@ -104,11 +115,17 @@ public class ScanGroupActor extends AbstractActor { .withFilterStrategy(FilterStrategies.delayLevelFilter()) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) .withFilterStrategy(FilterStrategies.sceneBlackFilter()) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilterStrategies()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) .withRetryContext(retryContext) .build(); if (!executor.filter()) { + + // 不触发重试 + ActorRef actorRef = ActorGenerator.noRetryActor(); + actorRef.tell(executor, actorRef); + continue; } @@ -139,6 +156,24 @@ public class ScanGroupActor extends AbstractActor { retryTask.setRetryCount(++retryCount); } + /** + * 获取分配的节点 + */ + public ServerNode getServerNode(RetryTask retryTask) { + + GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName()); + List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper().eq(ServerNode::getGroupName, retryTask.getGroupName())); + + if (CollectionUtils.isEmpty(serverNodes)) { + return null; + } + + ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey()); + + String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet()))); + return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get(); + } + private void productExecUnitActor(RetryExecutor> retryExecutor) { String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); diff --git a/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java b/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java index 5ca4aff83..7cacdcf1e 100644 --- a/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java +++ b/x-retry-server/src/main/java/com/x/retry/server/support/strategy/FilterStrategies.java @@ -1,18 +1,21 @@ package com.x.retry.server.support.strategy; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.util.concurrent.RateLimiter; import com.x.retry.common.core.context.SpringContext; +import com.x.retry.common.core.log.LogUtils; import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.x.retry.server.persistence.mybatis.po.RetryTask; import com.x.retry.server.persistence.mybatis.po.ServerNode; import com.x.retry.server.support.FilterStrategy; import com.x.retry.server.support.IdempotentStrategy; import com.x.retry.server.support.RetryContext; +import com.x.retry.server.support.cache.CacheGroupRateLimiter; import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext; -import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; -import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * 生成 {@link FilterStrategy} 实例. @@ -57,13 +60,23 @@ public class FilterStrategies { * * @return {@link CheckAliveClientPodFilterStrategies} 客户端存活POD检查策略 */ - public static FilterStrategy checkAliveClientPodFilterStrategies() { + public static FilterStrategy checkAliveClientPodFilter() { return new CheckAliveClientPodFilterStrategies(); } /** - * 延迟等级的过滤策略 + * 检查分配的客户端是否达到限流阈值 * + * @return {@link RateLimiterFilterStrategies} 检查分配的客户端是否达到限流阈值 + */ + public static FilterStrategy rateLimiterFilter() { + return new RateLimiterFilterStrategies(); + } + + + /** + * 延迟等级的过滤策略 + *

* 根据延迟等级的时间计算下次触发时间是否小于当前时间,满足则返回true 否则返回false */ private static final class DelayLevelFilterStrategies implements FilterStrategy { @@ -84,7 +97,7 @@ public class FilterStrategies { /** * 使用BitSet幂等的过滤策略 - * + *

* 判断BitSet中是否存在,若存在则放回false 否则返回true */ private static final class BitSetIdempotentFilterStrategies implements FilterStrategy { @@ -92,7 +105,7 @@ public class FilterStrategies { private IdempotentStrategy idempotentStrategy; public BitSetIdempotentFilterStrategies(IdempotentStrategy idempotentStrategy) { - this.idempotentStrategy = idempotentStrategy; + this.idempotentStrategy = idempotentStrategy; } @Override @@ -109,7 +122,7 @@ public class FilterStrategies { /** * 场景黑名单策略 - * + *

* 如果重试的数据在黑名单中的则返回false 否则为true */ private static final class SceneBlackFilterStrategies implements FilterStrategy { @@ -134,11 +147,15 @@ public class FilterStrategies { @Override public boolean filter(RetryContext retryContext) { MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; - String groupName = context.getRetryTask().getGroupName(); - ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); - List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper().eq(ServerNode::getGroupName, groupName)); + ServerNode serverNode = context.getServerNode(); - return !CollectionUtils.isEmpty(serverNodes); + if (Objects.isNull(serverNode)) { + return false; + } + + ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); + + return 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper().eq(ServerNode::getHostId, serverNode.getHostId())); } @Override @@ -147,6 +164,29 @@ public class FilterStrategies { } } + /** + * 检查是否存在存活的客户端POD + */ + private static final class RateLimiterFilterStrategies implements FilterStrategy { + @Override + public boolean filter(RetryContext retryContext) { + MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; + ServerNode serverNode = context.getServerNode(); + + RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId()); + if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + LogUtils.error("该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId()); + return Boolean.FALSE; + } + + return Boolean.TRUE; + } + + @Override + public int order() { + return 4; + } + } } diff --git a/x-retry-server/src/test/java/com/x/retry/server/XRetryServerApplicationTests.java b/x-retry-server/src/test/java/com/x/retry/server/XRetryServerApplicationTests.java index 440589ef0..1bed4da0e 100644 --- a/x-retry-server/src/test/java/com/x/retry/server/XRetryServerApplicationTests.java +++ b/x-retry-server/src/test/java/com/x/retry/server/XRetryServerApplicationTests.java @@ -1,10 +1,37 @@ package com.x.retry.server; +import com.google.common.util.concurrent.RateLimiter; +import lombok.SneakyThrows; import org.springframework.boot.test.context.SpringBootTest; +import java.util.Date; +import java.util.concurrent.TimeUnit; + @SpringBootTest class XRetryServerApplicationTests { + @SneakyThrows + public static void main(String[] args) { + RateLimiter rateLimiter = RateLimiter.create(20); + for (int i = 0; i < 10000; i++) { + int finalI = i; + if (i % 100 == 0) { + Thread.sleep(1000); + } + new Thread(new Runnable() { + @Override + public void run() { + if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + System.out.println("短期无法获取令牌,真不幸,排队也瞎排 " + finalI); + } else { + System.out.println(new Date() + " " + finalI); + } + } + }).start(); + } + + Thread.sleep(90000L); + } }