Merge branch 'master' into dev_2.1.0

# Conflicts:
#	README.md
#	pom.xml
This commit is contained in:
byteblogs168 2023-07-28 17:43:12 +08:00
commit 5ccb7ad235
10 changed files with 211 additions and 65 deletions

View File

@ -7,7 +7,7 @@
</p> </p>
<p align="center"> <p align="center">
🔥🔥🔥基于BASE思想实现的分布式服务重试组件 致力提高分布式业务系统一致性的分布式重试平台
</p> </p>
@ -43,13 +43,16 @@
## 相关链接 ## 相关链接
- [字节跳动: 如何优雅地重试](https://juejin.cn/post/6914091859463634951) - [字节跳动: 如何优雅地重试](https://juejin.cn/post/6914091859463634951)
- [文档](https://www.easyretry.com/pages/d1d1da/) - [这款分布式重试组件,治好了我的重试强迫症!](https://juejin.cn/post/7249607108043145274)
- [HelloWorld](https://www.easyretry.com/pages/da9ecc/) - [系统简介](https://www.easyretry.com/pages/d1d1da/)
## 原理
- [架构与功能](https://www.easyretry.com/pages/540554/) - [架构与功能](https://www.easyretry.com/pages/540554/)
## 原理
- [场景应用](https://www.easyretry.com/pages/406a68/)
- [HelloWorld](https://www.easyretry.com/pages/da9ecc/)
## 应用实例 ## 应用实例
- [Spring-Boot](https://gitee.com/zhangyutongxue/easy-retry-demo) - [easy-retry-demo](https://gitee.com/zhangyutongxue/easy-retry-demo)
## 期望 ## 期望
欢迎提出更好的意见,帮助完善 Easy-Retry 欢迎提出更好的意见,帮助完善 Easy-Retry

View File

@ -22,7 +22,8 @@ import java.util.concurrent.locks.ReentrantLock;
* @author Eric Zhao * @author Eric Zhao
* @author Carpenter Lee * @author Carpenter Lee
* *
* see https://github.com/alibaba/Sentinel/blob/master/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java * 特别声明: LeapArray的设计实现是使用了Sentinelv1.8.0版本的的LeapArray
* see https://github.com/alibaba/Sentinel/blob/v1.8.0/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java
*/ */
@Slf4j @Slf4j
public abstract class LeapArray<T> { public abstract class LeapArray<T> {

View File

@ -42,8 +42,7 @@ public class SystemProperties {
private int limiter = 100; private int limiter = 100;
/** /**
* 号段模式下步长配置 * 号段模式下步长配置 默认100
* 默认100
*/ */
private int step = 100; private int step = 100;
@ -74,9 +73,9 @@ public class SystemProperties {
public static class Callback { public static class Callback {
/** /**
* 回调id前缀 * 回调uniqueId前缀
*/ */
String prefix = "CB_"; String prefix = "CB";
/** /**
* 回调的最大执行次数 * 回调的最大执行次数

View File

@ -0,0 +1,28 @@
package com.aizuda.easy.retry.server.service.convert;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryDeadLetter;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2023-07-25 12:35
* @since 2.0.3
*/
@Mapper
public interface RetryDeadLetterConverter {
RetryDeadLetterConverter INSTANCE = Mappers.getMapper(RetryDeadLetterConverter.class);
@Mappings({
@Mapping(source = "id", target = "id", ignore = true),
@Mapping(source = "createDt", target = "createDt", ignore = true)
})
List<RetryDeadLetter> toRetryDeadLetter(List<RetryTask> retryTasks);
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.service.impl; package com.aizuda.easy.retry.server.service.impl;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.enums.DelayLevelEnum; import com.aizuda.easy.retry.server.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.enums.StatusEnum; import com.aizuda.easy.retry.server.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -18,6 +19,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.service.convert.RetryDeadLetterConverter;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter; import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.generator.IdGenerator; import com.aizuda.easy.retry.server.support.generator.IdGenerator;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
@ -28,9 +30,9 @@ import com.aizuda.easy.retry.server.config.RequestDataHelper;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.service.RetryService; import com.aizuda.easy.retry.server.service.RetryService;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter; import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -39,8 +41,10 @@ import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -76,16 +80,20 @@ public class RetryServiceImpl implements RetryService {
public Boolean reportRetry(RetryTaskDTO retryTaskDTO) { public Boolean reportRetry(RetryTaskDTO retryTaskDTO) {
LogUtils.info(log, "received report data. <|>{}<|>", JsonUtil.toJsonString(retryTaskDTO)); LogUtils.info(log, "received report data. <|>{}<|>", JsonUtil.toJsonString(retryTaskDTO));
SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName()); SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(),
retryTaskDTO.getSceneName());
if (Objects.isNull(sceneConfig)) { if (Objects.isNull(sceneConfig)) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTaskDTO.getGroupName()); GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTaskDTO.getGroupName());
if (Objects.isNull(groupConfig)) { if (Objects.isNull(groupConfig)) {
throw new EasyRetryServerException("failed to report data, no group configuration found. groupName:[{}]", retryTaskDTO.getGroupName()); throw new EasyRetryServerException(
"failed to report data, no group configuration found. groupName:[{}]", retryTaskDTO.getGroupName());
} }
if (groupConfig.getInitScene().equals(StatusEnum.NO.getStatus())) { if (groupConfig.getInitScene().equals(StatusEnum.NO.getStatus())) {
throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName()); throw new EasyRetryServerException(
"failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]",
retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
} else { } else {
// 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景 // 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景
initScene(retryTaskDTO); initScene(retryTaskDTO);
@ -95,11 +103,11 @@ public class RetryServiceImpl implements RetryService {
RequestDataHelper.setPartition(retryTaskDTO.getGroupName()); RequestDataHelper.setPartition(retryTaskDTO.getGroupName());
// 此处做幂等处理避免客户端重复多次上报 // 此处做幂等处理避免客户端重复多次上报
long count = retryTaskMapper.selectCount(new LambdaQueryWrapper<RetryTask>() long count = retryTaskMapper.selectCount(new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getIdempotentId, retryTaskDTO.getIdempotentId()) .eq(RetryTask::getIdempotentId, retryTaskDTO.getIdempotentId())
.eq(RetryTask::getGroupName, retryTaskDTO.getGroupName()) .eq(RetryTask::getGroupName, retryTaskDTO.getGroupName())
.eq(RetryTask::getSceneName, retryTaskDTO.getSceneName()) .eq(RetryTask::getSceneName, retryTaskDTO.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
); );
if (0 < count) { if (0 < count) {
LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(retryTaskDTO)); LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(retryTaskDTO));
return Boolean.TRUE; return Boolean.TRUE;
@ -116,25 +124,25 @@ public class RetryServiceImpl implements RetryService {
retryTask.setExtAttrs(StringUtils.EMPTY); retryTask.setExtAttrs(StringUtils.EMPTY);
} }
retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); retryTask.setNextTriggerAt(
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), () -> new EasyRetryServerException("failed to report data")); Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask),
() -> new EasyRetryServerException("failed to report data"));
// 初始化日志 // 初始化日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask); RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
retryTaskLog.setTaskType(TaskTypeEnum.RETRY.getType()); retryTaskLog.setTaskType(TaskTypeEnum.RETRY.getType());
retryTaskLog.setCreateDt(now); retryTaskLog.setCreateDt(now);
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
() -> new EasyRetryServerException("新增重试日志失败")); () -> new EasyRetryServerException("新增重试日志失败"));
return Boolean.TRUE; return Boolean.TRUE;
} }
/** /**
* 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景 * 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景 backOff(退避策略): 等级策略 maxRetryCount(最大重试次数): 26 triggerInterval(间隔时间): see:
* backOff(退避策略): 等级策略 * {@link DelayLevelEnum}
* maxRetryCount(最大重试次数): 26
* triggerInterval(间隔时间): see: {@link DelayLevelEnum}
* *
* @param retryTaskDTO 重试上报DTO * @param retryTaskDTO 重试上报DTO
*/ */
@ -147,7 +155,8 @@ public class RetryServiceImpl implements RetryService {
sceneConfig.setBackOff(WaitStrategyEnum.DELAY_LEVEL.getBackOff()); sceneConfig.setBackOff(WaitStrategyEnum.DELAY_LEVEL.getBackOff());
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel()); sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景"); sceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig), () -> new EasyRetryServerException("init scene error")); Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig),
() -> new EasyRetryServerException("init scene error"));
} }
@Transactional @Transactional
@ -159,18 +168,77 @@ public class RetryServiceImpl implements RetryService {
@Transactional @Transactional
@Override @Override
public Boolean moveDeadLetterAndDelFinish(String groupId) { public Boolean moveDeadLetterAndDelFinish(String groupName) {
// 清除重试完成的数据 RequestDataHelper.setPartition(groupName);
clearFinishRetryData(groupId); List<RetryTask> callbackRetryTasks = retryTaskMapper.selectPage(new PageDTO<>(0, 100),
new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(),
RetryStatusEnum.FINISH.getStatus())
.eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType())
.eq(RetryTask::getGroupName, groupName)).getRecords();
List<RetryTask> retryTasks = retryTaskAccess.listRetryTaskByRetryCount(groupId, RetryStatusEnum.MAX_COUNT.getStatus()); if (CollectionUtils.isEmpty(callbackRetryTasks)) {
if (CollectionUtils.isEmpty(retryTasks)) {
return Boolean.TRUE; return Boolean.TRUE;
} }
Set<String> uniqueIdSet = callbackRetryTasks.stream().map(callbackTask -> {
String callbackTaskUniqueId = callbackTask.getUniqueId();
return callbackTaskUniqueId.substring(callbackTaskUniqueId.lastIndexOf(StrUtil.UNDERLINE) + 1);
}).collect(Collectors.toSet());
RequestDataHelper.setPartition(groupName);
List<RetryTask> retryTasks = retryTaskMapper.selectList(new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
.in(RetryTask::getUniqueId, uniqueIdSet)
);
// 迁移重试失败的数据 // 迁移重试失败的数据
moveDeadLetters(groupId, retryTasks); List<RetryTask> waitMoveDeadLetters = new ArrayList<>();
List<RetryTask> maxCountRetryTaskList = retryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
Collectors.toList());
if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) {
waitMoveDeadLetters.addAll(maxCountRetryTaskList);
}
List<RetryTask> maxCountCallbackRetryTaskList = callbackRetryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
Collectors.toList());
if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) {
waitMoveDeadLetters.addAll(maxCountCallbackRetryTaskList);
}
moveDeadLetters(groupName, waitMoveDeadLetters);
// 删除重试完成的数据
Set<Long> waitDelDeadLetters = new HashSet<>();
Set<Long> finishRetryIdList = retryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
.map(RetryTask::getId)
.collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(finishRetryIdList)) {
waitDelDeadLetters.addAll(finishRetryIdList);
}
Set<Long> finishCallbackRetryIdList = callbackRetryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
.map(RetryTask::getId)
.collect(Collectors.toSet());
// 迁移重试失败的数据
if (!CollectionUtils.isEmpty(finishCallbackRetryIdList)) {
waitDelDeadLetters.addAll(finishCallbackRetryIdList);
}
if (CollectionUtils.isEmpty(waitDelDeadLetters)) {
return Boolean.TRUE;
}
RequestDataHelper.setPartition(groupName);
Assert.isTrue(waitDelDeadLetters.size() == retryTaskMapper.deleteBatchIds(waitDelDeadLetters),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
return Boolean.TRUE; return Boolean.TRUE;
} }
@ -178,38 +246,24 @@ public class RetryServiceImpl implements RetryService {
/** /**
* 迁移死信队列数据 * 迁移死信队列数据
* *
* @param groupName 组id * @param groupName 组id
* @param retryTasks 待迁移数据 * @param retryTasks 待迁移数据
*/ */
private void moveDeadLetters(String groupName, List<RetryTask> retryTasks) { private void moveDeadLetters(String groupName, List<RetryTask> retryTasks) {
if (CollectionUtils.isEmpty(retryTasks)) {
List<RetryDeadLetter> retryDeadLetters = new ArrayList<>(); return;
for (RetryTask retryTask : retryTasks) {
RetryDeadLetter retryDeadLetter = new RetryDeadLetter();
BeanUtils.copyProperties(retryTask, retryDeadLetter);
retryDeadLetter.setId(null);
retryDeadLetter.setCreateDt(LocalDateTime.now());
retryDeadLetters.add(retryDeadLetter);
} }
List<RetryDeadLetter> retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retryTasks);
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName); GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
Assert.isTrue(retryDeadLetters.size() == retryDeadLetterMapper.insertBatch(retryDeadLetters, groupConfig.getGroupPartition()), Assert.isTrue(retryDeadLetters.size() == retryDeadLetterMapper.insertBatch(retryDeadLetters,
() -> new EasyRetryServerException("插入死信队列失败 [{}]" , JsonUtil.toJsonString(retryDeadLetters))); groupConfig.getGroupPartition()),
() -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));
List<Long> ids = retryTasks.stream().map(RetryTask::getId).collect(Collectors.toList()); List<Long> ids = retryTasks.stream().map(RetryTask::getId).collect(Collectors.toList());
Assert.isTrue(retryTasks.size() == retryTaskMapper.deleteBatch(ids, groupConfig.getGroupPartition()), Assert.isTrue(retryTasks.size() == retryTaskMapper.deleteBatch(ids, groupConfig.getGroupPartition()),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks))); () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
}
/**
* 请求已完成的重试数据
*
* @param groupId 组id
*/
private void clearFinishRetryData(String groupId) {
// 将已经重试完成的数据删除
retryTaskAccess.deleteByDelayLevel(groupId, RetryStatusEnum.FINISH.getStatus());
} }
/** /**

View File

@ -2,20 +2,28 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.exec;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO; import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.client.RequestBuilder; import com.aizuda.easy.retry.server.client.RequestBuilder;
import com.aizuda.easy.retry.server.client.RpcClient; import com.aizuda.easy.retry.server.client.RpcClient;
import com.aizuda.easy.retry.server.config.RequestDataHelper;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.enums.StatusEnum; import com.aizuda.easy.retry.server.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.support.IdempotentStrategy; import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.context.CallbackRetryContext; import com.aizuda.easy.retry.server.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.server.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -25,6 +33,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -45,6 +54,10 @@ public class ExecCallbackUnitActor extends AbstractActor {
@Autowired @Autowired
@Qualifier("bitSetIdempotentStrategyHandler") @Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy; private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private RetryTaskMapper retryTaskMapper;
@Autowired
private CallbackRetryTaskHandler callbackRetryTaskHandler;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
@ -100,10 +113,17 @@ public class ExecCallbackUnitActor extends AbstractActor {
/** /**
* 调用客户端 * 调用客户端
* *
* @param retryTask {@link RetryTask} 需要重试的数据 * @param callbackTask {@link RetryTask} 回调任务
* @return 重试结果返回值 * @return 重试结果返回值
*/ */
private Result callClient(RetryTask retryTask, RegisterNodeInfo serverNode) { private Result callClient(RetryTask callbackTask, RegisterNodeInfo serverNode) {
String retryTaskUniqueId = callbackRetryTaskHandler.getRetryTaskUniqueId(callbackTask.getUniqueId());
RequestDataHelper.setPartition(callbackTask.getGroupName());
RetryTask retryTask = retryTaskMapper.selectOne(
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getUniqueId, retryTaskUniqueId));
Assert.notNull(retryTask, () -> new EasyRetryServerException("未查询回调任务对应的重试任务. callbackUniqueId:[{}] uniqueId:[{}]",
callbackTask.getUniqueId(), retryTaskUniqueId));
// 回调参数 // 回调参数
RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO(); RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO();

View File

@ -2,6 +2,10 @@ package com.aizuda.easy.retry.server.support.generator.id;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/**
* 特别声明: 此算法来自美团的leaf号段模式
* see https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java
*/
public class Segment { public class Segment {
private AtomicLong value = new AtomicLong(0); private AtomicLong value = new AtomicLong(0);
private volatile long max; private volatile long max;

View File

@ -9,7 +9,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* 双buffer * 特别声明: 此算法来自美团的leaf号段模式
* see https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java
*
*/ */
@Data @Data
public class SegmentBuffer { public class SegmentBuffer {

View File

@ -32,9 +32,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 此算法来自美团的leaf号段模式 * 特别声明: 此算法来自美团的leaf号段模式
* see https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java
* *
* @author www.byteblogs.com * @author www.byteblogs.com
* @date 2023-05-04 * @date 2023-05-04
* @since 1.2.0 * @since 1.2.0
*/ */

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.support.handler; package com.aizuda.easy.retry.server.support.handler;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.config.SystemProperties; import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
@ -13,6 +13,8 @@ import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter; import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter; import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -23,6 +25,8 @@ import java.util.concurrent.TimeUnit;
/** /**
* 回调数据处理器
*
* @author www.byteblogs.com * @author www.byteblogs.com
* @date 2023-06-04 * @date 2023-06-04
* @since 1.5.0 * @since 1.5.0
@ -30,6 +34,8 @@ import java.util.concurrent.TimeUnit;
@Component @Component
public class CallbackRetryTaskHandler { public class CallbackRetryTaskHandler {
private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}";
@Autowired @Autowired
@Qualifier("retryTaskAccessProcessor") @Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccess; private RetryTaskAccess<RetryTask> retryTaskAccess;
@ -38,6 +44,11 @@ public class CallbackRetryTaskHandler {
@Autowired @Autowired
private SystemProperties systemProperties; private SystemProperties systemProperties;
/**
* 创建回调数据
*
* @param retryTask {@link RetryTask} 重试任务数据
*/
@Transactional @Transactional
public void create(RetryTask retryTask) { public void create(RetryTask retryTask) {
if (!TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) { if (!TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) {
@ -45,10 +56,9 @@ public class CallbackRetryTaskHandler {
} }
RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask); RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask);
callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType()); callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType());
callbackRetryTask.setId(null); callbackRetryTask.setId(null);
callbackRetryTask.setUniqueId(systemProperties.getCallback().getPrefix() + retryTask.getUniqueId()); callbackRetryTask.setUniqueId(generatorCallbackUniqueId(retryTask.getUniqueId()));
callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
callbackRetryTask.setRetryCount(0); callbackRetryTask.setRetryCount(0);
callbackRetryTask.setCreateDt(LocalDateTime.now()); callbackRetryTask.setCreateDt(LocalDateTime.now());
@ -68,4 +78,28 @@ public class CallbackRetryTaskHandler {
} }
/**
* 生成回调数据
*
* @param uniqueId 重试任务uniqueId
* @return 回调任务uniqueId
*/
public String generatorCallbackUniqueId(String uniqueId) {
// eg: CB_202307180949471
FormattingTuple callbackUniqueId = MessageFormatter.arrayFormat(CALLBACK_UNIQUE_ID_RULE,
new Object[]{systemProperties.getCallback().getPrefix(), uniqueId});
return callbackUniqueId.getMessage();
}
/**
* 获取重试任务uniqueId
*
* @param callbackTaskUniqueId 回调任务uniqueId
* @return 重试任务uniqueId
*/
public String getRetryTaskUniqueId(String callbackTaskUniqueId) {
return callbackTaskUniqueId.substring(callbackTaskUniqueId.lastIndexOf(StrUtil.UNDERLINE) + 1);
}
} }