From 9e7c785ed52151843cd71fd4968c07063c2c112b Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Mon, 17 Feb 2025 23:01:12 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.4.0-beta1):=20=E9=A1=B5=E9=9D=A2?= =?UTF-8?q?=E8=B0=83=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/request/StopRetryRequest.java | 8 - .../server/common/akka/ActorGenerator.java | 88 ++---- .../rpc/client/GrpcClientInvokeHandler.java | 9 + .../rpc/client/SnailJobRetryListener.java | 20 ++ .../retry/task/client/RetryRpcClient.java | 2 +- .../task/dto/RequestCallbackExecutorDTO.java | 32 ++ ...rDTO.java => RequestRetryExecutorDTO.java} | 2 +- .../dto/RequestStopRetryTaskExecutorDTO.java | 32 ++ .../task/support/RetryTaskConverter.java | 27 +- .../task/support/RetryTaskLogConverter.java | 10 +- .../dispatch/RequestCallbackClientActor.java | 181 ++++++++++++ .../dispatch/RequestRetryClientActor.java | 107 ++++--- .../dispatch/RequestStopClientActor.java | 123 ++------ .../task/support/dispatch/RetryExecutor.java | 31 +- .../task/support/dispatch/ScanRetryActor.java | 12 +- .../dispatch/actor/result/FailureActor.java | 0 .../support/handler/RetryTaskStopHandler.java | 4 +- .../support/idempotent/IdempotentHolder.java | 5 - .../RetryIdempotentStrategyHandler.java | 46 --- .../task/support/retry/RetryBuilder.java | 84 ------ .../task/support/retry/RetryExecutor.java | 133 --------- .../support/strategy/FilterStrategies.java | 277 ------------------ .../task/support/strategy/StopStrategies.java | 140 --------- ...skController.java => RetryController.java} | 32 +- .../controller/RetryTaskLogController.java | 6 +- ...yTaskLogQueryVO.java => RetryQueryVO.java} | 15 +- ...RequestVO.java => RetrySaveRequestVO.java} | 2 +- .../web/model/request/RetryTaskQueryVO.java | 13 +- ... => RetryUpdateExecutorNameRequestVO.java} | 2 +- ...O.java => RetryUpdateStatusRequestVO.java} | 2 +- .../model/request/SceneConfigRequestVO.java | 2 +- ...skResponseVO.java => RetryResponseVO.java} | 4 +- .../response/RetryTaskLogResponseVO.java | 22 +- .../model/response/SceneConfigResponseVO.java | 20 ++ .../web/service/RetryTaskLogService.java | 4 +- .../server/web/service/RetryTaskService.java | 22 +- .../convert/RetryTaskResponseVOConverter.java | 6 +- .../service/convert/TaskContextConverter.java | 4 +- .../service/impl/RetryTaskLogServiceImpl.java | 49 ++-- .../service/impl/RetryTaskServiceImpl.java | 35 ++- 40 files changed, 575 insertions(+), 1038 deletions(-) create mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/SnailJobRetryListener.java create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java rename snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/{RealRetryExecutorDTO.java => RequestRetryExecutorDTO.java} (89%) create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestStopRetryTaskExecutorDTO.java create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryBuilder.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryExecutor.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/StopStrategies.java rename snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/{RetryTaskController.java => RetryController.java} (64%) rename snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/{RetryTaskLogQueryVO.java => RetryQueryVO.java} (61%) rename snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/{RetryTaskSaveRequestVO.java => RetrySaveRequestVO.java} (96%) rename snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/{RetryTaskUpdateExecutorNameRequestVO.java => RetryUpdateExecutorNameRequestVO.java} (94%) rename snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/{RetryTaskUpdateStatusRequestVO.java => RetryUpdateStatusRequestVO.java} (94%) rename snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/{RetryTaskResponseVO.java => RetryResponseVO.java} (90%) diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/StopRetryRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/StopRetryRequest.java index f3873eb6..dfd6e7d4 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/StopRetryRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/StopRetryRequest.java @@ -19,16 +19,8 @@ public class StopRetryRequest { private String groupName; @NotBlank(message = "scene 不能为空") private String scene; - @NotBlank(message = "参数 不能为空") - private String argsStr; - @NotBlank(message = "idempotentId 不能为空") - private String idempotentId; - @NotBlank(message = "executorName 不能为空") - private String executorName; @NotNull(message = "retryTaskId 不能为空") private Long retryTaskId; @NotNull(message = "retryId 不能为空") private Long retryId; - @NotNull(message = "retryCount 不能为空") - private Integer retryCount; } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/akka/ActorGenerator.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/akka/ActorGenerator.java index 44344525..7cc41362 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/akka/ActorGenerator.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/akka/ActorGenerator.java @@ -35,8 +35,9 @@ public class ActorGenerator { public static final String RETRY_EXECUTOR_ACTOR = "RetryExecutorActor"; public static final String RETRY_TASK_PREPARE_ACTOR = "RetryTaskPrepareActor"; public static final String LOG_ACTOR = "RetryLogActor"; - public static final String REAL_RETRY_EXECUTOR_ACTOR = "RealRetryExecutorActor"; public static final String RETRY_EXECUTOR_RESULT_ACTOR = "RetryExecutorResultActor"; + public static final String REAL_RETRY_EXECUTOR_ACTOR = "RealRetryExecutorActor"; + public static final String REAL_CALLBACK_EXECUTOR_ACTOR = "RealCallbackExecutorActor"; public static final String RETRY_REAL_STOP_TASK_INSTANCE_ACTOR = "RetryRealStopTaskInstanceActor"; @@ -72,64 +73,6 @@ public class ActorGenerator { private ActorGenerator() { } - /** - * 生成重试完成的actor - * - * @return actor 引用 - */ - @Deprecated - - public static ActorRef finishActor() { - return getRetryActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER)); - } - - /** - * 生成重试失败的actor - * - * @return actor 引用 - */ - @Deprecated - - public static ActorRef failureActor() { - return getRetryActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER)); - } - - - /** - * 回调处理 - * - * @return actor 引用 - */ - @Deprecated - public static ActorRef execCallbackUnitActor() { - return getRetryActorSystem().actorOf(getSpringExtension() - .props(EXEC_CALLBACK_UNIT_ACTOR) - .withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER)); - } - - /** - * 生成重试执行的actor - * - * @return actor 引用 - */ - @Deprecated - public static ActorRef execUnitActor() { - return getRetryActorSystem().actorOf(getSpringExtension() - .props(EXEC_UNIT_ACTOR) - .withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER)); - } - - /** - * 生成重试执行的actor - * - * @return actor 引用 - */ - public static ActorRef stopRetryTaskActor() { - return getRetryActorSystem().actorOf(getSpringExtension() - .props(RETRY_REAL_STOP_TASK_INSTANCE_ACTOR) - .withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER)); - } - /** * Retry任务执行结果actor * @@ -164,7 +107,6 @@ public class ActorGenerator { .withDispatcher(COMMON_SCAN_TASK_DISPATCHER)); } - /** * actor * @@ -176,9 +118,21 @@ public class ActorGenerator { .withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER)); } + /** + * 尝试停止执行中的任务 + * + * @return ActorRef + */ + public static ActorRef stopRetryTaskActor() { + return getRetryActorSystem().actorOf(getSpringExtension() + .props(RETRY_REAL_STOP_TASK_INSTANCE_ACTOR) + .withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER)); + } + /** * 调用客户端执行重试 - * @return + * + * @return ActorRef */ public static ActorRef retryRealTaskExecutorActor() { return getRetryActorSystem().actorOf(getSpringExtension() @@ -186,6 +140,18 @@ public class ActorGenerator { .withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER)); } + /** + * 调用客户端执行回调 + * + * @return ActorRef + */ + public static ActorRef callbackRealTaskExecutorActor() { + return getRetryActorSystem().actorOf(getSpringExtension() + .props(REAL_CALLBACK_EXECUTOR_ACTOR) + .withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER)); + } + + /** diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcClientInvokeHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcClientInvokeHandler.java index 9e0ee9e8..51f1354c 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcClientInvokeHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcClientInvokeHandler.java @@ -119,6 +119,15 @@ public class GrpcClientInvokeHandler implements InvocationHandler { for (int count = 1; count <= size; count++) { log.debug("Start request client. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, hostIp, hostPort, NetUtil.getLocalIpStr()); + if (retryListener instanceof SnailJobRetryListener) { + // 传递修改之后的客户端节点信息 + SnailJobRetryListener listener = (SnailJobRetryListener) retryListener; + Map properties = listener.properties(); + properties.put("HOST_ID", hostId); + properties.put("HOST_IP", hostIp); + properties.put("HOST_PORT", hostPort); + } + Result result = requestRemote(method, args, annotation, count); if (Objects.nonNull(result)) { return result; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/SnailJobRetryListener.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/SnailJobRetryListener.java new file mode 100644 index 00000000..405d9f62 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/SnailJobRetryListener.java @@ -0,0 +1,20 @@ +package com.aizuda.snailjob.server.common.rpc.client; + +import com.github.rholder.retry.RetryListener; + +import java.util.Map; + +/** + * author: zhangshuguang + * date: 2025-02-17 + */ +public interface SnailJobRetryListener extends RetryListener { + + /** + * 传递属性信息 + * + * @return Map + */ + Map properties(); + +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java index 2e097d31..81775f27 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java @@ -30,7 +30,7 @@ public interface RetryRpcClient { Result stop(@Body StopRetryRequest stopRetryRequest); @Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST) - Result callback(@Body RetryCallbackDTO retryCallbackDTO); + Result callback(@Body RetryCallbackDTO retryCallbackDTO); @Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST) Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java new file mode 100644 index 00000000..41a716d4 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java @@ -0,0 +1,32 @@ +package com.aizuda.snailjob.server.retry.task.dto; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-01-26 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class RequestCallbackExecutorDTO extends BaseDTO { + + private String clientId; + + private Integer routeKey; + + private Integer executorTimeout; + + private Long deadlineRequest; + + private String argsStr; + + private String executorName; + + private Integer retryCount; + +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RealRetryExecutorDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestRetryExecutorDTO.java similarity index 89% rename from snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RealRetryExecutorDTO.java rename to snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestRetryExecutorDTO.java index 10b6b536..7720653b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RealRetryExecutorDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestRetryExecutorDTO.java @@ -13,7 +13,7 @@ import lombok.EqualsAndHashCode; */ @EqualsAndHashCode(callSuper = true) @Data -public class RealRetryExecutorDTO extends BaseDTO { +public class RequestRetryExecutorDTO extends BaseDTO { private String clientId; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestStopRetryTaskExecutorDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestStopRetryTaskExecutorDTO.java new file mode 100644 index 00000000..fee5e0de --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestStopRetryTaskExecutorDTO.java @@ -0,0 +1,32 @@ +package com.aizuda.snailjob.server.retry.task.dto; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-01-26 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class RequestStopRetryTaskExecutorDTO extends BaseDTO { + + private String clientId; + + private Integer routeKey; + + private Integer executorTimeout; + + private Long deadlineRequest; + + private String argsStr; + + private String executorName; + + private Integer retryCount; + +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java index aa898ed8..79084169 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java @@ -110,7 +110,7 @@ public interface RetryTaskConverter { RetryTask toRetryTask(RetryTaskGeneratorDTO context); - DispatchRetryRequest toDispatchRetryRequest(RealRetryExecutorDTO executorDTO); + DispatchRetryRequest toDispatchRetryRequest(RequestRetryExecutorDTO executorDTO); @Mappings({ @Mapping(target = "namespaceId", source = "retry.namespaceId"), @@ -119,12 +119,16 @@ public interface RetryTaskConverter { @Mapping(target = "retryId", source = "retry.id"), @Mapping(target = "taskType", source = "retry.taskType"), }) - RealRetryExecutorDTO toRealRetryExecutorDTO(RetrySceneConfig execute, Retry retry); + RequestRetryExecutorDTO toRealRetryExecutorDTO(RetrySceneConfig execute, Retry retry); - RealRetryExecutorDTO toRealRetryExecutorDTO(TaskStopJobDTO stopJobDTO); + RequestRetryExecutorDTO toRealRetryExecutorDTO(TaskStopJobDTO stopJobDTO); RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO); + RetryExecutorResultDTO toRetryExecutorResultDTO(RequestRetryExecutorDTO resultDTO); + + RetryExecutorResultDTO toRetryExecutorResultDTO(RequestCallbackExecutorDTO resultDTO); + RetryTaskGeneratorDTO toRetryTaskGeneratorDTO(RetryTaskPrepareDTO jobPrepareDTO); RetryTaskGeneratorDTO toRetryTaskGeneratorDTO(BlockStrategyContext context); @@ -133,7 +137,9 @@ public interface RetryTaskConverter { TaskStopJobDTO toTaskStopJobDTO(BlockStrategyContext context); - StopRetryRequest toStopRetryRequest(RealRetryExecutorDTO executorDTO); + StopRetryRequest toStopRetryRequest(RequestCallbackExecutorDTO executorDTO); + + StopRetryRequest toStopRetryRequest(RequestStopRetryTaskExecutorDTO executorDTO); @Mappings({ @Mapping(source = "retry.id", target = "retryId"), @@ -147,7 +153,18 @@ public interface RetryTaskConverter { RetryTaskExecuteDTO toRetryTaskExecuteDTO(RetryTimerContext context); - JobLogMetaDTO toJobLogDTO(RealRetryExecutorDTO executorDTO); + JobLogMetaDTO toJobLogDTO(RequestRetryExecutorDTO executorDTO); + + JobLogMetaDTO toJobLogDTO(RequestCallbackExecutorDTO executorDTO); RetryResultContext toRetryResultContext(RetryExecutorResultDTO resultDTO); + + @Mappings({ + @Mapping(target = "namespaceId", source = "retry.namespaceId"), + @Mapping(target = "groupName", source = "retry.groupName"), + @Mapping(target = "sceneName", source = "retry.sceneName"), + @Mapping(target = "retryId", source = "retry.id"), + @Mapping(target = "taskType", source = "retry.taskType"), + }) + RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskLogConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskLogConverter.java index 85b87a29..d2dee6e3 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskLogConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskLogConverter.java @@ -1,6 +1,8 @@ package com.aizuda.snailjob.server.retry.task.support; -import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO; +import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.RetryMergePartitionTaskDTO; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; @@ -29,9 +31,13 @@ public interface RetryTaskLogConverter { RetryTaskLogDTO toRetryTaskLogDTO(Retry retry); - RetryTaskLogDTO toRetryTaskLogDTO(RealRetryExecutorDTO retry); + RetryTaskLogDTO toRetryTaskLogDTO(RequestRetryExecutorDTO retry); List toRetryMergePartitionTaskDTOs(List retryTaskList); RetryTaskLogMessage toRetryTaskLogMessage(RetryTaskLogMessage message); + + RetryLogMetaDTO toRetryLogMetaDTO(RequestRetryExecutorDTO executorDTO); + + RetryLogMetaDTO toRetryLogMetaDTO(RequestCallbackExecutorDTO executorDTO); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java new file mode 100644 index 00000000..3dfac0b3 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java @@ -0,0 +1,181 @@ +package com.aizuda.snailjob.server.retry.task.support.dispatch; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import com.aizuda.snailjob.client.model.request.StopRetryRequest; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; +import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; +import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; +import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.server.common.rpc.client.SnailJobRetryListener; +import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; +import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; +import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.google.common.collect.Maps; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Map; +import java.util.Objects; + +/** + * @author opensnail + * @date 2023-10-06 16:42:08 + * @since 2.4.0 + */ +@Component(ActorGenerator.REAL_CALLBACK_EXECUTOR_ACTOR) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +@RequiredArgsConstructor +public class RequestCallbackClientActor extends AbstractActor { + private final RetryTaskMapper retryTaskMapper; + + @Override + public Receive createReceive() { + return receiveBuilder().match(RequestCallbackExecutorDTO.class, executorDTO -> { + try { + doCallback(executorDTO); + } catch (Exception e) { + log.error("请求客户端发生异常", e); + } + }).build(); + } + + private void doCallback(RequestCallbackExecutorDTO executorDTO) { + long nowMilli = DateUtils.toNowMilli(); + // 检查客户端是否存在 + RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode( + executorDTO.getGroupName(), + executorDTO.getNamespaceId(), + executorDTO.getClientId() + ); + if (Objects.isNull(registerNodeInfo)) { + taskExecuteFailure(executorDTO, "客户端不存在"); + JobLogMetaDTO jobLogMetaDTO = RetryTaskConverter.INSTANCE.toJobLogDTO(executorDTO); + jobLogMetaDTO.setTimestamp(nowMilli); + SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. 失败原因: 无可执行的客户端 <|>{}<|>", executorDTO.getRetryTaskId(), + jobLogMetaDTO); + return; + } + + StopRetryRequest stopRetryRequest = RetryTaskConverter.INSTANCE.toStopRetryRequest(executorDTO); + + try { + + // 构建请求客户端对象 + RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO); + Result dispatch = rpcClient.stop(stopRetryRequest); + if (dispatch.getStatus() == StatusEnum.YES.getStatus()) { + SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId()); + } else { + // 客户端返回失败,则认为任务执行失败 + SnailJobLog.LOCAL.error("retryTaskId:[{}] 任务调度失败. msg:[{}]", executorDTO.getRetryTaskId(), dispatch.getMessage()); + taskExecuteFailure(executorDTO, dispatch.getMessage()); + } + + } catch (Exception e) { + Throwable throwable; + if (e.getClass().isAssignableFrom(RetryException.class)) { + RetryException re = (RetryException) e; + throwable = re.getLastFailedAttempt().getExceptionCause(); + } else if (e.getClass().isAssignableFrom(UndeclaredThrowableException.class)) { + UndeclaredThrowableException re = (UndeclaredThrowableException) e; + throwable = re.getUndeclaredThrowable(); + } else { + throwable = e; + } + + RetryLogMetaDTO retryTaskLogDTO = RetryTaskLogConverter.INSTANCE.toRetryLogMetaDTO(executorDTO); + retryTaskLogDTO.setTimestamp(nowMilli); + SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. <|>{}<|>", retryTaskLogDTO.getRetryTaskId(), + retryTaskLogDTO, throwable); + + taskExecuteFailure(executorDTO, throwable.getMessage()); + + } + + } + + public class RetryExecutorRetryListener implements SnailJobRetryListener { + + private final Map properties; + private final RequestCallbackExecutorDTO executorDTO; + + public RetryExecutorRetryListener(final RequestCallbackExecutorDTO executorDTO) { + this.executorDTO = executorDTO; + this.properties = Maps.newHashMap(); + } + + @Override + public void onRetry(final Attempt attempt) { + if (attempt.getAttemptNumber() > 0) { + // 更新最新负载节点 + String hostId = (String) properties.get("HOST_ID"); + String hostIp = (String) properties.get("HOST_IP"); + String hostPort = (String) properties.get("HOST_PORT"); + + RetryTask retryTask = new RetryTask(); + retryTask.setId(executorDTO.getRetryTaskId()); + RegisterNodeInfo realNodeInfo = new RegisterNodeInfo(); + realNodeInfo.setHostIp(hostIp); + realNodeInfo.setHostPort(Integer.valueOf(hostPort)); + realNodeInfo.setHostId(hostId); + retryTask.setClientInfo(ClientInfoUtils.generate(realNodeInfo)); + retryTaskMapper.updateById(retryTask); + } + + } + + @Override + public Map properties() { + return properties; + } + } + + private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RequestCallbackExecutorDTO executorDTO) { + return RequestBuilder.newBuilder() + .nodeInfo(registerNodeInfo) + .failRetry(true) + .failover(true) + .retryTimes(3) + .retryInterval(1) + .routeKey(executorDTO.getRouteKey()) + .allocKey(String.valueOf(executorDTO.getRetryTaskId())) + .retryListener(new RetryExecutorRetryListener(executorDTO)) + .client(RetryRpcClient.class) + .build(); + } + + /** + * 更新是执行状态 + * + * @param executorDTO RequestRetryExecutorDTO + * @param message 失败原因 + */ + private static void taskExecuteFailure(RequestCallbackExecutorDTO executorDTO, String message) { + ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); + RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); + executorResultDTO.setExceptionMsg(message); + actorRef.tell(executorResultDTO, actorRef); + } +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java index 97be55c7..eecf55ef 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java @@ -1,7 +1,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch; import akka.actor.AbstractActor; -import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; +import akka.actor.ActorRef; import com.aizuda.snailjob.client.model.request.DispatchRetryRequest; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.Result; @@ -11,22 +11,29 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.server.common.rpc.client.SnailJobRetryListener; +import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; -import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO; -import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryException; -import com.github.rholder.retry.RetryListener; +import com.google.common.collect.Maps; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.lang.reflect.UndeclaredThrowableException; +import java.util.Map; import java.util.Objects; /** @@ -37,11 +44,13 @@ import java.util.Objects; @Component(ActorGenerator.REAL_RETRY_EXECUTOR_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j +@RequiredArgsConstructor public class RequestRetryClientActor extends AbstractActor { + private final RetryTaskMapper retryTaskMapper; @Override public Receive createReceive() { - return receiveBuilder().match(RealRetryExecutorDTO.class, realRetryExecutorDTO -> { + return receiveBuilder().match(RequestRetryExecutorDTO.class, realRetryExecutorDTO -> { try { doExecute(realRetryExecutorDTO); } catch (Exception e) { @@ -50,7 +59,7 @@ public class RequestRetryClientActor extends AbstractActor { }).build(); } - private void doExecute(RealRetryExecutorDTO executorDTO) { + private void doExecute(RequestRetryExecutorDTO executorDTO) { long nowMilli = DateUtils.toNowMilli(); // 检查客户端是否存在 RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode( @@ -82,12 +91,12 @@ public class RequestRetryClientActor extends AbstractActor { RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO); Result dispatch = rpcClient.dispatch(dispatchJobRequest, snailJobHeaders); Boolean data = dispatch.getData(); - // todo 是否需要根据DispatchRetryResultDTO if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.nonNull(data) && data) { SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId()); } else { SnailJobLog.LOCAL.error("retryTaskId:[{}] 任务调度失败. msg:[{}]", executorDTO.getRetryTaskId(), dispatch.getMessage()); // 客户端返回失败,则认为任务执行失败 + taskExecuteFailure(executorDTO, dispatch.getMessage()); } } catch (Exception e) { @@ -102,66 +111,76 @@ public class RequestRetryClientActor extends AbstractActor { throwable = e; } - RetryTaskLogDTO jobLogMetaDTO = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(executorDTO); -// jobLogMetaDTO.setTimestamp(nowMilli); -// if (realJobExecutorDTO.getRetryStatus()) { -// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试 重试次数:[{}]. <|>{}<|>", jobLogMetaDTO.getTaskId(), -// realJobExecutorDTO.getRetryCount(), jobLogMetaDTO, throwable); -// } else { -// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>", -// jobLogMetaDTO.getTaskId(), -// jobLogMetaDTO, throwable); -// } + RetryLogMetaDTO retryTaskLogDTO = RetryTaskLogConverter.INSTANCE.toRetryLogMetaDTO(executorDTO); + retryTaskLogDTO.setTimestamp(nowMilli); + SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. <|>{}<|>", retryTaskLogDTO.getRetryTaskId(), + retryTaskLogDTO, throwable); -// taskExecuteFailure(realJobExecutorDTO, throwable.getMessage()); -// SnailSpringContext.getContext().publishEvent( -// new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder() -// .jobTaskBatchId(dispatchJobRequest.getTaskBatchId()) -// .reason(throwable.getMessage()) -// .notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene()) -// .build()) -// ); + taskExecuteFailure(executorDTO, throwable.getMessage()); } } - public static class RetryExecutorRetryListener implements RetryListener { + public class RetryExecutorRetryListener implements SnailJobRetryListener { - private final RealRetryExecutorDTO realRetryExecutorDTO; + private final Map properties; + private final RequestRetryExecutorDTO executorDTO; - public RetryExecutorRetryListener(final RealRetryExecutorDTO realJobExecutorDTO) { - this.realRetryExecutorDTO = realJobExecutorDTO; + public RetryExecutorRetryListener(final RequestRetryExecutorDTO realJobExecutorDTO) { + this.executorDTO = realJobExecutorDTO; + this.properties = Maps.newHashMap(); } @Override public void onRetry(final Attempt attempt) { - // 负载节点 - // todo 重新更新任务的客户端信息 - if (!attempt.hasException()) { - // + if (attempt.getAttemptNumber() > 1) { + // 更新最新负载节点 + String hostId = (String) properties.get("HOST_ID"); + String hostIp = (String) properties.get("HOST_IP"); + Integer hostPort = (Integer) properties.get("HOST_PORT"); + + RetryTask retryTask = new RetryTask(); + retryTask.setId(executorDTO.getRetryTaskId()); + RegisterNodeInfo realNodeInfo = new RegisterNodeInfo(); + realNodeInfo.setHostIp(hostIp); + realNodeInfo.setHostPort(Integer.valueOf(hostPort)); + realNodeInfo.setHostId(hostId); + retryTask.setClientInfo(ClientInfoUtils.generate(realNodeInfo)); + retryTaskMapper.updateById(retryTask); } + + } + + @Override + public Map properties() { + return properties; } } - private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealRetryExecutorDTO realRetryExecutorDTO) { + private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RequestRetryExecutorDTO executorDTO) { return RequestBuilder.newBuilder() .nodeInfo(registerNodeInfo) .failRetry(true) .failover(true) - .retryTimes(6) + .retryTimes(3) .retryInterval(1) - .routeKey(realRetryExecutorDTO.getRouteKey()) - .allocKey(String.valueOf(realRetryExecutorDTO.getRetryTaskId())) - .retryListener(new RetryExecutorRetryListener(realRetryExecutorDTO)) + .routeKey(executorDTO.getRouteKey()) + .allocKey(String.valueOf(executorDTO.getRetryTaskId())) + .retryListener(new RetryExecutorRetryListener(executorDTO)) .client(RetryRpcClient.class) .build(); } - private static void taskExecuteFailure(RealRetryExecutorDTO realRetryExecutorDTO, String message) { -// ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); -// JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(realRetryExecutorDTO); -// jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); -// jobExecutorResultDTO.setMessage(message); -// actorRef.tell(jobExecutorResultDTO, actorRef); + /** + * 更新是执行状态 + * + * @param executorDTO RequestRetryExecutorDTO + * @param message 失败原因 + */ + private static void taskExecuteFailure(RequestRetryExecutorDTO executorDTO, String message) { + ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); + RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); + executorResultDTO.setExceptionMsg(message); + actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestStopClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestStopClientActor.java index 7a3f6fed..929752d3 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestStopClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestStopClientActor.java @@ -9,21 +9,15 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; -import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; -import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestStopRetryTaskExecutorDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter; -import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO; -import com.github.rholder.retry.Attempt; -import com.github.rholder.retry.RetryException; -import com.github.rholder.retry.RetryListener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; -import java.lang.reflect.UndeclaredThrowableException; +import java.util.Objects; /** * @author opensnail @@ -37,135 +31,52 @@ public class RequestStopClientActor extends AbstractActor { @Override public Receive createReceive() { - return receiveBuilder().match(RealRetryExecutorDTO.class, realRetryExecutorDTO -> { + return receiveBuilder().match(RequestStopRetryTaskExecutorDTO.class, taskExecutorDTO -> { try { - doExecute(realRetryExecutorDTO); + doStop(taskExecutorDTO); } catch (Exception e) { log.error("请求客户端发生异常", e); } }).build(); } - private void doExecute(RealRetryExecutorDTO executorDTO) { - long nowMilli = DateUtils.toNowMilli(); + private void doStop(RequestStopRetryTaskExecutorDTO executorDTO) { // 检查客户端是否存在 RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode( executorDTO.getGroupName(), executorDTO.getNamespaceId(), - executorDTO.getClientId() - ); - -// if (Objects.isNull(registerNodeInfo)) { -// taskExecuteFailure(executorDTO, "客户端不存在"); -// JobLogMetaDTO jobLogMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(executorDTO); -// jobLogMetaDTO.setTimestamp(nowMilli); -// if (executorDTO.getRetryStatus()) { -// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试. 失败原因: 无可执行的客户端. 重试次数:[{}]. <|>{}<|>", -// executorDTO.getTaskId(), executorDTO.getRetryCount(), jobLogMetaDTO); -// } else { -// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. 失败原因: 无可执行的客户端 <|>{}<|>", realJobExecutorDTO.getTaskId(), -// jobLogMetaDTO); -// } -// return; -// } + executorDTO.getClientId()); + if (Objects.isNull(registerNodeInfo)) { + return; + } + // 不用关心停止的结果,若服务端尝试终止失败,客户端会兜底进行关闭 StopRetryRequest stopRetryRequest = RetryTaskConverter.INSTANCE.toStopRetryRequest(executorDTO); try { - // 构建请求客户端对象 - RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO); + RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo); Result dispatch = rpcClient.stop(stopRetryRequest); - // todo 是否需要根据DispatchRetryResultDTO if (dispatch.getStatus() == StatusEnum.YES.getStatus()) { - SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId()); + SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务停止成功.", executorDTO.getRetryTaskId()); } else { // 客户端返回失败,则认为任务执行失败 -// ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType()); -// ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO); -// context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); -// context.setExecuteResult(ExecuteResult.failure(null, dispatch.getMessage())); -// clientCallback.callback(context); + SnailJobLog.LOCAL.warn("retryTaskId:[{}] 任务停止失败.", executorDTO.getRetryTaskId()); } } catch (Exception e) { - Throwable throwable; - if (e.getClass().isAssignableFrom(RetryException.class)) { - RetryException re = (RetryException) e; - throwable = re.getLastFailedAttempt().getExceptionCause(); - } else if (e.getClass().isAssignableFrom(UndeclaredThrowableException.class)) { - UndeclaredThrowableException re = (UndeclaredThrowableException) e; - throwable = re.getUndeclaredThrowable(); - } else { - throwable = e; - } - - RetryTaskLogDTO jobLogMetaDTO = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(executorDTO); -// jobLogMetaDTO.setTimestamp(nowMilli); -// if (realJobExecutorDTO.getRetryStatus()) { -// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试 重试次数:[{}]. <|>{}<|>", jobLogMetaDTO.getTaskId(), -// realJobExecutorDTO.getRetryCount(), jobLogMetaDTO, throwable); -// } else { -// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>", -// jobLogMetaDTO.getTaskId(), -// jobLogMetaDTO, throwable); -// } - -// taskExecuteFailure(realJobExecutorDTO, throwable.getMessage()); -// SnailSpringContext.getContext().publishEvent( -// new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder() -// .jobTaskBatchId(dispatchJobRequest.getTaskBatchId()) -// .reason(throwable.getMessage()) -// .notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene()) -// .build()) -// ); + SnailJobLog.LOCAL.error("retryTaskId:[{}] 任务停止失败.", executorDTO.getRetryTaskId(), e); } } - public static class RetryExecutorRetryListener implements RetryListener { - - private final RealRetryExecutorDTO realRetryExecutorDTO; - - public RetryExecutorRetryListener(final RealRetryExecutorDTO realJobExecutorDTO) { - this.realRetryExecutorDTO = realJobExecutorDTO; - } - - @Override - public void onRetry(final Attempt attempt) { - // 负载节点 - if (attempt.hasException()) { -// SnailJobLog.LOCAL.error("任务调度失败. taskInstanceId:[{}] count:[{}]", -// realRetryExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause()); -// ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType()); -// ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO); -// context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); -// context.setExecuteResult(ExecuteResult.failure(null, "网络请求失败")); -// clientCallback.callback(context); - } - } - } - - private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealRetryExecutorDTO realRetryExecutorDTO) { - -// int maxRetryTimes = realRetryExecutorDTO.getMaxRetryTimes(); -// boolean retry = realJobExecutorDTO.getRetryStatus(); + private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) { return RequestBuilder.newBuilder() .nodeInfo(registerNodeInfo) .failRetry(true) - .failover(true) - .retryTimes(6) + .retryTimes(3) .retryInterval(1) - .retryListener(new RetryExecutorRetryListener(realRetryExecutorDTO)) .client(RetryRpcClient.class) .build(); } - - private static void taskExecuteFailure(RealRetryExecutorDTO realRetryExecutorDTO, String message) { -// ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); -// JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(realRetryExecutorDTO); -// jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); -// jobExecutorResultDTO.setMessage(message); -// actorRef.tell(jobExecutorResultDTO, actorRef); - } -} +} \ No newline at end of file diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java index 199e06d4..07ffa46d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java @@ -12,11 +12,13 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; -import com.aizuda.snailjob.server.common.dto.ScanTask; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; -import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecuteDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; @@ -101,13 +103,26 @@ public class RetryExecutor extends AbstractActor { updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(), ClientInfoUtils.generate(serverNode)); - // 请求客户端 - RealRetryExecutorDTO realJobExecutor = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(retrySceneConfig, retry); - realJobExecutor.setClientId(serverNode.getHostId()); - realJobExecutor.setRetryTaskId(execute.getRetryTaskId()); - ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor(); - actorRef.tell(realJobExecutor, actorRef); + Object executorDTO; + if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) { + // 请求客户端 + RequestCallbackExecutorDTO callbackExecutorDTO = RetryTaskConverter.INSTANCE.toRequestCallbackExecutorDTO(retrySceneConfig, retry); + callbackExecutorDTO.setClientId(serverNode.getHostId()); + callbackExecutorDTO.setRetryTaskId(execute.getRetryTaskId()); + executorDTO = callbackExecutorDTO; + } else { + + // 请求客户端 + RequestRetryExecutorDTO retryExecutorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(retrySceneConfig, retry); + retryExecutorDTO.setClientId(serverNode.getHostId()); + retryExecutorDTO.setRetryTaskId(execute.getRetryTaskId()); + + executorDTO = retryExecutorDTO; + } + + ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor(); + actorRef.tell(executorDTO, actorRef); } private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, String clientInfo) { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 2cd5beea..52cce527 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -12,6 +12,7 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.dto.PartitionTask; import com.aizuda.snailjob.server.common.dto.ScanTask; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; @@ -147,11 +148,16 @@ public class ScanRetryActor extends AbstractActor { } waitStrategyContext.setNextTriggerAt(DateUtils.toEpochMilli(nextTriggerAt)); - // todo 这里区分一下是重试还是回调任务即可 waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1); // 更新触发时间, 任务进入时间轮 - WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); + WaitStrategy waitStrategy; + if (SyetemTaskTypeEnum.CALLBACK.getType().equals(partitionTask.getTaskType())) { + waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType()); + } else { + waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); + } + return waitStrategy.computeTriggerTime(waitStrategyContext); } @@ -162,8 +168,6 @@ public class ScanRetryActor extends AbstractActor { .select(Retry::getId, Retry::getNextTriggerAt, Retry::getGroupName, Retry::getRetryCount, Retry::getSceneName, Retry::getNamespaceId, Retry::getTaskType) .eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - // todo -// .eq(Retry::getTaskType, taskActuatorScene().getTaskType().getType()) .in(Retry::getBucketIndex, buckets) .le(Retry::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) .gt(Retry::getId, startId) diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java deleted file mode 100644 index e69de29b..00000000 diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java index 9dca0922..947d8735 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java @@ -5,7 +5,7 @@ import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; @@ -39,7 +39,7 @@ public class RetryTaskStopHandler { retryTask.setOperationReason(stopJobDTO.getOperationReason()); Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed")); - RealRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO); + RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO); ActorRef actorRef = ActorGenerator.stopRetryTaskActor(); actorRef.tell(executorDTO, actorRef); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/IdempotentHolder.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/IdempotentHolder.java index a5c3e92f..ba698821 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/IdempotentHolder.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/IdempotentHolder.java @@ -10,16 +10,11 @@ public class IdempotentHolder { private IdempotentHolder() { } - public static RetryIdempotentStrategyHandler getRetryIdempotent() { - return SingletonHolder.RETRY_IDEMPOTENT_INSTANCE; - } - public static TimerIdempotent getTimerIdempotent() { return SingletonHolder.TIMER_IDEMPOTENT; } private static class SingletonHolder { - private static final RetryIdempotentStrategyHandler RETRY_IDEMPOTENT_INSTANCE = new RetryIdempotentStrategyHandler(); private static final TimerIdempotent TIMER_IDEMPOTENT = new TimerIdempotent(); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java deleted file mode 100644 index 93f49c5b..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/idempotent/RetryIdempotentStrategyHandler.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.idempotent; - -import com.aizuda.snailjob.server.common.IdempotentStrategy; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.springframework.stereotype.Component; - -import java.util.concurrent.TimeUnit; - -/** - * 重试任务幂等校验器 - * - * @author: opensnail - * @date : 2021-11-23 09:26 - */ -@Component -public class RetryIdempotentStrategyHandler implements IdempotentStrategy { - - private static final Cache cache; - - static { - cache = CacheBuilder.newBuilder() - .concurrencyLevel(16) // 并发级别 - .expireAfterWrite(60, TimeUnit.SECONDS) - .build(); - } - - @Override - public boolean set(String key) { - cache.put(key, key); - return Boolean.TRUE; - } - - - @Override - public boolean isExist(String key) { - return cache.asMap().containsKey(key); - } - - @Override - public boolean clear(String key) { - cache.invalidate(key); - return Boolean.TRUE; - } - -} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryBuilder.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryBuilder.java deleted file mode 100644 index c81d38ce..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryBuilder.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.retry; - -import cn.hutool.core.collection.CollUtil; -import com.aizuda.snailjob.server.common.WaitStrategy; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.retry.task.support.FilterStrategy; -import com.aizuda.snailjob.server.retry.task.support.RetryContext; -import com.aizuda.snailjob.server.retry.task.support.StopStrategy; - -import java.util.*; - -/** - * 重试构建 - * - * @author: opensnail - * @date : 2021-11-29 18:42 - */ -public class RetryBuilder { - - private List stopStrategies; - private WaitStrategy waitStrategy; - private List filterStrategies; - private RetryContext retryContext; - - public static RetryBuilder newBuilder() { - return new RetryBuilder<>(); - } - - public RetryBuilder withWaitStrategy(WaitStrategy waitStrategy) { - this.waitStrategy = waitStrategy; - return this; - } - - public RetryBuilder withFilterStrategy(FilterStrategy filterStrategy) { - if (CollUtil.isEmpty(filterStrategies)) { - filterStrategies = new ArrayList<>(); - } - - filterStrategies.add(filterStrategy); - return this; - } - - public RetryBuilder withStopStrategy(StopStrategy stopStrategy) { - if (CollUtil.isEmpty(stopStrategies)) { - stopStrategies = new ArrayList<>(); - } - - stopStrategies.add(stopStrategy); - return this; - } - - public RetryBuilder withRetryContext(RetryContext retryContext) { - this.retryContext = retryContext; - return this; - } - - public RetryExecutor build() { - - if (Objects.isNull(waitStrategy)) { - throw new SnailJobServerException("waitStrategy 不能为null"); - } - - if (Objects.isNull(retryContext)) { - throw new SnailJobServerException("retryContext 不能为null"); - } - - if (CollUtil.isEmpty(stopStrategies)) { - stopStrategies = Collections.EMPTY_LIST; - } else { - stopStrategies.sort(Comparator.comparingInt(StopStrategy::order)); - } - - if (CollUtil.isEmpty(filterStrategies)) { - filterStrategies = Collections.EMPTY_LIST; - } else { - filterStrategies.sort(Comparator.comparingInt(FilterStrategy::order)); - } - - retryContext.setWaitStrategy(waitStrategy); - - return new RetryExecutor(stopStrategies, waitStrategy, filterStrategies, retryContext); - } - -} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryExecutor.java deleted file mode 100644 index 787b7cd7..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/retry/RetryExecutor.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.retry; - -import akka.actor.ActorRef; -import cn.hutool.core.lang.Pair; -import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; -import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; -import com.aizuda.snailjob.common.core.model.Result; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.WaitStrategy; -import com.aizuda.snailjob.server.common.akka.ActorGenerator; -import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; -import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO; -import com.aizuda.snailjob.server.retry.task.support.FilterStrategy; -import com.aizuda.snailjob.server.retry.task.support.RetryContext; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.server.retry.task.support.StopStrategy; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.Objects; -import java.util.concurrent.Callable; - -/** - * 重试执行器 - * - * @author: opensnail - * @date : 2021-11-29 18:57 - */ -@Slf4j -public class RetryExecutor { - - private final List stopStrategies; - private final WaitStrategy waitStrategy; - private final List filterStrategies; - private final RetryContext retryContext; - - public RetryExecutor(List stopStrategies, - WaitStrategy waitStrategy, - List filterStrategies, - RetryContext retryContext) { - this.stopStrategies = stopStrategies; - this.waitStrategy = waitStrategy; - this.filterStrategies = filterStrategies; - this.retryContext = retryContext; - } - - public Pair filter() { - - for (FilterStrategy filterStrategy : filterStrategies) { - Pair pair = filterStrategy.filter(retryContext); - if (!pair.getKey()) { - return pair; - } - } - - return Pair.of(Boolean.TRUE, new StringBuilder()); - } - - /** - * 重试执行 - * - * @param callable 重试执行回调 - * @return 重试结果 - * @throws Exception - */ - public V call(Callable callable) throws Exception { - - // 这里调用客户端可能会出现网络异常 - V call = null; - try { - call = callable.call(); - retryContext.setCallResult(call); - } catch (Exception e) { - RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryContext.getRetryTask()); - SnailJobLog.REMOTE.error("请求客户端执行失败. uniqueId:[{}] <|>{}<|>", e); - retryContext.setException(e); - } - - boolean isStop = Boolean.TRUE; - - // 触发停止策略判断 - for (StopStrategy stopStrategy : stopStrategies) { - if (stopStrategy.supports(retryContext)) { - // 必须责任链中的所有停止策略都判断为停止,此时才判定为重试完成 - if (!stopStrategy.shouldStop(retryContext)) { - isStop = Boolean.FALSE; - } - } - } - - ActorRef actorRef; - if (isStop) { - // 状态变为完成 FinishActor - actorRef = ActorGenerator.finishActor(); - } else { - // 失败 FailureActor - actorRef = ActorGenerator.failureActor(); - } - - // 获取失败原因 - String reason = StrUtil.EMPTY; - if (retryContext.hasException()) { - Exception exception = retryContext.getException(); - if (Objects.nonNull(exception)) { - reason = exception.getCause().getMessage(); - } - - } else { - if (Objects.nonNull(call)) { - Result result = (Result) call; - DispatchRetryResultDTO data = result.getData(); - if (StrUtil.isBlank(result.getMessage()) && Objects.nonNull(data)) { - reason = data.getExceptionMsg(); - } else { - reason = result.getMessage(); - } - } - } - - RetryTaskExecutorDTO retryTaskExecutorDTO = - RetryTaskConverter.INSTANCE.toRetryTaskExecutorDTO( - retryContext.getRetryTask(), reason, - RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR.getNotifyScene()); - actorRef.tell(retryTaskExecutorDTO, actorRef); - - return call; - } - - public RetryContext getRetryContext() { - return retryContext; - } -} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java deleted file mode 100644 index 5d6ee5cd..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/FilterStrategies.java +++ /dev/null @@ -1,277 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.strategy; - -import cn.hutool.core.lang.Pair; -import com.aizuda.snailjob.common.core.context.SnailSpringContext; -import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.IdempotentStrategy; -import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; -import com.aizuda.snailjob.server.common.dto.DistributeInstance; -import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; -import com.aizuda.snailjob.server.common.triple.ImmutableTriple; -import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO; -import com.aizuda.snailjob.server.retry.task.support.FilterStrategy; -import com.aizuda.snailjob.server.retry.task.support.RetryContext; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.server.retry.task.support.cache.CacheGroupRateLimiter; -import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent; -import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.Retry; -import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.google.common.util.concurrent.RateLimiter; -import lombok.extern.slf4j.Slf4j; - -import java.text.MessageFormat; -import java.time.LocalDateTime; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -/** - * 生成 {@link FilterStrategy} 实例. - * - * @author: opensnail - * @date : 2021-11-30 10:03 - */ -@Slf4j -@Deprecated -public class FilterStrategies { - - private FilterStrategies() { - } - - /** - * 触发时间过滤策略 - * - * @return {@link TriggerAtFilterStrategies} 触发时间过滤策略 - */ - @Deprecated - public static FilterStrategy triggerAtFilter() { - return new TriggerAtFilterStrategies(); - } - - /** - * 使用BitSet幂等的过滤策略 - * - * @return {@link BitSetIdempotentFilterStrategies} BitSet幂等的过滤策略 - */ - public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy idempotentStrategy) { - return new BitSetIdempotentFilterStrategies(idempotentStrategy); - } - - /** - * 场景黑名单策略 - * - * @return {@link SceneBlackFilterStrategies} 场景黑名单策略 - */ - public static FilterStrategy sceneBlackFilter() { - return new SceneBlackFilterStrategies(); - } - - /** - * 检查是否存在存活的客户端POD - * - * @return {@link CheckAliveClientPodFilterStrategies} 客户端存活POD检查策略 - */ - public static FilterStrategy checkAliveClientPodFilter() { - return new CheckAliveClientPodFilterStrategies(); - } - - /** - * 检查分配的客户端是否达到限流阈值 - * - * @return {@link RateLimiterFilterStrategies} 检查分配的客户端是否达到限流阈值 - */ - public static FilterStrategy rateLimiterFilter() { - return new RateLimiterFilterStrategies(); - } - - /** - * 正在rebalance时不允许下发重试流量 - * - * @return {@link ReBalanceFilterStrategies} 正在rebalance时不允许下发重试流量 - */ - public static FilterStrategy rebalanceFilterStrategies() { - return new ReBalanceFilterStrategies(); - } - - /** - * 触发时间过滤策略 - *

- * 根据延迟等级的时间计算下次触发时间是否小于当前时间,满足则返回true 否则返回false - */ - private static final class TriggerAtFilterStrategies implements FilterStrategy { - - @Override - public Pair filter(RetryContext retryContext) { - Retry retry = retryContext.getRetryTask(); - Long nextTriggerAt = retry.getNextTriggerAt(); - -// boolean result = nextTriggerAt.isBefore(LocalDateTime.now()); -// StringBuilder description = new StringBuilder(); -// if (!result) { -// description.append(MessageFormat.format("未到触发时间. uniqueId:[{0}]", retry.getId())); -// } - - return null; - } - - @Override - public int order() { - return 0; - } - } - - /** - * 使用BitSet幂等的过滤策略 - *

- * 判断BitSet中是否存在,若存在则放回false 否则返回true - */ - private static final class BitSetIdempotentFilterStrategies implements FilterStrategy { - - private final IdempotentStrategy idempotentStrategy; - - public BitSetIdempotentFilterStrategies(IdempotentStrategy idempotentStrategy) { - this.idempotentStrategy = idempotentStrategy; - } - - @Override - public Pair filter(RetryContext retryContext) { - Retry retry = retryContext.getRetryTask(); - boolean result = !idempotentStrategy.isExist(ImmutableTriple.of(retry.getGroupName(), retry.getNamespaceId(), retry.getId()).toString()); - StringBuilder description = new StringBuilder(); - if (!result) { - description.append(MessageFormat.format("存在执行中的任务.uniqueId:[{0}]", retry.getId())); - } - - return Pair.of(result, description); - } - - @Override - public int order() { - return 1; - } - } - - /** - * 场景黑名单策略 - *

- * 如果重试的数据在黑名单中的则返回false 否则为true - */ - private static final class SceneBlackFilterStrategies implements FilterStrategy { - - @Override - public Pair filter(RetryContext retryContext) { - Retry retry = retryContext.getRetryTask(); - - boolean result = !retryContext.getSceneBlacklist().contains(retry.getSceneName()); - - StringBuilder description = new StringBuilder(); - if (!result) { - description.append(MessageFormat.format("场景:[{0}]在黑名单中, 不允许执行.", retry.getSceneName())); - } - - return Pair.of(result, description); - } - - @Override - public int order() { - return 2; - } - } - - /** - * 检查是否存在存活的客户端POD - */ - private static final class CheckAliveClientPodFilterStrategies implements FilterStrategy { - - @Override - public Pair filter(RetryContext retryContext) { - Retry retry = retryContext.getRetryTask(); - RegisterNodeInfo serverNode = retryContext.getServerNode(); - - boolean result; - StringBuilder description = new StringBuilder(); - if (Objects.isNull(serverNode)) { - result = false; - description.append(MessageFormat.format("没有可执行的客户端节点. uniqueId:[{0}]", retry.getId())); - } else { - ServerNodeMapper serverNodeMapper = SnailSpringContext.getBeanByType(ServerNodeMapper.class); - result = 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper().eq(ServerNode::getHostId, serverNode.getHostId())); - if (!result) { - // 删除缓存中的失效节点 - CacheRegisterTable.remove(retry.getGroupName(), retry.getNamespaceId(), serverNode.getHostId()); - description.append(MessageFormat.format("DB中未查询到客户端节点. hostId:[{0}] uniqueId:[{1}]", serverNode.getHostId(), retry.getId())); - } - } - - if (result == false) { - RetryTaskFailAlarmEventDTO toRetryTaskFailAlarmEventDTO = - RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO( - retry, - description.toString(), - RetryNotifySceneEnum.RETRY_NO_CLIENT_NODES_ERROR.getNotifyScene()); - SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(toRetryTaskFailAlarmEventDTO)); - } - - return Pair.of(result, description); - } - - @Override - public int order() { - return 3; - } - } - - /** - * 检查是否存在存活的客户端POD - */ - private static final class RateLimiterFilterStrategies implements FilterStrategy { - - @Override - public Pair filter(RetryContext retryContext) { - RegisterNodeInfo serverNode = retryContext.getServerNode(); - Retry retry = retryContext.getRetryTask(); - - StringBuilder description = new StringBuilder(); - Boolean result = Boolean.TRUE; - RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId()); - if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { - SnailJobLog.LOCAL.error("该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId()); - description.append(MessageFormat.format("该POD:[{0}]已到达最大限流阈值,本次重试不执行.uniqueId:[{1}]", serverNode.getHostId(), retry.getId())); - result = Boolean.FALSE; - } - - return Pair.of(result, description); - } - - @Override - public int order() { - return 4; - } - } - - /** - * rebalance中数据不进行重试 - */ - private static final class ReBalanceFilterStrategies implements FilterStrategy { - - @Override - public Pair filter(RetryContext retryContext) { - Retry retry = retryContext.getRetryTask(); - boolean result = !DistributeInstance.RE_BALANCE_ING.get(); - StringBuilder description = new StringBuilder(); - if (!result) { - description.append(MessageFormat.format("系统Rebalancing中数据无法重试.uniqueId:[{0}]", retry.getId())); - } - return Pair.of(result, description); - } - - @Override - public int order() { - return 1; - } - } - - -} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/StopStrategies.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/StopStrategies.java deleted file mode 100644 index 2368674c..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/strategy/StopStrategies.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.strategy; - -import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.common.core.model.Result; -import com.aizuda.snailjob.server.retry.task.support.RetryContext; -import com.aizuda.snailjob.server.retry.task.support.StopStrategy; -import com.aizuda.snailjob.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; - -import java.util.Objects; - -/** - * 生成 {@link StopStrategy} 实例. - * - * @author: opensnail - * @date : 2021-11-29 19:22 - */ -public class StopStrategies { - - private StopStrategies() { - } - - /** - * 调用客户端发生异常触发停止策略 - * - * @return {@link ExceptionStopStrategy} 调用客户端发生异常触发停止策略 - */ - public static StopStrategy stopException() { - return new ExceptionStopStrategy(); - } - - /** - * 根据客户端返回状态判断是否终止重试 - * - * @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略 - */ - public static StopStrategy stopResultStatus() { - return new ResultStatusStopStrategy(); - } - - /** - * 根据客户端返回结果对象的状态码判断是否终止重试 - * - * @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略 - */ - public static StopStrategy stopResultStatusCode() { - return new ResultStatusCodeStopStrategy(); - } - - /** - * 调用客户端发生异常触发停止策略 - */ - private static final class ExceptionStopStrategy implements StopStrategy { - - @Override - public boolean shouldStop(RetryContext retryContext) { - return !retryContext.hasException(); - } - - @Override - public boolean supports(RetryContext retryContext) { - return true; - } - - @Override - public int order() { - return 1; - } - } - - /** - * 根据客户端返回的状态码判断是否应该停止 - *

- * 1、{@link Result#getStatus()} 不为1 则继续重试 - */ - private static final class ResultStatusStopStrategy implements StopStrategy { - - @Override - public boolean shouldStop(RetryContext retryContext) { - - Result response = (Result) retryContext.getCallResult(); - - if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) { - return Boolean.FALSE; - } - - return Boolean.TRUE; - } - - @Override - public boolean supports(RetryContext retryContext) { - return true; - } - - @Override - public int order() { - return 2; - } - } - - /** - * 根据客户端返回结果集判断是否应该停止 - *

- * 1、{@link Result#getStatus()} 不为1 则继续重试 - * 2、根据{@link Result#getData()}中的statusCode判断是否停止 - */ - private static final class ResultStatusCodeStopStrategy implements StopStrategy { - - @Override - public boolean shouldStop(RetryContext retryContext) { - - Result response = (Result) retryContext.getCallResult(); - - if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) { - return Boolean.FALSE; - } - - DispatchRetryResultDTO data = response.getData(); - if (Objects.isNull(data)) { - return Boolean.FALSE; - } - - Integer statusCode = data.getStatusCode(); - Integer status = RetryResultStatusEnum.getRetryResultStatusEnum(statusCode).getStatus(); - return RetryResultStatusEnum.SUCCESS.getStatus().equals(status) || RetryResultStatusEnum.STOP.getStatus().equals(status); - } - - @Override - public boolean supports(RetryContext retryContext) { - return retryContext instanceof MaxAttemptsPersistenceRetryContext; - } - - @Override - public int order() { - return 3; - } - } - -} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryController.java similarity index 64% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryController.java index f652a23c..83381cd9 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryController.java @@ -4,7 +4,7 @@ import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.server.web.annotation.LoginRequired; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.*; -import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; +import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; import com.aizuda.snailjob.server.web.service.RetryTaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; @@ -19,34 +19,34 @@ import java.util.List; * @date 2022-02-27 */ @RestController -@RequestMapping("/retry-task") -public class RetryTaskController { +@RequestMapping("/retry") +public class RetryController { @Autowired private RetryTaskService retryTaskService; @LoginRequired @GetMapping("list") - public PageResult> getRetryTaskPage(RetryTaskQueryVO queryVO) { - return retryTaskService.getRetryTaskPage(queryVO); + public PageResult> getRetryTaskPage(RetryQueryVO queryVO) { + return retryTaskService.getRetryPage(queryVO); } @LoginRequired @GetMapping("{id}") - public RetryTaskResponseVO getRetryTaskById(@RequestParam("groupName") String groupName, - @PathVariable("id") Long id) { - return retryTaskService.getRetryTaskById(groupName, id); + public RetryResponseVO getRetryTaskById(@RequestParam("groupName") String groupName, + @PathVariable("id") Long id) { + return retryTaskService.getRetryById(groupName, id); } @LoginRequired @PutMapping("status") - public int updateRetryTaskStatus(@RequestBody RetryTaskUpdateStatusRequestVO retryTaskUpdateStatusRequestVO) { - return retryTaskService.updateRetryTaskStatus(retryTaskUpdateStatusRequestVO); + public int updateRetryTaskStatus(@RequestBody RetryUpdateStatusRequestVO retryUpdateStatusRequestVO) { + return retryTaskService.updateRetryTaskStatus(retryUpdateStatusRequestVO); } @LoginRequired @PostMapping - public int saveRetryTask(@RequestBody @Validated RetryTaskSaveRequestVO retryTaskRequestVO) { + public int saveRetryTask(@RequestBody @Validated RetrySaveRequestVO retryTaskRequestVO) { return retryTaskService.saveRetryTask(retryTaskRequestVO); } @@ -58,14 +58,14 @@ public class RetryTaskController { @LoginRequired @PutMapping("/batch") - public Integer updateRetryTaskExecutorName(@RequestBody @Validated RetryTaskUpdateExecutorNameRequestVO requestVO) { - return retryTaskService.updateRetryTaskExecutorName(requestVO); + public Integer updateRetryTaskExecutorName(@RequestBody @Validated RetryUpdateExecutorNameRequestVO requestVO) { + return retryTaskService.updateRetryExecutorName(requestVO); } @LoginRequired @DeleteMapping("/batch") public boolean batchDeleteRetryTask(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) { - return retryTaskService.batchDeleteRetryTask(requestVO); + return retryTaskService.batchDeleteRetry(requestVO); } @LoginRequired @@ -77,12 +77,12 @@ public class RetryTaskController { @LoginRequired @PostMapping("/manual/trigger/retry/task") public boolean manualTriggerRetryTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) { - return retryTaskService.manualTriggerRetryTask(requestVO); + return retryTaskService.manualTriggerRetry(requestVO); } @LoginRequired @PostMapping("/manual/trigger/callback/task") public boolean manualTriggerCallbackTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) { - return retryTaskService.manualTriggerCallbackTask(requestVO); + return retryTaskService.manualTriggerCallback(requestVO); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskLogController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskLogController.java index beaa1cc4..722364e0 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskLogController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskLogController.java @@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.web.controller; import com.aizuda.snailjob.server.web.annotation.LoginRequired; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; -import com.aizuda.snailjob.server.web.model.request.RetryTaskLogQueryVO; +import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO; import com.aizuda.snailjob.server.web.service.RetryTaskLogService; @@ -21,7 +21,7 @@ import java.util.Set; * @date 2022-02-27 */ @RestController -@RequestMapping("/retry-task-log") +@RequestMapping("/retry-task") public class RetryTaskLogController { @Autowired @@ -29,7 +29,7 @@ public class RetryTaskLogController { @LoginRequired @GetMapping("list") - public PageResult> getRetryTaskLogPage(RetryTaskLogQueryVO queryVO) { + public PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO) { return retryTaskLogService.getRetryTaskLogPage(queryVO); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryQueryVO.java similarity index 61% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogQueryVO.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryQueryVO.java index a6d39b4e..695bbc46 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryQueryVO.java @@ -4,15 +4,14 @@ import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; -import java.time.LocalDateTime; - /** - * @author: opensnail - * @date : 2022-02-28 09:08 + * @author opensnail + * @date 2022-02-27 + * @since 2.0 */ @Data @EqualsAndHashCode(callSuper = true) -public class RetryTaskLogQueryVO extends BaseQueryVO { +public class RetryQueryVO extends BaseQueryVO { private String groupName; @@ -22,11 +21,7 @@ public class RetryTaskLogQueryVO extends BaseQueryVO { private String idempotentId; - private String uniqueId; - private Integer retryStatus; - private LocalDateTime beginDate; - - private LocalDateTime endDate; + private Long retryId; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskSaveRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetrySaveRequestVO.java similarity index 96% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskSaveRequestVO.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetrySaveRequestVO.java index c9eba16d..dbc62998 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskSaveRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetrySaveRequestVO.java @@ -12,7 +12,7 @@ import org.hibernate.validator.constraints.NotBlank; * @since 2.0 */ @Data -public class RetryTaskSaveRequestVO { +public class RetrySaveRequestVO { /** * 组名称 diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java index 8781bef0..c8046156 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java @@ -4,10 +4,11 @@ import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; +import java.time.LocalDateTime; + /** - * @author opensnail - * @date 2022-02-27 - * @since 2.0 + * @author: opensnail + * @date : 2022-02-28 09:08 */ @Data @EqualsAndHashCode(callSuper = true) @@ -21,7 +22,11 @@ public class RetryTaskQueryVO extends BaseQueryVO { private String idempotentId; + private Long retryId; + private Integer retryStatus; - private Long retryId; + private LocalDateTime beginDate; + + private LocalDateTime endDate; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskUpdateExecutorNameRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryUpdateExecutorNameRequestVO.java similarity index 94% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskUpdateExecutorNameRequestVO.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryUpdateExecutorNameRequestVO.java index 040bf34c..c241391f 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskUpdateExecutorNameRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryUpdateExecutorNameRequestVO.java @@ -15,7 +15,7 @@ import java.util.List; * @date 2022-09-29 */ @Data -public class RetryTaskUpdateExecutorNameRequestVO { +public class RetryUpdateExecutorNameRequestVO { /** * 组名称 diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskUpdateStatusRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryUpdateStatusRequestVO.java similarity index 94% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskUpdateStatusRequestVO.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryUpdateStatusRequestVO.java index c836018f..607e2265 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskUpdateStatusRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryUpdateStatusRequestVO.java @@ -12,7 +12,7 @@ import lombok.Data; * @date 2022-09-29 */ @Data -public class RetryTaskUpdateStatusRequestVO { +public class RetryUpdateStatusRequestVO { /** * 重试状态 {@link RetryStatusEnum} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigRequestVO.java index 64d4fe5b..13dce569 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigRequestVO.java @@ -82,7 +82,7 @@ public class SceneConfigRequestVO { /** * 回调的最大执行次数 */ - private int cbMaxCount = 288; + private int cbMaxCount; /** * 回调间隔时间 diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskResponseVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryResponseVO.java similarity index 90% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskResponseVO.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryResponseVO.java index 4fe90b92..2e3061c7 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskResponseVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryResponseVO.java @@ -10,7 +10,7 @@ import java.time.LocalDateTime; * @since 2.0 */ @Data -public class RetryTaskResponseVO { +public class RetryResponseVO { private Long id; @@ -42,4 +42,6 @@ public class RetryTaskResponseVO { private LocalDateTime updateDt; + private RetryResponseVO children; + } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskLogResponseVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskLogResponseVO.java index b6e06b5e..537b56ea 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskLogResponseVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskLogResponseVO.java @@ -13,28 +13,24 @@ public class RetryTaskLogResponseVO { private Long id; - private String uniqueId; - private String groupName; private String sceneName; - private String idempotentId; + private Integer taskStatus; - private String bizNo; + private Long retryId; - private String executorName; - - private String argsStr; - - private String extAttrs; - - private LocalDateTime nextTriggerAt; - - private Integer retryStatus; private Integer taskType; private LocalDateTime createDt; + private Integer operationReason; + + /** + * 客户端ID + */ + private String clientInfo; + } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/SceneConfigResponseVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/SceneConfigResponseVO.java index 685fe388..26f152da 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/SceneConfigResponseVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/SceneConfigResponseVO.java @@ -42,4 +42,24 @@ public class SceneConfigResponseVO { * 通知告警场景配置id列表 */ private Set notifyIds; + + /** + * 回调状态 0、不开启 1、开启 + */ + private Integer cbStatus; + + /** + * 回调触发类型 + */ + private Integer cbTriggerType; + + /** + * 回调的最大执行次数 + */ + private int cbMaxCount; + + /** + * 回调间隔时间 + */ + private String cbTriggerInterval; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java index c87d9001..98dd056b 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java @@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.web.service; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; -import com.aizuda.snailjob.server.web.model.request.RetryTaskLogQueryVO; +import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO; @@ -16,7 +16,7 @@ import java.util.Set; */ public interface RetryTaskLogService { - PageResult> getRetryTaskLogPage(RetryTaskLogQueryVO queryVO); + PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO); RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java index c46eae1a..af07d9ad 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java @@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.web.service; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.*; -import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; +import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; import java.util.List; @@ -13,7 +13,7 @@ import java.util.List; */ public interface RetryTaskService { - PageResult> getRetryTaskPage(RetryTaskQueryVO queryVO); + PageResult> getRetryPage(RetryQueryVO queryVO); /** * 通过重试任务表id获取重试任务信息 @@ -22,23 +22,23 @@ public interface RetryTaskService { * @param id 重试任务表id * @return 重试任务 */ - RetryTaskResponseVO getRetryTaskById(String groupName, Long id); + RetryResponseVO getRetryById(String groupName, Long id); /** * 更新重试任务状态 * - * @param retryTaskUpdateStatusRequestVO 更新重试任务状态请求模型 + * @param retryUpdateStatusRequestVO 更新重试任务状态请求模型 * @return */ - int updateRetryTaskStatus(RetryTaskUpdateStatusRequestVO retryTaskUpdateStatusRequestVO); + int updateRetryTaskStatus(RetryUpdateStatusRequestVO retryUpdateStatusRequestVO); /** * 手动新增重试任务 * - * @param retryTaskRequestVO {@link RetryTaskSaveRequestVO} 重试数据模型 + * @param retryTaskRequestVO {@link RetrySaveRequestVO} 重试数据模型 * @return */ - int saveRetryTask(RetryTaskSaveRequestVO retryTaskRequestVO); + int saveRetryTask(RetrySaveRequestVO retryTaskRequestVO); /** * 委托客户端生成idempotentId @@ -54,7 +54,7 @@ public interface RetryTaskService { * @param requestVO 更新执行器变更模型 * @return 更新条数 */ - int updateRetryTaskExecutorName(RetryTaskUpdateExecutorNameRequestVO requestVO); + int updateRetryExecutorName(RetryUpdateExecutorNameRequestVO requestVO); /** * 批量删除重试数据 @@ -62,7 +62,7 @@ public interface RetryTaskService { * @param requestVO 批量删除重试数据 * @return */ - boolean batchDeleteRetryTask(BatchDeleteRetryTaskVO requestVO); + boolean batchDeleteRetry(BatchDeleteRetryTaskVO requestVO); /** * 解析日志 @@ -78,7 +78,7 @@ public interface RetryTaskService { * @param requestVO * @return */ - boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO); + boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO); /** * 手动执行回调任务 @@ -86,5 +86,5 @@ public interface RetryTaskService { * @param requestVO * @return */ - boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO); + boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java index 44b78f3d..0182af25 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.service.convert; -import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; +import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -17,7 +17,7 @@ public interface RetryTaskResponseVOConverter { RetryTaskResponseVOConverter INSTANCE = Mappers.getMapper(RetryTaskResponseVOConverter.class); - RetryTaskResponseVO convert(Retry retry); + RetryResponseVO convert(Retry retry); - List convertList(List retries); + List convertList(List retries); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/TaskContextConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/TaskContextConverter.java index 6008531a..1273b138 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/TaskContextConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/TaskContextConverter.java @@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.web.service.convert; import com.aizuda.snailjob.server.model.dto.RetryTaskDTO; import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskContext; -import com.aizuda.snailjob.server.web.model.request.RetryTaskSaveRequestVO; +import com.aizuda.snailjob.server.web.model.request.RetrySaveRequestVO; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -17,7 +17,7 @@ import java.util.List; public interface TaskContextConverter { TaskContextConverter INSTANCE = Mappers.getMapper(TaskContextConverter.class); - TaskContext.TaskInfo convert(RetryTaskSaveRequestVO retryTaskSaveRequestVO); + TaskContext.TaskInfo convert(RetrySaveRequestVO retrySaveRequestVO); List convert(List retryTasks); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java index 4e615593..f77c646b 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java @@ -10,7 +10,7 @@ import com.aizuda.snailjob.common.log.constant.LogFieldConstants; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; -import com.aizuda.snailjob.server.web.model.request.RetryTaskLogQueryVO; +import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; import com.aizuda.snailjob.server.web.model.request.UserSessionVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO; @@ -43,37 +43,30 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService { private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; @Override - public PageResult> getRetryTaskLogPage(RetryTaskLogQueryVO queryVO) { + public PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO) { PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); -// UserSessionVO userSessionVO = UserSessionUtils.currentUserSession(); -// String namespaceId = userSessionVO.getNamespaceId(); -// -// List groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName()); -// -// LambdaQueryWrapper retryTaskLogLambdaQueryWrapper = new LambdaQueryWrapper() -// .eq(RetryTask::getNamespaceId, namespaceId) -// .in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames) -// .eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName()) -// .eq(StrUtil.isNotBlank(queryVO.getBizNo()), RetryTask::getBizNo, queryVO.getBizNo()) -// .eq(StrUtil.isNotBlank(queryVO.getUniqueId()), RetryTask::getUniqueId, queryVO.getUniqueId()) -// .eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), RetryTask::getIdempotentId, queryVO.getIdempotentId()) -// .eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus()) -// .between(ObjUtil.isNotNull(queryVO.getDatetimeRange()), -// RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt()) -// .select(RetryTask::getGroupName, RetryTask::getId, -// RetryTask::getSceneName, -// RetryTask::getIdempotentId, RetryTask::getBizNo, RetryTask::getTaskStatus, -// RetryTask::getCreateDt, RetryTask::getUniqueId, RetryTask::getTaskType) -// .orderByDesc(RetryTask::getCreateDt); -// PageDTO retryTaskLogPageDTO = retryTaskMapper.selectPage(pageDTO, -// retryTaskLogLambdaQueryWrapper); + UserSessionVO userSessionVO = UserSessionUtils.currentUserSession(); + String namespaceId = userSessionVO.getNamespaceId(); -// return new PageResult<>( -// retryTaskLogPageDTO, -// RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskLogPageDTO.getRecords())); + List groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName()); + + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(RetryTask::getNamespaceId, namespaceId) + .in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames) + .eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName()) + .eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus()) + .eq(Objects.nonNull(queryVO.getRetryId()), RetryTask::getRetryId, queryVO.getRetryId()) + .between(ObjUtil.isNotNull(queryVO.getDatetimeRange()), + RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt()) + .select(RetryTask::getGroupName, RetryTask::getId, RetryTask::getSceneName, RetryTask::getTaskStatus, + RetryTask::getCreateDt, RetryTask::getTaskType, RetryTask::getOperationReason) + .orderByDesc(RetryTask::getCreateDt); + + PageDTO retryTaskPageDTO = retryTaskMapper.selectPage(pageDTO, wrapper); + return new PageResult<>(retryTaskPageDTO, + RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskPageDTO.getRecords())); - return null; } @Override diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index 5e434682..2e4474d9 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -29,7 +29,7 @@ import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskGenerat import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.*; -import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; +import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; import com.aizuda.snailjob.server.web.service.RetryTaskService; import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter; import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter; @@ -71,14 +71,11 @@ public class RetryTaskServiceImpl implements RetryTaskService { @Autowired @Lazy private List taskGenerators; -// @Lazy -// @Autowired -// private List taskExecutors; @Autowired private RetryTaskLogMessageMapper retryTaskLogMessageMapper; @Override - public PageResult> getRetryTaskPage(RetryTaskQueryVO queryVO) { + public PageResult> getRetryPage(RetryQueryVO queryVO) { PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); @@ -102,12 +99,22 @@ public class RetryTaskServiceImpl implements RetryTaskService { Retry::getTaskType) .orderByDesc(Retry::getCreateDt); pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper); - return new PageResult<>(pageDTO, - RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords())); + + Set ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId); + List callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper().eq(Retry::getParentId, ids)); + + Map callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId); + + List retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords()); + for (RetryResponseVO retryResponseVO : retryResponseList) { + retryResponseVO.setChildren(RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId()))); + } + + return new PageResult<>(pageDTO, retryResponseList); } @Override - public RetryTaskResponseVO getRetryTaskById(String groupName, Long id) { + public RetryResponseVO getRetryById(String groupName, Long id) { TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); Retry retry = retryTaskAccess.one(new LambdaQueryWrapper().eq(Retry::getId, id)); return RetryTaskResponseVOConverter.INSTANCE.convert(retry); @@ -115,7 +122,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { @Override @Transactional - public int updateRetryTaskStatus(RetryTaskUpdateStatusRequestVO requestVO) { + public int updateRetryTaskStatus(RetryUpdateStatusRequestVO requestVO) { RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(requestVO.getRetryStatus()); if (Objects.isNull(retryStatusEnum)) { @@ -166,7 +173,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { } @Override - public int saveRetryTask(final RetryTaskSaveRequestVO retryTaskRequestVO) { + public int saveRetryTask(final RetrySaveRequestVO retryTaskRequestVO) { RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskRequestVO.getRetryStatus()); if (Objects.isNull(retryStatusEnum)) { throw new SnailJobServerException("重试状态错误"); @@ -230,7 +237,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { } @Override - public int updateRetryTaskExecutorName(final RetryTaskUpdateExecutorNameRequestVO requestVO) { + public int updateRetryExecutorName(final RetryUpdateExecutorNameRequestVO requestVO) { Retry retry = new Retry(); retry.setExecutorName(requestVO.getExecutorName()); @@ -248,7 +255,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { @Override @Transactional - public boolean batchDeleteRetryTask(final BatchDeleteRetryTaskVO requestVO) { + public boolean batchDeleteRetry(final BatchDeleteRetryTaskVO requestVO) { TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); @@ -342,7 +349,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { } @Override - public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) { + public boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO) { String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() @@ -375,7 +382,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { } @Override - public boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO) { + public boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO) { List uniqueIds = requestVO.getUniqueIds(); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();