diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java index 8250d542..62147346 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java @@ -28,6 +28,8 @@ public enum RetryOperationReasonEnum { MANNER_STOP(7, "手动停止"), NOT_RUNNING_RETRY(8, "当前重试非运行中"), SCENE_CLOSED(9, "当前场景已关闭"), + RETRY_FAIL(10, "重试失败"), + CLIENT_TRIGGER_RETRY_STOP(11, "客户端触发任务停止"), ; private final int reason; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java index f646b1c3..4fa0b39a 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.retry.task.dto; +import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import lombok.Data; import lombok.EqualsAndHashCode; @@ -16,7 +17,8 @@ import lombok.EqualsAndHashCode; @Data public class RetryExecutorResultDTO extends BaseDTO { - private RetryResultStatusEnum resultStatus; + private Integer resultStatus; + private Integer operationReason; private boolean incrementRetryCount; private String resultJson; private Integer statusCode; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java index 346cd421..ccfd20e6 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java @@ -188,7 +188,7 @@ public class RequestCallbackClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); - executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE); + executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus()); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java index 7056fd9d..d6938c69 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java @@ -182,7 +182,7 @@ public class RequestRetryClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); - executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE); + executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus()); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 36030273..02d0dbca 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -5,6 +5,7 @@ import akka.actor.ActorRef; import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.WaitStrategy; @@ -19,8 +20,12 @@ import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -34,6 +39,7 @@ import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** *
@@ -51,16 +57,12 @@ public class ScanRetryActor extends AbstractActor {
private final SystemProperties systemProperties;
private final AccessTemplate accessTemplate;
private final RetryMapper retryMapper;
- private final static RateLimiter rateLimiter = RateLimiter.create(1000, 1, TimeUnit.SECONDS);
+ private final GroupConfigMapper groupConfigMapper;
+ private final RateLimiterHandler rateLimiterHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
- // 覆盖每秒产生多少个令牌的值
- double permitsPerSecond = systemProperties.getMaxDispatchCapacity();
- if (permitsPerSecond >= 1 && permitsPerSecond != rateLimiter.getRate()) {
- rateLimiter.setRate(permitsPerSecond);
- }
try {
doScan(config);
@@ -77,8 +79,8 @@ public class ScanRetryActor extends AbstractActor {
listAvailableTasks(startId, scanTask.getBuckets()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
partitionTasks -> {
- if (!rateLimiter.tryAcquire(partitionTasks.size())) {
- log.warn("当前节点触发限流 [{}]", rateLimiter.getRate());
+ if (CollUtil.isNotEmpty(partitionTasks) && !rateLimiterHandler.tryAcquire(partitionTasks.size())) {
+ log.warn("当前节点触发限流");
return false;
}
return true;
@@ -136,6 +138,7 @@ public class ScanRetryActor extends AbstractActor {
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType,
RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout)
+ .eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
.in(RetrySceneConfig::getSceneName, sceneNameSet));
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
}
@@ -193,6 +196,16 @@ public class ScanRetryActor extends AbstractActor {
.orderByAsc(Retry::getId))
.getRecords();
+ // 过滤已关闭的组
+ if (CollUtil.isNotEmpty(retries)) {
+ List
+ *
+ *
@@ -43,7 +44,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
- return RetryResultStatusEnum.STOP == context.getResultStatus();
+ return Objects.equals(RetryResultStatusEnum.STOP.getStatus(), context.getResultStatus());
}
@Override
@@ -67,6 +68,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
+ retryTask.setOperationReason(context.getOperationReason());
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java
index 3eb6a9c9..8ba2b764 100644
--- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java
+++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java
@@ -20,6 +20,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
+import java.util.Objects;
/**
*
@@ -40,7 +41,7 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
- return RetryResultStatusEnum.SUCCESS == context.getResultStatus();
+ return Objects.equals(RetryResultStatusEnum.SUCCESS.getStatus(), context.getResultStatus());
}
@Override
diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java
index 5c38a17a..1b8a88a7 100644
--- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java
+++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java
@@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
+import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -35,6 +36,7 @@ public class ConsumerBucketActor extends AbstractActor {
private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
private final SystemProperties systemProperties;
+ private final RateLimiterHandler rateLimiterHandler;
@Override
public Receive createReceive() {
@@ -62,6 +64,10 @@ public class ConsumerBucketActor extends AbstractActor {
}
private void doScanRetry(final ConsumerBucket consumerBucket) {
+
+ // 刷新最新的配置
+ rateLimiterHandler.refreshRate();
+
ScanTask scanTask = new ScanTask();
// 通过并行度配置计算拉取范围
List> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());