feat(1.4.0-beta1): 1.客户端执行上报添加失败原因
This commit is contained in:
parent
35803d7fde
commit
7c80975811
@ -28,6 +28,8 @@ public enum RetryOperationReasonEnum {
|
|||||||
MANNER_STOP(7, "手动停止"),
|
MANNER_STOP(7, "手动停止"),
|
||||||
NOT_RUNNING_RETRY(8, "当前重试非运行中"),
|
NOT_RUNNING_RETRY(8, "当前重试非运行中"),
|
||||||
SCENE_CLOSED(9, "当前场景已关闭"),
|
SCENE_CLOSED(9, "当前场景已关闭"),
|
||||||
|
RETRY_FAIL(10, "重试失败"),
|
||||||
|
CLIENT_TRIGGER_RETRY_STOP(11, "客户端触发任务停止"),
|
||||||
;
|
;
|
||||||
|
|
||||||
private final int reason;
|
private final int reason;
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.aizuda.snailjob.server.retry.task.dto;
|
package com.aizuda.snailjob.server.retry.task.dto;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
@ -16,7 +17,8 @@ import lombok.EqualsAndHashCode;
|
|||||||
@Data
|
@Data
|
||||||
public class RetryExecutorResultDTO extends BaseDTO {
|
public class RetryExecutorResultDTO extends BaseDTO {
|
||||||
|
|
||||||
private RetryResultStatusEnum resultStatus;
|
private Integer resultStatus;
|
||||||
|
private Integer operationReason;
|
||||||
private boolean incrementRetryCount;
|
private boolean incrementRetryCount;
|
||||||
private String resultJson;
|
private String resultJson;
|
||||||
private Integer statusCode;
|
private Integer statusCode;
|
||||||
|
@ -188,7 +188,7 @@ public class RequestCallbackClientActor extends AbstractActor {
|
|||||||
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||||
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
||||||
executorResultDTO.setExceptionMsg(message);
|
executorResultDTO.setExceptionMsg(message);
|
||||||
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
|
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
|
||||||
actorRef.tell(executorResultDTO, actorRef);
|
actorRef.tell(executorResultDTO, actorRef);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -182,7 +182,7 @@ public class RequestRetryClientActor extends AbstractActor {
|
|||||||
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||||
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
||||||
executorResultDTO.setExceptionMsg(message);
|
executorResultDTO.setExceptionMsg(message);
|
||||||
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
|
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
|
||||||
actorRef.tell(executorResultDTO, actorRef);
|
actorRef.tell(executorResultDTO, actorRef);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import akka.actor.ActorRef;
|
|||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import com.aizuda.snailjob.server.common.WaitStrategy;
|
import com.aizuda.snailjob.server.common.WaitStrategy;
|
||||||
@ -19,8 +20,12 @@ import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
|
|||||||
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
|
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
|
||||||
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
|
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
|
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
@ -34,6 +39,7 @@ import org.springframework.stereotype.Component;
|
|||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
@ -51,16 +57,12 @@ public class ScanRetryActor extends AbstractActor {
|
|||||||
private final SystemProperties systemProperties;
|
private final SystemProperties systemProperties;
|
||||||
private final AccessTemplate accessTemplate;
|
private final AccessTemplate accessTemplate;
|
||||||
private final RetryMapper retryMapper;
|
private final RetryMapper retryMapper;
|
||||||
private final static RateLimiter rateLimiter = RateLimiter.create(1000, 1, TimeUnit.SECONDS);
|
private final GroupConfigMapper groupConfigMapper;
|
||||||
|
private final RateLimiterHandler rateLimiterHandler;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(ScanTask.class, config -> {
|
return receiveBuilder().match(ScanTask.class, config -> {
|
||||||
// 覆盖每秒产生多少个令牌的值
|
|
||||||
double permitsPerSecond = systemProperties.getMaxDispatchCapacity();
|
|
||||||
if (permitsPerSecond >= 1 && permitsPerSecond != rateLimiter.getRate()) {
|
|
||||||
rateLimiter.setRate(permitsPerSecond);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
doScan(config);
|
doScan(config);
|
||||||
@ -77,8 +79,8 @@ public class ScanRetryActor extends AbstractActor {
|
|||||||
listAvailableTasks(startId, scanTask.getBuckets()),
|
listAvailableTasks(startId, scanTask.getBuckets()),
|
||||||
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
|
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
|
||||||
partitionTasks -> {
|
partitionTasks -> {
|
||||||
if (!rateLimiter.tryAcquire(partitionTasks.size())) {
|
if (CollUtil.isNotEmpty(partitionTasks) && !rateLimiterHandler.tryAcquire(partitionTasks.size())) {
|
||||||
log.warn("当前节点触发限流 [{}]", rateLimiter.getRate());
|
log.warn("当前节点触发限流");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@ -136,6 +138,7 @@ public class ScanRetryActor extends AbstractActor {
|
|||||||
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
|
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
|
||||||
RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType,
|
RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType,
|
||||||
RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout)
|
RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout)
|
||||||
|
.eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
|
||||||
.in(RetrySceneConfig::getSceneName, sceneNameSet));
|
.in(RetrySceneConfig::getSceneName, sceneNameSet));
|
||||||
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
|
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
|
||||||
}
|
}
|
||||||
@ -193,6 +196,16 @@ public class ScanRetryActor extends AbstractActor {
|
|||||||
.orderByAsc(Retry::getId))
|
.orderByAsc(Retry::getId))
|
||||||
.getRecords();
|
.getRecords();
|
||||||
|
|
||||||
|
// 过滤已关闭的组
|
||||||
|
if (CollUtil.isNotEmpty(retries)) {
|
||||||
|
List<String> groupConfigs = StreamUtils.toList(groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
|
||||||
|
.select(GroupConfig::getGroupName)
|
||||||
|
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
|
||||||
|
.in(GroupConfig::getGroupName, StreamUtils.toSet(retries, Retry::getGroupName))),
|
||||||
|
GroupConfig::getGroupName);
|
||||||
|
retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getGroupName())).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);
|
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,49 @@
|
|||||||
|
package com.aizuda.snailjob.server.retry.task.support.handler;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.server.common.config.SystemProperties;
|
||||||
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
*
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author opensnail
|
||||||
|
* @date 2025-02-20
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class RateLimiterHandler implements InitializingBean {
|
||||||
|
private final SystemProperties systemProperties;
|
||||||
|
private RateLimiter rateLimiter;
|
||||||
|
|
||||||
|
public boolean tryAcquire(int permits) {
|
||||||
|
return rateLimiter.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void refreshRate( ) {
|
||||||
|
int maxDispatchCapacity = systemProperties.getMaxDispatchCapacity();
|
||||||
|
if (maxDispatchCapacity == rateLimiter.getRate()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
rateLimiter.setRate(maxDispatchCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void refreshRate(int maxDispatchCapacity ) {
|
||||||
|
if (maxDispatchCapacity == rateLimiter.getRate()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
rateLimiter.setRate(maxDispatchCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity(), 1, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.handler;
|
|||||||
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
|
||||||
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
||||||
@ -35,11 +36,11 @@ public class RetryTaskStopHandler {
|
|||||||
*/
|
*/
|
||||||
public void stop(TaskStopJobDTO stopJobDTO) {
|
public void stop(TaskStopJobDTO stopJobDTO) {
|
||||||
|
|
||||||
RetryTask retryTask = new RetryTask();
|
// RetryTask retryTask = new RetryTask();
|
||||||
retryTask.setId(stopJobDTO.getRetryTaskId());
|
// retryTask.setId(stopJobDTO.getRetryTaskId());
|
||||||
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
|
// retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
|
||||||
retryTask.setOperationReason(stopJobDTO.getOperationReason());
|
// retryTask.setOperationReason(stopJobDTO.getOperationReason());
|
||||||
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed"));
|
// Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed"));
|
||||||
|
|
||||||
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
|
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
|
||||||
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
|
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
|
||||||
@ -55,7 +56,8 @@ public class RetryTaskStopHandler {
|
|||||||
}
|
}
|
||||||
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO);
|
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO);
|
||||||
executorResultDTO.setExceptionMsg(stopJobDTO.getMessage());
|
executorResultDTO.setExceptionMsg(stopJobDTO.getMessage());
|
||||||
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
|
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
|
||||||
|
executorResultDTO.setOperationReason(stopJobDTO.getOperationReason());
|
||||||
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||||
actorRef.tell(executorResultDTO, actorRef);
|
actorRef.tell(executorResultDTO, actorRef);
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import akka.actor.ActorRef;
|
|||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import cn.hutool.core.net.url.UrlQuery;
|
import cn.hutool.core.net.url.UrlQuery;
|
||||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
|
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
|
||||||
@ -59,8 +60,14 @@ public class ReportDispatchResultHttpRequestHandler extends PostHttpRequestHandl
|
|||||||
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(resultDTO);
|
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(resultDTO);
|
||||||
RetryResultStatusEnum statusEnum = RetryResultStatusEnum.getRetryResultStatusEnum(resultDTO.getStatusCode());
|
RetryResultStatusEnum statusEnum = RetryResultStatusEnum.getRetryResultStatusEnum(resultDTO.getStatusCode());
|
||||||
Assert.notNull(statusEnum, () -> new SnailJobServerException("status code is invalid"));
|
Assert.notNull(statusEnum, () -> new SnailJobServerException("status code is invalid"));
|
||||||
executorResultDTO.setResultStatus(statusEnum);
|
executorResultDTO.setResultStatus(statusEnum.getStatus());
|
||||||
executorResultDTO.setIncrementRetryCount(true);
|
executorResultDTO.setIncrementRetryCount(true);
|
||||||
|
if (RetryResultStatusEnum.FAILURE.getStatus().equals(statusEnum.getStatus())) {
|
||||||
|
executorResultDTO.setOperationReason(RetryOperationReasonEnum.RETRY_FAIL.getReason());
|
||||||
|
} else if (RetryResultStatusEnum.STOP.getStatus().equals(statusEnum.getStatus())) {
|
||||||
|
executorResultDTO.setOperationReason(RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason());
|
||||||
|
}
|
||||||
|
|
||||||
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||||
actorRef.tell(executorResultDTO, actorRef);
|
actorRef.tell(executorResultDTO, actorRef);
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
|||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
import static com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR;
|
import static com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR;
|
||||||
|
|
||||||
@ -50,7 +51,7 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(RetryResultContext context) {
|
public boolean supports(RetryResultContext context) {
|
||||||
return RetryResultStatusEnum.FAILURE == context.getResultStatus();
|
return Objects.equals(RetryResultStatusEnum.FAILURE.getStatus(), context.getResultStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -92,6 +93,7 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
|
|||||||
RetryTask retryTask = new RetryTask();
|
RetryTask retryTask = new RetryTask();
|
||||||
retryTask.setId(context.getRetryTaskId());
|
retryTask.setId(context.getRetryTaskId());
|
||||||
retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
|
retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
|
||||||
|
retryTask.setOperationReason(context.getOperationReason());
|
||||||
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
|
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
|
||||||
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
|
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
|
||||||
|
|
||||||
|
@ -17,10 +17,9 @@ import lombok.EqualsAndHashCode;
|
|||||||
@Data
|
@Data
|
||||||
public class RetryResultContext extends BaseDTO {
|
public class RetryResultContext extends BaseDTO {
|
||||||
|
|
||||||
private RetryResultStatusEnum resultStatus;
|
private Integer resultStatus;
|
||||||
|
private Integer operationReason;
|
||||||
private boolean incrementRetryCount;
|
private boolean incrementRetryCount;
|
||||||
|
|
||||||
private String resultJson;
|
private String resultJson;
|
||||||
private String idempotentId;
|
private String idempotentId;
|
||||||
private String exceptionMsg;
|
private String exceptionMsg;
|
||||||
|
@ -21,6 +21,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
|||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
@ -43,7 +44,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(RetryResultContext context) {
|
public boolean supports(RetryResultContext context) {
|
||||||
return RetryResultStatusEnum.STOP == context.getResultStatus();
|
return Objects.equals(RetryResultStatusEnum.STOP.getStatus(), context.getResultStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -67,6 +68,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
|
|||||||
|
|
||||||
RetryTask retryTask = new RetryTask();
|
RetryTask retryTask = new RetryTask();
|
||||||
retryTask.setId(context.getRetryTaskId());
|
retryTask.setId(context.getRetryTaskId());
|
||||||
|
retryTask.setOperationReason(context.getOperationReason());
|
||||||
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
|
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
|
||||||
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
|
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
|
||||||
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
|
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
|
||||||
|
@ -20,6 +20,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
|||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
@ -40,7 +41,7 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(RetryResultContext context) {
|
public boolean supports(RetryResultContext context) {
|
||||||
return RetryResultStatusEnum.SUCCESS == context.getResultStatus();
|
return Objects.equals(RetryResultStatusEnum.SUCCESS.getStatus(), context.getResultStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor;
|
|||||||
import com.aizuda.snailjob.server.common.config.SystemProperties;
|
import com.aizuda.snailjob.server.common.config.SystemProperties;
|
||||||
import com.aizuda.snailjob.server.common.dto.ScanTask;
|
import com.aizuda.snailjob.server.common.dto.ScanTask;
|
||||||
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
@ -35,6 +36,7 @@ public class ConsumerBucketActor extends AbstractActor {
|
|||||||
private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
|
private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
|
||||||
private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
|
private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
|
||||||
private final SystemProperties systemProperties;
|
private final SystemProperties systemProperties;
|
||||||
|
private final RateLimiterHandler rateLimiterHandler;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
@ -62,6 +64,10 @@ public class ConsumerBucketActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void doScanRetry(final ConsumerBucket consumerBucket) {
|
private void doScanRetry(final ConsumerBucket consumerBucket) {
|
||||||
|
|
||||||
|
// 刷新最新的配置
|
||||||
|
rateLimiterHandler.refreshRate();
|
||||||
|
|
||||||
ScanTask scanTask = new ScanTask();
|
ScanTask scanTask = new ScanTask();
|
||||||
// 通过并行度配置计算拉取范围
|
// 通过并行度配置计算拉取范围
|
||||||
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());
|
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());
|
||||||
|
Loading…
Reference in New Issue
Block a user