From 2b5803c65acdfc568d8a36e038cc208be94213ba Mon Sep 17 00:00:00 2001 From: "www.byteblogs.com" <598092184@qq.com> Date: Sun, 4 Jun 2023 18:46:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=201.5.0=201.=20=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=8C=81=E4=B9=85=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry.sql | 3 + .../common/core/constant/SystemConstants.java | 15 ++ .../retry/common/core/enums/TaskTypeEnum.java | 18 ++ .../retry/server/akka/ActorGenerator.java | 4 +- .../mybatis/po/RetryDeadLetter.java | 2 + .../persistence/mybatis/po/RetryTask.java | 2 + .../persistence/mybatis/po/RetryTaskLog.java | 2 + .../persistence/support/RetryTaskAccess.java | 2 +- .../access/retry/MybatisRetryTaskAccess.java | 6 +- .../processor/RetryTaskAccessProcessor.java | 4 +- .../service/convert/RetryTaskConverter.java | 2 + .../retry/server/support/RetryContext.java | 9 + .../support/context/CallbackRetryContext.java | 56 ++++++ .../support/dispatch/DispatchService.java | 1 + .../callback/CallbackRetryResultActor.java | 85 --------- .../actor/exec/ExecCallbackUnitActor.java | 151 ++++++++++++++++ .../dispatch/actor/exec/ExecUnitActor.java | 2 +- .../dispatch/actor/result/FailureActor.java | 43 +++-- .../dispatch/actor/result/FinishActor.java | 33 +++- .../actor/scan/ScanCallbackGroupActor.java | 162 ++++++++++++++++++ .../dispatch/actor/scan/ScanGroupActor.java | 5 +- .../handler/CallbackRetryTaskHandler.java | 50 ++++++ .../support/strategy/FilterStrategies.java | 13 +- .../support/strategy/StopStrategies.java | 54 +++++- .../support/strategy/WaitStrategies.java | 29 ++-- .../mapper/RetryDeadLetterMapper.xml | 3 +- .../resources/mapper/RetryTaskLogMapper.xml | 3 +- .../main/resources/mapper/RetryTaskMapper.xml | 3 +- 28 files changed, 617 insertions(+), 145 deletions(-) create mode 100644 easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/context/CallbackRetryContext.java delete mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java diff --git a/doc/sql/easy_retry.sql b/doc/sql/easy_retry.sql index 7dc07928c..c99a39823 100644 --- a/doc/sql/easy_retry.sql +++ b/doc/sql/easy_retry.sql @@ -43,6 +43,7 @@ CREATE TABLE `retry_dead_letter_0` `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `args_str` text NOT NULL COMMENT '执行方法参数', `ext_attrs` text NOT NULL COMMENT '扩展字段', + `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`), KEY `idx_group_name_scene_name` (`group_name`, `scene_name`), @@ -66,6 +67,7 @@ CREATE TABLE `retry_task_0` `next_trigger_at` datetime NOT NULL COMMENT '下次触发时间', `retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数', `retry_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '重试状态 0、重试中 1、成功 2、最大重试次数', + `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), @@ -89,6 +91,7 @@ CREATE TABLE `retry_task_log` `args_str` text NOT NULL COMMENT '执行方法参数', `ext_attrs` text NOT NULL COMMENT '扩展字段', `retry_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '重试状态 0、失败 1、成功', + `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `error_message` text NOT NULL COMMENT '异常信息', diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java index 7905b0498..5b62dd1c5 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java @@ -61,4 +61,19 @@ public interface SystemConstants { } + interface CALL_BACK { + /** + * 回调id前缀 + */ + String CB_ = "CB_"; + /** + * 最大重试次数 + */ + int MAX_RETRY_COUNT = 288; + /** + * 间隔时间 + */ + int TRIGGER_INTERVAL = 15 * 60; + } + } diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java new file mode 100644 index 000000000..24bfa958b --- /dev/null +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/TaskTypeEnum.java @@ -0,0 +1,18 @@ +package com.aizuda.easy.retry.common.core.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author www.byteblogs.com + * @date 2023-06-04 + * @since 2.0 + */ +@AllArgsConstructor +@Getter +public enum TaskTypeEnum { + RETRY(1), + CALLBACK(2); + + private final Integer type; +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java index 2895a01e2..2bde23359 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java @@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.akka; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import com.aizuda.easy.retry.server.support.dispatch.actor.callback.CallbackRetryResultActor; +import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecCallbackUnitActor; import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecUnitActor; import com.aizuda.easy.retry.server.support.dispatch.actor.result.FailureActor; import com.aizuda.easy.retry.server.support.dispatch.actor.result.FinishActor; @@ -53,7 +53,7 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef callbackRetryResultActor() { - return getDispatchResultActorSystem().actorOf(getSpringExtension().props(CallbackRetryResultActor.BEAN_NAME)); + return getDispatchResultActorSystem().actorOf(getSpringExtension().props(ExecCallbackUnitActor.BEAN_NAME)); } /** diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java index 3599b7934..ba9c1f965 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java @@ -29,6 +29,8 @@ public class RetryDeadLetter implements Serializable { private String extAttrs; + private Integer taskType; + private LocalDateTime createDt; private static final long serialVersionUID = 1L; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java index 460d754df..953962e13 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java @@ -37,6 +37,8 @@ public class RetryTask implements Serializable { private Integer retryStatus; + private Integer taskType; + private LocalDateTime createDt; private LocalDateTime updateDt; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java index 3121da53a..07f3f4ed4 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java @@ -31,6 +31,8 @@ public class RetryTaskLog implements Serializable { private Integer retryStatus; + private Integer taskType; + private String errorMessage; private LocalDateTime createDt; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java index a7f5995ed..c01024d70 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/RetryTaskAccess.java @@ -14,7 +14,7 @@ public interface RetryTaskAccess { /** * 批量查询重试任务 */ - List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize); + List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType); List listRetryTaskByRetryCount(String groupName, Integer retryStatus); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java index 970442612..28503ed08 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/retry/MybatisRetryTaskAccess.java @@ -30,12 +30,14 @@ public class MybatisRetryTaskAccess extends AbstractRetryTaskAccess { } @Override - public List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize) { + public List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) { setPartition(groupName); return retryTaskMapper.selectPage(new PageDTO<>(0, pageSize), new LambdaQueryWrapper() .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - .eq(RetryTask::getGroupName, groupName).ge(RetryTask::getCreateDt, lastAt) + .eq(RetryTask::getGroupName, groupName) + .eq(RetryTask::getTaskType, taskType) + .ge(RetryTask::getCreateDt, lastAt) .orderByAsc(RetryTask::getCreateDt)).getRecords(); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java index 782db5998..c84828b3b 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/processor/RetryTaskAccessProcessor.java @@ -30,8 +30,8 @@ public class RetryTaskAccessProcessor implements RetryTaskAccess { * 批量查询重试任务 */ @Override - public List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize) { - return retryTaskAccesses.listAvailableTasks(groupName, lastAt, pageSize); + public List listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) { + return retryTaskAccesses.listAvailableTasks(groupName, lastAt, pageSize, taskType); } @Override diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskConverter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskConverter.java index 5c249f818..51cb72c1e 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskConverter.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskConverter.java @@ -19,6 +19,8 @@ public interface RetryTaskConverter { RetryTask toRetryTask(RetryTaskDTO retryTaskDTO); + RetryTask toRetryTask(RetryTask retryTask); + RetryTask toRetryTask(RetryTaskSaveRequestVO retryTaskSaveRequestVO); List toRetryTaskList(List retryTaskDTOList); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java index d4b3805a3..6ae1252d2 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/RetryContext.java @@ -1,6 +1,9 @@ package com.aizuda.easy.retry.server.support; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; + +import java.util.Set; /** * @author: www.byteblogs.com @@ -42,4 +45,10 @@ public interface RetryContext { * @param waitStrategy {@link WaitStrategy} 等待策略 */ void setWaitStrategy(WaitStrategy waitStrategy); + + ServerNode getServerNode(); + + Set getSceneBlacklist(); + + V getCallResult(); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/context/CallbackRetryContext.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/context/CallbackRetryContext.java new file mode 100644 index 000000000..b67bb3c3a --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/context/CallbackRetryContext.java @@ -0,0 +1,56 @@ +package com.aizuda.easy.retry.server.support.context; + +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.support.RetryContext; +import com.aizuda.easy.retry.server.support.WaitStrategy; +import lombok.Data; +import lombok.Getter; + +import java.util.Objects; +import java.util.Set; + +/** + * @author www.byteblogs.com + * @date 2023-06-04 + * @since 2.0 + */ +@Data +public class CallbackRetryContext implements RetryContext { + + /** + * 通知客户端回调结果 + */ + private V callResult; + + /** + * 异常信息 + */ + private Exception exception; + + /** + * 等待策略 + */ + private WaitStrategy waitStrategy; + + /** + * 当前重试数据 + */ + private RetryTask retryTask; + + /** + * 目前处理关闭的场景 + */ + private Set sceneBlacklist; + + /** + * 需要调度的节点 + */ + private ServerNode serverNode; + + @Override + public boolean hasException() { + return Objects.nonNull(exception); + } + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java index 694dc4949..7a6cfdeaa 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/DispatchService.java @@ -56,6 +56,7 @@ public class DispatchService implements Lifecycle { * MAX_ID_MAP[key] = group 的 idHash MAX_ID_MAP[value] = retry_task的 create_at时间 */ public static final Map LAST_AT_MAP = new HashMap<>(); + public static final Map LAST_AT_CALL_BACK_MAP = new HashMap<>(); /** * 调度时长 diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java deleted file mode 100644 index 490dc149d..000000000 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.aizuda.easy.retry.server.support.dispatch.actor.callback; - -import akka.actor.AbstractActor; -import cn.hutool.core.util.IdUtil; -import com.aizuda.easy.retry.client.model.RetryCallbackDTO; -import com.aizuda.easy.retry.common.core.constant.SystemConstants; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.common.core.model.Result; -import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders; -import com.aizuda.easy.retry.common.core.util.JsonUtil; -import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; -import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; -import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.context.annotation.Scope; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.stereotype.Component; -import org.springframework.web.client.RestTemplate; - -import java.text.MessageFormat; -import java.util.Objects; - -/** - * @author: www.byteblogs.com - * @date : 2023-01-10 08:50 - */ -@Component("CallbackRetryResultActor") -@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -@Slf4j -public class CallbackRetryResultActor extends AbstractActor { - - public static final String BEAN_NAME = "CallbackRetryResultActor"; - public static final String URL = "http://{0}:{1}/{2}/retry/callback/v1"; - - @Autowired - private RestTemplate restTemplate; - @Autowired - private ClientNodeAllocateHandler clientNodeAllocateHandler; - - @Override - public Receive createReceive() { - return receiveBuilder().match(RetryTask.class, retryTask->{ - - try { - ServerNode serverNode = clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()); - if (Objects.isNull(serverNode)) { - LogUtils.warn(log, "暂无可用的客户端节点"); - return; - } - - // 回调参数 - RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO(); - retryCallbackDTO.setIdempotentId(retryTask.getIdempotentId()); - retryCallbackDTO.setRetryStatus(retryTask.getRetryStatus()); - retryCallbackDTO.setArgsStr(retryTask.getArgsStr()); - retryCallbackDTO.setScene(retryTask.getSceneName()); - retryCallbackDTO.setGroup(retryTask.getGroupName()); - retryCallbackDTO.setExecutorName(retryTask.getExecutorName()); - retryCallbackDTO.setUniqueId(retryTask.getUniqueId()); - - // 设置header - HttpHeaders requestHeaders = new HttpHeaders(); - EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders(); - easyRetryHeaders.setEasyRetry(Boolean.TRUE); - easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId()); - requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders)); - - HttpEntity requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders); - - String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath()); - Result result = restTemplate.postForObject(format, requestEntity, Result.class); - LogUtils.info(log, "回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result)); - - } finally { - getContext().stop(getSelf()); - } - - }).build(); - } - - -} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java new file mode 100644 index 000000000..579c02292 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -0,0 +1,151 @@ +package com.aizuda.easy.retry.server.support.dispatch.actor.exec; + +import akka.actor.AbstractActor; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.client.model.DispatchRetryDTO; +import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; +import com.aizuda.easy.retry.client.model.RetryCallbackDTO; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter; +import com.aizuda.easy.retry.server.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.support.context.CallbackRetryContext; +import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; +import com.aizuda.easy.retry.server.support.retry.RetryExecutor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.text.MessageFormat; +import java.time.LocalDateTime; +import java.util.Objects; +import java.util.concurrent.Callable; + +/** + * 重试结果执行器 + * + * @author www.byteblogs.com + * @date 2021-10-30 + * @since 1.5.0 + */ +@Component(ExecCallbackUnitActor.BEAN_NAME) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class ExecCallbackUnitActor extends AbstractActor { + + public static final String BEAN_NAME = "ExecCallbackUnitActor"; + public static final String URL = "http://{0}:{1}/{2}/retry/callback/v1"; + + @Autowired + @Qualifier("bitSetIdempotentStrategyHandler") + private IdempotentStrategy idempotentStrategy; + @Autowired + private RetryTaskLogMapper retryTaskLogMapper; + @Autowired + private RestTemplate restTemplate; + + @Override + public Receive createReceive() { + return receiveBuilder().match(RetryExecutor.class, retryExecutor -> { + + CallbackRetryContext context = (CallbackRetryContext) retryExecutor.getRetryContext(); + RetryTask retryTask = context.getRetryTask(); + ServerNode serverNode = context.getServerNode(); + + RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask); + retryTaskLog.setErrorMessage(StringUtils.EMPTY); + + try { + + if (Objects.nonNull(serverNode)) { + retryExecutor.call((Callable>) () -> callClient(retryTask, retryTaskLog, serverNode)); + if (context.hasException()) { + retryTaskLog.setErrorMessage(context.getException().getMessage()); + } + } else { + retryTaskLog.setErrorMessage("暂无可用的客户端POD"); + } + + }catch (Exception e) { + LogUtils.error(log, "回调客户端失败 retryTask:[{}]", JsonUtil.toJsonString(retryTask), e); + retryTaskLog.setErrorMessage(StringUtils.isBlank(e.getMessage()) ? StringUtils.EMPTY : e.getMessage()); + } finally { + + // 清除幂等标识位 + idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId().intValue()); + getContext().stop(getSelf()); + + // 记录重试日志 + retryTaskLog.setCreateDt(LocalDateTime.now()); + retryTaskLog.setId(null); + Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), + () -> new EasyRetryServerException("新增重试日志失败")); + } + + }).build(); + } + + /** + * 调用客户端 + * + * @param retryTask {@link RetryTask} 需要重试的数据 + * @return 重试结果返回值 + */ + private Result callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, ServerNode serverNode) { + + // 回调参数 + RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO(); + retryCallbackDTO.setIdempotentId(retryTask.getIdempotentId()); + retryCallbackDTO.setRetryStatus(retryTask.getRetryStatus()); + retryCallbackDTO.setArgsStr(retryTask.getArgsStr()); + retryCallbackDTO.setScene(retryTask.getSceneName()); + retryCallbackDTO.setGroup(retryTask.getGroupName()); + retryCallbackDTO.setExecutorName(retryTask.getExecutorName()); + retryCallbackDTO.setUniqueId(retryTask.getUniqueId()); + + // 设置header + HttpHeaders requestHeaders = new HttpHeaders(); + EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders(); + easyRetryHeaders.setEasyRetry(Boolean.TRUE); + easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId()); + requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders)); + + HttpEntity requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders); + + String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath()); + Result result = restTemplate.postForObject(format, requestEntity, Result.class); + LogUtils.info(log, "回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result)); + + if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) { + retryTaskLog.setErrorMessage(result.getMessage()); + } else { + DispatchRetryResultDTO data = JsonUtil.parseObject(JsonUtil.toJsonString(result.getData()), DispatchRetryResultDTO.class); + result.setData(data); + if (Objects.nonNull(data) && StringUtils.isNotBlank(data.getExceptionMsg())) { + retryTaskLog.setErrorMessage(data.getExceptionMsg()); + } + + } + + LogUtils.info(log, "请求客户端 response:[{}}] ", JsonUtil.toJsonString(result)); + return result; + + } + + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java index f82066217..6cf5a697b 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java @@ -125,7 +125,7 @@ public class ExecUnitActor extends AbstractActor { HttpEntity requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders); String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath()); - Result result = restTemplate.postForObject(format, requestEntity, Result.class); + Result result = restTemplate.postForObject(format, requestEntity, Result.class); if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) { retryTaskLog.setErrorMessage(result.getMessage()); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java index 8cb0d84ec..f0170d6f8 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FailureActor.java @@ -3,6 +3,9 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; @@ -21,6 +24,9 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; import java.util.List; @@ -45,13 +51,15 @@ public class FailureActor extends AbstractActor { @Autowired @Qualifier("retryTaskAccessProcessor") private RetryTaskAccess retryTaskAccess; - @Autowired @Qualifier("configAccessProcessor") private ConfigAccess configAccess; - @Autowired private RetryTaskLogMapper retryTaskLogMapper; + @Autowired + private CallbackRetryTaskHandler callbackRetryTaskHandler; + @Autowired + private TransactionTemplate transactionTemplate; @Override public Receive createReceive() { @@ -62,19 +70,28 @@ public class FailureActor extends AbstractActor { SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); - ActorRef actorRef = null; - if (sceneConfig.getMaxRetryCount() <= retryTask.getRetryCount()) { - retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus()); - actorRef = ActorGenerator.callbackRetryResultActor(); - } - try { - retryTaskAccess.updateRetryTask(retryTask); - // 重试成功回调客户端 - if (Objects.nonNull(actorRef)) { - actorRef.tell(retryTask, actorRef); - } + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { + + Integer maxRetryCount; + if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { + maxRetryCount = SystemConstants.CALL_BACK.MAX_RETRY_COUNT; + } else { + maxRetryCount = sceneConfig.getMaxRetryCount(); + // 创建一个回调任务 + callbackRetryTaskHandler.create(retryTask); + } + + if (maxRetryCount <= retryTask.getRetryCount()) { + retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus()); + } + + retryTaskAccess.updateRetryTask(retryTask); + } + }); } catch (Exception e) { LogUtils.error(log,"更新重试任务失败", e); } finally { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java index 6a1e07efc..df9917c09 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/result/FinishActor.java @@ -3,6 +3,10 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result; import akka.actor.AbstractActor; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter; +import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler; +import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; @@ -14,14 +18,23 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.util.List; +import java.util.concurrent.TimeUnit; /** * 重试完成执行器 @@ -42,9 +55,12 @@ public class FinishActor extends AbstractActor { @Autowired @Qualifier("retryTaskAccessProcessor") private RetryTaskAccess retryTaskAccess; - @Autowired private RetryTaskLogMapper retryTaskLogMapper; + @Autowired + private CallbackRetryTaskHandler callbackRetryTaskHandler; + @Autowired + private TransactionTemplate transactionTemplate; @Override public Receive createReceive() { @@ -54,11 +70,18 @@ public class FinishActor extends AbstractActor { retryTask.setRetryStatus(RetryStatusEnum.FINISH.getStatus()); try { - retryTaskAccess.updateRetryTask(retryTask); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { + retryTaskAccess.updateRetryTask(retryTask); + + if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { + // 创建一个回调任务 + callbackRetryTaskHandler.create(retryTask); + } + } + }); - // 重试成功回调客户端 - ActorRef actorRef = ActorGenerator.callbackRetryResultActor(); - actorRef.tell(retryTask, actorRef); }catch (Exception e) { LogUtils.error(log, "更新重试任务失败", e); } finally { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java new file mode 100644 index 000000000..b233994ab --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanCallbackGroupActor.java @@ -0,0 +1,162 @@ +package com.aizuda.easy.retry.server.support.dispatch.actor.scan; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.akka.ActorGenerator; +import com.aizuda.easy.retry.server.config.SystemProperties; +import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; +import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; +import com.aizuda.easy.retry.server.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.support.WaitStrategy; +import com.aizuda.easy.retry.server.support.context.CallbackRetryContext; +import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; +import com.aizuda.easy.retry.server.support.dispatch.DispatchService; +import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.support.retry.RetryBuilder; +import com.aizuda.easy.retry.server.support.retry.RetryExecutor; +import com.aizuda.easy.retry.server.support.strategy.FilterStrategies; +import com.aizuda.easy.retry.server.support.strategy.StopStrategies; +import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * @author www.byteblogs.com + * @date 2021-10-30 + * @since 1.5.0 + */ +@Component(ScanCallbackGroupActor.BEAN_NAME) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class ScanCallbackGroupActor extends AbstractActor { + + @Autowired + @Qualifier("retryTaskAccessProcessor") + private RetryTaskAccess retryTaskAccessProcessor; + + @Autowired + @Qualifier("bitSetIdempotentStrategyHandler") + private IdempotentStrategy idempotentStrategy; + + @Autowired + private SystemProperties systemProperties; + + @Autowired + @Qualifier("configAccessProcessor") + private ConfigAccess configAccess; + + @Autowired + private ClientNodeAllocateHandler clientNodeAllocateHandler; + + public static final String BEAN_NAME = "ScanCallbackGroupActor"; + + @Override + public Receive createReceive() { + return receiveBuilder().match(GroupConfig.class, config -> { + + try { + doScan(config); + } catch (Exception e) { + LogUtils.error(log, "数据扫描器处理异常 [{}]", config, e); + } + + }).build(); + + } + + /** + * 扫描数据 + * + * @param groupConfig + */ + private void doScan(GroupConfig groupConfig) { + + LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); + + String groupName = groupConfig.getGroupName(); + LocalDateTime lastAt = DispatchService.LAST_AT_CALL_BACK_MAP.getOrDefault(groupName, defLastAt); + + // 扫描当前Group 待重试的数据 + List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), + TaskTypeEnum.CALLBACK.getType()); + + if (!CollectionUtils.isEmpty(list)) { + + DispatchService.LAST_AT_MAP.put(groupConfig.getGroupName(), list.get(list.size() - 1).getCreateDt()); + + + for (RetryTask retryTask : list) { + + retryCountIncrement(retryTask); + + CallbackRetryContext retryContext = new CallbackRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + + RetryExecutor executor = RetryBuilder.newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatus()) + .withWaitStrategy(getWaitWaitStrategy()) + .withFilterStrategy(FilterStrategies.delayLevelFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + + if (!executor.filter()) { + continue; + } + + productExecUnitActor(executor); + } + } else { + // 数据为空则休眠5s + try { + Thread.sleep((DispatchService.PERIOD / 2) * 1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + DispatchService.LAST_AT_MAP.put(groupName, defLastAt); + } + } + + private WaitStrategy getWaitWaitStrategy() { + // 回调失败每15min重试一次 + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); + } + + private void retryCountIncrement(RetryTask retryTask) { + Integer retryCount = retryTask.getRetryCount(); + retryTask.setRetryCount(++retryCount); + } + + + private void productExecUnitActor(RetryExecutor retryExecutor) { + String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); + Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); + idempotentStrategy.set(groupIdHash, retryId.intValue()); + + // 重试成功回调客户端 + ActorRef actorRef = ActorGenerator.callbackRetryResultActor(); + actorRef.tell(retryExecutor, actorRef); + } + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java index c64064e78..1df507bea 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/ScanGroupActor.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.scan; import akka.actor.AbstractActor; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.support.strategy.FilterStrategies; import com.aizuda.easy.retry.server.support.strategy.StopStrategies; @@ -90,7 +91,7 @@ public class ScanGroupActor extends AbstractActor { LocalDateTime lastAt = DispatchService.LAST_AT_MAP.getOrDefault(groupName, defLastAt); // 扫描当前Group 待重试的数据 - List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize()); + List list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), TaskTypeEnum.RETRY.getType()); if (!CollectionUtils.isEmpty(list)) { @@ -108,7 +109,7 @@ public class ScanGroupActor extends AbstractActor { RetryExecutor> executor = RetryBuilder.>newBuilder() .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatus()) + .withStopStrategy(StopStrategies.stopResultStatusCode()) .withWaitStrategy(getWaitWaitStrategy(groupName, retryTask.getSceneName())) .withFilterStrategy(FilterStrategies.delayLevelFilter()) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java new file mode 100644 index 000000000..d46d569a8 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/CallbackRetryTaskHandler.java @@ -0,0 +1,50 @@ +package com.aizuda.easy.retry.server.support.handler; + +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; +import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter; +import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; + + +/** + * @author www.byteblogs.com + * @date 2023-06-04 + * @since 1.5.0 + */ +@Component +public class CallbackRetryTaskHandler { + + @Autowired + @Qualifier("retryTaskAccessProcessor") + private RetryTaskAccess retryTaskAccess; + + @Transactional + public void create(RetryTask retryTask) { + RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask); + + callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType()); + retryTask.setUniqueId(SystemConstants.CALL_BACK.CB_ + retryTask.getUniqueId()); + retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); + retryTask.setRetryCount(0); + retryTask.setCreateDt(LocalDateTime.now()); + retryTask.setUpdateDt(LocalDateTime.now()); + + retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + + Assert.isTrue(1 == retryTaskAccess.saveRetryTask(callbackRetryTask), () -> new EasyRetryServerException("failed to report data")); + + } + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java index 57c56294e..b91e28be7 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/FilterStrategies.java @@ -85,9 +85,7 @@ public class FilterStrategies { @Override public boolean filter(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; - - LocalDateTime nextTriggerAt = context.getRetryTask().getNextTriggerAt(); + LocalDateTime nextTriggerAt = retryContext.getRetryTask().getNextTriggerAt(); return nextTriggerAt.isBefore(LocalDateTime.now()); } @@ -131,8 +129,7 @@ public class FilterStrategies { @Override public boolean filter(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; - return !context.getSceneBlacklist().contains(retryContext.getRetryTask().getSceneName()); + return !retryContext.getSceneBlacklist().contains(retryContext.getRetryTask().getSceneName()); } @Override @@ -148,8 +145,7 @@ public class FilterStrategies { @Override public boolean filter(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; - ServerNode serverNode = context.getServerNode(); + ServerNode serverNode = retryContext.getServerNode(); if (Objects.isNull(serverNode)) { return false; @@ -173,8 +169,7 @@ public class FilterStrategies { @Override public boolean filter(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; - ServerNode serverNode = context.getServerNode(); + ServerNode serverNode = retryContext.getServerNode(); RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId()); if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/StopStrategies.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/StopStrategies.java index 1ee7dc120..3226bcb9c 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/StopStrategies.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/StopStrategies.java @@ -31,14 +31,23 @@ public class StopStrategies { } /** - * 根据客户端返回结果判断是否终止重试 + * 根据客户端返回状态判断是否终止重试 * - * @return {@link ResultStatusStopStrategy} 重试结果停止策略 + * @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略 */ public static StopStrategy stopResultStatus() { return new ResultStatusStopStrategy(); } + /** + * 根据客户端返回结果对象的状态码判断是否终止重试 + * + * @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略 + */ + public static StopStrategy stopResultStatusCode() { + return new ResultStatusCodeStopStrategy(); + } + /** * 调用客户端发生异常触发停止策略 */ @@ -61,20 +70,47 @@ public class StopStrategies { } /** - * 根据客户端返回结果集判断是否应该停止 - * + * 根据客户端返回的状态码判断是否应该停止 + *

* 1、{@link Result#getStatus()} 不为1 则继续重试 - * 2、根据{@link Result#getData()}中的statusCode判断是否停止 */ private static final class ResultStatusStopStrategy implements StopStrategy { @Override public boolean shouldStop(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext> context = - (MaxAttemptsPersistenceRetryContext>) retryContext; + Result response = (Result) retryContext.getCallResult(); - Result response = context.getCallResult(); + if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) { + return Boolean.FALSE; + } + + return Boolean.TRUE; + } + + @Override + public boolean supports(RetryContext retryContext) { + return true; + } + + @Override + public int order() { + return 2; + } + } + + /** + * 根据客户端返回结果集判断是否应该停止 + *

+ * 1、{@link Result#getStatus()} 不为1 则继续重试 + * 2、根据{@link Result#getData()}中的statusCode判断是否停止 + */ + private static final class ResultStatusCodeStopStrategy implements StopStrategy { + + @Override + public boolean shouldStop(RetryContext retryContext) { + + Result response = (Result) retryContext.getCallResult(); if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) { return Boolean.FALSE; @@ -97,7 +133,7 @@ public class StopStrategies { @Override public int order() { - return 2; + return 3; } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java index 01d16ba11..f8fd3bcd1 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/strategy/WaitStrategies.java @@ -1,5 +1,7 @@ package com.aizuda.easy.retry.server.support.strategy; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; @@ -145,8 +147,7 @@ public class WaitStrategies { private static final class DelayLevelWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; + public LocalDateTime computeRetryTime(RetryContext context) { RetryTask retryTask = context.getRetryTask(); DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(retryTask.getRetryCount()); return retryTask.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit()); @@ -160,13 +161,20 @@ public class WaitStrategies { @Override public LocalDateTime computeRetryTime(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; - RetryTask retryTask = context.getRetryTask(); - ConfigAccess configAccess = SpringContext.CONTEXT.getBean("configAccessProcessor", ConfigAccess.class); + RetryTask retryTask = retryContext.getRetryTask(); + int triggerInterval; + if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { + // 回调失败的默认15分钟执行一次重试 + triggerInterval = SystemConstants.CALL_BACK.TRIGGER_INTERVAL; + } else { + ConfigAccess configAccess = SpringContext.CONTEXT.getBean("configAccessProcessor", ConfigAccess.class); + SceneConfig sceneConfig = + configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); + triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval()); + } - SceneConfig sceneConfig = - configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); - return retryTask.getNextTriggerAt().plusSeconds(Integer.parseInt(sceneConfig.getTriggerInterval())); + + return retryTask.getNextTriggerAt().plusSeconds(triggerInterval); } } @@ -176,8 +184,7 @@ public class WaitStrategies { private static final class CronWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(RetryContext retryContext) { - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext; + public LocalDateTime computeRetryTime(RetryContext context) { RetryTask retryTask = context.getRetryTask(); ConfigAccess configAccess = SpringContext.CONTEXT.getBean(ConfigAccess.class); @@ -185,7 +192,7 @@ public class WaitStrategies { SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); - Date nextValidTime = null; + Date nextValidTime; try { ZonedDateTime zdt = retryTask.getNextTriggerAt().atZone(ZoneOffset.ofHours(8)); nextValidTime = new CronExpression(sceneConfig.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); diff --git a/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml b/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml index 3dd8acd46..37caef594 100644 --- a/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml +++ b/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml @@ -11,10 +11,11 @@ + - id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt + id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt, task_type insert into retry_dead_letter_${partition} (id, unique_id, group_name, scene_name, diff --git a/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml b/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml index 60f0d3f64..ea0bcd5a0 100644 --- a/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml +++ b/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml @@ -12,12 +12,13 @@ + id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status, error_message, - create_dt + create_dt, task_type select