diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java index e09e33f0f..2903d94e9 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java @@ -111,6 +111,8 @@ public class SnailRetryEndPoint implements Lifecycle { // 将任务添加到时间轮中,到期停止任务 TimerManager.add(new StopTaskTimerTask(request.getRetryTaskId()), request.getExecutorTimeout(), TimeUnit.SECONDS); + SnailJobLog.REMOTE.info("重试任务:[{}] 任务调度成功. ", request.getRetryTaskId()); + return new Result<>(Boolean.TRUE); } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java index 37dc9f2ef..dd93f3af6 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java @@ -48,6 +48,10 @@ public abstract class AbstractRetryAlarm extends Abs retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName(), retryAlarmInfo.getNamespaceId())); + if (Objects.isNull(retrySceneConfig)) { + continue; + } + Set retryNotifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class)); for (Long retryNotifyId : retryNotifyIds) { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java index 5029290a4..f646b1c39 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java @@ -17,7 +17,7 @@ import lombok.EqualsAndHashCode; public class RetryExecutorResultDTO extends BaseDTO { private RetryResultStatusEnum resultStatus; - + private boolean incrementRetryCount; private String resultJson; private Integer statusCode; private String idempotentId; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java index 5ef17c090..20e7b2396 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RetryExecutor.java @@ -39,6 +39,7 @@ import org.springframework.stereotype.Component; import java.time.Duration; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; /** *

diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 17763a426..ff8ee4b68 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -119,8 +119,9 @@ public class ScanRetryActor extends AbstractActor { partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()); List retrySceneConfigs = accessTemplate.getSceneConfigAccess() .list(new LambdaQueryWrapper() - .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName, - RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval) + .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, + RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType, + RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout) .in(RetrySceneConfig::getSceneName, sceneNameSet)); return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/CallbackRetryTaskHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/CallbackRetryTaskHandler.java index 0dd68b1ab..2926be976 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/CallbackRetryTaskHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/CallbackRetryTaskHandler.java @@ -19,6 +19,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMappe import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.helpers.FormattingTuple; import org.slf4j.helpers.MessageFormatter; @@ -40,16 +41,9 @@ import java.util.Objects; */ @Component @Slf4j +@RequiredArgsConstructor public class CallbackRetryTaskHandler { - - private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}"; - - @Autowired - protected AccessTemplate accessTemplate; - @Autowired - private RetryTaskMapper retryTaskMapper; - @Autowired - private SystemProperties systemProperties; + private final AccessTemplate accessTemplate; /** * 创建回调数据 @@ -79,6 +73,7 @@ public class CallbackRetryTaskHandler { WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType()); WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli()); + waitStrategyContext.setDelayLevel(1); waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval)); callbackRetry.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext)); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java index a77731e9e..bb6340b74 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/request/ReportDispatchResultHttpRequestHandler.java @@ -60,7 +60,7 @@ public class ReportDispatchResultHttpRequestHandler extends PostHttpRequestHandl RetryResultStatusEnum statusEnum = RetryResultStatusEnum.getRetryResultStatusEnum(resultDTO.getStatusCode()); Assert.notNull(statusEnum, () -> new SnailJobServerException("status code is invalid")); executorResultDTO.setResultStatus(statusEnum); - + executorResultDTO.setIncrementRetryCount(true); ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); actorRef.tell(executorResultDTO, actorRef); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java index 2238403ab..a18cb8e3f 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java @@ -18,6 +18,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMappe import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -58,7 +59,9 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName( context.getGroupName(), context.getSceneName(), context.getNamespaceId()); - Retry retry = retryMapper.selectById(context.getRetryId()); + Retry retry = retryMapper.selectOne(new LambdaQueryWrapper() + .select(Retry::getId, Retry::getRetryCount) + .eq(Retry::getId, context.getRetryId())); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { @@ -70,16 +73,21 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { maxRetryCount = retrySceneConfig.getMaxRetryCount(); } - if (maxRetryCount <= retry.getRetryCount()) { + if (maxRetryCount <= retry.getRetryCount() + 1) { retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus()); + retry.setUpdateDt(LocalDateTime.now()); + Assert.isTrue(1 == retryMapper.updateById(retry), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); // 创建一个回调任务 callbackRetryTaskHandler.create(retry, retrySceneConfig); + } else if (context.isIncrementRetryCount()) { + retry.setRetryCount(retry.getRetryCount() + 1); + retry.setUpdateDt(LocalDateTime.now()); + Assert.isTrue(1 == retryMapper.updateById(retry), + () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); + } - retry.setRetryCount(retry.getRetryCount() + 1); - retry.setUpdateDt(LocalDateTime.now()); - Assert.isTrue(1 == retryMapper.updateById(retry), - () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); RetryTask retryTask = new RetryTask(); retryTask.setId(context.getRetryTaskId()); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java index 3ddfd569d..ed1a66d10 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryResultContext.java @@ -19,6 +19,8 @@ public class RetryResultContext extends BaseDTO { private RetryResultStatusEnum resultStatus; + private boolean incrementRetryCount; + private String resultJson; private String idempotentId; private String exceptionMsg; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java index 54f9b0591..b3f2e1179 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java @@ -14,10 +14,6 @@ public class RetryTaskLogMessageQueryVO extends BaseQueryVO { private String groupName; -// private String uniqueId; - - private Long retryId; - private Long retryTaskId; private Long startId; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryResponseVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryResponseVO.java index 2e3061c7c..10c57b023 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryResponseVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/RetryResponseVO.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.model.response; import lombok.Data; import java.time.LocalDateTime; +import java.util.List; /** * @author opensnail @@ -30,7 +31,7 @@ public class RetryResponseVO { private String executorName; - private Long nextTriggerAt; + private LocalDateTime nextTriggerAt; private Integer retryCount; @@ -42,6 +43,6 @@ public class RetryResponseVO { private LocalDateTime updateDt; - private RetryResponseVO children; + private List children; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java index 0182af255..0fd250fc5 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/RetryTaskResponseVOConverter.java @@ -1,11 +1,16 @@ package com.aizuda.snailjob.server.web.service.convert; +import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.web.model.response.RetryResponseVO; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Mappings; import org.mapstruct.factory.Mappers; +import java.time.LocalDateTime; import java.util.List; +import java.util.Objects; /** * @author opensnail @@ -17,7 +22,18 @@ public interface RetryTaskResponseVOConverter { RetryTaskResponseVOConverter INSTANCE = Mappers.getMapper(RetryTaskResponseVOConverter.class); + @Mappings({ + @Mapping(target = "nextTriggerAt", expression = "java(RetryTaskResponseVOConverter.toLocalDateTime(retry.getNextTriggerAt()))") + }) RetryResponseVO convert(Retry retry); List convertList(List retries); + + static LocalDateTime toLocalDateTime(Long nextTriggerAt) { + if (Objects.isNull(nextTriggerAt) || nextTriggerAt == 0) { + return null; + } + + return DateUtils.toLocalDateTime(nextTriggerAt); + } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java index f77c646bb..b937bd0b9 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskLogServiceImpl.java @@ -72,7 +72,7 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService { @Override public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage( RetryTaskLogMessageQueryVO queryVO) { - if (queryVO.getRetryId() == null || StrUtil.isBlank(queryVO.getGroupName())) { + if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) { RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO(); jobLogResponseVO.setNextStartId(0L); jobLogResponseVO.setFromIndex(0); @@ -87,7 +87,6 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService { .select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum) .ge(RetryTaskLogMessage::getId, queryVO.getStartId()) .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) - .eq(RetryTaskLogMessage::getRetryId, queryVO.getRetryId()) .eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId()) .eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName()) .orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime) diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index 2e4474d9b..aac86ddbe 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -42,6 +42,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.*; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -93,21 +94,28 @@ public class RetryTaskServiceImpl implements RetryTaskService { .eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), Retry::getIdempotentId, queryVO.getIdempotentId()) .eq(Objects.nonNull(queryVO.getRetryId()), Retry::getId, queryVO.getRetryId()) .eq(Objects.nonNull(queryVO.getRetryStatus()), Retry::getRetryStatus, queryVO.getRetryStatus()) + .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType()) .select(Retry::getId, Retry::getBizNo, Retry::getIdempotentId, Retry::getGroupName, Retry::getNextTriggerAt, Retry::getRetryCount, Retry::getRetryStatus, Retry::getUpdateDt, Retry::getSceneName, - Retry::getTaskType) + Retry::getTaskType, Retry::getParentId) .orderByDesc(Retry::getCreateDt); pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper); Set ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId); - List callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper().eq(Retry::getParentId, ids)); + List callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper() + .in(Retry::getParentId, ids)); Map callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId); List retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords()); for (RetryResponseVO retryResponseVO : retryResponseList) { - retryResponseVO.setChildren(RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId()))); + RetryResponseVO responseVO = RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId())); + if (Objects.isNull(responseVO)) { + retryResponseVO.setChildren(Lists.newArrayList()); + } else { + retryResponseVO.setChildren(Lists.newArrayList(responseVO)); + } } return new PageResult<>(pageDTO, retryResponseList);