From a82dc72ec2b15f1efb3318cf9dc1fe022fe99d02 Mon Sep 17 00:00:00 2001 From: "www.byteblogs.com" <598092184@qq.com> Date: Thu, 8 Jun 2023 23:49:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.0.0=201.=20=E6=96=B0=E5=A2=9Epod?= =?UTF-8?q?=E5=88=97=E8=A1=A8=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/cache/CacheConsumerGroup.java | 14 ++-- .../support/cache/CacheRegisterTable.java | 36 +++++--- .../support/dispatch/DispatchService.java | 19 +---- .../handler/ClientRegisterHandler.java | 3 +- .../support/handler/ServerNodeBalance.java | 83 ++++++++++++++++++- .../handler/ServerRegisterNodeHandler.java | 80 ------------------ .../support/register/ServerRegister.java | 26 +++--- .../support/schedule/ClearThreadSchedule.java | 20 +++-- 8 files changed, 141 insertions(+), 140 deletions(-) delete mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerRegisterNodeHandler.java diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java index dfb65fbf..a0e527cc 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java @@ -28,15 +28,15 @@ import java.util.stream.Collectors; @Slf4j public class CacheConsumerGroup implements Lifecycle { - private static Cache CACHE; + private static Cache CACHE; /** * 获取所有缓存 * * @return 缓存对象 */ - public static Set getAllPods() { - ConcurrentMap concurrentMap = CACHE.asMap(); + public static Set getAllPods() { + ConcurrentMap concurrentMap = CACHE.asMap(); return new HashSet<>(concurrentMap.values()); } @@ -46,7 +46,7 @@ public class CacheConsumerGroup implements Lifecycle { * * @return 缓存对象 */ - public static ServerNode get(String hostId) { + public static String get(String hostId) { return CACHE.getIfPresent(hostId); } @@ -57,11 +57,11 @@ public class CacheConsumerGroup implements Lifecycle { * @return 缓存对象 */ public static synchronized void addOrUpdate(String groupName) { - CACHE.put(groupName, ""); + CACHE.put(groupName, groupName); } - public static void remove(String hostId) { - CACHE.invalidate(hostId); + public static void remove(String groupName) { + CACHE.invalidate(groupName); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java index e26155e8..f38c7686 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java @@ -7,7 +7,9 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Map; @@ -38,11 +40,15 @@ public class CacheRegisterTable implements Lifecycle { */ public static Set getAllPods() { ConcurrentMap> concurrentMap = CACHE.asMap(); + if (CollectionUtils.isEmpty(concurrentMap)) { + return Collections.EMPTY_SET; + } - return (Set) concurrentMap.values().stream().map(Map::values).reduce((s, y) -> { - Set mergeSet = new HashSet<>(s); - mergeSet.addAll(y); - return mergeSet; + return concurrentMap.values().stream() + .map(stringServerNodeConcurrentMap -> new HashSet<>(stringServerNodeConcurrentMap.values())) + .reduce((s, y) -> { + s.addAll(y); + return s; }).get(); } @@ -75,13 +81,16 @@ public class CacheRegisterTable implements Lifecycle { * * @return 缓存对象 */ - public static TreeSet getServerNodeSet(String groupName) { + public static Set getServerNodeSet(String groupName) { ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); + + Set set = new TreeSet<>(Comparator.comparingInt(o -> o.getId().intValue())); if (Objects.isNull(concurrentMap)) { - return new TreeSet<>(); + return set; } - return new TreeSet<>(Comparator.comparingInt(o -> o.getId().intValue())); + set.addAll(concurrentMap.values()); + return set; } /** @@ -90,7 +99,8 @@ public class CacheRegisterTable implements Lifecycle { * @return 缓存对象 */ public static Set getPodIdSet(String groupName) { - return getServerNodeSet(groupName).stream().map(ServerNode::getHostId).collect(Collectors.toSet()); + return getServerNodeSet(groupName).stream() + .map(ServerNode::getHostId).collect(Collectors.toSet()); } /** @@ -100,12 +110,16 @@ public class CacheRegisterTable implements Lifecycle { * @return 缓存对象 */ public static synchronized void addOrUpdate(String groupName, ServerNode serverNode) { + ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); if (Objects.isNull(concurrentMap)) { + LogUtils.info(log, "Add cache. groupName:[{}] hostId:[{}]", groupName, serverNode.getHostId()); concurrentMap = new ConcurrentHashMap<>(); CACHE.put(groupName, concurrentMap); } + LogUtils.info(log, "Update cache. groupName:[{}] hostId:[{}] hostIp:[{}]", groupName, + serverNode.getHostId(), serverNode.getExpireAt()); concurrentMap.put(serverNode.getHostId(), serverNode); } @@ -115,13 +129,10 @@ public class CacheRegisterTable implements Lifecycle { return; } + LogUtils.info(log, "Remove cache. groupName:[{}] hostId:[{}]", groupName, hostId); concurrentMap.remove(hostId); } - public static void expirationElimination(String groupName, String hostId) { - - } - @Override public void start() { LogUtils.info(log, "CacheRegisterTable start"); @@ -129,6 +140,7 @@ public class CacheRegisterTable implements Lifecycle { // 设置并发级别为cpu核心数 .concurrencyLevel(Runtime.getRuntime().availableProcessors()) .build(); + } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java index 99151ad4..579ff5e2 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java @@ -1,41 +1,31 @@ package com.aizuda.easy.retry.server.support.dispatch; import akka.actor.ActorRef; -import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; -import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.config.SystemProperties; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; -import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; import com.aizuda.easy.retry.server.support.Lifecycle; -import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash; +import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter; import com.aizuda.easy.retry.server.support.cache.CacheGroupScanActor; -import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance; -import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler; -import com.aizuda.easy.retry.server.support.register.ServerRegister; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.cache.Cache; import com.google.common.util.concurrent.RateLimiter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * 分发器组件 @@ -66,9 +56,6 @@ public class DispatchService implements Lifecycle { @Autowired private ServerNodeMapper serverNodeMapper; - @Autowired - private ServerNodeBalance serverNodeBalance; - @Autowired private SystemProperties systemProperties; @@ -152,7 +139,7 @@ public class DispatchService implements Lifecycle { * @return {@link GroupConfig} 组上下文 */ private Set getCurrentHostGroupList() { - return serverNodeBalance.getLocalCidAll(); + return CacheConsumerGroup.getAllPods(); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java index eaf3f5bd..b716c09e 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.concurrent.*; /** + * todo 待优化 * @author www.byteblogs.com * @date 2022-03-09 * @since 2.0 @@ -33,8 +34,6 @@ public class ClientRegisterHandler { new LinkedBlockingQueue<>(500), r -> new Thread(r, "CLIENT REGISTER THREAD"), (r, executor) -> LogUtils.error(log, "处理注册线程池已经超负荷运作")); public static final String URL = "http://{0}:{1}/{2}/retry/sync/version/v1"; - @Autowired - private ServerNodeMapper serverNodeMapper; @Autowired private RestTemplate restTemplate; @Autowired diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java index 9130e037..e4de369f 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java @@ -1,24 +1,36 @@ package com.aizuda.easy.retry.server.support.handler; +import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; +import com.aizuda.easy.retry.server.support.Lifecycle; import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash; import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.support.register.ServerRegister; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * @author: shuguang.zhang @@ -26,11 +38,15 @@ import java.util.stream.Collectors; */ @Component @Slf4j -public class ServerNodeBalance { +public class ServerNodeBalance implements Lifecycle { @Autowired @Qualifier("configAccessProcessor") private ConfigAccess configAccess; + private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "ServerRegisterNode")); + + @Autowired + protected ServerNodeMapper serverNodeMapper; public void doBalance() { @@ -52,10 +68,73 @@ public class ServerNodeBalance { .collect(Collectors.toSet()); List allocate = new AllocateMessageQueueConsistentHash() - .allocate(ServerRegisterNodeHandler.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet)); + .allocate(ServerRegister.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet)); for (final String groupName : allocate) { CacheConsumerGroup.addOrUpdate(groupName); } } + + @Override + public void start() { + LogUtils.info(log, "ServerNodeBalance start"); + serverRegisterNode.scheduleAtFixedRate(() -> { + try { + ConcurrentMap concurrentMap = CacheRegisterTable.get(ServerRegister.GROUP_NAME); + + if (!CollectionUtils.isEmpty(concurrentMap)) { + Set sorted = new TreeSet<>(Comparator.comparing(ServerNode::getExpireAt)); + sorted.addAll(concurrentMap.values()); + + sorted.stream().findFirst().ifPresent(serverNode -> { + // 若服务端的POD注册的第一个节点是过期了则触发rebalance + if (serverNode.getExpireAt().isBefore(LocalDateTime.now())) { + // 删除过期的节点信息 + CacheRegisterTable.remove(serverNode.getGroupName(), serverNode.getHostId()); + // 删除本地消费组信息 + CacheConsumerGroup.remove(serverNode.getGroupName()); + // 重新刷新所有的缓存key + refreshCache(); + // 触发rebalance + doBalance(); + } + }); + } else { + // 若不存在服务端的POD注册信息直接rebalance + // 重新获取DB中最新的服务信息 + refreshCache(); + + // 触发rebalance + doBalance(); + } + + } catch (Exception e) { + LogUtils.error(log, "", e); + } + }, 10, 1, TimeUnit.SECONDS); + } + + private void refreshCache() { + // 重新获取DB中最新的服务信息 + List serverNodes = serverNodeMapper.selectList( + new LambdaQueryWrapper() + .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType())); + + // 刷新最新的节点注册信息 + for (ServerNode node : serverNodes) { + CacheRegisterTable.addOrUpdate(node.getGroupName(), node); + } + } + + @Override + public void close() { + LogUtils.info(log, "ServerNodeBalance close"); + LogUtils.info(log, "准备删除节点 [{}]", ServerRegister.CURRENT_CID); + int i = serverNodeMapper.delete(new LambdaQueryWrapper().eq(ServerNode::getHostId, ServerRegister.CURRENT_CID)); + if (1 == i) { + LogUtils.info(log,"删除节点 [{}]成功", ServerRegister.CURRENT_CID); + } else { + LogUtils.info(log,"删除节点 [{}]失败", ServerRegister.CURRENT_CID); + } + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerRegisterNodeHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerRegisterNodeHandler.java deleted file mode 100644 index 65b5e54e..00000000 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerRegisterNodeHandler.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.aizuda.easy.retry.server.support.handler; - -import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.StrUtil; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.common.core.util.HostUtils; -import com.aizuda.easy.retry.server.config.SystemProperties; -import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; -import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; -import com.aizuda.easy.retry.server.support.Lifecycle; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.time.LocalDateTime; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * 服务端注册处理器 - * - * @author: www.byteblogs.com - * @date : 2021-11-19 14:49 - */ -@Component -@Slf4j -public class ServerRegisterNodeHandler implements Lifecycle { - - @Autowired - private ServerNodeMapper serverNodeMapper; - @Autowired - private SystemProperties systemProperties; - - private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode")); - public static final int DELAY_TIME = 20; - public static final String CURRENT_CID; - - static { - CURRENT_CID = IdUtil.simpleUUID(); - } - - @Override - public void start() { - - ServerNode serverNode = new ServerNode(); - serverNode.setHostId(CURRENT_CID); - serverNode.setHostIp(HostUtils.getIp()); - serverNode.setGroupName(StrUtil.EMPTY); - serverNode.setHostPort(systemProperties.getNettyPort()); - serverNode.setNodeType(NodeTypeEnum.SERVER.getType()); - serverNode.setCreateDt(LocalDateTime.now()); - serverNode.setContextPath(StrUtil.EMPTY); - serverRegisterNode.scheduleAtFixedRate(()->{ - - try { - serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME)); - serverNodeMapper.insertOrUpdate(serverNode); - }catch (Exception e) { - LogUtils.error(log,"服务端注册节点失败", e); - } - - }, 1, DELAY_TIME / 2, TimeUnit.SECONDS); - - } - - @Override - public void close() { - LogUtils.info(log, "准备删除节点 [{}]", CURRENT_CID); - int i = serverNodeMapper.delete(new LambdaQueryWrapper().eq(ServerNode::getHostId, CURRENT_CID)); - if (1 == i) { - LogUtils.info(log,"删除节点 [{}]成功", CURRENT_CID); - } else { - LogUtils.info(log,"删除节点 [{}]失败", CURRENT_CID); - } - - } -} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java index b4be57ad..ba99b4e0 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java @@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.support.Register; import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -28,13 +29,16 @@ import java.util.concurrent.TimeUnit; * @date 2023-06-07 * @since 1.6.0 */ -@Component +@Component(ServerRegister.BEAN_NAME) +@Slf4j public class ServerRegister extends AbstractRegister { + public static final String BEAN_NAME = "serverRegister"; private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode")); public static final int DELAY_TIME = 30; public static final String CURRENT_CID; public static final String GROUP_NAME = "DEFAULT_SERVER"; + @Autowired public ServerNodeBalance serverNodeBalance; @@ -53,7 +57,6 @@ public class ServerRegister extends AbstractRegister { context.setGroupName(GROUP_NAME); context.setHostId(CURRENT_CID); context.setHostIp(HostUtils.getIp()); - context.setGroupName(StrUtil.EMPTY); context.setContextPath(StrUtil.EMPTY); } @@ -65,14 +68,6 @@ public class ServerRegister extends AbstractRegister { @Override protected boolean doRegister(RegisterContext context, ServerNode serverNode) { refreshExpireAt(serverNode); - Set serverNodeSet = CacheRegisterTable.getServerNodeSet(context.getGroupName()); - for (final ServerNode node : serverNodeSet) { - ServerNode consumerServerNode = CacheConsumerGroup.get(node.getHostId()); - if (consumerServerNode.getExpireAt().isBefore(LocalDateTime.now())) { - // 触发rebalance - serverNodeBalance.doBalance(); - } - } return Boolean.TRUE; } @@ -83,15 +78,20 @@ public class ServerRegister extends AbstractRegister { @Override public void start() { - Register register = SpringContext.getBean("serverRegister", Register.class); + LogUtils.info(log, "ServerRegister start"); + + // 先删除已经过期未删除的节点 + serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2)); + + Register register = SpringContext.getBean(ServerRegister.BEAN_NAME, Register.class); serverRegisterNode.scheduleAtFixedRate(()->{ register.register(new RegisterContext()); - }, 1, DELAY_TIME / 2, TimeUnit.SECONDS); + }, 0, DELAY_TIME / 2, TimeUnit.SECONDS); } @Override public void close() { - + LogUtils.info(log, "ServerRegister close"); } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java index 11db0381..9e532f1f 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java @@ -8,7 +8,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; import com.aizuda.easy.retry.server.service.RetryService; import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; -import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler; +import com.aizuda.easy.retry.server.support.register.ServerRegister; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.Set; @@ -49,14 +50,17 @@ public class ClearThreadSchedule { public void clearOfflineNode() { try { - LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegisterNodeHandler.DELAY_TIME * 2); - Set allPods = CacheRegisterTable.getAllPods(); - Set waitOffline = allPods.stream().filter(serverNode -> serverNode.getExpireAt().isAfter(endTime)).collect(Collectors.toSet()); - Set podIds = waitOffline.stream().map(ServerNode::getHostId).collect(Collectors.toSet()); - int delete = serverNodeMapper - .delete(new LambdaQueryWrapper().in(ServerNode::getHostId, podIds)); - Assert.isTrue(delete > 0, () -> new EasyRetryServerException("clearOfflineNode error")); + // TODO ING + serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2)); + + LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME * 2); + Set allPods = CacheRegisterTable.getAllPods(); + Set waitOffline = allPods.stream().filter(serverNode -> serverNode.getExpireAt().isBefore(endTime)).collect(Collectors.toSet()); + Set podIds = waitOffline.stream().map(ServerNode::getHostId).collect(Collectors.toSet()); + if (CollectionUtils.isEmpty(podIds)) { + return; + } for (final ServerNode serverNode : waitOffline) { CacheRegisterTable.remove(serverNode.getGroupName(), serverNode.getHostId());