From 28b488135b68545c59f557e8306279edb7d2f11f Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 21 Sep 2023 23:50:54 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E9=80=9A=E8=BF=87bucket?= =?UTF-8?q?=E6=9D=A5=E5=AE=9E=E7=8E=B0=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/common/akka/ActorGenerator.java | 10 ++ .../cache/CacheConsumerBucketGroup.java | 74 --------- .../common/cache/CacheConsumerGroup.java | 22 +-- .../retry/server/common/cache/CacheGroup.java | 81 --------- .../server/common/dto/DistributeInstance.java | 38 +++++ .../common/handler/DistributeInstance.java | 15 -- .../common/handler/ServerNodeBalance.java | 60 +------ .../common/register/ClientRegister.java | 11 -- ...upScanActor.java => CacheBucketActor.java} | 12 +- .../support/dispatch/DispatchService.java | 156 ------------------ .../{ScanTaskDTO.java => ScanTask.java} | 2 +- .../actor/scan/AbstractScanGroup.java | 11 +- .../dispatch/actor/scan/ScanGroupActor.java | 1 - .../support/strategy/FilterStrategies.java | 4 +- .../retry/server/dispatch/ConsumerBucket.java | 14 ++ .../server/dispatch/ConsumerBucketActor.java | 26 +++ .../server/dispatch/DispatchService.java | 120 ++++++++++++++ 17 files changed, 231 insertions(+), 426 deletions(-) delete mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerBucketGroup.java delete mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheGroup.java create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/DistributeInstance.java delete mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/DistributeInstance.java rename easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/{CacheGroupScanActor.java => CacheBucketActor.java} (74%) delete mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/DispatchService.java rename easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/{ScanTaskDTO.java => ScanTask.java} (89%) create mode 100644 easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucket.java create mode 100644 easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java create mode 100644 easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java index 56cd0118a..8b3530091 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java @@ -15,6 +15,7 @@ public class ActorGenerator { public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor"; public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor"; + public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor"; public static final String FINISH_ACTOR = "FinishActor"; public static final String FAILURE_ACTOR = "FailureActor"; public static final String NO_RETRY_ACTOR = "NoRetryActor"; @@ -89,6 +90,15 @@ public class ActorGenerator { return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR)); } + /** + * 生成扫描重试数据的actor + * + * @return actor 引用 + */ + public static ActorRef scanBucketActor() { + return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_BUCKET_ACTOR)); + } + /** * 生成扫描重试数据的actor * diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerBucketGroup.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerBucketGroup.java deleted file mode 100644 index 51e78275f..000000000 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerBucketGroup.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.aizuda.easy.retry.server.common.cache; - -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.common.Lifecycle; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - -/** - * 当前POD负责消费的组 - * - * @author www.byteblogs.com - * @date 2021-10-30 - * @since 1.6.0 - */ -@Component -@Slf4j -public class CacheConsumerBucketGroup implements Lifecycle { - - private volatile List bucketList = new ArrayList<>(); - - /** - * 获取所有缓存 - * - * @return 缓存对象 - */ - public static Set getAllConsumerGroupName() { - ConcurrentMap concurrentMap = CACHE.asMap(); - return new HashSet<>(concurrentMap.values()); - - } - - /** - * 无缓存时添加 - * 有缓存时更新 - * - * @return 缓存对象 - */ - public static synchronized void addOrUpdate(String groupName) { - LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName); - CACHE.put(groupName, groupName); - } - - public static void remove(String groupName) { - LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName); - CACHE.invalidate(groupName); - } - - public static void clear() { - CACHE.invalidateAll(); - } - - @Override - public void start() { - LogUtils.info(log, "CacheRegisterTable start"); - CACHE = CacheBuilder.newBuilder() - // 设置并发级别为cpu核心数 - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .build(); - } - - @Override - public void close() { - LogUtils.info(log, "CacheRegisterTable stop"); - CACHE.invalidateAll(); - } -} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java index 948263c5c..a3c94483e 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap; */ @Component @Slf4j +@Deprecated public class CacheConsumerGroup implements Lifecycle { private static Cache CACHE; @@ -29,32 +30,13 @@ public class CacheConsumerGroup implements Lifecycle { * * @return 缓存对象 */ + @Deprecated public static Set getAllConsumerGroupName() { ConcurrentMap concurrentMap = CACHE.asMap(); return new HashSet<>(concurrentMap.values()); } - /** - * 无缓存时添加 - * 有缓存时更新 - * - * @return 缓存对象 - */ - public static synchronized void addOrUpdate(String groupName) { - LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName); - CACHE.put(groupName, groupName); - } - - public static void remove(String groupName) { - LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName); - CACHE.invalidate(groupName); - } - - public static void clear() { - CACHE.invalidateAll(); - } - @Override public void start() { LogUtils.info(log, "CacheRegisterTable start"); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheGroup.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheGroup.java deleted file mode 100644 index 9c02dbde4..000000000 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheGroup.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.aizuda.easy.retry.server.common.cache; - -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.common.Lifecycle; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.util.Collections; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentMap; - -/** - * 组注册表 - * - * @author www.byteblogs.com - * @date 2021-10-30 - * @since 1.6.0 - */ -@Component -@Slf4j -public class CacheGroup implements Lifecycle { - - private static Cache CACHE; - - /** - * 获取所有缓存 - * - * @return 缓存对象 - */ - public static Set getAllGroup() { - ConcurrentMap concurrentMap = CACHE.asMap(); - if (CollectionUtils.isEmpty(concurrentMap)) { - return Collections.EMPTY_SET; - } - - return new TreeSet<>(concurrentMap.values()); - - } - - /** - * 获取所有缓存 - * - * @return 缓存对象 - */ - public static String get(String hostId) { - return CACHE.getIfPresent(hostId); - } - - /** - * 无缓存时添加 - * 有缓存时更新 - * - * @return 缓存对象 - */ - public static synchronized void addOrUpdate(String groupName) { - CACHE.put(groupName, groupName); - } - - public static void remove(String groupName) { - CACHE.invalidate(groupName); - } - - @Override - public void start() { - LogUtils.info(log, "CacheGroup start"); - CACHE = CacheBuilder.newBuilder() - // 设置并发级别为cpu核心数 - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .build(); - } - - @Override - public void close() { - LogUtils.info(log, "CacheGroup stop"); - CACHE.invalidateAll(); - } -} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/DistributeInstance.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/DistributeInstance.java new file mode 100644 index 000000000..596977bdd --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/DistributeInstance.java @@ -0,0 +1,38 @@ +package com.aizuda.easy.retry.server.common.dto; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * www.byteblogs.com + * + * @author: shuguang.zhang + * @date : 2023-09-21 09:26 + */ +public class DistributeInstance { + private DistributeInstance() { + } + + public static final DistributeInstance INSTANCE = new DistributeInstance(); + + /** + * 控制rebalance状态 + */ + public static final AtomicBoolean RE_BALANCE_ING = new AtomicBoolean(Boolean.FALSE); + + private final CopyOnWriteArraySet CONSUMER_BUCKETS = new CopyOnWriteArraySet<>(); + + public CopyOnWriteArraySet getConsumerBucket() { + return CONSUMER_BUCKETS; + } + + public void setConsumerBucket(List buckets) { + CONSUMER_BUCKETS.addAll(buckets); + } + + public void clearConsumerBucket() { + CONSUMER_BUCKETS.clear(); + } + +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/DistributeInstance.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/DistributeInstance.java deleted file mode 100644 index 5bc9284f2..000000000 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/DistributeInstance.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.aizuda.easy.retry.server.common.handler; - -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * www.byteblogs.com - * - * @author: shuguang.zhang - * @date : 2023-09-21 09:26 - */ -public class DistributeInstance { - - private CopyOnWriteArraySet CONSUMER_BUCKET = new CopyOnWriteArraySet<>(); - -} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ServerNodeBalance.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ServerNodeBalance.java index e482ce898..bb301040d 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ServerNodeBalance.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ServerNodeBalance.java @@ -4,10 +4,8 @@ import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.allocate.server.AllocateMessageQueueAveragely; -import com.aizuda.easy.retry.server.common.allocate.server.AllocateMessageQueueConsistentHash; -import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; -import com.aizuda.easy.retry.server.common.cache.CacheGroup; import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.dto.DistributeInstance; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.register.ServerRegister; @@ -16,7 +14,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMa import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -30,9 +27,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * 负责处理组或者节点变化时,重新分配组在不同的节点上消费 @@ -61,14 +56,9 @@ public class ServerNodeBalance implements Lifecycle, Runnable { private List bucketList; - /** - * 控制rebalance状态 - */ - public static final AtomicBoolean RE_BALANCE_ING = new AtomicBoolean(Boolean.FALSE); - public void doBalance() { LogUtils.info(log, "rebalance start"); - RE_BALANCE_ING.set(Boolean.TRUE); + DistributeInstance.RE_BALANCE_ING.set(Boolean.TRUE); try { @@ -79,14 +69,9 @@ public class ServerNodeBalance implements Lifecycle, Runnable { LogUtils.error(log, "server node is empty"); } - Set allGroup = CacheGroup.getAllGroup(); - if (CollectionUtils.isEmpty(allGroup)) { - LogUtils.error(log, "group is empty"); - } - - // 删除本地缓存的所有组信息 - CacheConsumerGroup.clear(); - if(CollectionUtils.isEmpty(podIpSet) || CollectionUtils.isEmpty(allGroup)) { + // 删除本地缓存的消费桶的信息 + DistributeInstance.INSTANCE.clearConsumerBucket(); + if(CollectionUtils.isEmpty(podIpSet)) { return; } @@ -94,15 +79,13 @@ public class ServerNodeBalance implements Lifecycle, Runnable { .allocate(ServerRegister.CURRENT_CID, bucketList, new ArrayList<>(podIpSet)); // 重新覆盖本地分配的组信息 - for (String groupName : allocate) { - CacheConsumerGroup.addOrUpdate(groupName); - } + DistributeInstance.INSTANCE.setConsumerBucket(allocate); LogUtils.info(log, "rebalance complete. allocate:[{}]", allocate); } catch (Exception e) { LogUtils.error(log, "rebalance error. ", e); } finally { - RE_BALANCE_ING.set(Boolean.FALSE); + DistributeInstance.RE_BALANCE_ING.set(Boolean.FALSE); } } @@ -128,25 +111,6 @@ public class ServerNodeBalance implements Lifecycle, Runnable { RegisterNodeInfo registerNodeInfo = concurrentMap.get(localHostId); // 删除过期的节点信息 CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); - // 删除本地消费组信息 - CacheConsumerGroup.remove(registerNodeInfo.getGroupName()); - } - } - - private void refreshAndRemoveGroup(List removeGroupConfig, final Set allGroup) { - if (allGroup.size() != removeGroupConfig.size()) { - allGroup.removeAll(removeGroupConfig.stream() - .map(GroupConfig::getGroupName).collect(Collectors.toSet())); - - // 删除已关闭的组 - for (String groupName : allGroup) { - CacheGroup.remove(groupName); - } - - // 添加组 - for (final GroupConfig groupConfig : removeGroupConfig) { - CacheGroup.addOrUpdate(groupConfig.getGroupName()); - } } } @@ -210,24 +174,16 @@ public class ServerNodeBalance implements Lifecycle, Runnable { Set localHostIds = concurrentMap.values().stream().map(RegisterNodeInfo::getHostId) .collect(Collectors.toSet()); - List removeGroupConfig = accessTemplate.getGroupConfigAccess().getAllOpenGroupConfig(); - Set allGroup = CacheGroup.getAllGroup(); - // 无缓存的节点触发refreshCache if (CollectionUtils.isEmpty(concurrentMap) // 节点数量不一致触发 || isNodeSizeNotEqual(concurrentMap.size(), remotePods.size()) - // 若存在远程和本地缓存的组的数量不一致则触发rebalance - || isGroupSizeNotEqual(removeGroupConfig, allGroup) // 判断远程节点是不是和本地节点一致的,如果不一致则重新分配 || isNodeNotMatch(remoteHostIds, localHostIds)) { // 删除本地缓存以下线的节点信息 removeNode(concurrentMap, remoteHostIds, localHostIds); - // 刷新组配置和删除已关闭的组 - refreshAndRemoveGroup(removeGroupConfig, allGroup); - // 重新获取DB中最新的服务信息 refreshCache(remotePods); @@ -253,8 +209,6 @@ public class ServerNodeBalance implements Lifecycle, Runnable { for (final RegisterNodeInfo registerNodeInfo : expireNodeSet) { // 删除过期的节点信息 CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); - // 删除本地消费组信息 - CacheConsumerGroup.remove(registerNodeInfo.getGroupName()); } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java index cd0d4e6d4..82ab1f54c 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java @@ -81,17 +81,6 @@ public class ClientRegister extends AbstractRegister implements Runnable { refreshExpireAt(serverNode); } - // 同步当前POD消费的组的节点信息 - // netty的client只会注册到一个服务端,若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息 - Set allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName(); - if (!CollectionUtils.isEmpty(allConsumerGroupName)) { - List serverNodes = serverNodeMapper.selectList( - new LambdaQueryWrapper().in(ServerNode::getGroupName, allConsumerGroupName)); - for (final ServerNode node : serverNodes) { - // 刷新全量本地缓存 - CacheRegisterTable.addOrUpdate(node.getGroupName(), node); - } - } }catch (InterruptedException e) { LogUtils.info(log, "[{}] thread stop.", Thread.currentThread().getName()); } catch (Exception e) { diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheGroupScanActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheBucketActor.java similarity index 74% rename from easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheGroupScanActor.java rename to easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheBucketActor.java index 16a1c61ca..8a11d89db 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheGroupScanActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheBucketActor.java @@ -20,17 +20,17 @@ import org.springframework.stereotype.Component; @Component @Data @Slf4j -public class CacheGroupScanActor implements Lifecycle { +public class CacheBucketActor implements Lifecycle { - private static Cache CACHE; + private static Cache CACHE; /** * 获取所有缓存 * * @return 缓存对象 */ - public static ActorRef get(String groupName, TaskTypeEnum typeEnum) { - return CACHE.getIfPresent(groupName.concat(typeEnum.name())); + public static ActorRef get(Integer bucket) { + return CACHE.getIfPresent(bucket); } /** @@ -38,8 +38,8 @@ public class CacheGroupScanActor implements Lifecycle { * * @return 缓存对象 */ - public static void put(String groupName, TaskTypeEnum typeEnum, ActorRef actorRef) { - CACHE.put(groupName.concat(typeEnum.name()), actorRef); + public static void put(Integer bucket, ActorRef actorRef) { + CACHE.put(bucket, actorRef); } @Override diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/DispatchService.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/DispatchService.java deleted file mode 100644 index 720735e88..000000000 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/DispatchService.java +++ /dev/null @@ -1,156 +0,0 @@ -package com.aizuda.easy.retry.server.retry.task.support.dispatch; - -import akka.actor.ActorRef; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.common.Lifecycle; -import com.aizuda.easy.retry.server.common.config.SystemProperties; -import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; -import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; -import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter; -import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupScanActor; -import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper; -import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; -import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.google.common.cache.Cache; -import com.google.common.util.concurrent.RateLimiter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * 分发器组件 - * - * @author: www.byteblogs.com - * @date : 2021-11-19 15:46 - * @since 1.6.0 - */ -@Component -@Slf4j -public class DispatchService implements Lifecycle { - - /** - * 分配器线程 - */ - private final ScheduledExecutorService dispatchService = Executors - .newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService")); - - /** - * 调度时长 - */ - public static final Long PERIOD = 10L; - - /** - * 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance - */ - public static final Long INITIAL_DELAY = 10L; - - @Autowired - private ServerNodeMapper serverNodeMapper; - - @Autowired - private SystemProperties systemProperties; - - @Override - public void start() { - - dispatchService.scheduleAtFixedRate(() -> { - - try { - // 当正在rebalance时延迟10s,尽量等待所有节点都完成rebalance - if (ServerNodeBalance.RE_BALANCE_ING.get()) { - LogUtils.info(log, "正在rebalance中...."); - TimeUnit.SECONDS.sleep(INITIAL_DELAY); - } - - Set currentHostGroupList = getCurrentHostGroupList(); - LogUtils.info(log, "当前分配的组:[{}]", currentHostGroupList); - if (!CollectionUtils.isEmpty(currentHostGroupList)) { - for (String groupName : currentHostGroupList) { - ScanTaskDTO scanTaskDTO = new ScanTaskDTO(); - scanTaskDTO.setGroupName(groupName); - produceScanActorTask(scanTaskDTO); - } - } - - } catch (Exception e) { - LogUtils.error(log, "分发异常", e); - } - - - }, INITIAL_DELAY, PERIOD, TimeUnit.SECONDS); - } - - /** - * 扫描任务生成器 - * - * @param scanTaskDTO {@link GroupConfig} 组上下文 - */ - private void produceScanActorTask(ScanTaskDTO scanTaskDTO) { - - String groupName = scanTaskDTO.getGroupName(); - - // 缓存按照 - cacheRateLimiter(groupName); - - // 扫描重试数据 - ActorRef scanRetryActorRef = cacheActorRef(groupName, TaskTypeEnum.RETRY); - scanRetryActorRef.tell(scanTaskDTO, scanRetryActorRef); - - // 扫描回调数据 - ActorRef scanCallbackActorRef = cacheActorRef(groupName, TaskTypeEnum.CALLBACK); - scanCallbackActorRef.tell(scanTaskDTO, scanCallbackActorRef); - - } - - /** - * 缓存限流对象 - */ - private void cacheRateLimiter(String groupName) { - List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper() - .eq(ServerNode::getGroupName, groupName)); - Cache rateLimiterCache = CacheGroupRateLimiter.getAll(); - for (ServerNode serverNode : serverNodes) { - RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId()); - if (Objects.isNull(rateLimiter)) { - rateLimiterCache.put(serverNode.getHostId(), RateLimiter.create(systemProperties.getLimiter())); - } - } - - } - - /** - * 缓存Actor对象 - */ - private ActorRef cacheActorRef(String groupName, TaskTypeEnum typeEnum) { - ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum); - if (Objects.isNull(scanActorRef)) { - scanActorRef = typeEnum.getActorRef().get(); - // 缓存扫描器actor - CacheGroupScanActor.put(groupName, typeEnum, scanActorRef); - } - return scanActorRef; - } - - /** - * 分配当前POD负责的组 RebalanceGroup - * - * @return {@link GroupConfig} 组上下文 - */ - private Set getCurrentHostGroupList() { - return CacheConsumerGroup.getAllConsumerGroupName(); - } - - @Override - public void close() { - } -} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/ScanTaskDTO.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/ScanTask.java similarity index 89% rename from easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/ScanTaskDTO.java rename to easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/ScanTask.java index e2a15e26b..c1708a467 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/ScanTaskDTO.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/ScanTask.java @@ -10,7 +10,7 @@ import lombok.Data; * @since 1.5.0 */ @Data -public class ScanTaskDTO { +public class ScanTask { String groupName; } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index a4e841167..bf01e8ee4 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -8,8 +8,7 @@ import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.dispatch.DispatchService; -import com.aizuda.easy.retry.server.retry.task.support.dispatch.ScanTaskDTO; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.ScanTask; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; @@ -47,7 +46,7 @@ public abstract class AbstractScanGroup extends AbstractActor { @Override public Receive createReceive() { - return receiveBuilder().match(ScanTaskDTO.class, config -> { + return receiveBuilder().match(ScanTask.class, config -> { try { doScan(config); @@ -59,11 +58,11 @@ public abstract class AbstractScanGroup extends AbstractActor { } - protected void doScan(final ScanTaskDTO scanTaskDTO) { + protected void doScan(final ScanTask scanTask) { LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); int retryPullPageSize = systemProperties.getRetryPullPageSize(); - String groupName = scanTaskDTO.getGroupName(); + String groupName = scanTask.getGroupName(); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); // 扫描当前Group 待处理的任务 @@ -72,7 +71,7 @@ public abstract class AbstractScanGroup extends AbstractActor { if (!CollectionUtils.isEmpty(list)) { // 更新拉取的最大的id - putLastId(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getId()); + putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId()); for (RetryTask retryTask : list) { diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanGroupActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanGroupActor.java index 4553237de..c868f14db 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanGroupActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanGroupActor.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; @Slf4j public class ScanGroupActor extends AbstractScanGroup { - public static final String BEAN_NAME = "ScanGroupActor"; /** * 缓存待拉取数据的起点id *

diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java index 1c3574589..840064a12 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java @@ -4,11 +4,11 @@ import cn.hutool.core.lang.Pair; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.dto.DistributeInstance; import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy; import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter; -import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance; import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; @@ -239,7 +239,7 @@ public class FilterStrategies { @Override public Pair filter(RetryContext retryContext) { RetryTask retryTask = retryContext.getRetryTask(); - boolean result = !ServerNodeBalance.RE_BALANCE_ING.get(); + boolean result = ! DistributeInstance.RE_BALANCE_ING.get(); StringBuilder description = new StringBuilder(); if (!result) { description.append(MessageFormat.format("系统Rebalancing中数据无法重试.uniqueId:[{0}]", retryTask.getUniqueId())); diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucket.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucket.java new file mode 100644 index 000000000..1f3d46ddd --- /dev/null +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucket.java @@ -0,0 +1,14 @@ +package com.aizuda.easy.retry.server.dispatch; + +import lombok.Data; + +/** + * @author www.byteblogs.com + * @date 2023-09-21 23:30:22 + * @since 2.4.0 + */ +@Data +public class ConsumerBucket { + + private Integer bucket; +} diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java new file mode 100644 index 000000000..bdfbca55e --- /dev/null +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java @@ -0,0 +1,26 @@ +package com.aizuda.easy.retry.server.dispatch; + +import akka.actor.AbstractActor; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +/** + * 消费当前节点分配的bucket并生成扫描任务 + *

+ * + * @author www.byteblogs.com + * @date 2023-09-21 23:47:23 + * @since 2.4.0 + */ +@Component(ActorGenerator.SCAN_BUCKET_ACTOR) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class ConsumerBucketActor extends AbstractActor { + @Override + public Receive createReceive() { + return null; + } +} diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java new file mode 100644 index 000000000..af6cd63a7 --- /dev/null +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java @@ -0,0 +1,120 @@ +package com.aizuda.easy.retry.server.dispatch; + +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.dto.DistributeInstance; +import com.aizuda.easy.retry.server.retry.task.support.cache.CacheBucketActor; +import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 负责EasyRetry系统任务分发调度 + * + * @author www.byteblogs.com + * @date 2023-09-21 23:36:47 + * @since 2.4.0 + */ +@Component +@Slf4j +public class DispatchService implements Lifecycle { + + /** + * 分配器线程 + */ + private final ScheduledExecutorService dispatchService = Executors + .newSingleThreadScheduledExecutor(r -> new Thread(r, "dispatch-service")); + + /** + * 调度时长 + */ + public static final Long PERIOD = 10L; + + /** + * 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance + */ + public static final Long INITIAL_DELAY = 10L; + + @Override + public void start() { + + dispatchService.scheduleAtFixedRate(() -> { + + try { + // 当正在rebalance时延迟10s,尽量等待所有节点都完成rebalance + if ( DistributeInstance.RE_BALANCE_ING.get()) { + LogUtils.info(log, "正在rebalance中...."); + TimeUnit.SECONDS.sleep(INITIAL_DELAY); + } + + Set currentConsumerBuckets = getConsumerBucket(); + LogUtils.info(log, "当前节点分配的桶:[{}]", currentConsumerBuckets); + if (!CollectionUtils.isEmpty(currentConsumerBuckets)) { + for (Integer bucket : currentConsumerBuckets) { + ConsumerBucket scanTaskDTO = new ConsumerBucket(); + scanTaskDTO.setBucket(bucket); + produceConsumerBucketActorTask(scanTaskDTO); + } + } + + } catch (Exception e) { + LogUtils.error(log, "分发异常", e); + } + + + }, INITIAL_DELAY, PERIOD, TimeUnit.SECONDS); + } + + + /** + * 生成bucket对应的Actor + * + * @param consumerBucket {@link ConsumerBucket} 消费的桶的上下文 + */ + private void produceConsumerBucketActorTask(ConsumerBucket consumerBucket) { + + Integer bucket = consumerBucket.getBucket(); + + // 缓存bucket对应的 Actor + ActorRef actorRef = cacheActorRef(bucket); + actorRef.tell(consumerBucket, actorRef); + + } + + + /** + * 分配当前POD负责消费的桶 + * + * @return {@link GroupConfig} 组上下文 + */ + private Set getConsumerBucket() { + return DistributeInstance.INSTANCE.getConsumerBucket(); + } + + /** + * 缓存Actor对象 + */ + private ActorRef cacheActorRef(Integer bucket) { + ActorRef scanActorRef = CacheBucketActor.get(bucket); + if (Objects.isNull(scanActorRef)) { + scanActorRef = ActorGenerator.scanBucketActor(); + // 缓存扫描器actor + CacheBucketActor.put(bucket, scanActorRef); + } + return scanActorRef; + } + + @Override + public void close() { + + } +}