From f8ff8fd01f17d40900f385e0f2df561657e581d2 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 8 Jun 2023 18:43:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.0.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E8=B7=AF=E7=94=B1=E6=B3=A8=E5=86=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/config/MybatisConfigAccess.java | 2 +- .../server/server/NettyHttpServerHandler.java | 22 +++-- .../server/support/ServerLoadBalance.java | 8 +- .../AllocateMessageQueueConsistentHash.java | 13 ++- .../support/cache/CacheConsumerGroup.java | 81 +++++++++++++++++++ .../support/cache/CacheRegisterTable.java | 36 ++++++--- .../support/dispatch/DispatchService.java | 34 +++----- .../handler/ClientNodeAllocateHandler.java | 7 +- .../handler/ClientRegisterHandler.java | 30 ------- .../support/handler/ServerNodeBalance.java | 61 ++++++++++++++ .../support/register/AbstractRegister.java | 5 +- .../support/register/ClientRegister.java | 21 +++-- .../support/register/ServerRegister.java | 19 +++++ .../support/schedule/ClearThreadSchedule.java | 20 ++++- 14 files changed, 265 insertions(+), 94 deletions(-) create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/config/MybatisConfigAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/config/MybatisConfigAccess.java index c4b771ff..e3f06d18 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/config/MybatisConfigAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/config/MybatisConfigAccess.java @@ -79,7 +79,7 @@ public class MybatisConfigAccess extends AbstractConfigAccess { @Override public List getAllConfigGroupList() { - List allSystemConfigGroupList = groupConfigMapper.selectList(new LambdaQueryWrapper<>()); + List allSystemConfigGroupList = groupConfigMapper.selectList(new LambdaQueryWrapper().orderByAsc(GroupConfig::getId)); if (CollectionUtils.isEmpty(allSystemConfigGroupList)) { return Collections.EMPTY_LIST; } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java index 7e73ede5..9c4ad16b 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java @@ -2,12 +2,15 @@ package com.aizuda.easy.retry.server.server; import cn.hutool.core.net.url.UrlBuilder; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.support.Register; import com.aizuda.easy.retry.server.support.handler.ClientRegisterHandler; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.enums.HeadersEnum; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler; +import com.aizuda.easy.retry.server.support.register.ClientRegister; +import com.aizuda.easy.retry.server.support.register.RegisterContext; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -50,16 +53,25 @@ public class NettyHttpServerHandler extends SimpleChannelInboundHandler httpRequestHandlers = SpringContext.CONTEXT.getBeansOfType(HttpRequestHandler.class).values(); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/ServerLoadBalance.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/ServerLoadBalance.java index 2aa31195..aa8b8fae 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/ServerLoadBalance.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/ServerLoadBalance.java @@ -16,8 +16,6 @@ */ package com.aizuda.easy.retry.server.support; -import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; - import java.util.List; /** @@ -29,12 +27,12 @@ public interface ServerLoadBalance { * Allocating by consumer id * * @param currentCID current consumer id - * @param groupList consumer set in current consumer group + * @param groupNameList consumer set in current consumer group * @return The allocate result of given strategy */ - List allocate( + List allocate( final String currentCID, - final List groupList, + final List groupNameList, final List serverList ); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java index 873e7f84..956837a9 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java @@ -18,7 +18,6 @@ package com.aizuda.easy.retry.server.support.allocate.server; import com.aizuda.easy.retry.server.support.allocate.common.ConsistentHashRouter; import com.aizuda.easy.retry.server.support.allocate.common.HashFunction; -import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; import com.aizuda.easy.retry.server.support.ServerLoadBalance; import com.aizuda.easy.retry.server.support.allocate.common.Node; import org.springframework.util.CollectionUtils; @@ -52,7 +51,7 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance { } @Override - public List allocate(String currentCID, List groupList, + public List allocate(String currentCID, List groupList, List serverList) { if (currentCID == null || currentCID.length() < 1) { @@ -65,7 +64,7 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } - List result = new ArrayList<>(); + List result = new ArrayList<>(); if (!serverList.contains(currentCID)) { return result; } @@ -82,11 +81,11 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance { router = new ConsistentHashRouter(cidNodes, virtualNodeCnt); } - List results = new ArrayList<>(); - for (GroupConfig groupConfig : groupList) { - ClientNode clientNode = router.routeNode(groupConfig.getGroupName()); + List results = new ArrayList<>(); + for (String groupName : groupList) { + ClientNode clientNode = router.routeNode(groupName); if (clientNode != null && currentCID.equals(clientNode.getKey())) { - results.add(groupConfig); + results.add(groupName); } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java new file mode 100644 index 00000000..dfb65fbf --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java @@ -0,0 +1,81 @@ +package com.aizuda.easy.retry.server.support.cache; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.support.Lifecycle; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +/** + * 当前POD负责消费的组 + * + * @author www.byteblogs.com + * @date 2021-10-30 + * @since 2.0 + */ +@Component +@Slf4j +public class CacheConsumerGroup implements Lifecycle { + + private static Cache CACHE; + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static Set getAllPods() { + ConcurrentMap concurrentMap = CACHE.asMap(); + return new HashSet<>(concurrentMap.values()); + + } + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static ServerNode get(String hostId) { + return CACHE.getIfPresent(hostId); + } + + /** + * 无缓存时添加 + * 有缓存时更新 + * + * @return 缓存对象 + */ + public static synchronized void addOrUpdate(String groupName) { + CACHE.put(groupName, ""); + } + + public static void remove(String hostId) { + CACHE.invalidate(hostId); + } + + @Override + public void start() { + LogUtils.info(log, "CacheRegisterTable start"); + CACHE = CacheBuilder.newBuilder() + // 设置并发级别为cpu核心数 + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .build(); + } + + @Override + public void close() { + LogUtils.info(log, "CacheRegisterTable stop"); + CACHE.invalidateAll(); + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java index 9dcdb6e0..e26155e8 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java @@ -1,19 +1,17 @@ package com.aizuda.easy.retry.server.support.cache; -import akka.actor.ActorRef; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.support.Lifecycle; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.Comparator; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; -import java.util.Observable; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -28,12 +26,27 @@ import java.util.stream.Collectors; * @since 2.0 */ @Component -@Data @Slf4j public class CacheRegisterTable implements Lifecycle { private static Cache> CACHE; + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static Set getAllPods() { + ConcurrentMap> concurrentMap = CACHE.asMap(); + + return (Set) concurrentMap.values().stream().map(Map::values).reduce((s, y) -> { + Set mergeSet = new HashSet<>(s); + mergeSet.addAll(y); + return mergeSet; + }).get(); + + } + /** * 获取所有缓存 * @@ -72,24 +85,27 @@ public class CacheRegisterTable implements Lifecycle { } /** - * 获取排序的ServerNode + * 获取排序的hostId * * @return 缓存对象 */ - public static Set gePodIpSet(String groupName) { - return getServerNodeSet(groupName).stream().map(ServerNode::getHostIp).collect(Collectors.toSet()); + public static Set getPodIdSet(String groupName) { + return getServerNodeSet(groupName).stream().map(ServerNode::getHostId).collect(Collectors.toSet()); } /** - * 获取所有缓存 + * 无缓存时添加 + * 有缓存时更新 * * @return 缓存对象 */ - public static void put(String groupName, ServerNode serverNode) { + public static synchronized void addOrUpdate(String groupName, ServerNode serverNode) { ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); if (Objects.isNull(concurrentMap)) { concurrentMap = new ConcurrentHashMap<>(); + CACHE.put(groupName, concurrentMap); } + concurrentMap.put(serverNode.getHostId(), serverNode); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java index e1607fb1..99151ad4 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java @@ -14,7 +14,10 @@ import com.aizuda.easy.retry.server.support.Lifecycle; import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash; import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter; import com.aizuda.easy.retry.server.support.cache.CacheGroupScanActor; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance; import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler; +import com.aizuda.easy.retry.server.support.register.ServerRegister; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.cache.Cache; import com.google.common.util.concurrent.RateLimiter; @@ -24,9 +27,11 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -62,8 +67,7 @@ public class DispatchService implements Lifecycle { private ServerNodeMapper serverNodeMapper; @Autowired - @Qualifier("configAccessProcessor") - private ConfigAccess configAccess; + private ServerNodeBalance serverNodeBalance; @Autowired private SystemProperties systemProperties; @@ -74,11 +78,11 @@ public class DispatchService implements Lifecycle { dispatchService.scheduleAtFixedRate(() -> { try { - List currentHostGroupList = getCurrentHostGroupList(); + Set currentHostGroupList = getCurrentHostGroupList(); if (!CollectionUtils.isEmpty(currentHostGroupList)) { - for (GroupConfig groupConfigContext : currentHostGroupList) { + for (String groupName : currentHostGroupList) { ScanTaskDTO scanTaskDTO = new ScanTaskDTO(); - scanTaskDTO.setGroupName(groupConfigContext.getGroupName()); + scanTaskDTO.setGroupName(groupName); produceScanActorTask(scanTaskDTO); } } @@ -147,24 +151,8 @@ public class DispatchService implements Lifecycle { * * @return {@link GroupConfig} 组上下文 */ - private List getCurrentHostGroupList() { - List prepareAllocateGroupConfig = configAccess.getAllOpenGroupConfig(); - if (CollectionUtils.isEmpty(prepareAllocateGroupConfig)) { - return Collections.EMPTY_LIST; - } - //为了保证客户端分配算法的一致性,serverNodes 从数据库从数据获取 - List serverNodes = serverNodeMapper.selectList( - new LambdaQueryWrapper().eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType())); - - if (CollectionUtils.isEmpty(serverNodes)) { - LogUtils.error(log, "服务端节点为空"); - return Collections.EMPTY_LIST; - } - - List podIdList = serverNodes.stream().map(ServerNode::getHostId).collect(Collectors.toList()); - - return new AllocateMessageQueueConsistentHash() - .allocate(ServerRegisterNodeHandler.CURRENT_CID, prepareAllocateGroupConfig, podIdList); + private Set getCurrentHostGroupList() { + return serverNodeBalance.getLocalCidAll(); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java index 54e95b43..4fbf2bb5 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java @@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; import com.aizuda.easy.retry.server.support.allocate.client.ClientLoadBalanceManager; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.aizuda.easy.retry.server.support.ClientLoadBalance; import org.springframework.beans.factory.annotation.Autowired; @@ -14,6 +15,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.List; +import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -27,8 +29,6 @@ public class ClientNodeAllocateHandler { @Autowired @Qualifier("configAccessProcessor") private ConfigAccess configAccess; - @Autowired - private ServerNodeMapper serverNodeMapper; /** * 获取分配的节点 @@ -36,8 +36,7 @@ public class ClientNodeAllocateHandler { public ServerNode getServerNode(String groupName) { GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName); - List serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper().eq(ServerNode::getGroupName, groupName)); - + Set serverNodes = CacheRegisterTable.getServerNodeSet(groupName); if (CollectionUtils.isEmpty(serverNodes)) { return null; } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java index 7756292a..eaf3f5bd 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientRegisterHandler.java @@ -41,36 +41,6 @@ public class ClientRegisterHandler { @Qualifier("configAccessProcessor") private ConfigAccess configAccess; - public void registerClient(HttpHeaders headers) { - - threadPoolExecutor.execute(() -> { - - String hostId = headers.get(HeadersEnum.HOST_ID.getKey()); - String hostIp = headers.get(HeadersEnum.HOST_IP.getKey()); - Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey()); - String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey()); - String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey()); - - LocalDateTime endTime = LocalDateTime.now().plusSeconds(30); - ServerNode serverNode = new ServerNode(); - serverNode.setGroupName(groupName); - serverNode.setNodeType(NodeTypeEnum.CLIENT.getType()); - serverNode.setHostPort(hostPort); - serverNode.setHostIp(hostIp); - serverNode.setExpireAt(endTime); - serverNode.setCreateDt(LocalDateTime.now()); - serverNode.setContextPath(contextPath); - serverNode.setHostId(hostId); - - try { - int i = serverNodeMapper.insertOrUpdate(serverNode); - } catch (Exception e) { - LogUtils.error(log,"注册客户端失败", e); - } - }); - - } - public void syncVersion(Integer clientVersion, String groupName, String hostIp, Integer hostPort, String contextPath) { threadPoolExecutor.execute(() -> { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java new file mode 100644 index 00000000..9130e037 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java @@ -0,0 +1,61 @@ +package com.aizuda.easy.retry.server.support.handler; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; +import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; +import com.aizuda.easy.retry.server.support.allocate.server.AllocateMessageQueueConsistentHash; +import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.support.register.ServerRegister; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * @author: shuguang.zhang + * @date : 2023-06-08 15:58 + */ +@Component +@Slf4j +public class ServerNodeBalance { + + @Autowired + @Qualifier("configAccessProcessor") + private ConfigAccess configAccess; + + public void doBalance() { + + // 已经按照id 正序排序 + List prepareAllocateGroupConfig = configAccess.getAllOpenGroupConfig(); + if (CollectionUtils.isEmpty(prepareAllocateGroupConfig)) { + return; + } + + // 为了保证客户端分配算法的一致性,serverNodes 从数据库从数据获取 + Set podIpSet = CacheRegisterTable.getPodIdSet(ServerRegister.GROUP_NAME); + + if (CollectionUtils.isEmpty(podIpSet)) { + LogUtils.error(log, "服务端节点为空"); + return; + } + + Set groupNameSet = prepareAllocateGroupConfig.stream().map(GroupConfig::getGroupName) + .collect(Collectors.toSet()); + + List allocate = new AllocateMessageQueueConsistentHash() + .allocate(ServerRegisterNodeHandler.CURRENT_CID, new ArrayList<>(groupNameSet), new ArrayList<>(podIpSet)); + for (final String groupName : allocate) { + CacheConsumerGroup.addOrUpdate(groupName); + + } + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java index b3ef8295..d6a7f2e2 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java @@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.support.Lifecycle; import com.aizuda.easy.retry.server.support.Register; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -20,7 +21,7 @@ import java.time.LocalDateTime; public abstract class AbstractRegister implements Register, Lifecycle { @Autowired - private ServerNodeMapper serverNodeMapper; + protected ServerNodeMapper serverNodeMapper; @Autowired private SystemProperties systemProperties; @@ -38,6 +39,8 @@ public abstract class AbstractRegister implements Register, Lifecycle { try { serverNodeMapper.insertOrUpdate(serverNode); + // 刷新本地缓存 + CacheRegisterTable.addOrUpdate(serverNode.getGroupName(), serverNode); }catch (Exception e) { LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]", serverNode.getGroupName(), serverNode.getHostIp(), e); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java index 9d5347ca..38c84fc7 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java @@ -2,12 +2,15 @@ package com.aizuda.easy.retry.server.support.register; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * 客户端注册 @@ -17,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue; * @since 1.6.0 */ @Component +@Slf4j public class ClientRegister extends AbstractRegister { public static final int DELAY_TIME = 30; protected static final LinkedBlockingQueue QUEUE = new LinkedBlockingQueue<>(); @@ -54,14 +58,17 @@ public class ClientRegister extends AbstractRegister { @Override public void start() { new Thread(() -> { - try { - ServerNode serverNode = QUEUE.take(); - refreshExpireAt(serverNode); - } catch (InterruptedException e) { - e.printStackTrace(); + while (Thread.currentThread().isInterrupted()) { + try { + ServerNode serverNode = QUEUE.take(); + refreshExpireAt(serverNode); + // 防止刷的过快 + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + LogUtils.error(log, "client refresh expireAt error."); + } } - - }).start(); + }, "client_register_").start(); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java index 37661835..b4be57ad 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java @@ -8,9 +8,15 @@ import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.HostUtils; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.support.Register; +import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -28,6 +34,10 @@ public class ServerRegister extends AbstractRegister { private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode")); public static final int DELAY_TIME = 30; public static final String CURRENT_CID; + public static final String GROUP_NAME = "DEFAULT_SERVER"; + @Autowired + public ServerNodeBalance serverNodeBalance; + static { CURRENT_CID = IdUtil.simpleUUID(); } @@ -40,6 +50,7 @@ public class ServerRegister extends AbstractRegister { @Override protected void beforeProcessor(RegisterContext context) { + context.setGroupName(GROUP_NAME); context.setHostId(CURRENT_CID); context.setHostIp(HostUtils.getIp()); context.setGroupName(StrUtil.EMPTY); @@ -54,6 +65,14 @@ public class ServerRegister extends AbstractRegister { @Override protected boolean doRegister(RegisterContext context, ServerNode serverNode) { refreshExpireAt(serverNode); + Set serverNodeSet = CacheRegisterTable.getServerNodeSet(context.getGroupName()); + for (final ServerNode node : serverNodeSet) { + ServerNode consumerServerNode = CacheConsumerGroup.get(node.getHostId()); + if (consumerServerNode.getExpireAt().isBefore(LocalDateTime.now())) { + // 触发rebalance + serverNodeBalance.doBalance(); + } + } return Boolean.TRUE; } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java index ac1f714b..11db0381 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java @@ -1,10 +1,15 @@ package com.aizuda.easy.retry.server.support.schedule; +import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; import com.aizuda.easy.retry.server.service.RetryService; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.support.handler.ServerRegisterNodeHandler; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; import org.springframework.beans.factory.annotation.Autowired; @@ -14,6 +19,7 @@ import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.Set; +import java.util.stream.Collectors; /** * 清除数据线程调度器 @@ -43,7 +49,19 @@ public class ClearThreadSchedule { public void clearOfflineNode() { try { - serverNodeMapper.deleteByExpireAt(LocalDateTime.now().minusSeconds(ServerRegisterNodeHandler.DELAY_TIME * 2)); + LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegisterNodeHandler.DELAY_TIME * 2); + Set allPods = CacheRegisterTable.getAllPods(); + Set waitOffline = allPods.stream().filter(serverNode -> serverNode.getExpireAt().isAfter(endTime)).collect(Collectors.toSet()); + Set podIds = waitOffline.stream().map(ServerNode::getHostId).collect(Collectors.toSet()); + + int delete = serverNodeMapper + .delete(new LambdaQueryWrapper().in(ServerNode::getHostId, podIds)); + Assert.isTrue(delete > 0, () -> new EasyRetryServerException("clearOfflineNode error")); + + for (final ServerNode serverNode : waitOffline) { + CacheRegisterTable.remove(serverNode.getGroupName(), serverNode.getHostId()); + } + } catch (Exception e) { LogUtils.error(log, "clearOfflineNode 失败", e); }