feat(1.4.0-beta1): 1.客户端执行上报添加失败原因

This commit is contained in:
opensnail 2025-02-20 23:44:13 +08:00
parent 30bc414248
commit 849e68f208
13 changed files with 109 additions and 24 deletions

View File

@ -28,6 +28,8 @@ public enum RetryOperationReasonEnum {
MANNER_STOP(7, "手动停止"),
NOT_RUNNING_RETRY(8, "当前重试非运行中"),
SCENE_CLOSED(9, "当前场景已关闭"),
RETRY_FAIL(10, "重试失败"),
CLIENT_TRIGGER_RETRY_STOP(11, "客户端触发任务停止"),
;
private final int reason;

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.retry.task.dto;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -16,7 +17,8 @@ import lombok.EqualsAndHashCode;
@Data
public class RetryExecutorResultDTO extends BaseDTO {
private RetryResultStatusEnum resultStatus;
private Integer resultStatus;
private Integer operationReason;
private boolean incrementRetryCount;
private String resultJson;
private Integer statusCode;

View File

@ -188,7 +188,7 @@ public class RequestCallbackClientActor extends AbstractActor {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -182,7 +182,7 @@ public class RequestRetryClientActor extends AbstractActor {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -5,6 +5,7 @@ import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
@ -19,8 +20,12 @@ import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -34,6 +39,7 @@ import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* <p>
@ -51,16 +57,12 @@ public class ScanRetryActor extends AbstractActor {
private final SystemProperties systemProperties;
private final AccessTemplate accessTemplate;
private final RetryMapper retryMapper;
private final static RateLimiter rateLimiter = RateLimiter.create(1000, 1, TimeUnit.SECONDS);
private final GroupConfigMapper groupConfigMapper;
private final RateLimiterHandler rateLimiterHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
// 覆盖每秒产生多少个令牌的值
double permitsPerSecond = systemProperties.getMaxDispatchCapacity();
if (permitsPerSecond >= 1 && permitsPerSecond != rateLimiter.getRate()) {
rateLimiter.setRate(permitsPerSecond);
}
try {
doScan(config);
@ -77,8 +79,8 @@ public class ScanRetryActor extends AbstractActor {
listAvailableTasks(startId, scanTask.getBuckets()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
partitionTasks -> {
if (!rateLimiter.tryAcquire(partitionTasks.size())) {
log.warn("当前节点触发限流 [{}]", rateLimiter.getRate());
if (CollUtil.isNotEmpty(partitionTasks) && !rateLimiterHandler.tryAcquire(partitionTasks.size())) {
log.warn("当前节点触发限流");
return false;
}
return true;
@ -136,6 +138,7 @@ public class ScanRetryActor extends AbstractActor {
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType,
RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout)
.eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
.in(RetrySceneConfig::getSceneName, sceneNameSet));
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
}
@ -193,6 +196,16 @@ public class ScanRetryActor extends AbstractActor {
.orderByAsc(Retry::getId))
.getRecords();
// 过滤已关闭的组
if (CollUtil.isNotEmpty(retries)) {
List<String> groupConfigs = StreamUtils.toList(groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupName)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
.in(GroupConfig::getGroupName, StreamUtils.toSet(retries, Retry::getGroupName))),
GroupConfig::getGroupName);
retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getGroupName())).collect(Collectors.toList());
}
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);
}
}

View File

@ -0,0 +1,49 @@
package com.aizuda.snailjob.server.retry.task.support.handler;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* <p>
*
* </p>
*
* @author opensnail
* @date 2025-02-20
*/
@Component
@RequiredArgsConstructor
public class RateLimiterHandler implements InitializingBean {
private final SystemProperties systemProperties;
private RateLimiter rateLimiter;
public boolean tryAcquire(int permits) {
return rateLimiter.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
}
public void refreshRate( ) {
int maxDispatchCapacity = systemProperties.getMaxDispatchCapacity();
if (maxDispatchCapacity == rateLimiter.getRate()) {
return;
}
rateLimiter.setRate(maxDispatchCapacity);
}
public void refreshRate(int maxDispatchCapacity ) {
if (maxDispatchCapacity == rateLimiter.getRate()) {
return;
}
rateLimiter.setRate(maxDispatchCapacity);
}
@Override
public void afterPropertiesSet() throws Exception {
rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity(), 1, TimeUnit.SECONDS);
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.handler;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
@ -35,11 +36,11 @@ public class RetryTaskStopHandler {
*/
public void stop(TaskStopJobDTO stopJobDTO) {
RetryTask retryTask = new RetryTask();
retryTask.setId(stopJobDTO.getRetryTaskId());
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
retryTask.setOperationReason(stopJobDTO.getOperationReason());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed"));
// RetryTask retryTask = new RetryTask();
// retryTask.setId(stopJobDTO.getRetryTaskId());
// retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
// retryTask.setOperationReason(stopJobDTO.getOperationReason());
// Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed"));
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
@ -55,7 +56,8 @@ public class RetryTaskStopHandler {
}
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO);
executorResultDTO.setExceptionMsg(stopJobDTO.getMessage());
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
executorResultDTO.setOperationReason(stopJobDTO.getOperationReason());
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
actorRef.tell(executorResultDTO, actorRef);
}

View File

@ -4,6 +4,7 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
@ -59,8 +60,14 @@ public class ReportDispatchResultHttpRequestHandler extends PostHttpRequestHandl
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(resultDTO);
RetryResultStatusEnum statusEnum = RetryResultStatusEnum.getRetryResultStatusEnum(resultDTO.getStatusCode());
Assert.notNull(statusEnum, () -> new SnailJobServerException("status code is invalid"));
executorResultDTO.setResultStatus(statusEnum);
executorResultDTO.setResultStatus(statusEnum.getStatus());
executorResultDTO.setIncrementRetryCount(true);
if (RetryResultStatusEnum.FAILURE.getStatus().equals(statusEnum.getStatus())) {
executorResultDTO.setOperationReason(RetryOperationReasonEnum.RETRY_FAIL.getReason());
} else if (RetryResultStatusEnum.STOP.getStatus().equals(statusEnum.getStatus())) {
executorResultDTO.setOperationReason(RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason());
}
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
actorRef.tell(executorResultDTO, actorRef);

View File

@ -27,6 +27,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR;
@ -50,7 +51,7 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
return RetryResultStatusEnum.FAILURE == context.getResultStatus();
return Objects.equals(RetryResultStatusEnum.FAILURE.getStatus(), context.getResultStatus());
}
@Override
@ -92,6 +93,7 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
retryTask.setOperationReason(context.getOperationReason());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));

View File

@ -17,10 +17,9 @@ import lombok.EqualsAndHashCode;
@Data
public class RetryResultContext extends BaseDTO {
private RetryResultStatusEnum resultStatus;
private Integer resultStatus;
private Integer operationReason;
private boolean incrementRetryCount;
private String resultJson;
private String idempotentId;
private String exceptionMsg;

View File

@ -21,6 +21,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* <p>
@ -43,7 +44,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
return RetryResultStatusEnum.STOP == context.getResultStatus();
return Objects.equals(RetryResultStatusEnum.STOP.getStatus(), context.getResultStatus());
}
@Override
@ -67,6 +68,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setOperationReason(context.getOperationReason());
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));

View File

@ -20,6 +20,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* <p>
@ -40,7 +41,7 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
return RetryResultStatusEnum.SUCCESS == context.getResultStatus();
return Objects.equals(RetryResultStatusEnum.SUCCESS.getStatus(), context.getResultStatus());
}
@Override

View File

@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -35,6 +36,7 @@ public class ConsumerBucketActor extends AbstractActor {
private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
private final SystemProperties systemProperties;
private final RateLimiterHandler rateLimiterHandler;
@Override
public Receive createReceive() {
@ -62,6 +64,10 @@ public class ConsumerBucketActor extends AbstractActor {
}
private void doScanRetry(final ConsumerBucket consumerBucket) {
// 刷新最新的配置
rateLimiterHandler.refreshRate();
ScanTask scanTask = new ScanTask();
// 通过并行度配置计算拉取范围
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());