diff --git a/doc/sql/easy_retry.sql b/doc/sql/easy_retry.sql index 209ac5f8..532c6a3f 100644 --- a/doc/sql/easy_retry.sql +++ b/doc/sql/easy_retry.sql @@ -135,6 +135,7 @@ CREATE TABLE `server_node` `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), + KEY `idx_expire_at_node_type` (`expire_at`,`node_type`), UNIQUE KEY `uk_host_id_host_ip` (`host_id`,`host_ip`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='服务器节点' ; diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/exception/AbstractError.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/exception/AbstractError.java index 8a511ed5..ebdceff0 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/exception/AbstractError.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/exception/AbstractError.java @@ -3,7 +3,7 @@ package com.aizuda.easy.retry.common.core.exception; import lombok.Data; /** - * @author zhangshuguang + * @author www.byteblogs.com * @date 2020/05/13 */ @Data diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java index 22d14042..817933af 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java @@ -6,7 +6,7 @@ import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; /** - * @author: shuguang.zhang + * @author: www.byteblogs.com * @date : 2023-05-05 16:15 */ @Mapper diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroup.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroup.java index a1b7069f..d260ebd5 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroup.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroup.java @@ -11,6 +11,7 @@ import org.springframework.util.CollectionUtils; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; /** @@ -37,7 +38,7 @@ public class CacheGroup implements Lifecycle { return Collections.EMPTY_SET; } - return new HashSet<>(concurrentMap.values()); + return new TreeSet<>(concurrentMap.values()); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java index efe5f43f..acbc62f5 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java @@ -25,25 +25,30 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** - * @author: shuguang.zhang + * 负责处理组或者节点变化时,重新分配组在不同的节点上消费 + * + * @author: www.byteblogs.com * @date : 2023-06-08 15:58 + * @since 1.6.0 */ @Component @Slf4j -public class ServerNodeBalance implements Lifecycle { +public class ServerNodeBalance implements Lifecycle, Runnable { + + /** + * 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance + */ + public static final Long INITIAL_DELAY = 10L; @Autowired @Qualifier("configAccessProcessor") private ConfigAccess configAccess; - private final ScheduledExecutorService serverRegisterNode = Executors - .newSingleThreadScheduledExecutor(r -> new Thread(r, "server-node-balance")); + private Thread THREAD = null; @Autowired protected ServerNodeMapper serverNodeMapper; @@ -58,27 +63,28 @@ public class ServerNodeBalance implements Lifecycle { RE_BALANCE_ING.set(Boolean.TRUE); try { - // 已经按照id 正序排序 - List prepareAllocateGroupConfig = configAccess.getAllOpenGroupConfig(); - if (CollectionUtils.isEmpty(prepareAllocateGroupConfig)) { - return; - } // 为了保证客户端分配算法的一致性,serverNodes 从数据库从数据获取 Set podIpSet = CacheRegisterTable.getPodIdSet(ServerRegister.GROUP_NAME); if (CollectionUtils.isEmpty(podIpSet)) { - LogUtils.error(log, "服务端节点为空"); + LogUtils.error(log, "server node is empty"); return; } - Set groupNameSet = prepareAllocateGroupConfig.stream().map(GroupConfig::getGroupName) - .collect(Collectors.toSet()); + Set allGroup = CacheGroup.getAllGroup(); + if (CollectionUtils.isEmpty(allGroup)) { + LogUtils.error(log, "group is empty"); + return; + } List allocate = new AllocateMessageQueueConsistentHash() - .allocate(ServerRegister.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet)); + .allocate(ServerRegister.CURRENT_CID, new ArrayList<>(allGroup), new ArrayList<>(podIpSet)); + + // 删除本地缓存的所有组信息 CacheConsumerGroup.clear(); - for (final String groupName : allocate) { + // 重新覆盖本地分配的组信息 + for (String groupName : allocate) { CacheConsumerGroup.addOrUpdate(groupName); } @@ -94,84 +100,26 @@ public class ServerNodeBalance implements Lifecycle { @Override public void start() { LogUtils.info(log, "ServerNodeBalance start"); - serverRegisterNode.scheduleAtFixedRate(() -> { - try { + THREAD = new Thread(this, "server-node-balance"); + THREAD.start(); + } - List remotePods = serverNodeMapper.selectList(new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType())); + private void removeNode(ConcurrentMap concurrentMap, Set remoteHostIds, Set localHostIds) { - // 获取缓存中的节点 - ConcurrentMap concurrentMap = CacheRegisterTable - .get(ServerRegister.GROUP_NAME); - - Set remoteHostIds = remotePods.stream().map(ServerNode::getHostId).collect(Collectors.toSet()); - - Set localHostIds = concurrentMap.values().stream().map(RegisterNodeInfo::getHostId) - .collect(Collectors.toSet()); - - List removeGroupConfig = configAccess.getAllOpenGroupConfig(); - Set allGroup = CacheGroup.getAllGroup(); - - // 无缓存的节点触发refreshCache - if (CollectionUtils.isEmpty(concurrentMap) - // 节点数量不一致触发 - || remotePods.size() != concurrentMap.size() - // 若存在新的组则触发rebalance - || allGroup.size() != removeGroupConfig.size() - // 判断远程节点是不是和本地节点一致的,如果不一致则重新分配 - || !remoteHostIds.containsAll(localHostIds)) { - - localHostIds.removeAll(remoteHostIds); - for (String localHostId : localHostIds) { - RegisterNodeInfo registerNodeInfo = concurrentMap.get(localHostId); - // 删除过期的节点信息 - CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); - // 删除本地消费组信息 - CacheConsumerGroup.remove(registerNodeInfo.getGroupName()); - } - - // 刷新组配置和删除已关闭的组 - refreshAndRemoveGroup(removeGroupConfig, allGroup); - - // 重新获取DB中最新的服务信息 - refreshCache(remotePods); - - // 触发rebalance - doBalance(); - - - } else { - - // 重新刷新所有的缓存key - refreshCache(remotePods); - - // 再次获取最新的节点信息 - concurrentMap = CacheRegisterTable - .get(ServerRegister.GROUP_NAME); - - // 找出过期的节点 - Set expireNodeSet = concurrentMap.values().stream() - .filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(LocalDateTime.now())) - .collect(Collectors.toSet()); - for (final RegisterNodeInfo registerNodeInfo : expireNodeSet) { - // 删除过期的节点信息 - CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); - // 删除本地消费组信息 - CacheConsumerGroup.remove(registerNodeInfo.getGroupName()); - } - - } - - } catch (Exception e) { - LogUtils.error(log, "check balance error", e); - } - }, 10, 1, TimeUnit.SECONDS); + localHostIds.removeAll(remoteHostIds); + for (String localHostId : localHostIds) { + RegisterNodeInfo registerNodeInfo = concurrentMap.get(localHostId); + // 删除过期的节点信息 + CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); + // 删除本地消费组信息 + CacheConsumerGroup.remove(registerNodeInfo.getGroupName()); + } } private void refreshAndRemoveGroup(List removeGroupConfig, final Set allGroup) { if (allGroup.size() != removeGroupConfig.size()) { allGroup.removeAll(removeGroupConfig.stream() - .map(GroupConfig::getGroupName).collect(Collectors.toSet())); + .map(GroupConfig::getGroupName).collect(Collectors.toSet())); // 删除已关闭的组 for (String groupName : allGroup) { @@ -197,11 +145,11 @@ public class ServerNodeBalance implements Lifecycle { public void close() { // 停止定时任务 - serverRegisterNode.shutdown(); + THREAD.interrupt(); LogUtils.info(log, "ServerNodeBalance start. "); int i = serverNodeMapper - .delete(new LambdaQueryWrapper().eq(ServerNode::getHostId, ServerRegister.CURRENT_CID)); + .delete(new LambdaQueryWrapper().eq(ServerNode::getHostId, ServerRegister.CURRENT_CID)); if (1 == i) { LogUtils.info(log, "delete node success. [{}]", ServerRegister.CURRENT_CID); } else { @@ -210,4 +158,124 @@ public class ServerNodeBalance implements Lifecycle { LogUtils.info(log, "ServerNodeBalance close complete"); } + + @Override + public void run() { + try { + TimeUnit.SECONDS.sleep(INITIAL_DELAY); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + while (!Thread.currentThread().isInterrupted()) { + try { + + List remotePods = serverNodeMapper.selectList(new LambdaQueryWrapper() + .ge(ServerNode::getExpireAt, LocalDateTime.now()) + .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType())); + + // 获取缓存中的节点 + ConcurrentMap concurrentMap = CacheRegisterTable + .get(ServerRegister.GROUP_NAME); + + Set remoteHostIds = remotePods.stream().map(ServerNode::getHostId).collect(Collectors.toSet()); + + Set localHostIds = concurrentMap.values().stream().map(RegisterNodeInfo::getHostId) + .collect(Collectors.toSet()); + + List removeGroupConfig = configAccess.getAllOpenGroupConfig(); + Set allGroup = CacheGroup.getAllGroup(); + + // 无缓存的节点触发refreshCache + if (CollectionUtils.isEmpty(concurrentMap) + // 节点数量不一致触发 + || isNodeSizeNotEqual(remotePods.size(), concurrentMap.size()) + // 若存在远程和本地缓存的组的数量不一致则触发rebalance + || isGroupSizeNotEqual(removeGroupConfig, allGroup) + // 判断远程节点是不是和本地节点一致的,如果不一致则重新分配 + || isNodeNotMatch(remoteHostIds, localHostIds)) { + + // 删除本地缓存以下线的节点信息 + removeNode(concurrentMap, remoteHostIds, localHostIds); + + // 刷新组配置和删除已关闭的组 + refreshAndRemoveGroup(removeGroupConfig, allGroup); + + // 重新获取DB中最新的服务信息 + refreshCache(remotePods); + + // 触发rebalance + doBalance(); + + // 每次rebalance之后给10秒作为空闲时间,等待其他的节点也完成rebalance + TimeUnit.SECONDS.sleep(INITIAL_DELAY); + + } else { + + // 重新刷新所有的缓存key + refreshCache(remotePods); + + // 再次获取最新的节点信息 + concurrentMap = CacheRegisterTable + .get(ServerRegister.GROUP_NAME); + + // 找出过期的节点 + Set expireNodeSet = concurrentMap.values().stream() + .filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(LocalDateTime.now())) + .collect(Collectors.toSet()); + for (final RegisterNodeInfo registerNodeInfo : expireNodeSet) { + // 删除过期的节点信息 + CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); + // 删除本地消费组信息 + CacheConsumerGroup.remove(registerNodeInfo.getGroupName()); + } + + } + + } catch (InterruptedException e) { + LogUtils.error(log, "check balance interrupt"); + } catch (Exception e) { + LogUtils.error(log, "check balance error", e); + } finally { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + } + + private boolean isNodeNotMatch(Set remoteHostIds, Set localHostIds) { + boolean b = !remoteHostIds.containsAll(localHostIds); + if (b) { + LogUtils.info(log, "判断远程节点是不是和本地节点一致. remoteHostIds:[{}] localHostIds:[{}]", + localHostIds, + remoteHostIds); + } + return b; + } + + private boolean isNodeSizeNotEqual(int localNodeSize, int remoteNodeSize) { + boolean b = localNodeSize != remoteNodeSize; + if (b) { + LogUtils.info(log, "存在远程和本地缓存的节点的数量不一致则触发rebalance. localNodeSize:[{}] remoteNodeSize:[{}]", + localNodeSize, + remoteNodeSize); + } + return b; + } + + private boolean isGroupSizeNotEqual(List removeGroupConfig, Set allGroup) { + boolean b = allGroup.size() != removeGroupConfig.size(); + if (b) { + LogUtils.info(log, "若存在远程和本地缓存的组的数量不一致则触发rebalance. localGroupSize:[{}] remoteGroupSize:[{}]", + allGroup.size(), + removeGroupConfig.size()); + } + return b; + } + + } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java index 65907a0d..382d7fd9 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java @@ -46,7 +46,7 @@ public class ServerRegister extends AbstractRegister { private SystemProperties systemProperties; static { - CURRENT_CID = IdUtil.simpleUUID(); + CURRENT_CID = IdUtil.getSnowflakeNextIdStr(); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java index 9dfedb10..695bfcba 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java @@ -51,12 +51,12 @@ public class ClearThreadSchedule { public void clearOfflineNode() { try { + // 删除内存缓存的待下线的机器 + LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3)); // 先删除DB中需要下线的机器 - serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2)); + serverNodeMapper.deleteByExpireAt(endTime); - // 删除内存缓存的待下线的机器 - LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2); Set allPods = CacheRegisterTable.getAllPods(); Set waitOffline = allPods.stream().filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(endTime)).collect(Collectors.toSet()); Set podIds = waitOffline.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/BatchDeleteRetryTaskVO.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/BatchDeleteRetryTaskVO.java index fc427caa..b9878c46 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/BatchDeleteRetryTaskVO.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/BatchDeleteRetryTaskVO.java @@ -9,7 +9,7 @@ import java.util.List; /** * 批量删除重试数据 * - * @author: shuguang.zhang + * @author: www.byteblogs.com * @date : 2023-04-30 22:30 */ @Data diff --git a/easy-retry-server/src/test/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGeneratorTest.java b/easy-retry-server/src/test/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGeneratorTest.java index eb42343a..7df75bdd 100644 --- a/easy-retry-server/src/test/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGeneratorTest.java +++ b/easy-retry-server/src/test/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGeneratorTest.java @@ -16,7 +16,7 @@ import java.util.concurrent.CountDownLatch; /** * 测试多线程情况下号段模式的运行情况 * - * @author: shuguang.zhang + * @author: www.byteblogs.com * @date : 2023-05-06 09:16 * @since 1.2.0 */ diff --git a/example/src/main/java/com/example/CodeGen.java b/example/src/main/java/com/example/CodeGen.java deleted file mode 100644 index 3fec40bb..00000000 --- a/example/src/main/java/com/example/CodeGen.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.example; - -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.generator.FastAutoGenerator; -import com.baomidou.mybatisplus.generator.config.OutputFile; -import com.baomidou.mybatisplus.generator.engine.FreemarkerTemplateEngine; - -import java.util.Collections; - -/** - * @author: www.byteblogs.com - * @date : 2021-09-30 17:19 - */ -public class CodeGen { - - public static void main(String[] args) { - FastAutoGenerator.create("jdbc:mysql://localhost:3306/demo", "root", "root") - .globalConfig(builder -> { - builder.author("www.byteblogs.com") // 设置作者 - .enableSwagger() // 开启 swagger 模式 - .fileOverride() // 覆盖已生成文件 - .outputDir("/Users/zhangshuguang/easy-retry/example/src/main/java"); // 指定输出目录 - }) - .packageConfig(builder -> { - builder.parent("com.example") // 设置父包名 - .moduleName("") // 设置父包模块名 - .entity("po") - .pathInfo(Collections.singletonMap(OutputFile.mapperXml, "/Users/zhangshuguang/easy-retry/example/src/main/resources/mapper")); // 设置mapperXml生成路径 - }) - .strategyConfig(builder -> { - builder.addInclude("school", "student", "teacher", "school_student_teacher") // 设置需要生成的表名 - .entityBuilder().idType(IdType.AUTO).formatFileName("%s") - .serviceBuilder().formatServiceFileName("%sService") - .mapperBuilder().enableBaseResultMap().enableMapperAnnotation() - ; // 设置过滤表前缀 - }) - .templateEngine(new FreemarkerTemplateEngine()) // 使用Freemarker引擎模板,默认的是Velocity引擎模板 - .execute(); - } -} diff --git a/example/src/main/java/com/example/model/TransactionalEvent.java b/example/src/main/java/com/example/model/TransactionalEvent.java index aa940093..ab94c1a5 100644 --- a/example/src/main/java/com/example/model/TransactionalEvent.java +++ b/example/src/main/java/com/example/model/TransactionalEvent.java @@ -4,7 +4,7 @@ import lombok.Getter; import org.springframework.context.ApplicationEvent; /** - * @author: shuguang.zhang + * @author: www.byteblogs.com * @date : 2023-04-25 22:48 */ @Getter