fix(1.5.0-beta1): 完成重试接入

This commit is contained in:
opensnail 2025-04-19 17:08:45 +08:00
parent 33b6f35916
commit cfb1f72967
2 changed files with 10 additions and 7 deletions

View File

@ -15,6 +15,7 @@ import com.aizuda.snailjob.server.retry.task.dto.RetryTaskGeneratorDTO;
import com.aizuda.snailjob.server.retry.task.support.block.BlockStrategyContext; import com.aizuda.snailjob.server.retry.task.support.block.BlockStrategyContext;
import com.aizuda.snailjob.server.retry.task.support.result.RetryResultContext; import com.aizuda.snailjob.server.retry.task.support.result.RetryResultContext;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext; import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.po.*; import com.aizuda.snailjob.template.datasource.persistence.po.*;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
import org.mapstruct.Mapping; import org.mapstruct.Mapping;
@ -87,7 +88,7 @@ public interface RetryTaskConverter {
return new HashSet<>(JsonUtil.parseList(notifyRecipientIdsStr, Long.class)); return new HashSet<>(JsonUtil.parseList(notifyRecipientIdsStr, Long.class));
} }
RetryTaskLogMessage toRetryTaskLogMessage(RetryLogTaskDTO retryLogTaskDTO); RetryTaskLogMessageDO toRetryTaskLogMessage(RetryLogTaskDTO retryLogTaskDTO);
@Mapping(target = "timestamp", expression = "java(DateUtils.toNowMilli())") @Mapping(target = "timestamp", expression = "java(DateUtils.toNowMilli())")
RetryLogMetaDTO toLogMetaDTO(Retry retry); RetryLogMetaDTO toLogMetaDTO(Retry retry);

View File

@ -1,5 +1,7 @@
package com.aizuda.snailjob.server.retry.task.support.dispatch; package com.aizuda.snailjob.server.retry.task.support.dispatch;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
@ -31,7 +33,7 @@ import java.util.stream.Collectors;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@RequiredArgsConstructor @RequiredArgsConstructor
public class RetryLogActor extends AbstractActor { public class RetryLogActor extends AbstractActor {
private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; private final AccessTemplate accessTemplate;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
@ -54,9 +56,9 @@ public class RetryLogActor extends AbstractActor {
List<RetryLogTaskDTO> jobLogTasks = list; List<RetryLogTaskDTO> jobLogTasks = list;
Map<Long, List<RetryLogTaskDTO>> logTaskDTOMap = jobLogTasks. Map<Long, List<RetryLogTaskDTO>> logTaskDTOMap = jobLogTasks.
stream().collect(Collectors.groupingBy(RetryLogTaskDTO::getRetryTaskId, Collectors.toList())); stream().collect(Collectors.groupingBy(RetryLogTaskDTO::getRetryTaskId, Collectors.toList()));
List<RetryTaskLogMessage> retryTaskLogMessages = new ArrayList<>(); List<RetryTaskLogMessageDO> retryTaskLogMessages = new ArrayList<>();
for (List<RetryLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) { for (List<RetryLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
RetryTaskLogMessage retryTaskLogMessage = RetryTaskConverter.INSTANCE.toRetryTaskLogMessage( RetryTaskLogMessageDO retryTaskLogMessage = RetryTaskConverter.INSTANCE.toRetryTaskLogMessage(
logTaskDTOList.get(0)); logTaskDTOList.get(0));
retryTaskLogMessage.setCreateDt(LocalDateTime.now()); retryTaskLogMessage.setCreateDt(LocalDateTime.now());
retryTaskLogMessage.setLogNum(logTaskDTOList.size()); retryTaskLogMessage.setLogNum(logTaskDTOList.size());
@ -70,7 +72,7 @@ public class RetryLogActor extends AbstractActor {
retryTaskLogMessages.add(retryTaskLogMessage); retryTaskLogMessages.add(retryTaskLogMessage);
} }
retryTaskLogMessageMapper.insertBatch(retryTaskLogMessages); accessTemplate.getRetryTaskLogMessageAccess().insertBatch(retryTaskLogMessages);
} }
/** /**
@ -79,7 +81,7 @@ public class RetryLogActor extends AbstractActor {
private void saveRetryTaskLogMessage(final RetryTaskLogDTO retryTaskLogDTO) { private void saveRetryTaskLogMessage(final RetryTaskLogDTO retryTaskLogDTO) {
// 记录重试日志 // 记录重试日志
RetryTaskLogMessage retryTaskLogMessage = new RetryTaskLogMessage(); RetryTaskLogMessageDO retryTaskLogMessage = new RetryTaskLogMessageDO();
retryTaskLogMessage.setRetryId(retryTaskLogDTO.getRetryId()); retryTaskLogMessage.setRetryId(retryTaskLogDTO.getRetryId());
retryTaskLogMessage.setRetryTaskId(retryTaskLogDTO.getRetryTaskId()); retryTaskLogMessage.setRetryTaskId(retryTaskLogDTO.getRetryTaskId());
retryTaskLogMessage.setGroupName(retryTaskLogDTO.getGroupName()); retryTaskLogMessage.setGroupName(retryTaskLogDTO.getGroupName());
@ -90,7 +92,7 @@ public class RetryLogActor extends AbstractActor {
retryTaskLogMessage.setMessage( retryTaskLogMessage.setMessage(
StrUtil.isBlank(errorMessage) ? StrUtil.EMPTY : errorMessage); StrUtil.isBlank(errorMessage) ? StrUtil.EMPTY : errorMessage);
retryTaskLogMessage.setCreateDt(Optional.ofNullable(retryTaskLogDTO.getTriggerTime()).orElse(LocalDateTime.now())); retryTaskLogMessage.setCreateDt(Optional.ofNullable(retryTaskLogDTO.getTriggerTime()).orElse(LocalDateTime.now()));
retryTaskLogMessageMapper.insert(retryTaskLogMessage); accessTemplate.getRetryTaskLogMessageAccess().insert(retryTaskLogMessage);
} }
} }