feat(1.4.0-beta1): 调试重试信息与重试任务接口

This commit is contained in:
opensnail 2025-02-18 23:21:01 +08:00
parent b01ba347cf
commit 8b05c6f43a
14 changed files with 63 additions and 30 deletions

View File

@ -111,6 +111,8 @@ public class SnailRetryEndPoint implements Lifecycle {
// 将任务添加到时间轮中到期停止任务
TimerManager.add(new StopTaskTimerTask(request.getRetryTaskId()), request.getExecutorTimeout(), TimeUnit.SECONDS);
SnailJobLog.REMOTE.info("重试任务:[{}] 任务调度成功. ", request.getRetryTaskId());
return new Result<>(Boolean.TRUE);
}

View File

@ -48,6 +48,10 @@ public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends Abs
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getNamespaceId()));
if (Objects.isNull(retrySceneConfig)) {
continue;
}
Set<Long> retryNotifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
for (Long retryNotifyId : retryNotifyIds) {

View File

@ -17,7 +17,7 @@ import lombok.EqualsAndHashCode;
public class RetryExecutorResultDTO extends BaseDTO {
private RetryResultStatusEnum resultStatus;
private boolean incrementRetryCount;
private String resultJson;
private Integer statusCode;
private String idempotentId;

View File

@ -39,6 +39,7 @@ import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <p>

View File

@ -119,8 +119,9 @@ public class ScanRetryActor extends AbstractActor {
partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName());
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName,
RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval)
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType,
RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout)
.in(RetrySceneConfig::getSceneName, sceneNameSet));
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
}

View File

@ -19,6 +19,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMappe
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
@ -40,16 +41,9 @@ import java.util.Objects;
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class CallbackRetryTaskHandler {
private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}";
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
private RetryTaskMapper retryTaskMapper;
@Autowired
private SystemProperties systemProperties;
private final AccessTemplate accessTemplate;
/**
* 创建回调数据
@ -79,6 +73,7 @@ public class CallbackRetryTaskHandler {
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli());
waitStrategyContext.setDelayLevel(1);
waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval));
callbackRetry.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext));

View File

@ -60,7 +60,7 @@ public class ReportDispatchResultHttpRequestHandler extends PostHttpRequestHandl
RetryResultStatusEnum statusEnum = RetryResultStatusEnum.getRetryResultStatusEnum(resultDTO.getStatusCode());
Assert.notNull(statusEnum, () -> new SnailJobServerException("status code is invalid"));
executorResultDTO.setResultStatus(statusEnum);
executorResultDTO.setIncrementRetryCount(true);
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
actorRef.tell(executorResultDTO, actorRef);

View File

@ -18,6 +18,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMappe
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -58,7 +59,9 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(
context.getGroupName(), context.getSceneName(), context.getNamespaceId());
Retry retry = retryMapper.selectById(context.getRetryId());
Retry retry = retryMapper.selectOne(new LambdaQueryWrapper<Retry>()
.select(Retry::getId, Retry::getRetryCount)
.eq(Retry::getId, context.getRetryId()));
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
@ -70,17 +73,22 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
maxRetryCount = retrySceneConfig.getMaxRetryCount();
}
if (maxRetryCount <= retry.getRetryCount()) {
if (maxRetryCount <= retry.getRetryCount() + 1) {
retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
retry.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retry, retrySceneConfig);
}
} else if (context.isIncrementRetryCount()) {
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
}
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());

View File

@ -19,6 +19,8 @@ public class RetryResultContext extends BaseDTO {
private RetryResultStatusEnum resultStatus;
private boolean incrementRetryCount;
private String resultJson;
private String idempotentId;
private String exceptionMsg;

View File

@ -14,10 +14,6 @@ public class RetryTaskLogMessageQueryVO extends BaseQueryVO {
private String groupName;
// private String uniqueId;
private Long retryId;
private Long retryTaskId;
private Long startId;

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.model.response;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author opensnail
@ -30,7 +31,7 @@ public class RetryResponseVO {
private String executorName;
private Long nextTriggerAt;
private LocalDateTime nextTriggerAt;
private Integer retryCount;
@ -42,6 +43,6 @@ public class RetryResponseVO {
private LocalDateTime updateDt;
private RetryResponseVO children;
private List<RetryResponseVO> children;
}

View File

@ -1,11 +1,16 @@
package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
/**
* @author opensnail
@ -17,7 +22,18 @@ public interface RetryTaskResponseVOConverter {
RetryTaskResponseVOConverter INSTANCE = Mappers.getMapper(RetryTaskResponseVOConverter.class);
@Mappings({
@Mapping(target = "nextTriggerAt", expression = "java(RetryTaskResponseVOConverter.toLocalDateTime(retry.getNextTriggerAt()))")
})
RetryResponseVO convert(Retry retry);
List<RetryResponseVO> convertList(List<Retry> retries);
static LocalDateTime toLocalDateTime(Long nextTriggerAt) {
if (Objects.isNull(nextTriggerAt) || nextTriggerAt == 0) {
return null;
}
return DateUtils.toLocalDateTime(nextTriggerAt);
}
}

View File

@ -72,7 +72,7 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService {
@Override
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(
RetryTaskLogMessageQueryVO queryVO) {
if (queryVO.getRetryId() == null || StrUtil.isBlank(queryVO.getGroupName())) {
if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) {
RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO();
jobLogResponseVO.setNextStartId(0L);
jobLogResponseVO.setFromIndex(0);
@ -87,7 +87,6 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService {
.select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum)
.ge(RetryTaskLogMessage::getId, queryVO.getStartId())
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getRetryId, queryVO.getRetryId())
.eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId())
.eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName())
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)

View File

@ -42,6 +42,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -93,21 +94,28 @@ public class RetryTaskServiceImpl implements RetryTaskService {
.eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), Retry::getIdempotentId, queryVO.getIdempotentId())
.eq(Objects.nonNull(queryVO.getRetryId()), Retry::getId, queryVO.getRetryId())
.eq(Objects.nonNull(queryVO.getRetryStatus()), Retry::getRetryStatus, queryVO.getRetryStatus())
.eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
.select(Retry::getId, Retry::getBizNo, Retry::getIdempotentId,
Retry::getGroupName, Retry::getNextTriggerAt, Retry::getRetryCount,
Retry::getRetryStatus, Retry::getUpdateDt, Retry::getSceneName,
Retry::getTaskType)
Retry::getTaskType, Retry::getParentId)
.orderByDesc(Retry::getCreateDt);
pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper);
Set<Long> ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId);
List<Retry> callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>().eq(Retry::getParentId, ids));
List<Retry> callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>()
.in(Retry::getParentId, ids));
Map<Long, Retry> callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
List<RetryResponseVO> retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords());
for (RetryResponseVO retryResponseVO : retryResponseList) {
retryResponseVO.setChildren(RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId())));
RetryResponseVO responseVO = RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId()));
if (Objects.isNull(responseVO)) {
retryResponseVO.setChildren(Lists.newArrayList());
} else {
retryResponseVO.setChildren(Lists.newArrayList(responseVO));
}
}
return new PageResult<>(pageDTO, retryResponseList);