From 849e68f208754085e855093b4460a1318f9064be Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Thu, 20 Feb 2025 23:44:13 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.4.0-beta1):=201.=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E6=89=A7=E8=A1=8C=E4=B8=8A=E6=8A=A5=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=8E=9F=E5=9B=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/enums/RetryOperationReasonEnum.java | 2 + .../task/dto/RetryExecutorResultDTO.java | 4 +- .../dispatch/RequestCallbackClientActor.java | 2 +- .../dispatch/RequestRetryClientActor.java | 2 +- .../task/support/dispatch/ScanRetryActor.java | 29 ++++++++--- .../support/handler/RateLimiterHandler.java | 49 +++++++++++++++++++ .../support/handler/RetryTaskStopHandler.java | 14 +++--- ...eportDispatchResultHttpRequestHandler.java | 9 +++- .../support/result/RetryFailureHandler.java | 4 +- .../support/result/RetryResultContext.java | 5 +- .../task/support/result/RetryStopHandler.java | 4 +- .../support/result/RetrySuccessHandler.java | 3 +- .../starter/dispatch/ConsumerBucketActor.java | 6 +++ 13 files changed, 109 insertions(+), 24 deletions(-) create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RateLimiterHandler.java diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java index 8250d5426..62147346b 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryOperationReasonEnum.java @@ -28,6 +28,8 @@ public enum RetryOperationReasonEnum { MANNER_STOP(7, "手动停止"), NOT_RUNNING_RETRY(8, "当前重试非运行中"), SCENE_CLOSED(9, "当前场景已关闭"), + RETRY_FAIL(10, "重试失败"), + CLIENT_TRIGGER_RETRY_STOP(11, "客户端触发任务停止"), ; private final int reason; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java index f646b1c39..4fa0b39a2 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryExecutorResultDTO.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.retry.task.dto; +import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import lombok.Data; import lombok.EqualsAndHashCode; @@ -16,7 +17,8 @@ import lombok.EqualsAndHashCode; @Data public class RetryExecutorResultDTO extends BaseDTO { - private RetryResultStatusEnum resultStatus; + private Integer resultStatus; + private Integer operationReason; private boolean incrementRetryCount; private String resultJson; private Integer statusCode; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java index 346cd421b..ccfd20e6d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java @@ -188,7 +188,7 @@ public class RequestCallbackClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); - executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE); + executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus()); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java index 7056fd9dc..d6938c69b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestRetryClientActor.java @@ -182,7 +182,7 @@ public class RequestRetryClientActor extends AbstractActor { ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor(); RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO); executorResultDTO.setExceptionMsg(message); - executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE); + executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus()); actorRef.tell(executorResultDTO, actorRef); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 36030273f..02d0dbcaf 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -5,6 +5,7 @@ import akka.actor.ActorRef; import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.WaitStrategy; @@ -19,8 +20,12 @@ import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -34,6 +39,7 @@ import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** *
@@ -51,16 +57,12 @@ public class ScanRetryActor extends AbstractActor {
private final SystemProperties systemProperties;
private final AccessTemplate accessTemplate;
private final RetryMapper retryMapper;
- private final static RateLimiter rateLimiter = RateLimiter.create(1000, 1, TimeUnit.SECONDS);
+ private final GroupConfigMapper groupConfigMapper;
+ private final RateLimiterHandler rateLimiterHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
- // 覆盖每秒产生多少个令牌的值
- double permitsPerSecond = systemProperties.getMaxDispatchCapacity();
- if (permitsPerSecond >= 1 && permitsPerSecond != rateLimiter.getRate()) {
- rateLimiter.setRate(permitsPerSecond);
- }
try {
doScan(config);
@@ -77,8 +79,8 @@ public class ScanRetryActor extends AbstractActor {
listAvailableTasks(startId, scanTask.getBuckets()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
partitionTasks -> {
- if (!rateLimiter.tryAcquire(partitionTasks.size())) {
- log.warn("当前节点触发限流 [{}]", rateLimiter.getRate());
+ if (CollUtil.isNotEmpty(partitionTasks) && !rateLimiterHandler.tryAcquire(partitionTasks.size())) {
+ log.warn("当前节点触发限流");
return false;
}
return true;
@@ -136,6 +138,7 @@ public class ScanRetryActor extends AbstractActor {
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType,
RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout)
+ .eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
.in(RetrySceneConfig::getSceneName, sceneNameSet));
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
}
@@ -193,6 +196,16 @@ public class ScanRetryActor extends AbstractActor {
.orderByAsc(Retry::getId))
.getRecords();
+ // 过滤已关闭的组
+ if (CollUtil.isNotEmpty(retries)) {
+ List
+ *
+ *
@@ -43,7 +44,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
- return RetryResultStatusEnum.STOP == context.getResultStatus();
+ return Objects.equals(RetryResultStatusEnum.STOP.getStatus(), context.getResultStatus());
}
@Override
@@ -67,6 +68,7 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
+ retryTask.setOperationReason(context.getOperationReason());
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java
index 3eb6a9c98..8ba2b764a 100644
--- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java
+++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java
@@ -20,6 +20,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
+import java.util.Objects;
/**
*
@@ -40,7 +41,7 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
- return RetryResultStatusEnum.SUCCESS == context.getResultStatus();
+ return Objects.equals(RetryResultStatusEnum.SUCCESS.getStatus(), context.getResultStatus());
}
@Override
diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java
index 5c38a17a1..1b8a88a7d 100644
--- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java
+++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java
@@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
+import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -35,6 +36,7 @@ public class ConsumerBucketActor extends AbstractActor {
private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
private final SystemProperties systemProperties;
+ private final RateLimiterHandler rateLimiterHandler;
@Override
public Receive createReceive() {
@@ -62,6 +64,10 @@ public class ConsumerBucketActor extends AbstractActor {
}
private void doScanRetry(final ConsumerBucket consumerBucket) {
+
+ // 刷新最新的配置
+ rateLimiterHandler.refreshRate();
+
ScanTask scanTask = new ScanTask();
// 通过并行度配置计算拉取范围
List> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());