diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskExecuteDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskExecuteDTO.java index 8dea85bd6..6dff0e1f1 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskExecuteDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskExecuteDTO.java @@ -16,5 +16,7 @@ import lombok.EqualsAndHashCode; public class RetryTaskExecuteDTO extends BaseDTO { private Integer routeKey; + + private Integer retryTaskExecutorScene; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskGeneratorDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskGeneratorDTO.java index 2fa742d72..31a5906cb 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskGeneratorDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskGeneratorDTO.java @@ -22,4 +22,6 @@ public class RetryTaskGeneratorDTO extends BaseDTO { private Integer taskType; private long nextTriggerAt; + + private Integer retryTaskExecutorScene; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java index 20e7b2396..bbf0d76fb 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java @@ -12,6 +12,7 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; @@ -78,7 +79,11 @@ public class RetryExecutor extends AbstractActor { private void doExecute(RetryTaskExecuteDTO execute) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - wrapper.eq(Retry::getId, execute.getRetryId()).eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()); + + if (RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene() != execute.getRetryTaskExecutorScene()) { + wrapper.eq(Retry::getId, execute.getRetryId()).eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()); + } + Retry retry = retryMapper.selectOne(wrapper); if (Objects.isNull(retry)) { // 没有执行中的任务不执行调度 @@ -86,6 +91,10 @@ public class RetryExecutor extends AbstractActor { return; } + execute.setNamespaceId(retry.getNamespaceId()); + execute.setGroupName(retry.getGroupName()); + execute.setTaskType(retry.getTaskType()); + if (CollUtil.isEmpty(CacheRegisterTable.getServerNodeSet(retry.getGroupName(), retry.getNamespaceId()))) { // 无客户端不执行调度 updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.CANCEL.getStatus(), RetryOperationReasonEnum.NOT_CLIENT); @@ -104,8 +113,8 @@ public class RetryExecutor extends AbstractActor { } // 获取执行的客户端 - RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(execute.getRetryId().toString(), - execute.getGroupName(), execute.getNamespaceId(), retrySceneConfig.getRouteKey()); + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retry.getId().toString(), + retry.getGroupName(), retry.getNamespaceId(), retrySceneConfig.getRouteKey()); updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(), ClientInfoUtils.generate(serverNode)); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/CallbackTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/CallbackTimerTask.java deleted file mode 100644 index 28094c4ae..000000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/CallbackTimerTask.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.timer; - -import com.aizuda.snailjob.common.core.context.SnailSpringContext; -import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.template.datasource.access.AccessTemplate; -import com.aizuda.snailjob.template.datasource.access.TaskAccess; -import com.aizuda.snailjob.template.datasource.persistence.po.Retry; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import io.netty.util.Timeout; - -import java.text.MessageFormat; -import java.time.LocalDateTime; -import java.util.Objects; - -/** - * @author: opensnail - * @date : 2023-09-22 17:09 - */ -public class CallbackTimerTask extends AbstractTimerTask { - public static final String IDEMPOTENT_KEY_PREFIX = "callback_{0}_{1}_{2}"; - - private final RetryTimerContext context; - - public CallbackTimerTask(RetryTimerContext context) { - this.context = context; - super.groupName = context.getGroupName(); -// super.uniqueId = context.getUniqueId(); - super.namespaceId = context.getNamespaceId(); - } - - @Override - protected void doRun(final Timeout timeout) { - SnailJobLog.LOCAL.debug("回调任务执行 {}", LocalDateTime.now()); - AccessTemplate accessTemplate = SnailSpringContext.getBeanByType(AccessTemplate.class); - TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); - Retry retry = retryTaskAccess.one(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, context.getNamespaceId()) - .eq(Retry::getGroupName, context.getGroupName()) - .eq(Retry::getId, context.getRetryId()) - .eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())); - if (Objects.isNull(retry)) { - return; - } -// TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene()); -// taskExecutor.actuator(retry); - } - - @Override - public String idempotentKey() { - return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, context.getGroupName(), context.getNamespaceId(), context.getRetryTaskId()); - } -} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerContext.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerContext.java index 99a274917..9bfd1a676 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerContext.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerContext.java @@ -10,14 +10,10 @@ import lombok.Data; @Data public class RetryTimerContext { - private String namespaceId; - - private String groupName; - private Long retryId; private Long retryTaskId; -// private TaskExecutorSceneEnum scene; + private Integer retryTaskExecutorScene; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java index b97f49abf..0a921d929 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/RetryTimerTask.java @@ -36,8 +36,6 @@ public class RetryTimerTask extends AbstractTimerTask { public RetryTimerTask(RetryTimerContext context) { this.context = context; - super.groupName = context.getGroupName(); - super.namespaceId = context.getNamespaceId(); super.retryId = context.getRetryId(); super.retryTaskId = context.getRetryTaskId(); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java index 68ef7f96b..544a3e947 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryConverter.java @@ -3,6 +3,8 @@ package com.aizuda.snailjob.server.web.service.convert; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO; import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; import org.mapstruct.factory.Mappers; /** @@ -13,6 +15,7 @@ import org.mapstruct.factory.Mappers; * @author opensnail * @date 2025-02-22 */ +@Mapper public interface RetryConverter { RetryConverter INSTANCE = Mappers.getMapper(RetryConverter.class); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java index ca2793d9c..221cd43b0 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryServiceImpl.java @@ -159,7 +159,6 @@ public class RetryServiceImpl implements RetryService { } retry.setRetryStatus(requestVO.getRetryStatus()); - retry.setGroupName(requestVO.getGroupName()); // 若恢复重试则需要重新计算下次触发时间 if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) { @@ -393,7 +392,7 @@ public class RetryServiceImpl implements RetryService { // 设置now表示立即执行 retryTaskPrepareDTO.setNextTriggerAt(DateUtils.toNowMilli()); retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene()); - + retryTaskPrepareDTO.setRetryId(retry.getId()); // 准备阶段执行 ActorRef actorRef = ActorGenerator.retryTaskPrepareActor(); actorRef.tell(retryTaskPrepareDTO, actorRef); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index d1304ad93..c7dd6ae1b 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; +import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.constant.LogFieldConstants; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; @@ -175,7 +176,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { RetryTask retryTask = retryTaskMapper.selectOne( new LambdaQueryWrapper() - .in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus())) + .in(RetryTask::getTaskStatus, RetryTaskStatusEnum.TERMINAL_STATUS_SET) .eq(RetryTask::getNamespaceId, namespaceId) .eq(RetryTask::getId, id)); Assert.notNull(retryTask, () -> new SnailJobServerException("数据删除失败"));