From a320cbb266b34874bc1acba8e2e30c73c563b6f2 Mon Sep 17 00:00:00 2001
From: byteblogs168 <598092184@qq.com>
Date: Tue, 25 Jul 2023 18:56:41 +0800
Subject: [PATCH 1/4] =?UTF-8?q?feat:=202.0.3=201.=20=E4=BF=AE=E5=A4=8D?=
=?UTF-8?q?=E5=9B=9E=E8=B0=83=E7=8A=B6=E6=80=81=E9=94=99=E8=AF=AF=E9=97=AE?=
=?UTF-8?q?=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../retry/server/config/SystemProperties.java | 7 +-
.../convert/RetryDeadLetterConverter.java | 28 ++++
.../server/service/impl/RetryServiceImpl.java | 139 ++++++++++++------
.../actor/exec/ExecCallbackUnitActor.java | 30 +++-
.../handler/CallbackRetryTaskHandler.java | 40 ++++-
5 files changed, 183 insertions(+), 61 deletions(-)
create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java
index 33d523189..04baaf14b 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java
@@ -41,8 +41,7 @@ public class SystemProperties {
private int limiter = 10;
/**
- * 号段模式下步长配置
- * 默认100
+ * 号段模式下步长配置 默认100
*/
private int step = 100;
@@ -63,9 +62,9 @@ public class SystemProperties {
public static class Callback {
/**
- * 回调id前缀
+ * 回调uniqueId前缀
*/
- String prefix = "CB_";
+ String prefix = "CB";
/**
* 回调的最大执行次数
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java
new file mode 100644
index 000000000..affd1d319
--- /dev/null
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java
@@ -0,0 +1,28 @@
+package com.aizuda.easy.retry.server.service.convert;
+
+import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryDeadLetter;
+import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.Mappings;
+import org.mapstruct.factory.Mappers;
+
+import java.util.List;
+
+/**
+ * @author: www.byteblogs.com
+ * @date : 2023-07-25 12:35
+ * @since 2.0.3
+ */
+@Mapper
+public interface RetryDeadLetterConverter {
+
+ RetryDeadLetterConverter INSTANCE = Mappers.getMapper(RetryDeadLetterConverter.class);
+
+ @Mappings({
+ @Mapping(source = "id", target = "id", ignore = true),
+ @Mapping(source = "createDt", target = "createDt", ignore = true)
+ })
+ List toRetryDeadLetter(List retryTasks);
+
+}
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
index 9223c074a..807244f1d 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
@@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.service.impl;
import cn.hutool.core.lang.Assert;
+import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
@@ -18,6 +19,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
+import com.aizuda.easy.retry.server.service.convert.RetryDeadLetterConverter;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.generator.IdGenerator;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
@@ -28,6 +30,7 @@ import com.aizuda.easy.retry.server.config.RequestDataHelper;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.service.RetryService;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
+import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
@@ -39,8 +42,10 @@ import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -76,16 +81,20 @@ public class RetryServiceImpl implements RetryService {
public Boolean reportRetry(RetryTaskDTO retryTaskDTO) {
LogUtils.warn(log, "received report data [{}]", JsonUtil.toJsonString(retryTaskDTO));
- SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
+ SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(),
+ retryTaskDTO.getSceneName());
if (Objects.isNull(sceneConfig)) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTaskDTO.getGroupName());
if (Objects.isNull(groupConfig)) {
- throw new EasyRetryServerException("failed to report data, no group configuration found. groupName:[{}]", retryTaskDTO.getGroupName());
+ throw new EasyRetryServerException(
+ "failed to report data, no group configuration found. groupName:[{}]", retryTaskDTO.getGroupName());
}
if (groupConfig.getInitScene().equals(StatusEnum.NO.getStatus())) {
- throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
+ throw new EasyRetryServerException(
+ "failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]",
+ retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
} else {
// 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景
initScene(retryTaskDTO);
@@ -95,11 +104,11 @@ public class RetryServiceImpl implements RetryService {
RequestDataHelper.setPartition(retryTaskDTO.getGroupName());
// 此处做幂等处理,避免客户端重复多次上报
long count = retryTaskMapper.selectCount(new LambdaQueryWrapper()
- .eq(RetryTask::getIdempotentId, retryTaskDTO.getIdempotentId())
- .eq(RetryTask::getGroupName, retryTaskDTO.getGroupName())
- .eq(RetryTask::getSceneName, retryTaskDTO.getSceneName())
- .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
- );
+ .eq(RetryTask::getIdempotentId, retryTaskDTO.getIdempotentId())
+ .eq(RetryTask::getGroupName, retryTaskDTO.getGroupName())
+ .eq(RetryTask::getSceneName, retryTaskDTO.getSceneName())
+ .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
+ );
if (0 < count) {
LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(retryTaskDTO));
return Boolean.TRUE;
@@ -116,25 +125,25 @@ public class RetryServiceImpl implements RetryService {
retryTask.setExtAttrs(StringUtils.EMPTY);
}
- retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
+ retryTask.setNextTriggerAt(
+ WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
- Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), () -> new EasyRetryServerException("failed to report data"));
+ Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask),
+ () -> new EasyRetryServerException("failed to report data"));
// 初始化日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
retryTaskLog.setTaskType(TaskTypeEnum.RETRY.getType());
retryTaskLog.setCreateDt(now);
- Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
+ Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
() -> new EasyRetryServerException("新增重试日志失败"));
return Boolean.TRUE;
}
/**
- * 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景
- * backOff(退避策略): 等级策略
- * maxRetryCount(最大重试次数): 26
- * triggerInterval(间隔时间): see: {@link DelayLevelEnum}
+ * 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景 backOff(退避策略): 等级策略 maxRetryCount(最大重试次数): 26 triggerInterval(间隔时间): see:
+ * {@link DelayLevelEnum}
*
* @param retryTaskDTO 重试上报DTO
*/
@@ -147,7 +156,8 @@ public class RetryServiceImpl implements RetryService {
sceneConfig.setBackOff(WaitStrategyEnum.DELAY_LEVEL.getBackOff());
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景");
- Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig), () -> new EasyRetryServerException("init scene error"));
+ Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig),
+ () -> new EasyRetryServerException("init scene error"));
}
@Transactional
@@ -159,18 +169,72 @@ public class RetryServiceImpl implements RetryService {
@Transactional
@Override
- public Boolean moveDeadLetterAndDelFinish(String groupId) {
+ public Boolean moveDeadLetterAndDelFinish(String groupName) {
- // 清除重试完成的数据
- clearFinishRetryData(groupId);
+ RequestDataHelper.setPartition(groupName);
+ List callbackRetryTasks = retryTaskMapper.selectPage(new PageDTO<>(0, 100),
+ new LambdaQueryWrapper()
+ .in(RetryTask::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(),
+ RetryStatusEnum.FINISH.getStatus())
+ .eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType())
+ .eq(RetryTask::getGroupName, groupName)).getRecords();
- List retryTasks = retryTaskAccess.listRetryTaskByRetryCount(groupId, RetryStatusEnum.MAX_COUNT.getStatus());
- if (CollectionUtils.isEmpty(retryTasks)) {
+ if (CollectionUtils.isEmpty(callbackRetryTasks)) {
return Boolean.TRUE;
}
+ Set uniqueIdSet = callbackRetryTasks.stream().map(callbackTask -> {
+ String callbackTaskUniqueId = callbackTask.getUniqueId();
+ return callbackTaskUniqueId.substring(callbackTaskUniqueId.lastIndexOf(StrUtil.UNDERLINE) + 1);
+ }).collect(Collectors.toSet());
+
+ RequestDataHelper.setPartition(groupName);
+ List retryTasks = retryTaskMapper.selectList(new LambdaQueryWrapper()
+ .eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
+ .in(RetryTask::getUniqueId, uniqueIdSet)
+ );
+
// 迁移重试失败的数据
- moveDeadLetters(groupId, retryTasks);
+ List waitMoveDeadLetters = new ArrayList<>();
+ List maxCountRetryTaskList = retryTasks.stream()
+ .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
+ Collectors.toList());
+ if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) {
+ waitMoveDeadLetters.addAll(maxCountRetryTaskList);
+ }
+
+ List maxCountCallbackRetryTaskList = callbackRetryTasks.stream()
+ .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
+ Collectors.toList());
+
+ if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) {
+ waitMoveDeadLetters.addAll(maxCountCallbackRetryTaskList);
+ }
+
+ moveDeadLetters(groupName, waitMoveDeadLetters);
+
+ // 删除重试完成的数据
+ Set waitDelDeadLetters = new HashSet<>();
+ Set finishRetryIdList = retryTasks.stream()
+ .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
+ .map(RetryTask::getId)
+ .collect(Collectors.toSet());
+ if (!CollectionUtils.isEmpty(finishRetryIdList)) {
+ waitDelDeadLetters.addAll(finishRetryIdList);
+ }
+
+ Set finishCallbackRetryIdList = callbackRetryTasks.stream()
+ .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
+ .map(RetryTask::getId)
+ .collect(Collectors.toSet());
+
+ // 迁移重试失败的数据
+ if (!CollectionUtils.isEmpty(finishCallbackRetryIdList)) {
+ waitDelDeadLetters.addAll(finishCallbackRetryIdList);
+ }
+
+ Assert.isTrue(waitDelDeadLetters.size() == retryTaskMapper.deleteBatchIds(waitDelDeadLetters),
+ () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
return Boolean.TRUE;
}
@@ -178,38 +242,21 @@ public class RetryServiceImpl implements RetryService {
/**
* 迁移死信队列数据
*
- * @param groupName 组id
+ * @param groupName 组id
* @param retryTasks 待迁移数据
*/
private void moveDeadLetters(String groupName, List retryTasks) {
- List retryDeadLetters = new ArrayList<>();
-
- for (RetryTask retryTask : retryTasks) {
- RetryDeadLetter retryDeadLetter = new RetryDeadLetter();
- BeanUtils.copyProperties(retryTask, retryDeadLetter);
- retryDeadLetter.setId(null);
- retryDeadLetter.setCreateDt(LocalDateTime.now());
- retryDeadLetters.add(retryDeadLetter);
- }
+ List retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retryTasks);
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
- Assert.isTrue(retryDeadLetters.size() == retryDeadLetterMapper.insertBatch(retryDeadLetters, groupConfig.getGroupPartition()),
- () -> new EasyRetryServerException("插入死信队列失败 [{}]" , JsonUtil.toJsonString(retryDeadLetters)));
+ Assert.isTrue(retryDeadLetters.size() == retryDeadLetterMapper.insertBatch(retryDeadLetters,
+ groupConfig.getGroupPartition()),
+ () -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));
List ids = retryTasks.stream().map(RetryTask::getId).collect(Collectors.toList());
- Assert.isTrue(retryTasks.size() == retryTaskMapper.deleteBatch(ids, groupConfig.getGroupPartition()),
- () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
- }
-
- /**
- * 请求已完成的重试数据
- *
- * @param groupId 组id
- */
- private void clearFinishRetryData(String groupId) {
- // 将已经重试完成的数据删除
- retryTaskAccess.deleteByDelayLevel(groupId, RetryStatusEnum.FINISH.getStatus());
+ Assert.isTrue(retryTasks.size() == retryTaskMapper.deleteBatch(ids, groupConfig.getGroupPartition()),
+ () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
}
/**
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java
index 5259dd2df..61f19767d 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java
@@ -2,20 +2,28 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.exec;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
+import cn.hutool.core.lang.Assert;
+import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.client.RequestBuilder;
import com.aizuda.easy.retry.server.client.RpcClient;
+import com.aizuda.easy.retry.server.config.RequestDataHelper;
+import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
+import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.support.dispatch.actor.log.RetryTaskLogDTO;
+import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -25,6 +33,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
@@ -45,6 +54,10 @@ public class ExecCallbackUnitActor extends AbstractActor {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy idempotentStrategy;
+ @Autowired
+ private RetryTaskMapper retryTaskMapper;
+ @Autowired
+ private CallbackRetryTaskHandler callbackRetryTaskHandler;
@Override
public Receive createReceive() {
@@ -100,10 +113,17 @@ public class ExecCallbackUnitActor extends AbstractActor {
/**
* 调用客户端
*
- * @param retryTask {@link RetryTask} 需要重试的数据
+ * @param callbackTask {@link RetryTask} 回调任务
* @return 重试结果返回值
*/
- private Result callClient(RetryTask retryTask, RegisterNodeInfo serverNode) {
+ private Result callClient(RetryTask callbackTask, RegisterNodeInfo serverNode) {
+
+ String retryTaskUniqueId = callbackRetryTaskHandler.getRetryTaskUniqueId(callbackTask.getUniqueId());
+ RequestDataHelper.setPartition(callbackTask.getGroupName());
+ RetryTask retryTask = retryTaskMapper.selectOne(
+ new LambdaQueryWrapper().eq(RetryTask::getUniqueId, retryTaskUniqueId));
+ Assert.notNull(retryTask, () -> new EasyRetryServerException("未查询回调任务对应的重试任务. callbackUniqueId:[{}] uniqueId:[{}]",
+ callbackTask.getUniqueId(), retryTaskUniqueId));
// 回调参数
RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO();
@@ -115,12 +135,6 @@ public class ExecCallbackUnitActor extends AbstractActor {
retryCallbackDTO.setExecutorName(retryTask.getExecutorName());
retryCallbackDTO.setUniqueId(retryTask.getUniqueId());
-// HttpEntity requestEntity = new HttpEntity<>(retryCallbackDTO);
-//
-// String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
-// Result result = restTemplate.postForObject(format, requestEntity, Result.class);
-
-
RpcClient rpcClient = RequestBuilder.newBuilder()
.hostPort(serverNode.getHostPort())
.groupName(serverNode.getGroupName())
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java
index 37de552eb..06063491e 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java
@@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.support.handler;
import cn.hutool.core.lang.Assert;
-import com.aizuda.easy.retry.common.core.constant.SystemConstants;
+import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
@@ -13,6 +13,8 @@ import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
+import org.slf4j.helpers.FormattingTuple;
+import org.slf4j.helpers.MessageFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@@ -23,6 +25,8 @@ import java.util.concurrent.TimeUnit;
/**
+ * 回调数据处理器
+ *
* @author www.byteblogs.com
* @date 2023-06-04
* @since 1.5.0
@@ -30,6 +34,8 @@ import java.util.concurrent.TimeUnit;
@Component
public class CallbackRetryTaskHandler {
+ private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}";
+
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess retryTaskAccess;
@@ -38,6 +44,11 @@ public class CallbackRetryTaskHandler {
@Autowired
private SystemProperties systemProperties;
+ /**
+ * 创建回调数据
+ *
+ * @param retryTask {@link RetryTask} 重试任务数据
+ */
@Transactional
public void create(RetryTask retryTask) {
if (!TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) {
@@ -45,10 +56,9 @@ public class CallbackRetryTaskHandler {
}
RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask);
-
callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType());
callbackRetryTask.setId(null);
- callbackRetryTask.setUniqueId(systemProperties.getCallback().getPrefix() + retryTask.getUniqueId());
+ callbackRetryTask.setUniqueId(generatorCallbackUniqueId(retryTask.getUniqueId()));
callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
callbackRetryTask.setRetryCount(0);
callbackRetryTask.setCreateDt(LocalDateTime.now());
@@ -68,4 +78,28 @@ public class CallbackRetryTaskHandler {
}
+ /**
+ * 生成回调数据
+ *
+ * @param uniqueId 重试任务uniqueId
+ * @return 回调任务uniqueId
+ */
+ public String generatorCallbackUniqueId(String uniqueId) {
+ // eg: CB_202307180949471
+ FormattingTuple callbackUniqueId = MessageFormatter.arrayFormat(CALLBACK_UNIQUE_ID_RULE,
+ new Object[]{systemProperties.getCallback().getPrefix(), uniqueId});
+
+ return callbackUniqueId.getMessage();
+ }
+
+ /**
+ * 获取重试任务uniqueId
+ *
+ * @param callbackTaskUniqueId 回调任务uniqueId
+ * @return 重试任务uniqueId
+ */
+ public String getRetryTaskUniqueId(String callbackTaskUniqueId) {
+ return callbackTaskUniqueId.substring(callbackTaskUniqueId.lastIndexOf(StrUtil.UNDERLINE) + 1);
+ }
+
}
From e99ba2ef1eaa979c84ca6cc3e969470feba6d084 Mon Sep 17 00:00:00 2001
From: byteblogs168 <598092184@qq.com>
Date: Wed, 26 Jul 2023 09:28:57 +0800
Subject: [PATCH 2/4] =?UTF-8?q?feat:=202.0.3=201.=20=E4=BC=98=E5=8C=96?=
=?UTF-8?q?=E8=BF=81=E7=A7=BB=E4=BB=A3=E7=A0=81=E9=80=BB=E8=BE=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../easy/retry/server/service/impl/RetryServiceImpl.java | 5 ++++-
pom.xml | 2 +-
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
index 807244f1d..f3235f553 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
@@ -33,7 +33,6 @@ import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
-import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
@@ -233,6 +232,7 @@ public class RetryServiceImpl implements RetryService {
waitDelDeadLetters.addAll(finishCallbackRetryIdList);
}
+ RequestDataHelper.setPartition(groupName);
Assert.isTrue(waitDelDeadLetters.size() == retryTaskMapper.deleteBatchIds(waitDelDeadLetters),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
@@ -246,6 +246,9 @@ public class RetryServiceImpl implements RetryService {
* @param retryTasks 待迁移数据
*/
private void moveDeadLetters(String groupName, List retryTasks) {
+ if (CollectionUtils.isEmpty(retryTasks)) {
+ return;
+ }
List retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retryTasks);
diff --git a/pom.xml b/pom.xml
index a8a04d11e..7e4603fc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
1.8
1.8
1.8
- 2.0.2
+ 2.0.3-SNAPSHOT
1.0.0
5.4.2.Final
4.1.48.Final
From 8cf7f885e6c346e2867a8bde3dca099e6482d796 Mon Sep 17 00:00:00 2001
From: byteblogs168 <598092184@qq.com>
Date: Thu, 27 Jul 2023 23:22:01 +0800
Subject: [PATCH 3/4] =?UTF-8?q?feat:=202.0.3=201.=20=E5=8F=91=E5=B8=832.0.?=
=?UTF-8?q?3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
README.md | 14 ++++++++------
.../server/service/impl/RetryServiceImpl.java | 4 ++++
pom.xml | 2 +-
3 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/README.md b/README.md
index 20963f2bd..bba94bfa9 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
- 分布式重试服务平台 Easy-Retry
+ 致力提高分布式业务系统一致性的分布式重试平台
@@ -41,14 +41,16 @@
## 相关链接
- [字节跳动: 如何优雅地重试](https://juejin.cn/post/6914091859463634951)
-- [文档](https://www.easyretry.com/pages/a2f161/)
-- [功能实例](https://www.easyretry.com/pages/960e25/)
+- [这款分布式重试组件,治好了我的重试强迫症!](https://juejin.cn/post/7249607108043145274)
+- [系统简介](https://www.easyretry.com/pages/d1d1da/)
+- [架构与功能](https://www.easyretry.com/pages/540554/)
+
## 原理
-- [客户端原理剖析](https://gitee.com/aizuda/easy-retry/tree/dev/example)
-- [服务端原理剖析](https://gitee.com/aizuda/easy-retry/tree/dev/example)
+- [场景应用](https://www.easyretry.com/pages/406a68/)
+- [HelloWorld](https://www.easyretry.com/pages/da9ecc/)
## 应用实例
-- [Spring-Boot](https://gitee.com/aizuda/easy-retry/tree/dev/example)
+- [easy-retry-demo](https://gitee.com/zhangyutongxue/easy-retry-demo)
## 期望
欢迎提出更好的意见,帮助完善 Easy-Retry
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
index f3235f553..25126ba2c 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java
@@ -232,6 +232,10 @@ public class RetryServiceImpl implements RetryService {
waitDelDeadLetters.addAll(finishCallbackRetryIdList);
}
+ if (CollectionUtils.isEmpty(waitDelDeadLetters)) {
+ return Boolean.TRUE;
+ }
+
RequestDataHelper.setPartition(groupName);
Assert.isTrue(waitDelDeadLetters.size() == retryTaskMapper.deleteBatchIds(waitDelDeadLetters),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
diff --git a/pom.xml b/pom.xml
index 7e4603fc6..1b4f141c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
1.8
1.8
1.8
- 2.0.3-SNAPSHOT
+ 2.0.3
1.0.0
5.4.2.Final
4.1.48.Final
From 7489d4953066caf775f2afd295780176d06ff088 Mon Sep 17 00:00:00 2001
From: byteblogs168 <598092184@qq.com>
Date: Fri, 28 Jul 2023 11:10:56 +0800
Subject: [PATCH 4/4] =?UTF-8?q?feat:=202.0.3=201.=20=E6=B7=BB=E5=8A=A0?=
=?UTF-8?q?=E5=BC=95=E5=85=A5=E7=9A=84=E7=AE=97=E6=B3=95=E7=94=B3=E6=98=8E?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../com/aizuda/easy/retry/common/core/window/LeapArray.java | 3 ++-
.../easy/retry/server/support/generator/id/Segment.java | 4 ++++
.../retry/server/support/generator/id/SegmentBuffer.java | 4 +++-
.../server/support/generator/id/SegmentIdGenerator.java | 5 +++--
4 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java
index d67a26943..4178f2463 100644
--- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java
+++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java
@@ -22,7 +22,8 @@ import java.util.concurrent.locks.ReentrantLock;
* @author Eric Zhao
* @author Carpenter Lee
*
- * see https://github.com/alibaba/Sentinel/blob/master/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java
+ * 特别声明: LeapArray的设计实现是使用了Sentinelv1.8.0版本的的LeapArray
+ * see https://github.com/alibaba/Sentinel/blob/v1.8.0/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java
*/
@Slf4j
public abstract class LeapArray {
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java
index 0cf2626f8..e297d3860 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java
@@ -2,6 +2,10 @@ package com.aizuda.easy.retry.server.support.generator.id;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * 特别声明: 此算法来自美团的leaf号段模式
+ * see: https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java
+ */
public class Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java
index adf5cabc1..7008e4654 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java
@@ -9,7 +9,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * 双buffer
+ * 特别声明: 此算法来自美团的leaf号段模式
+ * see: https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java
+ *
*/
@Data
public class SegmentBuffer {
diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java
index edbb11104..c97a41d22 100644
--- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java
+++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java
@@ -32,9 +32,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
- * 此算法来自美团的leaf号段模式
+ * 特别声明: 此算法来自美团的leaf号段模式
+ * see: https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java
*
- * @author www.byteblogs.com
+ * @author www.byteblogs.com
* @date 2023-05-04
* @since 1.2.0
*/