feat(1.5.0-beta1): 调试任务重试
This commit is contained in:
parent
8916f3dfb8
commit
de25b70764
@ -23,11 +23,11 @@ public class Retry extends CreateUpdateDt {
|
||||
|
||||
private String groupName;
|
||||
|
||||
private String groupId;
|
||||
private Long groupId;
|
||||
|
||||
private String sceneName;
|
||||
|
||||
private String sceneId;
|
||||
private Long sceneId;
|
||||
|
||||
private String idempotentId;
|
||||
|
||||
|
@ -151,7 +151,7 @@ public class ScanRetryActor extends AbstractActor {
|
||||
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
|
||||
RetrySceneConfig::getBlockStrategy, RetrySceneConfig::getSceneName,
|
||||
RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval,
|
||||
RetrySceneConfig::getExecutorTimeout)
|
||||
RetrySceneConfig::getExecutorTimeout, RetrySceneConfig::getId)
|
||||
.eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
|
||||
.in(RetrySceneConfig::getId, sceneIdSet));
|
||||
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getId);
|
||||
@ -203,7 +203,8 @@ public class ScanRetryActor extends AbstractActor {
|
||||
.listPage(new PageDTO<>(0, systemProperties.getRetryPullPageSize(), Boolean.FALSE),
|
||||
new LambdaQueryWrapper<Retry>()
|
||||
.select(Retry::getId, Retry::getNextTriggerAt, Retry::getGroupName, Retry::getRetryCount,
|
||||
Retry::getSceneName, Retry::getNamespaceId, Retry::getTaskType)
|
||||
Retry::getSceneName, Retry::getNamespaceId, Retry::getTaskType,
|
||||
Retry::getSceneId, Retry::getGroupId)
|
||||
.eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
|
||||
.in(Retry::getBucketIndex, buckets)
|
||||
.le(Retry::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
|
||||
@ -219,7 +220,7 @@ public class ScanRetryActor extends AbstractActor {
|
||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
|
||||
.in(GroupConfig::getId, StreamUtils.toSet(retries, Retry::getGroupId))),
|
||||
GroupConfig::getId);
|
||||
retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getId())).collect(Collectors.toList());
|
||||
retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getGroupId())).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);
|
||||
|
@ -43,6 +43,7 @@ public class RetryTaskStopHandler {
|
||||
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO);
|
||||
executorResultDTO.setExceptionMsg(stopJobDTO.getMessage());
|
||||
executorResultDTO.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
|
||||
executorResultDTO.setIncrementRetryCount(true);
|
||||
executorResultDTO.setOperationReason(stopJobDTO.getOperationReason());
|
||||
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||
actorRef.tell(executorResultDTO, actorRef);
|
||||
|
Loading…
Reference in New Issue
Block a user