From 6a532ce528466ffaac2aec9f514877ebfc36b748 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Wed, 21 Jun 2023 11:37:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.0.0=201.=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E7=BB=84=E5=88=86=E9=85=8D=E4=B8=8Enetty=20client=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E4=B8=8D=E5=9C=A8=E4=B8=80=E4=B8=AA=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=AB=AFPOD=E5=AF=BC=E8=87=B4=E7=9A=84=E4=B8=8B=E5=8F=91?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E6=B5=81=E7=A8=8B=E5=A4=B1=E8=B4=A5=202.=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=9E=81=E7=AB=AF=E6=83=85=E5=86=B5=E4=B8=8B?= =?UTF-8?q?=EF=BC=8C=E7=BB=84=E5=88=86=E9=85=8D=E4=B8=A2=E5=A4=B1=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../easy/retry/common/core/log/LogUtils.java | 2 +- .../support/cache/CacheConsumerGroup.java | 4 +++- .../support/cache/CacheRegisterTable.java | 22 +++++++++++++++-- .../support/dispatch/DispatchService.java | 2 +- .../actor/scan/ScanCallbackGroupActor.java | 2 +- .../dispatch/actor/scan/ScanGroupActor.java | 2 +- .../support/handler/ServerNodeBalance.java | 24 +++++++++++++++---- .../support/register/AbstractRegister.java | 7 ++---- .../support/register/ClientRegister.java | 24 +++++++++++++++++-- .../support/strategy/FilterStrategies.java | 13 +++++----- 10 files changed, 76 insertions(+), 26 deletions(-) diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogUtils.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogUtils.java index 6f712a1f..4e416fb1 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogUtils.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogUtils.java @@ -69,7 +69,7 @@ public class LogUtils { try { Environment environment = SpringContext.CONTEXT.getBean(Environment.class); - return environment.getProperty("x.retry.log.status", Boolean.class, Boolean.TRUE); + return environment.getProperty("easy.retry.log.status", Boolean.class, Boolean.TRUE); } catch (Exception ignored) { } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java index 8b2d3946..db6fee67 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheConsumerGroup.java @@ -35,7 +35,7 @@ public class CacheConsumerGroup implements Lifecycle { * * @return 缓存对象 */ - public static Set getAllPods() { + public static Set getAllConsumerGroupName() { ConcurrentMap concurrentMap = CACHE.asMap(); return new HashSet<>(concurrentMap.values()); @@ -57,10 +57,12 @@ public class CacheConsumerGroup implements Lifecycle { * @return 缓存对象 */ public static synchronized void addOrUpdate(String groupName) { + LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName); CACHE.put(groupName, groupName); } public static void remove(String groupName) { + LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName); CACHE.invalidate(groupName); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java index 9f2aef80..a4310e97 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java @@ -11,8 +11,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.util.Collections; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -99,13 +101,29 @@ public class CacheRegisterTable implements Lifecycle { .map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()); } + + /** + * 刷新过期时间若不存在则初始化 + * + * @param groupName 组名称 + */ + public static synchronized void refreshExpireAt(String groupName, ServerNode serverNode) { + RegisterNodeInfo registerNodeInfo = getServerNode(groupName, serverNode.getHostId()); + // 不存在则初始化 + if (Objects.isNull(registerNodeInfo)) { + LogUtils.warn(log, "node not exists. groupName:[{}] hostId:[{}]", groupName, serverNode.getHostId()); + } else { + // 存在则刷新过期时间 + registerNodeInfo.setExpireAt(serverNode.getExpireAt()); + } + } + /** * 无缓存时添加 有缓存时更新 * * @return 缓存对象 */ public static synchronized void addOrUpdate(String groupName, ServerNode serverNode) { - ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); RegisterNodeInfo registerNodeInfo; if (Objects.isNull(concurrentMap)) { @@ -119,7 +137,7 @@ public class CacheRegisterTable implements Lifecycle { registerNodeInfo.setExpireAt(serverNode.getExpireAt()); } - LogUtils.debug(log, "Update cache. groupName:[{}] hostId:[{}] hostIp:[{}] expireAt:[{}]", groupName, + LogUtils.info(log, "Update cache. groupName:[{}] hostId:[{}] hostIp:[{}] expireAt:[{}]", groupName, serverNode.getHostId(), serverNode.getHostIp(), serverNode.getExpireAt()); concurrentMap.put(serverNode.getHostId(), registerNodeInfo); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java index 0f322ba2..353fc7ad 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java @@ -147,7 +147,7 @@ public class DispatchService implements Lifecycle { * @return {@link GroupConfig} 组上下文 */ private Set getCurrentHostGroupList() { - return CacheConsumerGroup.getAllPods(); + return CacheConsumerGroup.getAllConsumerGroupName(); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java index 7259a77f..71cb1833 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java @@ -57,7 +57,7 @@ public class ScanCallbackGroupActor extends AbstractScanGroup { .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatus()) .withWaitStrategy(getWaitWaitStrategy()) - .withFilterStrategy(FilterStrategies.delayLevelFilter()) + .withFilterStrategy(FilterStrategies.triggerAtFilter()) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) .withFilterStrategy(FilterStrategies.sceneBlackFilter()) .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java index 2cf9f718..2043ab7b 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java @@ -60,7 +60,7 @@ public class ScanGroupActor extends AbstractScanGroup { .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatusCode()) .withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) - .withFilterStrategy(FilterStrategies.delayLevelFilter()) + .withFilterStrategy(FilterStrategies.triggerAtFilter()) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) .withFilterStrategy(FilterStrategies.sceneBlackFilter()) .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java index a53eead1..8bb247a6 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.server.support.handler; +import cn.hutool.core.lang.Opt; import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; @@ -23,7 +24,10 @@ import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -133,6 +137,16 @@ public class ServerNodeBalance implements Lifecycle, Runnable { } } + private void refreshExpireAtCache(List remotePods) { + + // 刷新最新的节点注册信息 + for (ServerNode node : remotePods) { + Optional.ofNullable(CacheRegisterTable.getServerNode(node.getGroupName(), node.getHostId())).ifPresent(registerNodeInfo -> { + registerNodeInfo.setExpireAt(node.getExpireAt()); + }); + } + } + private void refreshCache(List remotePods) { // 刷新最新的节点注册信息 @@ -175,8 +189,8 @@ public class ServerNodeBalance implements Lifecycle, Runnable { .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType())); // 获取缓存中的节点 - ConcurrentMap concurrentMap = CacheRegisterTable - .get(ServerRegister.GROUP_NAME); + ConcurrentMap concurrentMap = Optional.ofNullable(CacheRegisterTable + .get(ServerRegister.GROUP_NAME)).orElse(new ConcurrentHashMap<>()); Set remoteHostIds = remotePods.stream().map(ServerNode::getHostId).collect(Collectors.toSet()); @@ -189,7 +203,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable { // 无缓存的节点触发refreshCache if (CollectionUtils.isEmpty(concurrentMap) // 节点数量不一致触发 - || isNodeSizeNotEqual(remotePods.size(), concurrentMap.size()) + || isNodeSizeNotEqual(concurrentMap.size(), remotePods.size()) // 若存在远程和本地缓存的组的数量不一致则触发rebalance || isGroupSizeNotEqual(removeGroupConfig, allGroup) // 判断远程节点是不是和本地节点一致的,如果不一致则重新分配 @@ -212,8 +226,8 @@ public class ServerNodeBalance implements Lifecycle, Runnable { } else { - // 重新刷新所有的缓存key - refreshCache(remotePods); + // 刷新过期时间 + refreshExpireAtCache(remotePods); // 再次获取最新的节点信息 concurrentMap = CacheRegisterTable diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java index cf31ee76..159cc0ac 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java @@ -1,8 +1,6 @@ package com.aizuda.easy.retry.server.support.register; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.config.SystemProperties; -import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.support.Lifecycle; @@ -12,7 +10,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import java.time.LocalDateTime; -import java.util.Objects; /** * @author www.byteblogs.com @@ -39,8 +36,8 @@ public abstract class AbstractRegister implements Register, Lifecycle { try { serverNodeMapper.insertOrUpdate(serverNode); - // 刷新本地缓存 - CacheRegisterTable.addOrUpdate(serverNode.getGroupName(), serverNode); + // 刷新本地缓存过期时间 + CacheRegisterTable.refreshExpireAt(serverNode.getGroupName(), serverNode); }catch (Exception e) { LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]", serverNode.getGroupName(), serverNode.getHostIp(), e); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java index 23ea8e4b..10817ecf 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java @@ -3,11 +3,17 @@ package com.aizuda.easy.retry.server.support.register; import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -70,8 +76,22 @@ public class ClientRegister extends AbstractRegister implements Runnable { public void run() { while (!Thread.currentThread().isInterrupted()) { try { - ServerNode serverNode = QUEUE.take(); - refreshExpireAt(serverNode); + ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); + if (Objects.nonNull(serverNode)) { + refreshExpireAt(serverNode); + } + + // 同步当前POD消费的组的节点信息 + // netty的client只会注册到一个服务端,若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息 + Set allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName(); + if (!CollectionUtils.isEmpty(allConsumerGroupName)) { + List serverNodes = serverNodeMapper.selectList( + new LambdaQueryWrapper().in(ServerNode::getGroupName, allConsumerGroupName)); + for (final ServerNode node : serverNodes) { + // 刷新全量本地缓存 + CacheRegisterTable.addOrUpdate(node.getGroupName(), node); + } + } }catch (InterruptedException e) { LogUtils.error(log, "[{}] thread interrupt.", Thread.currentThread().getName()); } catch (Exception e) { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java index 10d51784..36cda6e4 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java @@ -13,7 +13,6 @@ import com.aizuda.easy.retry.server.support.FilterStrategy; import com.aizuda.easy.retry.server.support.IdempotentStrategy; import com.aizuda.easy.retry.server.support.RetryContext; import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter; -import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; @@ -33,12 +32,12 @@ public class FilterStrategies { } /** - * 延迟等级的过滤策略 + * 触发时间过滤策略 * - * @return {@link DelayLevelFilterStrategies} 延迟等级的过滤策略 + * @return {@link TriggerAtFilterStrategies} 触发时间过滤策略 */ - public static FilterStrategy delayLevelFilter() { - return new DelayLevelFilterStrategies(); + public static FilterStrategy triggerAtFilter() { + return new TriggerAtFilterStrategies(); } /** @@ -87,11 +86,11 @@ public class FilterStrategies { } /** - * 延迟等级的过滤策略 + * 触发时间过滤策略 *

* 根据延迟等级的时间计算下次触发时间是否小于当前时间,满足则返回true 否则返回false */ - private static final class DelayLevelFilterStrategies implements FilterStrategy { + private static final class TriggerAtFilterStrategies implements FilterStrategy { @Override public boolean filter(RetryContext retryContext) {