feat: 2.0.0

1. 新增pod列表查询
This commit is contained in:
www.byteblogs.com 2023-06-08 23:49:24 +08:00 committed by byteblogs168
parent f8ff8fd01f
commit a82dc72ec2
8 changed files with 141 additions and 140 deletions

View File

@ -28,15 +28,15 @@ import java.util.stream.Collectors;
@Slf4j
public class CacheConsumerGroup implements Lifecycle {
private static Cache<String /*groupName*/, String/*hostIp*/> CACHE;
private static Cache<String /*groupName*/, String/*groupName*/> CACHE;
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static Set<ServerNode> getAllPods() {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.asMap();
public static Set<String> getAllPods() {
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
return new HashSet<>(concurrentMap.values());
}
@ -46,7 +46,7 @@ public class CacheConsumerGroup implements Lifecycle {
*
* @return 缓存对象
*/
public static ServerNode get(String hostId) {
public static String get(String hostId) {
return CACHE.getIfPresent(hostId);
}
@ -57,11 +57,11 @@ public class CacheConsumerGroup implements Lifecycle {
* @return 缓存对象
*/
public static synchronized void addOrUpdate(String groupName) {
CACHE.put(groupName, "");
CACHE.put(groupName, groupName);
}
public static void remove(String hostId) {
CACHE.invalidate(hostId);
public static void remove(String groupName) {
CACHE.invalidate(groupName);
}
@Override

View File

@ -7,7 +7,9 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
@ -38,11 +40,15 @@ public class CacheRegisterTable implements Lifecycle {
*/
public static Set<ServerNode> getAllPods() {
ConcurrentMap<String, ConcurrentMap<String, ServerNode>> concurrentMap = CACHE.asMap();
if (CollectionUtils.isEmpty(concurrentMap)) {
return Collections.EMPTY_SET;
}
return (Set<ServerNode>) concurrentMap.values().stream().map(Map::values).reduce((s, y) -> {
Set<ServerNode> mergeSet = new HashSet<>(s);
mergeSet.addAll(y);
return mergeSet;
return concurrentMap.values().stream()
.map(stringServerNodeConcurrentMap -> new HashSet<>(stringServerNodeConcurrentMap.values()))
.reduce((s, y) -> {
s.addAll(y);
return s;
}).get();
}
@ -75,13 +81,16 @@ public class CacheRegisterTable implements Lifecycle {
*
* @return 缓存对象
*/
public static TreeSet<ServerNode> getServerNodeSet(String groupName) {
public static Set<ServerNode> getServerNodeSet(String groupName) {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.getIfPresent(groupName);
Set<ServerNode> set = new TreeSet<>(Comparator.comparingInt(o -> o.getId().intValue()));
if (Objects.isNull(concurrentMap)) {
return new TreeSet<>();
return set;
}
return new TreeSet<>(Comparator.comparingInt(o -> o.getId().intValue()));
set.addAll(concurrentMap.values());
return set;
}
/**
@ -90,7 +99,8 @@ public class CacheRegisterTable implements Lifecycle {
* @return 缓存对象
*/
public static Set<String> getPodIdSet(String groupName) {
return getServerNodeSet(groupName).stream().map(ServerNode::getHostId).collect(Collectors.toSet());
return getServerNodeSet(groupName).stream()
.map(ServerNode::getHostId).collect(Collectors.toSet());
}
/**
@ -100,12 +110,16 @@ public class CacheRegisterTable implements Lifecycle {
* @return 缓存对象
*/
public static synchronized void addOrUpdate(String groupName, ServerNode serverNode) {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.getIfPresent(groupName);
if (Objects.isNull(concurrentMap)) {
LogUtils.info(log, "Add cache. groupName:[{}] hostId:[{}]", groupName, serverNode.getHostId());
concurrentMap = new ConcurrentHashMap<>();
CACHE.put(groupName, concurrentMap);
}
LogUtils.info(log, "Update cache. groupName:[{}] hostId:[{}] hostIp:[{}]", groupName,
serverNode.getHostId(), serverNode.getExpireAt());
concurrentMap.put(serverNode.getHostId(), serverNode);
}
@ -115,13 +129,10 @@ public class CacheRegisterTable implements Lifecycle {
return;
}
LogUtils.info(log, "Remove cache. groupName:[{}] hostId:[{}]", groupName, hostId);
concurrentMap.remove(hostId);
}
public static void expirationElimination(String groupName, String hostId) {
}
@Override
public void start() {
LogUtils.info(log, "CacheRegisterTable start");
@ -129,6 +140,7 @@ public class CacheRegisterTable implements Lifecycle {
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
@Override

View File

@ -1,41 +1,31 @@
package com.aizuda.easy.retry.server.support.dispatch;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash;
import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.server.support.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance;
import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 分发器组件
@ -66,9 +56,6 @@ public class DispatchService implements Lifecycle {
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private ServerNodeBalance serverNodeBalance;
@Autowired
private SystemProperties systemProperties;
@ -152,7 +139,7 @@ public class DispatchService implements Lifecycle {
* @return {@link GroupConfig} 组上下文
*/
private Set<String> getCurrentHostGroupList() {
return serverNodeBalance.getLocalCidAll();
return CacheConsumerGroup.getAllPods();
}
@Override

View File

@ -21,6 +21,7 @@ import java.util.Objects;
import java.util.concurrent.*;
/**
* todo 待优化
* @author www.byteblogs.com
* @date 2022-03-09
* @since 2.0
@ -33,8 +34,6 @@ public class ClientRegisterHandler {
new LinkedBlockingQueue<>(500), r -> new Thread(r, "CLIENT REGISTER THREAD"), (r, executor) -> LogUtils.error(log, "处理注册线程池已经超负荷运作"));
public static final String URL = "http://{0}:{1}/{2}/retry/sync/version/v1";
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private RestTemplate restTemplate;
@Autowired

View File

@ -1,24 +1,36 @@
package com.aizuda.easy.retry.server.support.handler;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash;
import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author: shuguang.zhang
@ -26,11 +38,15 @@ import java.util.stream.Collectors;
*/
@Component
@Slf4j
public class ServerNodeBalance {
public class ServerNodeBalance implements Lifecycle {
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "ServerRegisterNode"));
@Autowired
protected ServerNodeMapper serverNodeMapper;
public void doBalance() {
@ -52,10 +68,73 @@ public class ServerNodeBalance {
.collect(Collectors.toSet());
List<String> allocate = new AllocateMessageQueueConsistentHash()
.allocate(ServerRegisterNodeHandler.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet));
.allocate(ServerRegister.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet));
for (final String groupName : allocate) {
CacheConsumerGroup.addOrUpdate(groupName);
}
}
@Override
public void start() {
LogUtils.info(log, "ServerNodeBalance start");
serverRegisterNode.scheduleAtFixedRate(() -> {
try {
ConcurrentMap<String, ServerNode> concurrentMap = CacheRegisterTable.get(ServerRegister.GROUP_NAME);
if (!CollectionUtils.isEmpty(concurrentMap)) {
Set<ServerNode> sorted = new TreeSet<>(Comparator.comparing(ServerNode::getExpireAt));
sorted.addAll(concurrentMap.values());
sorted.stream().findFirst().ifPresent(serverNode -> {
// 若服务端的POD注册的第一个节点是过期了则触发rebalance
if (serverNode.getExpireAt().isBefore(LocalDateTime.now())) {
// 删除过期的节点信息
CacheRegisterTable.remove(serverNode.getGroupName(), serverNode.getHostId());
// 删除本地消费组信息
CacheConsumerGroup.remove(serverNode.getGroupName());
// 重新刷新所有的缓存key
refreshCache();
// 触发rebalance
doBalance();
}
});
} else {
// 若不存在服务端的POD注册信息直接rebalance
// 重新获取DB中最新的服务信息
refreshCache();
// 触发rebalance
doBalance();
}
} catch (Exception e) {
LogUtils.error(log, "", e);
}
}, 10, 1, TimeUnit.SECONDS);
}
private void refreshCache() {
// 重新获取DB中最新的服务信息
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()));
// 刷新最新的节点注册信息
for (ServerNode node : serverNodes) {
CacheRegisterTable.addOrUpdate(node.getGroupName(), node);
}
}
@Override
public void close() {
LogUtils.info(log, "ServerNodeBalance close");
LogUtils.info(log, "准备删除节点 [{}]", ServerRegister.CURRENT_CID);
int i = serverNodeMapper.delete(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, ServerRegister.CURRENT_CID));
if (1 == i) {
LogUtils.info(log,"删除节点 [{}]成功", ServerRegister.CURRENT_CID);
} else {
LogUtils.info(log,"删除节点 [{}]失败", ServerRegister.CURRENT_CID);
}
}
}

View File

@ -1,80 +0,0 @@
package com.aizuda.easy.retry.server.support.handler;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Lifecycle;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 服务端注册处理器
*
* @author: www.byteblogs.com
* @date : 2021-11-19 14:49
*/
@Component
@Slf4j
public class ServerRegisterNodeHandler implements Lifecycle {
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private SystemProperties systemProperties;
private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode"));
public static final int DELAY_TIME = 20;
public static final String CURRENT_CID;
static {
CURRENT_CID = IdUtil.simpleUUID();
}
@Override
public void start() {
ServerNode serverNode = new ServerNode();
serverNode.setHostId(CURRENT_CID);
serverNode.setHostIp(HostUtils.getIp());
serverNode.setGroupName(StrUtil.EMPTY);
serverNode.setHostPort(systemProperties.getNettyPort());
serverNode.setNodeType(NodeTypeEnum.SERVER.getType());
serverNode.setCreateDt(LocalDateTime.now());
serverNode.setContextPath(StrUtil.EMPTY);
serverRegisterNode.scheduleAtFixedRate(()->{
try {
serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME));
serverNodeMapper.insertOrUpdate(serverNode);
}catch (Exception e) {
LogUtils.error(log,"服务端注册节点失败", e);
}
}, 1, DELAY_TIME / 2, TimeUnit.SECONDS);
}
@Override
public void close() {
LogUtils.info(log, "准备删除节点 [{}]", CURRENT_CID);
int i = serverNodeMapper.delete(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, CURRENT_CID));
if (1 == i) {
LogUtils.info(log,"删除节点 [{}]成功", CURRENT_CID);
} else {
LogUtils.info(log,"删除节点 [{}]失败", CURRENT_CID);
}
}
}

View File

@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.support.Register;
import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -28,13 +29,16 @@ import java.util.concurrent.TimeUnit;
* @date 2023-06-07
* @since 1.6.0
*/
@Component
@Component(ServerRegister.BEAN_NAME)
@Slf4j
public class ServerRegister extends AbstractRegister {
public static final String BEAN_NAME = "serverRegister";
private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode"));
public static final int DELAY_TIME = 30;
public static final String CURRENT_CID;
public static final String GROUP_NAME = "DEFAULT_SERVER";
@Autowired
public ServerNodeBalance serverNodeBalance;
@ -53,7 +57,6 @@ public class ServerRegister extends AbstractRegister {
context.setGroupName(GROUP_NAME);
context.setHostId(CURRENT_CID);
context.setHostIp(HostUtils.getIp());
context.setGroupName(StrUtil.EMPTY);
context.setContextPath(StrUtil.EMPTY);
}
@ -65,14 +68,6 @@ public class ServerRegister extends AbstractRegister {
@Override
protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
refreshExpireAt(serverNode);
Set<ServerNode> serverNodeSet = CacheRegisterTable.getServerNodeSet(context.getGroupName());
for (final ServerNode node : serverNodeSet) {
ServerNode consumerServerNode = CacheConsumerGroup.get(node.getHostId());
if (consumerServerNode.getExpireAt().isBefore(LocalDateTime.now())) {
// 触发rebalance
serverNodeBalance.doBalance();
}
}
return Boolean.TRUE;
}
@ -83,15 +78,20 @@ public class ServerRegister extends AbstractRegister {
@Override
public void start() {
Register register = SpringContext.getBean("serverRegister", Register.class);
LogUtils.info(log, "ServerRegister start");
// 先删除已经过期未删除的节点
serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2));
Register register = SpringContext.getBean(ServerRegister.BEAN_NAME, Register.class);
serverRegisterNode.scheduleAtFixedRate(()->{
register.register(new RegisterContext());
}, 1, DELAY_TIME / 2, TimeUnit.SECONDS);
}, 0, DELAY_TIME / 2, TimeUnit.SECONDS);
}
@Override
public void close() {
LogUtils.info(log, "ServerRegister close");
}
}

View File

@ -8,7 +8,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.service.RetryService;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Set;
@ -49,14 +50,17 @@ public class ClearThreadSchedule {
public void clearOfflineNode() {
try {
LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegisterNodeHandler.DELAY_TIME * 2);
Set<ServerNode> allPods = CacheRegisterTable.getAllPods();
Set<ServerNode> waitOffline = allPods.stream().filter(serverNode -> serverNode.getExpireAt().isAfter(endTime)).collect(Collectors.toSet());
Set<String> podIds = waitOffline.stream().map(ServerNode::getHostId).collect(Collectors.toSet());
int delete = serverNodeMapper
.delete(new LambdaQueryWrapper<ServerNode>().in(ServerNode::getHostId, podIds));
Assert.isTrue(delete > 0, () -> new EasyRetryServerException("clearOfflineNode error"));
// TODO ING
serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2));
LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2);
Set<ServerNode> allPods = CacheRegisterTable.getAllPods();
Set<ServerNode> waitOffline = allPods.stream().filter(serverNode -> serverNode.getExpireAt().isBefore(endTime)).collect(Collectors.toSet());
Set<String> podIds = waitOffline.stream().map(ServerNode::getHostId).collect(Collectors.toSet());
if (CollectionUtils.isEmpty(podIds)) {
return;
}
for (final ServerNode serverNode : waitOffline) {
CacheRegisterTable.remove(serverNode.getGroupName(), serverNode.getHostId());