feat:2.4.0

1. Implement load balancing via buckets
byteblogs168 2023-09-21 23:50:54 +08:00
parent caae7f54ee
commit 28b488135b
17 changed files with 231 additions and 426 deletions
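
In outline: the per-group allocation path (CacheConsumerGroup, CacheGroup, the old retry-task DispatchService) is deprecated or deleted, and rebalancing now distributes a fixed set of buckets across the server nodes via DistributeInstance; each node scans only the buckets assigned to it. A conceptual sketch of the bucket model, not code from this commit (the bucket count and the hash mapping are illustrative assumptions):

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Conceptual sketch only: tasks hash into a fixed number of buckets, and a pod
// scans only the buckets that the rebalance has assigned to it.
public class BucketRoutingSketch {

    static final int BUCKET_TOTAL = 32; // hypothetical bucket count

    static final Set<Integer> MY_BUCKETS = new CopyOnWriteArraySet<>();

    /** Stable, non-negative mapping from a task's group name to a bucket. */
    static int bucketOf(String groupName) {
        return (groupName.hashCode() & Integer.MAX_VALUE) % BUCKET_TOTAL;
    }

    /** A pod consumes a task only if the task's bucket is allocated to it. */
    static boolean ownedByThisPod(String groupName) {
        return MY_BUCKETS.contains(bucketOf(groupName));
    }

    public static void main(String[] args) {
        MY_BUCKETS.add(bucketOf("example_group"));
        System.out.println(ownedByThisPod("example_group")); // true
    }
}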

View File

@@ -15,6 +15,7 @@ public class ActorGenerator {
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
public static final String FINISH_ACTOR = "FinishActor";
public static final String FAILURE_ACTOR = "FailureActor";
public static final String NO_RETRY_ACTOR = "NoRetryActor";
@@ -89,6 +90,15 @@ public class ActorGenerator {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR));
}
/**
* Creates the actor that scans buckets
*
* @return actor reference
*/
public static ActorRef scanBucketActor() {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_BUCKET_ACTOR));
}
/**
* Creates the actor that scans retry data
*

View File

@@ -1,74 +0,0 @@
package com.aizuda.easy.retry.server.common.cache;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
* The groups the current pod is responsible for consuming
*
* @author www.byteblogs.com
* @date 2021-10-30
* @since 1.6.0
*/
@Component
@Slf4j
public class CacheConsumerBucketGroup implements Lifecycle {
private static Cache<String /*groupName*/, String/*groupName*/> CACHE;
private volatile List<Integer> bucketList = new ArrayList<>();
/**
* Gets all cached group names
*
* @return the cached values
*/
public static Set<String> getAllConsumerGroupName() {
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
return new HashSet<>(concurrentMap.values());
}
/**
* Adds the group if absent, updates it if present
*/
public static synchronized void addOrUpdate(String groupName) {
LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName);
CACHE.put(groupName, groupName);
}
public static void remove(String groupName) {
LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName);
CACHE.invalidate(groupName);
}
public static void clear() {
CACHE.invalidateAll();
}
@Override
public void start() {
LogUtils.info(log, "CacheRegisterTable start");
CACHE = CacheBuilder.newBuilder()
// Set the concurrency level to the number of CPU cores
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
@Override
public void close() {
LogUtils.info(log, "CacheRegisterTable stop");
CACHE.invalidateAll();
}
}

View File

@@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap;
*/
@Component
@Slf4j
@Deprecated
public class CacheConsumerGroup implements Lifecycle {
private static Cache<String /*groupName*/, String/*groupName*/> CACHE;
@@ -29,32 +30,13 @@ public class CacheConsumerGroup implements Lifecycle {
*
* @return the cached values
*/
@Deprecated
public static Set<String> getAllConsumerGroupName() {
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
return new HashSet<>(concurrentMap.values());
}
/**
* Adds the group if absent, updates it if present
*/
public static synchronized void addOrUpdate(String groupName) {
LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName);
CACHE.put(groupName, groupName);
}
public static void remove(String groupName) {
LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName);
CACHE.invalidate(groupName);
}
public static void clear() {
CACHE.invalidateAll();
}
@Override
public void start() {
LogUtils.info(log, "CacheRegisterTable start");

View File

@@ -1,81 +0,0 @@
package com.aizuda.easy.retry.server.common.cache;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
/**
* Group registry
*
* @author www.byteblogs.com
* @date 2021-10-30
* @since 1.6.0
*/
@Component
@Slf4j
public class CacheGroup implements Lifecycle {
private static Cache<String/*groupName*/, String/*groupName*/> CACHE;
/**
* Gets all cached groups
*
* @return the cached values
*/
public static Set<String> getAllGroup() {
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
if (CollectionUtils.isEmpty(concurrentMap)) {
return Collections.emptySet();
}
return new TreeSet<>(concurrentMap.values());
}
/**
* Gets a cached entry by key
*
* @return the cached value
*/
public static String get(String hostId) {
return CACHE.getIfPresent(hostId);
}
/**
* Adds the group if absent, updates it if present
*/
public static synchronized void addOrUpdate(String groupName) {
CACHE.put(groupName, groupName);
}
public static void remove(String groupName) {
CACHE.invalidate(groupName);
}
@Override
public void start() {
LogUtils.info(log, "CacheGroup start");
CACHE = CacheBuilder.newBuilder()
// Set the concurrency level to the number of CPU cores
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
@Override
public void close() {
LogUtils.info(log, "CacheGroup stop");
CACHE.invalidateAll();
}
}

View File

@@ -0,0 +1,38 @@
package com.aizuda.easy.retry.server.common.dto;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* www.byteblogs.com
*
* @author shuguang.zhang
* @date 2023-09-21 09:26
*/
public class DistributeInstance {
private DistributeInstance() {
}
public static final DistributeInstance INSTANCE = new DistributeInstance();
/**
* Controls the rebalance state
*/
public static final AtomicBoolean RE_BALANCE_ING = new AtomicBoolean(Boolean.FALSE);
private final CopyOnWriteArraySet<Integer> CONSUMER_BUCKETS = new CopyOnWriteArraySet<>();
public CopyOnWriteArraySet<Integer> getConsumerBucket() {
return CONSUMER_BUCKETS;
}
public void setConsumerBucket(List<Integer> buckets) {
CONSUMER_BUCKETS.addAll(buckets);
}
public void clearConsumerBucket() {
CONSUMER_BUCKETS.clear();
}
}
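
For orientation, this is the rebalance lifecycle around the new holder, as ServerNodeBalance.doBalance (later in this diff) drives it; a minimal sketch with example bucket ids:

// Raise the flag, swap the bucket set, lower the flag (bucket ids are examples).
DistributeInstance.RE_BALANCE_ING.set(Boolean.TRUE);
try {
    DistributeInstance.INSTANCE.clearConsumerBucket();
    DistributeInstance.INSTANCE.setConsumerBucket(java.util.Arrays.asList(0, 3, 6));
} finally {
    DistributeInstance.RE_BALANCE_ING.set(Boolean.FALSE);
}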

View File

@@ -1,15 +0,0 @@
package com.aizuda.easy.retry.server.common.handler;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* www.byteblogs.com
*
* @author shuguang.zhang
* @date 2023-09-21 09:26
*/
public class DistributeInstance {
private CopyOnWriteArraySet<Integer> CONSUMER_BUCKET = new CopyOnWriteArraySet<>();
}

View File

@@ -4,10 +4,8 @@ import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.allocate.server.AllocateMessageQueueAveragely;
import com.aizuda.easy.retry.server.common.allocate.server.AllocateMessageQueueConsistentHash;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.cache.CacheGroup;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.DistributeInstance;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.register.ServerRegister;
@@ -16,7 +14,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -30,9 +27,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Reassigns group consumption across nodes when groups or nodes change
@@ -61,14 +56,9 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
private List<Integer> bucketList;
/**
* Controls the rebalance state
*/
public static final AtomicBoolean RE_BALANCE_ING = new AtomicBoolean(Boolean.FALSE);
public void doBalance() {
LogUtils.info(log, "rebalance start");
RE_BALANCE_ING.set(Boolean.TRUE);
DistributeInstance.RE_BALANCE_ING.set(Boolean.TRUE);
try {
@@ -79,14 +69,9 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
LogUtils.error(log, "server node is empty");
}
Set<String> allGroup = CacheGroup.getAllGroup();
if (CollectionUtils.isEmpty(allGroup)) {
LogUtils.error(log, "group is empty");
}
// Remove all locally cached group info
CacheConsumerGroup.clear();
if (CollectionUtils.isEmpty(podIpSet) || CollectionUtils.isEmpty(allGroup)) {
// Clear the locally cached consumer bucket info
DistributeInstance.INSTANCE.clearConsumerBucket();
if (CollectionUtils.isEmpty(podIpSet)) {
return;
}
@@ -94,15 +79,13 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
.allocate(ServerRegister.CURRENT_CID, bucketList, new ArrayList<>(podIpSet));
// Overwrite the locally allocated assignments
for (String groupName : allocate) {
CacheConsumerGroup.addOrUpdate(groupName);
}
DistributeInstance.INSTANCE.setConsumerBucket(allocate);
LogUtils.info(log, "rebalance complete. allocate:[{}]", allocate);
} catch (Exception e) {
LogUtils.error(log, "rebalance error. ", e);
} finally {
RE_BALANCE_ING.set(Boolean.FALSE);
DistributeInstance.RE_BALANCE_ING.set(Boolean.FALSE);
}
}
@@ -128,25 +111,6 @@
RegisterNodeInfo registerNodeInfo = concurrentMap.get(localHostId);
// Remove expired node info
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
// Remove local consumer group info
CacheConsumerGroup.remove(registerNodeInfo.getGroupName());
}
}
private void refreshAndRemoveGroup(List<GroupConfig> removeGroupConfig, final Set<String> allGroup) {
if (allGroup.size() != removeGroupConfig.size()) {
allGroup.removeAll(removeGroupConfig.stream()
.map(GroupConfig::getGroupName).collect(Collectors.toSet()));
// Remove groups that have been closed
for (String groupName : allGroup) {
CacheGroup.remove(groupName);
}
// Add groups
for (final GroupConfig groupConfig : removeGroupConfig) {
CacheGroup.addOrUpdate(groupConfig.getGroupName());
}
}
}
@@ -210,24 +174,16 @@
Set<String> localHostIds = concurrentMap.values().stream().map(RegisterNodeInfo::getHostId)
.collect(Collectors.toSet());
List<GroupConfig> removeGroupConfig = accessTemplate.getGroupConfigAccess().getAllOpenGroupConfig();
Set<String> allGroup = CacheGroup.getAllGroup();
// Nodes with no cache trigger refreshCache
if (CollectionUtils.isEmpty(concurrentMap)
// Trigger when the node counts differ
|| isNodeSizeNotEqual(concurrentMap.size(), remotePods.size())
// Trigger a rebalance when the remote and locally cached group counts differ
|| isGroupSizeNotEqual(removeGroupConfig, allGroup)
// Reallocate if the remote nodes do not match the local nodes
|| isNodeNotMatch(remoteHostIds, localHostIds)) {
// Remove nodes that went offline from the local cache
removeNode(concurrentMap, remoteHostIds, localHostIds);
// Refresh the group config and remove closed groups
refreshAndRemoveGroup(removeGroupConfig, allGroup);
// Re-fetch the latest server info from the DB
refreshCache(remotePods);
@@ -253,8 +209,6 @@
for (final RegisterNodeInfo registerNodeInfo : expireNodeSet) {
// Remove expired node info
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
// Remove local consumer group info
CacheConsumerGroup.remove(registerNodeInfo.getGroupName());
}
}
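
The allocate(ServerRegister.CURRENT_CID, bucketList, new ArrayList<>(podIpSet)) call above decides which buckets this pod keeps. The diff imports AllocateMessageQueueAveragely but does not show its body; a hypothetical sketch of an "averagely" split, assuming sorted node ids each take an even, contiguous share of the bucket list:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch (the real implementation is not shown in this diff).
public class AllocateAveragelySketch {

    static List<Integer> allocate(String currentCid, List<Integer> buckets, List<String> cids) {
        List<String> sorted = new ArrayList<>(cids);
        Collections.sort(sorted);
        int index = sorted.indexOf(currentCid);
        if (index < 0 || buckets.isEmpty()) {
            return Collections.emptyList();
        }
        // Even share per node; the first (size % nodes) nodes take one extra bucket.
        int share = buckets.size() / sorted.size();
        int remainder = buckets.size() % sorted.size();
        int start = index * share + Math.min(index, remainder);
        int end = start + share + (index < remainder ? 1 : 0);
        return new ArrayList<>(buckets.subList(start, end));
    }
}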

View File

@@ -81,17 +81,6 @@ public class ClientRegister extends AbstractRegister implements Runnable {
refreshExpireAt(serverNode);
}
// Sync node info for the groups the current pod consumes
// A Netty client registers with only one server; if a group is assigned to a pod other than the one the client connected to, that pod would otherwise have no registration info for the other clients
Set<String> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
if (!CollectionUtils.isEmpty(allConsumerGroupName)) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>().in(ServerNode::getGroupName, allConsumerGroupName));
for (final ServerNode node : serverNodes) {
// Refresh the full local cache
CacheRegisterTable.addOrUpdate(node.getGroupName(), node);
}
}
} catch (InterruptedException e) {
LogUtils.info(log, "[{}] thread stop.", Thread.currentThread().getName());
} catch (Exception e) {

View File

@@ -20,17 +20,17 @@ import org.springframework.stereotype.Component;
@Component
@Data
@Slf4j
public class CacheGroupScanActor implements Lifecycle {
public class CacheBucketActor implements Lifecycle {
private static Cache<String, ActorRef> CACHE;
private static Cache<Integer, ActorRef> CACHE;
/**
* Gets the cached actor for the given bucket
*
* @return the cached ActorRef
*/
public static ActorRef get(String groupName, TaskTypeEnum typeEnum) {
return CACHE.getIfPresent(groupName.concat(typeEnum.name()));
public static ActorRef get(Integer bucket) {
return CACHE.getIfPresent(bucket);
}
/**
@@ -38,8 +38,8 @@ public class CacheGroupScanActor implements Lifecycle {
*
* @return the cached value
*/
public static void put(String groupName, TaskTypeEnum typeEnum, ActorRef actorRef) {
CACHE.put(groupName.concat(typeEnum.name()), actorRef);
public static void put(Integer bucket, ActorRef actorRef) {
CACHE.put(bucket, actorRef);
}
@Override

View File

@@ -1,156 +0,0 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Dispatcher component
*
* @author www.byteblogs.com
* @date 2021-11-19 15:46
* @since 1.6.0
*/
@Component
@Slf4j
public class DispatchService implements Lifecycle {
/**
* Dispatcher thread
*/
private final ScheduledExecutorService dispatchService = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService"));
/**
* Scheduling period
*/
public static final Long PERIOD = 10L;
/**
* Delay 10s so that, as far as possible, all cluster nodes finish starting before the rebalance
*/
public static final Long INITIAL_DELAY = 10L;
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private SystemProperties systemProperties;
@Override
public void start() {
dispatchService.scheduleAtFixedRate(() -> {
try {
// While a rebalance is in progress, wait 10s so that all nodes can finish rebalancing
if (ServerNodeBalance.RE_BALANCE_ING.get()) {
LogUtils.info(log, "正在rebalance中....");
TimeUnit.SECONDS.sleep(INITIAL_DELAY);
}
Set<String> currentHostGroupList = getCurrentHostGroupList();
LogUtils.info(log, "当前分配的组:[{}]", currentHostGroupList);
if (!CollectionUtils.isEmpty(currentHostGroupList)) {
for (String groupName : currentHostGroupList) {
ScanTaskDTO scanTaskDTO = new ScanTaskDTO();
scanTaskDTO.setGroupName(groupName);
produceScanActorTask(scanTaskDTO);
}
}
} catch (Exception e) {
LogUtils.error(log, "分发异常", e);
}
}, INITIAL_DELAY, PERIOD, TimeUnit.SECONDS);
}
/**
* Produces scan tasks
*
* @param scanTaskDTO {@link GroupConfig} group context
*/
private void produceScanActorTask(ScanTaskDTO scanTaskDTO) {
String groupName = scanTaskDTO.getGroupName();
// Cache the group's rate limiter
cacheRateLimiter(groupName);
// Scan retry data
ActorRef scanRetryActorRef = cacheActorRef(groupName, TaskTypeEnum.RETRY);
scanRetryActorRef.tell(scanTaskDTO, scanRetryActorRef);
// Scan callback data
ActorRef scanCallbackActorRef = cacheActorRef(groupName, TaskTypeEnum.CALLBACK);
scanCallbackActorRef.tell(scanTaskDTO, scanCallbackActorRef);
}
/**
* Caches the rate limiter objects
*/
private void cacheRateLimiter(String groupName) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getGroupName, groupName));
Cache<String, RateLimiter> rateLimiterCache = CacheGroupRateLimiter.getAll();
for (ServerNode serverNode : serverNodes) {
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId());
if (Objects.isNull(rateLimiter)) {
rateLimiterCache.put(serverNode.getHostId(), RateLimiter.create(systemProperties.getLimiter()));
}
}
}
/**
* Caches the actor reference
*/
private ActorRef cacheActorRef(String groupName, TaskTypeEnum typeEnum) {
ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum);
if (Objects.isNull(scanActorRef)) {
scanActorRef = typeEnum.getActorRef().get();
// Cache the scanner actor
CacheGroupScanActor.put(groupName, typeEnum, scanActorRef);
}
return scanActorRef;
}
/**
* Gets the groups allocated to the current pod (RebalanceGroup)
*
* @return {@link GroupConfig} group context
*/
private Set<String> getCurrentHostGroupList() {
return CacheConsumerGroup.getAllConsumerGroupName();
}
@Override
public void close() {
}
}

View File

@@ -8,8 +8,7 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.DispatchService;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.ScanTaskDTO;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.ScanTask;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
@@ -47,7 +46,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTaskDTO.class, config -> {
return receiveBuilder().match(ScanTask.class, config -> {
try {
doScan(config);
@@ -59,11 +58,11 @@ public abstract class AbstractScanGroup extends AbstractActor {
}
protected void doScan(final ScanTaskDTO scanTaskDTO) {
protected void doScan(final ScanTask scanTask) {
LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
int retryPullPageSize = systemProperties.getRetryPullPageSize();
String groupName = scanTaskDTO.getGroupName();
String groupName = scanTask.getGroupName();
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
// Scan the current group's pending tasks
@@ -72,7 +71,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
if (!CollectionUtils.isEmpty(list)) {
// Update the max pulled id
putLastId(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getId());
putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId());
for (RetryTask retryTask : list) {

View File

@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
@Slf4j
public class ScanGroupActor extends AbstractScanGroup {
public static final String BEAN_NAME = "ScanGroupActor";
/**
* Caches the starting id for pulling data
* <p>

View File

@@ -4,11 +4,11 @@ import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.dto.DistributeInstance;
import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
@@ -239,7 +239,7 @@ public class FilterStrategies {
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
RetryTask retryTask = retryContext.getRetryTask();
boolean result = !ServerNodeBalance.RE_BALANCE_ING.get();
boolean result = !DistributeInstance.RE_BALANCE_ING.get();
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("系统Rebalancing中数据无法重试.uniqueId:[{0}]", retryTask.getUniqueId()));

View File

@@ -0,0 +1,14 @@
package com.aizuda.easy.retry.server.dispatch;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-09-21 23:30:22
* @since 2.4.0
*/
@Data
public class ConsumerBucket {
private Integer bucket;
}

View File

@@ -0,0 +1,26 @@
package com.aizuda.easy.retry.server.dispatch;
import akka.actor.AbstractActor;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* Consumes the buckets assigned to the current node and produces scan tasks
* <p>
*
* @author www.byteblogs.com
* @date 2023-09-21 23:47:23
* @since 2.4.0
*/
@Component(ActorGenerator.SCAN_BUCKET_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ConsumerBucketActor extends AbstractActor {
@Override
public Receive createReceive() {
// Stub in this commit: match the bucket message so the actor has a valid Receive; scan-task generation comes later
return receiveBuilder().match(ConsumerBucket.class, consumerBucket ->
log.info("received bucket: [{}]", consumerBucket.getBucket())).build();
}
}
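
A short usage sketch, mirroring what produceConsumerBucketActorTask in the new DispatchService below does (the bucket id is an example; akka.actor.ActorRef is assumed imported):

// Build the bucket message and tell it to the actor obtained via ActorGenerator.
ConsumerBucket message = new ConsumerBucket();
message.setBucket(1);
ActorRef actor = ActorGenerator.scanBucketActor();
actor.tell(message, actor);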

View File

@@ -0,0 +1,120 @@
package com.aizuda.easy.retry.server.dispatch;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.DistributeInstance;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheBucketActor;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Handles task dispatch and scheduling for the EasyRetry system
*
* @author www.byteblogs.com
* @date 2023-09-21 23:36:47
* @since 2.4.0
*/
@Component
@Slf4j
public class DispatchService implements Lifecycle {
/**
* Dispatcher thread
*/
private final ScheduledExecutorService dispatchService = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "dispatch-service"));
/**
* Scheduling period
*/
public static final Long PERIOD = 10L;
/**
* Delay 10s so that, as far as possible, all cluster nodes finish starting before the rebalance
*/
public static final Long INITIAL_DELAY = 10L;
@Override
public void start() {
dispatchService.scheduleAtFixedRate(() -> {
try {
// While a rebalance is in progress, wait 10s so that all nodes can finish rebalancing
if (DistributeInstance.RE_BALANCE_ING.get()) {
LogUtils.info(log, "正在rebalance中....");
TimeUnit.SECONDS.sleep(INITIAL_DELAY);
}
Set<Integer> currentConsumerBuckets = getConsumerBucket();
LogUtils.info(log, "当前节点分配的桶:[{}]", currentConsumerBuckets);
if (!CollectionUtils.isEmpty(currentConsumerBuckets)) {
for (Integer bucket : currentConsumerBuckets) {
ConsumerBucket consumerBucket = new ConsumerBucket();
consumerBucket.setBucket(bucket);
produceConsumerBucketActorTask(consumerBucket);
}
}
} catch (Exception e) {
LogUtils.error(log, "分发异常", e);
}
}, INITIAL_DELAY, PERIOD, TimeUnit.SECONDS);
}
/**
* Produces the actor task for the bucket
*
* @param consumerBucket {@link ConsumerBucket} consumer bucket context
*/
private void produceConsumerBucketActorTask(ConsumerBucket consumerBucket) {
Integer bucket = consumerBucket.getBucket();
// Cache the actor for this bucket
ActorRef actorRef = cacheActorRef(bucket);
actorRef.tell(consumerBucket, actorRef);
}
/**
* Gets the buckets the current pod is responsible for consuming
*
* @return the set of bucket ids
*/
private Set<Integer> getConsumerBucket() {
return DistributeInstance.INSTANCE.getConsumerBucket();
}
/**
* Caches the actor reference
*/
private ActorRef cacheActorRef(Integer bucket) {
ActorRef scanActorRef = CacheBucketActor.get(bucket);
if (Objects.isNull(scanActorRef)) {
scanActorRef = ActorGenerator.scanBucketActor();
// Cache the scanner actor
CacheBucketActor.put(bucket, scanActorRef);
}
return scanActorRef;
}
@Override
public void close() {
}
}