From 78abf3fa1ee35e0852e21beaffe3f905ed23abd2 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Mon, 5 Jun 2023 18:45:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=201.5.0=201.=20=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=9B=9E=E8=B0=83=E9=80=9A=E7=9F=A5=E6=8C=81=E4=B9=85=E5=8C=96?= =?UTF-8?q?=E6=94=B9=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry.sql | 6 +- .../callback/SimpleRetryCompleteCallback.java | 1 + .../retry/common/core/enums/TaskTypeEnum.java | 18 -- .../retry/server/akka/ActorGenerator.java | 13 +- .../easy/retry/server/enums/TaskTypeEnum.java | 21 +++ .../access/retry/MybatisRetryTaskAccess.java | 2 +- .../retry/server/support/RetryContext.java | 6 +- .../support/cache/CacheGroupScanActor.java | 15 +- .../support/dispatch/DispatchService.java | 43 ++--- .../server/support/dispatch/ScanTaskDTO.java | 13 ++ .../actor/exec/ExecCallbackUnitActor.java | 5 +- .../dispatch/actor/result/FailureActor.java | 9 +- .../dispatch/actor/result/FinishActor.java | 14 +- .../dispatch/actor/result/NoRetryActor.java | 7 +- .../actor/scan/AbstractScanGroup.java | 132 +++++++++++++ .../actor/scan/ScanCallbackGroupActor.java | 160 +++++----------- .../dispatch/actor/scan/ScanGroupActor.java | 175 ++++++------------ .../handler/CallbackRetryTaskHandler.java | 15 +- .../BitSetIdempotentStrategyHandler.java | 4 +- .../support/strategy/WaitStrategies.java | 3 +- 20 files changed, 346 insertions(+), 316 deletions(-) delete mode 100644 easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/TaskTypeEnum.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/ScanTaskDTO.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java diff --git a/doc/sql/easy_retry.sql b/doc/sql/easy_retry.sql index c99a3982..209ac5f8 100644 --- a/doc/sql/easy_retry.sql +++ b/doc/sql/easy_retry.sql @@ -43,7 +43,7 @@ CREATE TABLE `retry_dead_letter_0` `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `args_str` text NOT NULL COMMENT '执行方法参数', `ext_attrs` text NOT NULL COMMENT '扩展字段', - `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', + `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`), KEY `idx_group_name_scene_name` (`group_name`, `scene_name`), @@ -67,7 +67,7 @@ CREATE TABLE `retry_task_0` `next_trigger_at` datetime NOT NULL COMMENT '下次触发时间', `retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数', `retry_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '重试状态 0、重试中 1、成功 2、最大重试次数', - `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', + `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), @@ -91,7 +91,7 @@ CREATE TABLE `retry_task_log` `args_str` text NOT NULL COMMENT '执行方法参数', `ext_attrs` text NOT NULL COMMENT '扩展字段', `retry_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '重试状态 0、失败 1、成功', - `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', + `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `error_message` text NOT NULL COMMENT '异常信息', diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/callback/SimpleRetryCompleteCallback.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/callback/SimpleRetryCompleteCallback.java index f88dbd7d..6f6ec393 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/callback/SimpleRetryCompleteCallback.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/callback/SimpleRetryCompleteCallback.java @@ -13,6 +13,7 @@ public class SimpleRetryCompleteCallback implements RetryCompleteCallback { @Override public void doSuccessCallback(String sceneName, String executorName, Object[] params) { + } @Override diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java deleted file mode 100644 index 24bfa958..00000000 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.aizuda.easy.retry.common.core.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * @author www.byteblogs.com - * @date 2023-06-04 - * @since 2.0 - */ -@AllArgsConstructor -@Getter -public enum TaskTypeEnum { - RETRY(1), - CALLBACK(2); - - private final Integer type; -} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java index 2bde2335..b85da487 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java @@ -7,6 +7,8 @@ import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecUnitActor; import com.aizuda.easy.retry.server.support.dispatch.actor.result.FailureActor; import com.aizuda.easy.retry.server.support.dispatch.actor.result.FinishActor; import com.aizuda.easy.retry.server.support.dispatch.actor.result.NoRetryActor; +import com.aizuda.easy.retry.server.support.dispatch.actor.scan.AbstractScanGroup; +import com.aizuda.easy.retry.server.support.dispatch.actor.scan.ScanCallbackGroupActor; import com.aizuda.easy.retry.server.support.dispatch.actor.scan.ScanGroupActor; import com.aizuda.easy.retry.common.core.context.SpringContext; @@ -52,7 +54,7 @@ public class ActorGenerator { * * @return actor 引用 */ - public static ActorRef callbackRetryResultActor() { + public static ActorRef execCallbackUnitActor() { return getDispatchResultActorSystem().actorOf(getSpringExtension().props(ExecCallbackUnitActor.BEAN_NAME)); } @@ -74,6 +76,15 @@ public class ActorGenerator { return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(ScanGroupActor.BEAN_NAME)); } + /** + * 生成扫描重试数据的actor + * + * @return actor 引用 + */ + public static ActorRef scanCallbackGroupActor() { + return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(ScanCallbackGroupActor.BEAN_NAME)); + } + public static SpringExtension getSpringExtension() { return SpringContext.getBeanByType(SpringExtension.class); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/TaskTypeEnum.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/TaskTypeEnum.java new file mode 100644 index 00000000..da613a4b --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/TaskTypeEnum.java @@ -0,0 +1,21 @@ +package com.aizuda.easy.retry.server.enums; + +import akka.actor.ActorRef; +import com.aizuda.easy.retry.server.akka.ActorGenerator; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author www.byteblogs.com + * @date 2023-06-04 + * @since 2.0 + */ +@AllArgsConstructor +@Getter +public enum TaskTypeEnum { + RETRY(1, ActorGenerator.scanGroupActor()), + CALLBACK(2, ActorGenerator.scanCallbackGroupActor()); + + private final Integer type; + private final ActorRef actorRef; +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java index 28503ed0..edbf17f4 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java @@ -37,7 +37,7 @@ public class MybatisRetryTaskAccess extends AbstractRetryTaskAccess { .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) .eq(RetryTask::getGroupName, groupName) .eq(RetryTask::getTaskType, taskType) - .ge(RetryTask::getCreateDt, lastAt) + .gt(RetryTask::getCreateDt, lastAt) .orderByAsc(RetryTask::getCreateDt)).getRecords(); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java index 6ae1252d..cb8fecab 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java @@ -46,9 +46,13 @@ public interface RetryContext { */ void setWaitStrategy(WaitStrategy waitStrategy); + WaitStrategy getWaitStrategy(); + ServerNode getServerNode(); - Set getSceneBlacklist(); + Set getSceneBlacklist(); V getCallResult(); + + } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroupScanActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroupScanActor.java index 81633260..3c3dca38 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroupScanActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheGroupScanActor.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.support.cache; import akka.actor.ActorRef; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -28,8 +29,17 @@ public class CacheGroupScanActor implements Lifecycle { * * @return 缓存对象 */ - public static Cache getAll() { - return CACHE; + public static ActorRef get(String groupName, TaskTypeEnum typeEnum) { + return CACHE.getIfPresent(groupName.concat(typeEnum.name())); + } + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static void put(String groupName, TaskTypeEnum typeEnum, ActorRef actorRef) { + CACHE.put(groupName.concat(typeEnum.name()), actorRef); } @Override @@ -44,5 +54,6 @@ public class CacheGroupScanActor implements Lifecycle { @Override public void close() { LogUtils.info(log, "CacheGroupScanActor stop"); + CACHE.invalidateAll(); } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java index 7a6cfdea..e1607fb1 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.support.dispatch; import akka.actor.ActorRef; import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.config.SystemProperties; @@ -23,11 +24,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.time.LocalDateTime; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -50,14 +48,6 @@ public class DispatchService implements Lifecycle { private final ScheduledExecutorService dispatchService = Executors .newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService")); - /** - * 缓存待拉取数据的起点时间 - *

- * MAX_ID_MAP[key] = group 的 idHash MAX_ID_MAP[value] = retry_task的 create_at时间 - */ - public static final Map LAST_AT_MAP = new HashMap<>(); - public static final Map LAST_AT_CALL_BACK_MAP = new HashMap<>(); - /** * 调度时长 */ @@ -87,7 +77,9 @@ public class DispatchService implements Lifecycle { List currentHostGroupList = getCurrentHostGroupList(); if (!CollectionUtils.isEmpty(currentHostGroupList)) { for (GroupConfig groupConfigContext : currentHostGroupList) { - produceScanActorTask(groupConfigContext); + ScanTaskDTO scanTaskDTO = new ScanTaskDTO(); + scanTaskDTO.setGroupName(groupConfigContext.getGroupName()); + produceScanActorTask(scanTaskDTO); } } @@ -102,19 +94,23 @@ public class DispatchService implements Lifecycle { /** * 扫描任务生成器 * - * @param groupConfig {@link GroupConfig} 组上下文 + * @param scanTaskDTO {@link GroupConfig} 组上下文 */ - private void produceScanActorTask(GroupConfig groupConfig) { + private void produceScanActorTask(ScanTaskDTO scanTaskDTO) { - String groupName = groupConfig.getGroupName(); - - ActorRef scanActorRef = cacheActorRef(groupName); + String groupName = scanTaskDTO.getGroupName(); // 缓存按照 cacheRateLimiter(groupName); - // rebalance 和 group scan 流程合一 - scanActorRef.tell(groupConfig, scanActorRef); + // 扫描重试数据 + ActorRef scanRetryActorRef = cacheActorRef(groupName, TaskTypeEnum.RETRY); + scanRetryActorRef.tell(scanTaskDTO, scanRetryActorRef); + + // 扫描回调数据 + ActorRef scanCallbackActorRef = cacheActorRef(groupName, TaskTypeEnum.CALLBACK); + scanCallbackActorRef.tell(scanTaskDTO, scanCallbackActorRef); + } /** @@ -136,13 +132,12 @@ public class DispatchService implements Lifecycle { /** * 缓存Actor对象 */ - private ActorRef cacheActorRef(String groupName) { - Cache actorRefCache = CacheGroupScanActor.getAll(); - ActorRef scanActorRef = actorRefCache.getIfPresent(groupName); + private ActorRef cacheActorRef(String groupName, TaskTypeEnum typeEnum) { + ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum); if (Objects.isNull(scanActorRef)) { - scanActorRef = ActorGenerator.scanGroupActor(); + scanActorRef = typeEnum.getActorRef(); // 缓存扫描器actor - actorRefCache.put(groupName, scanActorRef); + CacheGroupScanActor.put(groupName, typeEnum, scanActorRef); } return scanActorRef; } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/ScanTaskDTO.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/ScanTaskDTO.java new file mode 100644 index 00000000..e351d4dc --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/ScanTaskDTO.java @@ -0,0 +1,13 @@ +package com.aizuda.easy.retry.server.support.dispatch; + +import lombok.Data; + +/** + * @author: ww.byteblogs.com + * @date : 2023-06-05 16:30 + */ +@Data +public class ScanTaskDTO { + + String groupName; +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java index 579c0229..1c5c49b8 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -6,6 +6,7 @@ import com.aizuda.easy.retry.client.model.DispatchRetryDTO; import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; import com.aizuda.easy.retry.client.model.RetryCallbackDTO; import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders; import com.aizuda.easy.retry.common.core.model.Result; @@ -78,7 +79,7 @@ public class ExecCallbackUnitActor extends AbstractActor { retryTaskLog.setErrorMessage(context.getException().getMessage()); } } else { - retryTaskLog.setErrorMessage("暂无可用的客户端POD"); + retryTaskLog.setErrorMessage("There are currently no available client PODs."); } }catch (Exception e) { @@ -131,7 +132,7 @@ public class ExecCallbackUnitActor extends AbstractActor { Result result = restTemplate.postForObject(format, requestEntity, Result.class); LogUtils.info(log, "回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result)); - if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) { + if (StatusEnum.YES.getStatus() != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) { retryTaskLog.setErrorMessage(result.getMessage()); } else { DispatchRetryResultDTO data = JsonUtil.parseObject(JsonUtil.toJsonString(result.getData()), DispatchRetryResultDTO.class); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java index f0170d6f..c0b56ef1 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java @@ -1,16 +1,14 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; -import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.constant.SystemConstants; -import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; @@ -30,7 +28,6 @@ import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; import java.util.List; -import java.util.Objects; /** * 重试完成执行器 @@ -81,12 +78,12 @@ public class FailureActor extends AbstractActor { maxRetryCount = SystemConstants.CALL_BACK.MAX_RETRY_COUNT; } else { maxRetryCount = sceneConfig.getMaxRetryCount(); - // 创建一个回调任务 - callbackRetryTaskHandler.create(retryTask); } if (maxRetryCount <= retryTask.getRetryCount()) { retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus()); + // 创建一个回调任务 + callbackRetryTaskHandler.create(retryTask); } retryTaskAccess.updateRetryTask(retryTask); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java index df9917c0..f57bbd56 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java @@ -1,40 +1,30 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; -import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; -import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; -import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler; -import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; -import java.time.LocalDateTime; import java.util.List; -import java.util.concurrent.TimeUnit; /** * 重试完成执行器 @@ -75,7 +65,7 @@ public class FinishActor extends AbstractActor { protected void doInTransactionWithoutResult(TransactionStatus status) { retryTaskAccess.updateRetryTask(retryTask); - if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { + if (TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) { // 创建一个回调任务 callbackRetryTaskHandler.create(retryTask); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java index 004fd901..a0dda9bb 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/NoRetryActor.java @@ -4,8 +4,8 @@ import akka.actor.AbstractActor; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; +import com.aizuda.easy.retry.server.support.RetryContext; import com.aizuda.easy.retry.server.support.WaitStrategy; -import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.support.retry.RetryExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -19,8 +19,9 @@ import org.springframework.stereotype.Component; * * @author: www.byteblogs.com * @date : 2022-04-14 16:11 + * @since 1.0.0 */ -@Component("NoRetryActor") +@Component(NoRetryActor.BEAN_NAME) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j public class NoRetryActor extends AbstractActor { @@ -35,7 +36,7 @@ public class NoRetryActor extends AbstractActor { public Receive createReceive() { return receiveBuilder().match(RetryExecutor.class, retryExecutor -> { - MaxAttemptsPersistenceRetryContext retryContext = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext(); + RetryContext retryContext = retryExecutor.getRetryContext(); RetryTask retryTask = retryContext.getRetryTask(); WaitStrategy waitStrategy = retryContext.getWaitStrategy(); retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(retryContext)); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java new file mode 100644 index 00000000..bde95ba1 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java @@ -0,0 +1,132 @@ +package com.aizuda.easy.retry.server.support.dispatch.actor.scan; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.config.SystemProperties; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; +import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; +import com.aizuda.easy.retry.server.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.support.RetryContext; +import com.aizuda.easy.retry.server.support.dispatch.DispatchService; +import com.aizuda.easy.retry.server.support.dispatch.ScanTaskDTO; +import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.support.retry.RetryExecutor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.util.CollectionUtils; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Optional; + +/** + * 数据扫描模板类 + * + * @author: www.byteblogs.com + * @date : 2023-06-05 15:44 + * @since 1.5.0 + */ +@Slf4j +public abstract class AbstractScanGroup extends AbstractActor { + @Autowired + @Qualifier("retryTaskAccessProcessor") + protected RetryTaskAccess retryTaskAccessProcessor; + @Autowired + @Qualifier("bitSetIdempotentStrategyHandler") + protected IdempotentStrategy idempotentStrategy; + @Autowired + protected SystemProperties systemProperties; + @Autowired + @Qualifier("configAccessProcessor") + protected ConfigAccess configAccess; + @Autowired + protected ClientNodeAllocateHandler clientNodeAllocateHandler; + + @Override + public Receive createReceive() { + return receiveBuilder().match(ScanTaskDTO.class, config -> { + + try { + doScan(config); + } catch (Exception e) { + LogUtils.error(log, "Data scanner processing exception. [{}]", config, e); + } + + }).build(); + + } + + protected void doScan(final ScanTaskDTO scanTaskDTO) { + + LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); + + String groupName = scanTaskDTO.getGroupName(); + LocalDateTime lastAt = Optional.ofNullable(getLastAt(groupName)).orElse(defLastAt); + + // 扫描当前Group 待重试的数据 + List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), + getTaskType()); + + if (!CollectionUtils.isEmpty(list)) { + + // 更新拉取的最大的创建时间 + putLastAt(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getCreateDt()); + + for (RetryTask retryTask : list) { + + // 重试次数累加 + retryCountIncrement(retryTask); + + RetryContext retryContext = builderRetryContext(groupName, retryTask); + RetryExecutor executor = builderResultRetryExecutor(retryContext); + + if (!executor.filter()) { + continue; + } + + productExecUnitActor(executor); + } + } else { + // 数据为空则休眠5s + try { + Thread.sleep((DispatchService.PERIOD / 2) * 1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + putLastAt(groupName, defLastAt); + } + + } + + protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask); + + protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext); + + protected abstract Integer getTaskType(); + + protected abstract LocalDateTime getLastAt(String groupName); + + protected abstract LocalDateTime putLastAt(String groupName, LocalDateTime LocalDateTime); + + private void retryCountIncrement(RetryTask retryTask) { + Integer retryCount = retryTask.getRetryCount(); + retryTask.setRetryCount(++retryCount); + } + + private void productExecUnitActor(RetryExecutor retryExecutor) { + String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); + Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); + idempotentStrategy.set(groupIdHash, retryId.intValue()); + + // 重试成功回调客户端 + ActorRef actorRef = getActorRef(); + actorRef.tell(retryExecutor, actorRef); + } + + protected abstract ActorRef getActorRef(); + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java index b233994a..59b0d4cf 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java @@ -1,38 +1,26 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.scan; -import akka.actor.AbstractActor; import akka.actor.ActorRef; -import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; -import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; -import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.akka.ActorGenerator; -import com.aizuda.easy.retry.server.config.SystemProperties; -import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; -import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; -import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; -import com.aizuda.easy.retry.server.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.support.RetryContext; import com.aizuda.easy.retry.server.support.WaitStrategy; import com.aizuda.easy.retry.server.support.context.CallbackRetryContext; -import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; -import com.aizuda.easy.retry.server.support.dispatch.DispatchService; -import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.support.strategy.FilterStrategies; import com.aizuda.easy.retry.server.support.strategy.StopStrategies; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; -import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * @author www.byteblogs.com @@ -42,100 +30,55 @@ import java.util.List; @Component(ScanCallbackGroupActor.BEAN_NAME) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j -public class ScanCallbackGroupActor extends AbstractActor { - - @Autowired - @Qualifier("retryTaskAccessProcessor") - private RetryTaskAccess retryTaskAccessProcessor; - - @Autowired - @Qualifier("bitSetIdempotentStrategyHandler") - private IdempotentStrategy idempotentStrategy; - - @Autowired - private SystemProperties systemProperties; - - @Autowired - @Qualifier("configAccessProcessor") - private ConfigAccess configAccess; - - @Autowired - private ClientNodeAllocateHandler clientNodeAllocateHandler; +public class ScanCallbackGroupActor extends AbstractScanGroup { public static final String BEAN_NAME = "ScanCallbackGroupActor"; + /** + * 缓存待拉取数据的起点时间 + *

+ * LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间 + */ + public static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); + @Override - public Receive createReceive() { - return receiveBuilder().match(GroupConfig.class, config -> { - - try { - doScan(config); - } catch (Exception e) { - LogUtils.error(log, "数据扫描器处理异常 [{}]", config, e); - } - - }).build(); + protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) { + CallbackRetryContext retryContext = new CallbackRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + return retryContext; } - /** - * 扫描数据 - * - * @param groupConfig - */ - private void doScan(GroupConfig groupConfig) { + @Override + protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) { + return RetryBuilder.newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatus()) + .withWaitStrategy(getWaitWaitStrategy()) + .withFilterStrategy(FilterStrategies.delayLevelFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + } - LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); + @Override + protected Integer getTaskType() { + return TaskTypeEnum.CALLBACK.getType(); + } - String groupName = groupConfig.getGroupName(); - LocalDateTime lastAt = DispatchService.LAST_AT_CALL_BACK_MAP.getOrDefault(groupName, defLastAt); + @Override + protected LocalDateTime getLastAt(final String groupName) { + return LAST_AT_MAP.get(groupName); + } - // 扫描当前Group 待重试的数据 - List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), - TaskTypeEnum.CALLBACK.getType()); - - if (!CollectionUtils.isEmpty(list)) { - - DispatchService.LAST_AT_MAP.put(groupConfig.getGroupName(), list.get(list.size() - 1).getCreateDt()); - - - for (RetryTask retryTask : list) { - - retryCountIncrement(retryTask); - - CallbackRetryContext retryContext = new CallbackRetryContext<>(); - retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); - - RetryExecutor executor = RetryBuilder.newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatus()) - .withWaitStrategy(getWaitWaitStrategy()) - .withFilterStrategy(FilterStrategies.delayLevelFilter()) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.sceneBlackFilter()) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); - - if (!executor.filter()) { - continue; - } - - productExecUnitActor(executor); - } - } else { - // 数据为空则休眠5s - try { - Thread.sleep((DispatchService.PERIOD / 2) * 1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - DispatchService.LAST_AT_MAP.put(groupName, defLastAt); - } + @Override + protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) { + return LAST_AT_MAP.put(groupName, LocalDateTime); } private WaitStrategy getWaitWaitStrategy() { @@ -143,20 +86,9 @@ public class ScanCallbackGroupActor extends AbstractActor { return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); } - private void retryCountIncrement(RetryTask retryTask) { - Integer retryCount = retryTask.getRetryCount(); - retryTask.setRetryCount(++retryCount); - } - - - private void productExecUnitActor(RetryExecutor retryExecutor) { - String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); - Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); - idempotentStrategy.set(groupIdHash, retryId.intValue()); - - // 重试成功回调客户端 - ActorRef actorRef = ActorGenerator.callbackRetryResultActor(); - actorRef.tell(retryExecutor, actorRef); + @Override + protected ActorRef getActorRef() { + return ActorGenerator.execCallbackUnitActor(); } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java index 1df507be..9eb079af 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java @@ -1,140 +1,87 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.scan; -import akka.actor.AbstractActor; import akka.actor.ActorRef; -import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.akka.ActorGenerator; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; +import com.aizuda.easy.retry.server.support.RetryContext; +import com.aizuda.easy.retry.server.support.WaitStrategy; +import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; +import com.aizuda.easy.retry.server.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.support.strategy.FilterStrategies; import com.aizuda.easy.retry.server.support.strategy.StopStrategies; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; -import com.aizuda.easy.retry.common.core.model.Result; -import com.aizuda.easy.retry.server.akka.ActorGenerator; -import com.aizuda.easy.retry.server.config.SystemProperties; -import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; -import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; -import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; -import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; -import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; -import com.aizuda.easy.retry.server.support.IdempotentStrategy; -import com.aizuda.easy.retry.server.support.WaitStrategy; -import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; -import com.aizuda.easy.retry.server.support.dispatch.DispatchService; -import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler; -import com.aizuda.easy.retry.server.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; -import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * @author www.byteblogs.com * @date 2021-10-30 * @since 2.0 */ -@Component("ScanGroupActor") +@Component(ScanGroupActor.BEAN_NAME) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j -public class ScanGroupActor extends AbstractActor { - - @Autowired - @Qualifier("retryTaskAccessProcessor") - private RetryTaskAccess retryTaskAccessProcessor; - - @Autowired - @Qualifier("bitSetIdempotentStrategyHandler") - private IdempotentStrategy idempotentStrategy; - - @Autowired - private SystemProperties systemProperties; - - @Autowired - @Qualifier("configAccessProcessor") - private ConfigAccess configAccess; - - @Autowired - private ClientNodeAllocateHandler clientNodeAllocateHandler; +public class ScanGroupActor extends AbstractScanGroup { public static final String BEAN_NAME = "ScanGroupActor"; + /** + * 缓存待拉取数据的起点时间 + *

+ * LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间 + */ + public static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); @Override - public Receive createReceive() { - return receiveBuilder().match(GroupConfig.class, config -> { - - try { - doScan(config); - } catch (Exception e) { - LogUtils.error(log, "数据扫描器处理异常 [{}]", config, e); - } - - }).build(); - + protected RetryContext> builderRetryContext(final String groupName, + final RetryTask retryTask) { + MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + return retryContext; } - /** - * 扫描数据 - * - * @param groupConfig - */ - private void doScan(GroupConfig groupConfig) { + @Override + protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext) { - LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); + RetryTask retryTask = retryContext.getRetryTask(); + return RetryBuilder.>newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatusCode()) + .withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) + .withFilterStrategy(FilterStrategies.delayLevelFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + } - String groupName = groupConfig.getGroupName(); - LocalDateTime lastAt = DispatchService.LAST_AT_MAP.getOrDefault(groupName, defLastAt); + @Override + protected Integer getTaskType() { + return TaskTypeEnum.RETRY.getType(); + } - // 扫描当前Group 待重试的数据 - List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), TaskTypeEnum.RETRY.getType()); + @Override + protected LocalDateTime getLastAt(final String groupName) { + return LAST_AT_MAP.get(groupName); + } - if (!CollectionUtils.isEmpty(list)) { - - DispatchService.LAST_AT_MAP.put(groupConfig.getGroupName(), list.get(list.size() - 1).getCreateDt()); - - - for (RetryTask retryTask : list) { - - retryCountIncrement(retryTask); - - MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); - retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); - - RetryExecutor> executor = RetryBuilder.>newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatusCode()) - .withWaitStrategy(getWaitWaitStrategy(groupName, retryTask.getSceneName())) - .withFilterStrategy(FilterStrategies.delayLevelFilter()) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.sceneBlackFilter()) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); - - if (!executor.filter()) { - continue; - } - - productExecUnitActor(executor); - } - } else { - // 数据为空则休眠5s - try { - Thread.sleep((DispatchService.PERIOD / 2) * 1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - DispatchService.LAST_AT_MAP.put(groupName, defLastAt); - } + @Override + protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) { + return LAST_AT_MAP.put(groupName, LocalDateTime); } private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) { @@ -145,20 +92,10 @@ public class ScanGroupActor extends AbstractActor { return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); } - private void retryCountIncrement(RetryTask retryTask) { - Integer retryCount = retryTask.getRetryCount(); - retryTask.setRetryCount(++retryCount); + @Override + protected ActorRef getActorRef() { + return ActorGenerator.execUnitActor(); } - private void productExecUnitActor(RetryExecutor> retryExecutor) { - String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); - Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); - idempotentStrategy.set(groupIdHash, retryId.intValue()); - - ActorRef execUnitActor = ActorGenerator.execUnitActor(); - // 将扫描到的数据tell 到执行单元中 - execUnitActor.tell(retryExecutor, execUnitActor); - } - } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java index d46d569a..1c36afbe 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java @@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.support.handler; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; -import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; @@ -35,13 +35,14 @@ public class CallbackRetryTaskHandler { RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask); callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType()); - retryTask.setUniqueId(SystemConstants.CALL_BACK.CB_ + retryTask.getUniqueId()); - retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); - retryTask.setRetryCount(0); - retryTask.setCreateDt(LocalDateTime.now()); - retryTask.setUpdateDt(LocalDateTime.now()); + callbackRetryTask.setId(null); + callbackRetryTask.setUniqueId(SystemConstants.CALL_BACK.CB_ + retryTask.getUniqueId()); + callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); + callbackRetryTask.setRetryCount(0); + callbackRetryTask.setCreateDt(LocalDateTime.now()); + callbackRetryTask.setUpdateDt(LocalDateTime.now()); - retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + callbackRetryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); Assert.isTrue(1 == retryTaskAccess.saveRetryTask(callbackRetryTask), () -> new EasyRetryServerException("failed to report data")); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/idempotent/BitSetIdempotentStrategyHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/idempotent/BitSetIdempotentStrategyHandler.java index 0db2dc06..ea18bcac 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/idempotent/BitSetIdempotentStrategyHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/idempotent/BitSetIdempotentStrategyHandler.java @@ -8,6 +8,8 @@ import java.util.BitSet; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * BitSet幂等校验器 @@ -21,7 +23,7 @@ public class BitSetIdempotentStrategyHandler implements IdempotentStrategy BIT_SET_MAP = new HashMap<>(); + public static final ConcurrentMap BIT_SET_MAP = new ConcurrentHashMap<>(); @Override public boolean set(String groupId, Integer key) { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java index f8fd3bcd..e1e86d24 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java @@ -1,7 +1,7 @@ package com.aizuda.easy.retry.server.support.strategy; import com.aizuda.easy.retry.common.core.constant.SystemConstants; -import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; @@ -12,7 +12,6 @@ import com.aizuda.easy.retry.common.core.util.CronExpression; import com.google.common.base.Preconditions; import com.aizuda.easy.retry.server.support.RetryContext; import com.aizuda.easy.retry.server.support.WaitStrategy; -import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import lombok.Data; import lombok.Getter;