diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/RetryTaskService.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/RetryTaskService.java index 8010590c..c7feaa09 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/RetryTaskService.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/RetryTaskService.java @@ -71,4 +71,20 @@ public interface RetryTaskService { * @return */ Integer parseLogs(ParseLogsVO parseLogsVO); + + /** + * 手动支持重试任务 + * + * @param requestVO + * @return + */ + boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO); + + /** + * 手动执行回调任务 + * + * @param requestVO + * @return + */ + boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryTaskServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryTaskServiceImpl.java index 9d7796fd..5c134661 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryTaskServiceImpl.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryTaskServiceImpl.java @@ -1,20 +1,32 @@ package com.aizuda.easy.retry.server.service.impl; +import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.enums.TaskGeneratorScene; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; import com.aizuda.easy.retry.server.service.RetryTaskService; import com.aizuda.easy.retry.server.service.convert.RetryTaskResponseVOConverter; import com.aizuda.easy.retry.server.service.convert.TaskContextConverter; +import com.aizuda.easy.retry.server.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.support.WaitStrategy; +import com.aizuda.easy.retry.server.support.context.CallbackRetryContext; +import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.support.generator.TaskGenerator; import com.aizuda.easy.retry.server.support.generator.task.TaskContext; import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.support.retry.RetryBuilder; +import com.aizuda.easy.retry.server.support.retry.RetryExecutor; +import com.aizuda.easy.retry.server.support.strategy.FilterStrategies; +import com.aizuda.easy.retry.server.support.strategy.StopStrategies; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import com.aizuda.easy.retry.server.web.model.base.PageResult; import com.aizuda.easy.retry.server.web.model.request.*; @@ -26,12 +38,14 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLog import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpEntity; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -68,6 +82,9 @@ public class RetryTaskServiceImpl implements RetryTaskService { private AccessTemplate accessTemplate; @Autowired private List taskGenerators; + @Autowired + @Qualifier("bitSetIdempotentStrategyHandler") + protected IdempotentStrategy idempotentStrategy; @Override public PageResult> getRetryTaskPage(RetryTaskQueryVO queryVO) { @@ -286,4 +303,107 @@ public class RetryTaskServiceImpl implements RetryTaskService { return waitInsertList.size(); } + @Override + public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) { + + List uniqueIds = requestVO.getUniqueIds(); + String groupName = requestVO.getGroupName(); + + List list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), + new LambdaQueryWrapper() + .eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType()) + .in(RetryTask::getUniqueId, uniqueIds)); + Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务")); + + for (RetryTask retryTask : list) { + MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + + retryCountIncrement(retryTask); + + RetryExecutor> executor = RetryBuilder.>newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatusCode()) + .withWaitStrategy(getRetryTaskWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + + Assert.isTrue(executor.filter(), () -> new EasyRetryServerException("任务:{}不满足执行条件.具体原因请查看系统日志", retryTask.getUniqueId())); + + productExecUnitActor(executor, ActorGenerator.execUnitActor()); + } + + return true; + } + + @Override + public boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO) { + List uniqueIds = requestVO.getUniqueIds(); + String groupName = requestVO.getGroupName(); + + List list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), + new LambdaQueryWrapper() + .eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType()) + .in(RetryTask::getUniqueId, uniqueIds)); + Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务")); + + for (RetryTask retryTask : list) { + + CallbackRetryContext retryContext = new CallbackRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + + retryCountIncrement(retryTask); + + RetryExecutor executor = RetryBuilder.newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatusCode()) + .withWaitStrategy(getCallbackWaitWaitStrategy()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + + Assert.isTrue(executor.filter(), () -> new EasyRetryServerException("任务:{}不满足执行条件.具体原因请查看系统日志", retryTask.getUniqueId())); + + productExecUnitActor(executor, ActorGenerator.execCallbackUnitActor()); + } + + return true; + } + + private WaitStrategy getRetryTaskWaitWaitStrategy(String groupName, String sceneName) { + + SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName); + Integer backOff = sceneConfig.getBackOff(); + + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); + } + + private WaitStrategy getCallbackWaitWaitStrategy() { + // 回调失败每15min重试一次 + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); + } + + private void retryCountIncrement(RetryTask retryTask) { + Integer retryCount = retryTask.getRetryCount(); + retryTask.setRetryCount(++retryCount); + } + + private void productExecUnitActor(RetryExecutor retryExecutor, ActorRef actorRef) { + String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); + Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); + idempotentStrategy.set(groupIdHash, retryId.intValue()); + + actorRef.tell(retryExecutor, actorRef); + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java index c6083e1e..497364e2 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/scan/AbstractScanGroup.java @@ -120,7 +120,6 @@ public abstract class AbstractScanGroup extends AbstractActor { Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); idempotentStrategy.set(groupIdHash, retryId.intValue()); - // 重试成功回调客户端 ActorRef actorRef = getActorRef(); actorRef.tell(retryExecutor, actorRef); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/controller/RetryTaskController.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/controller/RetryTaskController.java index e40dcc8d..ac7c6387 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/controller/RetryTaskController.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/controller/RetryTaskController.java @@ -81,4 +81,16 @@ public class RetryTaskController { public Integer parseLogs(@RequestBody @Validated ParseLogsVO parseLogsVO) { return retryTaskService.parseLogs(parseLogsVO); } + + @LoginRequired + @PostMapping("/manual/trigger/retry/task") + public boolean manualTriggerRetryTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) { + return retryTaskService.manualTriggerRetryTask(requestVO); + } + + @LoginRequired + @PostMapping("/manual/trigger/callback/task") + public boolean manualTriggerCallbackTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) { + return retryTaskService.manualTriggerCallbackTask(requestVO); + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/ManualTriggerTaskRequestVO.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/ManualTriggerTaskRequestVO.java new file mode 100644 index 00000000..fbf5fede --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/ManualTriggerTaskRequestVO.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.web.model.request; + +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.Pattern; +import java.util.List; + +/** + * @author www.byteblogs.com + * @date 2023-09-11 22:00:26 + * @since 2.3.0 + */ +@Data +public class ManualTriggerTaskRequestVO { + + @NotBlank(message = "groupName 不能为空") + @Pattern(regexp = "^[A-Za-z0-9_]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母和下划线") + private String groupName; + + @NotEmpty(message = "uniqueIds 不能为空") + private List uniqueIds; + +} diff --git a/frontend/src/api/manage.js b/frontend/src/api/manage.js index 56ea2fc5..f310bf10 100644 --- a/frontend/src/api/manage.js +++ b/frontend/src/api/manage.js @@ -19,6 +19,8 @@ const api = { batchUpdate: '/retry-task/batch', deleteRetryTask: '/retry-task/batch', updateRetryTaskStatus: '/retry-task/status', + manualTriggerRetryTask: '/retry-task/manual/trigger/retry/task', + manualTriggerCallbackTask: '/retry-task/manual/trigger/callback/task', retryTaskLogPage: '/retry-task-log/list', retryTaskLogMessagePage: '/retry-task-log/message/list', retryTaskLogById: '/retry-task-log/', @@ -209,6 +211,22 @@ export function updateRetryTaskStatus (data) { }) } +export function manualTriggerCallbackTask (data) { + return request({ + url: api.manualTriggerCallbackTask, + method: 'post', + data + }) +} + +export function manualTriggerRetryTask (data) { + return request({ + url: api.manualTriggerRetryTask, + method: 'post', + data + }) +} + export function getScenePage (parameter) { return request({ url: api.scenePageList, diff --git a/frontend/src/views/task/RetryTaskList.vue b/frontend/src/views/task/RetryTaskList.vue index 6ed9add7..3255ebd0 100644 --- a/frontend/src/views/task/RetryTaskList.vue +++ b/frontend/src/views/task/RetryTaskList.vue @@ -104,12 +104,42 @@ @@ -124,7 +154,14 @@