diff --git a/doc/sql/snail_job_mysql.sql b/doc/sql/snail_job_mysql.sql index 1ec53ab8..9e6f34bd 100644 --- a/doc/sql/snail_job_mysql.sql +++ b/doc/sql/snail_job_mysql.sql @@ -120,24 +120,23 @@ CREATE TABLE `sj_retry` `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `args_str` text NOT NULL COMMENT '执行方法参数', `ext_attrs` text NOT NULL COMMENT '扩展字段', - `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间', + `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间', `retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数', `retry_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '重试状态 0、重试中 1、成功 2、最大重试次数', `task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据', `bucket_index` int(11) NOT NULL DEFAULT 0 COMMENT 'bucket', `parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父节点id', - `deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除', + `deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`), - KEY `idx_namespace_id_group_name_task_type` (`namespace_id`, `group_name`, `task_type`), KEY `idx_namespace_id_group_name_retry_status` (`namespace_id`, `group_name`, `retry_status`), KEY `idx_idempotent_id` (`idempotent_id`), KEY `idx_biz_no` (`biz_no`), KEY `idx_parent_id` (`parent_id`), KEY `idx_create_dt` (`create_dt`), - UNIQUE KEY `uk_name_unique_id_deleted` (`namespace_id`, `group_name`, `idempotent_id`, `deleted`) + UNIQUE KEY `uk_name_task_type_idempotent_id_deleted` (`namespace_id`, `group_name`, `task_type`, `idempotent_id`, `deleted`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表' diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/cache/FutureCache.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/cache/FutureCache.java index 7f2a2ecf..167df8ee 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/cache/FutureCache.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/cache/FutureCache.java @@ -1,7 +1,5 @@ package com.aizuda.snailjob.client.core.cache; -import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; -import com.aizuda.snailjob.client.model.ExecuteResult; import com.google.common.util.concurrent.ListenableFuture; import java.util.Optional; @@ -20,10 +18,10 @@ public class FutureCache { futureCache.put(retryTaskId, future); } - public static void remove(Long taskBatchId) { - Optional.ofNullable(futureCache.get(taskBatchId)).ifPresent(future -> { + public static void remove(Long retryTaskId) { + Optional.ofNullable(futureCache.get(retryTaskId)).ifPresent(future -> { future.cancel(true); - futureCache.remove(taskBatchId); + futureCache.remove(retryTaskId); }); } diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java index a33894cc..e09e33f0 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java @@ -20,9 +20,11 @@ import com.aizuda.snailjob.client.core.executor.RemoteRetryExecutor; import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader; import com.aizuda.snailjob.client.core.retryer.RetryerInfo; import com.aizuda.snailjob.client.core.serializer.JacksonSerializer; +import com.aizuda.snailjob.client.core.timer.StopTaskTimerTask; +import com.aizuda.snailjob.client.core.timer.TimerManager; import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO; -import com.aizuda.snailjob.client.model.RetryCallbackDTO; +import com.aizuda.snailjob.client.model.request.RetryCallbackRequest; import com.aizuda.snailjob.client.model.request.DispatchRetryRequest; import com.aizuda.snailjob.client.model.request.StopRetryRequest; import com.aizuda.snailjob.common.core.enums.StatusEnum; @@ -106,17 +108,20 @@ public class SnailRetryEndPoint implements Lifecycle { FutureCache.addFuture(request.getRetryTaskId(), submit); Futures.addCallback(submit, new RetryTaskExecutorFutureCallback(retryContext), decorator); + // 将任务添加到时间轮中,到期停止任务 + TimerManager.add(new StopTaskTimerTask(request.getRetryTaskId()), request.getExecutorTimeout(), TimeUnit.SECONDS); + return new Result<>(Boolean.TRUE); } @Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST) - public Result callback(@Valid RetryCallbackDTO callbackDTO) { + public Result callback(@Valid RetryCallbackRequest callbackDTO) { CallbackContext callbackContext = new CallbackContext(); try { - RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName()); + RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getSceneName(), callbackDTO.getExecutorName()); if (Objects.isNull(retryerInfo)) { - SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getScene()); + SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getSceneName()); return new Result<>(0, "回调失败", Boolean.FALSE); } @@ -143,6 +148,9 @@ public class SnailRetryEndPoint implements Lifecycle { FutureCache.addFuture(callbackDTO.getRetryTaskId(), submit); Futures.addCallback(submit, new CallbackTaskExecutorFutureCallback(callbackContext), decorator); + // 将任务添加到时间轮中,到期停止任务 + TimerManager.add(new StopTaskTimerTask(callbackDTO.getRetryTaskId()), callbackDTO.getExecutorTimeout(), TimeUnit.SECONDS); + return new Result<>(Boolean.TRUE); } diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/timer/StopTaskTimerTask.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/timer/StopTaskTimerTask.java new file mode 100644 index 00000000..c80c6955 --- /dev/null +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/timer/StopTaskTimerTask.java @@ -0,0 +1,24 @@ +package com.aizuda.snailjob.client.core.timer; + +import com.aizuda.snailjob.client.core.cache.FutureCache; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +/** + * @author opensnail + * @date 2025-02-18 + * @since 1.4.0 + */ +public class StopTaskTimerTask implements TimerTask { + + private Long retryTaskId; + + public StopTaskTimerTask(Long retryTaskId) { + this.retryTaskId = retryTaskId; + } + + @Override + public void run(Timeout timeout) throws Exception { + FutureCache.remove(retryTaskId); + } +} diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/timer/TimerManager.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/timer/TimerManager.java new file mode 100644 index 00000000..b17c6c80 --- /dev/null +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/timer/TimerManager.java @@ -0,0 +1,31 @@ +package com.aizuda.snailjob.client.core.timer; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author opensnail + * @date 2023-10-08 22:23:57 + * @since 2.4.0 + */ +public class TimerManager { + + private static final HashedWheelTimer wheelTimer; + + static { + wheelTimer = new HashedWheelTimer( + new CustomizableThreadFactory("retry-task-timer-wheel-"), 1, + TimeUnit.SECONDS, 1024); + } + + private TimerManager() { + } + + public static Timeout add(TimerTask task, long delay, TimeUnit unit) { + return wheelTimer.newTimeout(task, delay, unit); + } +} diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryRequest.java index 3ad4a4d0..554878c7 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchRetryRequest.java @@ -29,4 +29,6 @@ public class DispatchRetryRequest { private Long retryTaskId; @NotNull(message = "retryId 不能为空") private Long retryId; + @NotNull(message = "executorTimeout 不能为空") + private Integer executorTimeout; } diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/RetryCallbackDTO.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackRequest.java similarity index 69% rename from snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/RetryCallbackDTO.java rename to snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackRequest.java index a5fccdf3..8fd1b487 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/RetryCallbackDTO.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackRequest.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.client.model; +package com.aizuda.snailjob.client.model.request; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; @@ -11,11 +11,13 @@ import lombok.Data; * @date 2022/03/25 10:06 */ @Data -public class RetryCallbackDTO { - @NotBlank(message = "group 不能为空") - private String group; - @NotBlank(message = "scene 不能为空") - private String scene; +public class RetryCallbackRequest { + @NotBlank(message = "namespaceId 不能为空") + private String namespaceId; + @NotBlank(message = "groupName 不能为空") + private String groupName; + @NotBlank(message = "sceneName 不能为空") + private String sceneName; @NotBlank(message = "参数 不能为空") private String argsStr; @NotBlank(message = "idempotentId 不能为空") @@ -28,6 +30,6 @@ public class RetryCallbackDTO { private Long retryTaskId; @NotNull(message = "retryId 不能为空") private Long retryId; - @NotBlank(message = "namespaceId 不能为空") - private String namespaceId; + @NotNull(message = "executorTimeout 不能为空") + private Integer executorTimeout; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java index 81775f27..9575760d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java @@ -1,8 +1,7 @@ package com.aizuda.snailjob.server.retry.task.client; -import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO; -import com.aizuda.snailjob.client.model.RetryCallbackDTO; +import com.aizuda.snailjob.client.model.request.RetryCallbackRequest; import com.aizuda.snailjob.client.model.request.DispatchRetryRequest; import com.aizuda.snailjob.client.model.request.StopRetryRequest; import com.aizuda.snailjob.common.core.model.Result; @@ -30,7 +29,7 @@ public interface RetryRpcClient { Result stop(@Body StopRetryRequest stopRetryRequest); @Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST) - Result callback(@Body RetryCallbackDTO retryCallbackDTO); + Result callback(@Body RetryCallbackRequest retryCallbackRequest); @Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST) Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java index dd5d4e18..60ec581b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskPrepareDTO.java @@ -32,4 +32,6 @@ public class RetryTaskPrepareDTO { private Integer blockStrategy; private boolean onlyTimeoutCheck; + + private Integer executorTimeout; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java index 627207fa..d9d1af8f 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/TaskStopJobDTO.java @@ -19,4 +19,11 @@ public class TaskStopJobDTO extends BaseDTO { * 操作原因 */ private Integer operationReason; + + private String message; + + /** + * 是否需要变更任务状态 + */ + private boolean needUpdateTaskStatus; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java index 79084169..a4dbf306 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; +import com.aizuda.snailjob.client.model.request.RetryCallbackRequest; import com.aizuda.snailjob.client.model.request.DispatchRetryRequest; import com.aizuda.snailjob.client.model.request.StopRetryRequest; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -125,6 +126,8 @@ public interface RetryTaskConverter { RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO); + RetryExecutorResultDTO toRetryExecutorResultDTO(TaskStopJobDTO resultDTO); + RetryExecutorResultDTO toRetryExecutorResultDTO(RequestRetryExecutorDTO resultDTO); RetryExecutorResultDTO toRetryExecutorResultDTO(RequestCallbackExecutorDTO resultDTO); @@ -137,7 +140,9 @@ public interface RetryTaskConverter { TaskStopJobDTO toTaskStopJobDTO(BlockStrategyContext context); - StopRetryRequest toStopRetryRequest(RequestCallbackExecutorDTO executorDTO); + TaskStopJobDTO toTaskStopJobDTO(Retry retry); + + TaskStopJobDTO toTaskStopJobDTO(RetryTaskPrepareDTO context); StopRetryRequest toStopRetryRequest(RequestStopRetryTaskExecutorDTO executorDTO); @@ -167,4 +172,6 @@ public interface RetryTaskConverter { @Mapping(target = "taskType", source = "retry.taskType"), }) RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry); + + RetryCallbackRequest toRetryCallbackDTO(RequestCallbackExecutorDTO executorDTO); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java index 7f4f12cb..5550ae34 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/block/OverlayRetryBlockStrategy.java @@ -36,9 +36,10 @@ public class OverlayRetryBlockStrategy extends AbstracJobBlockStrategy { TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(context); if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) { - stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_DISCARD.getReason()); + stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_OVERLAY.getReason()); } + stopJobDTO.setNeedUpdateTaskStatus(true); retryTaskStopHandler.stop(stopJobDTO); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java index 3dfac0b3..13429558 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java @@ -2,7 +2,8 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; -import com.aizuda.snailjob.client.model.request.StopRetryRequest; +import com.aizuda.snailjob.client.model.request.RetryCallbackRequest; +import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -17,16 +18,13 @@ import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO; -import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; -import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryException; -import com.github.rholder.retry.RetryListener; import com.google.common.collect.Maps; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -78,13 +76,13 @@ public class RequestCallbackClientActor extends AbstractActor { return; } - StopRetryRequest stopRetryRequest = RetryTaskConverter.INSTANCE.toStopRetryRequest(executorDTO); + RetryCallbackRequest retryCallbackRequest = RetryTaskConverter.INSTANCE.toRetryCallbackDTO(executorDTO); try { // 构建请求客户端对象 RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO); - Result dispatch = rpcClient.stop(stopRetryRequest); + Result dispatch = rpcClient.callback(retryCallbackRequest); if (dispatch.getStatus() == StatusEnum.YES.getStatus()) { SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId()); } else { @@ -128,7 +126,7 @@ public class RequestCallbackClientActor extends AbstractActor { @Override public void onRetry(final Attempt attempt) { - if (attempt.getAttemptNumber() > 0) { + if (attempt.getAttemptNumber() > 1) { // 更新最新负载节点 String hostId = (String) properties.get("HOST_ID"); String hostIp = (String) properties.get("HOST_IP"); @@ -176,6 +174,7 @@ public class RequestCallbackClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); + executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java index eecf55ef..7056fd9d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; import com.aizuda.snailjob.client.model.request.DispatchRetryRequest; +import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.model.SnailJobHeaders; @@ -181,6 +182,7 @@ public class RequestRetryClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); + executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java index 07ffa46d..5ef17c09 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java @@ -15,12 +15,15 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; -import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecuteDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler; +import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimeoutCheckTask; +import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerWheel; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper; @@ -34,6 +37,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.Objects; /** @@ -53,6 +57,7 @@ public class RetryExecutor extends AbstractActor { private final RetryTaskMapper retryTaskMapper; private final SceneConfigMapper sceneConfigMapper; private final ClientNodeAllocateHandler clientNodeAllocateHandler; + private final RetryTaskStopHandler retryTaskStopHandler; @Override public Receive createReceive() { @@ -103,14 +108,14 @@ public class RetryExecutor extends AbstractActor { updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(), ClientInfoUtils.generate(serverNode)); - Object executorDTO; if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) { // 请求客户端 RequestCallbackExecutorDTO callbackExecutorDTO = RetryTaskConverter.INSTANCE.toRequestCallbackExecutorDTO(retrySceneConfig, retry); callbackExecutorDTO.setClientId(serverNode.getHostId()); callbackExecutorDTO.setRetryTaskId(execute.getRetryTaskId()); - executorDTO = callbackExecutorDTO; + ActorRef actorRef = ActorGenerator.callbackRealTaskExecutorActor(); + actorRef.tell(callbackExecutorDTO, actorRef); } else { // 请求客户端 @@ -118,21 +123,21 @@ public class RetryExecutor extends AbstractActor { retryExecutorDTO.setClientId(serverNode.getHostId()); retryExecutorDTO.setRetryTaskId(execute.getRetryTaskId()); - executorDTO = retryExecutorDTO; + ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor(); + actorRef.tell(retryExecutorDTO, actorRef); } - ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor(); - actorRef.tell(executorDTO, actorRef); + // 运行中的任务,需要进行超时检查 + RetryTimerWheel.registerWithRetry(() -> new RetryTimeoutCheckTask( + execute.getRetryTaskId(), execute.getRetryId(), retryTaskStopHandler, retryMapper, retryTaskMapper), + // 加500ms是为了让尽量保证客户端自己先超时中断,防止客户端上报成功但是服务端已触发超时中断 + Duration.ofMillis(DateUtils.toEpochMilli(retrySceneConfig.getExecutorTimeout()) + 500)); } private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, String clientInfo) { updateRetryTaskStatus(retryTaskId, taskStatus, RetryOperationReasonEnum.NONE, clientInfo); } - private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus) { - updateRetryTaskStatus(retryTaskId, taskStatus, RetryOperationReasonEnum.NONE, null); - } - private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, RetryOperationReasonEnum reasonEnum) { updateRetryTaskStatus(retryTaskId, taskStatus, reasonEnum, null); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 9100ecdd..17763a42 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -119,7 +119,8 @@ public class ScanRetryActor extends AbstractActor { partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()); List retrySceneConfigs = accessTemplate.getSceneConfigAccess() .list(new LambdaQueryWrapper() - .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName) + .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName, + RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval) .in(RetrySceneConfig::getSceneName, sceneNameSet)); return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName); } @@ -132,6 +133,7 @@ public class ScanRetryActor extends AbstractActor { RetryTaskPrepareDTO retryTaskPrepareDTO = RetryTaskConverter.INSTANCE.toRetryTaskPrepareDTO(partitionTask); retryTaskPrepareDTO.setBlockStrategy(retrySceneConfig.getBackOff()); + retryTaskPrepareDTO.setExecutorTimeout(retrySceneConfig.getExecutorTimeout()); waitExecRetries.add(retryTaskPrepareDTO); } @@ -148,13 +150,15 @@ public class ScanRetryActor extends AbstractActor { } waitStrategyContext.setNextTriggerAt(nextTriggerAt); - waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1); + // 更新触发时间, 任务进入时间轮 WaitStrategy waitStrategy; if (SyetemTaskTypeEnum.CALLBACK.getType().equals(partitionTask.getTaskType())) { + waitStrategyContext.setTriggerInterval(retrySceneConfig.getCbTriggerInterval()); waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType()); } else { + waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java index 9e6a0c52..5f914012 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.generator.retry; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Pair; +import cn.hutool.core.util.HashUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; @@ -10,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.WaitStrategy; +import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.enums.DelayLevelEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; @@ -49,7 +51,7 @@ public abstract class AbstractGenerator implements TaskGenerator { @Autowired private List idGeneratorList; @Autowired - private RetryTaskMapper retryTaskMapper; + private SystemProperties systemProperties; @Override @Transactional @@ -79,13 +81,11 @@ public abstract class AbstractGenerator implements TaskGenerator { Map> retryTaskMap = StreamUtils.groupByKey(retries, Retry::getIdempotentId); List waitInsertTasks = new ArrayList<>(); - List waitInsertTaskLogs = new ArrayList<>(); LocalDateTime now = LocalDateTime.now(); for (TaskContext.TaskInfo taskInfo : taskInfos) { Pair, List> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo, retrySceneConfig); waitInsertTasks.addAll(pair.getKey()); - waitInsertTaskLogs.addAll(pair.getValue()); } if (CollUtil.isEmpty(waitInsertTasks)) { @@ -95,8 +95,6 @@ public abstract class AbstractGenerator implements TaskGenerator { Assert.isTrue( waitInsertTasks.size() == retryTaskAccess.insertBatch(waitInsertTasks), () -> new SnailJobServerException("failed to report data")); - /*Assert.isTrue(waitInsertTaskLogs.size() == retryTaskMapper.insertBatch(waitInsertTaskLogs), - () -> new SnailJobServerException("新增重试日志失败"));*/ } /** @@ -130,6 +128,12 @@ public abstract class AbstractGenerator implements TaskGenerator { retry.setSceneName(taskContext.getSceneName()); retry.setRetryStatus(initStatus(taskContext)); retry.setBizNo(Optional.ofNullable(retry.getBizNo()).orElse(StrUtil.EMPTY)); + // 计算分桶逻辑 + retry.setBucketIndex( + HashUtil.bkdrHash(taskContext.getGroupName() + taskContext.getSceneName() + taskInfo.getIdempotentId()) + % systemProperties.getBucketTotal() + ); + retry.setCreateDt(now); retry.setUpdateDt(now); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/task/RetryTaskGeneratorHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/task/RetryTaskGeneratorHandler.java index 476f2ab3..f06b6125 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/task/RetryTaskGeneratorHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/task/RetryTaskGeneratorHandler.java @@ -16,6 +16,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.time.Duration; +import java.util.Objects; /** *

@@ -38,10 +39,20 @@ public class RetryTaskGeneratorHandler { public void generateRetryTask(RetryTaskGeneratorDTO generator) { RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(generator); - retryTask.setTaskStatus(RetryTaskStatusEnum.WAITING.getStatus()); + Integer taskStatus = generator.getTaskStatus(); + if (Objects.isNull(taskStatus)) { + taskStatus = RetryTaskStatusEnum.WAITING.getStatus(); + } + retryTask.setTaskStatus(taskStatus); + retryTask.setOperationReason(generator.getOperationReason()); + retryTask.setExtAttrs(StrUtil.EMPTY); Assert.isTrue(1 == retryTaskMapper.insert(retryTask), () -> new SnailJobServerException("插入重试任务失败")); + if (!RetryTaskStatusEnum.WAITING.getStatus().equals(taskStatus)) { + return; + } + // 放到到时间轮 long delay = generator.getNextTriggerAt() - DateUtils.toNowMilli(); RetryTimerContext timerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(generator); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java index 947d8735..ba045fcf 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RetryTaskStopHandler.java @@ -2,10 +2,12 @@ package com.aizuda.snailjob.server.retry.task.support.handler; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO; +import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; @@ -42,6 +44,20 @@ public class RetryTaskStopHandler { RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO); ActorRef actorRef = ActorGenerator.stopRetryTaskActor(); actorRef.tell(executorDTO, actorRef); + + // 更新结果为失败 + doHandleResult(stopJobDTO); + } + + private static void doHandleResult(TaskStopJobDTO stopJobDTO) { + if (!stopJobDTO.isNeedUpdateTaskStatus()) { + return; + } + RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO); + executorResultDTO.setExceptionMsg(stopJobDTO.getMessage()); + executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE); + ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); + actorRef.tell(executorResultDTO, actorRef); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/prepare/RunningRetryPrepareHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/prepare/RunningRetryPrepareHandler.java index 0a4bd363..8294b0c0 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/prepare/RunningRetryPrepareHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/prepare/RunningRetryPrepareHandler.java @@ -1,13 +1,19 @@ package com.aizuda.snailjob.server.retry.task.support.prepare; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; +import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; +import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO; +import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; import com.aizuda.snailjob.server.retry.task.support.BlockStrategy; import com.aizuda.snailjob.server.retry.task.support.RetryPrePareHandler; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.block.BlockStrategyContext; import com.aizuda.snailjob.server.retry.task.support.block.RetryBlockStrategyFactory; +import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -23,7 +29,9 @@ import java.util.Objects; */ @Component @Slf4j +@RequiredArgsConstructor public class RunningRetryPrepareHandler implements RetryPrePareHandler { + private final RetryTaskStopHandler retryTaskStopHandler; @Override public boolean matches(Integer status) { @@ -35,13 +43,19 @@ public class RunningRetryPrepareHandler implements RetryPrePareHandler { // 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理 int blockStrategy = prepare.getBlockStrategy(); JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE; -// CompleteJobBatchDTO completeJobBatchDTO = RetryTaskConverter.INSTANCE.completeJobBatchDTO(prepare); -// completeJobBatchDTO.setJobOperationReason(jobOperationReasonEnum.getReason()); -// if (jobTaskBatchHandler.handleResult(completeJobBatchDTO)) { -// blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); -// } else { -// -// } + + // 计算超时时间 + long delay = DateUtils.toNowMilli() - prepare.getNextTriggerAt(); + + // 计算超时时间,到达超时时间中断任务 + if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) { + log.info("任务执行超时.retryTaskId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getRetryTaskId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout())); + // 超时停止任务 + TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(prepare); + stopJobDTO.setOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); + stopJobDTO.setNeedUpdateTaskStatus(true); + retryTaskStopHandler.stop(stopJobDTO); + } // 仅是超时检测的,不执行阻塞策略 if (prepare.isOnlyTimeoutCheck()) { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java index 10480867..42d31394 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java @@ -17,15 +17,17 @@ public abstract class AbstractTimerTask implements TimerTask { protected String groupName; protected String namespaceId; protected Long retryId; + protected Long retryTaskId; @Override public void run(Timeout timeout) throws Exception { - log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName, - retryId, namespaceId); + log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]", + LocalDateTime.now(), groupName, retryId, retryTaskId, namespaceId); try { doRun(timeout); } catch (Exception e) { - log.error("重试任务执行失败 groupName:[{}] retryId:[{}] namespaceId:[{}]", groupName, retryId, namespaceId, e); + log.error("重试任务执行失败 groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]", + groupName, retryId, retryTaskId, namespaceId, e); } finally { // 先清除时间轮的缓存 RetryTimerWheel.clearCache(idempotentKey()); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimeoutCheckTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimeoutCheckTask.java new file mode 100644 index 00000000..f75c7f75 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimeoutCheckTask.java @@ -0,0 +1,74 @@ +package com.aizuda.snailjob.server.retry.task.support.timer; + +import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.TimerTask; +import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Retry; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import io.netty.util.Timeout; +import lombok.AllArgsConstructor; + +import java.text.MessageFormat; +import java.util.Objects; + +/** + * 任务超时检查 + * + * @author opensnail + * @date 2024-05-20 21:16:09 + * @since sj_1.0.0 + */ +@AllArgsConstructor +public class RetryTimeoutCheckTask implements TimerTask { + private static final String IDEMPOTENT_KEY_PREFIX = "retry_timeout_check_{0}"; + + private final Long retryTaskId; + private final Long retryId; + private final RetryTaskStopHandler retryTaskStopHandler; + private final RetryMapper retryMapper; + private final RetryTaskMapper retryTaskMapper; + + @Override + public void run(Timeout timeout) throws Exception { + RetryTimerWheel.clearCache(idempotentKey()); + RetryTask retryTask = retryTaskMapper.selectById(retryTaskId); + if (Objects.isNull(retryTask)) { + SnailJobLog.LOCAL.error("retryTaskId:[{}] 不存在", retryTaskId); + return; + } + + // 已经完成了,无需重复停止任务 + if (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus())) { + return; + } + + Retry retry = retryMapper.selectById(retryId); + if (Objects.isNull(retry)) { + SnailJobLog.LOCAL.error("retryId:[{}]不存在", retryId); + return; + } + + // 超时停止任务 + String reason = "超时中断.retryTaskId:[" + retryTaskId + "]"; + + TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(retry); + stopJobDTO.setRetryTaskId(retryTaskId); + stopJobDTO.setRetryId(retryId); + stopJobDTO.setOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); + stopJobDTO.setNeedUpdateTaskStatus(true); + retryTaskStopHandler.stop(stopJobDTO); + + SnailJobLog.LOCAL.info(reason); + } + + @Override + public String idempotentKey() { + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, retryId); + } +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java index e87fe3a6..b97f49ab 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java @@ -38,6 +38,8 @@ public class RetryTimerTask extends AbstractTimerTask { this.context = context; super.groupName = context.getGroupName(); super.namespaceId = context.getNamespaceId(); + super.retryId = context.getRetryId(); + super.retryTaskId = context.getRetryTaskId(); } @Override