feat: 2.0.0

1. 优化代码
This commit is contained in:
www.byteblogs.com 2023-06-10 10:31:14 +08:00 committed by byteblogs168
parent 04a7b09eda
commit 9826006808
11 changed files with 169 additions and 139 deletions

View File

@ -135,6 +135,7 @@ CREATE TABLE `server_node`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_expire_at_node_type` (`expire_at`,`node_type`),
UNIQUE KEY `uk_host_id_host_ip` (`host_id`,`host_ip`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='服务器节点'
;

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.common.core.exception;
import lombok.Data;
/**
* @author zhangshuguang
* @author www.byteblogs.com
* @date 2020/05/13
*/
@Data

View File

@ -6,7 +6,7 @@ import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
/**
* @author: shuguang.zhang
* @author: www.byteblogs.com
* @date : 2023-05-05 16:15
*/
@Mapper

View File

@ -11,6 +11,7 @@ import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
/**
@ -37,7 +38,7 @@ public class CacheGroup implements Lifecycle {
return Collections.EMPTY_SET;
}
return new HashSet<>(concurrentMap.values());
return new TreeSet<>(concurrentMap.values());
}

View File

@ -25,25 +25,30 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* @author: shuguang.zhang
* 负责处理组或者节点变化时重新分配组在不同的节点上消费
*
* @author: www.byteblogs.com
* @date : 2023-06-08 15:58
* @since 1.6.0
*/
@Component
@Slf4j
public class ServerNodeBalance implements Lifecycle {
public class ServerNodeBalance implements Lifecycle, Runnable {
/**
* 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance
*/
public static final Long INITIAL_DELAY = 10L;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
private final ScheduledExecutorService serverRegisterNode = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "server-node-balance"));
private Thread THREAD = null;
@Autowired
protected ServerNodeMapper serverNodeMapper;
@ -58,27 +63,28 @@ public class ServerNodeBalance implements Lifecycle {
RE_BALANCE_ING.set(Boolean.TRUE);
try {
// 已经按照id 正序排序
List<GroupConfig> prepareAllocateGroupConfig = configAccess.getAllOpenGroupConfig();
if (CollectionUtils.isEmpty(prepareAllocateGroupConfig)) {
return;
}
// 为了保证客户端分配算法的一致性,serverNodes 从数据库从数据获取
Set<String> podIpSet = CacheRegisterTable.getPodIdSet(ServerRegister.GROUP_NAME);
if (CollectionUtils.isEmpty(podIpSet)) {
LogUtils.error(log, "服务端节点为空");
LogUtils.error(log, "server node is empty");
return;
}
Set<String> groupNameSet = prepareAllocateGroupConfig.stream().map(GroupConfig::getGroupName)
.collect(Collectors.toSet());
Set<String> allGroup = CacheGroup.getAllGroup();
if (CollectionUtils.isEmpty(allGroup)) {
LogUtils.error(log, "group is empty");
return;
}
List<String> allocate = new AllocateMessageQueueConsistentHash()
.allocate(ServerRegister.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet));
.allocate(ServerRegister.CURRENT_CID, new ArrayList<>(allGroup), new ArrayList<>(podIpSet));
// 删除本地缓存的所有组信息
CacheConsumerGroup.clear();
for (final String groupName : allocate) {
// 重新覆盖本地分配的组信息
for (String groupName : allocate) {
CacheConsumerGroup.addOrUpdate(groupName);
}
@ -94,84 +100,26 @@ public class ServerNodeBalance implements Lifecycle {
@Override
public void start() {
LogUtils.info(log, "ServerNodeBalance start");
serverRegisterNode.scheduleAtFixedRate(() -> {
try {
THREAD = new Thread(this, "server-node-balance");
THREAD.start();
}
List<ServerNode> remotePods = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()));
private void removeNode(ConcurrentMap<String, RegisterNodeInfo> concurrentMap, Set<String> remoteHostIds, Set<String> localHostIds) {
// 获取缓存中的节点
ConcurrentMap<String/*hostId*/, RegisterNodeInfo> concurrentMap = CacheRegisterTable
.get(ServerRegister.GROUP_NAME);
Set<String> remoteHostIds = remotePods.stream().map(ServerNode::getHostId).collect(Collectors.toSet());
Set<String> localHostIds = concurrentMap.values().stream().map(RegisterNodeInfo::getHostId)
.collect(Collectors.toSet());
List<GroupConfig> removeGroupConfig = configAccess.getAllOpenGroupConfig();
Set<String> allGroup = CacheGroup.getAllGroup();
// 无缓存的节点触发refreshCache
if (CollectionUtils.isEmpty(concurrentMap)
// 节点数量不一致触发
|| remotePods.size() != concurrentMap.size()
// 若存在新的组则触发rebalance
|| allGroup.size() != removeGroupConfig.size()
// 判断远程节点是不是和本地节点一致的如果不一致则重新分配
|| !remoteHostIds.containsAll(localHostIds)) {
localHostIds.removeAll(remoteHostIds);
for (String localHostId : localHostIds) {
RegisterNodeInfo registerNodeInfo = concurrentMap.get(localHostId);
// 删除过期的节点信息
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
// 删除本地消费组信息
CacheConsumerGroup.remove(registerNodeInfo.getGroupName());
}
// 刷新组配置和删除已关闭的组
refreshAndRemoveGroup(removeGroupConfig, allGroup);
// 重新获取DB中最新的服务信息
refreshCache(remotePods);
// 触发rebalance
doBalance();
} else {
// 重新刷新所有的缓存key
refreshCache(remotePods);
// 再次获取最新的节点信息
concurrentMap = CacheRegisterTable
.get(ServerRegister.GROUP_NAME);
// 找出过期的节点
Set<RegisterNodeInfo> expireNodeSet = concurrentMap.values().stream()
.filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(LocalDateTime.now()))
.collect(Collectors.toSet());
for (final RegisterNodeInfo registerNodeInfo : expireNodeSet) {
// 删除过期的节点信息
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
// 删除本地消费组信息
CacheConsumerGroup.remove(registerNodeInfo.getGroupName());
}
}
} catch (Exception e) {
LogUtils.error(log, "check balance error", e);
}
}, 10, 1, TimeUnit.SECONDS);
localHostIds.removeAll(remoteHostIds);
for (String localHostId : localHostIds) {
RegisterNodeInfo registerNodeInfo = concurrentMap.get(localHostId);
// 删除过期的节点信息
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
// 删除本地消费组信息
CacheConsumerGroup.remove(registerNodeInfo.getGroupName());
}
}
private void refreshAndRemoveGroup(List<GroupConfig> removeGroupConfig, final Set<String> allGroup) {
if (allGroup.size() != removeGroupConfig.size()) {
allGroup.removeAll(removeGroupConfig.stream()
.map(GroupConfig::getGroupName).collect(Collectors.toSet()));
.map(GroupConfig::getGroupName).collect(Collectors.toSet()));
// 删除已关闭的组
for (String groupName : allGroup) {
@ -197,11 +145,11 @@ public class ServerNodeBalance implements Lifecycle {
public void close() {
// 停止定时任务
serverRegisterNode.shutdown();
THREAD.interrupt();
LogUtils.info(log, "ServerNodeBalance start. ");
int i = serverNodeMapper
.delete(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, ServerRegister.CURRENT_CID));
.delete(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, ServerRegister.CURRENT_CID));
if (1 == i) {
LogUtils.info(log, "delete node success. [{}]", ServerRegister.CURRENT_CID);
} else {
@ -210,4 +158,124 @@ public class ServerNodeBalance implements Lifecycle {
LogUtils.info(log, "ServerNodeBalance close complete");
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(INITIAL_DELAY);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (!Thread.currentThread().isInterrupted()) {
try {
List<ServerNode> remotePods = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>()
.ge(ServerNode::getExpireAt, LocalDateTime.now())
.eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()));
// 获取缓存中的节点
ConcurrentMap<String/*hostId*/, RegisterNodeInfo> concurrentMap = CacheRegisterTable
.get(ServerRegister.GROUP_NAME);
Set<String> remoteHostIds = remotePods.stream().map(ServerNode::getHostId).collect(Collectors.toSet());
Set<String> localHostIds = concurrentMap.values().stream().map(RegisterNodeInfo::getHostId)
.collect(Collectors.toSet());
List<GroupConfig> removeGroupConfig = configAccess.getAllOpenGroupConfig();
Set<String> allGroup = CacheGroup.getAllGroup();
// 无缓存的节点触发refreshCache
if (CollectionUtils.isEmpty(concurrentMap)
// 节点数量不一致触发
|| isNodeSizeNotEqual(remotePods.size(), concurrentMap.size())
// 若存在远程和本地缓存的组的数量不一致则触发rebalance
|| isGroupSizeNotEqual(removeGroupConfig, allGroup)
// 判断远程节点是不是和本地节点一致的如果不一致则重新分配
|| isNodeNotMatch(remoteHostIds, localHostIds)) {
// 删除本地缓存以下线的节点信息
removeNode(concurrentMap, remoteHostIds, localHostIds);
// 刷新组配置和删除已关闭的组
refreshAndRemoveGroup(removeGroupConfig, allGroup);
// 重新获取DB中最新的服务信息
refreshCache(remotePods);
// 触发rebalance
doBalance();
// 每次rebalance之后给10秒作为空闲时间等待其他的节点也完成rebalance
TimeUnit.SECONDS.sleep(INITIAL_DELAY);
} else {
// 重新刷新所有的缓存key
refreshCache(remotePods);
// 再次获取最新的节点信息
concurrentMap = CacheRegisterTable
.get(ServerRegister.GROUP_NAME);
// 找出过期的节点
Set<RegisterNodeInfo> expireNodeSet = concurrentMap.values().stream()
.filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(LocalDateTime.now()))
.collect(Collectors.toSet());
for (final RegisterNodeInfo registerNodeInfo : expireNodeSet) {
// 删除过期的节点信息
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
// 删除本地消费组信息
CacheConsumerGroup.remove(registerNodeInfo.getGroupName());
}
}
} catch (InterruptedException e) {
LogUtils.error(log, "check balance interrupt");
} catch (Exception e) {
LogUtils.error(log, "check balance error", e);
} finally {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private boolean isNodeNotMatch(Set<String> remoteHostIds, Set<String> localHostIds) {
boolean b = !remoteHostIds.containsAll(localHostIds);
if (b) {
LogUtils.info(log, "判断远程节点是不是和本地节点一致. remoteHostIds:[{}] localHostIds:[{}]",
localHostIds,
remoteHostIds);
}
return b;
}
private boolean isNodeSizeNotEqual(int localNodeSize, int remoteNodeSize) {
boolean b = localNodeSize != remoteNodeSize;
if (b) {
LogUtils.info(log, "存在远程和本地缓存的节点的数量不一致则触发rebalance. localNodeSize:[{}] remoteNodeSize:[{}]",
localNodeSize,
remoteNodeSize);
}
return b;
}
private boolean isGroupSizeNotEqual(List<GroupConfig> removeGroupConfig, Set<String> allGroup) {
boolean b = allGroup.size() != removeGroupConfig.size();
if (b) {
LogUtils.info(log, "若存在远程和本地缓存的组的数量不一致则触发rebalance. localGroupSize:[{}] remoteGroupSize:[{}]",
allGroup.size(),
removeGroupConfig.size());
}
return b;
}
}

View File

@ -46,7 +46,7 @@ public class ServerRegister extends AbstractRegister {
private SystemProperties systemProperties;
static {
CURRENT_CID = IdUtil.simpleUUID();
CURRENT_CID = IdUtil.getSnowflakeNextIdStr();
}
@Override

View File

@ -51,12 +51,12 @@ public class ClearThreadSchedule {
public void clearOfflineNode() {
try {
// 删除内存缓存的待下线的机器
LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3));
// 先删除DB中需要下线的机器
serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2));
serverNodeMapper.deleteByExpireAt(endTime);
// 删除内存缓存的待下线的机器
LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2);
Set<RegisterNodeInfo> allPods = CacheRegisterTable.getAllPods();
Set<RegisterNodeInfo> waitOffline = allPods.stream().filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(endTime)).collect(Collectors.toSet());
Set<String> podIds = waitOffline.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet());

View File

@ -9,7 +9,7 @@ import java.util.List;
/**
* 批量删除重试数据
*
* @author: shuguang.zhang
* @author: www.byteblogs.com
* @date : 2023-04-30 22:30
*/
@Data

View File

@ -16,7 +16,7 @@ import java.util.concurrent.CountDownLatch;
/**
* 测试多线程情况下号段模式的运行情况
*
* @author: shuguang.zhang
* @author: www.byteblogs.com
* @date : 2023-05-06 09:16
* @since 1.2.0
*/

View File

@ -1,40 +0,0 @@
package com.example;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.OutputFile;
import com.baomidou.mybatisplus.generator.engine.FreemarkerTemplateEngine;
import java.util.Collections;
/**
* @author: www.byteblogs.com
* @date : 2021-09-30 17:19
*/
public class CodeGen {
public static void main(String[] args) {
FastAutoGenerator.create("jdbc:mysql://localhost:3306/demo", "root", "root")
.globalConfig(builder -> {
builder.author("www.byteblogs.com") // 设置作者
.enableSwagger() // 开启 swagger 模式
.fileOverride() // 覆盖已生成文件
.outputDir("/Users/zhangshuguang/easy-retry/example/src/main/java"); // 指定输出目录
})
.packageConfig(builder -> {
builder.parent("com.example") // 设置父包名
.moduleName("") // 设置父包模块名
.entity("po")
.pathInfo(Collections.singletonMap(OutputFile.mapperXml, "/Users/zhangshuguang/easy-retry/example/src/main/resources/mapper")); // 设置mapperXml生成路径
})
.strategyConfig(builder -> {
builder.addInclude("school", "student", "teacher", "school_student_teacher") // 设置需要生成的表名
.entityBuilder().idType(IdType.AUTO).formatFileName("%s")
.serviceBuilder().formatServiceFileName("%sService")
.mapperBuilder().enableBaseResultMap().enableMapperAnnotation()
; // 设置过滤表前缀
})
.templateEngine(new FreemarkerTemplateEngine()) // 使用Freemarker引擎模板默认的是Velocity引擎模板
.execute();
}
}

View File

@ -4,7 +4,7 @@ import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* @author: shuguang.zhang
* @author: www.byteblogs.com
* @date : 2023-04-25 22:48
*/
@Getter