feat: 2.0.0

1. 修复组分配与netty client 连接不在一个服务端POD导致的下发重试流程失败
2. 修复极端情况下,组分配丢失问题
This commit is contained in:
byteblogs168 2023-06-21 11:37:43 +08:00
parent f714046f2a
commit 6a532ce528
10 changed files with 76 additions and 26 deletions

View File

@ -69,7 +69,7 @@ public class LogUtils {
try {
Environment environment = SpringContext.CONTEXT.getBean(Environment.class);
return environment.getProperty("x.retry.log.status", Boolean.class, Boolean.TRUE);
return environment.getProperty("easy.retry.log.status", Boolean.class, Boolean.TRUE);
} catch (Exception ignored) {
}

View File

@ -35,7 +35,7 @@ public class CacheConsumerGroup implements Lifecycle {
*
* @return 缓存对象
*/
public static Set<String> getAllPods() {
public static Set<String> getAllConsumerGroupName() {
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
return new HashSet<>(concurrentMap.values());
@ -57,10 +57,12 @@ public class CacheConsumerGroup implements Lifecycle {
* @return 缓存对象
*/
public static synchronized void addOrUpdate(String groupName) {
LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName);
CACHE.put(groupName, groupName);
}
public static void remove(String groupName) {
LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName);
CACHE.invalidate(groupName);
}

View File

@ -11,8 +11,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@ -99,13 +101,29 @@ public class CacheRegisterTable implements Lifecycle {
.map(RegisterNodeInfo::getHostId).collect(Collectors.toSet());
}
/**
* 刷新过期时间若不存在则初始化
*
* @param groupName 组名称
*/
public static synchronized void refreshExpireAt(String groupName, ServerNode serverNode) {
RegisterNodeInfo registerNodeInfo = getServerNode(groupName, serverNode.getHostId());
// 不存在则初始化
if (Objects.isNull(registerNodeInfo)) {
LogUtils.warn(log, "node not exists. groupName:[{}] hostId:[{}]", groupName, serverNode.getHostId());
} else {
// 存在则刷新过期时间
registerNodeInfo.setExpireAt(serverNode.getExpireAt());
}
}
/**
* 无缓存时添加 有缓存时更新
*
* @return 缓存对象
*/
public static synchronized void addOrUpdate(String groupName, ServerNode serverNode) {
ConcurrentMap<String, RegisterNodeInfo> concurrentMap = CACHE.getIfPresent(groupName);
RegisterNodeInfo registerNodeInfo;
if (Objects.isNull(concurrentMap)) {
@ -119,7 +137,7 @@ public class CacheRegisterTable implements Lifecycle {
registerNodeInfo.setExpireAt(serverNode.getExpireAt());
}
LogUtils.debug(log, "Update cache. groupName:[{}] hostId:[{}] hostIp:[{}] expireAt:[{}]", groupName,
LogUtils.info(log, "Update cache. groupName:[{}] hostId:[{}] hostIp:[{}] expireAt:[{}]", groupName,
serverNode.getHostId(), serverNode.getHostIp(), serverNode.getExpireAt());
concurrentMap.put(serverNode.getHostId(), registerNodeInfo);

View File

@ -147,7 +147,7 @@ public class DispatchService implements Lifecycle {
* @return {@link GroupConfig} 组上下文
*/
private Set<String> getCurrentHostGroupList() {
return CacheConsumerGroup.getAllPods();
return CacheConsumerGroup.getAllConsumerGroupName();
}
@Override

View File

@ -57,7 +57,7 @@ public class ScanCallbackGroupActor extends AbstractScanGroup {
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())
.withWaitStrategy(getWaitWaitStrategy())
.withFilterStrategy(FilterStrategies.delayLevelFilter())
.withFilterStrategy(FilterStrategies.triggerAtFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())

View File

@ -60,7 +60,7 @@ public class ScanGroupActor extends AbstractScanGroup {
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
.withFilterStrategy(FilterStrategies.delayLevelFilter())
.withFilterStrategy(FilterStrategies.triggerAtFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.support.handler;
import cn.hutool.core.lang.Opt;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
@ -23,7 +24,10 @@ import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -133,6 +137,16 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
}
}
private void refreshExpireAtCache(List<ServerNode> remotePods) {
// 刷新最新的节点注册信息
for (ServerNode node : remotePods) {
Optional.ofNullable(CacheRegisterTable.getServerNode(node.getGroupName(), node.getHostId())).ifPresent(registerNodeInfo -> {
registerNodeInfo.setExpireAt(node.getExpireAt());
});
}
}
private void refreshCache(List<ServerNode> remotePods) {
// 刷新最新的节点注册信息
@ -175,8 +189,8 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
.eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()));
// 获取缓存中的节点
ConcurrentMap<String/*hostId*/, RegisterNodeInfo> concurrentMap = CacheRegisterTable
.get(ServerRegister.GROUP_NAME);
ConcurrentMap<String/*hostId*/, RegisterNodeInfo> concurrentMap = Optional.ofNullable(CacheRegisterTable
.get(ServerRegister.GROUP_NAME)).orElse(new ConcurrentHashMap<>());
Set<String> remoteHostIds = remotePods.stream().map(ServerNode::getHostId).collect(Collectors.toSet());
@ -189,7 +203,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
// 无缓存的节点触发refreshCache
if (CollectionUtils.isEmpty(concurrentMap)
// 节点数量不一致触发
|| isNodeSizeNotEqual(remotePods.size(), concurrentMap.size())
|| isNodeSizeNotEqual(concurrentMap.size(), remotePods.size())
// 若存在远程和本地缓存的组的数量不一致则触发rebalance
|| isGroupSizeNotEqual(removeGroupConfig, allGroup)
// 判断远程节点是不是和本地节点一致的如果不一致则重新分配
@ -212,8 +226,8 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
} else {
// 重新刷新所有的缓存key
refreshCache(remotePods);
// 刷新过期时间
refreshExpireAtCache(remotePods);
// 再次获取最新的节点信息
concurrentMap = CacheRegisterTable

View File

@ -1,8 +1,6 @@
package com.aizuda.easy.retry.server.support.register;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Lifecycle;
@ -12,7 +10,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* @author www.byteblogs.com
@ -39,8 +36,8 @@ public abstract class AbstractRegister implements Register, Lifecycle {
try {
serverNodeMapper.insertOrUpdate(serverNode);
// 刷新本地缓存
CacheRegisterTable.addOrUpdate(serverNode.getGroupName(), serverNode);
// 刷新本地缓存过期时间
CacheRegisterTable.refreshExpireAt(serverNode.getGroupName(), serverNode);
}catch (Exception e) {
LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]",
serverNode.getGroupName(), serverNode.getHostIp(), e);

View File

@ -3,11 +3,17 @@ package com.aizuda.easy.retry.server.support.register;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -70,8 +76,22 @@ public class ClientRegister extends AbstractRegister implements Runnable {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
ServerNode serverNode = QUEUE.take();
refreshExpireAt(serverNode);
ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS);
if (Objects.nonNull(serverNode)) {
refreshExpireAt(serverNode);
}
// 同步当前POD消费的组的节点信息
// netty的client只会注册到一个服务端若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息
Set<String> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
if (!CollectionUtils.isEmpty(allConsumerGroupName)) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>().in(ServerNode::getGroupName, allConsumerGroupName));
for (final ServerNode node : serverNodes) {
// 刷新全量本地缓存
CacheRegisterTable.addOrUpdate(node.getGroupName(), node);
}
}
}catch (InterruptedException e) {
LogUtils.error(log, "[{}] thread interrupt.", Thread.currentThread().getName());
} catch (Exception e) {

View File

@ -13,7 +13,6 @@ import com.aizuda.easy.retry.server.support.FilterStrategy;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
@ -33,12 +32,12 @@ public class FilterStrategies {
}
/**
* 延迟等级的过滤策略
* 触发时间过滤策略
*
* @return {@link DelayLevelFilterStrategies} 延迟等级的过滤策略
* @return {@link TriggerAtFilterStrategies} 触发时间过滤策略
*/
public static FilterStrategy delayLevelFilter() {
return new DelayLevelFilterStrategies();
public static FilterStrategy triggerAtFilter() {
return new TriggerAtFilterStrategies();
}
/**
@ -87,11 +86,11 @@ public class FilterStrategies {
}
/**
* 延迟等级的过滤策略
* 触发时间过滤策略
* <p>
* 根据延迟等级的时间计算下次触发时间是否小于当前时间满足则返回true 否则返回false
*/
private static final class DelayLevelFilterStrategies implements FilterStrategy {
private static final class TriggerAtFilterStrategies implements FilterStrategy {
@Override
public boolean filter(RetryContext retryContext) {