From d252e46d76dc829cc9e87d1b0751d59aa832aed4 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sat, 22 Feb 2025 21:35:31 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.4.0-beta1):=201.=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E7=BB=93=E6=9E=9C=E5=A4=84=E7=90=86=E5=99=A8?= =?UTF-8?q?=202.=20=E6=94=AF=E6=8C=81=E4=BB=BB=E5=8A=A1=E5=81=9C=E6=AD=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CallbackTaskExecutorFutureCallback.java | 18 +- .../RetryTaskExecutorFutureCallback.java | 14 +- .../client/core/client/RetryClient.java | 7 +- .../core/client/SnailRetryEndPoint.java | 3 +- .../DispatchCallbackResultRequest.java | 20 + .../request/DispatchRetryResultRequest.java | 2 +- .../common/core/constant/SystemConstants.java | 5 + .../core/enums/RetryOperationReasonEnum.java | 17 +- .../core/enums/RetryTaskStatusEnum.java | 2 + ...skBatchMapper.xml => RetryTaskService.xml} | 0 .../enums/RetryTaskExecutorSceneEnum.java | 24 + .../task/dto/RetryExecutorResultDTO.java | 6 +- .../retry/task/dto/RetryTaskPrepareDTO.java | 2 + .../server/retry/task/dto/TaskStopJobDTO.java | 3 + .../task/support/RetryTaskConverter.java | 8 +- .../block/DiscardRetryBlockStrategy.java | 2 +- .../block/OverlayRetryBlockStrategy.java | 4 +- .../dispatch/RequestCallbackClientActor.java | 4 +- .../dispatch/RequestRetryClientActor.java | 4 +- .../task/support/dispatch/ScanRetryActor.java | 2 + .../support/handler/RetryTaskStopHandler.java | 16 +- ...eportCallbackResultHttpRequestHandler.java | 71 +++ ...eportDispatchResultHttpRequestHandler.java | 20 +- .../support/result/RetryFailureHandler.java | 86 ++- .../support/result/RetryResultContext.java | 15 +- .../task/support/result/RetryStopHandler.java | 61 +- .../support/result/RetrySuccessHandler.java | 44 +- .../web/controller/RetryController.java | 29 +- .../web/controller/RetryTaskController.java | 30 +- .../request/ManualTriggerTaskRequestVO.java | 4 +- ...sponseVO.java => RetryTaskResponseVO.java} | 3 +- .../server/web/service/RetryService.java | 83 +++ .../web/service/RetryTaskLogService.java | 28 - .../server/web/service/RetryTaskService.java | 82 +-- .../web/service/convert/RetryConverter.java | 22 + .../RetryTaskLogResponseVOConverter.java | 6 +- .../web/service/impl/RetryServiceImpl.java | 404 +++++++++++++ .../service/impl/RetryTaskLogServiceImpl.java | 207 ------- .../service/impl/RetryTaskServiceImpl.java | 549 ++++++------------ 39 files changed, 1015 insertions(+), 892 deletions(-) create mode 100644 snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchCallbackResultRequest.java rename snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/{JobTaskBatchMapper.xml => RetryTaskService.xml} (100%) create mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/RetryTaskExecutorSceneEnum.java create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportCallbackResultHttpRequestHandler.java rename snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/{RetryTaskLogResponseVO.java => RetryTaskResponseVO.java} (92%) create mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryService.java delete mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java create mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java create mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java delete mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java index b6b53d26..3d52714f 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java @@ -4,9 +4,11 @@ import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.client.core.context.CallbackContext; import com.aizuda.snailjob.client.core.client.RetryClient; import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; +import com.aizuda.snailjob.client.model.request.DispatchCallbackResultRequest; import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest; import com.aizuda.snailjob.client.model.request.RetryCallbackResultRequest; import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -44,9 +46,9 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback(Boolean.TRUE); } @@ -157,6 +157,7 @@ public class SnailRetryEndPoint implements Lifecycle { // 将任务添加到时间轮中,到期停止任务 TimerManager.add(new StopTaskTimerTask(callbackDTO.getRetryTaskId()), callbackDTO.getExecutorTimeout(), TimeUnit.SECONDS); + SnailJobLog.REMOTE.info("回调任务:[{}] 调度成功. ", callbackDTO.getRetryTaskId()); return new Result<>(Boolean.TRUE); } diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchCallbackResultRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchCallbackResultRequest.java new file mode 100644 index 00000000..8f59cdcc --- /dev/null +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchCallbackResultRequest.java @@ -0,0 +1,20 @@ +package com.aizuda.snailjob.client.model.request; + +import lombok.Data; + +/** + * @author: opensnail + * @date : 2023-09-26 15:10 + */ +@Data +public class DispatchCallbackResultRequest { + + private String namespaceId; + private String groupName; + private String sceneName; + private Long retryId; + private Long retryTaskId; + private Integer taskStatus; + + private String exceptionMsg; +} diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryResultRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryResultRequest.java index 3cea4350..d3378c27 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryResultRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryResultRequest.java @@ -14,8 +14,8 @@ public class DispatchRetryResultRequest { private String sceneName; private Long retryId; private Long retryTaskId; + private Integer taskStatus; private String result; - private Integer statusCode; private String exceptionMsg; } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index 7dc9a87b..ac18a8c7 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -80,6 +80,11 @@ public interface SystemConstants { */ String REPORT_RETRY_DISPATCH_RESULT = "/report/retry/dispatch/result"; + /** + * 上报客户端回调执行结果 + */ + String REPORT_CALLBACK_RESULT = "/report/retry/callback/result"; + /** * 批量日志上报 */ diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java index 62147346..ae4d9ee8 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java @@ -4,9 +4,6 @@ import cn.hutool.core.util.StrUtil; import lombok.AllArgsConstructor; import lombok.Getter; -import java.util.Arrays; -import java.util.List; - /** * 标识某个操作的具体原因 * @@ -21,9 +18,9 @@ public enum RetryOperationReasonEnum { NONE(0, StrUtil.EMPTY), TASK_EXECUTION_TIMEOUT(1, "任务执行超时"), NOT_CLIENT(2, "无客户端节点"), - JOB_CLOSED(3, "JOB已关闭"), - JOB_DISCARD(4, "任务丢弃"), - JOB_OVERLAY(5, "任务被覆盖"), + RETRY_SUSPEND(3, "重试已暂停"), + RETRY_TASK_DISCARD(4, "任务丢弃"), + RETRY_TASK_OVERLAY(5, "任务被覆盖"), TASK_EXECUTION_ERROR(6, "任务执行期间发生非预期异常"), MANNER_STOP(7, "手动停止"), NOT_RUNNING_RETRY(8, "当前重试非运行中"), @@ -35,9 +32,13 @@ public enum RetryOperationReasonEnum { private final int reason; private final String desc; - public static RetryOperationReasonEnum getWorkflowNotifyScene(Integer notifyScene) { + public static RetryOperationReasonEnum of(Integer operationReason) { + if (operationReason == null) { + return NONE; + } + for (RetryOperationReasonEnum sceneEnum : RetryOperationReasonEnum.values()) { - if (sceneEnum.getReason() == notifyScene) { + if (sceneEnum.getReason() == operationReason) { return sceneEnum; } } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryTaskStatusEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryTaskStatusEnum.java index 5832aafd..454b4261 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryTaskStatusEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryTaskStatusEnum.java @@ -59,6 +59,8 @@ public enum RetryTaskStatusEnum { public static final Set TERMINAL_STATUS_SET = Sets.newHashSet(SUCCESS.getStatus(), FAIL.getStatus(), STOP.getStatus(), CANCEL.getStatus()); + public static final Set NOT_SUCCESS = Sets.newHashSet(FAIL.getStatus(), STOP.getStatus(), CANCEL.getStatus()); + public static RetryTaskStatusEnum getByStatus(@NonNull Integer status) { for (RetryTaskStatusEnum value : RetryTaskStatusEnum.values()) { if (Objects.equals(value.status, status)) { diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskBatchMapper.xml b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/RetryTaskService.xml similarity index 100% rename from snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskBatchMapper.xml rename to snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/RetryTaskService.xml diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/RetryTaskExecutorSceneEnum.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/RetryTaskExecutorSceneEnum.java new file mode 100644 index 00000000..3101b7e9 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/RetryTaskExecutorSceneEnum.java @@ -0,0 +1,24 @@ +package com.aizuda.snailjob.server.common.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-02-22 + */ +@AllArgsConstructor +@Getter +public enum RetryTaskExecutorSceneEnum { + AUTO_RETRY(1, SyetemTaskTypeEnum.RETRY), + MANUAL_RETRY(2, SyetemTaskTypeEnum.RETRY), + AUTO_CALLBACK(3, SyetemTaskTypeEnum.CALLBACK), + MANUAL_CALLBACK(4, SyetemTaskTypeEnum.CALLBACK); + + private final int scene; + private final SyetemTaskTypeEnum taskType; +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java index 4fa0b39a..e0ed340e 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java @@ -1,7 +1,5 @@ package com.aizuda.snailjob.server.retry.task.dto; -import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import lombok.Data; import lombok.EqualsAndHashCode; @@ -17,12 +15,10 @@ import lombok.EqualsAndHashCode; @Data public class RetryExecutorResultDTO extends BaseDTO { - private Integer resultStatus; private Integer operationReason; private boolean incrementRetryCount; private String resultJson; - private Integer statusCode; - private String idempotentId; private String exceptionMsg; + private Integer taskStatus; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java index 60ec581b..66a77ecc 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java @@ -34,4 +34,6 @@ public class RetryTaskPrepareDTO { private boolean onlyTimeoutCheck; private Integer executorTimeout; + + private Integer retryTaskExecutorScene; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java index d9d1af8f..603cc8dc 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java @@ -20,6 +20,9 @@ public class TaskStopJobDTO extends BaseDTO { */ private Integer operationReason; + /** + * 若是失败补充失败信息 + */ private String message; /** diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java index 36b22702..3eec3de8 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java @@ -2,9 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; -import com.aizuda.snailjob.client.model.request.RetryCallbackRequest; -import com.aizuda.snailjob.client.model.request.DispatchRetryRequest; -import com.aizuda.snailjob.client.model.request.StopRetryRequest; +import com.aizuda.snailjob.client.model.request.*; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; @@ -125,7 +123,9 @@ public interface RetryTaskConverter { RequestRetryExecutorDTO toRealRetryExecutorDTO(TaskStopJobDTO stopJobDTO); - RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO); + RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultRequest resultDTO); + + RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchCallbackResultRequest resultDTO); RetryExecutorResultDTO toRetryExecutorResultDTO(TaskStopJobDTO resultDTO); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/DiscardRetryBlockStrategy.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/DiscardRetryBlockStrategy.java index 560182ea..a95babb5 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/DiscardRetryBlockStrategy.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/DiscardRetryBlockStrategy.java @@ -22,7 +22,7 @@ public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy { // 重新生成任务 RetryTaskGeneratorDTO generatorDTO = RetryTaskConverter.INSTANCE.toRetryTaskGeneratorDTO(context); generatorDTO.setTaskStatus(RetryTaskStatusEnum.CANCEL.getStatus()); - generatorDTO.setOperationReason(RetryOperationReasonEnum.JOB_DISCARD.getReason()); + generatorDTO.setOperationReason(RetryOperationReasonEnum.RETRY_TASK_DISCARD.getReason()); retryTaskGeneratorHandler.generateRetryTask(generatorDTO); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java index 5550ae34..b5f68e3d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java @@ -31,12 +31,12 @@ public class OverlayRetryBlockStrategy extends AbstracJobBlockStrategy { // 重新生成任务 RetryTaskGeneratorDTO generatorDTO = RetryTaskConverter.INSTANCE.toRetryTaskGeneratorDTO(context); generatorDTO.setTaskStatus(RetryTaskStatusEnum.CANCEL.getStatus()); - generatorDTO.setOperationReason(RetryOperationReasonEnum.JOB_DISCARD.getReason()); + generatorDTO.setOperationReason(RetryOperationReasonEnum.RETRY_TASK_DISCARD.getReason()); retryTaskGeneratorHandler.generateRetryTask(generatorDTO); TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(context); if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) { - stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_OVERLAY.getReason()); + stopJobDTO.setOperationReason(RetryOperationReasonEnum.RETRY_TASK_OVERLAY.getReason()); } stopJobDTO.setNeedUpdateTaskStatus(true); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java index ccfd20e6..840a47c4 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java @@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; import com.aizuda.snailjob.client.model.request.RetryCallbackRequest; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -188,7 +188,7 @@ public class RequestCallbackClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); - executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus()); + executorResultDTO.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus()); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java index d6938c69..31e62f9f 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java @@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; import com.aizuda.snailjob.client.model.request.DispatchRetryRequest; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.model.SnailJobHeaders; @@ -182,7 +182,7 @@ public class RequestRetryClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); - executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus()); + executorResultDTO.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus()); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index bf03ceee..2c3738ad 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -14,6 +14,7 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.dto.PartitionTask; import com.aizuda.snailjob.server.common.dto.ScanTask; +import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.DateUtils; @@ -166,6 +167,7 @@ public class ScanRetryActor extends AbstractActor { RetryTaskPrepareDTO retryTaskPrepareDTO = RetryTaskConverter.INSTANCE.toRetryTaskPrepareDTO(partitionTask); retryTaskPrepareDTO.setBlockStrategy(retrySceneConfig.getBackOff()); retryTaskPrepareDTO.setExecutorTimeout(retrySceneConfig.getExecutorTimeout()); + retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.AUTO_RETRY.getScene()); waitExecRetries.add(retryTaskPrepareDTO); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java index ce96f59c..30de1fe0 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java @@ -1,18 +1,12 @@ package com.aizuda.snailjob.server.retry.task.support.handler; import akka.actor.ActorRef; -import cn.hutool.core.lang.Assert; -import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -27,8 +21,6 @@ import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor public class RetryTaskStopHandler { - private final RetryTaskMapper retryTaskMapper; - /** * 执行停止任务 * @@ -36,12 +28,6 @@ public class RetryTaskStopHandler { */ public void stop(TaskStopJobDTO stopJobDTO) { -// RetryTask retryTask = new RetryTask(); -// retryTask.setId(stopJobDTO.getRetryTaskId()); -// retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus()); -// retryTask.setOperationReason(stopJobDTO.getOperationReason()); -// Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed")); - RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO); ActorRef actorRef = ActorGenerator.stopRetryTaskActor(); actorRef.tell(executorDTO, actorRef); @@ -56,7 +42,7 @@ public class RetryTaskStopHandler { } RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO); executorResultDTO.setExceptionMsg(stopJobDTO.getMessage()); - executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus()); + executorResultDTO.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus()); executorResultDTO.setOperationReason(stopJobDTO.getOperationReason()); ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); actorRef.tell(executorResultDTO, actorRef); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportCallbackResultHttpRequestHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportCallbackResultHttpRequestHandler.java new file mode 100644 index 00000000..53dd6470 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportCallbackResultHttpRequestHandler.java @@ -0,0 +1,71 @@ +package com.aizuda.snailjob.server.retry.task.support.request; + +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.client.model.request.DispatchCallbackResultRequest; +import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest; +import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_CALLBACK_RESULT; + +/** + * 上报回调执行的处理结果 + * + * @author: opensnail + * @date : 2022-03-07 16:39 + * @since 1.0.0 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class ReportCallbackResultHttpRequestHandler extends PostHttpRequestHandler { + + @Override + public boolean supports(String path) { + return REPORT_CALLBACK_RESULT.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + @Transactional + public SnailJobRpcResult doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) { + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + + try { + DispatchCallbackResultRequest request = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), DispatchCallbackResultRequest.class); + RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(request); + RetryTaskStatusEnum statusEnum = RetryTaskStatusEnum.getByStatus(request.getTaskStatus()); + Assert.notNull(statusEnum, () -> new SnailJobServerException("task status code is invalid")); + executorResultDTO.setIncrementRetryCount(true); + ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); + actorRef.tell(executorResultDTO, actorRef); + + return new SnailJobRpcResult(StatusEnum.YES.getStatus(), "Report dispatch result processed successfully", Boolean.TRUE, retryRequest.getReqId()); + } catch (Exception e) { + return new SnailJobRpcResult(StatusEnum.YES.getStatus(), e.getMessage(), Boolean.FALSE, retryRequest.getReqId()); + } + } + +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java index cd18b63a..b1df0428 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java @@ -3,9 +3,9 @@ package com.aizuda.snailjob.server.retry.task.support.request; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import cn.hutool.core.net.url.UrlQuery; -import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; +import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest; import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; @@ -15,7 +15,6 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import lombok.RequiredArgsConstructor; @@ -23,8 +22,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.util.List; - import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_RETRY_DISPATCH_RESULT; /** @@ -56,15 +53,14 @@ public class ReportDispatchResultHttpRequestHandler extends PostHttpRequestHandl Object[] args = retryRequest.getArgs(); try { - DispatchRetryResultDTO resultDTO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), DispatchRetryResultDTO.class); - RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(resultDTO); - RetryResultStatusEnum statusEnum = RetryResultStatusEnum.getRetryResultStatusEnum(resultDTO.getStatusCode()); - Assert.notNull(statusEnum, () -> new SnailJobServerException("status code is invalid")); - executorResultDTO.setResultStatus(statusEnum.getStatus()); + DispatchRetryResultRequest request = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), DispatchRetryResultRequest.class); + RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(request); + RetryTaskStatusEnum statusEnum = RetryTaskStatusEnum.getByStatus(request.getTaskStatus()); + Assert.notNull(statusEnum, () -> new SnailJobServerException("task status code is invalid")); executorResultDTO.setIncrementRetryCount(true); - if (RetryResultStatusEnum.FAILURE.getStatus().equals(statusEnum.getStatus())) { + if (RetryTaskStatusEnum.FAIL.getStatus().equals(statusEnum.getStatus())) { executorResultDTO.setOperationReason(RetryOperationReasonEnum.RETRY_FAIL.getReason()); - } else if (RetryResultStatusEnum.STOP.getStatus().equals(statusEnum.getStatus())) { + } else if (RetryTaskStatusEnum.STOP.getStatus().equals(statusEnum.getStatus())) { executorResultDTO.setOperationReason(RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason()); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java index 009a5eb1..50b26fc0 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java @@ -2,13 +2,12 @@ package com.aizuda.snailjob.server.retry.task.support.result; import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.context.SnailSpringContext; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO; -import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent; import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler; @@ -18,22 +17,20 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMappe import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.time.LocalDateTime; -import java.util.Objects; +import java.util.Optional; import static com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR; +import static com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum.NOT_SUCCESS; /** *

- * + * 客户端执行重试失败、服务端调度时失败等场景导致的任务执行失败 *

* * @author opensnail @@ -51,7 +48,9 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { @Override public boolean supports(RetryResultContext context) { - return Objects.equals(RetryResultStatusEnum.FAILURE.getStatus(), context.getResultStatus()); + RetryOperationReasonEnum reasonEnum = RetryOperationReasonEnum.of(context.getOperationReason()); + return NOT_SUCCESS.contains(context.getTaskStatus()) + && RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason() != reasonEnum.getReason(); } @Override @@ -61,47 +60,46 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { context.getGroupName(), context.getSceneName(), context.getNamespaceId()); Retry retry = retryMapper.selectById(context.getRetryId()); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { + transactionTemplate.execute(status -> { - Integer maxRetryCount; - if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) { - maxRetryCount = retrySceneConfig.getCbMaxCount(); - } else { - maxRetryCount = retrySceneConfig.getMaxRetryCount(); - } + Integer maxRetryCount; + if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) { + maxRetryCount = retrySceneConfig.getCbMaxCount(); + } else { + maxRetryCount = retrySceneConfig.getMaxRetryCount(); + } - if (maxRetryCount <= retry.getRetryCount() + 1) { - retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus()); - retry.setRetryCount(retry.getRetryCount() + 1); - retry.setUpdateDt(LocalDateTime.now()); - retry.setDeleted(retry.getId()); - Assert.isTrue(1 == retryMapper.updateById(retry), - () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); - // 创建一个回调任务 - callbackRetryTaskHandler.create(retry, retrySceneConfig); - } else if (context.isIncrementRetryCount()) { - retry.setRetryCount(retry.getRetryCount() + 1); - retry.setUpdateDt(LocalDateTime.now()); - retry.setDeleted(retry.getId()); - Assert.isTrue(1 == retryMapper.updateById(retry), - () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); - - } - - RetryTask retryTask = new RetryTask(); - retryTask.setId(context.getRetryTaskId()); - retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus()); - retryTask.setOperationReason(context.getOperationReason()); - Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), + if (maxRetryCount <= retry.getRetryCount() + 1) { + retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus()); + retry.setRetryCount(retry.getRetryCount() + 1); + retry.setUpdateDt(LocalDateTime.now()); + retry.setDeleted(retry.getId()); + Assert.isTrue(1 == retryMapper.updateById(retry), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); + // 创建一个回调任务 + callbackRetryTaskHandler.create(retry, retrySceneConfig); + } else if (context.isIncrementRetryCount()) { + retry.setRetryCount(retry.getRetryCount() + 1); + retry.setUpdateDt(LocalDateTime.now()); + retry.setDeleted(retry.getId()); + Assert.isTrue(1 == retryMapper.updateById(retry), () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); - RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO = - RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO( - retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene()); - SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO)); } + + RetryTask retryTask = new RetryTask(); + retryTask.setId(context.getRetryTaskId()); + retryTask.setTaskStatus(Optional.ofNullable(context.getTaskStatus()).orElse(RetryTaskStatusEnum.FAIL.getStatus())); + retryTask.setOperationReason(Optional.ofNullable(context.getOperationReason()).orElse(RetryOperationReasonEnum.NONE.getReason())); + Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); + + RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO = + RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO( + retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene()); + SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO)); + return null; + }); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java index 3281308a..9f711fc1 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.retry.task.support.result; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.server.retry.task.dto.BaseDTO; import lombok.Data; import lombok.EqualsAndHashCode; @@ -17,8 +17,19 @@ import lombok.EqualsAndHashCode; @Data public class RetryResultContext extends BaseDTO { - private Integer resultStatus; +// /** +// * 客户端返回的结果 +// * @see RetryResultStatusEnum +// */ +// private Integer resultStatus; + + /** + * 重试任务状态 + * @see RetryTaskStatusEnum + */ + private Integer taskStatus; private Integer operationReason; + private boolean incrementRetryCount; private String resultJson; private String idempotentId; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java index 6fe6017a..2e41a603 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java @@ -1,31 +1,25 @@ package com.aizuda.snailjob.server.retry.task.support.result; import cn.hutool.core.lang.Assert; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; -import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler; -import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler; -import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; -import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.time.LocalDateTime; -import java.util.Objects; + +import static com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum.STOP; /** *

- * + * 客户端触发停止重试指令, 重试挂起 *

* * @author opensnail @@ -35,47 +29,38 @@ import java.util.Objects; @RequiredArgsConstructor @Slf4j public class RetryStopHandler extends AbstractRetryResultHandler { - - private final AccessTemplate accessTemplate; - private final CallbackRetryTaskHandler callbackRetryTaskHandler; private final TransactionTemplate transactionTemplate; private final RetryTaskMapper retryTaskMapper; private final RetryMapper retryMapper; @Override public boolean supports(RetryResultContext context) { - return Objects.equals(RetryResultStatusEnum.STOP.getStatus(), context.getResultStatus()); + RetryOperationReasonEnum reasonEnum = RetryOperationReasonEnum.of(context.getOperationReason()); + return STOP.getStatus().equals(context.getTaskStatus()) + && RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason() == reasonEnum.getReason(); } @Override public void doHandler(RetryResultContext context) { - RetrySceneConfig retrySceneConfig = - accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName( - context.getGroupName(), context.getSceneName(), context.getNamespaceId()); - Retry retry = retryMapper.selectById(context.getRetryId()); + transactionTemplate.execute((status) -> { - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { + Retry retry = new Retry(); + retry.setId(context.getRetryId()); + retry.setRetryStatus(RetryStatusEnum.SUSPEND.getStatus()); + retry.setUpdateDt(LocalDateTime.now()); + retry.setRetryCount(retry.getRetryCount() + 1); + Assert.isTrue(1 == retryMapper.updateById(retry), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", + retry.getGroupName())); - retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus()); - retry.setUpdateDt(LocalDateTime.now()); - retry.setDeleted(retry.getId()); - retry.setRetryCount(retry.getRetryCount() + 1); - Assert.isTrue(1 == retryMapper.updateById(retry), - () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", - retry.getGroupName())); + RetryTask retryTask = new RetryTask(); + retryTask.setId(context.getRetryTaskId()); + retryTask.setOperationReason(context.getOperationReason()); + retryTask.setTaskStatus(STOP.getStatus()); + Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); - RetryTask retryTask = new RetryTask(); - retryTask.setId(context.getRetryTaskId()); - retryTask.setOperationReason(context.getOperationReason()); - retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus()); - Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), - () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); - - // 创建一个回调任务 - callbackRetryTaskHandler.create(retry, retrySceneConfig); - } + return null; }); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java index 8ba2b764..4d87240b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java @@ -1,11 +1,9 @@ package com.aizuda.snailjob.server.retry.task.support.result; import cn.hutool.core.lang.Assert; -import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler; import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; @@ -15,8 +13,6 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.time.LocalDateTime; @@ -24,7 +20,7 @@ import java.util.Objects; /** *

- * + * 任务执行成功 *

* * @author opensnail @@ -41,7 +37,7 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler { @Override public boolean supports(RetryResultContext context) { - return Objects.equals(RetryResultStatusEnum.SUCCESS.getStatus(), context.getResultStatus()); + return Objects.equals(RetryTaskStatusEnum.SUCCESS.getStatus(), context.getTaskStatus()); } @Override @@ -52,28 +48,26 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler { context.getGroupName(), context.getSceneName(), context.getNamespaceId()); Retry retry = retryMapper.selectById(context.getRetryId()); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { - retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus()); - retry.setUpdateDt(LocalDateTime.now()); - retry.setRetryCount(retry.getRetryCount() + 1); - retry.setDeleted(retry.getId()); - Assert.isTrue(1 == retryMapper.updateById(retry), - () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", - retry.getGroupName())); + transactionTemplate.execute((status -> { - RetryTask retryTask = new RetryTask(); - retryTask.setId(context.getRetryTaskId()); - retryTask.setTaskStatus(RetryTaskStatusEnum.SUCCESS.getStatus()); - Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), - () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); + retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus()); + retry.setUpdateDt(LocalDateTime.now()); + retry.setRetryCount(retry.getRetryCount() + 1); + retry.setDeleted(retry.getId()); + Assert.isTrue(1 == retryMapper.updateById(retry), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", + retry.getGroupName())); - // 创建一个回调任务 - callbackRetryTaskHandler.create(retry, retrySceneConfig); + RetryTask retryTask = new RetryTask(); + retryTask.setId(context.getRetryTaskId()); + retryTask.setTaskStatus(RetryTaskStatusEnum.SUCCESS.getStatus()); + Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); + // 创建一个回调任务 + callbackRetryTaskHandler.create(retry, retrySceneConfig); - } - }); + return null; + })); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryController.java index 83381cd9..7b99434f 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryController.java @@ -5,7 +5,7 @@ import com.aizuda.snailjob.server.web.annotation.LoginRequired; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.*; import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; -import com.aizuda.snailjob.server.web.service.RetryTaskService; +import com.aizuda.snailjob.server.web.service.RetryService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; @@ -23,66 +23,61 @@ import java.util.List; public class RetryController { @Autowired - private RetryTaskService retryTaskService; + private RetryService retryService; @LoginRequired @GetMapping("list") public PageResult> getRetryTaskPage(RetryQueryVO queryVO) { - return retryTaskService.getRetryPage(queryVO); + return retryService.getRetryPage(queryVO); } @LoginRequired @GetMapping("{id}") public RetryResponseVO getRetryTaskById(@RequestParam("groupName") String groupName, @PathVariable("id") Long id) { - return retryTaskService.getRetryById(groupName, id); + return retryService.getRetryById(groupName, id); } @LoginRequired @PutMapping("status") public int updateRetryTaskStatus(@RequestBody RetryUpdateStatusRequestVO retryUpdateStatusRequestVO) { - return retryTaskService.updateRetryTaskStatus(retryUpdateStatusRequestVO); + return retryService.updateRetryTaskStatus(retryUpdateStatusRequestVO); } @LoginRequired @PostMapping public int saveRetryTask(@RequestBody @Validated RetrySaveRequestVO retryTaskRequestVO) { - return retryTaskService.saveRetryTask(retryTaskRequestVO); + return retryService.saveRetryTask(retryTaskRequestVO); } @LoginRequired @PostMapping("/generate/idempotent-id") public Result idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) { - return new Result<>(retryTaskService.idempotentIdGenerate(generateRetryIdempotentIdVO)); + return new Result<>(retryService.idempotentIdGenerate(generateRetryIdempotentIdVO)); } @LoginRequired @PutMapping("/batch") public Integer updateRetryTaskExecutorName(@RequestBody @Validated RetryUpdateExecutorNameRequestVO requestVO) { - return retryTaskService.updateRetryExecutorName(requestVO); + return retryService.updateRetryExecutorName(requestVO); } @LoginRequired @DeleteMapping("/batch") - public boolean batchDeleteRetryTask(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) { - return retryTaskService.batchDeleteRetry(requestVO); + public boolean batchDeleteRetry(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) { + return retryService.batchDeleteRetry(requestVO); } @LoginRequired @PostMapping("/batch") public Integer parseLogs(@RequestBody @Validated ParseLogsVO parseLogsVO) { - return retryTaskService.parseLogs(parseLogsVO); + return retryService.parseLogs(parseLogsVO); } @LoginRequired @PostMapping("/manual/trigger/retry/task") public boolean manualTriggerRetryTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) { - return retryTaskService.manualTriggerRetry(requestVO); + return retryService.manualTriggerRetryTask(requestVO); } - @LoginRequired - @PostMapping("/manual/trigger/callback/task") - public boolean manualTriggerCallbackTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) { - return retryTaskService.manualTriggerCallback(requestVO); - } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java index c2ff7466..30fa3773 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java @@ -5,8 +5,8 @@ import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; -import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO; -import com.aizuda.snailjob.server.web.service.RetryTaskLogService; +import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; +import com.aizuda.snailjob.server.web.service.RetryTaskService; import jakarta.validation.constraints.NotEmpty; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -25,35 +25,43 @@ import java.util.Set; public class RetryTaskController { @Autowired - private RetryTaskLogService retryTaskLogService; + private RetryTaskService retryTaskService; @LoginRequired @GetMapping("list") - public PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO) { - return retryTaskLogService.getRetryTaskLogPage(queryVO); + public PageResult> getRetryTaskPage(RetryTaskQueryVO queryVO) { + return retryTaskService.getRetryTaskLogPage(queryVO); } @LoginRequired @GetMapping("/message/list") - public RetryTaskLogMessageResponseVO getRetryTaskLogPage(RetryTaskLogMessageQueryVO queryVO) { - return retryTaskLogService.getRetryTaskLogMessagePage(queryVO); + public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) { + return retryTaskService.getRetryTaskLogMessagePage(queryVO); } @LoginRequired @GetMapping("{id}") - public RetryTaskLogResponseVO getRetryTaskLogById(@PathVariable("id") Long id) { - return retryTaskLogService.getRetryTaskLogById(id); + public RetryTaskResponseVO getRetryTaskById(@PathVariable("id") Long id) { + return retryTaskService.getRetryTaskById(id); + } + + @LoginRequired + @PostMapping("/stop/{id}") + public Boolean stopById(@PathVariable("id") Long id) { + return retryTaskService.stopById(id); } @LoginRequired @DeleteMapping("{id}") public Boolean deleteById(@PathVariable("id") Long id) { - return retryTaskLogService.deleteById(id); + return retryTaskService.deleteById(id); } @LoginRequired @DeleteMapping("ids") public Boolean batchDelete(@RequestBody @NotEmpty(message = "ids不能为空") Set ids) { - return retryTaskLogService.batchDelete(ids); + return retryTaskService.batchDelete(ids); } + + } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ManualTriggerTaskRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ManualTriggerTaskRequestVO.java index 07c5ec74..920ed938 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ManualTriggerTaskRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ManualTriggerTaskRequestVO.java @@ -19,7 +19,7 @@ public class ManualTriggerTaskRequestVO { @Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线") private String groupName; - @NotEmpty(message = "uniqueIds 不能为空") - private List uniqueIds; + @NotEmpty(message = "retryIds 不能为空") + private List retryIds; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskLogResponseVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskResponseVO.java similarity index 92% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskLogResponseVO.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskResponseVO.java index 537b56ea..779438ac 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskLogResponseVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryTaskResponseVO.java @@ -9,7 +9,7 @@ import java.time.LocalDateTime; * @date : 2022-02-28 09:09 */ @Data -public class RetryTaskLogResponseVO { +public class RetryTaskResponseVO { private Long id; @@ -21,7 +21,6 @@ public class RetryTaskLogResponseVO { private Long retryId; - private Integer taskType; private LocalDateTime createDt; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryService.java new file mode 100644 index 00000000..de7cda9c --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryService.java @@ -0,0 +1,83 @@ +package com.aizuda.snailjob.server.web.service; + +import com.aizuda.snailjob.server.web.model.base.PageResult; +import com.aizuda.snailjob.server.web.model.request.*; +import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; + +import java.util.List; + +/** + * @author opensnail + * @date 2022-02-27 + * @since 2.0 + */ +public interface RetryService { + + PageResult> getRetryPage(RetryQueryVO queryVO); + + /** + * 通过重试任务表id获取重试任务信息 + * + * @param groupName 组名称 + * @param id 重试任务表id + * @return 重试任务 + */ + RetryResponseVO getRetryById(String groupName, Long id); + + /** + * 更新重试任务状态 + * + * @param retryUpdateStatusRequestVO 更新重试任务状态请求模型 + * @return + */ + int updateRetryTaskStatus(RetryUpdateStatusRequestVO retryUpdateStatusRequestVO); + + /** + * 手动新增重试任务 + * + * @param retryTaskRequestVO {@link RetrySaveRequestVO} 重试数据模型 + * @return + */ + int saveRetryTask(RetrySaveRequestVO retryTaskRequestVO); + + /** + * 委托客户端生成idempotentId + * + * @param generateRetryIdempotentIdVO 生成idempotentId请求模型 + * @return + */ + String idempotentIdGenerate(GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO); + + /** + * 若客户端在变更了执行器,从而会导致执行重试任务时找不到执行器类,因此使用者可以在后端进行执行变更 + * + * @param requestVO 更新执行器变更模型 + * @return 更新条数 + */ + int updateRetryExecutorName(RetryUpdateExecutorNameRequestVO requestVO); + + /** + * 批量删除重试数据 + * + * @param requestVO 批量删除重试数据 + * @return + */ + boolean batchDeleteRetry(BatchDeleteRetryTaskVO requestVO); + + /** + * 解析日志 + * + * @param parseLogsVO {@link ParseLogsVO} 解析参数模型 + * @return + */ + Integer parseLogs(ParseLogsVO parseLogsVO); + + /** + * 手动支持重试任务 + * + * @param requestVO + * @return + */ + boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO); + +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java deleted file mode 100644 index 98dd056b..00000000 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskLogService.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.aizuda.snailjob.server.web.service; - -import com.aizuda.snailjob.server.web.model.base.PageResult; -import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; -import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; -import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; -import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO; - -import java.util.List; -import java.util.Set; - -/** - * @author opensnail - * @date 2022-02-27 - * @since 2.0 - */ -public interface RetryTaskLogService { - - PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO); - - RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); - - RetryTaskLogResponseVO getRetryTaskLogById(Long id); - - boolean deleteById(Long id); - - boolean batchDelete(Set ids); -} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java index af07d9ad..9ab60e5e 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java @@ -1,10 +1,13 @@ package com.aizuda.snailjob.server.web.service; import com.aizuda.snailjob.server.web.model.base.PageResult; -import com.aizuda.snailjob.server.web.model.request.*; -import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; +import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; +import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; +import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; +import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; import java.util.List; +import java.util.Set; /** * @author opensnail @@ -13,78 +16,15 @@ import java.util.List; */ public interface RetryTaskService { - PageResult> getRetryPage(RetryQueryVO queryVO); + PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO); - /** - * 通过重试任务表id获取重试任务信息 - * - * @param groupName 组名称 - * @param id 重试任务表id - * @return 重试任务 - */ - RetryResponseVO getRetryById(String groupName, Long id); + RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); - /** - * 更新重试任务状态 - * - * @param retryUpdateStatusRequestVO 更新重试任务状态请求模型 - * @return - */ - int updateRetryTaskStatus(RetryUpdateStatusRequestVO retryUpdateStatusRequestVO); + RetryTaskResponseVO getRetryTaskById(Long id); - /** - * 手动新增重试任务 - * - * @param retryTaskRequestVO {@link RetrySaveRequestVO} 重试数据模型 - * @return - */ - int saveRetryTask(RetrySaveRequestVO retryTaskRequestVO); + boolean deleteById(Long id); - /** - * 委托客户端生成idempotentId - * - * @param generateRetryIdempotentIdVO 生成idempotentId请求模型 - * @return - */ - String idempotentIdGenerate(GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO); + boolean batchDelete(Set ids); - /** - * 若客户端在变更了执行器,从而会导致执行重试任务时找不到执行器类,因此使用者可以在后端进行执行变更 - * - * @param requestVO 更新执行器变更模型 - * @return 更新条数 - */ - int updateRetryExecutorName(RetryUpdateExecutorNameRequestVO requestVO); - - /** - * 批量删除重试数据 - * - * @param requestVO 批量删除重试数据 - * @return - */ - boolean batchDeleteRetry(BatchDeleteRetryTaskVO requestVO); - - /** - * 解析日志 - * - * @param parseLogsVO {@link ParseLogsVO} 解析参数模型 - * @return - */ - Integer parseLogs(ParseLogsVO parseLogsVO); - - /** - * 手动支持重试任务 - * - * @param requestVO - * @return - */ - boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO); - - /** - * 手动执行回调任务 - * - * @param requestVO - * @return - */ - boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO); + Boolean stopById(Long id); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java new file mode 100644 index 00000000..68ef7f96 --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java @@ -0,0 +1,22 @@ +package com.aizuda.snailjob.server.web.service.convert; + +import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO; +import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; +import com.aizuda.snailjob.template.datasource.persistence.po.Retry; +import org.mapstruct.factory.Mappers; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-02-22 + */ +public interface RetryConverter { + RetryConverter INSTANCE = Mappers.getMapper(RetryConverter.class); + + RetryTaskPrepareDTO toRetryTaskPrepareDTO(Retry retry); + + TaskStopJobDTO toTaskStopJobDTO(Retry retry); +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskLogResponseVOConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskLogResponseVOConverter.java index 1914741a..484ab165 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskLogResponseVOConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskLogResponseVOConverter.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.service.convert; -import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO; +import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -16,8 +16,8 @@ public interface RetryTaskLogResponseVOConverter { RetryTaskLogResponseVOConverter INSTANCE = Mappers.getMapper(RetryTaskLogResponseVOConverter.class); - RetryTaskLogResponseVO convert(RetryTask retryTask); + RetryTaskResponseVO convert(RetryTask retryTask); - List convertList(List retryTasks); + List convertList(List retryTasks); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java new file mode 100644 index 00000000..ca2793d9 --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java @@ -0,0 +1,404 @@ +package com.aizuda.snailjob.server.web.service.impl; + +import akka.actor.ActorRef; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO; +import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.WaitStrategy; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; +import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; +import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; +import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.common.enums.TaskGeneratorSceneEnum; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.model.dto.RetryTaskDTO; +import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; +import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO; +import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskContext; +import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskGenerator; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.web.model.base.PageResult; +import com.aizuda.snailjob.server.web.model.request.*; +import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; +import com.aizuda.snailjob.server.web.service.RetryService; +import com.aizuda.snailjob.server.web.service.convert.RetryConverter; +import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter; +import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter; +import com.aizuda.snailjob.server.web.util.UserSessionUtils; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.access.TaskAccess; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.*; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.aizuda.snailjob.common.core.enums.RetryStatusEnum.ALLOW_DELETE_STATUS; + +/** + * @author opensnail + * @date 2022-02-27 + * @since 2.0 + */ +@Service +public class RetryServiceImpl implements RetryService { + + @Autowired + private ClientNodeAllocateHandler clientNodeAllocateHandler; + @Autowired + private RetryTaskMapper retryTaskMapper; + @Autowired + private AccessTemplate accessTemplate; + @Autowired + @Lazy + private List taskGenerators; + @Autowired + private RetryTaskLogMessageMapper retryTaskLogMessageMapper; + + @Override + public PageResult> getRetryPage(RetryQueryVO queryVO) { + + PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + if (StrUtil.isBlank(queryVO.getGroupName())) { + return new PageResult<>(pageDTO, new ArrayList<>()); + } + + List groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName()); + + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(Retry::getNamespaceId, namespaceId) + .in(CollUtil.isNotEmpty(groupNames), Retry::getGroupName, groupNames) + .eq(StrUtil.isNotBlank(queryVO.getSceneName()), Retry::getSceneName, queryVO.getSceneName()) + .eq(StrUtil.isNotBlank(queryVO.getBizNo()), Retry::getBizNo, queryVO.getBizNo()) + .eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), Retry::getIdempotentId, queryVO.getIdempotentId()) + .eq(Objects.nonNull(queryVO.getRetryId()), Retry::getId, queryVO.getRetryId()) + .eq(Objects.nonNull(queryVO.getRetryStatus()), Retry::getRetryStatus, queryVO.getRetryStatus()) + .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType()) + .select(Retry::getId, Retry::getBizNo, Retry::getIdempotentId, + Retry::getGroupName, Retry::getNextTriggerAt, Retry::getRetryCount, + Retry::getRetryStatus, Retry::getUpdateDt, Retry::getCreateDt, Retry::getSceneName, + Retry::getTaskType, Retry::getParentId) + .orderByDesc(Retry::getCreateDt); + pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper); + + Set ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId); + Map callbackMap = Maps.newHashMap(); + if (CollUtil.isNotEmpty(ids)) { + List callbackTaskList = accessTemplate.getRetryAccess() + .list(new LambdaQueryWrapper().in(Retry::getParentId, ids)); + callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId); + } + + List retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords()); + for (RetryResponseVO retryResponseVO : retryResponseList) { + RetryResponseVO responseVO = RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId())); + if (Objects.isNull(responseVO)) { + retryResponseVO.setChildren(Lists.newArrayList()); + } else { + retryResponseVO.setChildren(Lists.newArrayList(responseVO)); + } + } + + return new PageResult<>(pageDTO, retryResponseList); + } + + @Override + public RetryResponseVO getRetryById(String groupName, Long id) { + TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); + Retry retry = retryTaskAccess.one(new LambdaQueryWrapper().eq(Retry::getId, id)); + return RetryTaskResponseVOConverter.INSTANCE.convert(retry); + } + + @Override + @Transactional + public int updateRetryTaskStatus(RetryUpdateStatusRequestVO requestVO) { + + RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(requestVO.getRetryStatus()); + if (Objects.isNull(retryStatusEnum)) { + throw new SnailJobServerException("重试状态错误. [{}]", requestVO.getRetryStatus()); + } + + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + + TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); + Retry retry = retryTaskAccess.one(new LambdaQueryWrapper() + .eq(Retry::getNamespaceId, namespaceId) + .eq(Retry::getId, requestVO.getId())); + if (Objects.isNull(retry)) { + throw new SnailJobServerException("未查询到重试任务"); + } + + retry.setRetryStatus(requestVO.getRetryStatus()); + retry.setGroupName(requestVO.getGroupName()); + + // 若恢复重试则需要重新计算下次触发时间 + if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) { + + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() + .getSceneConfigByGroupNameAndSceneName(retry.getGroupName(), retry.getSceneName(), namespaceId); + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli()); + waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); + waitStrategyContext.setDelayLevel(retry.getRetryCount() + 1); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); + retry.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext)); + } + + if (RetryStatusEnum.FINISH.getStatus().equals(retryStatusEnum.getStatus())) { + RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retry); + retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli()); + SnailJobLog.REMOTE.info("=============手动操作完成============. <|>{}<|>", retryLogMetaDTO); + } +// +// RetryTask retryTask = new RetryTask(); +// retryTask.setTaskStatus(requestVO.getRetryStatus()); +// retryTaskMapper.update(retryTask, new LambdaUpdateWrapper() +// .eq(RetryTask::getNamespaceId, namespaceId) +// .eq(RetryTask::getUniqueId, retry.getUniqueId()) +// .eq(RetryTask::getGroupName, retry.getGroupName())); + + retry.setUpdateDt(LocalDateTime.now()); + return retryTaskAccess.updateById(retry); + } + + @Override + public int saveRetryTask(final RetrySaveRequestVO retryTaskRequestVO) { + RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskRequestVO.getRetryStatus()); + if (Objects.isNull(retryStatusEnum)) { + throw new SnailJobServerException("重试状态错误"); + } + + TaskGenerator taskGenerator = taskGenerators.stream() + .filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_SINGLE.getScene())) + .findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器")); + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + + TaskContext taskContext = new TaskContext(); + taskContext.setSceneName(retryTaskRequestVO.getSceneName()); + taskContext.setGroupName(retryTaskRequestVO.getGroupName()); + taskContext.setInitStatus(retryTaskRequestVO.getRetryStatus()); + taskContext.setNamespaceId(namespaceId); + taskContext.setTaskInfos( + Collections.singletonList(TaskContextConverter.INSTANCE.convert(retryTaskRequestVO))); + + // 生成任务 + taskGenerator.taskGenerator(taskContext); + + return 1; + } + + @Override + public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) { + + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + Set serverNodes = CacheRegisterTable.getServerNodeSet( + generateRetryIdempotentIdVO.getGroupName(), + namespaceId); + Assert.notEmpty(serverNodes, + () -> new SnailJobServerException("生成idempotentId失败: 不存在活跃的客户端节点")); + + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() + .getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(), + generateRetryIdempotentIdVO.getSceneName(), namespaceId); + + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retrySceneConfig.getSceneName(), + retrySceneConfig.getGroupName(), retrySceneConfig.getNamespaceId(), retrySceneConfig.getRouteKey()); + + // 委托客户端生成idempotentId + GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO(); + generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName()); + generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName()); + generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr()); + generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName()); + + RetryRpcClient rpcClient = RequestBuilder.newBuilder() + .nodeInfo(serverNode) + .client(RetryRpcClient.class) + .build(); + + Result result = rpcClient.generateIdempotentId(generateRetryIdempotentIdDTO); + + Assert.notNull(result, () -> new SnailJobServerException("idempotentId生成失败")); + Assert.isTrue(1 == result.getStatus(), + () -> new SnailJobServerException("idempotentId生成失败:请确保参数与执行器名称正确")); + + return (String) result.getData(); + } + + @Override + public int updateRetryExecutorName(final RetryUpdateExecutorNameRequestVO requestVO) { + + Retry retry = new Retry(); + retry.setExecutorName(requestVO.getExecutorName()); + retry.setRetryStatus(requestVO.getRetryStatus()); + retry.setUpdateDt(LocalDateTime.now()); + + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + // 根据重试数据id,更新执行器名称 + TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); + return retryTaskAccess.update(retry, new LambdaUpdateWrapper() + .eq(Retry::getNamespaceId, namespaceId) + .eq(Retry::getGroupName, requestVO.getGroupName()) + .in(Retry::getId, requestVO.getIds())); + } + + @Override + @Transactional + public boolean batchDeleteRetry(final BatchDeleteRetryTaskVO requestVO) { + TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + + List retries = retryTaskAccess.list(new LambdaQueryWrapper() + .eq(Retry::getNamespaceId, namespaceId) + .eq(Retry::getGroupName, requestVO.getGroupName()) + .in(Retry::getRetryStatus, ALLOW_DELETE_STATUS) + .in(Retry::getId, requestVO.getIds()) + ); + + Assert.notEmpty(retries, + () -> new SnailJobServerException("没有可删除的数据, 只有非【处理中】的数据可以删除")); + + Set retryIds = StreamUtils.toSet(retries, Retry::getId); + retryTaskMapper.delete(new LambdaQueryWrapper() + .eq(RetryTask::getGroupName, requestVO.getGroupName()) + .eq(RetryTask::getNamespaceId, namespaceId) + .in(RetryTask::getRetryId, retryIds)); + + retryTaskLogMessageMapper.delete( + new LambdaQueryWrapper() + .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) + .eq(RetryTaskLogMessage::getGroupName, requestVO.getGroupName()) + .in(RetryTaskLogMessage::getRetryId, retryIds)); + + Assert.isTrue(requestVO.getIds().size() == retryTaskAccess.delete(new LambdaQueryWrapper() + .eq(Retry::getNamespaceId, namespaceId) + .eq(Retry::getGroupName, requestVO.getGroupName()) + .in(Retry::getRetryStatus, ALLOW_DELETE_STATUS) + .in(Retry::getId, requestVO.getIds())) + , () -> new SnailJobServerException("删除重试任务失败, 请检查任务状态是否为已完成或者最大次数")); + + return Boolean.TRUE; + } + + @Override + public Integer parseLogs(ParseLogsVO parseLogsVO) { + RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(parseLogsVO.getRetryStatus()); + if (Objects.isNull(retryStatusEnum)) { + throw new SnailJobServerException("重试状态错误"); + } + + String logStr = parseLogsVO.getLogStr(); + + String patternString = "<\\|>(.*?)<\\|>"; + Pattern pattern = Pattern.compile(patternString); + Matcher matcher = pattern.matcher(logStr); + + List waitInsertList = new ArrayList<>(); + // 查找匹配的内容并输出 + while (matcher.find()) { + String extractedData = matcher.group(1); + if (StrUtil.isBlank(extractedData)) { + continue; + } + + List retryTaskList = JsonUtil.parseList(extractedData, RetryTaskDTO.class); + if (CollUtil.isNotEmpty(retryTaskList)) { + waitInsertList.addAll(retryTaskList); + } + } + + Assert.isFalse(waitInsertList.isEmpty(), () -> new SnailJobServerException("未找到匹配的数据")); + Assert.isTrue(waitInsertList.size() <= 500, () -> new SnailJobServerException("最多只能处理500条数据")); + + TaskGenerator taskGenerator = taskGenerators.stream() + .filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_BATCH.getScene())) + .findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器")); + + boolean allMatch = waitInsertList.stream() + .allMatch(retryTaskDTO -> retryTaskDTO.getGroupName().equals(parseLogsVO.getGroupName())); + Assert.isTrue(allMatch, () -> new SnailJobServerException("存在数据groupName不匹配,请检查您的数据")); + + Map> map = StreamUtils.groupByKey(waitInsertList, RetryTaskDTO::getSceneName); + + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + + map.forEach(((sceneName, retryTaskDTOS) -> { + TaskContext taskContext = new TaskContext(); + taskContext.setSceneName(sceneName); + taskContext.setGroupName(parseLogsVO.getGroupName()); + taskContext.setNamespaceId(namespaceId); + taskContext.setInitStatus(parseLogsVO.getRetryStatus()); + taskContext.setTaskInfos(TaskContextConverter.INSTANCE.convert(retryTaskDTOS)); + + // 生成任务 + taskGenerator.taskGenerator(taskContext); + })); + + return waitInsertList.size(); + } + + @Override + public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) { + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + + long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() + .eq(GroupConfig::getGroupName, requestVO.getGroupName()) + .eq(GroupConfig::getNamespaceId, namespaceId) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + ); + + Assert.isTrue(count > 0, () -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName())); + + List retryIds = requestVO.getRetryIds(); + + List list = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper() + .eq(Retry::getNamespaceId, namespaceId) + .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType()) + .in(Retry::getId, retryIds) + ); + Assert.notEmpty(list, () -> new SnailJobServerException("没有可执行的任务")); + + for (Retry retry : list) { + RetryTaskPrepareDTO retryTaskPrepareDTO = RetryConverter.INSTANCE.toRetryTaskPrepareDTO(retry); + // 设置now表示立即执行 + retryTaskPrepareDTO.setNextTriggerAt(DateUtils.toNowMilli()); + retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene()); + + // 准备阶段执行 + ActorRef actorRef = ActorGenerator.retryTaskPrepareActor(); + actorRef.tell(retryTaskPrepareDTO, actorRef); + } + + return true; + } +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java deleted file mode 100644 index 2240d395..00000000 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java +++ /dev/null @@ -1,207 +0,0 @@ -package com.aizuda.snailjob.server.web.service.impl; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjUtil; -import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.log.constant.LogFieldConstants; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.web.model.base.PageResult; -import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; -import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; -import com.aizuda.snailjob.server.web.model.request.UserSessionVO; -import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; -import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO; -import com.aizuda.snailjob.server.web.service.RetryTaskLogService; -import com.aizuda.snailjob.server.web.service.convert.RetryTaskLogResponseVOConverter; -import com.aizuda.snailjob.server.web.util.UserSessionUtils; -import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.Retry; -import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import com.google.common.collect.Lists; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.util.*; -import java.util.stream.Collectors; - -/** - * @author: opensnail - * @date : 2022-02-28 09:10 - */ -@Service -@RequiredArgsConstructor -public class RetryTaskLogServiceImpl implements RetryTaskLogService { - - private final RetryTaskMapper retryTaskMapper; - private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; - - @Override - public PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO) { - PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); - - UserSessionVO userSessionVO = UserSessionUtils.currentUserSession(); - String namespaceId = userSessionVO.getNamespaceId(); - - List groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName()); - - LambdaQueryWrapper wrapper = new LambdaQueryWrapper() - .eq(RetryTask::getNamespaceId, namespaceId) - .in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames) - .eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName()) - .eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus()) - .eq(Objects.nonNull(queryVO.getRetryId()), RetryTask::getRetryId, queryVO.getRetryId()) - .between(ObjUtil.isNotNull(queryVO.getDatetimeRange()), - RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt()) - .select(RetryTask::getGroupName, RetryTask::getId, RetryTask::getSceneName, RetryTask::getTaskStatus, - RetryTask::getCreateDt, RetryTask::getTaskType, RetryTask::getOperationReason, RetryTask::getRetryId) - .orderByDesc(RetryTask::getCreateDt); - - PageDTO retryTaskPageDTO = retryTaskMapper.selectPage(pageDTO, wrapper); - return new PageResult<>(retryTaskPageDTO, - RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskPageDTO.getRecords())); - - } - - @Override - public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage( - RetryTaskLogMessageQueryVO queryVO) { - if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) { - RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO(); - jobLogResponseVO.setNextStartId(0L); - jobLogResponseVO.setFromIndex(0); - return jobLogResponseVO; - } - - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); - PageDTO selectPage = retryTaskLogMessageMapper.selectPage(pageDTO, - new LambdaQueryWrapper() - .select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum) - .ge(RetryTaskLogMessage::getId, queryVO.getStartId()) - .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) - .eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId()) - .eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName()) - .orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime) - .orderByDesc(RetryTaskLogMessage::getCreateDt)); - - List records = selectPage.getRecords(); - - if (CollUtil.isEmpty(records)) { - return new RetryTaskLogMessageResponseVO() - .setFinished(Boolean.TRUE) - .setNextStartId(queryVO.getStartId()) - .setFromIndex(0); - } - - Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0); - RetryTaskLogMessage firstRecord = records.get(0); - List ids = Lists.newArrayList(firstRecord.getId()); - int total = firstRecord.getLogNum() - fromIndex; - for (int i = 1; i < records.size(); i++) { - RetryTaskLogMessage record = records.get(i); - if (total + record.getLogNum() > queryVO.getSize()) { - break; - } - - total += record.getLogNum(); - ids.add(record.getId()); - } - - long nextStartId = 0; - List> messages = Lists.newArrayList(); - List jobLogMessages = retryTaskLogMessageMapper.selectList( - new LambdaQueryWrapper() - .in(RetryTaskLogMessage::getId, ids) - .orderByAsc(RetryTaskLogMessage::getId) - .orderByAsc(RetryTaskLogMessage::getRealTime) - ); - - for (final RetryTaskLogMessage retryTaskLogMessage : jobLogMessages) { - - List> originalList = JsonUtil.parseObject(retryTaskLogMessage.getMessage(), List.class); - int size = originalList.size() - fromIndex; - List> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) - .collect(Collectors.toList()); - if (messages.size() + size >= queryVO.getSize()) { - messages.addAll(pageList); - nextStartId = retryTaskLogMessage.getId(); - fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1; - break; - } - - messages.addAll(pageList); - nextStartId = retryTaskLogMessage.getId() + 1; - fromIndex = 0; - } - - messages = messages.stream() - .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) - .collect(Collectors.toList()); - - RetryTaskLogMessageResponseVO responseVO = new RetryTaskLogMessageResponseVO(); - responseVO.setMessage(messages); - responseVO.setNextStartId(nextStartId); - responseVO.setFromIndex(fromIndex); - - return responseVO; - } - - @Override - public RetryTaskLogResponseVO getRetryTaskLogById(Long id) { - RetryTask retryTask = retryTaskMapper.selectById(id); - return RetryTaskLogResponseVOConverter.INSTANCE.convert(retryTask); - } - - @Override - @Transactional - public boolean deleteById(final Long id) { - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - RetryTask retryTask = retryTaskMapper.selectOne( - new LambdaQueryWrapper() - .in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus())) - .eq(RetryTask::getNamespaceId, namespaceId) - .eq(RetryTask::getId, id)); - Assert.notNull(retryTask, () -> new SnailJobServerException("数据删除失败")); - - retryTaskLogMessageMapper.delete(new LambdaQueryWrapper() - .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) - .eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName()) - .eq(RetryTaskLogMessage::getRetryTaskId, id) - ); - - return 1 == retryTaskMapper.deleteById(id); - } - - @Override - @Transactional - public boolean batchDelete(final Set ids) { - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - List retryTasks = retryTaskMapper.selectList( - new LambdaQueryWrapper() - .in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus())) - .eq(RetryTask::getNamespaceId, namespaceId) - .in(RetryTask::getId, ids)); - Assert.notEmpty(retryTasks, () -> new SnailJobServerException("数据不存在")); - Assert.isTrue(retryTasks.size() == ids.size(), () -> new SnailJobServerException("数据不存在")); - - for (final RetryTask retryTask : retryTasks) { - retryTaskLogMessageMapper.delete( - new LambdaQueryWrapper() - .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) - .eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName()) - .eq(RetryTaskLogMessage::getRetryTaskId, retryTask.getId())); - } - return 1 == retryTaskMapper.deleteByIds(ids); - } -} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index ee1841b0..d1304ad9 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -2,427 +2,228 @@ package com.aizuda.snailjob.server.web.service.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO; +import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.WaitStrategy; -import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; -import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; -import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; -import com.aizuda.snailjob.server.common.enums.TaskGeneratorSceneEnum; +import com.aizuda.snailjob.common.log.constant.LogFieldConstants; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; -import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; -import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext; -import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum; -import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.model.dto.RetryTaskDTO; -import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; -import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskContext; -import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskGenerator; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; +import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler; import com.aizuda.snailjob.server.web.model.base.PageResult; -import com.aizuda.snailjob.server.web.model.request.*; -import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; +import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; +import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; +import com.aizuda.snailjob.server.web.model.request.UserSessionVO; +import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; +import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; import com.aizuda.snailjob.server.web.service.RetryTaskService; -import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter; -import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter; +import com.aizuda.snailjob.server.web.service.convert.RetryConverter; +import com.aizuda.snailjob.server.web.service.convert.RetryTaskLogResponseVOConverter; import com.aizuda.snailjob.server.web.util.UserSessionUtils; -import com.aizuda.snailjob.template.datasource.access.AccessTemplate; -import com.aizuda.snailjob.template.datasource.access.TaskAccess; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.*; +import com.aizuda.snailjob.template.datasource.persistence.po.Retry; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; +import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static com.aizuda.snailjob.common.core.enums.RetryStatusEnum.ALLOW_DELETE_STATUS; +import java.util.stream.Collectors; /** - * @author opensnail - * @date 2022-02-27 - * @since 2.0 + * @author: opensnail + * @date : 2022-02-28 09:10 */ @Service +@RequiredArgsConstructor public class RetryTaskServiceImpl implements RetryTaskService { - @Autowired - private ClientNodeAllocateHandler clientNodeAllocateHandler; - @Autowired - private RetryTaskMapper retryTaskMapper; - @Autowired - private AccessTemplate accessTemplate; - @Autowired - @Lazy - private List taskGenerators; - @Autowired - private RetryTaskLogMessageMapper retryTaskLogMessageMapper; + private final RetryTaskMapper retryTaskMapper; + private final RetryMapper retryMapper; + private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; + private final RetryTaskStopHandler retryTaskStopHandler; @Override - public PageResult> getRetryPage(RetryQueryVO queryVO) { + public PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO) { + PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); - PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - if (StrUtil.isBlank(queryVO.getGroupName())) { - return new PageResult<>(pageDTO, new ArrayList<>()); - } + UserSessionVO userSessionVO = UserSessionUtils.currentUserSession(); + String namespaceId = userSessionVO.getNamespaceId(); List groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName()); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .in(CollUtil.isNotEmpty(groupNames), Retry::getGroupName, groupNames) - .eq(StrUtil.isNotBlank(queryVO.getSceneName()), Retry::getSceneName, queryVO.getSceneName()) - .eq(StrUtil.isNotBlank(queryVO.getBizNo()), Retry::getBizNo, queryVO.getBizNo()) - .eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), Retry::getIdempotentId, queryVO.getIdempotentId()) - .eq(Objects.nonNull(queryVO.getRetryId()), Retry::getId, queryVO.getRetryId()) - .eq(Objects.nonNull(queryVO.getRetryStatus()), Retry::getRetryStatus, queryVO.getRetryStatus()) - .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType()) - .select(Retry::getId, Retry::getBizNo, Retry::getIdempotentId, - Retry::getGroupName, Retry::getNextTriggerAt, Retry::getRetryCount, - Retry::getRetryStatus, Retry::getUpdateDt, Retry::getCreateDt, Retry::getSceneName, - Retry::getTaskType, Retry::getParentId) - .orderByDesc(Retry::getCreateDt); - pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper); - - Set ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId); - Map callbackMap = Maps.newHashMap(); - if (CollUtil.isNotEmpty(ids)) { - List callbackTaskList = accessTemplate.getRetryAccess() - .list(new LambdaQueryWrapper().in(Retry::getParentId, ids)); - callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId); - } - - List retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords()); - for (RetryResponseVO retryResponseVO : retryResponseList) { - RetryResponseVO responseVO = RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId())); - if (Objects.isNull(responseVO)) { - retryResponseVO.setChildren(Lists.newArrayList()); - } else { - retryResponseVO.setChildren(Lists.newArrayList(responseVO)); - } - } - - return new PageResult<>(pageDTO, retryResponseList); - } - - @Override - public RetryResponseVO getRetryById(String groupName, Long id) { - TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); - Retry retry = retryTaskAccess.one(new LambdaQueryWrapper().eq(Retry::getId, id)); - return RetryTaskResponseVOConverter.INSTANCE.convert(retry); - } - - @Override - @Transactional - public int updateRetryTaskStatus(RetryUpdateStatusRequestVO requestVO) { - - RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(requestVO.getRetryStatus()); - if (Objects.isNull(retryStatusEnum)) { - throw new SnailJobServerException("重试状态错误. [{}]", requestVO.getRetryStatus()); - } - - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); - Retry retry = retryTaskAccess.one(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getId, requestVO.getId())); - if (Objects.isNull(retry)) { - throw new SnailJobServerException("未查询到重试任务"); - } - - retry.setRetryStatus(requestVO.getRetryStatus()); - retry.setGroupName(requestVO.getGroupName()); - - // 若恢复重试则需要重新计算下次触发时间 - if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) { - - RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() - .getSceneConfigByGroupNameAndSceneName(retry.getGroupName(), retry.getSceneName(), namespaceId); - WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); - waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli()); - waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); - waitStrategyContext.setDelayLevel(retry.getRetryCount() + 1); - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); - retry.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext)); - } - - if (RetryStatusEnum.FINISH.getStatus().equals(retryStatusEnum.getStatus())) { - RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retry); - retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli()); - SnailJobLog.REMOTE.info("=============手动操作完成============. <|>{}<|>", retryLogMetaDTO); - } -// -// RetryTask retryTask = new RetryTask(); -// retryTask.setTaskStatus(requestVO.getRetryStatus()); -// retryTaskMapper.update(retryTask, new LambdaUpdateWrapper() -// .eq(RetryTask::getNamespaceId, namespaceId) -// .eq(RetryTask::getUniqueId, retry.getUniqueId()) -// .eq(RetryTask::getGroupName, retry.getGroupName())); - - retry.setUpdateDt(LocalDateTime.now()); - return retryTaskAccess.updateById(retry); - } - - @Override - public int saveRetryTask(final RetrySaveRequestVO retryTaskRequestVO) { - RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskRequestVO.getRetryStatus()); - if (Objects.isNull(retryStatusEnum)) { - throw new SnailJobServerException("重试状态错误"); - } - - TaskGenerator taskGenerator = taskGenerators.stream() - .filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_SINGLE.getScene())) - .findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器")); - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - TaskContext taskContext = new TaskContext(); - taskContext.setSceneName(retryTaskRequestVO.getSceneName()); - taskContext.setGroupName(retryTaskRequestVO.getGroupName()); - taskContext.setInitStatus(retryTaskRequestVO.getRetryStatus()); - taskContext.setNamespaceId(namespaceId); - taskContext.setTaskInfos( - Collections.singletonList(TaskContextConverter.INSTANCE.convert(retryTaskRequestVO))); - - // 生成任务 - taskGenerator.taskGenerator(taskContext); - - return 1; - } - - @Override - public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) { - - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - Set serverNodes = CacheRegisterTable.getServerNodeSet( - generateRetryIdempotentIdVO.getGroupName(), - namespaceId); - Assert.notEmpty(serverNodes, - () -> new SnailJobServerException("生成idempotentId失败: 不存在活跃的客户端节点")); - - RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() - .getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(), - generateRetryIdempotentIdVO.getSceneName(), namespaceId); - - RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retrySceneConfig.getSceneName(), - retrySceneConfig.getGroupName(), retrySceneConfig.getNamespaceId(), retrySceneConfig.getRouteKey()); - - // 委托客户端生成idempotentId - GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO(); - generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName()); - generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName()); - generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr()); - generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName()); - - RetryRpcClient rpcClient = RequestBuilder.newBuilder() - .nodeInfo(serverNode) - .client(RetryRpcClient.class) - .build(); - - Result result = rpcClient.generateIdempotentId(generateRetryIdempotentIdDTO); - - Assert.notNull(result, () -> new SnailJobServerException("idempotentId生成失败")); - Assert.isTrue(1 == result.getStatus(), - () -> new SnailJobServerException("idempotentId生成失败:请确保参数与执行器名称正确")); - - return (String) result.getData(); - } - - @Override - public int updateRetryExecutorName(final RetryUpdateExecutorNameRequestVO requestVO) { - - Retry retry = new Retry(); - retry.setExecutorName(requestVO.getExecutorName()); - retry.setRetryStatus(requestVO.getRetryStatus()); - retry.setUpdateDt(LocalDateTime.now()); - - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - // 根据重试数据id,更新执行器名称 - TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); - return retryTaskAccess.update(retry, new LambdaUpdateWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getGroupName, requestVO.getGroupName()) - .in(Retry::getId, requestVO.getIds())); - } - - @Override - @Transactional - public boolean batchDeleteRetry(final BatchDeleteRetryTaskVO requestVO) { - TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - List tasks = retryTaskAccess.list(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getGroupName, requestVO.getGroupName()) - .in(Retry::getRetryStatus, ALLOW_DELETE_STATUS) - .in(Retry::getId, requestVO.getIds()) - ); - - Assert.notEmpty(tasks, - () -> new SnailJobServerException("没有可删除的数据, 只有非【处理中】的数据可以删除")); - - Assert.isTrue(requestVO.getIds().size() == retryTaskAccess.delete(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getGroupName, requestVO.getGroupName()) - .in(Retry::getRetryStatus, ALLOW_DELETE_STATUS) - .in(Retry::getId, requestVO.getIds())) - , () -> new SnailJobServerException("删除重试任务失败, 请检查任务状态是否为已完成或者最大次数")); - - Set uniqueIds = StreamUtils.toSet(tasks, Retry::getId); - retryTaskMapper.delete(new LambdaQueryWrapper() - .in(RetryTask::getTaskStatus, ALLOW_DELETE_STATUS) - .eq(RetryTask::getGroupName, requestVO.getGroupName()) + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() .eq(RetryTask::getNamespaceId, namespaceId) - .in(RetryTask::getRetryId, uniqueIds)); + .in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames) + .eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName()) + .eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus()) + .eq(Objects.nonNull(queryVO.getRetryId()), RetryTask::getRetryId, queryVO.getRetryId()) + .between(ObjUtil.isNotNull(queryVO.getDatetimeRange()), + RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt()) + .select(RetryTask::getGroupName, RetryTask::getId, RetryTask::getSceneName, RetryTask::getTaskStatus, + RetryTask::getCreateDt, RetryTask::getTaskType, RetryTask::getOperationReason, RetryTask::getRetryId) + .orderByDesc(RetryTask::getCreateDt); - retryTaskLogMessageMapper.delete( + PageDTO retryTaskPageDTO = retryTaskMapper.selectPage(pageDTO, wrapper); + return new PageResult<>(retryTaskPageDTO, + RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskPageDTO.getRecords())); + + } + + @Override + public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage( + RetryTaskLogMessageQueryVO queryVO) { + if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) { + RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO(); + jobLogResponseVO.setNextStartId(0L); + jobLogResponseVO.setFromIndex(0); + return jobLogResponseVO; + } + + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + + PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); + PageDTO selectPage = retryTaskLogMessageMapper.selectPage(pageDTO, new LambdaQueryWrapper() + .select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum) + .ge(RetryTaskLogMessage::getId, queryVO.getStartId()) .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) - .eq(RetryTaskLogMessage::getGroupName, requestVO.getGroupName()) - .in(RetryTaskLogMessage::getRetryId, uniqueIds)); - return Boolean.TRUE; + .eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId()) + .eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName()) + .orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime) + .orderByDesc(RetryTaskLogMessage::getCreateDt)); + + List records = selectPage.getRecords(); + + if (CollUtil.isEmpty(records)) { + return new RetryTaskLogMessageResponseVO() + .setFinished(Boolean.TRUE) + .setNextStartId(queryVO.getStartId()) + .setFromIndex(0); + } + + Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0); + RetryTaskLogMessage firstRecord = records.get(0); + List ids = Lists.newArrayList(firstRecord.getId()); + int total = firstRecord.getLogNum() - fromIndex; + for (int i = 1; i < records.size(); i++) { + RetryTaskLogMessage record = records.get(i); + if (total + record.getLogNum() > queryVO.getSize()) { + break; + } + + total += record.getLogNum(); + ids.add(record.getId()); + } + + long nextStartId = 0; + List> messages = Lists.newArrayList(); + List jobLogMessages = retryTaskLogMessageMapper.selectList( + new LambdaQueryWrapper() + .in(RetryTaskLogMessage::getId, ids) + .orderByAsc(RetryTaskLogMessage::getId) + .orderByAsc(RetryTaskLogMessage::getRealTime) + ); + + for (final RetryTaskLogMessage retryTaskLogMessage : jobLogMessages) { + + List> originalList = JsonUtil.parseObject(retryTaskLogMessage.getMessage(), List.class); + int size = originalList.size() - fromIndex; + List> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) + .collect(Collectors.toList()); + if (messages.size() + size >= queryVO.getSize()) { + messages.addAll(pageList); + nextStartId = retryTaskLogMessage.getId(); + fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1; + break; + } + + messages.addAll(pageList); + nextStartId = retryTaskLogMessage.getId() + 1; + fromIndex = 0; + } + + messages = messages.stream() + .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) + .collect(Collectors.toList()); + + RetryTaskLogMessageResponseVO responseVO = new RetryTaskLogMessageResponseVO(); + responseVO.setMessage(messages); + responseVO.setNextStartId(nextStartId); + responseVO.setFromIndex(fromIndex); + + return responseVO; } @Override - public Integer parseLogs(ParseLogsVO parseLogsVO) { - RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(parseLogsVO.getRetryStatus()); - if (Objects.isNull(retryStatusEnum)) { - throw new SnailJobServerException("重试状态错误"); - } - - String logStr = parseLogsVO.getLogStr(); - - String patternString = "<\\|>(.*?)<\\|>"; - Pattern pattern = Pattern.compile(patternString); - Matcher matcher = pattern.matcher(logStr); - - List waitInsertList = new ArrayList<>(); - // 查找匹配的内容并输出 - while (matcher.find()) { - String extractedData = matcher.group(1); - if (StrUtil.isBlank(extractedData)) { - continue; - } - - List retryTaskList = JsonUtil.parseList(extractedData, RetryTaskDTO.class); - if (CollUtil.isNotEmpty(retryTaskList)) { - waitInsertList.addAll(retryTaskList); - } - } - - Assert.isFalse(waitInsertList.isEmpty(), () -> new SnailJobServerException("未找到匹配的数据")); - Assert.isTrue(waitInsertList.size() <= 500, () -> new SnailJobServerException("最多只能处理500条数据")); - - TaskGenerator taskGenerator = taskGenerators.stream() - .filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_BATCH.getScene())) - .findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器")); - - boolean allMatch = waitInsertList.stream() - .allMatch(retryTaskDTO -> retryTaskDTO.getGroupName().equals(parseLogsVO.getGroupName())); - Assert.isTrue(allMatch, () -> new SnailJobServerException("存在数据groupName不匹配,请检查您的数据")); - - Map> map = StreamUtils.groupByKey(waitInsertList, RetryTaskDTO::getSceneName); - - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - map.forEach(((sceneName, retryTaskDTOS) -> { - TaskContext taskContext = new TaskContext(); - taskContext.setSceneName(sceneName); - taskContext.setGroupName(parseLogsVO.getGroupName()); - taskContext.setNamespaceId(namespaceId); - taskContext.setInitStatus(parseLogsVO.getRetryStatus()); - taskContext.setTaskInfos(TaskContextConverter.INSTANCE.convert(retryTaskDTOS)); - - // 生成任务 - taskGenerator.taskGenerator(taskContext); - })); - - return waitInsertList.size(); + public RetryTaskResponseVO getRetryTaskById(Long id) { + RetryTask retryTask = retryTaskMapper.selectById(id); + return RetryTaskLogResponseVOConverter.INSTANCE.convert(retryTask); } @Override - public boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO) { + @Transactional + public boolean deleteById(final Long id) { String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() - .eq(GroupConfig::getGroupName, requestVO.getGroupName()) - .eq(GroupConfig::getNamespaceId, namespaceId) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + RetryTask retryTask = retryTaskMapper.selectOne( + new LambdaQueryWrapper() + .in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus())) + .eq(RetryTask::getNamespaceId, namespaceId) + .eq(RetryTask::getId, id)); + Assert.notNull(retryTask, () -> new SnailJobServerException("数据删除失败")); + + retryTaskLogMessageMapper.delete(new LambdaQueryWrapper() + .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) + .eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName()) + .eq(RetryTaskLogMessage::getRetryTaskId, id) ); - Assert.isTrue(count > 0, () -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName())); + return 1 == retryTaskMapper.deleteById(id); + } - List uniqueIds = requestVO.getUniqueIds(); + @Override + @Transactional + public boolean batchDelete(final Set ids) { + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + List retryTasks = retryTaskMapper.selectList( + new LambdaQueryWrapper() + .in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus())) + .eq(RetryTask::getNamespaceId, namespaceId) + .in(RetryTask::getId, ids)); + Assert.notEmpty(retryTasks, () -> new SnailJobServerException("数据不存在")); + Assert.isTrue(retryTasks.size() == ids.size(), () -> new SnailJobServerException("数据不存在")); - List list = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType()) -// .in(Retry::getUniqueId, uniqueIds) - ); - Assert.notEmpty(list, () -> new SnailJobServerException("没有可执行的任务")); - - for (Retry retry : list) { -// for (TaskExecutor taskExecutor : taskExecutors) { -// if (taskExecutor.getTaskType().getScene() == TaskExecutorSceneEnum.MANUAL_RETRY.getScene()) { -// taskExecutor.actuator(retry); -// } -// } + for (final RetryTask retryTask : retryTasks) { + retryTaskLogMessageMapper.delete( + new LambdaQueryWrapper() + .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) + .eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName()) + .eq(RetryTaskLogMessage::getRetryTaskId, retryTask.getId())); } + return 1 == retryTaskMapper.deleteByIds(ids); + } + + @Override + public Boolean stopById(Long id) { + + Retry retry = retryMapper.selectById(id); + Assert.notNull(retry, () -> new SnailJobServerException("没有可执行的任务")); + + TaskStopJobDTO taskStopJobDTO = RetryConverter.INSTANCE.toTaskStopJobDTO(retry); + taskStopJobDTO.setOperationReason(RetryOperationReasonEnum.MANNER_STOP.getReason()); + taskStopJobDTO.setNeedUpdateTaskStatus(true); + taskStopJobDTO.setMessage("用户手动触发停止"); + retryTaskStopHandler.stop(taskStopJobDTO); return true; } - - @Override - public boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO) { - List uniqueIds = requestVO.getUniqueIds(); - - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() - .eq(GroupConfig::getGroupName, requestVO.getGroupName()) - .eq(GroupConfig::getNamespaceId, namespaceId) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) - ); - - Assert.isTrue(count > 0, () -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName())); - - List list = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getTaskType, SyetemTaskTypeEnum.CALLBACK.getType()) -// .in(Retry::getUniqueId, uniqueIds) - ); - Assert.notEmpty(list, () -> new SnailJobServerException("没有可执行的任务")); - - for (Retry retry : list) { -// for (TaskExecutor taskExecutor : taskExecutors) { -// if (taskExecutor.getTaskType().getScene() == TaskExecutorSceneEnum.MANUAL_CALLBACK.getScene()) { -// taskExecutor.actuator(retry); -// } -// } - - } - - return true; - } - }