新增限制单点重试功能

This commit is contained in:
byteblogs168 2023-01-14 19:58:44 +08:00
parent 7f88925cbd
commit 64b2046498
10 changed files with 303 additions and 39 deletions

View File

@ -6,6 +6,7 @@ import com.x.retry.common.core.context.SpringContext;
import com.x.retry.server.support.dispatch.actor.exec.ExecUnitActor;
import com.x.retry.server.support.dispatch.actor.result.FailureActor;
import com.x.retry.server.support.dispatch.actor.result.FinishActor;
import com.x.retry.server.support.dispatch.actor.result.NoRetryActor;
import com.x.retry.server.support.dispatch.actor.scan.ScanGroupActor;
/**
@ -36,6 +37,15 @@ public class ActorGenerator {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FailureActor.BEAN_NAME));
}
/**
* 不触发重试actor
*
* @return actor 引用
*/
public static ActorRef noRetryActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(NoRetryActor.BEAN_NAME));
}
/**
* 生成重试执行的actor
*

View File

@ -35,4 +35,9 @@ public class SystemProperties {
*/
private int totalPartition = 32;
/**
* 一个客户端每秒最多接收的重试数量指令
*/
private int limiter = 10;
}

View File

@ -0,0 +1,59 @@
package com.x.retry.server.support.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.server.support.Lifecycle;
import lombok.Data;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 缓存组组限流组件
*
* @author www.byteblogs.com
* @date 2022-21-58
* @since 2.0
*/
@Component
@Data
public class CacheGroupRateLimiter implements Lifecycle {
private static Cache<String, RateLimiter> CACHE;
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static Cache<String, RateLimiter> getAll() {
return CACHE;
}
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static RateLimiter getRateLimiterByKey(String hostId) {
return CACHE.getIfPresent(hostId);
}
@Override
public void start() {
LogUtils.info("CacheGroupRateLimiter start");
CACHE = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.expireAfterWrite(10, TimeUnit.SECONDS)
.expireAfterAccess(10, TimeUnit.SECONDS)
.build();
}
@Override
public void close() {
LogUtils.info("CacheGroupRateLimiter stop");
}
}

View File

@ -1,6 +1,7 @@
package com.x.retry.server.support.context;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
import com.x.retry.server.persistence.mybatis.po.ServerNode;
import com.x.retry.server.support.RetryContext;
import com.x.retry.server.support.WaitStrategy;
import lombok.Data;
@ -36,6 +37,11 @@ public class MaxAttemptsPersistenceRetryContext<V> implements RetryContext<V> {
*/
private Set<String> sceneBlacklist;
/**
* 需要调度的节点
*/
private ServerNode serverNode;
@Override
public void setCallResult(V v) {
this.callResult = v;

View File

@ -2,18 +2,21 @@ package com.x.retry.server.support.dispatch;
import akka.actor.ActorRef;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import com.x.retry.common.core.enums.NodeTypeEnum;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.util.HostUtils;
import com.google.common.cache.Cache;
import com.x.retry.common.core.util.JsonUtil;
import com.x.retry.server.akka.ActorGenerator;
import com.x.retry.server.config.SystemProperties;
import com.x.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash;
import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.x.retry.server.persistence.mybatis.po.GroupConfig;
import com.x.retry.server.persistence.mybatis.po.ServerNode;
import com.x.retry.server.persistence.support.ConfigAccess;
import com.x.retry.server.support.Lifecycle;
import com.x.retry.server.support.cache.CacheGroupRateLimiter;
import com.x.retry.server.support.cache.CacheGroupScanActor;
import com.x.retry.server.support.handler.ServerRegisterNodeHandler;
import org.springframework.beans.factory.annotation.Autowired;
@ -67,6 +70,9 @@ public class DispatchService implements Lifecycle {
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private SystemProperties systemProperties;
@Override
public void start() {
@ -77,9 +83,8 @@ public class DispatchService implements Lifecycle {
LogUtils.info("当前节点[{}] 分配的组:[{}]", HostUtils.getIp(), JsonUtil.toJsonString(currentHostGroupList));
if (!CollectionUtils.isEmpty(currentHostGroupList)) {
Cache<String, ActorRef> actorRefCache = CacheGroupScanActor.getAll();
for (GroupConfig groupConfigContext : currentHostGroupList) {
produceScanActorTask(actorRefCache, groupConfigContext);
produceScanActorTask(groupConfigContext);
}
}
@ -94,20 +99,51 @@ public class DispatchService implements Lifecycle {
/**
* 扫描任务生成器
*
* @param actorRefCache 扫描任务actor缓存器
* @param groupConfig {@link GroupConfig} 组上下文
*/
private void produceScanActorTask(Cache<String, ActorRef> actorRefCache, GroupConfig groupConfig) {
private void produceScanActorTask(GroupConfig groupConfig) {
String groupName = groupConfig.getGroupName();
ActorRef scanActorRef = cacheActorRef(groupName);
// 缓存按照
cacheRateLimiter(groupName);
// rebalance group scan 流程合一
scanActorRef.tell(groupConfig, scanActorRef);
}
/**
* 缓存限流对象
*/
private void cacheRateLimiter(String groupName) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getGroupName, groupName));
Cache<String, RateLimiter> rateLimiterCache = CacheGroupRateLimiter.getAll();
for (ServerNode serverNode : serverNodes) {
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId());
if (Objects.isNull(rateLimiter)) {
rateLimiterCache.put(groupName, RateLimiter.create(systemProperties.getLimiter()));
}
}
rateLimiterCache.invalidateAll();
}
/**
* 缓存Actor对象
*/
private ActorRef cacheActorRef(String groupName) {
Cache<String, ActorRef> actorRefCache = CacheGroupScanActor.getAll();
ActorRef scanActorRef = actorRefCache.getIfPresent(groupName);
if (Objects.isNull(scanActorRef)) {
scanActorRef = ActorGenerator.scanGroupActor();
// 缓存扫描器actor
actorRefCache.put(groupName, scanActorRef);
}
// rebalance group scan 流程合一
scanActorRef.tell(groupConfig, scanActorRef);
return scanActorRef;
}
/**

View File

@ -2,6 +2,7 @@ package com.x.retry.server.support.dispatch.actor.exec;
import akka.actor.AbstractActor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import com.x.retry.client.model.DispatchRetryDTO;
import com.x.retry.client.model.DispatchRetryResultDTO;
import com.x.retry.common.core.log.LogUtils;
@ -19,6 +20,8 @@ import com.x.retry.server.persistence.support.ConfigAccess;
import com.x.retry.server.support.ClientLoadBalance;
import com.x.retry.server.support.IdempotentStrategy;
import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager;
import com.x.retry.server.support.cache.CacheGroupRateLimiter;
import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.x.retry.server.support.retry.RetryExecutor;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
@ -36,6 +39,7 @@ import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -55,21 +59,11 @@ public class ExecUnitActor extends AbstractActor {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private RestTemplate restTemplate;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {
@ -77,13 +71,14 @@ public class ExecUnitActor extends AbstractActor {
RetryTaskLog retryTaskLog = new RetryTaskLog();
retryTaskLog.setErrorMessage(StringUtils.EMPTY);
RetryTask retryTask = retryExecutor.getRetryContext().getRetryTask();
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext();
RetryTask retryTask = context.getRetryTask();
ServerNode serverNode = context.getServerNode();
try {
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, retryTask.getGroupName()));
if (!CollectionUtils.isEmpty(serverNodes)) {
Object call = retryExecutor.call((Callable<Result<DispatchRetryResultDTO>>) () -> callClient(retryTask, retryTaskLog, serverNodes));
if (Objects.nonNull(serverNode)) {
Object call = retryExecutor.call((Callable<Result<DispatchRetryResultDTO>>) () -> callClient(retryTask, retryTaskLog, serverNode));
} else {
retryTaskLog.setErrorMessage("暂无可用的客户端POD");
}
@ -115,13 +110,7 @@ public class ExecUnitActor extends AbstractActor {
* @param retryTask {@link RetryTask} 需要重试的数据
* @return 重试结果返回值
*/
private Result<DispatchRetryResultDTO> callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, List<ServerNode> serverNodes) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName());
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey());
String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet())));
ServerNode serverNode = serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get();
private Result<DispatchRetryResultDTO> callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, ServerNode serverNode) {
DispatchRetryDTO dispatchRetryDTO = new DispatchRetryDTO();
dispatchRetryDTO.setBizId(retryTask.getBizId());

View File

@ -0,0 +1,57 @@
package com.x.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import cn.hutool.json.JSON;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.util.JsonUtil;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
import com.x.retry.server.persistence.support.RetryTaskAccess;
import com.x.retry.server.support.WaitStrategy;
import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.x.retry.server.support.retry.RetryExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* 不重试,只更新下次触发时间
*
* @author: shuguang.zhang
* @date : 2022-04-14 16:11
*/
@Component("NoRetryActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class NoRetryActor extends AbstractActor {
public static final String BEAN_NAME = "NoRetryActor";
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccess;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {
MaxAttemptsPersistenceRetryContext retryContext = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext();
RetryTask retryTask = retryContext.getRetryTask();
WaitStrategy waitStrategy = retryContext.getWaitStrategy();
retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(retryContext));
// 不更新重试次数
retryTask.setRetryCount(null);
try {
retryTaskAccess.updateRetryTask(retryTask);
}catch (Exception e) {
LogUtils.error("更新重试任务失败", e);
} finally {
// 更新DB状态
getContext().stop(getSelf());
}
}).build();
}
}

View File

@ -2,18 +2,23 @@ package com.x.retry.server.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.client.model.DispatchRetryResultDTO;
import com.x.retry.common.core.model.Result;
import com.x.retry.server.akka.ActorGenerator;
import com.x.retry.server.config.SystemProperties;
import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.x.retry.server.persistence.mybatis.po.GroupConfig;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
import com.x.retry.server.persistence.mybatis.po.SceneConfig;
import com.x.retry.server.persistence.mybatis.po.ServerNode;
import com.x.retry.server.persistence.support.ConfigAccess;
import com.x.retry.server.persistence.support.RetryTaskAccess;
import com.x.retry.server.support.ClientLoadBalance;
import com.x.retry.server.support.IdempotentStrategy;
import com.x.retry.server.support.WaitStrategy;
import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager;
import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.x.retry.server.support.dispatch.DispatchService;
import com.x.retry.server.support.retry.RetryBuilder;
@ -30,6 +35,8 @@ import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
* @author www.byteblogs.com
@ -55,6 +62,9 @@ public class ScanGroupActor extends AbstractActor {
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private ServerNodeMapper serverNodeMapper;
public static final String BEAN_NAME = "ScanGroupActor";
@Override
@ -97,6 +107,7 @@ public class ScanGroupActor extends AbstractActor {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName));
retryContext.setServerNode(getServerNode(retryTask));
RetryExecutor<Result<DispatchRetryResultDTO>> executor = RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
.withStopStrategy(StopStrategies.stopResultStatus())
@ -104,11 +115,17 @@ public class ScanGroupActor extends AbstractActor {
.withFilterStrategy(FilterStrategies.delayLevelFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilterStrategies())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
if (!executor.filter()) {
// 不触发重试
ActorRef actorRef = ActorGenerator.noRetryActor();
actorRef.tell(executor, actorRef);
continue;
}
@ -139,6 +156,24 @@ public class ScanGroupActor extends AbstractActor {
retryTask.setRetryCount(++retryCount);
}
/**
* 获取分配的节点
*/
public ServerNode getServerNode(RetryTask retryTask) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTask.getGroupName());
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, retryTask.getGroupName()));
if (CollectionUtils.isEmpty(serverNodes)) {
return null;
}
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey());
String hostIp = clientLoadBalanceRandom.route(retryTask.getGroupName(), new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet())));
return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get();
}
private void productExecUnitActor(RetryExecutor<Result<DispatchRetryResultDTO>> retryExecutor) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();

View File

@ -1,18 +1,21 @@
package com.x.retry.server.support.strategy;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import com.x.retry.common.core.context.SpringContext;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.x.retry.server.persistence.mybatis.po.RetryTask;
import com.x.retry.server.persistence.mybatis.po.ServerNode;
import com.x.retry.server.support.FilterStrategy;
import com.x.retry.server.support.IdempotentStrategy;
import com.x.retry.server.support.RetryContext;
import com.x.retry.server.support.cache.CacheGroupRateLimiter;
import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 生成 {@link FilterStrategy} 实例.
@ -57,13 +60,23 @@ public class FilterStrategies {
*
* @return {@link CheckAliveClientPodFilterStrategies} 客户端存活POD检查策略
*/
public static FilterStrategy checkAliveClientPodFilterStrategies() {
public static FilterStrategy checkAliveClientPodFilter() {
return new CheckAliveClientPodFilterStrategies();
}
/**
* 延迟等级的过滤策略
* 检查分配的客户端是否达到限流阈值
*
* @return {@link RateLimiterFilterStrategies} 检查分配的客户端是否达到限流阈值
*/
public static FilterStrategy rateLimiterFilter() {
return new RateLimiterFilterStrategies();
}
/**
* 延迟等级的过滤策略
* <p>
* 根据延迟等级的时间计算下次触发时间是否小于当前时间满足则返回true 否则返回false
*/
private static final class DelayLevelFilterStrategies implements FilterStrategy {
@ -84,7 +97,7 @@ public class FilterStrategies {
/**
* 使用BitSet幂等的过滤策略
*
* <p>
* 判断BitSet中是否存在若存在则放回false 否则返回true
*/
private static final class BitSetIdempotentFilterStrategies implements FilterStrategy {
@ -92,7 +105,7 @@ public class FilterStrategies {
private IdempotentStrategy<String, Integer> idempotentStrategy;
public BitSetIdempotentFilterStrategies(IdempotentStrategy<String, Integer> idempotentStrategy) {
this.idempotentStrategy = idempotentStrategy;
this.idempotentStrategy = idempotentStrategy;
}
@Override
@ -109,7 +122,7 @@ public class FilterStrategies {
/**
* 场景黑名单策略
*
* <p>
* 如果重试的数据在黑名单中的则返回false 否则为true
*/
private static final class SceneBlackFilterStrategies implements FilterStrategy {
@ -134,11 +147,15 @@ public class FilterStrategies {
@Override
public boolean filter(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
String groupName = context.getRetryTask().getGroupName();
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, groupName));
ServerNode serverNode = context.getServerNode();
return !CollectionUtils.isEmpty(serverNodes);
if (Objects.isNull(serverNode)) {
return false;
}
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
return 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, serverNode.getHostId()));
}
@Override
@ -147,6 +164,29 @@ public class FilterStrategies {
}
}
/**
* 检查是否存在存活的客户端POD
*/
private static final class RateLimiterFilterStrategies implements FilterStrategy {
@Override
public boolean filter(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
ServerNode serverNode = context.getServerNode();
RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId());
if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
LogUtils.error("该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId());
return Boolean.FALSE;
}
return Boolean.TRUE;
}
@Override
public int order() {
return 4;
}
}
}

View File

@ -1,10 +1,37 @@
package com.x.retry.server;
import com.google.common.util.concurrent.RateLimiter;
import lombok.SneakyThrows;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class XRetryServerApplicationTests {
@SneakyThrows
public static void main(String[] args) {
RateLimiter rateLimiter = RateLimiter.create(20);
for (int i = 0; i < 10000; i++) {
int finalI = i;
if (i % 100 == 0) {
Thread.sleep(1000);
}
new Thread(new Runnable() {
@Override
public void run() {
if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
System.out.println("短期无法获取令牌,真不幸,排队也瞎排 " + finalI);
} else {
System.out.println(new Date() + " " + finalI);
}
}
}).start();
}
Thread.sleep(90000L);
}
}