diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java index c84077c9..30e0a648 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java @@ -2,9 +2,7 @@ package com.aizuda.easy.retry.server.service.convert; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; -import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Mappings; +import org.mapstruct.*; import org.mapstruct.factory.Mappers; import java.util.List; @@ -19,10 +17,14 @@ public interface RetryDeadLetterConverter { RetryDeadLetterConverter INSTANCE = Mappers.getMapper(RetryDeadLetterConverter.class); + @Named("ignoreId") @Mappings({ - @Mapping(source = "id", target = "id", ignore = true), - @Mapping(source = "createDt", target = "createDt", ignore = true) + @Mapping(target = "id", ignore = true), + @Mapping(target = "createDt", ignore = true) }) + RetryDeadLetter toRetryDeadLetter(RetryTask retryTasks); + + @IterableMapping(qualifiedByName = "ignoreId") List toRetryDeadLetter(List retryTasks); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java index 5e8c0821..a9f3864a 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java @@ -237,7 +237,10 @@ public class RetryServiceImpl implements RetryService { } List retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retryTasks); - + LocalDateTime now = LocalDateTime.now(); + for (RetryDeadLetter retryDeadLetter : retryDeadLetters) { + retryDeadLetter.setCreateDt(now); + } Assert.isTrue(retryDeadLetters.size() == accessTemplate .getRetryDeadLetterAccess().batchInsert(groupName, retryDeadLetters), () -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters))); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java index 956837a9..76f7273e 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/allocate/server/AllocateMessageQueueConsistentHash.java @@ -58,10 +58,10 @@ public class AllocateMessageQueueConsistentHash implements ServerLoadBalance { throw new IllegalArgumentException("currentCID is empty"); } if (CollectionUtils.isEmpty(groupList)) { - throw new IllegalArgumentException("mqAll is null or mqAll empty"); + throw new IllegalArgumentException("groupList is null or groupList empty"); } if (CollectionUtils.isEmpty(serverList)) { - throw new IllegalArgumentException("cidAll is null or cidAll empty"); + throw new IllegalArgumentException("serverList is null or serverList empty"); } List result = new ArrayList<>(); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java index 24ec7eaa..1f1c4c38 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ServerNodeBalance.java @@ -72,20 +72,22 @@ public class ServerNodeBalance implements Lifecycle, Runnable { if (CollectionUtils.isEmpty(podIpSet)) { LogUtils.error(log, "server node is empty"); - return; } Set allGroup = CacheGroup.getAllGroup(); if (CollectionUtils.isEmpty(allGroup)) { LogUtils.error(log, "group is empty"); + } + + // 删除本地缓存的所有组信息 + CacheConsumerGroup.clear(); + if(CollectionUtils.isEmpty(podIpSet) || CollectionUtils.isEmpty(allGroup)) { return; } List allocate = new AllocateMessageQueueConsistentHash() .allocate(ServerRegister.CURRENT_CID, new ArrayList<>(allGroup), new ArrayList<>(podIpSet)); - // 删除本地缓存的所有组信息 - CacheConsumerGroup.clear(); // 重新覆盖本地分配的组信息 for (String groupName : allocate) { CacheConsumerGroup.addOrUpdate(groupName);