fix: 2.5.0

1. 修复多节点服务端存在某节点无客户端连接时,手动触发和自动触发任务失败
2.修复告警通知状态枚举错误
This commit is contained in:
byteblogs168 2023-12-07 18:29:08 +08:00
parent 0d64fb7e4f
commit 6085001fd4
8 changed files with 139 additions and 41 deletions

View File

@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobNotifyConfigMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobNotifyConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.CollectionUtils;
@ -51,14 +52,14 @@ public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends Abstr
// 批量获取所需的通知配置
List<JobNotifyConfig> jobNotifyConfigs = jobNotifyConfigMapper.selectList(
new LambdaQueryWrapper<JobNotifyConfig>()
.eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES)
.eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(JobNotifyConfig::getNotifyScene, getNotifyScene())
.in(JobNotifyConfig::getNamespaceId, namespaceIds)
.in(JobNotifyConfig::getGroupName, groupNames)
.in(JobNotifyConfig::getJobId, jobIds)
);
if (CollectionUtils.isEmpty(jobNotifyConfigs)) {
return null;
return Maps.newHashMap();
}
List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.jobToNotifyConfigInfos(jobNotifyConfigs);

View File

@ -55,7 +55,7 @@ public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends Abs
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, getNotifyScene())
.in(NotifyConfig::getNamespaceId, namespaceIds)
.in(NotifyConfig::getGroupName, groupNames)

View File

@ -1,20 +1,31 @@
package com.aizuda.easy.retry.server.common.cache;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.RegisterNodeInfoConverter;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.register.ServerRegister;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@ -43,15 +54,15 @@ public class CacheRegisterTable implements Lifecycle {
public static Set<RegisterNodeInfo> getAllPods() {
ConcurrentMap<String, ConcurrentMap<String, RegisterNodeInfo>> concurrentMap = CACHE.asMap();
if (CollectionUtils.isEmpty(concurrentMap)) {
return Collections.EMPTY_SET;
return Sets.newHashSet();
}
return concurrentMap.values().stream()
.map(stringServerNodeConcurrentMap -> new TreeSet(stringServerNodeConcurrentMap.values()))
.reduce((s, y) -> {
s.addAll(y);
return s;
}).get();
.map(stringServerNodeConcurrentMap -> new TreeSet(stringServerNodeConcurrentMap.values()))
.reduce((s, y) -> {
s.addAll(y);
return s;
}).get();
}
@ -72,7 +83,25 @@ public class CacheRegisterTable implements Lifecycle {
public static RegisterNodeInfo getServerNode(String groupName, String namespaceId, String hostId) {
ConcurrentMap<String, RegisterNodeInfo> concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId));
if (Objects.isNull(concurrentMap)) {
return null;
// 此处为了降级若缓存中没有则取DB中查询
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())
.eq(ServerNode::getNamespaceId, namespaceId)
.eq(ServerNode::getGroupName, groupName)
.eq(ServerNode::getHostId, hostId)
.orderByDesc(ServerNode::getExpireAt));
if (CollectionUtils.isEmpty(serverNodes)) {
return null;
}
CacheRegisterTable.addOrUpdate(serverNodes.get(0));
concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId));
if (CollectionUtils.isEmpty(concurrentMap)) {
return null;
}
}
return concurrentMap.get(hostId);
@ -86,7 +115,23 @@ public class CacheRegisterTable implements Lifecycle {
public static Set<RegisterNodeInfo> getServerNodeSet(String groupName, String namespaceId) {
ConcurrentMap<String, RegisterNodeInfo> concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId));
if (CollectionUtils.isEmpty(concurrentMap)) {
return Collections.EMPTY_SET;
// 此处为了降级若缓存中没有则取DB中查询
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())
.eq(ServerNode::getNamespaceId, namespaceId)
.eq(ServerNode::getGroupName, groupName));
for (final ServerNode node : serverNodes) {
// 刷新全量本地缓存
CacheRegisterTable.addOrUpdate(node);
}
concurrentMap = CACHE.getIfPresent(getKey(groupName, namespaceId));
if (CollectionUtils.isEmpty(serverNodes) || CollectionUtils.isEmpty(concurrentMap)) {
return Sets.newHashSet();
}
}
return new TreeSet<>(concurrentMap.values());
@ -103,7 +148,7 @@ public class CacheRegisterTable implements Lifecycle {
*/
public static Set<String> getPodIdSet(String groupName, String namespaceId) {
return getServerNodeSet(groupName, namespaceId).stream()
.map(RegisterNodeInfo::getHostId).collect(Collectors.toSet());
.map(RegisterNodeInfo::getHostId).collect(Collectors.toSet());
}
@ -113,10 +158,12 @@ public class CacheRegisterTable implements Lifecycle {
* @param serverNode 服务节点
*/
public static synchronized void refreshExpireAt(ServerNode serverNode) {
RegisterNodeInfo registerNodeInfo = getServerNode(serverNode.getGroupName(), serverNode.getNamespaceId(), serverNode.getHostId());
RegisterNodeInfo registerNodeInfo = getServerNode(serverNode.getGroupName(), serverNode.getNamespaceId(),
serverNode.getHostId());
// 不存在则初始化
if (Objects.isNull(registerNodeInfo)) {
LogUtils.warn(log, "node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(), serverNode.getHostId());
LogUtils.warn(log, "node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(),
serverNode.getHostId());
} else {
// 存在则刷新过期时间
registerNodeInfo.setExpireAt(serverNode.getExpireAt());
@ -129,22 +176,43 @@ public class CacheRegisterTable implements Lifecycle {
* @return 缓存对象
*/
public static synchronized void addOrUpdate(ServerNode serverNode) {
ConcurrentMap<String, RegisterNodeInfo> concurrentMap = CACHE.getIfPresent(getKey(serverNode.getGroupName(), serverNode.getNamespaceId()));
ConcurrentMap<String, RegisterNodeInfo> concurrentMap = CACHE.getIfPresent(
getKey(serverNode.getGroupName(), serverNode.getNamespaceId()));
RegisterNodeInfo registerNodeInfo;
if (Objects.isNull(concurrentMap)) {
LogUtils.info(log, "Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(), serverNode.getNamespaceId(), serverNode.getHostId());
LogUtils.info(log, "Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(),
serverNode.getNamespaceId(), serverNode.getHostId());
concurrentMap = new ConcurrentHashMap<>();
registerNodeInfo = RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode);
CACHE.put(getKey(serverNode.getGroupName(), serverNode.getNamespaceId()), concurrentMap);
} else {
// 复用缓存中的对象
registerNodeInfo = concurrentMap.getOrDefault(serverNode.getHostId(), RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode));
registerNodeInfo = concurrentMap.getOrDefault(serverNode.getHostId(),
RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode));
registerNodeInfo.setExpireAt(serverNode.getExpireAt());
// 删除过期的节点信息
delExpireNode(concurrentMap);
}
concurrentMap.put(serverNode.getHostId(), registerNodeInfo);
// 此缓存设置了60秒没有写入即过期因此这次刷新缓存防止过期
CACHE.put(getKey(serverNode.getGroupName(), serverNode.getNamespaceId()), concurrentMap);
}
/**
* 删除过期的节点信息
* @param concurrentMap 并发映射的节点信息
*/
private static void delExpireNode(final ConcurrentMap<String, RegisterNodeInfo> concurrentMap) {
concurrentMap.values().stream()
.filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(
LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3))))
.forEach(registerNodeInfo -> remove(registerNodeInfo.getGroupName(),
registerNodeInfo.getNamespaceId(), registerNodeInfo.getHostId()));
}
/**
* 删除缓存
*
@ -167,13 +235,11 @@ public class CacheRegisterTable implements Lifecycle {
CACHE = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
// 设置写缓存后60秒过期
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}
public static void main(String[] args) {
}
@Override
public void close() {

View File

@ -199,6 +199,10 @@ public class RpcClientInvokeHandler implements InvocationHandler {
if (ex.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) ex;
throwable = re.getLastFailedAttempt().getExceptionCause();
if (throwable instanceof ResourceAccessException) {
// 若重试之后该接口仍然有问题进行路由剔除处理
CacheRegisterTable.remove(groupName, namespaceId, hostId);
}
}
throw throwable;

View File

@ -30,9 +30,15 @@ public abstract class AbstractRegister implements Register, Lifecycle {
ServerNode serverNode = initServerNode(context);
return doRegister(context, serverNode);
boolean result = doRegister(context, serverNode);
afterProcessor(serverNode);
return result;
}
protected abstract void afterProcessor(final ServerNode serverNode);
protected void refreshExpireAt(List<ServerNode> serverNodes) {
try {

View File

@ -46,7 +46,6 @@ public class ClientRegister extends AbstractRegister implements Runnable {
public boolean supports(int type) {
return getNodeType().equals(type);
}
@Override
protected void beforeProcessor(RegisterContext context) {
}
@ -65,6 +64,12 @@ public class ClientRegister extends AbstractRegister implements Runnable {
return QUEUE.offerLast(serverNode);
}
@Override
protected void afterProcessor(final ServerNode serverNode) {
}
@Override
protected Integer getNodeType() {
return NodeTypeEnum.CLIENT.getType();
@ -100,22 +105,6 @@ public class ClientRegister extends AbstractRegister implements Runnable {
refreshExpireAt(lists);
}
// 同步当前POD消费的组的节点信息
// netty的client只会注册到一个服务端若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息
ConcurrentMap<String, String> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
if (!CollectionUtils.isEmpty(allConsumerGroupName)) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())
.in(ServerNode::getNamespaceId, new HashSet<>(allConsumerGroupName.values()))
.in(ServerNode::getGroupName, allConsumerGroupName.keySet()));
for (final ServerNode node : serverNodes) {
// 刷新全量本地缓存
CacheRegisterTable.addOrUpdate(node);
}
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} catch (Exception e) {

View File

@ -8,18 +8,25 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.Register;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.ServerNodeExtAttrs;
import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -57,7 +64,6 @@ public class ServerRegister extends AbstractRegister {
return getNodeType().equals(type);
}
@Override
protected void beforeProcessor(RegisterContext context) {
// 新增扩展参数
@ -84,6 +90,29 @@ public class ServerRegister extends AbstractRegister {
return Boolean.TRUE;
}
@Override
protected void afterProcessor(final ServerNode serverNode) {
try {
// 同步当前POD消费的组的节点信息
// netty的client只会注册到一个服务端若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息
ConcurrentMap<String /*groupName*/, String/*namespaceId*/> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
if (!CollectionUtils.isEmpty(allConsumerGroupName)) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())
.in(ServerNode::getNamespaceId, new HashSet<>(allConsumerGroupName.values()))
.in(ServerNode::getGroupName, allConsumerGroupName.keySet()));
for (final ServerNode node : serverNodes) {
// 刷新全量本地缓存
CacheRegisterTable.addOrUpdate(node);
}
}
} catch (Exception e) {
LogUtils.error(log, "刷新客户端失败", e);
}
}
@Override
protected Integer getNodeType() {
return NodeTypeEnum.SERVER.getType();

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.retry.task.support.strategy;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.dto.DistributeInstance;
import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy;
@ -192,6 +193,8 @@ public class FilterStrategies {
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
boolean result = 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, serverNode.getHostId()));
if (!result) {
// 删除缓存中的失效节点
CacheRegisterTable.remove(retryTask.getGroupName(), retryTask.getNamespaceId(), serverNode.getHostId());
description.append(MessageFormat.format("DB中未查询到客户端节点. hostId:[{0}] uniqueId:[{1}]", serverNode.getHostId(), retryTask.getUniqueId()));
}