feat:2.3.0
1. 修复当一个组由打开到关闭,已分配的组未能清除问题 2. 修复迁移死信队列id重复问题
This commit is contained in:
parent
8121126cdf
commit
408a0f2699
@ -2,9 +2,7 @@ package com.aizuda.easy.retry.server.service.convert;
|
|||||||
|
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
import org.mapstruct.Mapper;
|
import org.mapstruct.*;
|
||||||
import org.mapstruct.Mapping;
|
|
||||||
import org.mapstruct.Mappings;
|
|
||||||
import org.mapstruct.factory.Mappers;
|
import org.mapstruct.factory.Mappers;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -19,10 +17,14 @@ public interface RetryDeadLetterConverter {
|
|||||||
|
|
||||||
RetryDeadLetterConverter INSTANCE = Mappers.getMapper(RetryDeadLetterConverter.class);
|
RetryDeadLetterConverter INSTANCE = Mappers.getMapper(RetryDeadLetterConverter.class);
|
||||||
|
|
||||||
|
@Named("ignoreId")
|
||||||
@Mappings({
|
@Mappings({
|
||||||
@Mapping(source = "id", target = "id", ignore = true),
|
@Mapping(target = "id", ignore = true),
|
||||||
@Mapping(source = "createDt", target = "createDt", ignore = true)
|
@Mapping(target = "createDt", ignore = true)
|
||||||
})
|
})
|
||||||
|
RetryDeadLetter toRetryDeadLetter(RetryTask retryTasks);
|
||||||
|
|
||||||
|
@IterableMapping(qualifiedByName = "ignoreId")
|
||||||
List<RetryDeadLetter> toRetryDeadLetter(List<RetryTask> retryTasks);
|
List<RetryDeadLetter> toRetryDeadLetter(List<RetryTask> retryTasks);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -237,7 +237,10 @@ public class RetryServiceImpl implements RetryService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
List<RetryDeadLetter> retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retryTasks);
|
List<RetryDeadLetter> retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retryTasks);
|
||||||
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
for (RetryDeadLetter retryDeadLetter : retryDeadLetters) {
|
||||||
|
retryDeadLetter.setCreateDt(now);
|
||||||
|
}
|
||||||
Assert.isTrue(retryDeadLetters.size() == accessTemplate
|
Assert.isTrue(retryDeadLetters.size() == accessTemplate
|
||||||
.getRetryDeadLetterAccess().batchInsert(groupName, retryDeadLetters),
|
.getRetryDeadLetterAccess().batchInsert(groupName, retryDeadLetters),
|
||||||
() -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));
|
() -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));
|
||||||
|
@ -58,10 +58,10 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance {
|
|||||||
throw new IllegalArgumentException("currentCID is empty");
|
throw new IllegalArgumentException("currentCID is empty");
|
||||||
}
|
}
|
||||||
if (CollectionUtils.isEmpty(groupList)) {
|
if (CollectionUtils.isEmpty(groupList)) {
|
||||||
throw new IllegalArgumentException("mqAll is null or mqAll empty");
|
throw new IllegalArgumentException("groupList is null or groupList empty");
|
||||||
}
|
}
|
||||||
if (CollectionUtils.isEmpty(serverList)) {
|
if (CollectionUtils.isEmpty(serverList)) {
|
||||||
throw new IllegalArgumentException("cidAll is null or cidAll empty");
|
throw new IllegalArgumentException("serverList is null or serverList empty");
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> result = new ArrayList<>();
|
List<String> result = new ArrayList<>();
|
||||||
|
@ -72,20 +72,22 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
|
|||||||
|
|
||||||
if (CollectionUtils.isEmpty(podIpSet)) {
|
if (CollectionUtils.isEmpty(podIpSet)) {
|
||||||
LogUtils.error(log, "server node is empty");
|
LogUtils.error(log, "server node is empty");
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<String> allGroup = CacheGroup.getAllGroup();
|
Set<String> allGroup = CacheGroup.getAllGroup();
|
||||||
if (CollectionUtils.isEmpty(allGroup)) {
|
if (CollectionUtils.isEmpty(allGroup)) {
|
||||||
LogUtils.error(log, "group is empty");
|
LogUtils.error(log, "group is empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 删除本地缓存的所有组信息
|
||||||
|
CacheConsumerGroup.clear();
|
||||||
|
if(CollectionUtils.isEmpty(podIpSet) || CollectionUtils.isEmpty(allGroup)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> allocate = new AllocateMessageQueueConsistentHash()
|
List<String> allocate = new AllocateMessageQueueConsistentHash()
|
||||||
.allocate(ServerRegister.CURRENT_CID, new ArrayList<>(allGroup), new ArrayList<>(podIpSet));
|
.allocate(ServerRegister.CURRENT_CID, new ArrayList<>(allGroup), new ArrayList<>(podIpSet));
|
||||||
|
|
||||||
// 删除本地缓存的所有组信息
|
|
||||||
CacheConsumerGroup.clear();
|
|
||||||
// 重新覆盖本地分配的组信息
|
// 重新覆盖本地分配的组信息
|
||||||
for (String groupName : allocate) {
|
for (String groupName : allocate) {
|
||||||
CacheConsumerGroup.addOrUpdate(groupName);
|
CacheConsumerGroup.addOrUpdate(groupName);
|
||||||
|
Loading…
Reference in New Issue
Block a user