diff --git a/README.md b/README.md index 06af831d5..e6d0f7df8 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@

- 🔥🔥🔥基于BASE思想实现的分布式服务重试组件 + 致力提高分布式业务系统一致性的分布式重试平台

@@ -43,13 +43,16 @@ ## 相关链接 - [字节跳动: 如何优雅地重试](https://juejin.cn/post/6914091859463634951) -- [文档](https://www.easyretry.com/pages/d1d1da/) -- [HelloWorld](https://www.easyretry.com/pages/da9ecc/) -## 原理 +- [这款分布式重试组件,治好了我的重试强迫症!](https://juejin.cn/post/7249607108043145274) +- [系统简介](https://www.easyretry.com/pages/d1d1da/) - [架构与功能](https://www.easyretry.com/pages/540554/) +## 原理 +- [场景应用](https://www.easyretry.com/pages/406a68/) +- [HelloWorld](https://www.easyretry.com/pages/da9ecc/) + ## 应用实例 -- [Spring-Boot](https://gitee.com/zhangyutongxue/easy-retry-demo) +- [easy-retry-demo](https://gitee.com/zhangyutongxue/easy-retry-demo) ## 期望 欢迎提出更好的意见,帮助完善 Easy-Retry diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java index d67a26943..4178f2463 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/window/LeapArray.java @@ -22,7 +22,8 @@ import java.util.concurrent.locks.ReentrantLock; * @author Eric Zhao * @author Carpenter Lee * - * see https://github.com/alibaba/Sentinel/blob/master/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java + * 特别声明: LeapArray的设计实现是使用了Sentinelv1.8.0版本的的LeapArray + * see https://github.com/alibaba/Sentinel/blob/v1.8.0/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java */ @Slf4j public abstract class LeapArray { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java index b10781d7a..a71bd2288 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java @@ -42,8 +42,7 @@ public class SystemProperties { private int limiter = 100; /** - * 号段模式下步长配置 - * 默认100 + * 号段模式下步长配置 默认100 */ private int step = 100; @@ -74,9 +73,9 @@ public class SystemProperties { public static class Callback { /** - * 回调id前缀 + * 回调uniqueId前缀 */ - String prefix = "CB_"; + String prefix = "CB"; /** * 回调的最大执行次数 diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java new file mode 100644 index 000000000..affd1d319 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryDeadLetterConverter.java @@ -0,0 +1,28 @@ +package com.aizuda.easy.retry.server.service.convert; + +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryDeadLetter; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Mappings; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +/** + * @author: www.byteblogs.com + * @date : 2023-07-25 12:35 + * @since 2.0.3 + */ +@Mapper +public interface RetryDeadLetterConverter { + + RetryDeadLetterConverter INSTANCE = Mappers.getMapper(RetryDeadLetterConverter.class); + + @Mappings({ + @Mapping(source = "id", target = "id", ignore = true), + @Mapping(source = "createDt", target = "createDt", ignore = true) + }) + List toRetryDeadLetter(List retryTasks); + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java index dd5e7e38e..136ec44a3 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.service.impl; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.server.enums.DelayLevelEnum; import com.aizuda.easy.retry.server.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; @@ -18,6 +19,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; +import com.aizuda.easy.retry.server.service.convert.RetryDeadLetterConverter; import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter; import com.aizuda.easy.retry.server.support.generator.IdGenerator; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; @@ -28,9 +30,9 @@ import com.aizuda.easy.retry.server.config.RequestDataHelper; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.service.RetryService; import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; -import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; @@ -39,8 +41,10 @@ import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -76,16 +80,20 @@ public class RetryServiceImpl implements RetryService { public Boolean reportRetry(RetryTaskDTO retryTaskDTO) { LogUtils.info(log, "received report data. <|>{}<|>", JsonUtil.toJsonString(retryTaskDTO)); - SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName()); + SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(), + retryTaskDTO.getSceneName()); if (Objects.isNull(sceneConfig)) { GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(retryTaskDTO.getGroupName()); if (Objects.isNull(groupConfig)) { - throw new EasyRetryServerException("failed to report data, no group configuration found. groupName:[{}]", retryTaskDTO.getGroupName()); + throw new EasyRetryServerException( + "failed to report data, no group configuration found. groupName:[{}]", retryTaskDTO.getGroupName()); } if (groupConfig.getInitScene().equals(StatusEnum.NO.getStatus())) { - throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName()); + throw new EasyRetryServerException( + "failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", + retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName()); } else { // 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景 initScene(retryTaskDTO); @@ -95,11 +103,11 @@ public class RetryServiceImpl implements RetryService { RequestDataHelper.setPartition(retryTaskDTO.getGroupName()); // 此处做幂等处理,避免客户端重复多次上报 long count = retryTaskMapper.selectCount(new LambdaQueryWrapper() - .eq(RetryTask::getIdempotentId, retryTaskDTO.getIdempotentId()) - .eq(RetryTask::getGroupName, retryTaskDTO.getGroupName()) - .eq(RetryTask::getSceneName, retryTaskDTO.getSceneName()) - .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - ); + .eq(RetryTask::getIdempotentId, retryTaskDTO.getIdempotentId()) + .eq(RetryTask::getGroupName, retryTaskDTO.getGroupName()) + .eq(RetryTask::getSceneName, retryTaskDTO.getSceneName()) + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) + ); if (0 < count) { LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(retryTaskDTO)); return Boolean.TRUE; @@ -116,25 +124,25 @@ public class RetryServiceImpl implements RetryService { retryTask.setExtAttrs(StringUtils.EMPTY); } - retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + retryTask.setNextTriggerAt( + WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); - Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), () -> new EasyRetryServerException("failed to report data")); + Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), + () -> new EasyRetryServerException("failed to report data")); // 初始化日志 RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask); retryTaskLog.setTaskType(TaskTypeEnum.RETRY.getType()); retryTaskLog.setCreateDt(now); - Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), + Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), () -> new EasyRetryServerException("新增重试日志失败")); return Boolean.TRUE; } /** - * 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景 - * backOff(退避策略): 等级策略 - * maxRetryCount(最大重试次数): 26 - * triggerInterval(间隔时间): see: {@link DelayLevelEnum} + * 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景 backOff(退避策略): 等级策略 maxRetryCount(最大重试次数): 26 triggerInterval(间隔时间): see: + * {@link DelayLevelEnum} * * @param retryTaskDTO 重试上报DTO */ @@ -147,7 +155,8 @@ public class RetryServiceImpl implements RetryService { sceneConfig.setBackOff(WaitStrategyEnum.DELAY_LEVEL.getBackOff()); sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel()); sceneConfig.setDescription("自动初始化场景"); - Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig), () -> new EasyRetryServerException("init scene error")); + Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig), + () -> new EasyRetryServerException("init scene error")); } @Transactional @@ -159,18 +168,77 @@ public class RetryServiceImpl implements RetryService { @Transactional @Override - public Boolean moveDeadLetterAndDelFinish(String groupId) { + public Boolean moveDeadLetterAndDelFinish(String groupName) { - // 清除重试完成的数据 - clearFinishRetryData(groupId); + RequestDataHelper.setPartition(groupName); + List callbackRetryTasks = retryTaskMapper.selectPage(new PageDTO<>(0, 100), + new LambdaQueryWrapper() + .in(RetryTask::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(), + RetryStatusEnum.FINISH.getStatus()) + .eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType()) + .eq(RetryTask::getGroupName, groupName)).getRecords(); - List retryTasks = retryTaskAccess.listRetryTaskByRetryCount(groupId, RetryStatusEnum.MAX_COUNT.getStatus()); - if (CollectionUtils.isEmpty(retryTasks)) { + if (CollectionUtils.isEmpty(callbackRetryTasks)) { return Boolean.TRUE; } + Set uniqueIdSet = callbackRetryTasks.stream().map(callbackTask -> { + String callbackTaskUniqueId = callbackTask.getUniqueId(); + return callbackTaskUniqueId.substring(callbackTaskUniqueId.lastIndexOf(StrUtil.UNDERLINE) + 1); + }).collect(Collectors.toSet()); + + RequestDataHelper.setPartition(groupName); + List retryTasks = retryTaskMapper.selectList(new LambdaQueryWrapper() + .eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType()) + .in(RetryTask::getUniqueId, uniqueIdSet) + ); + // 迁移重试失败的数据 - moveDeadLetters(groupId, retryTasks); + List waitMoveDeadLetters = new ArrayList<>(); + List maxCountRetryTaskList = retryTasks.stream() + .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect( + Collectors.toList()); + if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) { + waitMoveDeadLetters.addAll(maxCountRetryTaskList); + } + + List maxCountCallbackRetryTaskList = callbackRetryTasks.stream() + .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect( + Collectors.toList()); + + if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) { + waitMoveDeadLetters.addAll(maxCountCallbackRetryTaskList); + } + + moveDeadLetters(groupName, waitMoveDeadLetters); + + // 删除重试完成的数据 + Set waitDelDeadLetters = new HashSet<>(); + Set finishRetryIdList = retryTasks.stream() + .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus())) + .map(RetryTask::getId) + .collect(Collectors.toSet()); + if (!CollectionUtils.isEmpty(finishRetryIdList)) { + waitDelDeadLetters.addAll(finishRetryIdList); + } + + Set finishCallbackRetryIdList = callbackRetryTasks.stream() + .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus())) + .map(RetryTask::getId) + .collect(Collectors.toSet()); + + // 迁移重试失败的数据 + if (!CollectionUtils.isEmpty(finishCallbackRetryIdList)) { + waitDelDeadLetters.addAll(finishCallbackRetryIdList); + } + + if (CollectionUtils.isEmpty(waitDelDeadLetters)) { + return Boolean.TRUE; + } + + RequestDataHelper.setPartition(groupName); + Assert.isTrue(waitDelDeadLetters.size() == retryTaskMapper.deleteBatchIds(waitDelDeadLetters), + () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks))); return Boolean.TRUE; } @@ -178,38 +246,24 @@ public class RetryServiceImpl implements RetryService { /** * 迁移死信队列数据 * - * @param groupName 组id + * @param groupName 组id * @param retryTasks 待迁移数据 */ private void moveDeadLetters(String groupName, List retryTasks) { - - List retryDeadLetters = new ArrayList<>(); - - for (RetryTask retryTask : retryTasks) { - RetryDeadLetter retryDeadLetter = new RetryDeadLetter(); - BeanUtils.copyProperties(retryTask, retryDeadLetter); - retryDeadLetter.setId(null); - retryDeadLetter.setCreateDt(LocalDateTime.now()); - retryDeadLetters.add(retryDeadLetter); + if (CollectionUtils.isEmpty(retryTasks)) { + return; } + List retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retryTasks); + GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName); - Assert.isTrue(retryDeadLetters.size() == retryDeadLetterMapper.insertBatch(retryDeadLetters, groupConfig.getGroupPartition()), - () -> new EasyRetryServerException("插入死信队列失败 [{}]" , JsonUtil.toJsonString(retryDeadLetters))); + Assert.isTrue(retryDeadLetters.size() == retryDeadLetterMapper.insertBatch(retryDeadLetters, + groupConfig.getGroupPartition()), + () -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters))); List ids = retryTasks.stream().map(RetryTask::getId).collect(Collectors.toList()); - Assert.isTrue(retryTasks.size() == retryTaskMapper.deleteBatch(ids, groupConfig.getGroupPartition()), - () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks))); - } - - /** - * 请求已完成的重试数据 - * - * @param groupId 组id - */ - private void clearFinishRetryData(String groupId) { - // 将已经重试完成的数据删除 - retryTaskAccess.deleteByDelayLevel(groupId, RetryStatusEnum.FINISH.getStatus()); + Assert.isTrue(retryTasks.size() == retryTaskMapper.deleteBatch(ids, groupConfig.getGroupPartition()), + () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks))); } /** diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java index 2f58f4972..61f19767d 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -2,20 +2,28 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.exec; import akka.actor.AbstractActor; import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.client.model.RetryCallbackDTO; import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.client.RequestBuilder; import com.aizuda.easy.retry.server.client.RpcClient; +import com.aizuda.easy.retry.server.config.RequestDataHelper; +import com.aizuda.easy.retry.server.config.SystemProperties; import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.support.IdempotentStrategy; import com.aizuda.easy.retry.server.support.context.CallbackRetryContext; import com.aizuda.easy.retry.server.support.dispatch.actor.log.RetryTaskLogDTO; +import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler; import com.aizuda.easy.retry.server.support.retry.RetryExecutor; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -25,6 +33,7 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; @@ -45,6 +54,10 @@ public class ExecCallbackUnitActor extends AbstractActor { @Autowired @Qualifier("bitSetIdempotentStrategyHandler") private IdempotentStrategy idempotentStrategy; + @Autowired + private RetryTaskMapper retryTaskMapper; + @Autowired + private CallbackRetryTaskHandler callbackRetryTaskHandler; @Override public Receive createReceive() { @@ -100,10 +113,17 @@ public class ExecCallbackUnitActor extends AbstractActor { /** * 调用客户端 * - * @param retryTask {@link RetryTask} 需要重试的数据 + * @param callbackTask {@link RetryTask} 回调任务 * @return 重试结果返回值 */ - private Result callClient(RetryTask retryTask, RegisterNodeInfo serverNode) { + private Result callClient(RetryTask callbackTask, RegisterNodeInfo serverNode) { + + String retryTaskUniqueId = callbackRetryTaskHandler.getRetryTaskUniqueId(callbackTask.getUniqueId()); + RequestDataHelper.setPartition(callbackTask.getGroupName()); + RetryTask retryTask = retryTaskMapper.selectOne( + new LambdaQueryWrapper().eq(RetryTask::getUniqueId, retryTaskUniqueId)); + Assert.notNull(retryTask, () -> new EasyRetryServerException("未查询回调任务对应的重试任务. callbackUniqueId:[{}] uniqueId:[{}]", + callbackTask.getUniqueId(), retryTaskUniqueId)); // 回调参数 RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO(); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java index 0cf2626f8..e297d3860 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java @@ -2,6 +2,10 @@ package com.aizuda.easy.retry.server.support.generator.id; import java.util.concurrent.atomic.AtomicLong; +/** + * 特别声明: 此算法来自美团的leaf号段模式 + * see: https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java + */ public class Segment { private AtomicLong value = new AtomicLong(0); private volatile long max; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java index adf5cabc1..7008e4654 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java @@ -9,7 +9,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * 双buffer + * 特别声明: 此算法来自美团的leaf号段模式 + * see: https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java + * */ @Data public class SegmentBuffer { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java index 40fef9c27..6e2a130c5 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java @@ -32,9 +32,10 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** - * 此算法来自美团的leaf号段模式 + * 特别声明: 此算法来自美团的leaf号段模式 + * see: https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java * - * @author www.byteblogs.com + * @author www.byteblogs.com * @date 2023-05-04 * @since 1.2.0 */ diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java index 37de552eb..06063491e 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java @@ -1,7 +1,7 @@ package com.aizuda.easy.retry.server.support.handler; import cn.hutool.core.lang.Assert; -import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.server.config.SystemProperties; import com.aizuda.easy.retry.server.enums.TaskTypeEnum; @@ -13,6 +13,8 @@ import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter; import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; +import org.slf4j.helpers.FormattingTuple; +import org.slf4j.helpers.MessageFormatter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; @@ -23,6 +25,8 @@ import java.util.concurrent.TimeUnit; /** + * 回调数据处理器 + * * @author www.byteblogs.com * @date 2023-06-04 * @since 1.5.0 @@ -30,6 +34,8 @@ import java.util.concurrent.TimeUnit; @Component public class CallbackRetryTaskHandler { + private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}"; + @Autowired @Qualifier("retryTaskAccessProcessor") private RetryTaskAccess retryTaskAccess; @@ -38,6 +44,11 @@ public class CallbackRetryTaskHandler { @Autowired private SystemProperties systemProperties; + /** + * 创建回调数据 + * + * @param retryTask {@link RetryTask} 重试任务数据 + */ @Transactional public void create(RetryTask retryTask) { if (!TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) { @@ -45,10 +56,9 @@ public class CallbackRetryTaskHandler { } RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask); - callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType()); callbackRetryTask.setId(null); - callbackRetryTask.setUniqueId(systemProperties.getCallback().getPrefix() + retryTask.getUniqueId()); + callbackRetryTask.setUniqueId(generatorCallbackUniqueId(retryTask.getUniqueId())); callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); callbackRetryTask.setRetryCount(0); callbackRetryTask.setCreateDt(LocalDateTime.now()); @@ -68,4 +78,28 @@ public class CallbackRetryTaskHandler { } + /** + * 生成回调数据 + * + * @param uniqueId 重试任务uniqueId + * @return 回调任务uniqueId + */ + public String generatorCallbackUniqueId(String uniqueId) { + // eg: CB_202307180949471 + FormattingTuple callbackUniqueId = MessageFormatter.arrayFormat(CALLBACK_UNIQUE_ID_RULE, + new Object[]{systemProperties.getCallback().getPrefix(), uniqueId}); + + return callbackUniqueId.getMessage(); + } + + /** + * 获取重试任务uniqueId + * + * @param callbackTaskUniqueId 回调任务uniqueId + * @return 重试任务uniqueId + */ + public String getRetryTaskUniqueId(String callbackTaskUniqueId) { + return callbackTaskUniqueId.substring(callbackTaskUniqueId.lastIndexOf(StrUtil.UNDERLINE) + 1); + } + }