feat: 2.0.0

1. 优化路由注册
This commit is contained in:
byteblogs168 2023-06-08 18:43:29 +08:00
parent 9d1c417f46
commit f8ff8fd01f
14 changed files with 265 additions and 94 deletions

View File

@ -79,7 +79,7 @@ public class MybatisConfigAccess extends AbstractConfigAccess {
@Override
public List<GroupConfig> getAllConfigGroupList() {
List<GroupConfig> allSystemConfigGroupList = groupConfigMapper.selectList(new LambdaQueryWrapper<>());
List<GroupConfig> allSystemConfigGroupList = groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>().orderByAsc(GroupConfig::getId));
if (CollectionUtils.isEmpty(allSystemConfigGroupList)) {
return Collections.EMPTY_LIST;
}

View File

@ -2,12 +2,15 @@ package com.aizuda.easy.retry.server.server;
import cn.hutool.core.net.url.UrlBuilder;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.support.Register;
import com.aizuda.easy.retry.server.support.handler.ClientRegisterHandler;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler;
import com.aizuda.easy.retry.server.support.register.ClientRegister;
import com.aizuda.easy.retry.server.support.register.RegisterContext;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ -50,16 +53,25 @@ public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttp
throw new EasyRetryServerException("uri 不能为空");
}
ClientRegisterHandler registerHandler = SpringContext.getBeanByType(ClientRegisterHandler.class);
registerHandler.registerClient(headers);
Register register = SpringContext.getBean("clientRegister", Register.class);
Integer clientVersion = headers.getInt(HeadersEnum.VERSION.getKey());
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
RegisterContext registerContext = new RegisterContext();
String hostId = headers.get(HeadersEnum.HOST_ID.getKey());
String hostIp = headers.get(HeadersEnum.HOST_IP.getKey());
Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey());
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey());
registerContext.setContextPath(contextPath);
registerContext.setGroupName(groupName);
registerContext.setHostPort(hostPort);
registerContext.setHostIp(hostIp);
registerContext.setHostId(hostId);
registerHandler.syncVersion(clientVersion, groupName, hostIp, hostPort, contextPath);
register.register(registerContext);
// Integer clientVersion = headers.getInt(HeadersEnum.VERSION.getKey());
// registerHandler.syncVersion(clientVersion, groupName, hostIp, hostPort, contextPath);
UrlBuilder builder = UrlBuilder.ofHttp(uri);
Collection<HttpRequestHandler> httpRequestHandlers = SpringContext.CONTEXT.getBeansOfType(HttpRequestHandler.class).values();

View File

@ -16,8 +16,6 @@
*/
package com.aizuda.easy.retry.server.support;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import java.util.List;
/**
@ -29,12 +27,12 @@ public interface ServerLoadBalance {
* Allocating by consumer id
*
* @param currentCID current consumer id
* @param groupList consumer set in current consumer group
* @param groupNameList consumer set in current consumer group
* @return The allocate result of given strategy
*/
List<GroupConfig> allocate(
List<String> allocate(
final String currentCID,
final List<GroupConfig> groupList,
final List<String> groupNameList,
final List<String> serverList
);

View File

@ -18,7 +18,6 @@ package com.aizuda.easy.retry.server.support.allocate.server;
import com.aizuda.easy.retry.server.support.allocate.common.ConsistentHashRouter;
import com.aizuda.easy.retry.server.support.allocate.common.HashFunction;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.support.ServerLoadBalance;
import com.aizuda.easy.retry.server.support.allocate.common.Node;
import org.springframework.util.CollectionUtils;
@ -52,7 +51,7 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance {
}
@Override
public List<GroupConfig> allocate(String currentCID, List<GroupConfig> groupList,
public List<String> allocate(String currentCID, List<String> groupList,
List<String> serverList) {
if (currentCID == null || currentCID.length() < 1) {
@ -65,7 +64,7 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<GroupConfig> result = new ArrayList<>();
List<String> result = new ArrayList<>();
if (!serverList.contains(currentCID)) {
return result;
}
@ -82,11 +81,11 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
List<GroupConfig> results = new ArrayList<>();
for (GroupConfig groupConfig : groupList) {
ClientNode clientNode = router.routeNode(groupConfig.getGroupName());
List<String> results = new ArrayList<>();
for (String groupName : groupList) {
ClientNode clientNode = router.routeNode(groupName);
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(groupConfig);
results.add(groupName);
}
}

View File

@ -0,0 +1,81 @@
package com.aizuda.easy.retry.server.support.cache;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
* 当前POD负责消费的组
*
* @author www.byteblogs.com
* @date 2021-10-30
* @since 2.0
*/
@Component
@Slf4j
public class CacheConsumerGroup implements Lifecycle {
private static Cache<String /*groupName*/, String/*hostIp*/> CACHE;
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static Set<ServerNode> getAllPods() {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.asMap();
return new HashSet<>(concurrentMap.values());
}
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static ServerNode get(String hostId) {
return CACHE.getIfPresent(hostId);
}
/**
* 无缓存时添加
* 有缓存时更新
*
* @return 缓存对象
*/
public static synchronized void addOrUpdate(String groupName) {
CACHE.put(groupName, "");
}
public static void remove(String hostId) {
CACHE.invalidate(hostId);
}
@Override
public void start() {
LogUtils.info(log, "CacheRegisterTable start");
CACHE = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
@Override
public void close() {
LogUtils.info(log, "CacheRegisterTable stop");
CACHE.invalidateAll();
}
}

View File

@ -1,19 +1,17 @@
package com.aizuda.easy.retry.server.support.cache;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Observable;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@ -28,12 +26,27 @@ import java.util.stream.Collectors;
* @since 2.0
*/
@Component
@Data
@Slf4j
public class CacheRegisterTable implements Lifecycle {
private static Cache<String, ConcurrentMap<String, ServerNode>> CACHE;
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static Set<ServerNode> getAllPods() {
ConcurrentMap<String, ConcurrentMap<String, ServerNode>> concurrentMap = CACHE.asMap();
return (Set<ServerNode>) concurrentMap.values().stream().map(Map::values).reduce((s, y) -> {
Set<ServerNode> mergeSet = new HashSet<>(s);
mergeSet.addAll(y);
return mergeSet;
}).get();
}
/**
* 获取所有缓存
*
@ -72,24 +85,27 @@ public class CacheRegisterTable implements Lifecycle {
}
/**
* 获取排序的ServerNode
* 获取排序的hostId
*
* @return 缓存对象
*/
public static Set<String> gePodIpSet(String groupName) {
return getServerNodeSet(groupName).stream().map(ServerNode::getHostIp).collect(Collectors.toSet());
public static Set<String> getPodIdSet(String groupName) {
return getServerNodeSet(groupName).stream().map(ServerNode::getHostId).collect(Collectors.toSet());
}
/**
* 获取所有缓存
* 无缓存时添加
* 有缓存时更新
*
* @return 缓存对象
*/
public static void put(String groupName, ServerNode serverNode) {
public static synchronized void addOrUpdate(String groupName, ServerNode serverNode) {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.getIfPresent(groupName);
if (Objects.isNull(concurrentMap)) {
concurrentMap = new ConcurrentHashMap<>();
CACHE.put(groupName, concurrentMap);
}
concurrentMap.put(serverNode.getHostId(), serverNode);
}

View File

@ -14,7 +14,10 @@ import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash;
import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.server.support.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance;
import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
@ -24,9 +27,11 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -62,8 +67,7 @@ public class DispatchService implements Lifecycle {
private ServerNodeMapper serverNodeMapper;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
private ServerNodeBalance serverNodeBalance;
@Autowired
private SystemProperties systemProperties;
@ -74,11 +78,11 @@ public class DispatchService implements Lifecycle {
dispatchService.scheduleAtFixedRate(() -> {
try {
List<GroupConfig> currentHostGroupList = getCurrentHostGroupList();
Set<String> currentHostGroupList = getCurrentHostGroupList();
if (!CollectionUtils.isEmpty(currentHostGroupList)) {
for (GroupConfig groupConfigContext : currentHostGroupList) {
for (String groupName : currentHostGroupList) {
ScanTaskDTO scanTaskDTO = new ScanTaskDTO();
scanTaskDTO.setGroupName(groupConfigContext.getGroupName());
scanTaskDTO.setGroupName(groupName);
produceScanActorTask(scanTaskDTO);
}
}
@ -147,24 +151,8 @@ public class DispatchService implements Lifecycle {
*
* @return {@link GroupConfig} 组上下文
*/
private List<GroupConfig> getCurrentHostGroupList() {
List<GroupConfig> prepareAllocateGroupConfig = configAccess.getAllOpenGroupConfig();
if (CollectionUtils.isEmpty(prepareAllocateGroupConfig)) {
return Collections.EMPTY_LIST;
}
//为了保证客户端分配算法的一致性,serverNodes 从数据库从数据获取
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()));
if (CollectionUtils.isEmpty(serverNodes)) {
LogUtils.error(log, "服务端节点为空");
return Collections.EMPTY_LIST;
}
List<String> podIdList = serverNodes.stream().map(ServerNode::getHostId).collect(Collectors.toList());
return new AllocateMessageQueueConsistentHash()
.allocate(ServerRegisterNodeHandler.CURRENT_CID, prepareAllocateGroupConfig, podIdList);
private Set<String> getCurrentHostGroupList() {
return serverNodeBalance.getLocalCidAll();
}
@Override

View File

@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.support.allocate.client.ClientLoadBalanceManager;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.aizuda.easy.retry.server.support.ClientLoadBalance;
import org.springframework.beans.factory.annotation.Autowired;
@ -14,6 +15,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@ -27,8 +29,6 @@ public class ClientNodeAllocateHandler {
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private ServerNodeMapper serverNodeMapper;
/**
* 获取分配的节点
@ -36,8 +36,7 @@ public class ClientNodeAllocateHandler {
public ServerNode getServerNode(String groupName) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, groupName));
Set<ServerNode> serverNodes = CacheRegisterTable.getServerNodeSet(groupName);
if (CollectionUtils.isEmpty(serverNodes)) {
return null;
}

View File

@ -41,36 +41,6 @@ public class ClientRegisterHandler {
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
public void registerClient(HttpHeaders headers) {
threadPoolExecutor.execute(() -> {
String hostId = headers.get(HeadersEnum.HOST_ID.getKey());
String hostIp = headers.get(HeadersEnum.HOST_IP.getKey());
Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey());
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey());
LocalDateTime endTime = LocalDateTime.now().plusSeconds(30);
ServerNode serverNode = new ServerNode();
serverNode.setGroupName(groupName);
serverNode.setNodeType(NodeTypeEnum.CLIENT.getType());
serverNode.setHostPort(hostPort);
serverNode.setHostIp(hostIp);
serverNode.setExpireAt(endTime);
serverNode.setCreateDt(LocalDateTime.now());
serverNode.setContextPath(contextPath);
serverNode.setHostId(hostId);
try {
int i = serverNodeMapper.insertOrUpdate(serverNode);
} catch (Exception e) {
LogUtils.error(log,"注册客户端失败", e);
}
});
}
public void syncVersion(Integer clientVersion, String groupName, String hostIp, Integer hostPort, String contextPath) {
threadPoolExecutor.execute(() -> {

View File

@ -0,0 +1,61 @@
package com.aizuda.easy.retry.server.support.handler;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash;
import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
* @author: shuguang.zhang
* @date : 2023-06-08 15:58
*/
@Component
@Slf4j
public class ServerNodeBalance {
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
public void doBalance() {
// 已经按照id 正序排序
List<GroupConfig> prepareAllocateGroupConfig = configAccess.getAllOpenGroupConfig();
if (CollectionUtils.isEmpty(prepareAllocateGroupConfig)) {
return;
}
// 为了保证客户端分配算法的一致性,serverNodes 从数据库从数据获取
Set<String> podIpSet = CacheRegisterTable.getPodIdSet(ServerRegister.GROUP_NAME);
if (CollectionUtils.isEmpty(podIpSet)) {
LogUtils.error(log, "服务端节点为空");
return;
}
Set<String> groupNameSet = prepareAllocateGroupConfig.stream().map(GroupConfig::getGroupName)
.collect(Collectors.toSet());
List<String> allocate = new AllocateMessageQueueConsistentHash()
.allocate(ServerRegisterNodeHandler.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet));
for (final String groupName : allocate) {
CacheConsumerGroup.addOrUpdate(groupName);
}
}
}

View File

@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.Register;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -20,7 +21,7 @@ import java.time.LocalDateTime;
public abstract class AbstractRegister implements Register, Lifecycle {
@Autowired
private ServerNodeMapper serverNodeMapper;
protected ServerNodeMapper serverNodeMapper;
@Autowired
private SystemProperties systemProperties;
@ -38,6 +39,8 @@ public abstract class AbstractRegister implements Register, Lifecycle {
try {
serverNodeMapper.insertOrUpdate(serverNode);
// 刷新本地缓存
CacheRegisterTable.addOrUpdate(serverNode.getGroupName(), serverNode);
}catch (Exception e) {
LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]",
serverNode.getGroupName(), serverNode.getHostIp(), e);

View File

@ -2,12 +2,15 @@ package com.aizuda.easy.retry.server.support.register;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 客户端注册
@ -17,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* @since 1.6.0
*/
@Component
@Slf4j
public class ClientRegister extends AbstractRegister {
public static final int DELAY_TIME = 30;
protected static final LinkedBlockingQueue<ServerNode> QUEUE = new LinkedBlockingQueue<>();
@ -54,14 +58,17 @@ public class ClientRegister extends AbstractRegister {
@Override
public void start() {
new Thread(() -> {
while (Thread.currentThread().isInterrupted()) {
try {
ServerNode serverNode = QUEUE.take();
refreshExpireAt(serverNode);
// 防止刷的过快
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
LogUtils.error(log, "client refresh expireAt error.");
}
}).start();
}
}, "client_register_").start();
}
@Override

View File

@ -8,9 +8,15 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Register;
import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -28,6 +34,10 @@ public class ServerRegister extends AbstractRegister {
private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode"));
public static final int DELAY_TIME = 30;
public static final String CURRENT_CID;
public static final String GROUP_NAME = "DEFAULT_SERVER";
@Autowired
public ServerNodeBalance serverNodeBalance;
static {
CURRENT_CID = IdUtil.simpleUUID();
}
@ -40,6 +50,7 @@ public class ServerRegister extends AbstractRegister {
@Override
protected void beforeProcessor(RegisterContext context) {
context.setGroupName(GROUP_NAME);
context.setHostId(CURRENT_CID);
context.setHostIp(HostUtils.getIp());
context.setGroupName(StrUtil.EMPTY);
@ -54,6 +65,14 @@ public class ServerRegister extends AbstractRegister {
@Override
protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
refreshExpireAt(serverNode);
Set<ServerNode> serverNodeSet = CacheRegisterTable.getServerNodeSet(context.getGroupName());
for (final ServerNode node : serverNodeSet) {
ServerNode consumerServerNode = CacheConsumerGroup.get(node.getHostId());
if (consumerServerNode.getExpireAt().isBefore(LocalDateTime.now())) {
// 触发rebalance
serverNodeBalance.doBalance();
}
}
return Boolean.TRUE;
}

View File

@ -1,10 +1,15 @@
package com.aizuda.easy.retry.server.support.schedule;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.service.RetryService;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
@ -14,6 +19,7 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 清除数据线程调度器
@ -43,7 +49,19 @@ public class ClearThreadSchedule {
public void clearOfflineNode() {
try {
serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegisterNodeHandler.DELAY_TIME * 2));
LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegisterNodeHandler.DELAY_TIME * 2);
Set<ServerNode> allPods = CacheRegisterTable.getAllPods();
Set<ServerNode> waitOffline = allPods.stream().filter(serverNode -> serverNode.getExpireAt().isAfter(endTime)).collect(Collectors.toSet());
Set<String> podIds = waitOffline.stream().map(ServerNode::getHostId).collect(Collectors.toSet());
int delete = serverNodeMapper
.delete(new LambdaQueryWrapper<ServerNode>().in(ServerNode::getHostId, podIds));
Assert.isTrue(delete > 0, () -> new EasyRetryServerException("clearOfflineNode error"));
for (final ServerNode serverNode : waitOffline) {
CacheRegisterTable.remove(serverNode.getGroupName(), serverNode.getHostId());
}
} catch (Exception e) {
LogUtils.error(log, "clearOfflineNode 失败", e);
}