diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractJobAlarm.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractJobAlarm.java index 809c1f63..4e9014e3 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractJobAlarm.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractJobAlarm.java @@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.common.triple.Triple; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobNotifyConfigMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobNotifyConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Maps; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEvent; import org.springframework.util.CollectionUtils; @@ -51,14 +52,14 @@ public abstract class AbstractJobAlarm extends Abstr // 批量获取所需的通知配置 List jobNotifyConfigs = jobNotifyConfigMapper.selectList( new LambdaQueryWrapper() - .eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES) + .eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) .eq(JobNotifyConfig::getNotifyScene, getNotifyScene()) .in(JobNotifyConfig::getNamespaceId, namespaceIds) .in(JobNotifyConfig::getGroupName, groupNames) .in(JobNotifyConfig::getJobId, jobIds) ); if (CollectionUtils.isEmpty(jobNotifyConfigs)) { - return null; + return Maps.newHashMap(); } List notifyConfigInfos = AlarmInfoConverter.INSTANCE.jobToNotifyConfigInfos(jobNotifyConfigs); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractRetryAlarm.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractRetryAlarm.java index 116f968b..b590e5c1 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractRetryAlarm.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractRetryAlarm.java @@ -55,7 +55,7 @@ public abstract class AbstractRetryAlarm extends Abs // 批量获取所需的通知配置 List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( new LambdaQueryWrapper() - .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES) + .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) .eq(NotifyConfig::getNotifyScene, getNotifyScene()) .in(NotifyConfig::getNamespaceId, namespaceIds) .in(NotifyConfig::getGroupName, groupNames) diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java index 1aec0617..deb08899 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java @@ -1,20 +1,31 @@ package com.aizuda.easy.retry.server.common.cache; import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.RegisterNodeInfoConverter; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.register.ServerRegister; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -43,15 +54,15 @@ public class CacheRegisterTable implements Lifecycle { public static Set getAllPods() { ConcurrentMap> concurrentMap = CACHE.asMap(); if (CollectionUtils.isEmpty(concurrentMap)) { - return Collections.EMPTY_SET; + return Sets.newHashSet(); } return concurrentMap.values().stream() - .map(stringServerNodeConcurrentMap -> new TreeSet(stringServerNodeConcurrentMap.values())) - .reduce((s, y) -> { - s.addAll(y); - return s; - }).get(); + .map(stringServerNodeConcurrentMap -> new TreeSet(stringServerNodeConcurrentMap.values())) + .reduce((s, y) -> { + s.addAll(y); + return s; + }).get(); } @@ -72,7 +83,25 @@ public class CacheRegisterTable implements Lifecycle { public static RegisterNodeInfo getServerNode(String groupName, String namespaceId, String hostId) { ConcurrentMap concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId)); if (Objects.isNull(concurrentMap)) { - return null; + // 此处为了降级,若缓存中没有则取DB中查询 + ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); + List serverNodes = serverNodeMapper.selectList( + new LambdaQueryWrapper() + .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) + .eq(ServerNode::getNamespaceId, namespaceId) + .eq(ServerNode::getGroupName, groupName) + .eq(ServerNode::getHostId, hostId) + .orderByDesc(ServerNode::getExpireAt)); + if (CollectionUtils.isEmpty(serverNodes)) { + return null; + } + + CacheRegisterTable.addOrUpdate(serverNodes.get(0)); + + concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId)); + if (CollectionUtils.isEmpty(concurrentMap)) { + return null; + } } return concurrentMap.get(hostId); @@ -86,7 +115,23 @@ public class CacheRegisterTable implements Lifecycle { public static Set getServerNodeSet(String groupName, String namespaceId) { ConcurrentMap concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId)); if (CollectionUtils.isEmpty(concurrentMap)) { - return Collections.EMPTY_SET; + + // 此处为了降级,若缓存中没有则取DB中查询 + ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); + List serverNodes = serverNodeMapper.selectList( + new LambdaQueryWrapper() + .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) + .eq(ServerNode::getNamespaceId, namespaceId) + .eq(ServerNode::getGroupName, groupName)); + for (final ServerNode node : serverNodes) { + // 刷新全量本地缓存 + CacheRegisterTable.addOrUpdate(node); + } + + concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId)); + if (CollectionUtils.isEmpty(serverNodes) || CollectionUtils.isEmpty(concurrentMap)) { + return Sets.newHashSet(); + } } return new TreeSet<>(concurrentMap.values()); @@ -103,7 +148,7 @@ public class CacheRegisterTable implements Lifecycle { */ public static Set getPodIdSet(String groupName, String namespaceId) { return getServerNodeSet(groupName, namespaceId).stream() - .map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()); + .map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()); } @@ -113,10 +158,12 @@ public class CacheRegisterTable implements Lifecycle { * @param serverNode 服务节点 */ public static synchronized void refreshExpireAt(ServerNode serverNode) { - RegisterNodeInfo registerNodeInfo = getServerNode(serverNode.getGroupName(), serverNode.getNamespaceId(), serverNode.getHostId()); + RegisterNodeInfo registerNodeInfo = getServerNode(serverNode.getGroupName(), serverNode.getNamespaceId(), + serverNode.getHostId()); // 不存在则初始化 if (Objects.isNull(registerNodeInfo)) { - LogUtils.warn(log, "node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(), serverNode.getHostId()); + LogUtils.warn(log, "node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(), + serverNode.getHostId()); } else { // 存在则刷新过期时间 registerNodeInfo.setExpireAt(serverNode.getExpireAt()); @@ -129,22 +176,43 @@ public class CacheRegisterTable implements Lifecycle { * @return 缓存对象 */ public static synchronized void addOrUpdate(ServerNode serverNode) { - ConcurrentMap concurrentMap = CACHE.getIfPresent(getKey(serverNode.getGroupName(), serverNode.getNamespaceId())); + ConcurrentMap concurrentMap = CACHE.getIfPresent( + getKey(serverNode.getGroupName(), serverNode.getNamespaceId())); RegisterNodeInfo registerNodeInfo; if (Objects.isNull(concurrentMap)) { - LogUtils.info(log, "Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(), serverNode.getNamespaceId(), serverNode.getHostId()); + LogUtils.info(log, "Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(), + serverNode.getNamespaceId(), serverNode.getHostId()); concurrentMap = new ConcurrentHashMap<>(); registerNodeInfo = RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode); - CACHE.put(getKey(serverNode.getGroupName(), serverNode.getNamespaceId()), concurrentMap); + } else { // 复用缓存中的对象 - registerNodeInfo = concurrentMap.getOrDefault(serverNode.getHostId(), RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode)); + registerNodeInfo = concurrentMap.getOrDefault(serverNode.getHostId(), + RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode)); registerNodeInfo.setExpireAt(serverNode.getExpireAt()); + + // 删除过期的节点信息 + delExpireNode(concurrentMap); } concurrentMap.put(serverNode.getHostId(), registerNodeInfo); + // 此缓存设置了60秒没有写入即过期,因此这次刷新缓存防止过期 + CACHE.put(getKey(serverNode.getGroupName(), serverNode.getNamespaceId()), concurrentMap); } + /** + * 删除过期的节点信息 + * @param concurrentMap 并发映射的节点信息 + */ + private static void delExpireNode(final ConcurrentMap concurrentMap) { + concurrentMap.values().stream() + .filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore( + LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3)))) + .forEach(registerNodeInfo -> remove(registerNodeInfo.getGroupName(), + registerNodeInfo.getNamespaceId(), registerNodeInfo.getHostId())); + } + + /** * 删除缓存 * @@ -167,13 +235,11 @@ public class CacheRegisterTable implements Lifecycle { CACHE = CacheBuilder.newBuilder() // 设置并发级别为cpu核心数 .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + // 设置写缓存后60秒过期 + .expireAfterWrite(60, TimeUnit.SECONDS) .build(); - } - public static void main(String[] args) { - - } @Override public void close() { diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java index f9d55d65..a2755cb1 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java @@ -199,6 +199,10 @@ public class RpcClientInvokeHandler implements InvocationHandler { if (ex.getClass().isAssignableFrom(RetryException.class)) { RetryException re = (RetryException) ex; throwable = re.getLastFailedAttempt().getExceptionCause(); + if (throwable instanceof ResourceAccessException) { + // 若重试之后该接口仍然有问题,进行路由剔除处理 + CacheRegisterTable.remove(groupName, namespaceId, hostId); + } } throw throwable; diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java index 52b5f0f3..cf88555b 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java @@ -30,9 +30,15 @@ public abstract class AbstractRegister implements Register, Lifecycle { ServerNode serverNode = initServerNode(context); - return doRegister(context, serverNode); + boolean result = doRegister(context, serverNode); + + afterProcessor(serverNode); + + return result; } + protected abstract void afterProcessor(final ServerNode serverNode); + protected void refreshExpireAt(List serverNodes) { try { diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java index bbc2b9ec..1ee85b12 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java @@ -46,7 +46,6 @@ public class ClientRegister extends AbstractRegister implements Runnable { public boolean supports(int type) { return getNodeType().equals(type); } - @Override protected void beforeProcessor(RegisterContext context) { } @@ -65,6 +64,12 @@ public class ClientRegister extends AbstractRegister implements Runnable { return QUEUE.offerLast(serverNode); } + @Override + protected void afterProcessor(final ServerNode serverNode) { + + } + + @Override protected Integer getNodeType() { return NodeTypeEnum.CLIENT.getType(); @@ -100,22 +105,6 @@ public class ClientRegister extends AbstractRegister implements Runnable { refreshExpireAt(lists); } - - // 同步当前POD消费的组的节点信息 - // netty的client只会注册到一个服务端,若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息 - ConcurrentMap allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName(); - if (!CollectionUtils.isEmpty(allConsumerGroupName)) { - List serverNodes = serverNodeMapper.selectList( - new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) - .in(ServerNode::getNamespaceId, new HashSet<>(allConsumerGroupName.values())) - .in(ServerNode::getGroupName, allConsumerGroupName.keySet())); - for (final ServerNode node : serverNodes) { - // 刷新全量本地缓存 - CacheRegisterTable.addOrUpdate(node); - } - } - } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } catch (Exception e) { diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java index 7cbea8af..fdcd2700 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java @@ -8,18 +8,25 @@ import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.HostUtils; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.Register; +import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.ServerNodeExtAttrs; import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.List; import java.util.Optional; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -57,7 +64,6 @@ public class ServerRegister extends AbstractRegister { return getNodeType().equals(type); } - @Override protected void beforeProcessor(RegisterContext context) { // 新增扩展参数 @@ -84,6 +90,29 @@ public class ServerRegister extends AbstractRegister { return Boolean.TRUE; } + + @Override + protected void afterProcessor(final ServerNode serverNode) { + try { + // 同步当前POD消费的组的节点信息 + // netty的client只会注册到一个服务端,若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息 + ConcurrentMap allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName(); + if (!CollectionUtils.isEmpty(allConsumerGroupName)) { + List serverNodes = serverNodeMapper.selectList( + new LambdaQueryWrapper() + .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) + .in(ServerNode::getNamespaceId, new HashSet<>(allConsumerGroupName.values())) + .in(ServerNode::getGroupName, allConsumerGroupName.keySet())); + for (final ServerNode node : serverNodes) { + // 刷新全量本地缓存 + CacheRegisterTable.addOrUpdate(node); + } + } + } catch (Exception e) { + LogUtils.error(log, "刷新客户端失败", e); + } + } + @Override protected Integer getNodeType() { return NodeTypeEnum.SERVER.getType(); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java index 046e8756..83eb9695 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java @@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.retry.task.support.strategy; import cn.hutool.core.lang.Pair; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.DistributeInstance; import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy; @@ -192,6 +193,8 @@ public class FilterStrategies { ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); boolean result = 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper().eq(ServerNode::getHostId, serverNode.getHostId())); if (!result) { + // 删除缓存中的失效节点 + CacheRegisterTable.remove(retryTask.getGroupName(), retryTask.getNamespaceId(), serverNode.getHostId()); description.append(MessageFormat.format("DB中未查询到客户端节点. hostId:[{0}] uniqueId:[{1}]", serverNode.getHostId(), retryTask.getUniqueId())); }